Skip to content

Commit

Permalink
added nvd_cve analyzer closes (#2245) (#2560)
Browse files Browse the repository at this point in the history
* added nvd_cve analyzer closes (#2245)

* fixed nvd_cve analyzer

* Update api_app/analyzers_manager/migrations/0128_analyzer_config_nvd_cve.py

* Update api_app/analyzers_manager/migrations/0128_analyzer_config_nvd_cve.py

* Update api_app/analyzers_manager/observable_analyzers/nvd_cve.py

* added CVE format validation

* added tests, modified cve format validation and fixed analyzer config

---------

Co-authored-by: spoiicy <spoiicy@spoiicys-Laptop.local>
Co-authored-by: Matteo Lodi <30625432+mlodic@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 11, 2024
1 parent 11a1137 commit 85ee54a
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 0 deletions.
136 changes: 136 additions & 0 deletions api_app/analyzers_manager/migrations/0130_analyzer_config_nvd_cve.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
ReverseManyToOneDescriptor,
ReverseOneToOneDescriptor,
)

plugin = {
"python_module": {
"health_check_schedule": None,
"update_schedule": None,
"module": "nvd_cve.NVDDetails",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "NVD_CVE",
"description": "[NationalVunerabilityDatabase(NVD)](https://nvd.nist.gov/developers/vulnerabilities) is the U.S. government repository of standards based vulnerability management data represented using the Security Content Automation Protocol (SCAP). This data enables automation of vulnerability management, security measurement, and compliance. The NVD includes databases of security checklist references, security-related software flaws, product names, and impact metrics.",
"disabled": False,
"soft_time_limit": 60,
"routing_key": "default",
"health_check_status": True,
"type": "observable",
"docker_based": False,
"maximum_tlp": "AMBER",
"observable_supported": ["generic"],
"supported_filetypes": [],
"run_hash": False,
"run_hash_type": "",
"not_supported_filetypes": [],
"model": "analyzers_manager.AnalyzerConfig",
}

params = [
{
"python_module": {
"module": "nvd_cve.NVDDetails",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "nvd_api_key",
"type": "str",
"description": "API Key is optional. In case you want to increase the request quota to 50 requests in a rolling 30 second window, you need API key. Get yours at [NIST](https://nvd.nist.gov/developers/request-an-api-key)",
"is_secret": True,
"required": False,
}
]

values = []


def _get_real_obj(Model, field, value):
def _get_obj(Model, other_model, value):
if isinstance(value, dict):
real_vals = {}
for key, real_val in value.items():
real_vals[key] = _get_real_obj(other_model, key, real_val)
value = other_model.objects.get_or_create(**real_vals)[0]
# it is just the primary key serialized
else:
if isinstance(value, int):
if Model.__name__ == "PluginConfig":
value = other_model.objects.get(name=plugin["name"])
else:
value = other_model.objects.get(pk=value)
else:
value = other_model.objects.get(name=value)
return value

if (
type(getattr(Model, field))
in [
ForwardManyToOneDescriptor,
ReverseManyToOneDescriptor,
ReverseOneToOneDescriptor,
ForwardOneToOneDescriptor,
]
and value
):
other_model = getattr(Model, field).get_queryset().model
value = _get_obj(Model, other_model, value)
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
other_model = getattr(Model, field).rel.model
value = [_get_obj(Model, other_model, val) for val in value]
return value


def _create_object(Model, data):
mtm, no_mtm = {}, {}
for field, value in data.items():
value = _get_real_obj(Model, field, value)
if type(getattr(Model, field)) is ManyToManyDescriptor:
mtm[field] = value
else:
no_mtm[field] = value
try:
o = Model.objects.get(**no_mtm)
except Model.DoesNotExist:
o = Model(**no_mtm)
o.full_clean()
o.save()
for field, value in mtm.items():
attribute = getattr(o, field)
if value is not None:
attribute.set(value)
return False
return True


def migrate(apps, schema_editor):
Parameter = apps.get_model("api_app", "Parameter")
PluginConfig = apps.get_model("api_app", "PluginConfig")
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
if not Model.objects.filter(name=plugin["name"]).exists():
exists = _create_object(Model, plugin)
if not exists:
for param in params:
_create_object(Parameter, param)
for value in values:
_create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
atomic = False
dependencies = [
("api_app", "0063_singleton_and_elastic_report"),
("analyzers_manager", "0129_analyzer_config_phishing_extractor"),
]

operations = [migrations.RunPython(migrate, reverse_migrate)]
133 changes: 133 additions & 0 deletions api_app/analyzers_manager/observable_analyzers/nvd_cve.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import re

import requests
from django.conf import settings

from api_app.analyzers_manager.classes import AnalyzerRunException, ObservableAnalyzer
from tests.mock_utils import MockUpResponse, if_mock_connections, patch


class NVDDetails(ObservableAnalyzer):
url: str = "https://services.nvd.nist.gov/rest/json/cves/2.0"
_nvd_api_key: str = None
cve_pattern = r"^CVE-\d{4}-\d{4,7}$"

@classmethod
def update(self) -> bool:
pass

def run(self):
headers = {}
if self._nvd_api_key:
headers.update({"apiKey": self._nvd_api_key})

try:
# Validate if CVE format is correct E.g CVE-2014-1234 or cve-2022-1234567
if not settings.STAGE_CI and not re.match(
self.cve_pattern, self.observable_name, flags=re.IGNORECASE
):
raise ValueError(f"Invalid CVE format: {self.observable_name}")

params = {"cveId": self.observable_name.upper()}
response = requests.get(url=self.url, params=params, headers=headers)
response.raise_for_status()

except ValueError as e:
raise AnalyzerRunException(e)
except requests.RequestException as e:
raise AnalyzerRunException(e)

return response.json()

@classmethod
def _monkeypatch(cls):
patches = [
if_mock_connections(
patch(
"requests.get",
return_value=MockUpResponse(
{
"resultsPerPage": 1,
"startIndex": 0,
"totalResults": 1,
"format": "NVD_CVE",
"version": "2.0",
"timestamp": "2024-11-01T05:25:09.787",
"vulnerabilities": [
{
"cve": {
"id": "CVE-2024-51181",
"sourceIdentifier": "cve@mitre.org",
"published": "2024-10-29T13:15:07.297",
"lastModified": "2024-10-29T20:35:37.490",
"vulnStatus": "Undergoing Analysis",
"cveTags": [],
"descriptions": [
{
"lang": "en",
"value": "A Reflected Cross Site Scripting (XSS) vulnerability was found"
"in /ifscfinder/admin/profile.php in PHPGurukul IFSC Code Finder"
"Project v1.0, which allows remote attackers to execute arbitrary"
'code via " searchifsccode" parameter.',
},
{
"lang": "es",
"value": " Se encontró una vulnerabilidad de Cross Site Scripting reflejado"
"(XSS) en /ifscfinder/admin/profile.php en PHPGurukul IFSC Code Finder"
"Project v1.0, que permite a atacantes remotos ejecutar código arbitrario"
'a través del parámetro "searchifsccode".',
},
],
"metrics": {
"cvssMetricV31": [
{
"source": "134c704f-9b21-4f2e-91b3-4a467353bcc0",
"type": "Secondary",
"cvssData": {
"version": "3.1",
"vectorString": "CVSS:3.1/AV:N/AC:L/PR:N/UI:R/S:C/C:H/I:L/A:L",
"attackVector": "NETWORK",
"attackComplexity": "LOW",
"privilegesRequired": "NONE",
"userInteraction": "REQUIRED",
"scope": "CHANGED",
"confidentialityImpact": "HIGH",
"integrityImpact": "LOW",
"availabilityImpact": "LOW",
"baseScore": 8.8,
"baseSeverity": "HIGH",
},
"exploitabilityScore": 2.8,
"impactScore": 5.3,
}
]
},
"weaknesses": [
{
"source": "134c704f-9b21-4f2e-91b3-4a467353bcc0",
"type": "Secondary",
"description": [
{
"lang": "en",
"value": "CWE-79",
}
],
}
],
"references": [
{
"url": "https://github.com/Santoshcyber1/CVE-wirteup/blob/main/"
"Phpgurukul/IFSC%20Code%20Finder/IFSC%20Code%20Finder%20Admin.pdf",
"source": "cve@mitre.org",
}
],
}
}
],
},
200,
),
)
)
]
return super()._monkeypatch(patches=patches)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from django.test import TestCase

from api_app.analyzers_manager.classes import AnalyzerRunException
from api_app.analyzers_manager.models import AnalyzerConfig
from api_app.analyzers_manager.observable_analyzers.nvd_cve import NVDDetails


class NVDCVETestCase(TestCase):
config = AnalyzerConfig.objects.get(python_module=NVDDetails.python_module)

def test_valid_cve_format(self):
"""Test that a valid CVE format passes without raising an exception"""

analyzer = NVDDetails(self.config)
analyzer.observable_name = "cve-2024-51181" # Valid format

try:
analyzer.run()
except AnalyzerRunException:
self.fail("AnalyzerRunException raised with valid CVE format")

def test_invalid_cve_format(self):
"""Test that an invalid CVE format raises an AnalyzerRunException"""
analyzer = NVDDetails(self.config)
analyzer.observable_name = "2024-51181" # Invalid format

with self.assertRaises(AnalyzerRunException):
analyzer.run()

0 comments on commit 85ee54a

Please # to comment.