-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprogram.py
115 lines (84 loc) · 3.71 KB
/
program.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# ************************************************************************
# Import necessary packages
# ************************************************************************
import json
import sys
import sendOMF
import omfHelper
from EndpointTypes import EndpointTypes
def get_json_file(filename):
    """Load and return the parsed contents of a JSON file.

    Args:
        filename: Path of the JSON file to read; it is passed unchanged to
            ``open``, so a relative path is resolved by ``open`` itself.

    Returns:
        The object produced by ``json.load`` for the file's contents.

    Raises:
        SystemExit: With exit status 1 when the file cannot be opened or
            read, or when its contents are not valid JSON. A short
            diagnostic is printed before exiting.
    """
    try:
        with open(filename, 'r') as f:
            return json.load(f)
    except (OSError, ValueError) as error:
        # OSError covers missing or unreadable files; ValueError covers
        # json.JSONDecodeError and UnicodeDecodeError, which subclass it.
        print(f'Error: {str(error)}')
        print(f'Could not open/read file: {filename}')
        # Use sys.exit instead of the site-provided exit() helper, which is
        # meant for interactive sessions and may be absent; a non-zero
        # status lets the calling process detect the failure.
        sys.exit(1)
def get_appsettings():
    """Load appsettings.json and return its endpoints and OMF version.

    Each entry of the "Endpoints" list is updated in place:

    * "EndpointType" is replaced by ``EndpointTypes(<configured value>)``.
    * "BaseEndpoint" and "OmfEndpoint" are added, built from the entry's
      "Resource" (plus "ApiVersion", "TenantId" and "NamespaceId" where the
      endpoint type uses them).
    * "VerifySSL", "UseCompression" and "WebRequestTimeoutSeconds" are set
      to True, True and 30 respectively when missing or null.

    Returns:
        A tuple ``(endpoints, omf_version)``: the updated "Endpoints" list
        and the value of the "OMFVersion" setting.

    Raises:
        ValueError: If an endpoint's type is none of CDS, EDS or PI.
    """
    appsettings = get_json_file('appsettings.json')
    endpoints = appsettings["Endpoints"]
    omf_version = appsettings["OMFVersion"]

    # Optional/nullable settings and the value used when they are absent.
    optional_defaults = (
        ('VerifySSL', True),
        ('UseCompression', True),
        ('WebRequestTimeoutSeconds', 30),
    )

    for endpoint in endpoints:
        endpoint_type = EndpointTypes(endpoint["EndpointType"])
        endpoint["EndpointType"] = endpoint_type

        # Build the base URL according to the endpoint type.
        if endpoint_type == EndpointTypes.CDS:
            base_endpoint = (
                f'{endpoint["Resource"]}/api/{endpoint["ApiVersion"]}'
                f'/tenants/{endpoint["TenantId"]}'
                f'/namespaces/{endpoint["NamespaceId"]}'
            )
        elif endpoint_type == EndpointTypes.EDS:
            # EDS uses the fixed "default" tenant and namespace.
            base_endpoint = (
                f'{endpoint["Resource"]}/api/{endpoint["ApiVersion"]}'
                '/tenants/default/namespaces/default'
            )
        elif endpoint_type == EndpointTypes.PI:
            base_endpoint = endpoint["Resource"]
        else:
            raise ValueError('Invalid endpoint type')

        # Record the derived URLs on the endpoint configuration.
        endpoint["BaseEndpoint"] = base_endpoint
        endpoint["OmfEndpoint"] = f'{base_endpoint}/omf'

        # A missing key and an explicit null are treated the same way.
        for setting, default in optional_defaults:
            if endpoint.get(setting) is None:
                endpoint[setting] = default

    return endpoints, omf_version
def main(test=False, entries=None):
    """Create the OMF type and container on each selected endpoint and send data.

    Separated out from the script entry point so that a test function can
    call it easily.

    Args:
        test: When true, the data loop for an endpoint stops as soon as
            ``entries`` is exhausted instead of prompting on stdin.
        entries: Optional sequence of readings, each a string of the form
            ``"<pressure>,<temperature>"``, or ``"n"`` to stop sending to
            the current endpoint. The sequence is copied, so the caller's
            object is not modified.

    Returns:
        The ``(endpoints, omf_version)`` tuple obtained from
        ``get_appsettings``.

    Raises:
        ValueError: If a reading's pressure or temperature is not a number.
        IndexError: If a reading does not contain a comma-separated pair.
    """
    print('Welcome')
    endpoints, omf_version = get_appsettings()
    sendOMF.set_omf_version(omf_version)

    # Work on a private copy: avoids a shared mutable default argument and
    # keeps pop(0) below from draining the list owned by the caller.
    pending = [] if entries is None else list(entries)

    for endpoint in endpoints:
        if not endpoint['Selected']:
            continue

        sendOMF.send_type_create(endpoint, omfHelper.get_type())
        sendOMF.send_container_create(endpoint, omfHelper.get_container())

        # Outside test mode this loops until the user answers 'n'; in test
        # mode it ends once the supplied entries run out.
        while not test or pending:
            # Entries supplied by the caller (e.g. from the command line)
            # are consumed before falling back to interactive input.
            if pending:
                ans = pending.pop(0)
            else:
                ans = input('Enter pressure, temperature: n to cancel:')

            if ans == 'n':
                break

            split = ans.split(',')
            sendOMF.send_data_create(endpoint, omfHelper.get_data(
                pressure=float(split[0]), temperature=float(split[1])))

    return endpoints, omf_version
if __name__ == "__main__":
    # Forward only the real command-line arguments as readings. sys.argv[0]
    # is the script name and must not be parsed as a
    # "pressure,temperature" pair.
    main(entries=sys.argv[1:])
    print("done")