-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathexpr_analyzer.py
148 lines (119 loc) · 4.59 KB
/
expr_analyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from typing import Dict
# from base.project import FACT, QueryType as QType
from base.exper import Experiment, QueryExpResult
from query.analyzer import *
from utils.query_utils import Mutation, emit_mutant_query
from utils.system_utils import *
from utils.analysis_utils import *
from utils.cache_utils import *
class ExprAnalyzer:
def __init__(self, exp: Experiment, ana: QueryAnalyzer, enable_dummy=False):
self.exp = exp
self.ana: QueryAnalyzer = ana
self.__qrs: Dict[str, QueryExpResult] = self.exp.load_sum_table(enable_dummy)
self.__qr_keys = list(sorted(self.__qrs.keys()))
self.__cats: Categorizer = ana.categorize_queries(
self.__qrs.values())
def __getitem__(self, qid):
return self.__qrs[qid]
def __contains__(self, qid):
return qid in self.__qrs
@property
def qids(self):
return self.__qr_keys
def print_plain_status(self):
for qr in self.__qrs.values():
qr.print_status()
print("")
def get_query_stability(self, qid):
c = self.__cats.get_category(qid)
if c is not None:
return c
return Stability.MISSING_F
def get_overall(self):
return self.__cats
def print_status(self, verbosity=0):
print_banner("Overall Report")
print("")
print(f"project dir:\t{self.exp.proj.sub_root}")
print(f"exp config:\t{self.exp.exp_name}")
print(f"solver path:\t{self.exp.solver.path}")
print(f"analyzer:\t{self.ana.name}\n")
self.__cats.print_status(skip_empty=True)
print("")
if verbosity == 0:
print_banner("Report End")
return
for cat, cs in self.__cats.items():
if verbosity <= 1 and cat != Stability.UNSTABLE:
continue
if verbosity <= 3 and cat == Stability.UNSOLVABLE:
continue
if verbosity <= 4 and cat == Stability.STABLE:
continue
ccount = len(cs)
if ccount == 0:
continue
for i, qs in enumerate(cs):
print_banner(f"{cat.value} ({i+1}/{ccount})")
self[qs].enforce_timeout(self.ana._timeout)
self[qs].print_status(verbosity)
print("")
print_banner("Report End")
def get_mutant_details(self, qr):
rows = self.exp.get_mutants(qr.query_path)
passed, failed = [], []
for (m_path, rc, et) in rows:
rc = RCode(rc)
if self.ana.is_timeout(et):
rc = RCode.TIMEOUT
mutation, seed = Experiment.parse_mutant_path(m_path)
if mutation == Mutation.QUAKE:
# TODO: handle quake if needed
continue
if rc == RCode.UNSAT:
passed += [(mutation, seed, None)]
else:
failed += [(mutation, seed, rc)]
return passed, failed
def get_unstable_query_mutants(self):
res = []
for qid in self.qids:
qr = self[qid]
if self.get_query_stability(qid) != Stability.UNSTABLE:
continue
s, f = self.get_mutant_details(qr)
if len(s) == 0 or len(f) == 0:
log_warn(f"only quake was effective, skipping {qid}")
continue
res.append((qr, s, f))
return res
def get_unstable_reasons(self):
cats = Categorizer([c for c in UnstableReason])
for qid in self.qids:
qr = self[qid]
if self.get_query_stability(qid) != Stability.UNSTABLE:
continue
qr.enforce_timeout(self.ana._timeout)
reason = self.ana.sub_categorize_unstable(qr.blob)
cats.add_item(reason, qid)
cats.finalize()
return cats
# def get_assert_counts(self, update=False):
# from tqdm import tqdm
# cache_path = f"asserts/{self.proj.full_name}"
# if has_cache(cache_path) and not update:
# counts = load_cache(cache_path)
# else:
# log_info(f"assert counts for {self.proj.full_name}")
# counts = dict()
# for query_path in tqdm(self.proj.list_queries()):
# base_name = os.path.basename(query_path)
# counts[base_name] = count_asserts(query_path)
# save_cache(cache_path, counts)
# return counts
# def get_veri_times(self):
# data = []
# for qr in self.__qrs.values():
# data.append(qr.get_original_status()[1])
# return np.array(data) / 1000