Skip to content

Commit

Permalink
apply new, more generic analysis format (#10)
Browse files Browse the repository at this point in the history
  • Loading branch information
Konstanty Cieśliński authored May 13, 2021
1 parent 637bd98 commit 96cdbf4
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 76 deletions.
34 changes: 29 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,38 @@ Extracts static configuration from samples and memory dumps using the malduck en
},
{
"type": "analysis",
"kind": "drakrun"
},
{
"type": "analysis",
"kind": "joesandbox"
}
```

While the `sample` type is self-explanatory, the `analysis` type might be confusing. An `analysis` task is the output
of one of the supported sandboxes: `drakvuf-sandbox`, `cuckoo`, or `joesandbox`. An analysis is a `sample` with
additional memory dumps attached.

The `analysis` type task is expected to be in format:
```
task = Task(
headers={"type": "analysis"}
payload={
"sample": <sample>,
"dumps.zip": Resource.from_directory("dumps.zip", dumps_path.as_posix()),
"dumps_metadata": [
{"filename": <dump1_filename>, "base_address": <dump1_base_address>},
{"filename": <dump2_filename>, "base_address": <dump2_base_address>},
{"filename": <dump3_filename>, "base_address": <dump3_base_address>},
[...]
],
}
)
```
where `dumps_metadata` contains the filename and base address of every memory dump in `dumps.zip`. Each entry has the
following attributes:
- `filename` — the path of the dump, relative to the root of `dumps.zip`;
- `base_address` — the hex-encoded base address of the dump (a leading `0x` prefix is supported).
You can specify multiple entries for the same file if the same memory dump was found at different base addresses.

The extractor tries to retrieve config from each memory dump and will pick only the best candidate from each malware
family.

**Produces:**
```
# Dropped dumps related with static configuration
Expand Down
2 changes: 1 addition & 1 deletion karton/config_extractor/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "1.1.1"
__version__ = "2.0.0"
90 changes: 20 additions & 70 deletions karton/config_extractor/config_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
import hashlib
import json
import os
import re
import tempfile
import zipfile
from collections import defaultdict, namedtuple
from typing import DefaultDict, Dict, List, Optional

Expand Down Expand Up @@ -59,8 +56,7 @@ class ConfigExtractor(Karton):
"kind": "runnable",
"platform": "linux",
},
{"type": "analysis", "kind": "drakrun"},
{"type": "analysis", "kind": "joesandbox"},
{"type": "analysis"},
]

@classmethod
Expand Down Expand Up @@ -258,80 +254,34 @@ def analyze_dumps(self, sample, dump_infos):

self.log.info("done analysing, results: {}".format(json.dumps(results)))

def get_base_from_drakrun_dump(self, dump_name):
    """
    Parse the load base address out of a drakrun dump filename.

    Drakrun names each dump <base>_<hash>, e.g. 405000_688f58c58d798ecb,
    meaning a dump taken from address 0x405000 whose content hash is
    688f58c58d798ecb.
    """
    base_part, _, _ = dump_name.partition("_")
    return int(base_part, 16)

def analyze_drakrun(self, sample, dumps):
    """
    Unpack a drakrun dumps archive and run config extraction on every
    memory dump it contains.
    """
    dump_name_re = re.compile(r"^[a-f0-9]{4,16}_[a-f0-9]{16}$")
    with dumps.extract_temporary() as tmpdir:  # type: ignore
        dumps_path = os.path.join(tmpdir, "dumps")
        dump_infos = [
            DumpInfo(
                path=os.path.join(dumps_path, fname),
                base=self.get_base_from_drakrun_dump(fname),
            )
            for fname in os.listdir(dumps_path)
            # Drakrun stores per-dump meta information in separate files;
            # only names matching <base>_<hash> are actual memory dumps.
            if dump_name_re.match(fname)
        ]
        self.analyze_dumps(sample, dump_infos)

