-
Notifications
You must be signed in to change notification settings - Fork 447
/
Copy pathvulnerabilityToStix2.py
159 lines (138 loc) · 5.88 KB
/
vulnerabilityToStix2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import datetime
import stix2
from pycti import Identity, StixCoreRelationship, Vulnerability # type: ignore
from services.utils import APP_VERSION, ConfigCVE # type: ignore
from ..client import CVEVulnerability # type: ignore
class CVEConverter:
def __init__(self, helper):
self.config = ConfigCVE()
self.helper = helper
self.client_api = CVEVulnerability(
api_key=self.config.api_key,
helper=self.helper,
header=f"OpenCTI-cve/{APP_VERSION}",
)
self.author = self._create_author()
def send_bundle(self, cve_params: dict, work_id: str) -> None:
"""
Send bundle to API
:param cve_params: Dict of params
:param work_id: work id in string
:return:
"""
vulnerabilities_objects = self.vulnerabilities_to_stix2(cve_params)
if len(vulnerabilities_objects) != 0:
vulnerabilities_objects.append(self.author)
vulnerabilities_bundle = self._to_stix_bundle(vulnerabilities_objects)
vulnerabilities_to_json = self._to_json_bundle(vulnerabilities_bundle)
# Retrieve the author object for the info message
info_msg = (
f"[CONVERTER] Sending bundle to server with {len(vulnerabilities_bundle)} objects, "
f"concerning {len(vulnerabilities_objects) - 1} vulnerabilities"
)
self.helper.log_info(info_msg)
self.helper.send_stix2_bundle(
vulnerabilities_to_json,
work_id=work_id,
)
else:
pass
def vulnerabilities_to_stix2(self, cve_params: dict) -> list:
"""
Retrieve all CVEs from NVD to convert into STIX2 format
:param cve_params: Dict of params
:return: List of data converted into STIX2
"""
vulnerabilities = self.client_api.get_vulnerabilities(cve_params)
vulnerabilities_to_stix2 = []
for vulnerability in vulnerabilities:
# Getting different fields
name = vulnerability["cve"]["id"]
description = vulnerability["cve"]["descriptions"][0]["value"]
created_date = datetime.datetime.strptime(
vulnerability["cve"]["published"], "%Y-%m-%dT%H:%M:%S.%f"
)
modified_date = datetime.datetime.strptime(
vulnerability["cve"]["lastModified"], "%Y-%m-%dT%H:%M:%S.%f"
)
# Create external references
external_reference = stix2.ExternalReference(
source_name="NIST NVD", url=f"https://nvd.nist.gov/vuln/detail/{name}"
)
external_references = [external_reference]
if "references" in vulnerability["cve"]:
for reference in vulnerability["cve"]["references"]:
external_reference = stix2.ExternalReference(
source_name=reference["source"], url=reference["url"]
)
external_references.append(external_reference)
# Getting metrics based on CVSS V3.1
cvss_metrics = vulnerability["cve"]["metrics"]["cvssMetricV31"][0][
"cvssData"
]
attack_vector = cvss_metrics["attackVector"]
availability_impact = cvss_metrics["availabilityImpact"]
base_score = cvss_metrics["baseScore"]
base_severity = cvss_metrics["baseSeverity"]
confidentiality_impact = cvss_metrics["confidentialityImpact"]
integrity_impact = cvss_metrics["integrityImpact"]
# Creating the vulnerability with the extracted fields
vulnerability_to_stix2 = stix2.Vulnerability(
id=Vulnerability.generate_id(name),
name=name,
created=created_date,
modified=modified_date,
description=description,
created_by_ref=self.author,
confidence=(
100 if description is not None and len(description) > 0 else 60
),
external_references=external_references,
custom_properties={
"x_opencti_base_score": base_score,
"x_opencti_base_severity": base_severity,
"x_opencti_attack_vector": attack_vector,
"x_opencti_integrity_impact": integrity_impact,
"x_opencti_availability_impact": availability_impact,
"x_opencti_confidentiality_impact": confidentiality_impact,
},
)
# Adding the vulnerability to the list of vulnerabilities
vulnerabilities_to_stix2.append(vulnerability_to_stix2)
return vulnerabilities_to_stix2
def _create_relationship(self, from_id: str, to_id: str, relation):
"""
:param from_id: From id in string
:param to_id: To id in string
:param relation:
:return: Relationship STIX object
"""
return stix2.Relationship(
id=StixCoreRelationship.generate_id(relation, from_id, to_id),
relationship_type=relation,
source_ref=from_id,
target_ref=to_id,
created_by_ref=self.author.id,
)
@staticmethod
def _create_author():
"""
:return: CVEs' default author
"""
return stix2.Identity(
id=Identity.generate_id("The MITRE Corporation", "organization"),
name="The MITRE Corporation",
identity_class="organization",
)
@staticmethod
def _to_stix_bundle(stix_objects):
"""
:return: STIX objects as a Bundle
"""
return stix2.Bundle(objects=stix_objects, allow_custom=True)
@staticmethod
def _to_json_bundle(stix_bundle):
"""
:return: STIX bundle as JSON format
"""
return stix_bundle.serialize()