-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathexpandHistogram.py
executable file
·150 lines (122 loc) · 5.58 KB
/
expandHistogram.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#!/usr/bin/env python3
#
import argparse
import csv
import sys
import os
import xopen
import collections
import functools
import itertools
import operator
import fcntl
# Pipe-resize fcntl commands (Linux-specific). Fall back to the raw Linux
# command numbers when this Python/libc does not expose the constants.
F_SETPIPE_SZ = getattr(fcntl, "F_SETPIPE_SZ", 1031)
F_GETPIPE_SZ = getattr(fcntl, "F_GETPIPE_SZ", 1032)
# Command-line interface: reads a CSV whose <column> cell holds a compressed
# histogram ("bucket<bd>value" pairs joined by <vd>) and expands it into one
# CSV column per bucket.
parser = argparse.ArgumentParser(description="Expand compressed histogram notation")
parser.add_argument("input", nargs="?", help="compressed histogram csv (default stdin)")
parser.add_argument("-c", "--column", help="parse this column (default %(default)s)", type=int, default=1)
parser.add_argument("-vd", "--value-delimiter", help="histogram delimiter between individual bucket/value pairs (default '%(default)s')", default='/')
parser.add_argument("-bd", "--bucket-delimiter", help="histogram delimiter between bucket and value (default '%(default)s')", default=':')
parser.add_argument("--prebins", type=str, help="parse each line with prepared bins", default=None, nargs='*')
parser.add_argument("--force-prebins", default=False, action="store_true", help="force prebins instead of failing")
parser.add_argument("--delimiter", help="csv delimiter (default '%(default)s')", default=';')
parser.add_argument("--flatten", action="store_true", help="output flat histogram", default=False)
parser.add_argument("--no-header", action="store_true", help="input file does not contain a header", default=False)
parser.add_argument("-o", "--output", help="output file (default stdout)", default=None)
args = parser.parse_args()
# --- argument validation ----------------------------------------------------
# Bail out early (with usage text) on an unreadable input path or a negative
# column index.
if args.input and not os.path.exists(args.input):
    print("ERROR: csv input file not found!")
    parser.print_help()
    sys.exit(1)
if args.column < 0:
    print("ERROR: process column cannot be negative!")
    parser.print_help()
    sys.exit(1)
# Expand every --prebins entry into an explicit list of bucket-name strings.
# An entry is either a plain number or a "start:stop[:step]" range in slice
# syntax whose stop bound is made inclusive.
if args.prebins is not None:
    expanded = []
    for spec in args.prebins:
        if spec.isnumeric():
            expanded.append(spec)
            continue
        try:
            bounds = [int(part) if part else None for part in spec.split(':')]
            bounds[1] += 1  # include the stop bound itself
            expanded.extend(range(int(bounds[1]))[slice(*bounds)])
        except Exception:
            raise Exception(f"Could not parse prebin {spec}")
    args.prebins = [str(b) for b in expanded]
    del expanded
# --- input setup ------------------------------------------------------------
# When reading from stdin, enlarge the pipe buffer to the system maximum to
# speed up large piped inputs (best effort, Linux only).
if not args.input:
    try:
        # use a context manager so the /proc handle is not leaked
        with open("/proc/sys/fs/pipe-max-size", 'r') as fPipeMax:
            fcntl.fcntl(sys.stdin.fileno(), F_SETPIPE_SZ, int(fPipeMax.read()))
    except Exception:
        pass  # not Linux / no permission -> keep the default pipe size
    fInput = sys.stdin
else:
    # xopen transparently decompresses .gz/.bz2/.xz inputs
    fInput = xopen.xopen(args.input, 'r')
csvFile = csv.reader(fInput, delimiter=args.delimiter)
# --- header handling and result state ---------------------------------------
# Unless --no-header is given, consume rows up to the first non-comment row
# and treat that row as the CSV header.
inputHeader = None
if not args.no_header:
    inputHeader = next((row for row in csvFile if not row[0].startswith('#')), None)
    if inputHeader is None:
        raise Exception("Could not find header row")
if args.flatten:
    # A flattened output carries no per-row columns over from the input.
    inputHeader = []
