-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathtelemetry_grpc_dial_out_no_tls.py
138 lines (97 loc) · 4.85 KB
/
telemetry_grpc_dial_out_no_tls.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
'''
Telemetry gRPC dial-out mode, no TLS.

Tested with:
IOS XR Software, Version 6.4.1
Nexus NX-OS 9.2.2
MDS NX-OS 8.4.1
'''
from concurrent import futures
import time
import json
from google.protobuf.json_format import MessageToJson
from google.protobuf.descriptor import FieldDescriptor
import grpc
import telemetry_pb2
import fabric_telemetry_pb2
import cisco_grpc_dialout_pb2
import cisco_grpc_dialout_pb2_grpc
import uptime_pb2
_ONE_DAY_IN_SECONDS = 60 * 60 * 24 #grpc time out
class gRPCMdtDialoutServicer(cisco_grpc_dialout_pb2_grpc.gRPCMdtDialoutServicer):
    """Dial-out servicer that decodes and prints incoming telemetry messages.

    Every message received on the request stream is parsed as a
    ``telemetry_pb2.Telemetry`` and printed both in protobuf text form and
    as JSON.  For a few known encoding paths the GPB-compact row payloads
    are additionally decoded with the matching generated message class.
    """

    # Encoding path whose GPB-compact rows are decoded as
    # uptime_pb2.system_uptime.
    _UPTIME_PATH = "Cisco-IOS-XR-shellutil-oper:system-time/uptime"

    # Encoding paths whose GPB-compact rows are decoded as
    # fabric_telemetry_pb2.FlowRecordsTable (MDS 97 32G line card SAN
    # Analytics feature; the encoding path is the predefined push analytics
    # query name).
    _FLOW_RECORD_PATHS = ("analytics:dcnminitITL", "show_stats_fc2/2")

    def __init__(self):
        print("Initializing gRPCMdtDialoutServicer()")

    @staticmethod
    def _parse_peer(peer):
        """Split a gRPC peer string into node address and port.

        The text before the first ``:`` is treated as the protocol tag and
        dropped; the remainder is split on its *last* ``:`` so that
        addresses which themselves contain colons (e.g. IPv6) do not break
        the parse.  When the remainder has no ``:`` the whole remainder is
        returned as the node and the port is an empty string.

        Returns:
            dict with keys ``telemetry_node`` and ``telemetry_node_port``.
        """
        _proto, _, address = peer.partition(":")
        node, sep, port = address.rpartition(":")
        if not sep:
            node, port = address, ""
        return {"telemetry_node": node, "telemetry_node_port": port}

    @staticmethod
    def _print_uptime_rows(rows):
        """Decode each row's content as ``system_uptime`` and print it."""
        for row in rows:
            uptime = uptime_pb2.system_uptime()
            uptime.ParseFromString(row.content)
            print(uptime)
            print("=" * 40)

    @staticmethod
    def _print_flow_record_rows(rows):
        """Decode each row's content as ``FlowRecordsTable`` and print JSON."""
        for row in rows:
            table = fabric_telemetry_pb2.FlowRecordsTable()
            table.ParseFromString(row.content)
            print(MessageToJson(table))
            print("=" * 40)

    def MdtDialout(self, message, context):
        """Handle one dial-out session.

        Args:
            message: iterable of request messages; each item's ``data``
                attribute holds a serialized ``Telemetry`` message.
            context: gRPC servicer context; only ``peer()`` is used.

        Returns:
            An empty ``cisco_grpc_dialout_pb2.MdtDialoutArgs`` once the
            request stream is exhausted.
        """
        print(json.dumps(self._parse_peer(context.peer())))

        for new_msg in message:
            telemetry_msg = telemetry_pb2.Telemetry()
            telemetry_msg.ParseFromString(new_msg.data)
            print(telemetry_msg)
            print("=" * 100)

            json_text = MessageToJson(telemetry_msg)
            telemetry = json.loads(json_text)
            print(json_text)
            print("Message Length {}".format(len(json_text)))
            print("=" * 40)

            # MessageToJson leaves out fields that hold their default value,
            # so an empty encoding path shows up as a missing key.
            encoding_path = telemetry.get("encodingPath", "")
            print(encoding_path)

            is_compact = "dataGpb" in telemetry
            if is_compact:
                print("Message in GPB compact mode")
            if "dataGpbkv" in telemetry:
                print("message in GPB-kv mode")

            # Only GPB-compact payloads need a path-specific message class.
            if not is_compact:
                continue

            # Every row is decoded (not just the first), which also makes an
            # empty row list a no-op instead of an IndexError.
            rows = telemetry_msg.data_gpb.row
            if encoding_path == self._UPTIME_PATH:
                self._print_uptime_rows(rows)
            elif encoding_path in self._FLOW_RECORD_PATHS:
                self._print_flow_record_rows(rows)

        # NOTE(review): if the service definition declares a streaming
        # response, this should presumably be an iterator of MdtDialoutArgs
        # rather than a single message - confirm against the .proto file.
        return cisco_grpc_dialout_pb2.MdtDialoutArgs()
def serve(address='0.0.0.0:50051', max_workers=10):
    """Run the insecure (no TLS) dial-out collector until interrupted.

    Args:
        address: ``host:port`` string passed to ``add_insecure_port``.
            Defaults to all IPv4 interfaces on port 50051.
        max_workers: size of the thread pool that services RPCs.

    Raises:
        RuntimeError: if the server reports that it could not bind
            ``address`` (``add_insecure_port`` returned 0).
    """
    gRPCserver = grpc.server(
        futures.ThreadPoolExecutor(max_workers=max_workers))
    cisco_grpc_dialout_pb2_grpc.add_gRPCMdtDialoutServicer_to_server(
        gRPCMdtDialoutServicer(), gRPCserver)

    # A zero return means nothing is listening; fail loudly instead of
    # sleeping forever with no open port.
    bound_port = gRPCserver.add_insecure_port(address)
    if not bound_port:
        raise RuntimeError("Failed to bind gRPC server to {}".format(address))

    gRPCserver.start()
    try:
        # start() does not block, so keep the main thread alive.
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        # Ctrl-C: stop immediately, without a grace period.
        gRPCserver.stop(0)
# Start the collector only when this file is executed as a script.
if __name__ == "__main__":
    serve()