Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tweaks phishing analyzer framework #2596

Merged
merged 11 commits into from
Dec 13, 2024
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from datetime import date, timedelta
from typing import Dict
from urllib.parse import urlparse

import requests
from faker import Faker
Expand Down Expand Up @@ -135,11 +136,31 @@ def identify_text_input(self, input_name: str) -> str:
if input_name in names:
return fake_value

def compile_form_field(self, form) -> (dict, str):
result: {} = {}
# setting default to page itself if action is not specified
def extract_action_attribute(self, form) -> str:
    """Return the URL the form's data should be posted to.

    The value comes from the form's ``action`` attribute. When the
    attribute is missing or empty, the page itself (``self.target_site``)
    is used. A relative ``action`` (one without a network location) is
    resolved against ``self.target_site`` following standard URL
    resolution rules, so ``/login.php`` on ``http://host/dir/page.html``
    yields ``http://host/login.php`` and ``submit.php`` yields
    ``http://host/dir/submit.php``.

    :param form: form element exposing ``.get(name, default)`` for its
        attributes.
    :return: absolute URL to submit the form to.
    """
    # Local import: the module-level import only brings in `urlparse`.
    from urllib.parse import urljoin

    form_action = form.get("action", None)
    if not form_action:
        # setting default to page itself if action is not specified
        logger.info(
            f"'action' attribute not found in form. Defaulting to {self.target_site=}"
        )
        form_action = self.target_site
    elif not urlparse(form_action).netloc:
        # Relative url extracted: resolve it the way a browser would rather
        # than appending it to the full page url (path included).
        logger.info(f"Found relative url in {form_action=}")
        form_action = urljoin(self.target_site, form_action)

    logger.info(f"Extracted action to post data to: {form_action}")
    return form_action

def compile_form_field(self, form) -> dict:
result: {} = {}

for element in form.findall(".//input"):
input_type: str = element.get("type", None)
input_name: str = element.get("name", None)
Expand Down Expand Up @@ -169,12 +190,13 @@ def compile_form_field(self, form) -> (dict, str):
f"Job #{self.job_id}: Sending value {value_to_set} for {input_name=}"
)
result.setdefault(input_name, value_to_set)
return result, form_action
return result

