#!/usr/bin/env python3
"""
Reconstructs an approximate AFL mutation graph based on the file names of seeds
in a queue.
Author: Adrian Herrera
"""
from argparse import ArgumentParser, ArgumentTypeError, Namespace
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple
import multiprocessing.pool as mpp
import logging
import re
import sys
from networkx.readwrite.graphml import write_graphml
import networkx as nx
try:
    import pygraphviz
    from networkx.drawing.nx_agraph import write_dot
except ImportError:
    try:
        import pydot
        from networkx.drawing.nx_pydot import write_dot
    except ImportError:
        print('Neither pygraphviz nor pydot was found')
        raise

# Regexes for extracting mutation information from seeds in the AFL queue
QUEUE_ORIG_SEED_RE = re.compile(r'id[:_](?P<id>\d+),(?:time[:_](?P<time>\d+),)?orig[:_](?P<orig_seed>\w+)')
QUEUE_MUTATE_SEED_RE = re.compile(r'id[:_](?P<id>\d+),(?:sig[:_](?P<sig>\d+),)?src[:_](?P<src>\d+),(?:time[:_](?P<time>\d+),)?op[:_](?P<op>(?!.*splice)\w+)(?:,pos[:_](?P<pos>\d+))?(?:,val[:_](?P<val_type>[\w:_]+)?(?P<val>[+-]\d+))?(?:,rep[:_](?P<rep>\d+))?')
QUEUE_MUTATE_SEED_SPLICE_RE = re.compile(r'id[:_](?P<id>\d+),(?:sig[:_](?P<sig>\d+),)?src[:_](?P<src_1>\d+)\+(?P<src_2>\d+),(?:time[:_](?P<time>\d+),)?op[:_](?P<op>.*splice),rep[:_](?P<rep>\d+)')
QUEUE_MUTATE_SEED_SYNC_RE = re.compile(r'id[:_](?P<id>\d+),sync[:_](?P<syncing_party>[\w-]+),src[:_](?P<src>\d+)')
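
# Illustrative examples of seed file names that these regexes are designed to
# match (the IDs and operators below are made up):
#
#   id:000000,orig:seed1                          -> QUEUE_ORIG_SEED_RE
#   id:000005,src:000002,op:havoc,rep:16          -> QUEUE_MUTATE_SEED_RE
#   id:000020,src:000003+000007,op:splice,rep:4   -> QUEUE_MUTATE_SEED_SPLICE_RE
#   id:000031,sync:fuzzer02,src:000014            -> QUEUE_MUTATE_SEED_SYNC_RE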

# Maps short stage names to full stage names
OP_MAPPING = {
    'flip1': 'bitflip 1/1',
    'flip2': 'bitflip 2/1',
    'flip4': 'bitflip 4/1',
    'flip8': 'bitflip 8/8',
    'flip16': 'bitflip 16/8',
    'flip32': 'bitflip 32/8',
    'arith8': 'arith 8/8',
    'arith16': 'arith 16/8',
    'arith32': 'arith 32/8',
    'int8': 'interest 8/8',
    'int16': 'interest 16/8',
    'int32': 'interest 32/8',
    'ext_UO': 'user extras (over)',
    'ext_UI': 'user extras (insert)',
    'ext_AO': 'auto extras (over)',
    'havoc': 'havoc',
    'splice': 'splice',

    # AFL++ operators
    'colorization': 'colorization',
    'MOpt_core_havoc': 'MOpt core havoc',
    'MOpt_core_splice': 'MOpt core splice',
    'MOpt_havoc': 'MOpt havoc',
    'MOpt_splice': 'MOpt splice',
}

# Regex elements to convert to ints
CONVERT_TO_INTS = ('id', 'sig', 'src', 'src_1', 'src_2', 'pos', 'rep', 'val')

# Logging
FORMATTER = logging.Formatter('[%(asctime)s] %(levelname)s: %(message)s')
logger = logging.getLogger()


def parse_args() -> Namespace:
    """Parse command-line arguments."""
    def log_level(val: str) -> int:
        """Ensure that an argument value is a valid log level."""
        numeric_level = getattr(logging, val.upper(), None)
        if not isinstance(numeric_level, int):
            raise ArgumentTypeError('%r is not a valid log level' % val)
        return numeric_level

    parser = ArgumentParser(description='Recover (approximate) mutation graph '
                                        'from a set of AFL seeds')
    parser.add_argument('-d', '--dot', required=False, metavar='DOT',
                        type=Path, help='Output path for DOT file')
    parser.add_argument('-g', '--graphml', required=False, metavar='GRAPHML',
                        type=Path, help='Output path for GraphML file')
    parser.add_argument('-j', '--jobs', type=int, default=1,
                        help='Number of parallel jobs')
    parser.add_argument('-l', '--log', default=logging.WARN, type=log_level,
                        help='Logging level')
    parser.add_argument('-s', '--stats', required=False, action='store_true',
                        help='Print statistics about the mutation graph')
    parser.add_argument('seed_path', nargs='+', metavar='SEED', type=Path,
                        help='Path to the seed(s) to recover the mutation '
                             'graph for')

    return parser.parse_args()
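

# Example invocation (illustrative; the output directory and seed file name
# below are hypothetical):
#
#   ./afl_mutation_graph.py -d graph.dot -s \
#       findings/queue/id:000123,src:000042,op:havoc,rep:2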


def fix_regex_dict(mutation: dict) -> dict:
    """
    Fix the groupdict returned by the regex match.

    Convert strings to int, etc.
    """
    # Remove None values
    mutation = {k: v for k, v in mutation.items() if v is not None}

    # Convert ints
    for key in CONVERT_TO_INTS:
        if key in mutation:
            mutation[key] = int(mutation[key])

    # Expand op names to full stage names
    if 'op' in mutation:
        mutation['op'] = OP_MAPPING[mutation['op']]

    return mutation


def find_seed(seed_dir: Path, seed_id: int) -> Path:
    """Find a seed file with the given ID."""
    seed_files = list(seed_dir.glob('id[:_]%06d,*' % seed_id))
    if not seed_files:
        raise Exception('Could not find seed %s in %s' % (seed_id, seed_dir))

    # Each seed should have a unique ID, so there should only be one result
    return seed_files[0]


def is_crash(mutation: dict) -> bool:
    """Returns `True` if the given mutation dict is for a crashing input."""
    return 'crashes' in mutation['path'].parent.name


def is_seed(mutation: dict) -> bool:
    """
    Returns `True` if the given mutation dict is for a seed from the fuzzing
    corpus.
    """
    return 'orig_seed' in mutation


def get_parent_seeds(mutation: dict) -> List[Path]:
    """Get a list of parent seeds from the given mutation dictionary."""
    seed_dir = mutation['path'].parent

    # If the seed is a crash, move across to the queue directory
    if is_crash(mutation):
        seed_dir = seed_dir.parent / 'queue'

    if 'orig_seed' in mutation:
        return []
    if 'syncing_party' in mutation:
        seed_dir = seed_dir.parents[1] / mutation['syncing_party'] / 'queue'
        return [find_seed(seed_dir, mutation['src'])]
    if 'src_1' in mutation:
        return [find_seed(seed_dir, mutation['src_1']),
                find_seed(seed_dir, mutation['src_2'])]
    if 'src' in mutation:
        return [find_seed(seed_dir, mutation['src'])]

    raise Exception('Invalid mutation dictionary %r' % mutation)
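

# The path manipulations above assume the standard AFL output layout, roughly
# (directory names are illustrative):
#
#   sync_dir/
#       fuzzer01/
#           queue/id:...
#           crashes/id:...
#       fuzzer02/
#           queue/id:...
#
# A crash's parents live in the sibling `queue` directory, and a synced seed's
# parent lives in the `queue` directory of the fuzzer instance it was imported
# from.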


def get_mutation_dict(seed: Path) -> dict:
    """Parse out a mutation dict from the given seed."""
    # If the seed is a crash, move across to the queue directory
    seed_dir = seed.parent
    if seed_dir.name == 'crashes':
        seed_dir = seed_dir.parent / 'queue'

    seed_name = seed.name

    match = QUEUE_ORIG_SEED_RE.match(seed_name)
    if match:
        # We've reached the end of the chain
        mutation = fix_regex_dict(match.groupdict())
        mutation['path'] = seed
        return mutation

    match = QUEUE_MUTATE_SEED_RE.match(seed_name)
    if match:
        # Recurse on the parent 'src' seed
        mutation = fix_regex_dict(match.groupdict())
        mutation['path'] = seed
        return mutation

    match = QUEUE_MUTATE_SEED_SPLICE_RE.match(seed_name)
    if match:
        # Spliced seeds have two parents. Recurse on both
        mutation = fix_regex_dict(match.groupdict())
        mutation['path'] = seed
        return mutation

    match = QUEUE_MUTATE_SEED_SYNC_RE.match(seed_name)
    if match:
        # Seed synced from another fuzzer node
        mutation = fix_regex_dict(match.groupdict())
        mutation['path'] = seed
        return mutation

    raise Exception('Failed to find parent seed for `%s`' % seed_name)


def gen_mutation_graph(seed: Path) -> nx.DiGraph:
    """
    Generate a mutation graph (this _should_ be a DAG) based on the given seed.
    """
    def get_seed_stack(seed: Path, mutation: dict) -> List[Tuple[Path, Path]]:
        return [(seed, parent_seed) for parent_seed in
                get_parent_seeds(mutation)]

    logger.info('Generating mutation graph for %s', seed)

    if not seed.exists():
        logger.warning('%s does not exist. Skipping...', seed)
        return nx.DiGraph()

    mutation = get_mutation_dict(seed)
    seed_stack = get_seed_stack(seed, mutation)

    mutate_graph = nx.DiGraph()
    mutate_graph.add_node(mutation['path'], **mutation)

    # The seed stack is a list of (seed, parent seed) tuples. Once we hit an
    # "orig" seed, it has no parents, so nothing more is pushed and the walk
    # stops
    while seed_stack:
        prev_seed, seed = seed_stack.pop()

        mutation = get_mutation_dict(seed)
        node = mutation['path']
        prev_node = prev_seed

        # If we've already seen this seed before, don't look at it again.
        # Otherwise we'll end up in an infinite loop
        if mutate_graph.has_edge(node, prev_node):
            continue

        mutate_graph.add_node(node, **mutation)
        mutate_graph.add_edge(node, prev_node)

        seed_stack.extend(get_seed_stack(seed, mutation))

    return mutate_graph
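

# For example (hypothetical file names): starting from a seed named
# `id:000123,src:000042,op:havoc,rep:2`, the loop above pops the
# (child, parent) pair, parses the parent `id:000042,...`, adds an edge from
# the parent to the child, and keeps following the `src` references until it
# reaches a seed named `id:000000,orig:...`, which has no parents.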


def create_node_label(mutation: dict) -> str:
    """Create a meaningful label for a node in the mutation graph."""
    return mutation['path'].name


def create_edge_label(mutation: dict) -> str:
    """Create a meaningful label for an edge in the mutation graph."""
    label_elems = []

    if 'op' in mutation:
        label_elems.append('op: %s' % mutation['op'])
    if 'pos' in mutation:
        label_elems.append('pos: %d' % mutation['pos'])
    if 'val' in mutation:
        label_elems.append('val: %s%d' % (mutation.get('val_type', ''),
                                          mutation['val']))
    if 'rep' in mutation:
        label_elems.append('rep: %d' % mutation['rep'])
    if 'syncing_party' in mutation:
        label_elems.append('sync: %s' % mutation['syncing_party'])

    return ', '.join(label_elems)
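

# For the hypothetical seed name `id:000005,src:000002,op:havoc,rep:16` used as
# an example above, create_edge_label() would produce "op: havoc, rep: 16".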


def node_shape(mutation: dict) -> str:
    """Decide the Graphviz node shape."""
    if is_crash(mutation):
        return 'hexagon'
    if is_seed(mutation):
        return 'rect'
    return 'oval'


def to_dot_graph(graph: nx.DiGraph) -> nx.DiGraph:
    """Generate a graph that can be serialized to Graphviz's DOT format."""
    dot_graph = nx.DiGraph()
    node_mapping = {}

    for count, (node, mutation) in enumerate(graph.nodes(data=True)):
        dot_graph.add_node(count, shape=node_shape(mutation),
                           label='"%s"' % create_node_label(mutation))
        node_mapping[node] = count

    for u, v in graph.edges():
        mutation = graph.nodes[v]
        dot_graph.add_edge(node_mapping[u], node_mapping[v],
                           label='"%s"' % create_edge_label(mutation))

    return dot_graph


def to_graphml(graph: nx.DiGraph) -> nx.DiGraph:
    """Generate a graph that can be serialized to GraphML format."""
    gml_graph = graph.copy()

    for node, _ in gml_graph.nodes(data=True):
        # The path is embedded in the node name
        del gml_graph.nodes[node]['path']

    return gml_graph


def get_path_stats(graph: nx.DiGraph, sources: List[str],
                   sinks: List[str]) -> Tuple[int, int]:
    """
    Get the longest and shortest paths through the graph from a set of source
    nodes to a set of sink nodes.

    Note that this is not very accurate due to splices!!
    """
    paths = [path for sink in sinks
             for source in sources
             for path in nx.all_simple_paths(graph, source, sink)]

    len_calc = lambda f: len(f(paths, key=lambda p: len(p))) + 1
    return len_calc(min), len_calc(max)


def get_mutation_stats(graph: nx.DiGraph) -> Dict[str, int]:
    """Count the number of mutation operators used."""
    ops = defaultdict(int)

    # The mutation dict is stored directly as the node's attributes
    for _, mutation in graph.nodes(data=True):
        if 'op' in mutation:
            ops[mutation['op']] += 1

    return ops


def print_stats(graph: nx.DiGraph) -> None:
    """Print statistics about the mutation graph."""
    print('mutations:')
    for op, count in get_mutation_stats(graph).items():
        print('  %s: %d' % (op, count))

    sources = [n for n, in_degree in graph.in_degree() if in_degree == 0]
    sinks = [n for n, out_degree in graph.out_degree() if out_degree == 0]

    min_len, max_len = get_path_stats(graph, sources, sinks)
    num_connected_components = nx.number_weakly_connected_components(graph)

    print('num. source nodes: %d' % len(sources))
    print('num. sink nodes: %d' % len(sinks))
    print('num. connected components: %d' % num_connected_components)
    print('shortest mutation chain: %d' % min_len)
    print('longest mutation chain: %d' % max_len)
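

# Illustrative output of print_stats() (all numbers are made up):
#
#   mutations:
#     havoc: 10
#     bitflip 1/1: 3
#   num. source nodes: 1
#   num. sink nodes: 2
#   num. connected components: 1
#   shortest mutation chain: 4
#   longest mutation chain: 9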


def main():
    """The main function."""
    args = parse_args()

    # Configure logger
    handler = logging.StreamHandler()
    handler.setFormatter(FORMATTER)
    logger.addHandler(handler)
    logger.setLevel(args.log)

    mutation_graph = nx.DiGraph()
    with mpp.Pool(processes=args.jobs) as pool:
        seed_graphs = pool.map(gen_mutation_graph, args.seed_path)
        for seed_graph in seed_graphs:
            mutation_graph.update(seed_graph)

    if len(mutation_graph) == 0:
        logger.error('empty mutation graph')
        sys.exit(1)

    if args.dot:
        write_dot(to_dot_graph(mutation_graph), args.dot)
    if args.graphml:
        write_graphml(to_graphml(mutation_graph), args.graphml)
    if args.stats:
        print_stats(mutation_graph)

    sys.exit(0)


if __name__ == '__main__':
    main()