forked from JoelBender/bacpypes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPDUsPerMinuteFilter.py
executable file
·108 lines (84 loc) · 2.62 KB
/
PDUsPerMinuteFilter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/usr/bin/python
"""
PDUs per Minute Filter - present a table of number of PDUs per minute
"""
import sys
from collections import defaultdict
from bacpypes.debugging import Logging, function_debugging, ModuleLogger
from bacpypes.consolelogging import ConsoleLogHandler
from bacpypes.analysis import trace, strftimestamp, Tracer
try:
from CSStat import Statistics
except ImportError:
Statistics = lambda: None
# some debugging
_debug = 0
_log = ModuleLogger(globals())
# globals
counter = defaultdict(int)
#
# PDUsPerMinuteTracer
#
class PDUsPerMinuteTracer(Tracer, Logging):
def __init__(self):
if _debug: PDUsPerMinuteTracer._debug("__init__")
Tracer.__init__(self, self.Filter)
def Filter(self, pkt):
if _debug: PDUsPerMinuteTracer._debug("Filter %r", pkt)
slot = ((int(pkt._timestamp) / interval) * interval)
if _debug: PDUsPerMinuteTracer._debug(" - slot: %r", slot)
# count the packets in the slot
counter[slot] += 1
#
# __main__
#
try:
if ('--debug' in sys.argv):
indx = sys.argv.index('--debug')
for i in range(indx+1, len(sys.argv)):
ConsoleLogHandler(sys.argv[i])
del sys.argv[indx:]
if _debug: _log.debug("initialization")
# check for a custom interval
if ('--interval' in sys.argv):
i = sys.argv.index('--interval')
interval = int(sys.argv[i+1])
if _debug: _log.debug(" - interval: %r", interval)
del sys.argv[i:i+2]
else:
interval = 60
# trace the file(s)
for fname in sys.argv[1:]:
trace(fname, [PDUsPerMinuteTracer])
# print some stats at the end
stats = Statistics()
# dump the counters
for ts in range(min(counter), max(counter)+1, interval):
print strftimestamp(ts), counter[ts]
if stats:
stats.Record(counter[ts], ts)
if stats:
smin, smax, _, _, _, _ = stats.Stats()
xlw, lw, q1, m, q3, uw, xuw = stats.Whisker()
if m is not None:
print "\t %-8.1f" % smax
print "\t %-8.1f" % xuw
print "\t--- %-8.1f" % uw
print "\t | "
print "\t.'. %-8.1f" % q3
print "\t| |"
print "\t|-| %-8.1f" % m
print "\t| |"
print "\t'-' %-8.1f" % q1
print "\t | "
print "\t--- %-8.1f" % lw
print "\t %-8.1f" % xlw
print "\t %-8.1f" % smin
else:
print "No stats"
except KeyboardInterrupt:
pass
except Exception, e:
_log.exception("an error has occurred: %s", e)
finally:
if _debug: _log.debug("finally")