def perform_request_to_form(self, form) -> Response:
params, dest_url = self.compile_form_field(form)
params = self.compile_form_field(form)
dest_url = self.extract_action_attribute(form)
logger.info(f"Job #{self.job_id}: Sending {params=} to submit url {dest_url}")
return requests.post(
response = requests.post(
url=dest_url,
data=params,
proxies=(
Expand All @@ -183,14 +205,24 @@ def perform_request_to_form(self, form) -> Response:
else None
),
)
logger.info(f"Request headers: {response.request.headers}")
return response

@staticmethod
def handle_3xx_response(response: Response) -> [str]:
    """Collect the request URLs of every redirect hop that led to ``response``.

    :param response: final response; its ``history`` attribute holds the
        intermediate responses, each exposing ``request.url``.
    :return: the request URL of each entry in ``response.history``, in
        order; an empty list when there was no redirection.
    """
    result = []
    # extract all redirection history
    for history in response.history:
        logger.info(
            f"Extracting 3xx {response.status_code} HTTP response with url {history.request.url}"
        )
        result.append(history.request.url)
    # The collected urls must be handed back to the caller.
    return result

@staticmethod
def handle_2xx_response(response: Response) -> str:
    """Log a successful response and return the URL of the request behind it.

    :param response: response whose ``request.url`` is reported.
    :return: ``response.request.url``.
    """
    requested_url = response.request.url
    logger.info(
        f"Extracting 2xx {response.status_code} response with url {response.request.url}"
    )
    return requested_url

def is_js_used_in_page(self) -> bool:
Expand All @@ -202,6 +234,7 @@ def is_js_used_in_page(self) -> bool:
def analyze_responses(self, responses: [Response]) -> {}:
result: [] = []
for response in responses:
logger.info(f"Response headers for {response.url}: {response.headers}")
try:
# handle 4xx and 5xx
response.raise_for_status()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Generated by Django 4.2.16 on 2024-12-12 11:45

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add an index on ``analyzerreport`` over the ``data_model_content_type``
    and ``data_model_object_id`` fields."""

    dependencies = [
        ("analyzers_manager", "0139_alter_analyzerconfig_mapping_data_model"),
    ]

    operations = [
        migrations.AddIndex(
            index=models.Index(
                name="analyzers_m_data_mo_a1952b_idx",
                fields=["data_model_content_type", "data_model_object_id"],
            ),
            model_name="analyzerreport",
        ),
    ]
4 changes: 3 additions & 1 deletion api_app/analyzers_manager/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ class AnalyzerReport(AbstractReport):

class Meta:
    unique_together = [("config", "job")]
    # Keep the indexes declared on AbstractReport and add one over the
    # data_model_content_type / data_model_object_id pair. `indexes` is
    # assigned once: an earlier plain copy of the parent list would be
    # overwritten immediately.
    indexes = AbstractReport.Meta.indexes + [
        models.Index(fields=["data_model_content_type", "data_model_object_id"])
    ]

def clean(self):
if self.data_model_content_type:
Expand Down
19 changes: 15 additions & 4 deletions integrations/phishing_analyzers/analyzers/driver_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from typing import Iterator

from selenium.common import WebDriverException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.wait import WebDriverWait
from seleniumwire.request import Request
from seleniumwire.webdriver import ChromeOptions, Remote

Expand Down Expand Up @@ -40,7 +43,8 @@ def handle_exception(self, *args, **kwargs):
f"Error while performing {func.__name__}"
f"{' for url=' + url if func.__name__ == 'navigate' else ''}: {e}"
)
self.restart(motivation=func.__name__)
# default is 5
self.restart(motivation=func.__name__, timeout_wait_page=5)
func(self, *args, **kwargs)

return handle_exception
Expand Down Expand Up @@ -90,25 +94,32 @@ def _init_driver(self, window_width: int, window_height: int) -> Remote:
)
return driver

def restart(self, motivation: str = "", timeout_wait_page: int = 0):
    """Quit the current driver, create a fresh one and reload the last page.

    :param motivation: free-text reason for the restart, used for logging.
    :param timeout_wait_page: forwarded to ``navigate`` when the last
        visited url is reloaded; ``0`` (default) means no wait is requested.
    """
    logger.info(f"Restarting driver: {motivation=}")
    self._driver.quit()
    self._driver = self._init_driver(
        window_width=self.window_width, window_height=self.window_height
    )
    if self.last_url:
        logger.info(f"Navigating to {self.last_url} after driver has restarted")
        # Navigate exactly once, propagating the requested wait timeout.
        self.navigate(self.last_url, timeout_wait_page=timeout_wait_page)

@driver_exception_handler
def navigate(self, url: str = "", timeout_wait_page: int = 0):
    """Load ``url`` in the driver and optionally wait for the page content.

    :param url: address to open; an empty value is logged as an error and
        nothing is loaded.
    :param timeout_wait_page: maximum number of seconds to wait for any
        visible ``<input>`` element to appear after the page is requested.
        ``0`` (default) skips the wait entirely.
    """
    # Local import: the module already depends on selenium.common.
    from selenium.common import TimeoutException

    if not url:
        logger.error("Empty URL! Something's wrong!")
        return

    self.last_url = url
    logger.info(f"Navigating to {url=}")
    self._driver.get(url)
    # dynamically wait for page to load its content with a fallback
    # of `timeout_wait_page` seconds.
    # waiting to see if any visible input tag appears
    if timeout_wait_page:
        try:
            WebDriverWait(self._driver, timeout=timeout_wait_page).until(
                EC.visibility_of_any_elements_located((By.TAG_NAME, "input"))
            )
        except TimeoutException:
            # A page without visible inputs is a legitimate outcome: the
            # timeout is only a fallback. Letting the exception propagate
            # would presumably be treated as a driver failure by
            # `driver_exception_handler` (TimeoutException is a
            # WebDriverException) -- NOTE(review): confirm against the
            # decorator's except clause.
            logger.info(
                f"No visible input tag appeared within {timeout_wait_page} seconds "
                f"for {url=}. Proceeding with the page as loaded"
            )

@driver_exception_handler
def get_page_source(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def analyze_target(
window_width=window_width,
window_height=window_height,
)
driver_wrapper.navigate(url=target_url)
driver_wrapper.navigate(url=target_url, timeout_wait_page=5)

result: str = json.dumps(extract_driver_result(driver_wrapper), default=str)
logger.debug(f"JSON dump of driver {result=}")
Expand Down
Loading