# tunnel.py
# real-time emotion visualizer using FER labels sourced from on-device camera

from PyQt5.QtWidgets import QApplication  # GUI uses PyQt
from PyQt5.QtCore import QThread  # video player lives in a QThread
from gui import Visualizer, VideoPlayerWorker
# from sonifier import Sonifier  # optional audio thread
from paz import processors as pr
from paz.pipelines import DetectMiniXceptionFER  # facial emotion recognition pipeline
import numpy as np
import sys
import argparse
from paz.backend.camera import Camera
import threading
import time
from datetime import datetime
from copy import deepcopy
import cProfile
import pstats
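
# Thread layout (as wired up in __main__ below): the Qt GUI and its event loop run in the main
# thread, the camera/FER pipeline runs in a dedicated QThread via VideoPlayerWorker, and an
# optional Sonifier audio thread exists only as commented-out code.
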
class EmoTunnel(DetectMiniXceptionFER):  # video pipeline for the real-time FER visualizer

    def __init__(self, start_time, dims, offsets, speed=25):
        super().__init__(offsets)
        self.start_time = start_time
        self.current_frame = None  # other threads have read access
        self.frame_lock = threading.Lock()  # protects access to current_frame
        self.display_width = dims[1]
        self.display_height = dims[0]
        self.time_series = []  # list of [time, scores] pairs
        self.binned_time_series = []  # list of [time, mean_scores] pairs
        self.current_bin = []  # list of score vectors in the current bin
        self.speed = speed  # tunnel expansion rate in pixels per second; 25-50 recommended
        self.interval = 1000 // speed  # ms per pixel
        self.bin_end_time = self.interval  # start a new bin every interval ms
        self.no_data_indicator = np.full(7, 1e5)  # mean scores for an empty bin
        self.last_bin_mean = np.full(7, 1e5)  # mean scores for the most recent bin
        # self.signal = signal
        # self.draw = pr.TunnelBoxes(self.time_series, self.colors, True)  # override the default draw method
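
    # Note: with speed = 40 px/s, as set in __main__ below, interval = 1000 // 40 = 25 ms,
    # so each horizontal pixel of tunnel expansion corresponds to one 25 ms bin of mean scores.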

    def get_current_frame(self):
        with self.frame_lock:  # ensure exclusive access to current_frame
            return self.current_frame

    def call(self, image):
        # Binning logic: every `interval` ms, record the mean scores of the current bin,
        # signal the GUI to update, and start a new bin.
        current_time = time_since(self.start_time)
        # print(f"(pipeline.call) current_time: {current_time}")
        # print(f"(pipeline.call) bin_end_time: {self.bin_end_time}")
        if self.bin_end_time < current_time:  # done with the current bin
            new_bin_data = []
            if len(self.current_bin) > 0:
                # print(f"(pipeline.call) done with bin")
                # print(f"(pipeline.call) current_bin: {self.current_bin}")
                self.last_bin_mean = np.mean(self.current_bin, axis=0)
                # print(f"(pipeline.call) bin_mean: {self.last_bin_mean}")
                new_bin_data.append([self.bin_end_time, deepcopy(self.last_bin_mean)])
                self.bin_end_time += self.interval
                self.current_bin = []  # start a new bin
            while self.bin_end_time < current_time:  # catch up to the current time
                # print("(pipeline.call) catching up, empty bin")
                self.last_bin_mean = 0.9 * self.last_bin_mean + 0.1 * self.no_data_indicator  # no new data: decay toward the no-data indicator to mark staleness
                # print(f"(pipeline.call) bin_end_time: {self.bin_end_time}")
                # print(f"(pipeline.call) last_bin_mean: {self.last_bin_mean}")
                new_bin_data.append([self.bin_end_time, deepcopy(self.last_bin_mean)])  # empty bin
                # new_bin_data.append([self.bin_end_time, np.full(7, 1e5)])  # empty bin
                self.bin_end_time += self.interval
            # print(f"(pipeline.call) new_bin_data: ")
            # for timestamp, scores in new_bin_data:
            #     print(f"  (pipeline.call) timestamp, scores/1e6: {timestamp, scores/1e6}")
            self.binned_time_series.extend(new_bin_data)
            # print("(pipeline.call) binned_time_series:")
            # for timestamp, scores in reversed(self.binned_time_series):
            #     print(f"  (pipeline.call) timestamp, scores/1e6: {timestamp, scores/1e6}")
            # self.signal.emit()  # signal the GUI to update the visualizer tab

        # Get emotion data from the current frame.
        results = super().call(image)  # classify faces in the image, draw boxes and labels
        # image, faces = results['image'], results['boxes2D']
        faces = results['boxes2D']
        emotion_data = self.report_emotion(faces)
        if emotion_data is not None:
            timestamp, scores = emotion_data['time'], emotion_data['scores']
            self.time_series.append([timestamp, scores])
            self.current_bin.append(scores)
        return results

    def report_emotion(self, faces):  # make the per-frame emotion data available to other threads
        current_time = time_since(self.start_time)  # milliseconds since the start of the session
        num_faces = len(faces)
        if num_faces > 0:
            max_height = 0
            for k, box in enumerate(faces):  # find the largest face
                if box.height > max_height:
                    max_height = box.height
                    argmax = k
            if max_height > 150:  # don't log small faces (helps remove false positives)
                face_id = f"{argmax + 1} of {num_faces}"
                box = faces[argmax]  # log emotions for the largest face only; works well in a single-user setting. TODO: improve for social situations!
                emotion_data = {
                    "time": current_time,
                    "face": face_id,
                    "class": box.class_name,
                    "size": box.height,
                    "scores": (box.scores.tolist())[0]  # 7-vector of emotion scores, converted from np.array to list
                }
                # emotion_queue.put(emotion_data)
                return emotion_data
        return None  # no large faces found
        # new_data_event.set()  # tell the other threads that new data is available

    # def __del__(self):  # no log file, not needed
    #     self.log_file.close()  # close the file when the instance is deleted
    #     print("Log file closed.")
def time_since(start_time):
    return int((time.time() - start_time) * 1000)  # milliseconds since start of session

if __name__ == "__main__":
    profiler = cProfile.Profile()  # for performance profiling
    profiler.enable()

    start_time = time.time()
    start_time_str = datetime.now().strftime("%Y%m%d_%H%M%S")
    end_session_event = threading.Event()  # triggered when the user closes the GUI window

    parser = argparse.ArgumentParser(description='Real-time face classifier')
    parser.add_argument('-c', '--camera_id', type=int, default=0, help='Camera device ID')
    parser.add_argument('-o', '--offset', type=float, default=0.1, help='Scaled offset to be added to bounding boxes')
    args = parser.parse_args()
    camera = Camera(args.camera_id)
    # emotion_queue = queue.Queue()  # real-time emotion logs updated continuously

    window_dims = [720, 720]  # width, height
    speed = 40  # tunnel speed in pixels per second
    pipeline = EmoTunnel(start_time,
                         window_dims,
                         [args.offset, args.offset],
                         # gui_app.signal.fresh_scores,  # signals GUI to update the visualizer tab
                         speed)  # video processing pipeline

    EMOTION_COLORS = [[255, 0, 0], [45, 90, 45], [255, 0, 255], [255, 255, 0],
                      [0, 0, 255], [0, 255, 255], [0, 255, 0]]
    tonic = 110  # Hz

    app = QApplication(sys.argv)
    gui_app = Visualizer(start_time, window_dims, np.array(EMOTION_COLORS), speed, pipeline, end_session_event)
    print("Real-time emotion visualizer using FER labels sourced from on-device camera.")
    gui_app.show()  # start the GUI
    print("Started GUI app.")
    print("gui_app.thread()", gui_app.thread())
    print("QThread.currentThread()", QThread.currentThread())

    video_dims = [800, 450]  # width, height (16:9 aspect ratio)
    video_thread = QThread()  # video thread: OpenCV is safe in a QThread but not in a regular Python thread
    video_worker = VideoPlayerWorker(
        start_time,
        video_dims,
        pipeline,  # applied to each frame of video
        camera)
    video_worker.moveToThread(video_thread)
    video_thread.started.connect(video_worker.run)  # connect signals and slots
    video_worker.finished.connect(video_thread.quit)
    video_worker.finished.connect(video_worker.deleteLater)
    video_thread.finished.connect(video_thread.deleteLater)
    video_worker.frameReady.connect(gui_app.display_frame)  # update the FER tab with each new video frame
    video_thread.start()
    print("Started video thread.")

    # audio_thread = QThread()  # audio thread
    # audio_worker = Sonifier(start_time, speed, tonic, pipeline, end_session_event)
    # audio_worker.moveToThread(audio_thread)
    # audio_thread.start()
    # print("Started audio thread.")

    app.exec_()  # run the GUI event loop in the main thread; lines below only execute once the user closes the GUI
    print("GUI app closed by user.")
    video_worker.stop()  # signal the worker to stop
    # video_thread.quit()  # redundant: the worker's finished signal already quits the thread
    print("Quitting video thread...")
    video_thread.wait()  # wait for the thread to finish
    print("Session ended.")

    profiler.disable()
    # Print profiling stats
    stats = pstats.Stats(profiler)
    stats.strip_dirs().sort_stats('cumulative').print_stats(20)  # stats from the most expensive calls