-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsnapshot.py
143 lines (128 loc) · 5.38 KB
/
snapshot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import sys
import time
from itertools import count
from enum import Enum
from math import degrees, sin, cos, pi
from poses import Pose
import constants
class SnapshotGrowth(Enum):
EMPTY = 0
IMAGED = 1
POSED = 2
PLANNED = 3
class Snapshot():
'''
Represents the single frame from a raw capture
'''
# class variables
location_stats = {}
_ssid = count(0)
_consecutive_extrapolations = 0
def __init__(self, container=None, ssid=None, logger=None):
self._t_zero = time.time()
self._container = container
if ssid is None:
self.ssid = next(self._ssid) % constants.MAX_SNAPSHOT_ID
else:
self.ssid = ssid
self._logger = logger
self._growth = SnapshotGrowth.EMPTY
self._fence_masked_img_arr = None
self._pose = None
self.run_elapsed_secs = 0
self.loc_stat_count = 0
self.loc_quality = 100
self.best_proj_conf_pc = 0
self._contours = []
self.cont_count = 0
self.fltrd_count = 0
self._strategy_name = None
self._terms = None
self._rules = None
# we haven't been added yet, so latest is p2 and penultimate is p1
p1 = self._container.penultimate_pose()
p2 = self._container.latest_pose()
p1e = self._container.penultimate_extrap_pose()
p2e = self._container.latest_extrap_pose()
# first log all the ssids
if self._logger:
self._logger.debug(
'Snapshot Constructor - Locate Snapshots: {0}'.format(self._container.keys()))
self._logger.debug('Snapshot Constructor - Poses: {0}'.format(
[ss._pose.as_concise_str() if ss._pose is not None else
'|{0}|'.format(ss._extrapolated_pose.as_concise_str()) if ss._extrapolated_pose is not None else
'None' for ss in self._container.values()]))
# [p0, p1, p2] => p3
if p1 is not None and p2 is not None:
p_start = p1
p_finish = p2
else:
if p1 is None and p1e is not None:
p_start = p1e
else:
p_start = None
if p2 is None and p2e is not None:
p_finish = p2e
else:
p_finish = p2
self.extrapolate_pose(p_start, p_finish)
def set_pose(self, pose):
self._pose = pose
self._growth = SnapshotGrowth.POSED
def extrapolate_pose(self, p_start, p_finish):
# use previous snapshots to extrapolate pose
# assumes snapshots occur at regular intervals
try:
if p_start is not None and p_finish is not None and self._consecutive_extrapolations < constants.CONSECUTIVE_EXTRAPOLATION_LIMIT:
linear_displacement, angular_displacement = p_finish - p_start
t1 = p_start.t_zero
t2 = p_finish.t_zero
time_displacement = round(t2 - t1, 2)
if self._logger:
self._logger.debug(
'Snapshot Constructor - linear_displacement: {0:.2f}m angular_displacement: {1:.2f}deg time_displacement: {2:.2f}secs'.format(
linear_displacement,
degrees(angular_displacement),
time_displacement)
)
x_displacement = linear_displacement * \
sin((2 * pi) - p_finish.arena.t_rad) # ccw
y_displacement = linear_displacement * \
cos((2 * pi) - p_finish.arena.t_rad)
if self._logger:
self._logger.debug(
'Snapshot Constructor - calculating extrapolation displacement ({0:.2f}, {1:.2f})'.format(x_displacement, y_displacement))
p3_cxm = p_finish.arena.c_x_m + \
(x_displacement * time_displacement)
p3_cym = p_finish.arena.c_y_m + \
(y_displacement * time_displacement)
if self._logger:
self._logger.debug(
'Snapshot Constructor - calculating extrapolation landing ({0:.2f}, {1:.2f})'.format(p3_cxm, p3_cym))
self._extrapolated_pose = Pose(
p3_cxm, p3_cym, p_finish.arena.t_rad)
self._consecutive_extrapolations += 1
if self._logger:
self._logger.debug('Snapshot Constructor - Posting Extrapolation {0}'.format(
self._extrapolated_pose.as_concise_str()))
else:
if self._logger:
self._logger.debug(
'Snapshot Constructor - Unable to compute extrapolated pose')
self._extrapolated_pose = None
self._consecutive_extrapolations = 0
except Exception as e:
err_line = sys.exc_info()[-1].tb_lineno
msg = 'Snapshot Extrapolated Pose Error: ' + \
str(e) + ' on line ' + str(err_line)
if self._logger:
self._logger.error(msg)
else:
print(msg)
def as_public_dict(self):
pdict = {k: round(v, 2) for k, v in vars(
self).items() if not k.startswith('_')}
pdict['growth'] = self._growth.value
if self._pose is not None:
pdict['current'] = self._pose.as_concise_str()
return pdict