#!/usr/bin/env python
import torch
import torch.nn as nn
from torchvision import transforms
import numpy as np
import os
import sys
import rospy
from sensor_msgs.msg import Image as Image_msg
# ROS Kinetic puts its own Python 2.7 cv2 on sys.path; drop that entry so the
# system OpenCV is imported, then restore it so cv_bridge can be imported.
sys.path.remove('/opt/ros/kinetic/lib/python2.7/dist-packages')
import cv2
sys.path.append('/opt/ros/kinetic/lib/python2.7/dist-packages')
from cv_bridge import CvBridge
from utils.model import BtsModel
import matplotlib.pyplot as plt
from PIL import Image


class DepthPipeline:
    """A single depth estimation pipeline: subscribes to an image topic, runs
    the depth model on each frame and publishes the predicted depth map.
    """

    def __init__(self, model, input_topic, output_topic):
        """Initialize a pipeline object.

        Args:
            model (nn.Module): depth estimation network
            input_topic (str): image topic to subscribe to
            output_topic (str): topic the depth maps are published on

        Attributes:
            model: depth estimating DCNN model
            transform_pipeline: preprocessing transformations for the input image
            br: OpenCV bridge
            sub: name of the subscribed input topic
            pub: output publisher
        """
        self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
        self.model = self._load_state(model)
        # resize to the 352 x 1216 KITTI input size and normalize with ImageNet statistics
        self.transform_pipeline = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((352, 1216)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225]),
        ])
        self.br = CvBridge()
        self.pub = rospy.Publisher(output_topic, Image_msg, queue_size=1)
        self.sub = input_topic
        self.count = 0
        self.model.to(self.device)
        self.model.eval()

    def _callback(self, data):
        """ROS callback that forms the backbone of the pipeline: it converts
        the incoming image message, feeds it to the model and publishes the
        model's depth prediction.

        Args:
            data (sensor_msgs/Image): input image message
        Returns: None
        """
        image = self.br.imgmsg_to_cv2(data)
        img_data = self.transform_pipeline(image)           # (3, 352, 1216)
        img_data = img_data.unsqueeze(0).to(self.device)    # add batch dimension
        with torch.no_grad():
            # BtsModel takes (image, focal); 0 is passed as the focal length here
            out = self.model(img_data, 0)
        # out[4] holds the final depth prediction of shape (batch, 1, height, width)
        output = out[4][0].cpu().numpy()
        output = np.transpose(output, (1, 2, 0))             # -> (height, width, 1)
        output = np.squeeze(output).astype(np.float32)       # -> (height, width)
        self._publish(output)
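
    # Offline sketch of the same preprocessing + forward pass, handy for checking
    # the model without a running ROS master (assumes the checkpoint used by
    # _load_state and a local image "sample.png"; both names are illustrative):
    #
    #   model = nn.DataParallel(BtsModel())
    #   model.load_state_dict(torch.load("./utils/model_weights",
    #                                    map_location='cpu')['model'])
    #   model.eval()
    #   tfm = transforms.Compose([transforms.ToPILImage(),
    #                             transforms.Resize((352, 1216)),
    #                             transforms.ToTensor(),
    #                             transforms.Normalize(mean=[0.485, 0.456, 0.406],
    #                                                  std=[0.229, 0.224, 0.225])])
    #   x = tfm(cv2.imread("sample.png")).unsqueeze(0)
    #   with torch.no_grad():
    #       depth = model(x, 0)[4][0].squeeze().numpy()      # (352, 1216) depth map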

    def start(self):
        """Start the pipeline: subscribe to the input topic and spin.

        Args: None
        Returns: None
        """
        print("---===Depth Pipeline Started===---")
        self.rate = rospy.Rate(10)  # 10 Hz
        # buff_size is made large enough to hold a full image message
        rospy.Subscriber(self.sub, Image_msg, self._callback,
                         queue_size=1, buff_size=2**24)
        rospy.spin()

    def _publish(self, output):
        """Publish the depth map on the output ROS topic.

        Args:
            output (np.ndarray): single-channel float32 depth map
        Returns: None
        """
        msg = self.br.cv2_to_imgmsg(output)
        self.pub.publish(msg)
        self.rate.sleep()  # throttle to the rate set in start()
        # to save the depth maps to disk, uncomment:
        # self._save_output(output)
        print("published image..\n")

    def _load_state(self, model):
        """Wrap the model in DataParallel, load its weights from a checkpoint
        state_dict and return it.

        Args:
            model (nn.Module): freshly constructed model
        Returns: the model with weights loaded
        """
        model = nn.DataParallel(model)
        checkpoint_path = "./utils/model_weights"
        # map_location lets the checkpoint load on CPU-only machines as well
        checkpoint = torch.load(checkpoint_path, map_location=self.device)
        model.load_state_dict(checkpoint['model'])
        return model

    def _save_output(self, img):
        """Save the depth map as an 8-bit grayscale PNG under ./images."""
        if not os.path.exists("./images"):
            os.mkdir("./images")
        # alternative: cv2.imwrite("images/output_{:05d}.png".format(self.count),
        #                          img, [cv2.IMWRITE_PNG_COMPRESSION, 0])
        depth_map = Image.fromarray(img)
        depth_map = depth_map.convert("L")  # quantize the float32 depth to 8-bit grayscale
        depth_map.save("images/output_{:05d}.png".format(self.count))
        self.count += 1


if __name__ == "__main__":
    rospy.init_node("depth_pipeline")
    # depth pipeline
    model = BtsModel()
    DP = DepthPipeline(model, input_topic="camera/kitti", output_topic="depth/output")
    DP.start()
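
# Usage sketch (a minimal test publisher, not part of this node): feeds one
# frame into camera/kitti so the pipeline above can be checked end to end.
# The image path "test_frame.png" is illustrative only.
#
#   import rospy, cv2
#   from cv_bridge import CvBridge
#   from sensor_msgs.msg import Image
#
#   rospy.init_node("kitti_test_publisher")
#   pub = rospy.Publisher("camera/kitti", Image, queue_size=1)
#   rospy.sleep(1.0)  # give the subscriber time to connect
#   pub.publish(CvBridge().cv2_to_imgmsg(cv2.imread("test_frame.png")))
#   rospy.spin()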