-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
104 lines (74 loc) · 3.62 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import logging
from frinx.client.frinx_conductor_wrapper import FrinxConductorWrapper
from frinx.common.logging.config import LoggerConfig
def register_tasks(conductor_client: FrinxConductorWrapper) -> None:
logging.info("Register tasks")
def _uniconfig_workers() -> None:
from frinx_worker.uniconfig.cli_network_topology import CliNetworkTopology
from frinx_worker.uniconfig.connection_manager import ConnectionManager
from frinx_worker.uniconfig.snapshot_manager import SnapshotManager
from frinx_worker.uniconfig.structured_data import StructuredData
from frinx_worker.uniconfig.uniconfig_manager import UniconfigManager
UniconfigManager().register(conductor_client=conductor_client)
SnapshotManager().register(conductor_client=conductor_client)
ConnectionManager().register(conductor_client=conductor_client)
StructuredData().register(conductor_client=conductor_client)
CliNetworkTopology().register(conductor_client=conductor_client)
def _http_workers() -> None:
from frinx_worker.http import HTTPWorkersService
HTTPWorkersService().register(conductor_client=conductor_client)
def _inventory_workers() -> None:
from frinx_worker.inventory import InventoryService
InventoryService().register(conductor_client=conductor_client)
def _schellar_workers() -> None:
from frinx_worker.schellar import Schellar
Schellar().register(conductor_client=conductor_client)
def _conductor_test() -> None:
from frinx_worker.conductor_system_test import TestWorker
TestWorker().register(conductor_client=conductor_client)
def _misc_workers() -> None:
from frinx_worker.python_lambda import PythonLambda
PythonLambda().register(conductor_client=conductor_client)
def _local_worker() -> None:
from app.workers.boilerplate_worker import SumWorker
SumWorker().register(conductor_client=conductor_client)
_uniconfig_workers()
_http_workers()
_inventory_workers()
_schellar_workers()
_conductor_test()
_misc_workers()
_local_worker()
def register_workflows() -> None:
logging.info("Register workflows")
from frinx_worker.inventory.workflows import Inventory
Inventory().register(overwrite=True)
from frinx_worker.schellar.workflows.schellar import SchellarWorkflows
SchellarWorkflows().register(overwrite=True)
from frinx_worker.http.workflows.post_to_slack import PostToSlackService
PostToSlackService().register(overwrite=True)
from frinx_worker.http.workflows.generic import GenericRequestService
GenericRequestService().register(overwrite=True)
from frinx_worker.conductor_system_test.workflows import TestWorkflows
TestWorkflows().register(overwrite=True)
from app.workflows.boilerplate_workflow import SumWorkflow
SumWorkflow().register(overwrite=True)
def main() -> None:
LoggerConfig().setup_logging()
from frinx.common.telemetry.metrics import Metrics
from frinx.common.telemetry.metrics import MetricsSettings
Metrics(settings=MetricsSettings(metrics_enabled=True))
from frinx.client.frinx_conductor_wrapper import FrinxConductorWrapper
from frinx.common.frinx_rest import CONDUCTOR_HEADERS
from frinx.common.frinx_rest import CONDUCTOR_URL_BASE
conductor_client = FrinxConductorWrapper(
server_url=CONDUCTOR_URL_BASE,
polling_interval=0.1,
max_thread_count=50,
headers=CONDUCTOR_HEADERS,
)
register_tasks(conductor_client)
register_workflows()
conductor_client.start_workers()
if __name__ == "__main__":
main()