forked from hjerner-i-team/CREPE
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
115 lines (95 loc) · 4.75 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# This file represents the main file and entrypoint to this entire project
# This project is a part of the EiT village NTNU Cyborg
# In this project we will demonstrate an experiment with the neural cell culture.
# This is done by connecting a hardware platform and a software platform to the MEAME
# interface at St. Olavs.
#
# Github repo: https://github.com/hjerner-i-team/CREPE
""" Import fix - check README for documentation """
import os,sys,inspect
__currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
sys.path.insert(0, __currentdir[0:__currentdir.find("CREPE")+len("CREPE")])
""" End import fix """
import time
from communication.hdf5_reader import HDF5Reader
#from neuro_processing.neuro_processing import NeuroProcessor
from neuro_processing.meame_listener import MeameListener
from communication.queue_service import QueueService, StartQueueService
from communication.meame_speaker.meame_speaker import MeameSpeaker
#from communication.meame_listener import MeameListener
from multiprocessing import Process, Queue
from crepe_modus import CrepeModus
import signal
class CREPE():
    """Start and chain the queue services that make up a CREPE pipeline.

    The first service is a data source chosen by ``modus``. Every extra
    service passed in ``queue_services`` is started with the previous
    service's ``queue_out`` as its ``queue_in``. In LIVE modus a
    MeameSpeaker is attached behind the last service. A SIGINT handler is
    installed so that Ctrl-C terminates all started services.
    """

    def __init__(self, modus=CrepeModus.LIVE, meame_speaker_periods=None, file_path = None, queue_services = None):
        """Start the required communication services and init crepe.

        @param modus is a CrepeModus enum selecting the data source:
            LIVE    - MeameListener on 10.20.92.130:12340 (plus a MeameSpeaker)
            OFFLINE - MeameListener on 127.0.0.1:40000
            FILE    - HDF5Reader started with file_path
            TEST    - HDF5Reader started with mode=modus
        @param meame_speaker_periods is passed as "periods" to the
            MeameSpeaker; only used in LIVE modus
        @param file_path is the file path to an optional .h5 file; only
            used in FILE modus
        @param queue_services is a list of different child classes of
            QueueService with corresponding kwargs, on the form
            [[ChildQueueService, {"queue_in": <a queue>, "queue_out": None}], [.., {...}], ...]
            Any supplied "queue_in" is replaced by the previous service's
            queue_out. The caller's kwargs dicts are not modified.
        @raises ValueError if modus is not one of the modi listed above
        """
        self.modus = modus
        self.queue_services = []

        print("\n[CREPE.init] init crepe with args:\n\tmodus_\t",modus,"\n\tfile_path:\t",
                file_path,"\n\tqueue_services:\t",queue_services)

        init_meame_speaker = False
        if modus == CrepeModus.LIVE:
            listener = StartQueueService(MeameListener, server_address = "10.20.92.130", port = 12340, bitrate=10000)
            self.queue_services.append(listener)
            init_meame_speaker = True
        elif modus == CrepeModus.FILE:
            # initiates a h5 reader and starts the service
            hdf5 = StartQueueService(HDF5Reader, file_path=file_path)
            self.queue_services.append(hdf5)
        elif modus == CrepeModus.TEST:
            hdf5 = StartQueueService(HDF5Reader, mode=self.modus)
            self.queue_services.append(hdf5)
        elif modus == CrepeModus.OFFLINE:
            listener = StartQueueService(MeameListener, server_address = "127.0.0.1", port = 40000, bitrate=10000)
            self.queue_services.append(listener)
        else:
            raise ValueError("Wrong crepe modus supplied")

        if queue_services is not None:
            for service in queue_services:
                # Work on a copy so the caller's kwargs dict is left untouched,
                # then wire this service to the output of the previous one.
                kwargs = dict(service[1])
                kwargs["queue_in"] = self.queue_services[-1].queue_out
                qs = StartQueueService(service[0], **kwargs)
                self.queue_services.append(qs)
            if len(self.queue_services) > 1:
                print("\n[CREPE.init] started ", len(self.queue_services) - 1 ," extra services")

        # connect meame speaker here
        if init_meame_speaker:
            kw = {"queue_in": self.queue_services[-1].queue_out, "periods": meame_speaker_periods}
            qs = StartQueueService(MeameSpeaker, **kw)
            self.queue_services.append(qs)

        # Installed after the services are started; the handler arguments are
        # unused (named signum so the signal module is not shadowed).
        signal.signal(signal.SIGINT, lambda signum, frame: self._shutdown())

    def get_first_queue(self):
        """Return the queue_out of the first started service."""
        return self.queue_services[0].queue_out

    def get_last_queue(self):
        """Return the queue_out of the last started service."""
        return self.queue_services[-1].queue_out

    def wait(self, data_func=None):
        """Block and consume the last queue until the value False is read.

        @param data_func is an optional callable invoked with every item
            read from the last queue (except the terminating False)

        When False is read, shutdown() is called and the method returns.
        """
        last_queue = self.get_last_queue()
        dummy = QueueService(name="END", queue_in=last_queue)
        while True:
            data = dummy.get()
            # Identity check on purpose: only the literal False ends the loop,
            # other falsy items (0, empty containers, None) are passed on.
            if data is False:
                self.shutdown()
                return
            if data_func is not None:
                data_func(data)

    def _shutdown(self):
        """SIGINT handler body: shut down all services and exit with code 0."""
        print("\n[CREPE._shutdown] sigint intercepted, shutting down")
        self.shutdown()
        sys.exit(0)

    # Function that runs the required shutdown commands before the project is closed
    def shutdown(self):
        """Terminate the process of every started service.

        The list of services is emptied afterwards, so calling shutdown()
        again is a no-op apart from the final log line.
        """
        for x in self.queue_services:
            print("[CREPE.shutdown] Terminating ", x.get_name())
            x.process.terminate()
        # The original assigned to a misspelt attribute (queue_service), which
        # left the real list untouched; clear the real list instead.
        self.queue_services = []
        print("[CREPE.shutdown] Terminated all CREPE processes")

if __name__ == "__main__":
    # Smoke run: start CREPE with its default arguments (LIVE modus), keep
    # it alive for a few seconds, then terminate all started services.
    # To read from a recording instead, construct it like this:
    #   CREPE(modus=CrepeModus.FILE, file_path="../test_data/4.h5")
    run_seconds = 3
    crepe = CREPE()
    time.sleep(run_seconds)
    crepe.shutdown()