-
Notifications
You must be signed in to change notification settings - Fork 4
/
gabriel_example.py
executable file
·185 lines (155 loc) · 6.36 KB
/
gabriel_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
#!/usr/bin/env python
"""Example of using gabrieltool.statemachine with tensorflow/OpenTPOD models for analysis.
The TF models can be exported from OpenTPOD or any TF SavedModel that can be
served by TF-Serving.
An example model (SSD-MobilenetV2) can be downloaded from
https://storage.cmusatyalab.org/openworkflow/ssd_mobilenet_v2_saved_model.zip
After downloading and unzipping the file, arrange the saved_model
directory into the following structure (in the same directory as this script).
├── ssd_mobilenet_v2_saved_model
│   └── 00001
│       ├── saved_model.pb
│       └── variables
└── gabriel_example.py
In this example, we create a naive 2-state FSM that detects the presence of a
person or a chair. The labels for COCO dataset can be found at
https://github.com/tensorflow/models/blob/master/research/object_detection/data/mscoco_label_map.pbtxt
Usage: ./gabriel_example.py -h
"""
import cv2
import fire
from logzero import logger
from gabriel_server.local_engine import runner as gabriel_runner
from gabrieltool.statemachine import fsm, predicate_zoo, processor_zoo, runner
def _add_custom_transition_predicates():
    """Register a custom transition predicate in the predicate zoo.

    A ``HasChairClass`` callable is defined here and attached to
    ``predicate_zoo`` so it can be referenced as
    ``predicate_zoo.HasChairClass``. See ``_build_fsm`` for the place where
    such a custom predicate could be plugged in.
    """
    from gabrieltool.statemachine import callable_zoo

    class HasChairClass(callable_zoo.CallableBase):
        def __call__(self, app_state):
            # id 62 is chair
            chair_class_id = '62'
            return chair_class_id in app_state

    # Expose the new predicate next to the built-in ones.
    setattr(predicate_zoo, 'HasChairClass', HasChairClass)
def _build_fsm():
    """Construct the example two-state FSM that announces a person or a chair.

    The 'start' state has no processors and a single always-true transition
    that plays an introduction. The 'tf_serving' state runs the TF-Serving
    processor and has two transitions, one for the person class and one for
    the chair class, both of which lead back to 'tf_serving'.

    Returns:
        gabrieltool.statemachine.fsm.State -- The start state of the generated FSM.
    """
    # ---- 'start' state: unconditionally hand over to the detection state ----
    always_predicate = fsm.TransitionPredicate(
        callable_obj=predicate_zoo.Always()
    )
    intro_instruction = fsm.Instruction(
        audio='This app will tell you if a person or a chair is present.'
    )
    intro_transition = fsm.Transition(
        name='tran_start_to_proc',
        predicates=[always_predicate],
        instruction=intro_instruction,
    )
    st_start = fsm.State(
        name='start',
        processors=[],
        transitions=[intro_transition],
    )

    # ---- 'tf_serving' state: run the model and react to detections ----
    # NOTE(review): the second positional argument presumably names the
    # saved_model directory described in the module docstring -- confirm
    # against processor_zoo.TFServingContainerCallable.
    detector = processor_zoo.TFServingContainerCallable(
        'ssd_mobilenet_v2',
        'ssd_mobilenet_v2_saved_model',
        conf_threshold=0.8,
    )
    detection_processor = fsm.Processor(name='proc_start', callable_obj=detector)

    # person id is 1 in coco labelmap
    person_predicate = fsm.TransitionPredicate(
        callable_obj=predicate_zoo.HasObjectClass(class_name='1')
    )
    person_transition = fsm.Transition(
        name='tf_serving_to_tf_serving_person',
        predicates=[person_predicate],
        instruction=fsm.Instruction(audio='Found Person!'),
    )

    # You can also use the custom transition predicate we created in
    # _add_custom_transition_predicates here, e.g.
    # callable_obj=predicate_zoo.HasChairClass()
    chair_predicate = fsm.TransitionPredicate(
        callable_obj=predicate_zoo.HasObjectClass(class_name='62')
    )
    chair_transition = fsm.Transition(
        name='tf_serving_to_tf_serving_chair',
        predicates=[chair_predicate],
        instruction=fsm.Instruction(audio='Found Chair!'),
    )

    st_tf = fsm.State(
        name='tf_serving',
        processors=[detection_processor],
        transitions=[person_transition, chair_transition],
    )

    # Destinations can only be wired up once both state objects exist.
    st_start.transitions[0].next_state = st_tf
    for loop_index in (0, 1):
        st_tf.transitions[loop_index].next_state = st_tf

    return st_start
