-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstats.py
102 lines (92 loc) · 4.17 KB
/
stats.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import calendar
import logging
import math
from datetime import datetime as dt, timedelta as delta

import numpy as np
from scipy import stats

import annotation
from store import r
# Ordinal integer codes for the 'low' / 'medium' / 'high' labels; used as the
# default ``mapping`` of values_to_int().
lmh_map = {
    label: rank for rank, label in enumerate(('low', 'medium', 'high'))
}
def values_to_int(l, mapping=lmh_map):
    """Lazily convert each element of *l* to an integer code.

    Elements accepted by ``int()`` are yielded as that integer. Any other
    element is yielded as ``mapping[elm]`` when *mapping* contains it;
    otherwise it is assigned the next value of a counter that starts at 0.
    Repeated occurrences of the same element always get the same code.

    Args:
        l: Iterable of values to encode. Elements that ``int()`` rejects
            must be hashable.
        mapping: Optional dict of predefined codes; ``None`` disables it.

    Yields:
        int: One code per element, in input order.
    """
    # NOTE(review): counter-assigned codes start at 0 and can therefore
    # coincide with codes taken from *mapping* (e.g. 'low' -> 0); confirm
    # whether that overlap is intended.
    assigned = {}
    next_code = 0
    for elm in l:
        try:
            code = int(elm)
        except (TypeError, ValueError, OverflowError):
            # Only int() conversion failures are handled here; the yield is
            # kept outside the try so that exceptions thrown into the
            # generator by its consumer are not swallowed.
            if elm not in assigned:
                if mapping is not None and elm in mapping:
                    assigned[elm] = mapping[elm]
                else:
                    assigned[elm] = next_code
                    next_code += 1
            code = assigned[elm]
        yield code
def reject_outliers(data, m=2):
    """Return the elements of *data* lying within *m* standard deviations of its mean.

    Args:
        data: Numeric array or array-like.
        m: Half-width of the accepted band, as a multiple of ``np.std(data)``.

    Returns:
        numpy.ndarray: The retained elements. Empty input and input whose
        standard deviation is zero are returned unfiltered.
    """
    data = np.asarray(data)
    if data.size == 0:
        return data.copy()
    spread = np.std(data)
    if spread == 0:
        # All values are identical: the strict ``<`` comparison below would
        # discard every element even though none of them is an outlier.
        return data.copy()
    return data[np.abs(data - np.mean(data)) < m * spread]
def check_agreement(fid):
    """Evaluate the recent annotations of *fid* and yield the agreed values.

    For every attribute returned by ``annotation.get_lamppost_attributes``,
    the annotations of the last 7 days are gathered. Attributes with fewer
    than two such annotations are skipped. Otherwise the standard deviation
    (``scipy.stats.tstd``) of the annotated values is written to the
    ``dispersion`` field of the ``f:cons:<fid>:<attr>`` hash in ``r`` and:

    * ``heading`` / ``pitch`` values are read as floats and filtered with
      ``reject_outliers``; a deviation below 1.0 stores the mean in the
      ``value`` field and yields it (as a string).
    * values of any other attribute are encoded with ``values_to_int``; a
      deviation below 0.5 that also passes the class-boundary check stores
      and yields the single most frequent raw value.
    * when neither threshold is met, the ``value`` field is deleted.

    Args:
        fid: Identifier used to build the ``f:orig:<fid>`` and
            ``f:cons:<fid>:<attr>`` keys and passed to the ``annotation``
            helpers.

    Yields:
        dict: ``{'attribute': ..., 'value': ..., 'uri': ...}`` for each
        attribute on which agreement was reached.
    """
    log = logging.getLogger(__name__)

    time_threshold = calendar.timegm((dt.utcnow() - delta(days=7)).timetuple())
    anns = annotation.get_lamppost_annotations(fid, begin=time_threshold)
    known_attrs = annotation.get_lamppost_attributes(fid)

    # Parsing does not depend on the attribute, so it is done once (and only
    # if there is at least one attribute) instead of once per attribute.
    parsed = None

    for attr in known_attrs:
        orig_value = r.hget('f:orig:{}'.format(fid), attr)
        if parsed is None:
            parsed = [annotation.parse_annotation(auuid) for auuid, _ts in anns]

        attr_annotations = [a for a in parsed if a['attribute'] == attr]
        if len(attr_annotations) < 2:
            # Not enough annotations to measure agreement.
            continue

        attr_values = [a['value'] for a in attr_annotations]
        if orig_value is not None:
            # The stored original value is weighted as ten extra votes.
            attr_values = attr_values + [orig_value] * 10

        f_uri = attr_annotations[0].get('uri', None)
        cons_key = 'f:cons:{}:{}'.format(fid, attr)

        numeric = attr == 'heading' or attr == 'pitch'
        if numeric:
            samples = reject_outliers(np.array([float(v) for v in attr_values]))
        else:
            samples = np.array(list(values_to_int(attr_values)))

        convergence = stats.tstd(samples)
        # A trim proportion of 0.0 makes this the plain arithmetic mean.
        mean = stats.trim_mean(samples, 0.0)

        r.hset(cons_key, 'dispersion', convergence)

        if numeric and convergence < 1.0:
            with r.pipeline(transaction=True) as p:
                p.hset(cons_key, 'value', mean)
                p.execute()
            yield {'attribute': attr, 'value': str(mean), 'uri': f_uri}
        elif not numeric and convergence < 0.5:
            lower_class = float(int(mean))
            upper_class = round(mean)
            log.debug('%s: lower=%s mean=%s upper=%s',
                      cons_key, lower_class, mean, upper_class)
            # Agreement is refused when the open interval
            # (mean - convergence, mean + convergence) contains both the
            # truncated and the rounded mean. In that case, and when the
            # mode is tied, any previously stored 'value' is left untouched.
            if not (mean - convergence < lower_class
                    and mean + convergence > upper_class):
                # Count the raw (un-encoded) values and accept only a single,
                # untied most frequent one.
                labels, counts = np.unique(np.array(attr_values),
                                           return_counts=True)
                top = labels[counts == counts.max()]
                if len(top) == 1:
                    mode = top[0]
                    log.debug('%s: agreement on %r', cons_key, mode)
                    with r.pipeline(transaction=True) as p:
                        p.hset(cons_key, 'value', mode)
                        p.execute()
                    yield {'attribute': attr, 'value': mode, 'uri': f_uri}
        else:
            # Too dispersed: drop any previously agreed value.
            r.hdel(cons_key, 'value')
def get_agreement_status(fid, attr):
    """Return the result of ``r.hgetall`` for the ``f:cons:<fid>:<attr>`` key."""
    cons_key = 'f:cons:{}:{}'.format(fid, attr)
    return r.hgetall(cons_key)