-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmod_cdr_main.py
248 lines (220 loc) · 10.6 KB
/
mod_cdr_main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
###############################################################################
#
# Cisco CDR to ES Parser (cdr_parser)
#
# FILENAME: mod_cdr_main.py
# DESCRIPTION: Module that contains the application's main parsing functions
#
# AUTHOR: Patrick K. Ryon (slashdoom)
# COPYRIGHT: Copyright (c) 2016, Patrick Ryon (Slashdoom). All rights reserved.
# LICENSE: 3 clause BSD (see LICENSE file)
#
################################################################################
import csv, os, shutil, time
import mod_conf, mod_cdr_decode
from datetime import datetime
from elasticsearch import Elasticsearch
def initial_program_setup():
    """Setup hook called once at start-up; currently it only announces itself."""
    banner = "initial_program_setup"
    print(banner)
def _unique_archive_path(dest_file, logger):
    """Return ``dest_file``, or ``dest_file + "_dup_N"`` for the first free N.

    Used so a CDR file is never moved onto an already-archived file of the
    same name.
    """
    if not os.path.isfile(dest_file):
        return dest_file
    logger.warning("archive file exists; trying alternate file name")
    n = 0
    while os.path.isfile(dest_file + "_dup_" + str(n)):
        logger.warning("archive file exists; trying alternate file name")
        n += 1
    dest_file += "_dup_" + str(n)
    logger.info("archive file renamed: %s" % dest_file)
    return dest_file


def _es_hit_count(search_result):
    """Return the total hit count from an Elasticsearch search response."""
    total = search_result['hits']['total']
    # Elasticsearch 7+ reports hits.total as {"value": n, "relation": ...};
    # older versions report a plain integer.
    if isinstance(total, dict):
        total = total.get('value', 0)
    return int(total)


def _open_cdr_csv(path):
    """Open a CDR CSV file in the mode the running interpreter's csv module expects."""
    import sys
    if sys.version_info[0] >= 3:
        # Python 3's csv module needs text mode with newline=''.
        # NOTE(review): assumes the CDR files are UTF-8 -- confirm for your CUCM locale.
        return open(path, 'r', newline='', encoding='utf-8', errors='replace')
    # Python 2's csv module works on byte strings, hence binary mode.
    return open(path, 'rb')


def _field_decoders():
    """Map CDR column names to the mod_cdr_decode function applied to their value.

    Columns not listed here (and not special-cased in _build_cdr_document)
    are stored exactly as read from the CSV.
    """
    dec = mod_cdr_decode
    return {
        # Originating side
        "origIpAddr": dec.decode_IP,
        "origCause_location": dec.decode_TermCauseCode,
        "origCause_value": dec.decode_TermCauseCode,
        "origPrecedenceLevel": dec.decode_PrecedenceLevel,
        "origMediaTransportAddress_IP": dec.decode_IP,
        "origMediaCap_payloadCapability": dec.decode_CodecType,
        "origVideoCap_Codec": dec.decode_CodecType,
        "origVideoCap_Resolution": dec.decode_VideoRes,
        # Decoded the same way as its dest counterpart below.
        "origVideoTransportAddress_IP": dec.decode_IP,
        "origRSVPAudioStat": dec.decode_RSVPStat,
        "origRSVPVideoStat": dec.decode_RSVPStat,
        # Destination side
        "destIpAddr": dec.decode_IP,
        "destCause_location": dec.decode_TermCauseCode,
        "destCause_value": dec.decode_TermCauseCode,
        "destPrecedenceLevel": dec.decode_PrecedenceLevel,
        "destMediaTransportAddress_IP": dec.decode_IP,
        "destMediaCap_payloadCapability": dec.decode_CodecType,
        "destVideoCap_Codec": dec.decode_CodecType,
        "destVideoCap_Resolution": dec.decode_VideoRes,
        "destVideoTransportAddress_IP": dec.decode_IP,
        "destRSVPAudioStat": dec.decode_RSVPStat,
        "destRSVPVideoStat": dec.decode_RSVPStat,
        # Call-level fields
        "dateTimeConnect": dec.decode_Time,
        "dateTimeDisconnect": dec.decode_Time,
        "origDTMFMethod": dec.decode_DTMFMethod,
        "destDTMFMethod": dec.decode_DTMFMethod,
        "callSecuredStatus": dec.decode_SecuredStatus,
        "duration": dec.decode_duration,
    }


