Skip to content
This repository has been archived by the owner on Nov 2, 2024. It is now read-only.

Commit

Permalink
Spamhaus_WQS Analyzer, closes intelowlproject#1526 (intelowlproject#2378
Browse files Browse the repository at this point in the history
)

* init

* init

* migration

* docs

* python

* better code

* code handling and migrations

* better code

* docs link

* docs link

---------

Co-authored-by: g4ze <bhaiyajionline@gmail.com>
  • Loading branch information
2 people authored and Michalsus committed Oct 11, 2024
1 parent 17ea925 commit 4ab8c5b
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
)

plugin = {
"python_module": {
"health_check_schedule": {
"minute": "0",
"hour": "0",
"day_of_week": "*",
"day_of_month": "*",
"month_of_year": "*",
},
"update_schedule": None,
"module": "dns.dns_malicious_detectors.spamhaus_wqs.SpamhausWQS",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "Spamhaus_WQS",
"description": "[Spamhaus_WQS](https://docs.spamhaus.com/datasets/docs/source/70-access-methods/web-query-service/000-intro.html) : The Spamhaus Web Query Service (WQS) is a method of accessing Spamhaus block lists using the HTTPS protocol.",
"disabled": False,
"soft_time_limit": 10,
"routing_key": "default",
"health_check_status": True,
"type": "observable",
"docker_based": False,
"maximum_tlp": "AMBER",
"observable_supported": ["ip", "domain"],
"supported_filetypes": [],
"run_hash": False,
"run_hash_type": "",
"not_supported_filetypes": [],
"model": "analyzers_manager.AnalyzerConfig",
}

params = [
{
"python_module": {
"module": "dns.dns_malicious_detectors.spamhaus_wqs.SpamhausWQS",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "api_key",
"type": "str",
"description": "api key for Spamhaus_WQS",
"is_secret": True,
"required": True,
}
]

values = []


def _get_real_obj(Model, field, value):
def _get_obj(Model, other_model, value):
if isinstance(value, dict):
real_vals = {}
for key, real_val in value.items():
real_vals[key] = _get_real_obj(other_model, key, real_val)
value = other_model.objects.get_or_create(**real_vals)[0]
# it is just the primary key serialized
else:
if isinstance(value, int):
if Model.__name__ == "PluginConfig":
value = other_model.objects.get(name=plugin["name"])
else:
value = other_model.objects.get(pk=value)
else:
value = other_model.objects.get(name=value)
return value

if (
type(getattr(Model, field))
in [ForwardManyToOneDescriptor, ForwardOneToOneDescriptor]
and value
):
other_model = getattr(Model, field).get_queryset().model
value = _get_obj(Model, other_model, value)
elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
other_model = getattr(Model, field).rel.model
value = [_get_obj(Model, other_model, val) for val in value]
return value


def _create_object(Model, data):
mtm, no_mtm = {}, {}
for field, value in data.items():
value = _get_real_obj(Model, field, value)
if type(getattr(Model, field)) is ManyToManyDescriptor:
mtm[field] = value
else:
no_mtm[field] = value
try:
o = Model.objects.get(**no_mtm)
except Model.DoesNotExist:
o = Model(**no_mtm)
o.full_clean()
o.save()
for field, value in mtm.items():
attribute = getattr(o, field)
if value is not None:
attribute.set(value)
return False
return True


def migrate(apps, schema_editor):
Parameter = apps.get_model("api_app", "Parameter")
PluginConfig = apps.get_model("api_app", "PluginConfig")
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
if not Model.objects.filter(name=plugin["name"]).exists():
exists = _create_object(Model, plugin)
if not exists:
for param in params:
_create_object(Parameter, param)
for value in values:
_create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
python_path = plugin.pop("model")
Model = apps.get_model(*python_path.split("."))
Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
atomic = False
dependencies = [
("api_app", "0062_alter_parameter_python_module"),
("analyzers_manager", "0098_analyzer_config_crt_sh"),
]

operations = [migrations.RunPython(migrate, reverse_migrate)]
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import logging

import requests

from api_app.analyzers_manager import classes
from api_app.analyzers_manager.exceptions import AnalyzerRunException
from tests.mock_utils import MockUpResponse, if_mock_connections, patch

from ..dns_responses import malicious_detector_response

logger = logging.getLogger(__name__)


class SpamhausWQS(classes.ObservableAnalyzer):
url: str = "https://apibl.spamhaus.net/lookup/v1"
_api_key: str = None

def update(self):
pass

def run(self):
headers = {"Authorization": f"Bearer {self._api_key}"}
response = requests.get(
url=f"""{self.url}/
{
"DBL"
if self.observable_classification == self.ObservableTypes.DOMAIN.value
else "AUTHBL"
}
/{self.observable_name}""",
headers=headers,
)
# refer to the link for status code info
# https://docs.spamhaus.com/datasets/docs/source/70-access-methods/web-query-service/060-api-info.html#http-response-status-codes
if response.status_code == 200:
# 200 - Found - The record is listed
return malicious_detector_response(self.observable_name, True)
elif response.status_code == 404:
# 404 - Not found - The record is not listed
return malicious_detector_response(self.observable_name, False)
else:
raise AnalyzerRunException(f"result not expected: {response.status_code}")

@classmethod
def _monkeypatch(cls):
patches = [
if_mock_connections(
patch(
"requests.get",
return_value=MockUpResponse({"resp": [1020], "status": 200}, 200),
),
)
]
return super()._monkeypatch(patches=patches)
1 change: 1 addition & 0 deletions docs/source/Usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ The following is the list of the available analyzers you can run out-of-the-box.
* `MalprobSearch`:[Malprob](https://malprob.io/) is a leading malware detection and identification service, powered by cutting-edge AI technology.
* `OrklSearch`:[Orkl](https://orkl.eu/) is the Community Driven Cyber Threat Intelligence Library.
* `Crt_sh`:[Crt_Sh](https://crt.sh/) lets you get certificates info about a domain.
* `Spamhaus_WQS`:[Spamhaus_WQS](https://docs.spamhaus.com/datasets/docs/source/70-access-methods/web-query-service/000-intro.html) : The Spamhaus Web Query Service (WQS) is a method of accessing Spamhaus block lists using the HTTPS protocol.

##### Generic analyzers (email, phone number, etc.; anything really)

Expand Down

0 comments on commit 4ab8c5b

Please # to comment.