forked from astra-robotics/IRC2020
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvideo_grabber2.py
110 lines (84 loc) · 2.6 KB
/
video_grabber2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import cv2
import numpy as np
from threading import Thread, Lock
import time
import sys
try:
from turbojpeg import TurboJPEG
except:
pass
import utils
class VideoGrabber(Thread):
"""A threaded video grabber.
Attributes:
encode_params ():
cap (str):
attr2 (:obj:`int`, optional): Description of `attr2`.
"""
def __init__(self, jpeg_quality, jpeg_lib):
"""Constructor.
Args:
jpeg_quality (:obj:`int`): Quality of JPEG encoding, in 0, 100.
"""
Thread.__init__(self)
self.cap = cv2.VideoCapture(2)
self.turbojpeg = TurboJPEG()
self.running = True
self.buffer = None
self.lock = Lock()
if jpeg_lib == 'turbo':
self.jpeg = TurboJPEG()
self.jpeg_encode_func = lambda img, jpeg_quality=jpeg_quality, jpeg=self.jpeg: utils.turbo_encode_image(img, jpeg, jpeg_quality)
else:
self.jpeg_encode_func = lambda img, jpeg_quality=jpeg_quality: utils.cv2_encode_image(img, jpeg_quality)
def stop(self):
self.running = False
def get_buffer(self):
"""Method to access the encoded buffer.
Returns:
np.ndarray: the compressed image if one has been acquired. None otherwise.
"""
if self.buffer is not None:
self.lock.acquire()
cpy = self.buffer
self.lock.release()
return cpy
def run(self):
while self.running:
success, img = self.cap.read()
if not success:
continue
# JPEG compression
# Protected by a lock
# As the main thread may asks to access the buffer
self.lock.acquire()
self.buffer = self.jpeg_encode_func(img)
self.lock.release()
if __name__ == '__main__':
jpeg_quality = 100
grabber = VideoGrabber(jpeg_quality, jpeg_lib='turbo')
grabber.start()
time.sleep(1)
turbo_jpeg = TurboJPEG()
cv2.namedWindow("Image")
keep_running = True
idx = 0
t0 = time.time()
while keep_running:
data = grabber.get_buffer()
if data is None:
time.sleep(1)
continue
img = turbo_jpeg.decode(data)
cv2.imshow("Image", img)
keep_running = not(cv2.waitKey(1) & 0xFF == ord('q'))
idx += 1
if idx == 100:
t1 = time.time()
sys.stdout.write("\r {:04} images/second ".format(100/(t1-t0)))
sys.stdout.flush()
t0 = t1
idx = 0
print()
print("Quitting")
grabber.stop()