# data_simulation_large.py
import matplotlib.pyplot as plt
import numpy as np
import h5py
import random
from typing import Tuple
import cv2
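# Side length (in pixels) to which each ground-truth sample image is resized below.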
image_size = 50
class SlidingWindow:
"""
Produce perturbation matrices based on hard, block-y occlusion areas as
generated by sliding a window of a configured size over the area of an
image.
    Due to the geometry of sliding windows, if the given stride does not evenly
    divide the window size along the applicable axis, then the resulting plane of
    values obtained by summing the generated masks will not be even.
    Relatedly, if the stride is set larger than the window size, the resulting
    plane of summed values will also not be even, as there will be increasingly
    long valleys of unperturbed space between masked regions.
:param window_size: The block window size in pixels as a tuple with format
`(height, width)`.
:param stride: The sliding window striding step in pixels as a tuple with
format `(height_step, width_step)`.
"""
def __init__(
self,
window_size: Tuple[int, int] = (25, 25),
stride: Tuple[int, int] = (5, 5),
):
self.window_size: Tuple[int, int] = (int(window_size[0]), int(window_size[1]))
self.stride: Tuple[int, int] = (int(stride[0]), int(stride[1]))
def perturb(
self,
ref_image: np.ndarray
) -> np.ndarray:
win_h, win_w = self.window_size
stride_h, stride_w = self.stride
img_size = ref_image.shape[:2]
img_h, img_w = img_size
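        # Top-left corners of every window placement; the grid starts at a negative
        # offset so windows overlapping the border are clipped to the image.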
rows = np.arange(0 + stride_h - win_h, img_h, stride_h)
cols = np.arange(0 + stride_w - win_w, img_w, stride_w)
num_masks = len(rows) * len(cols)
masks = np.zeros((num_masks, img_h, img_w), dtype=int)
rows_m = np.repeat(rows, len(cols))
cols_m = np.tile(cols, len(rows))
for i, (r, c) in enumerate(zip(rows_m, cols_m)):
            # Using min/max here is cheaper than calling np.clip.
r1 = max(0, r)
r2 = min(r + win_h, img_h)
c1 = max(0, c)
c2 = min(c + win_w, img_w)
rs = slice(r1, r2)
cs = slice(c1, c2)
masks[i, rs, cs] = 1
return masks
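# A quick sanity-check sketch (illustrative only, not part of the pipeline below):
#   masks = SlidingWindow((3, 3), (2, 1)).perturb(np.zeros((image_size, image_size)))
# yields one binary (image_size, image_size) mask per window position.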
gt_filepath = "./ground_truth_test_000.hdf5"
sample_ids = random.sample(range(0, 127), 1)
# Load the ground-truth HDF5 file and extract the randomly selected sample image(s)
with h5py.File(gt_filepath, "r") as gtf:
for i, ids in enumerate(sample_ids):
sample_x = gtf['data'][ids, :, :]
sample_x = cv2.resize(sample_x, dsize=(image_size, image_size), interpolation=cv2.INTER_CUBIC)
sample_x = (255 * (sample_x - np.min(sample_x)) / np.ptp(sample_x)).astype(float)
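        # Rescale intensities to the [0, 255] range (kept as float rather than uint8).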
# Obtain actual vector x used in model
x_flatten = sample_x.flatten()
x, y = sample_x.shape
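# Build a grid of "row,col" strings so each ray/window can record which pixels it covers.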
string_list = []
for i in range(x):
for j in range(y):
string_list.append("{},{}".format(i, j))
indices_A = np.array(string_list, dtype=object).reshape(x, y)
# Visualize the image
print(sample_x)
plt.figure()
plt.imshow(sample_x, cmap='gray')
plt.title('Sample Image')
plt.show()
plt.close()
# Computing rays along all possible diagonals and their respective indices ('row', 'column')
def sample_diag(sample_x):
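    # Anti-diagonals first (via the vertically flipped array), then the main-direction diagonals.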
diags = [sample_x[::-1, :].diagonal(i) for i in range(-sample_x.shape[0]+1, sample_x.shape[1])]
diags.extend(sample_x.diagonal(i) for i in range(sample_x.shape[1]-1, -sample_x.shape[0], -1))
return [n.tolist() for n in diags]
diag_indic = sample_diag(indices_A)
# Computing rays along all columns
column_arr = []
column_indic_global = []
for j in range(y):
columns = [row[j] for row in sample_x]
column_arr.append(columns)
column_indic = []
for r_cnt in range(x):
column_indic.append("{},{}".format(r_cnt, j))
column_indic_global.append(column_indic)
# Computing rays along all rows
rows_arr = []
row_indic_global = []
for j in range(x):
rows = [row for row in sample_x[j, :]]
rows_arr.append(rows)
row_indic = []
for c_cnt in range(y):
row_indic.append("{},{}".format(j, c_cnt))
row_indic_global.append(row_indic)
# Sliding-window observations: slide square windows of several sizes over the image
window_indic_global = []
for i in [3, 5]:
    # Iterate over multiple window sizes (stride fixed at (2, 1)) for more observations
    sd = SlidingWindow((i, i), (2, 1))
    masks = sd.perturb(sample_x)
    for mask in masks:
        # Record the pixels covered by this window as "row,col" index strings
        A_ind = np.where(mask == 1)
        window_arr = ["{},{}".format(A_row, A_col) for A_row, A_col in zip(A_ind[0], A_ind[1])]
        window_indic_global.append(window_arr)
# Join the diagonal, column, row, and sliding-window index lists into a single list of observations
all_indices = [*diag_indic, *column_indic_global, *row_indic_global, *window_indic_global]
# Creating empty mixing matrix A for model
row_dim, col_dim = np.shape(sample_x)
A_mixing_matrix = np.zeros((len(all_indices), row_dim, col_dim))
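# Each row of A marks one ray or window: 1 where the observation covers a pixel, 0 elsewhere.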
for obs_n, ind in enumerate(all_indices):
for sub_i in ind:
local_x, local_y = sub_i.split(',')
A_mixing_matrix[obs_n, int(local_x), int(local_y)] = 1
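# Flatten each 2-D coverage mask into a row vector so observations can be computed as A @ x.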
A_final = A_mixing_matrix.reshape((len(all_indices), row_dim * col_dim))
# Noise-free observations A @ x are used as the rate (lambda) when sampling from the Poisson distribution
pure_obser = A_final @ x_flatten
poisson_obser = np.random.poisson(pure_obser)
# Save the mixing matrix A and image vector x as .npy files
np.save('simulated_large_A.npy', A_final)
np.save('simulated_large_x.npy', x_flatten)
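# Sketch of how the saved arrays could be reloaded later (illustrative only):
#   A = np.load('simulated_large_A.npy')
#   x = np.load('simulated_large_x.npy')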
print("Image vector x is of shape {}".format(len(x_flatten)))
print("Mixing matrix A is of shape {}".format(np.shape(A_final)))
print("Obervation matrix Y is of shape {}".format(np.shape(poisson_obser)))
print("X:", x_flatten)
print("A:", A_final)
print("Y:", poisson_obser)
print("Y without poisson", pure_obser)