-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpublish.py
129 lines (115 loc) · 4.59 KB
/
publish.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import argparse
import re
import urllib.parse
from datetime import datetime
import requests
from pymisp import ExpandedPyMISP
from vulnerabilitylookupsighting import config
def remove_case_insensitive_duplicates(input_list: list[str]) -> list[str]:
"""Remove duplicates in a list, ignoring case.
This approach preserves the last occurrence of each unique item based on
lowercase equivalence. The dictionary keys are all lowercase to ensure
case-insensitive comparison, while the original case is preserved in the output.
"""
return list({item.lower(): item for item in input_list}.values())
def push_sighting_to_vulnerability_lookup(attribute, vulnerability_ids):
"""Create a sighting from a MISP attribute and push it to the Vulnerability Lookup instance."""
print("Pushing sightings to Vulnerability Lookup...")
headers_json = {
"Content-Type": "application/json",
"accept": "application/json",
"X-API-KEY": f"{config.vulnerability_auth_token}",
}
for vuln in vulnerability_ids:
# Set the creation timestamp for the sighting
creation_timestamp = ""
if attribute.get("first_seen", False):
try:
creation_timestamp = datetime.fromtimestamp(
int(attribute["first_seen"])
).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
except Exception:
print(
"Bad timestamp: {} for event {}".format(
attribute["first_seen"], attribute["Event"]["uuid"]
)
)
pass
else:
try:
creation_timestamp = datetime.fromtimestamp(
int(attribute["timestamp"])
).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
except Exception:
print(
"Bad timestamp: {} for event {}".format(
attribute["timestamp"], attribute["Event"]["uuid"]
)
)
continue
if not creation_timestamp:
continue
# Create the sighting
sighting = {
"type": "seen",
"source": f"MISP/{attribute['Event']['uuid']}",
"vulnerability": vuln,
"creation_timestamp": creation_timestamp,
}
# Post the JSON to Vulnerability Lookup
try:
r = requests.post(
urllib.parse.urljoin(config.vulnerability_lookup_base_url, "sighting/"),
json=sighting,
headers=headers_json,
)
if r.status_code not in (200, 201):
print(
f"Error when sending POST request to the Vulnerability Lookup server: {r.reason}"
)
except requests.exceptions.ConnectionError as e:
print(
f"Error when sending POST request to the Vulnerability Lookup server:\n{e}"
)
def main() -> None:
parser = argparse.ArgumentParser(
prog="FediVuln-Stream", description="Allows access to the streaming API."
)
parser.add_argument(
"--since",
default="1d",
help="Maximum timestamp of the MISP attribute.",
)
arguments = parser.parse_args()
body = {
"returnFormat": "json",
"type": "vulnerability",
"timestamp": arguments.since,
}
relative_path = "attributes/restSearch"
misp = ExpandedPyMISP(config.misp_url, config.misp_key, config.misp_verifycert)
print("Querying MISP...")
result = misp.direct_call(relative_path, body)
attributes = result["Attribute"]
vulnerability_pattern = re.compile(
r"\b(CVE-\d{4}-\d{4,})\b" # CVE pattern
r"|\b(GHSA-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4}-[a-zA-Z0-9]{4})\b" # GHSA pattern
r"|\b(PYSEC-\d{4}-\d{2,5})\b" # PYSEC pattern
r"|\b(GSD-\d{4}-\d{4,5})\b" # GSD pattern
r"|\b(wid-sec-w-\d{4}-\d{4})\b" # CERT-Bund pattern
r"|\b(cisco-sa-\d{8}-[a-zA-Z0-9]+)\b" # CISCO pattern
r"|\b(RHSA-\d{4}:\d{4})\b", # RedHat pattern
re.IGNORECASE,
)
print("Query completed successfully.")
for attribute in attributes:
matches = vulnerability_pattern.findall(attribute["value"])
vulnerability_ids = [
match for match_tuple in matches for match in match_tuple if match
]
vulnerability_ids = remove_case_insensitive_duplicates(vulnerability_ids)
if vulnerability_ids:
push_sighting_to_vulnerability_lookup(attribute, vulnerability_ids)
if __name__ == "__main__":
# Point of entry in execution mode.
main()