-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRegistration.py
281 lines (236 loc) · 13.4 KB
/
Registration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
# -*- coding: utf-8 -*-
"""
The align class
"""
import numpy as np
import tensorflow as tf
tf.get_logger().setLevel('ERROR')
import copy
from photonpy import PostProcessMethods, Context, Dataset
from CatmullRomSpline2D import CatmullRomSpline2D
from Plot import Plot
from Channel import Channel
#%% Align class
class Registration(Plot):
'''
The AlignModel Class is a class used for the optimization of a certain Dataset class. Important is
that the loaded class contains the next variables:
ch1, ch2, ch20 : Nx2 float32
The two channels (and original channel2) that both contain positions callable by .pos
linked : bool
True if the dataset is linked (points are one-to-one)
img, imgsize, mid: np.array
The image borders, size and midpoint. Generated by self.imgparams
'''
def __init__(self):
self.AffineMat=None #LLS affine
self.AffineMatClusters=None #LLS affine
self.SplinesModel = None
self.CP_locs = None
self.gridsize=None
self.edge_grids=None
## Neighbours
self.ControlPoints=None
self.NN_maxDistance=None
self.NN_threshold=None
self.Neighbours=False
self.loss=[]
Plot.__init__(self)
def AffineLLS(self, maxDistance=None, k=-1):
# linear least squares for affine
def p_inv(matrix):
"""Returns the Moore-Penrose pseudo-inverse
(https://gist.github.com/popcornell/502f42666496c5bf9f7266bc42758761)"""
s, u, v = tf.linalg.svd(matrix)
threshold = tf.reduce_max(s) * 1e-5
s_mask = tf.boolean_mask(s, s > threshold)
s_inv = tf.linalg.diag(tf.concat([1. / s_mask, tf.zeros([tf.size(s) - tf.size(s_mask)])], 0))
return tf.matmul(v, tf.matmul(s_inv, tf.transpose(u)))
def LLS(X, Y, SVD=False):
Y1=Y[:,0][None]
Y2=Y[:,1][None]
if SVD: # use Moore-Penrose pseudo inverse
Xinv=p_inv(X)
M1=Y1@Xinv
M2=Y1@Xinv
else:
MatInv=tf.linalg.inv([email protected](X))
M1=Y1@(tf.transpose(X))@MatInv
M2=Y2@(tf.transpose(X))@MatInv
return tf.concat([M1,M2], axis=0)
X=tf.concat([tf.transpose(self.ch2.pos), tf.ones((1,self.ch2.pos.shape[0]),dtype=tf.float32)], axis=0)
if self.execute_linked:
self.AffineMat=LLS(X,self.ch1.pos)
self.ch2.pos.assign(tf.transpose(self.AffineMat@X))
else:
if maxDistance is None: raise ValueError("No maxdistance selected yet")
if not self.Neighbours: self.kNearestNeighbour(self.ch1.pos.numpy(), self.ch2.pos.numpy(),
k=k, maxDistance=maxDistance)
XNN=tf.concat([tf.transpose(self.ch2NN.pos), tf.ones((1,self.ch2NN.pos.shape[0]),dtype=tf.float32)], axis=0)
self.AffineMat=LLS(XNN,self.ch1NN.pos)
self.ch2.pos.assign(tf.transpose(self.AffineMat@X))
def Apply_Affine(self, AffineMat):
if AffineMat is None: raise Exception("AffineLLS has not been trained yet")
X=tf.concat([tf.transpose(self.ch2.pos), tf.ones((1,self.ch2.pos.shape[0]),dtype=tf.float32)], axis=0)
self.ch2.pos.assign(tf.transpose(AffineMat@X))
#%% Optimization functions
def Train_Model(self, model, lr=1, epochs=100, opt_fn=tf.optimizers.Adagrad,
ch1=None, ch2=None, opt=None, maxDistance=1000):
if epochs!=0 and epochs is not None:
if self.BatchOptimization:
if self.execute_linked: batches=self.counts_linked
else: batches=self.counts_Neighbours
if batches is None: raise Exception('Batches have not been initialized yet!')
self.Nbatches=len(batches)
print('Training '+model.name+' Mapping with (lr,#it)='+str((lr,epochs))+' for '+str(self.Nbatches)+' Batches...')
else:
print('Training '+model.name+' Mapping with (lr,#it)='+str((lr,epochs))+'...')
batches=None # take whole dataset as single batch
## Initialize Variables
if ch1 is None and ch2 is None:
if self.execute_linked and self.linked:
ch1, ch2 = self.ch1, self.ch2
elif (not self.execute_linked):
ch1, ch2 = (self.ch1NN, self.ch2NN)
elif self.execute_linked and (not self.linked): raise Exception('Tried to execute linked but dataset has not been linked.')
elif (not self.execute_linked) and (not self.Neighbours): raise Exception('Tried to execute linked but no neighbours have been generated.')
else:
raise Exception('Dataset is not linked but no Neighbours have been generated yet')
## The training loop
if opt is None: opt=opt_fn(lr)
for i in range(epochs):
loss=self.train_step(model, epochs, opt, ch1, ch2, batches)
if i%100==0 and i!=0: print('iteration='+str(i)+'/'+str(epochs))
return loss
def train_step(self, model, epochs, opt, ch1, ch2, batches=None):
# the optimization step
if self.BatchOptimization: ## work with batches of frames
pos,loss=(0,0)
frame,_=tf.unique(ch1.frame) ###########
for i in range(len(batches)): # work with batches
batch=batches[i]
idx1=tf.range(pos, pos+batch)[:,None]
pos1_fr=tf.gather_nd(ch1.pos,idx1)
pos2_fr=tf.gather_nd(ch2.pos,idx1)
with tf.GradientTape() as tape: # calculate loss
if self.execute_linked:
loss+=tf.reduce_sum(tf.square(pos1_fr-model(pos2_fr)))
else:
loss-=tf.reduce_sum(tf.exp(-1*tf.reduce_sum(tf.square(pos1_fr-model(pos2_fr))/(1e6),axis=-1)))
#loss+=-tf.reduce_sum(tf.math.log((self.Neighbours_mat[i] @
# tf.exp(-1*tf.reduce_sum(tf.square(pos1_fr-model(pos2_fr)),axis=-1)
# /(self.pix_size**2))[:,None])))
pos+=batch
# calculate and apply gradients
grads = tape.gradient(loss, model.trainable_weights)
opt.apply_gradients(zip(grads, model.trainable_weights))
else :## take whole dataset as single batch
with tf.GradientTape() as tape: # calculate loss
if self.execute_linked:
loss=tf.reduce_sum(tf.square(ch1.pos-model(ch2.pos)))
else:
#loss=tf.reduce_sum(tf.square(ch1.pos-model(ch2.pos)))
loss=-tf.math.log(
tf.reduce_sum(tf.exp(-1*tf.reduce_sum(tf.square(ch1.pos-model(ch2.pos))/(1e6),axis=-1)))
)
#loss=-tf.reduce_sum(tf.math.log((self.Neighbours_mat @
# tf.exp(-1*tf.reduce_sum(tf.square(ch1.pos-model(ch2.pos)),axis=-1)
# /(self.pix_size**2))[:,None])))
# calculate and apply gradients
grads = tape.gradient(loss, model.trainable_weights)
opt.apply_gradients(zip(grads, model.trainable_weights))
#print('loss='+str(loss))
return loss
def Apply_Model(self, model, ch2=None):
print('Applying '+model.name+' Mapping...')
if model is None: print('Model not trained yet, will pass without Applying.')
else:
if ch2 is None:
ch2_mapped=model(self.ch2.pos)
if tf.reduce_any(tf.math.is_nan( ch2_mapped )):
raise ValueError('ch2 contains infinities. The mapping likely exploded.')
self.ch2.pos.assign(ch2_mapped)
if self.Neighbours:
self.ch2NN.pos.assign((model(self.ch2NN.pos)))
else:
ch2_mapped=model(ch2)
if tf.reduce_any(tf.math.is_nan( ch2_mapped )):
raise ValueError('ch2 contains infinities. The mapping likely exploded.')
return ch2_mapped
#%% CatmullRom Splines
def Train_Splines(self, learning_rate, epochs, gridsize=3000,
edge_grids=1, opt_fn=tf.optimizers.SGD, maxDistance=1000, k=1):
# initializing and training the model
ch1_input,ch2_input=self.InitializeSplines(gridsize=gridsize, edge_grids=edge_grids,
maxDistance=maxDistance, k=k)
self.SplinesModel=CatmullRomSpline2D(self.ControlPoints)
self.Train_Model(self.SplinesModel, lr=learning_rate, epochs=epochs, opt_fn=opt_fn,
ch1=ch1_input, ch2=ch2_input)
self.ControlPoints = self.SplinesModel.ControlPoints
def Apply_Splines(self):
self.ControlPoints = self.SplinesModel.ControlPoints
self.ch2.pos.assign(self.InputSplines(
self.Apply_Model(self.SplinesModel, ch2=self.InputSplines(self.ch2.pos)),
inverse=True))
if self.Neighbours:
self.ch2NN.pos.assign(self.InputSplines(
self.Apply_Model(self.SplinesModel, ch2=self.InputSplines(self.ch2NN.pos)),
inverse=True))
def InitializeSplines(self, gridsize=3000, edge_grids=1, maxDistance=1000, k=-1):
self.ControlPoints=self.generate_CPgrid(gridsize, edge_grids)
self.edge_grids = edge_grids
self.gridsize=gridsize
## Create Nearest Neighbours
if self.execute_linked and self.linked:
## Create variables normalized by gridsize
ch1_input = Channel(self.InputSplines(self.ch1.pos), self.ch1.frame )
ch2_input = Channel(self.InputSplines(self.ch2.pos), self.ch2.frame )
elif not self.execute_linked:
if not self.Neighbours: self.kNearestNeighbour(self.ch1.pos.numpy(), self.ch2.pos.numpy(),
k=k, maxDistance=maxDistance)
## Create variables normalized by gridsize
ch1_input = Channel(self.InputSplines(self.ch1NN.pos), np.zeros(self.ch1NN.pos.shape[0]) )
ch2_input = Channel(self.InputSplines(self.ch2NN.pos), np.zeros(self.ch2NN.pos.shape[0]) )
elif self.execute_linked and (not self.linked): raise Exception('Tried to execute linked but dataset has not been linked.')
elif (not self.execute_linked) and (not self.Neighbours): raise Exception('Tried to execute linked but no neighbours have been generated.')
else:
raise Exception('Trying to calculate loss without Dataset being linked or Neighbours having been generated!')
return ch1_input, ch2_input
def InputSplines(self, pts, inverse=False, gridsize=None):
if inverse:
return tf.Variable( tf.stack([
(pts[:,0] + self.x1_min-1 - self.edge_grids) * self.gridsize,
(pts[:,1] + self.x2_min-1 - self.edge_grids) * self.gridsize
], axis=-1), dtype=tf.float32, trainable=False)
else:
return tf.Variable( tf.stack([
pts[:,0] / self.gridsize - self.x1_min+1 + self.edge_grids,
pts[:,1] / self.gridsize - self.x2_min+1 + self.edge_grids
], axis=-1), dtype=tf.float32, trainable=False)
def generate_CPgrid(self, gridsize=3000, edge_grids=1):
## Generate the borders of the system
self.x1_min = np.min([np.min(tf.reduce_min((self.ch1.pos[:,0]))),
np.min(tf.reduce_min((self.ch2.pos[:,0])))])/gridsize
self.x2_min = np.min([np.min(tf.reduce_min((self.ch1.pos[:,1]))),
np.min(tf.reduce_min((self.ch2.pos[:,1])))])/gridsize
self.x1_max = np.max([np.max(tf.reduce_max((self.ch1.pos[:,0]))),
np.max(tf.reduce_max((self.ch2.pos[:,0])))])/gridsize
self.x2_max = np.max([np.max(tf.reduce_max((self.ch1.pos[:,1]))),
np.max(tf.reduce_max((self.ch2.pos[:,1])))])/gridsize
## Create grid
x1_grid = tf.range(0, self.x1_max+2 - self.x1_min+1 + 2*edge_grids, dtype=tf.float32)
x2_grid = tf.range(0, self.x2_max+2 - self.x2_min+1 + 2*edge_grids, dtype=tf.float32)
ControlPoints = tf.stack(tf.meshgrid(x1_grid, x2_grid), axis=-1)
return ControlPoints
#%% miscaleneous fn
def copy_models(self, other):
self.AffineMat = copy.deepcopy(other.AffineMat)
self.SplinesModel = copy.deepcopy(other.SplinesModel)
self.CP_locs = copy.deepcopy(other.CP_locs)
self.gridsize = other.gridsize
self.edge_grids = other.edge_grids
if self.gridsize is not None:
self.x1_min = other.x1_min
self.x2_min = other.x2_min
self.x1_max = other.x1_max
self.x2_max = other.x2_max