visual_odometry.py

import time

import cv2
import numpy as np

import utils
from config import Config
from pinhole_camera import PinholeCamera
from point_correspondence import OpticalFlowTracker, FLANN_Matcher
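
# Processing stages: the first frame only seeds keypoints; every subsequent
# frame runs the full motion-estimation step. STAGE_SECOND_FRAME is declared
# for a classic two-frame initialisation but is not used by this class.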
STAGE_FIRST_FRAME = 0
STAGE_SECOND_FRAME = 1
STAGE_DEFAULT_FRAME = 2


class VisualOdometry:
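    """Monocular visual odometry over a KITTI-style image sequence.

    Correspondences between consecutive frames, found either by optical-flow
    tracking or by descriptor matching, feed an essential-matrix estimate
    from which the relative camera pose is recovered and accumulated.
    Ground-truth poses are read from the annotation file to resolve the
    absolute scale of the translation.
    """
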
    def __init__(self,
                 cam: PinholeCamera,
                 annotations: str,
                 config: Config):
        self.frame_stage = STAGE_FIRST_FRAME
        self.frame_id = 0
        self.cam = cam
        self.cur_frame = None
        self.prev_frame = None
        self.prev_t = None
        self.cur_t = np.zeros((3, 1))   # estimated camera translation
        self.cur_R = np.eye(3)          # estimated camera rotation
        self.true_t = np.zeros((3, 1))  # ground-truth translation
        self.true_R = np.eye(3)         # ground-truth rotation
        self.prev_points = None
        self.cur_points = None
        self.prev_desc = None
        self.cur_desc = None
        self.all_cur_desc = None
        self.all_prev_desc = None
        self.cur_lines = None
        self.inlier_ratio = 0
        self.focal = cam.fx
        self.pp = (cam.cx, cam.cy)      # principal point
        self.cur_runtime = 0
        self.detector = config.detector
        self.detector.set_extractor(config.extractor)
        self.correspondence_method = config.correspondence_method
        self.k_min_features = config.k_min_features
        # Correspondences come either from sparse optical-flow tracking or
        # from FLANN-based descriptor matching.
        if self.correspondence_method == 'tracking':
            self.point_corr_computer = OpticalFlowTracker(config.lk_params)
        else:
            self.point_corr_computer = FLANN_Matcher(config.flann_params)
        with open(annotations) as f:
            self.annotations = f.readlines()

    def get_absolute_scale(self, frame_id):
        """Get the absolute scale from the ground-truth data so that the
        estimated trajectory can be compared against the ground truth.
        The ground-truth position and rotation are updated as a side effect.
        """
        xi, yi, zi = 3, 7, 11
        ss = self.annotations[frame_id - 1].strip().split()
        x_prev = float(ss[xi])
        y_prev = float(ss[yi])
        z_prev = float(ss[zi])
        ss = self.annotations[frame_id].strip().split()
        x = float(ss[xi])
        y = float(ss[yi])
        z = float(ss[zi])
        self.true_t = np.array([[x], [y], [z]])
        r11, r12, r13 = float(ss[0]), float(ss[1]), float(ss[2])
        r21, r22, r23 = float(ss[4]), float(ss[5]), float(ss[6])
        r31, r32, r33 = float(ss[8]), float(ss[9]), float(ss[10])
        self.true_R = np.array([r11, r12, r13, r21, r22, r23, r31, r32, r33]).reshape((3, 3))
        # The scale is the straight-line distance travelled between frames.
        return np.sqrt((x - x_prev) ** 2 + (y - y_prev) ** 2 + (z - z_prev) ** 2)

    def get_relative_scale(self):
        """Triangulate 3-D points X_(k-1) and X_k from the current and
        previous frames to obtain the relative scale of the translation.

        :return: relative scale of the translation between the previous and
            current frame
        """
        raise NotImplementedError("Relative scale method not implemented yet.")

    def process_initial_frame(self):
        # Seed the pipeline: detect keypoints in the first frame and, when
        # matching, compute their descriptors as well.
        self.prev_points = self.detector.get_keypoints(self.cur_frame)
        if self.correspondence_method == 'matching':
            self.prev_points, self.prev_desc = self.detector.get_descriptors(self.cur_frame, self.prev_points)
            self.all_prev_desc = self.prev_desc
        self.prev_points = np.array([x.pt for x in self.prev_points], dtype=np.float32)
        self.all_px_ref = self.prev_points
        self.frame_stage = STAGE_DEFAULT_FRAME

    def process_frame(self, frame_id):
        if self.correspondence_method == 'tracking':
            # Track the previous keypoints into the current frame with
            # sparse optical flow.
            self.prev_points, self.cur_points = self.point_corr_computer.get_corresponding_points(
                img_ref=self.prev_frame,
                img_cur=self.cur_frame,
                px_ref=self.prev_points)
        else:
            # Detect and describe keypoints in the current frame, then match
            # them against the previous frame's descriptors. The unfiltered
            # detections are kept so the next frame can match against all of
            # this frame's descriptors.
            self.cur_points = self.detector.get_keypoints(self.cur_frame)
            self.cur_points, self.cur_desc = self.detector.get_descriptors(self.cur_frame, self.cur_points)
            self.cur_points = np.array([x.pt for x in self.cur_points], dtype=np.float32)
            temp_px = self.cur_points
            temp_des = self.cur_desc
            self.prev_points, self.cur_points = self.point_corr_computer.get_corresponding_points(
                px_ref=self.all_px_ref,
                px_cur=self.cur_points,
                des_ref=self.all_prev_desc,
                des_cur=self.cur_desc)
            self.all_px_ref = temp_px
            self.all_prev_desc = temp_des
        # If no point correspondences were found, skip to the next frame.
        if len(self.cur_points) == 0:
            return
        try:
            # Estimate the essential matrix with RANSAC, then recover the
            # relative rotation and translation from it.
            E, mask = cv2.findEssentialMat(self.cur_points, self.prev_points,
                                           focal=self.focal, pp=self.pp,
                                           method=cv2.RANSAC, prob=0.999, threshold=1.0)
            # Inlier ratio of the final essential-matrix estimate.
            if mask is not None:
                self.inlier_ratio = np.sum(mask) / len(mask)
            _, R, t, mask = cv2.recoverPose(E, self.cur_points, self.prev_points,
                                            focal=self.focal, pp=self.pp, mask=mask)
        except cv2.error:
            print(f"[ERROR] frame {frame_id}: could not compute the essential matrix or recover the pose")
            return
        # Only update the translation when the vehicle actually moved; the
        # scale from a near-stationary frame pair is unreliable.
        absolute_scale = self.get_absolute_scale(frame_id)
        if absolute_scale > 0.01:
            self.cur_t = self.cur_t + absolute_scale * self.cur_R.dot(t)
        # Sanity-check the estimated rotation: if any Euler angle exceeds the
        # threshold, the estimate is likely spurious, so ignore it.
        roll, pitch, yaw = utils.rotation_matrix_to_euler_angles(R)
        roll, pitch, yaw = np.degrees(roll), np.degrees(pitch), np.degrees(yaw)
        rot_lim = 10
        if abs(roll) < rot_lim and abs(pitch) < rot_lim and abs(yaw) < rot_lim:
            self.cur_R = R.dot(self.cur_R)
        # When tracking, re-detect keypoints once too many tracks are lost.
        if self.correspondence_method == 'tracking' and self.prev_points.shape[0] < self.k_min_features:
            self.cur_points = self.detector.get_keypoints(self.cur_frame)
            self.cur_points = np.array([x.pt for x in self.cur_points], dtype=np.float32)
        self.prev_points = self.cur_points
        self.prev_desc = self.cur_desc

    def update(self, img, frame_id):
        assert (img.ndim == 2 and img.shape[0] == self.cam.height and img.shape[1] == self.cam.width), \
            "Frame: provided image does not match the camera model size or is not grayscale"
        start_time = time.time()
        self.cur_frame = img
        if self.frame_stage == STAGE_DEFAULT_FRAME:
            self.process_frame(frame_id)
        elif self.frame_stage == STAGE_FIRST_FRAME:
            self.process_initial_frame()
        self.prev_frame = self.cur_frame
        self.frame_id = frame_id
        self.cur_runtime = time.time() - start_time
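

# Minimal usage sketch. The PinholeCamera and Config constructor arguments
# below are assumptions for illustration, not taken from this file:
#
#     import glob
#     cam = PinholeCamera(width=1241, height=376,
#                         fx=718.856, fy=718.856, cx=607.19, cy=185.22)
#     vo = VisualOdometry(cam, annotations="poses/00.txt", config=Config())
#     for frame_id, path in enumerate(sorted(glob.glob("sequence/*.png"))):
#         img = cv2.imread(path, cv2.IMREAD_GRAYSCALE)
#         vo.update(img, frame_id)
#         print(frame_id, vo.cur_t.ravel(), vo.inlier_ratio)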