Skip to content
This repository has been archived by the owner on Nov 2, 2024. It is now read-only.

Commit

Permalink
PDF URI extractor and pivoting (intelowlproject#2391)
Browse files Browse the repository at this point in the history
* uri extraction

* added download file analyzer and pivot configs

* fixed code review doctor

* made code review changes

added a job-creation check to avoid graph-related issues

* added abstract update method

* fixed migration order

* fixed validated_data dict access

* fixed migrations order

* fixed migrations order
  • Loading branch information
federicofantini authored and Michalsus committed Oct 11, 2024
1 parent e22fab2 commit e144e9b
Show file tree
Hide file tree
Showing 9 changed files with 814 additions and 1 deletion.
17 changes: 17 additions & 0 deletions api_app/analyzers_manager/file_analyzers/pdf_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,30 @@ class PDFInfo(FileAnalyzer):
def flatten(list_of_lists: List[List[Any]]) -> List[Any]:
    """Collapse one level of nesting: [[a, b], [c]] -> [a, b, c]."""
    flat: List[Any] = []
    for inner in list_of_lists:
        flat.extend(inner)
    return flat

@classmethod
def update(cls) -> bool:
    """No-op implementation of the abstract update hook.

    PDFInfo has no external dataset or ruleset to refresh, so there is
    nothing to do here; returns None (falsy), same as the original `pass`.
    """
    return None

def run(self):
    """Run both peepdf and pdfid on the file and merge their reports.

    Returns:
        dict: {"peepdf": ..., "pdfid": ..., "uris": [...]} — the "uris"
        key is only present for single-page PDFs (pivot heuristic).

    Raises:
        AnalyzerRunException: only when BOTH tools fail.
    """
    self.results = {"peepdf": {}, "pdfid": {}}
    # the analysis fails only when BOTH fail
    peepdf_success = self.__peepdf_analysis()
    pdfid_success = self.__pdfid_analysis()
    if not peepdf_success and not pdfid_success:
        raise AnalyzerRunException("both peepdf and pdfid failed")

    # pivot uris in the pdf only if we have one page
    reports = self.results["pdfid"].get("reports")
    if isinstance(reports, list):
        for elem in reports:
            if "/Page" in elem and elem["/Page"] == 1:
                self.results["uris"] = []
                # .get(): peepdf may have failed (pdfid alone is enough to
                # reach this point), in which case "stats" does not exist —
                # indexing it directly raised KeyError.
                for s in self.results["peepdf"].get("stats", []):
                    self.results["uris"].extend(s["uris"])

                # log inside the match: self.results['uris'] only exists here
                logger.info(
                    f"extracted urls from file {self.md5}: {self.results['uris']}"
                )

    return self.results

def __peepdf_analysis(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,295 @@
from django.db import migrations
from django.db.models.fields.related_descriptors import (
ForwardManyToOneDescriptor,
ForwardOneToOneDescriptor,
ManyToManyDescriptor,
)

plugin = {
"python_module": {
"health_check_schedule": None,
"update_schedule": None,
"module": "download_file_from_uri.DownloadFileFromUri",
"base_path": "api_app.analyzers_manager.observable_analyzers",
},
"name": "DownloadFileFromUri",
"description": "performs an http request to an uri and download the file through the http proxy",
"disabled": False,
"soft_time_limit": 60,
"routing_key": "default",
"health_check_status": True,
"type": "observable",
"docker_based": False,
"maximum_tlp": "RED",
"observable_supported": ["url"],
"supported_filetypes": [],
"run_hash": False,
"run_hash_type": "",
"not_supported_filetypes": [],
"model": "analyzers_manager.AnalyzerConfig",
}

# Parameter definitions for DownloadFileFromUri. Each entry is keyed back to
# the analyzer via its python_module; http_proxy is the only secret.
params = [
    {
        "python_module": {
            "module": "download_file_from_uri.DownloadFileFromUri",
            "base_path": "api_app.analyzers_manager.observable_analyzers",
        },
        "name": "http_proxy",
        "type": "str",
        "description": "http proxy url",
        "is_secret": True,
        "required": True,
    },
    {
        "python_module": {
            "module": "download_file_from_uri.DownloadFileFromUri",
            "base_path": "api_app.analyzers_manager.observable_analyzers",
        },
        "name": "header_user_agent",
        "type": "str",
        "description": "http header user-agent field",
        "is_secret": False,
        "required": True,
    },
    {
        "python_module": {
            "module": "download_file_from_uri.DownloadFileFromUri",
            "base_path": "api_app.analyzers_manager.observable_analyzers",
        },
        "name": "header_cookies",
        "type": "str",
        "description": "http header cookies field (e.g. $Version=1; Skin=new;)",
        "is_secret": False,
        "required": True,
    },
    {
        "python_module": {
            "module": "download_file_from_uri.DownloadFileFromUri",
            "base_path": "api_app.analyzers_manager.observable_analyzers",
        },
        "name": "header_content_type",
        "type": "str",
        "description": "http header content-type field",
        "is_secret": False,
        "required": True,
    },
    {
        "python_module": {
            "module": "download_file_from_uri.DownloadFileFromUri",
            "base_path": "api_app.analyzers_manager.observable_analyzers",
        },
        "name": "header_accept",
        "type": "str",
        "description": "http header accept field",
        "is_secret": False,
        "required": True,
    },
    {
        "python_module": {
            "module": "download_file_from_uri.DownloadFileFromUri",
            "base_path": "api_app.analyzers_manager.observable_analyzers",
        },
        "name": "timeout",
        "type": "int",
        "description": "http requests timeout",
        "is_secret": False,
        "required": True,
    },
]

# Default PluginConfig values for the non-secret parameters above.
# http_proxy (the secret) deliberately has no default here; there is also no
# default for nothing else missing — timeout defaults to 50s, below the 60s
# soft_time_limit of the analyzer.
values = [
    {
        "parameter": {
            "python_module": {
                "module": "download_file_from_uri.DownloadFileFromUri",
                "base_path": "api_app.analyzers_manager.observable_analyzers",
            },
            "name": "header_user_agent",
            "type": "str",
            "description": "http header user-agent field",
            "is_secret": False,
            "required": True,
        },
        "analyzer_config": "DownloadFileFromUri",
        "connector_config": None,
        "visualizer_config": None,
        "ingestor_config": None,
        "pivot_config": None,
        "for_organization": False,
        "value": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36 Edg/125.0.2535.92",
        "updated_at": "2024-06-19T10:23:03.145744Z",
        "owner": None,
    },
    {
        "parameter": {
            "python_module": {
                "module": "download_file_from_uri.DownloadFileFromUri",
                "base_path": "api_app.analyzers_manager.observable_analyzers",
            },
            "name": "header_cookies",
            "type": "str",
            "description": "http header cookies field (e.g. $Version=1; Skin=new;)",
            "is_secret": False,
            "required": True,
        },
        "analyzer_config": "DownloadFileFromUri",
        "connector_config": None,
        "visualizer_config": None,
        "ingestor_config": None,
        "pivot_config": None,
        "for_organization": False,
        "value": "",
        "updated_at": "2024-06-19T10:23:03.145744Z",
        "owner": None,
    },
    {
        "parameter": {
            "python_module": {
                "module": "download_file_from_uri.DownloadFileFromUri",
                "base_path": "api_app.analyzers_manager.observable_analyzers",
            },
            "name": "header_content_type",
            "type": "str",
            "description": "http header content-type field",
            "is_secret": False,
            "required": True,
        },
        "analyzer_config": "DownloadFileFromUri",
        "connector_config": None,
        "visualizer_config": None,
        "ingestor_config": None,
        "pivot_config": None,
        "for_organization": False,
        "value": "application/octet-stream",
        "updated_at": "2024-06-19T10:23:03.145744Z",
        "owner": None,
    },
    {
        "parameter": {
            "python_module": {
                "module": "download_file_from_uri.DownloadFileFromUri",
                "base_path": "api_app.analyzers_manager.observable_analyzers",
            },
            "name": "header_accept",
            "type": "str",
            "description": "http header accept field",
            "is_secret": False,
            "required": True,
        },
        "analyzer_config": "DownloadFileFromUri",
        "connector_config": None,
        "visualizer_config": None,
        "ingestor_config": None,
        "pivot_config": None,
        "for_organization": False,
        "value": "application/octet-stream",
        "updated_at": "2024-06-19T10:23:03.145744Z",
        "owner": None,
    },
    {
        "parameter": {
            "python_module": {
                "module": "download_file_from_uri.DownloadFileFromUri",
                "base_path": "api_app.analyzers_manager.observable_analyzers",
            },
            "name": "timeout",
            "type": "int",
            "description": "http requests timeout",
            "is_secret": False,
            "required": True,
        },
        "analyzer_config": "DownloadFileFromUri",
        "connector_config": None,
        "visualizer_config": None,
        "ingestor_config": None,
        "pivot_config": None,
        "for_organization": False,
        "value": 50,
        "updated_at": "2024-06-19T10:23:03.145744Z",
        "owner": None,
    },
]


def _get_real_obj(Model, field, value):
    """Resolve a serialized relation value into real model instance(s).

    For forward FK / one-to-one fields the serialized `value` is turned into
    a single instance; for many-to-many fields, into a list of instances.
    Non-relational fields (and falsy values) are returned unchanged.
    """

    def _get_obj(Model, other_model, value):
        # A dict means a nested serialized object: resolve each field
        # recursively, then get-or-create the related instance from it.
        if isinstance(value, dict):
            real_vals = {}
            for key, real_val in value.items():
                real_vals[key] = _get_real_obj(other_model, key, real_val)
            value = other_model.objects.get_or_create(**real_vals)[0]
        # it is just the primary key serialized
        else:
            if isinstance(value, int):
                # special case: PluginConfig relations are looked up by the
                # plugin's name instead of by raw pk
                if Model.__name__ == "PluginConfig":
                    value = other_model.objects.get(name=plugin["name"])
                else:
                    value = other_model.objects.get(pk=value)
            else:
                # string key: relations are addressed by natural key "name"
                value = other_model.objects.get(name=value)
        return value

    # forward FK / one-to-one: resolve to a single related instance
    if (
        type(getattr(Model, field))
        in [ForwardManyToOneDescriptor, ForwardOneToOneDescriptor]
        and value
    ):
        other_model = getattr(Model, field).get_queryset().model
        value = _get_obj(Model, other_model, value)
    # many-to-many: resolve each serialized element to an instance
    elif type(getattr(Model, field)) in [ManyToManyDescriptor] and value:
        other_model = getattr(Model, field).rel.model
        value = [_get_obj(Model, other_model, val) for val in value]
    return value


def _create_object(Model, data):
    """Idempotently create a `Model` row from serialized `data`.

    Returns:
        bool: True when an identical object already existed (nothing done),
        False when a new object was created. NOTE the inverted sense — the
        caller treats a False return as "newly created, proceed".
    """
    mtm, no_mtm = {}, {}
    # split fields: many-to-many values must be set AFTER the row is saved
    for field, value in data.items():
        value = _get_real_obj(Model, field, value)
        if type(getattr(Model, field)) is ManyToManyDescriptor:
            mtm[field] = value
        else:
            no_mtm[field] = value
    try:
        o = Model.objects.get(**no_mtm)
    except Model.DoesNotExist:
        o = Model(**no_mtm)
        o.full_clean()
        o.save()
        # attach m2m relations only on the newly-created row
        for field, value in mtm.items():
            attribute = getattr(o, field)
            if value is not None:
                attribute.set(value)
        return False
    return True


def migrate(apps, schema_editor):
    """Forward migration: install the DownloadFileFromUri analyzer config,
    its parameters and their default values (skipped if already present)."""
    Parameter = apps.get_model("api_app", "Parameter")
    PluginConfig = apps.get_model("api_app", "PluginConfig")
    # "model" is metadata, not a config field: pop it to locate the target model
    python_path = plugin.pop("model")
    Model = apps.get_model(*python_path.split("."))

    # guard: nothing to do when the analyzer config already exists
    if Model.objects.filter(name=plugin["name"]).exists():
        return
    already_present = _create_object(Model, plugin)
    if already_present:
        return
    # freshly created: install parameters and their default values too
    for param in params:
        _create_object(Parameter, param)
    for value in values:
        _create_object(PluginConfig, value)


def reverse_migrate(apps, schema_editor):
    """Reverse migration: delete the analyzer config created by ``migrate``.

    Reads (rather than pops) the "model" key: the original ``pop`` mutated the
    module-level ``plugin`` dict and raised KeyError whenever ``migrate`` had
    already popped the key in the same process (e.g. migrate then rollback).
    """
    python_path = plugin.get("model", "analyzers_manager.AnalyzerConfig")
    Model = apps.get_model(*python_path.split("."))
    Model.objects.get(name=plugin["name"]).delete()


class Migration(migrations.Migration):
    # non-atomic: the data creation performs multiple dependent ORM writes
    # and is safe to re-run (migrate() is idempotent)
    atomic = False
    dependencies = [
        ("api_app", "0062_alter_parameter_python_module"),
        ("analyzers_manager", "0099_analyzer_config_spamhaus_wqs"),
    ]

    operations = [migrations.RunPython(migrate, reverse_migrate)]
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# This file is a part of IntelOwl https://github.com/intelowlproject/IntelOwl
# See the file 'LICENSE' for copying permission.
import base64
import logging

import requests

from api_app.analyzers_manager.classes import ObservableAnalyzer
from api_app.analyzers_manager.exceptions import AnalyzerRunException

logger = logging.getLogger(__name__)


class DownloadFileFromUri(ObservableAnalyzer):
    """Download the file served at the observable URL (through an HTTP proxy
    when configured) and return its content base64-encoded.

    text/html responses are discarded — the analyzer targets payload files,
    not web pages.
    """

    # secret: proxy URL the request is routed through (empty -> direct)
    _http_proxy: str
    # outgoing request header values, all user-configurable
    header_user_agent: str
    header_cookies: str
    header_content_type: str
    header_accept: str
    # requests timeout in seconds
    timeout: int

    @classmethod
    def update(cls) -> bool:
        """No external dataset to refresh: nothing to do."""
        pass

    def run(self):
        """Fetch the URL and return ``{"stored_base64": <b64 body or "">}``.

        Raises:
            AnalyzerRunException: when the HTTP request itself fails.
        """
        result = {"stored_base64": ""}

        # NOTE(review): only an "http" proxy key is set, so https:// URLs are
        # not proxied by requests — confirm this is intended.
        proxies = {"http": self._http_proxy} if self._http_proxy else {}
        headers = {
            "User-Agent": self.header_user_agent,
            "Cookie": self.header_cookies,
            "Content-type": self.header_content_type,
            "Accept": self.header_accept,
        }

        try:
            r = requests.get(
                self.observable_name,
                headers=headers,
                proxies=proxies,
                timeout=self.timeout,
            )
        except Exception as e:
            # chain the cause so the original traceback is preserved
            raise AnalyzerRunException(f"requests exception: {e}") from e

        if not r.content:
            logger.info(f"no response content for {self.observable_name}")
            return result

        # .get(): the Content-Type header may be absent entirely — direct
        # indexing raised KeyError here; a missing header is treated as
        # non-HTML and the content is stored.
        if "text/html" in r.headers.get("Content-Type", ""):
            logger.info(f"discarded text/html response for {self.observable_name}")
            return result

        result["stored_base64"] = base64.b64encode(r.content).decode("ascii")
        return result
Loading

0 comments on commit e144e9b

Please sign in to comment.