def _build_cdr_document(fields, file_name, decoders, logger):
    """Convert one CSV row into an Elasticsearch document.

    Args:
        fields: list of (column name, raw string value) pairs for the row.
        file_name: name of the source CDR file; stored as ``filename`` so that
            ``mod_conf.es_file_check`` can detect re-imports.
        decoders: mapping returned by _field_decoders().
        logger: object with debug/info/warning methods.

    Returns:
        (es_body, es_index): the document and the index name to write it to.
    """
    es_body = {}
    # Defaults, so a row without cdrRecordType / timestamp columns never sees
    # unbound names or values left over from the previous row.
    es_index = mod_conf.es_index if mod_conf.es_index != "" else "index_err"
    index_build = False
    cdr_time = None
    for key, value in fields:
        if key == "cdrRecordType":
            es_body[key] = mod_cdr_decode.decode_RecordType(val=value)
            # Check for configured index; if none use cdr-/cmr- plus a date suffix
            if mod_conf.es_index == "":
                prefix = {"1": "cdr-", "2": "cmr-"}.get(value)
                index_build = prefix is not None
                es_index = prefix if index_build else "index_err"
            else:
                es_index = mod_conf.es_index
                index_build = False
        elif key in ("dateTimeStamp", "dateTimeOrigination"):
            # Either column also becomes @timestamp; the later column in the row wins.
            cdr_time = mod_cdr_decode.decode_Time(val=value)
            es_body[key] = cdr_time
            es_body['@timestamp'] = cdr_time
        elif key == "origLegCallIdentifier":
            # Link origLegCallIdentifier to callIdentifier
            es_body[key] = value
            es_body["callIdentifier"] = value
        elif key in decoders:
            es_body[key] = decoders[key](val=value)
        else:
            # Write non-decoded values as is
            es_body[key] = value

    # Build call sequence summary if data is present
    orig_num = es_body.get("originalCalledPartyNumber", "")
    fin_num = es_body.get("finalCalledPartyNumber", "")
    if orig_num:
        if orig_num == fin_num:
            es_body["Call Sequence"] = "Call to: " + orig_num
        else:
            es_body["Call Sequence"] = "Call to: " + orig_num + " Forwarded to: " + fin_num

    # Write original filename to ES for checking
    es_body["filename"] = file_name

    # Update index if building in datetime
    if index_build:
        if cdr_time is None:
            logger.warning("no dateTimeStamp/dateTimeOrigination in CDR row; using index_err")
            es_index = "index_err"
        else:
            es_index += cdr_time.strftime(mod_conf.es_index_format)
            logger.debug("built es_index: %s" % es_index)
    return es_body, es_index


def _index_cdr_file(es, dest_file, file_name, es_type, decoders, logger):
    """Parse one archived CDR CSV file and send each data row to Elasticsearch.

    The first CSV row holds the column names and the second their types
    (skipped). Every following non-empty row becomes one document.
    """
    with _open_cdr_csv(dest_file) as csv_file:
        csv_read = csv.reader(csv_file, delimiter=',', quotechar='"')
        try:
            # Read CDR header row, then skip the CDR types row
            csv_keys = next(csv_read)
            next(csv_read)
        except StopIteration:
            logger.warning("CDR file has no header rows; nothing to index: %s" % dest_file)
            return
        for csv_line in csv_read:
            # Combine headers with data; list() so an empty row is falsy on Python 3 too.
            fields = list(zip(csv_keys, csv_line))
            if not fields:
                continue
            es_body, es_index = _build_cdr_document(fields, file_name, decoders, logger)
            # Send CDR to ElasticSearch
            logger.debug(es.index(index=es_index, doc_type=es_type, body=es_body))


def do_main_program(logger):
    """Archive and index every CDR/CMR file found in ``mod_conf.cdr_path``.

    Each file is moved into ``mod_conf.archive_path`` (with a ``_dup_N``
    suffix if that name is already archived). It is then parsed from the
    archived copy and each row is indexed into Elasticsearch.

    A file whose move fails is left in ``cdr_path`` and not parsed this run.
    When ``mod_conf.es_file_check`` is set, a file whose name already appears
    in the ``filename`` field of indexed documents is skipped. Remaining
    files are still processed.

    Args:
        logger: object with debug/info/warning methods used for diagnostics.
    """
    # Connect to ES in config file
    es = Elasticsearch([{'host': mod_conf.es_host, 'port': mod_conf.es_port}],
                       timeout=30, max_retries=10, retry_on_timeout=True)
    decoders = _field_decoders()
    for file_name in os.listdir(mod_conf.cdr_path):
        # Get paths from config file
        src_file = os.path.join(mod_conf.cdr_path, file_name)
        if not os.path.isfile(src_file):
            # Sub-directories and other non-files are not CDR files.
            logger.debug("skipping non-file entry: %s" % src_file)
            continue
        dest_file = _unique_archive_path(
            os.path.join(mod_conf.archive_path, file_name), logger)
        logger.debug("src_file = %s" % src_file)
        logger.debug("dest_file = %s" % dest_file)

        # Check for configured type, if none use filename
        es_type = mod_conf.es_type if mod_conf.es_type != "" else file_name
        logger.debug("es_type = %s" % es_type)

        file_moved = True
        try:
            shutil.move(src_file, dest_file)
        except EnvironmentError as err:
            # The file stays in cdr_path, so a later run can pick it up again.
            logger.warning("could not move CDR file to archive; will retry")
            logger.debug("move error: %s" % err)
            file_moved = False

        if mod_conf.es_file_check:
            es_file_search = es.search(
                index="_all", doc_type=es_type,
                body={"query": {"match": {"filename": file_name}}})
            hit_count = _es_hit_count(es_file_search)
            logger.debug("%d documents found" % hit_count)
            if hit_count:
                # Skip only this file; keep processing the rest of the directory.
                logger.warning("matching cdr file found in ES; aborting parsing")
                continue

        # Brief pause before checking the archived copy.
        time.sleep(.5)
        if file_moved and os.access(dest_file, os.W_OK):
            logger.debug("CDR successfully moved to archive.")
            _index_cdr_file(es, dest_file, file_name, es_type, decoders, logger)
        # NOTE(review): assumed to be a per-file pause (not per row) -- confirm placement.
        time.sleep(.5)
def program_cleanup():
    """Shutdown hook; cleanup is not implemented yet, so it only reports that."""
    notice = "program_cleanup - not implemented"
    print(notice)
def reload_program_config():
    """Config-reload hook; reloading is not implemented yet, so it only reports that."""
    notice = "reload_program_config - not implemented"
    print(notice)