-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathcamera.py
128 lines (112 loc) · 5.88 KB
/
camera.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import os
import cv2
from base_camera import BaseCamera
import torch
from pathlib import Path
import sys
from models.common import DetectMultiBackend
from utils.datasets import IMG_FORMATS, VID_FORMATS, LoadImages, LoadStreams
from utils.general import (check_img_size,increment_path, non_max_suppression, scale_coords)
from utils.plots import Annotator, colors
from utils.torch_utils import select_device, time_sync
# Resolve this file's location and make the YOLOv5 root importable.
FILE = Path(__file__).resolve()
ROOT = FILE.parents[0]  # directory containing this file (YOLOv5 root)
_root_str = str(ROOT)
if _root_str not in sys.path:
    # Ensure the YOLOv5 modules (models/, utils/) can be imported.
    sys.path.append(_root_str)
# Re-express ROOT relative to the current working directory.
ROOT = Path(os.path.relpath(ROOT, Path.cwd()))
class Camera(BaseCamera):
    """Streaming camera that runs YOLOv5 detection on every frame.

    Frames are read from ``Camera.video_source`` (an image/video file path, a
    numeric webcam-index string, a ``.txt`` list of sources, or an
    rtsp/rtmp/http(s) URL), annotated with detections, optionally saved under
    ``inference/output/exp*``, and yielded as JPEG-encoded bytes for the
    BaseCamera streaming machinery.
    """

    # Class-level source shared by all instances; set in __init__/set_video_source
    # and read by the static frames() generator.
    video_source = None

    def __init__(self, source_name="", img_flag=True):
        """Configure the stream source and start the base camera.

        source_name: path / URL / webcam-index string to read frames from.
        img_flag: stored but not used in this file — presumably consumed by
            BaseCamera or callers; TODO confirm.
        """
        if os.environ.get('OPENCV_CAMERA_SOURCE'):
            Camera.set_video_source(int(os.environ['OPENCV_CAMERA_SOURCE']))
        # NOTE(review): this unconditionally overwrites whatever the
        # OPENCV_CAMERA_SOURCE branch above just set, making that branch dead
        # code — confirm whether the environment override is still intended.
        Camera.video_source = source_name
        self.img_flag = img_flag
        super(Camera, self).__init__()

    @staticmethod
    def set_video_source(source):
        # Store the source on the class so the static frames() generator sees it.
        Camera.video_source = source

    @staticmethod
    def frames():
        """Generator yielding JPEG-encoded, detection-annotated frames.

        Loads the YOLOv5 model once, builds a dataloader from
        ``Camera.video_source``, then for every frame: runs inference + NMS,
        draws labeled boxes, saves the result (image or video), and yields the
        annotated frame as JPEG bytes.
        """
        # Output directory, trained-weights path, and inference image size.
        out, weights, imgsz = \
            ROOT / 'inference/output', ROOT / 'runs/train/exp/weights/best.pt', 640
        device = select_device(0) # 0 or 0,1,2,3 or cpu (switch to 'cpu' if no GPU is available)
        #device = select_device("cpu")
        # Directories: create a fresh numbered run dir (exp, exp2, ...).
        save_dir = increment_path(Path(out) / 'exp', exist_ok=False)  # increment run
        (save_dir).mkdir(parents=True, exist_ok=True)  # make dir
        model = DetectMultiBackend(weights, device=device, dnn=False, data=ROOT / 'data/my_person.yaml')
        stride, names, pt, jit, onnx, engine = model.stride, model.names, model.pt, model.jit, model.onnx, model.engine
        imgsz = check_img_size(imgsz, s=stride)  # check image size is a multiple of stride
        # Half precision: starts False, and the &= below can only keep it False,
        # so FP16 is effectively disabled here.
        half = False
        half &= (pt or jit or onnx or engine) and device.type != 'cpu'  # FP16 supported on limited backends with CUDA
        if pt or jit:
            model.model.half() if half else model.model.float()
        # Dataloader: choose stream input (webcam index / .txt list / URL) vs. file input.
        print(Camera.video_source)
        is_file = Path(Camera.video_source).suffix[1:] in (IMG_FORMATS + VID_FORMATS)
        is_url = Camera.video_source.lower().startswith(('rtsp://', 'rtmp://', 'http://', 'https://'))
        webcam = Camera.video_source.isnumeric() or Camera.video_source.endswith('.txt') or (is_url and not is_file)
        if webcam:dataset = LoadStreams(Camera.video_source, img_size=imgsz)
        else:dataset = LoadImages(Camera.video_source, img_size=imgsz)
        vid_path, vid_writer = [None], [None]
        # Always save annotated results to disk.
        save_img = True
        # Run inference
        t0 = time_sync()
        img = torch.zeros((1, 3, imgsz, imgsz), device=device)  # init img
        _ = model(img.half() if half else img) if device.type != 'cpu' else None  # warm-up pass on GPU
        for path, img, im0s, vid_cap, s in dataset:
            img = torch.from_numpy(img).to(device)
            img = img.half() if half else img.float()  # uint8 to fp16/32
            img /= 255.0  # 0 - 255 to 0.0 - 1.0
            if img.ndimension() == 3:
                img = img.unsqueeze(0)  # add batch dimension
            # Inference
            t1 = time_sync()
            pred = model(img, augment=False, visualize=False)
            # NMS: filter overlapping boxes at fixed confidence/IoU thresholds.
            pred = non_max_suppression(pred, conf_thres=0.25, iou_thres=0.45, classes=None, agnostic=False,
                                       max_det=1000)
            t2 = time_sync()
            for i, det in enumerate(pred):  # detections per image
                if webcam:  # batch_size >= 1: one entry per stream
                    p, im0, frame = path[i], im0s[i].copy(), dataset.count
                    s += f'{i}: '
                else:
                    # p, s, im0 = path, '', im0s
                    p, im0, frame = path, im0s.copy(), getattr(dataset, 'frame', 0)
                p = Path(p)  # to Path
                save_path = str(save_dir / p.name)
                s += '%gx%g ' % img.shape[2:]  # append model input HxW to the log string
                # Normalization gain [w, h, w, h]; computed but unused below.
                gn = torch.tensor(im0.shape)[[1, 0, 1, 0]]
                annotator = Annotator(im0, line_width=3, example=str(names))
                if det is not None and len(det):
                    # Rescale boxes from model-input size back to original image size.
                    det[:, :4] = scale_coords(img.shape[2:], det[:, :4], im0.shape).round()
                    # Per-class detection counts for the log string.
                    for c in det[:, -1].detach().unique():
                        n = (det[:, -1] == c).sum()
                        s += '%g %s, ' % (n, names[int(c)])
                    # Draw one labeled box per detection.
                    for *xyxy, conf, cls in det:
                        label = '%s %.2f' % (names[int(cls)], conf)
                        c = int(cls)
                        annotator.box_label(xyxy, label, color=colors(c, True))
                print('%sDone. (%.3fs)' % (s, t2 - t1))
                # Save results (image with detections)
                if save_img:
                    if dataset.mode == 'image':
                        cv2.imwrite(save_path, im0)
                    else:  # 'video' or 'stream'
                        if vid_path[i] != save_path:  # new video
                            vid_path[i] = save_path
                            if isinstance(vid_writer[i], cv2.VideoWriter):
                                vid_writer[i].release()  # release previous video writer
                            if vid_cap:  # video: take fps/size from the capture
                                fps = vid_cap.get(cv2.CAP_PROP_FPS)
                                w = int(vid_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                                h = int(vid_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                            else:  # stream: assume 30 fps, size from the frame
                                fps, w, h = 30, im0.shape[1], im0.shape[0]
                            save_path = str(Path(save_path).with_suffix('.mp4'))  # force *.mp4 suffix on results videos
                            vid_writer[i] = cv2.VideoWriter(save_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
                        vid_writer[i].write(im0)
                # Stream the annotated frame to the client as JPEG bytes.
                yield cv2.imencode('.jpg', im0)[1].tobytes()