forked from thomasbkahn/step-detect
-
Notifications
You must be signed in to change notification settings - Fork 0
/
step_detect.py
285 lines (240 loc) · 9.85 KB
/
step_detect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
"""
Thomas Kahn
"""
from __future__ import absolute_import
from math import sqrt
import multiprocessing as mp
import numpy as np
from six.moves import range
from six.moves import zip
def t_scan(L, window = 1e3, num_workers = -1):
"""
Computes t statistic for i to i+window points versus i-window to i
points for each point i in input array. Uses multiple processes to
do this calculation asynchronously. Array is decomposed into window
number of frames, each consisting of points spaced at window
intervals. This optimizes the calculation, as the drone function
need only compute the mean and variance for each set once.
Parameters
----------
L : numpy array
1 dimensional array that represents time series of datapoints
window : int / float
Number of points that comprise the windows of data that are
compared
num_workers : int
Number of worker processes for multithreaded t_stat computation
Defult value uses num_cpu - 1 workers
Returns
-------
t_stat : numpy array
Array which holds t statistic values for each point. The first
and last (window) points are replaced with zero, since the t
statistic calculation cannot be performed in that case.
"""
size = L.size
window = int(window)
frames = list(range(window))
n_cols = (size // window) - 1
t_stat = np.zeros((window, n_cols))
if num_workers == 1:
results = [_t_scan_drone(L, n_cols, frame, window) for frame in frames]
else:
if num_workers == -1:
num_workers = mp.cpu_count() - 1
pool = mp.Pool(processes = num_workers)
results = [pool.apply_async(_t_scan_drone, args=(L, n_cols, frame, window)) for frame in frames]
results = [r.get() for r in results]
pool.close()
for index, row in results:
t_stat[index] = row
t_stat = np.concatenate((
np.zeros(window),
t_stat.transpose().ravel(order='C'),
np.zeros(size % window)
))
return t_stat
def _t_scan_drone(L, n_cols, frame, window=1e3):
"""
Drone function for t_scan. Not Intended to be called manually.
Computes t_scan for the designated frame, and returns result as
array along with an integer tag for proper placement in the
aggregate array
"""
size = L.size
window = int(window)
root_n = sqrt(window)
output = np.zeros(n_cols)
b = L[frame:window+frame]
b_mean = b.mean()
b_var = b.var()
for i in range(window+frame, size-window, window):
a = L[i:i+window]
a_mean = a.mean()
a_var = a.var()
output[i // window - 1] = root_n * (a_mean - b_mean) / sqrt(a_var + b_var)
b_mean, b_var = a_mean, a_var
return frame, output
def mz_fwt(x, n=2):
"""
Computes the multiscale product of the Mallat-Zhong discrete forward
wavelet transform up to and including scale n for the input data x.
If n is even, the spikes in the signal will be positive. If n is odd
the spikes will match the polarity of the step (positive for steps
up, negative for steps down).
This function is essentially a direct translation of the MATLAB code
provided by Sadler and Swami in section A.4 of the following:
http://www.dtic.mil/dtic/tr/fulltext/u2/a351960.pdf
Parameters
----------
x : numpy array
1 dimensional array that represents time series of data points
n : int
Highest scale to multiply to
Returns
-------
prod : numpy array
The multiscale product for x
"""
N_pnts = x.size
lambda_j = [1.5, 1.12, 1.03, 1.01][0:n]
if n > 4:
lambda_j += [1.0]*(n-4)
H = np.array([0.125, 0.375, 0.375, 0.125])
G = np.array([2.0, -2.0])
Gn = [2]
Hn = [3]
for j in range(1,n):
q = 2**(j-1)
Gn.append(q+1)
Hn.append(3*q+1)
S = np.concatenate((x[::-1], x))
S = np.concatenate((S, x[::-1]))
prod = np.ones(N_pnts)
for j in range(n):
n_zeros = 2**j - 1
Gz = _insert_zeros(G, n_zeros)
Hz = _insert_zeros(H, n_zeros)
current = (1.0/lambda_j[j])*np.convolve(S,Gz)
current = current[N_pnts+Gn[j]:2*N_pnts+Gn[j]]
prod *= current
if j == n-1:
break
S_new = np.convolve(S, Hz)
S_new = S_new[N_pnts+Hn[j]:2*N_pnts+Hn[j]]
S = np.concatenate((S_new[::-1], S_new))
S = np.concatenate((S, S_new[::-1]))
return prod
def _insert_zeros(x, n):
"""
Helper function for mz_fwt. Splits input array and adds n zeros
between values.
"""
newlen = (n+1)*x.size
out = np.zeros(newlen)
indices = list(range(0, newlen-n, n+1))
out[indices] = x
return out
def find_steps(array, threshold):
"""
Finds local maxima by segmenting array based on positions at which
the threshold value is crossed. Note that this thresholding is
applied after the absolute value of the array is taken. Thus,
the distinction between upward and downward steps is lost. However,
get_step_sizes can be used to determine directionality after the
fact.
Parameters
----------
array : numpy array
1 dimensional array that represents time series of data points
threshold : int / float
Threshold value that defines a step
Returns
-------
steps : list
List of indices of the detected steps
"""
steps = list()
# First create array ap_dif which is 1 where data in series crossed from below threshold to above threshold
# (upward), -1 where it crosses downward, and 0 elsewhere.
array_abs = np.abs(array)
above_points = np.where(array_abs > threshold, 1, 0)
ap_dif = np.diff(above_points)
# To handle the edge case where data skips from being greater than the threshold on one side (positive or negative)
# to the opposite side without any transition points, manually insert -1 and 1 values into ap_diff so that these
# transitions will be accounted for.
pos_to_neg_diff = np.diff(np.where(array > threshold, 1, 0) + np.where(array < -threshold, -1, 0))
pos_to_neg_skips = np.where(np.abs(pos_to_neg_diff) == 2)[0]
for ix in pos_to_neg_skips:
if ap_dif[ix - 1] != 1: # only change if there's not an overlapping jump to avoid making ap_dif inconsistent
ap_dif[ix - 1] = -1
ap_dif[ix] = 1
# Compute list of indices in series where the differences / "first derivative" cross 0.
# This allows us to handle the edge case where multiple step changes occur in the source data while series remains
# above the threshold the entire time (otherwise only one step would be returned for this region). By checking
# the number of zero crossings that lie within the region bounded by the two threshold crossings
# we can iterate through the odd zero crossing indices and return the maximum value of series between each
# subset, each representing a different step event. deriv_zero_crossings_cnt should normally be 1, however if there
# are two step changes while above threshold it should be 3, three step changes -> 5, and so on.
series_first_deriv_zero_crossings = np.where(np.diff(np.sign(np.diff(array))))[0]
cross_ups = np.where(ap_dif == 1)[0]
cross_dns = np.where(ap_dif == -1)[0]
# In case the first identified threshold crossing is down instead of an up, add 0 to array of ups to pair it up
if len(cross_ups) > 0 and len(cross_dns) > 0 and cross_ups[0] > cross_dns[0]:
cross_ups = np.insert(cross_ups, 0, 0)
for upi, dni in zip(cross_ups, cross_dns):
deriv_zero_crossings = series_first_deriv_zero_crossings[
(series_first_deriv_zero_crossings > upi) & (series_first_deriv_zero_crossings < dni)]
deriv_zero_crossings_cnt = len(deriv_zero_crossings)
if deriv_zero_crossings_cnt % 2 == 1:
for i, cross in enumerate(deriv_zero_crossings):
if i % 2 == 1:
steps.append(np.argmax(array_abs[upi:cross + 1]) + upi)
upi = cross
steps.append(np.argmax(array_abs[upi:cross + 1]) + upi)
else:
steps.append(np.argmax(array_abs[upi:dni+1]) + upi)
return steps
def get_step_sizes(array, indices, window=1000):
"""
Calculates step size for each index within the supplied list. Step
size is determined by averaging over a range of points (specified
by the window parameter) before and after the index of step
occurrence. The directionality of the step is reflected by the sign
of the step size (i.e. a positive value indicates an upward step,
and a negative value indicates a downward step). The combined
standard deviation of both measurements (as a measure of uncertainty
in step calculation) is also provided.
Parameters
----------
array : numpy array
1 dimensional array that represents time series of data points
indices : list
List of indices of the detected steps (as provided by
find_steps, for example)
window : int, optional
Number of points to average over to determine baseline levels
before and after step.
Returns
-------
step_sizes : list
List of the calculated sizes of each step
step_error : list
"""
step_sizes = []
step_error = []
indices = sorted(indices)
last = len(indices) - 1
for i, index in enumerate(indices):
if i == 0:
q = min(window, indices[i+1]-index)
elif i == last:
q = min(window, index - indices[i-1])
else:
q = min(window, index-indices[i-1], indices[i+1]-index)
a = array[index:index+q]
b = array[index-q:index]
step_sizes.append(a.mean() - b.mean())
step_error.append(sqrt(a.var()+b.var()))
return step_sizes, step_error