def get_base_from_joesandbox_dump(self, dump_name):
    """
    Parse the load base address out of a JoeSandbox dump filename.

    JoeSandbox dumps come in three formats:
    1) raw dumps with .sdmp extension, e.g.
       00000002.00000003.385533966.003C0000.00000004.00000001.sdmp
       (base address is the 4th dot-separated field);
    2) dumps that start with 0x4d5a bytes
       2.1) unmodified with .raw.unpack extension, e.g.
            0.0.tmpi0shwswy.exe.1290000.0.raw.unpack
       2.2) modified by the joesandbox engine with .unpack extension, e.g.
            0.0.tmpi0shwswy.exe.1290000.0.unpack
       (base address is the 5th dot-separated field in both cases).

    Returns None for names that match none of these formats.
    """
    fields = dump_name.split(".")
    if "sdmp" in dump_name:
        return int(fields[3], 16)
    if "raw.unpack" in dump_name or "unpack" in dump_name:
        return int(fields[4], 16)
    return None

def analyze_joesandbox(self, sample, dumps):
    """
    Download a JoeSandbox dumps archive, unpack it and run config
    extraction on every memory dump it contains.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        archive_path = os.path.join(tmpdir, "dumps.zip")
        dumps.download_to_file(archive_path)
        dumps_path = os.path.join(tmpdir, "dumps")
        # JoeSandbox ships dumps in a password-protected zip archive.
        # Fix: the ZipFile was previously never closed (resource leak);
        # path built with os.path.join instead of "+" concatenation.
        with zipfile.ZipFile(archive_path) as archive:
            archive.extractall(dumps_path, pwd=b"infected")
        dump_infos = []
        for fname in os.listdir(dumps_path):
            dump_path = os.path.join(dumps_path, fname)
            dump_base = self.get_base_from_joesandbox_dump(fname)
            dump_infos.append(DumpInfo(path=dump_path, base=dump_base))
        self.analyze_dumps(sample, dump_infos)

def process(self, task: Task) -> None:  # type: ignore
    """
    Karton entry point: dispatch on the task type.

    ``sample`` tasks are analyzed directly; ``analysis`` tasks carry a
    ``dumps.zip`` resource plus a ``dumps_metadata`` payload describing
    the filename and base address of each dump inside the archive.

    NOTE(review): the scraped diff interleaved removed and added lines
    here (duplicate ``elif`` branches); this is the reconstructed
    post-commit version using the generic analysis format.
    """
    sample = task.get_resource("sample")
    headers = task.headers

    if headers["type"] == "sample":
        self.log.info("Analyzing original binary")
        self.analyze_sample(sample)
    elif headers["type"] == "analysis":
        sample_hash = hashlib.sha256(sample.content or b"").hexdigest()
        self.log.info(f"Processing analysis, sample: {sample_hash}")
        dumps = task.get_resource("dumps.zip")
        dumps_metadata = task.get_payload("dumps_metadata")
        with dumps.extract_temporary() as tmpdir:  # type: ignore
            dump_infos = []
            for dump_metadata in dumps_metadata:
                dump_path = os.path.join(tmpdir, dump_metadata["filename"])
                # Reject entries that escape the extraction directory.
                if not self._is_safe_path(tmpdir, dump_path):
                    self.log.warning(f"Path traversal attempt: {dump_path}")
                    continue
                # base_address is hex-encoded; int(..., 16) accepts an
                # optional leading "0x".
                dump_base = int(dump_metadata["base_address"], 16)
                dump_infos.append(DumpInfo(path=dump_path, base=dump_base))
            self.analyze_dumps(sample, dump_infos)

    self.log.debug("Printing gc stats")
    self.log.debug(gc.get_stats())

def _is_safe_path(self, basedir, path):
"""
Check if path points to a file within basedir.
"""
return basedir == os.path.commonpath((basedir, os.path.realpath(path)))

0 comments on commit 96cdbf4

Please # to comment.