Skip to content

Commit

Permalink
Merge pull request #396 from blacklanternsecurity/txt-module
Browse files Browse the repository at this point in the history
Adding TXT Module
  • Loading branch information
liquidsec authored Jan 12, 2024
2 parents 90147e8 + 2a19ee5 commit bacc3b9
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 5 deletions.
2 changes: 1 addition & 1 deletion baddns/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def print_version():


def validate_target(
arg_value, pattern=re.compile(r"^(?:[a-z0-9](?:[a-z0-9-_]{0,61}[a-z0-9])?\.)+[a-z0-9][a-z0-9-]{0,61}[a-z0-9]$")
arg_value, pattern=re.compile(r"^(?:[a-z0-9_](?:[a-z0-9-_]{0,61}[a-z0-9])?\.)+[a-z0-9][a-z0-9-]{0,61}[a-z0-9]$")
):
if not pattern.match(arg_value):
raise argparse.ArgumentTypeError("Target subdomain is not correctly formatted")
Expand Down
5 changes: 1 addition & 4 deletions baddns/lib/dnsmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ def process_answer(self, answer, rdatatype):
elif rdtype == "TXT":
for s in record.strings:
s = s.decode()
for match in self.dns_name_regex.finditer(s):
start, end = match.span()
host = s[start:end]
results.add(host)
results.add(s)
elif rdtype == "NSEC":
results.add(self._clean_dns_record(record.next))
else:
Expand Down
110 changes: 110 additions & 0 deletions baddns/modules/txt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
from baddns.base import BadDNS_base
from baddns.lib.dnsmanager import DNSManager
from baddns.lib.httpmanager import HttpManager
from baddns.modules.cname import BadDNS_cname
from baddns.lib.findings import Finding

import logging

log = logging.getLogger(__name__)


class BadDNS_txt(BadDNS_base):
name = "TXT"
description = "Check TXT contents for hijackable domains"

def __init__(self, target, **kwargs):
super().__init__(target, **kwargs)
self.target = target
self.target_dnsmanager = DNSManager(target, dns_client=self.dns_client)
self.target_httpmanager = HttpManager(
self.target, http_client_class=self.http_client_class, skip_redirects=True
)
self.cname_findings = None
self.cname_findings_direct = None
self.reference_data = {}

async def dispatch(self):
self.cname_findings_direct = []
self.cname_findings = []

await self.target_dnsmanager.dispatchDNS(omit_types=["A", "AAAA", "CNAME", "NS", "SOA", "MX"])
if self.target_dnsmanager.answers["TXT"] == None:
log.debug("No TXT records found, aborting")
return False

for txt_record in self.target_dnsmanager.answers["TXT"]:
log.debug(f"Got TXT record [{txt_record}]")

for match in DNSManager.dns_name_regex.finditer(txt_record):
start, end = match.span()
host = txt_record[start:end]
log.info(f"Found host [{host}] in TXT record [{txt_record}] and analyzing with CNAME module")

cname_instance_direct = BadDNS_cname(
host,
custom_nameservers=self.custom_nameservers,
signatures_dir=self.signatures_dir,
direct_mode=True,
parent_class="txt",
http_client_class=self.http_client_class,
dns_client=self.dns_client,
)
if await cname_instance_direct.dispatch():
self.cname_findings_direct.append(
{
"finding": cname_instance_direct.analyze(),
"description": "Vulnerable Host in TXT Record",
"trigger": self.target_dnsmanager.target,
}
)

cname_instance = BadDNS_cname(
host,
custom_nameservers=self.custom_nameservers,
signatures_dir=self.signatures_dir,
direct_mode=False,
parent_class="txt",
http_client_class=self.http_client_class,
dns_client=self.dns_client,
)
if await cname_instance.dispatch():
self.cname_findings.append(
{
"finding": cname_instance.analyze(),
"description": "Vulnerable Host in TXT Record",
"trigger": self.target_dnsmanager.target,
}
)

return True

def _convert_findings(self, finding_sets):
converted_findings = []
for finding_set in finding_sets:
for finding in finding_set["finding"]:
finding_dict = finding.to_dict()
log.debug(f"Found finding during cname check for target: {finding_dict['target']}")
converted_findings.append(
Finding(
{
"target": self.target,
"description": f"{finding_set['description']}. Original Event: [{finding_dict['description']}]",
"confidence": finding_dict["confidence"],
"signature": finding_dict["signature"],
"indicator": finding_dict["indicator"],
"trigger": finding_set["trigger"],
"module": type(self),
}
)
)
return converted_findings

def analyze(self):
findings = []
log.debug("in txt analyze")
if self.cname_findings_direct:
findings.extend(self._convert_findings(self.cname_findings_direct))
if self.cname_findings:
findings.extend(self._convert_findings(self.cname_findings))
return findings

0 comments on commit bacc3b9

Please # to comment.