Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

added nvd_cve analyzer closes (#2245) #2560

Merged
merged 8 commits into from
Nov 11, 2024
136 changes: 136 additions & 0 deletions api_app/analyzers_manager/migrations/0128_analyzer_config_nvd_cve.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
ReverseManyToOneDescriptor,
ReverseOneToOneDescriptor,
)

plugin = {
"python_module": {
"health_check_schedule": None,
"update_schedule": None,
"module": "nvd_cve.NVDDetails",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "NVD_CVE",
"description": "[NationalVunerabilityDatabase(NVD)](https://nvd.nist.gov/developers/vulnerabilities) is the U.S. government repository of standards based vulnerability management data represented using the Security Content Automation Protocol (SCAP). This data enables automation of vulnerability management, security measurement, and compliance. The NVD includes databases of security checklist references, security-related software flaws, product names, and impact metrics.",
"disabled": False,
"soft_time_limit": 60,
"routing_key": "default",
"health_check_status": True,
"type": "observable",
"docker_based": False,
"maximum_tlp": "AMBER",
"observable_supported": ["generic"],
"supported_filetypes": [],
"run_hash": False,
"run_hash_type": "",
"not_supported_filetypes": [],
"model": "analyzers_manager.AnalyzerConfig",
}

params = [
{
"python_module": {
"module": "nvd_cve.NVDDetails",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "nvd_api_key",
"type": "str",
"description": "API Key is optional. In case you want to increase the request quota to 50 requests in a rolling 30 second window, you need API key. Get yours at [NIST](https://nvd.nist.gov/developers/request-an-api-key)",
"is_secret": True,
"required": False,
}
]

values = []


def _get_real_obj(Model, field, value):
def _get_obj(Model, other_model, value):
if isinstance(value, dict):
real_vals = {}
for key, real_val in value.items():
real_vals[key] = _get_real_obj(other_model, key, real_val)
value = other_model.objects.get_or_create(**real_vals)[0]
# it is just the primary key serialized
else:
if isinstance(value, int):
if Model.__name__ == "PluginConfig":
value = other_model.objects.get(name=plugin["name"])
else:
value = other_model.objects.get(pk=value)
else:
value = other_model.objects.get(name=value)
return value

if (
type(getattr(Model, field))
in [
ForwardManyToOneDescriptor,
ReverseManyToOneDescriptor,
ReverseOneToOneDescriptor,
ForwardOneToOneDescriptor,
]
and value
):
other_model = getattr(Model, field).get_queryset().model
value = _get_obj(Model, other_model, value)
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
other_model = getattr(Model, field).rel.model
value = [_get_obj(Model, other_model, val) for val in value]
return value


def _create_object(Model, data):
mtm, no_mtm = {}, {}
for field, value in data.items():
value = _get_real_obj(Model, field, value)
if type(getattr(Model, field)) is ManyToManyDescriptor:
mtm[field] = value
else:
no_mtm[field] = value
try:
o = Model.objects.get(**no_mtm)
except Model.DoesNotExist:
o = Model(**no_mtm)
o.full_clean()
o.save()
for field, value in mtm.items():
attribute = getattr(o, field)
if value is not None:
attribute.set(value)
return False
return True


def migrate(apps, schema_editor):
Parameter = apps.get_model("api_app", "Parameter")
PluginConfig = apps.get_model("api_app", "PluginConfig")
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
if not Model.objects.filter(name=plugin["name"]).exists():
exists = _create_object(Model, plugin)
if not exists:
for param in params:
_create_object(Parameter, param)
for value in values:
_create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
atomic = False
dependencies = [
("api_app", "0063_singleton_and_elastic_report"),
("analyzers_manager", "0127_analyzer_config_dshield"),
]

operations = [migrations.RunPython(migrate, reverse_migrate)]
131 changes: 131 additions & 0 deletions api_app/analyzers_manager/observable_analyzers/nvd_cve.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import re

import requests

from api_app.analyzers_manager.classes import AnalyzerRunException, ObservableAnalyzer
from tests.mock_utils import MockUpResponse, if_mock_connections, patch


class NVDDetails(ObservableAnalyzer):
url: str = "https://services.nvd.nist.gov/rest/json/cves/2.0"
_nvd_api_key: str = None
cve_pattern = r"^CVE-\d{4}-\d{4,7}$"

@classmethod
def update(self) -> bool:
pass

def run(self):
headers = {}
if self._nvd_api_key:
headers.update({"apiKey": self._nvd_api_key})

try:
# Validate if CVE format is correct E.g CVE-2014-1234 or CVE-2022-1234567
if not re.match(self.cve_pattern, self.observable_name):
raise ValueError(f"Invalid CVE format: {self.observable_name}")

params = {"cveId": self.observable_name}
response = requests.get(url=self.url, params=params, headers=headers)
response.raise_for_status()

except ValueError as e:
raise AnalyzerRunException(e)

except requests.RequestException as e:
raise AnalyzerRunException(e)

return response.json()

@classmethod
def _monkeypatch(cls):
patches = [
if_mock_connections(
patch(
"requests.get",
return_value=MockUpResponse(
{
"resultsPerPage": 1,
"startIndex": 0,
"totalResults": 1,
"format": "NVD_CVE",
"version": "2.0",
"timestamp": "2024-11-01T05:25:09.787",
"vulnerabilities": [
{
"cve": {
"id": "CVE-2024-51181",
"sourceIdentifier": "cve@mitre.org",
"published": "2024-10-29T13:15:07.297",
"lastModified": "2024-10-29T20:35:37.490",
"vulnStatus": "Undergoing Analysis",
"cveTags": [],
"descriptions": [
{
"lang": "en",
"value": "A Reflected Cross Site Scripting (XSS) vulnerability was found"
"in /ifscfinder/admin/profile.php in PHPGurukul IFSC Code Finder"
"Project v1.0, which allows remote attackers to execute arbitrary"
'code via " searchifsccode" parameter.',
},
{
"lang": "es",
"value": " Se encontró una vulnerabilidad de Cross Site Scripting reflejado"
"(XSS) en /ifscfinder/admin/profile.php en PHPGurukul IFSC Code Finder"
"Project v1.0, que permite a atacantes remotos ejecutar código arbitrario"
'a través del parámetro "searchifsccode".',
},
],
"metrics": {
"cvssMetricV31": [
{
"source": "134c704f-9b21-4f2e-91b3-4a467353bcc0",
"type": "Secondary",
"cvssData": {
"version": "3.1",
"vectorString": "CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:C/C:H/I:L/A:L",
"attackVector": "NETWORK",
"attackComplexity": "LOW",
"privilegesRequired": "NONE",
"userInteraction": "REQUIRED",
"scope": "CHANGED",
"confidentialityImpact": "HIGH",
"integrityImpact": "LOW",
"availabilityImpact": "LOW",
"baseScore": 8.8,
"baseSeverity": "HIGH",
},
"exploitabilityScore": 2.8,
"impactScore": 5.3,
}
]
},
"weaknesses": [
{
"source": "134c704f-9b21-4f2e-91b3-4a467353bcc0",
"type": "Secondary",
"description": [
{
"lang": "en",
"value": "CWE-79",
}
],
}
],
"references": [
{
"url": "https://github.com/Santoshcyber1/CVE-wirteup/blob/main/"
"Phpgurukul/IFSC%20Code%20Finder/IFSC%20Code%20Finder%20Admin.pdf",
"source": "cve@mitre.org",
}
],
}
}
],
},
200,
),
)
)
]
return super()._monkeypatch(patches=patches)
Loading