outputFile = None
dictWriter = None
fullHist = []  # expanded rows, buffered until the full bucket set is known
# Running per-bucket totals; pre-seeded with zeros when the bins are known.
if args.prebins is None:
    flatHist = collections.Counter()
else:
    flatHist = collections.Counter({k: 0 for k in args.prebins})
# --- output writer (prebins) ------------------------------------------------
# With --prebins the full set of output columns is known up front, so rows
# can be streamed out instead of buffered.
if args.prebins is not None:
    outputFile = sys.stdout if not args.output else xopen.xopen(args.output, 'w')
    header = list(flatHist.keys())
    if all(x.isdigit() for x in header):
        header.sort(key=int)  # purely numeric bucket names sort numerically
    # With --no-header there is no input header at this point; fall back to
    # emitting only the histogram columns instead of crashing on None.
    surrounding = inputHeader if inputHeader is not None else []
    header = surrounding[:args.column] + header + surrounding[args.column + 1:]
    dictWriter = csv.DictWriter(outputFile, delimiter=args.delimiter, fieldnames=header, extrasaction='ignore')
    if not args.no_header:
        dictWriter.writeheader()
# --- main loop --------------------------------------------------------------
# Expand each data row's compressed histogram cell into per-bucket values.
for line in csvFile:
    if line[0].startswith('#'):
        continue
    if inputHeader is None:
        # --no-header: synthesize positional column names from the first row
        inputHeader = list(range(len(line)))
    # Parse the histogram cell once: "bucket<bd>value" pairs joined by <vd>.
    pairs = [x.split(args.bucket_delimiter) for x in line[args.column].split(args.value_delimiter)]
    if args.prebins is not None and not args.force_prebins:
        invalidBins = [k for k, _ in pairs if k not in args.prebins]
        if invalidBins:
            raise Exception(f'ERROR: input data contained bins {invalidBins} which were not provided over the prebins')
    # Sum duplicate buckets within the row. Summing Counters with '+' would
    # silently drop zero/negative bucket values, so accumulate explicitly.
    values = collections.Counter()
    for k, v in pairs:
        if args.prebins is None or k in args.prebins:
            values[k] += float(v)
    flatHist.update(values)
    if not args.flatten:
        outDict = {
            **{k: v for k, v in zip(inputHeader[:args.column], line[:args.column])},
            **values,
            **{k: v for k, v in zip(inputHeader[args.column + 1:], line[args.column + 1:])}
        }
        if args.prebins is not None:
            dictWriter.writerow(outDict)  # streaming: columns already fixed
        else:
            fullHist.append(outDict)  # buffer until all buckets are known
# --- deferred writer + final output -----------------------------------------
# Without --prebins the full bucket set is only known after the main loop,
# so the writer (and its header) is created here.
if dictWriter is None:
    outputFile = outputFile if outputFile is not None else sys.stdout if not args.output else xopen.xopen(args.output, 'w')
    header = list(flatHist.keys())
    if all(x.isdigit() for x in header):
        # purely numeric bucket names are ordered numerically, not lexically
        header.sort(key=int)
    header = inputHeader[:args.column] + header + inputHeader[args.column + 1:]
    dictWriter = csv.DictWriter(outputFile, delimiter=args.delimiter, fieldnames=header, extrasaction='ignore')
# Header and buffered rows are written here only in the deferred (no-prebins)
# case; the prebins writer already emitted them while streaming.
if args.prebins is None:
    if not args.no_header:
        dictWriter.writeheader()
    if not args.flatten:
        # flush the rows buffered by the main loop
        dictWriter.writerows(fullHist)
if args.flatten:
    # Emit a single row of bucket totals. NOTE(review): `line` leaks out of
    # the main loop — with --flatten and an input containing no data rows
    # this raises NameError, and the non-histogram columns come from the
    # LAST data row; confirm both are intended.
    outDict = {
        **{k: v for k, v in zip(inputHeader[:args.column], line[:args.column])},
        **flatHist,
        **{k: v for k, v in zip(inputHeader[args.column + 1:], line[args.column + 1:])}
    }
    dictWriter.writerow(outDict)
# Only close real output files, never stdout.
if outputFile is not None and args.output:
    outputFile.close()