def generate_fsm(output_path):
    """Build the example FSM and write its serialized form to disk.

    Arguments:
        output_path {string} -- Path to save the FSM to.
    """
    fsm_start = _build_fsm()

    # The file is opened in binary mode because to_bytes() output is written
    # as-is.
    with open(output_path, 'wb') as out_file:
        serialized = fsm.StateMachine.to_bytes(
            name='tf_serving_example',
            start_state=fsm_start,
        )
        out_file.write(serialized)
def run_fsm(video_uri=0):
    """Create and execute a state machine locally.

    Create a FSM and feed it images from video_uri. The output instruction is
    displayed on both command line and the OpenCV window. The loop ends when
    'q' is pressed in the window or when no further frame can be read from
    the source (end of a video file, unplugged or unavailable camera).

    Keyword Arguments:
        video_uri {OpenCV VideoCapture Input String} -- id of the video
            capturing device to open. (default: {0}). For all supported formats,
            see https://docs.opencv.org/master/d8/dfe/classcv_1_1VideoCapture.html.
    """
    start_state = _build_fsm()
    fsm_runner = runner.Runner(start_state)
    cam = cv2.VideoCapture(video_uri)
    try:
        while True:
            grabbed, image = cam.read()
            if not grabbed:
                # read() signals failure through its first return value and
                # yields no usable frame; feeding that to the FSM or to
                # imshow would crash, so stop cleanly instead.
                logger.warning(
                    'Could not read a frame from {}; stopping.'.format(video_uri))
                break

            inst = fsm_runner.feed(image)
            if inst is not None:
                logger.info('instruction text: {}'.format(inst.audio))
                # Overlay the instruction on the frame before it is shown.
                cv2.putText(image, inst.audio, (100, 100),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)
            cv2.imshow('input', image)

            # Mask to the low byte so the key comparison is platform-agnostic.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always free the capture device and close the window, even if the
        # FSM or OpenCV raised inside the loop.
        cam.release()
        cv2.destroyAllWindows()
def run_gabriel_server():
    """Create and execute a gabriel server for detecting people.

    This gabriel server uses a gabrieltool.statemachine.fsm to represent the
    application logic. Use Gabriel Client to stream images and receive feedback.
    """
    logger.info('Building Person Detection FSM...')
    fsm_start = _build_fsm()

    logger.info('Initializing Cognitive Engine...')
    # engine_name has to be 'instruction' to work with the gabriel client
    # from the App Store. Someone working on Gabriel needs to fix this.
    engine_name = 'instruction'

    def _make_engine():
        # Factory handed to the gabriel runner as its engine_setup callback.
        return runner.BasicCognitiveEngineRunner(
            engine_name=engine_name,
            fsm=fsm_start,
        )

    logger.info('Launching Gabriel server...')
    gabriel_runner.run(
        engine_setup=_make_engine,
        engine_name=engine_name,
        input_queue_maxsize=60,
        port=9099,
        num_tokens=1,
    )
if __name__ == '__main__':
    # Register the custom predicate before any command builds an FSM.
    _add_custom_transition_predicates()

    # Each sub-command is exposed on the CLI under its function name.
    _commands = {
        command.__name__: command
        for command in (generate_fsm, run_fsm, run_gabriel_server)
    }
    fire.Fire(_commands)