#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Orthogonal data generator for data transformation. After the transformation,
the specified group of features will be orthogonal to the rest of the features
with minimal information loss.
'''
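
# Usage sketch (mirroring the unit test at the bottom of this file):
#   odg   = OrthoDataGen(X, Z, k)  # X: n x p data matrix, Z: n x m bias variables
#   odg.sog(t=1.529, tol=1e-2)     # fit with L1 sparsity target t
#   X_hat = odg.reconstruct()      # n x p data with cov(X_hat, Z) ~= 0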
import sys
import arrow
import numpy as np
from numpy import matmul, multiply, ones, zeros, eye
from numpy.linalg import inv, norm


class OrthoDataGen(object):
    '''
    Orthogonal Data Generator

    The raw data consists of a data matrix X and a matrix Z, where Z might be
    highly correlated with X due to some potential bias. The objective of this
    method is to remove the bias by looking for an X_hat that is orthogonal to
    Z (cov(X_hat, Z) = 0) with minimal information loss w.r.t. X.

    This problem is equivalent to minimizing a Frobenius distance,

        argmin || X - S U^T ||_F^2,  subject to  < S U^T, Z > = 0,  U \\in G_{p,k}

    where G_{p,k} is the Grassmann manifold of orthonormal p x k matrices.
    '''
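
    # A sketch of the alternating updates implemented in sog() below, in the
    # notation of the docstring above (one pass of the inner loop for column j):
    #   1. project out the previous components:  P_j = I - sum_{l<j} s_l s_l^T
    #   2. regress the projected data on Z:      beta_j = (Z^T Z)^{-1} Z^T P_j X u_j
    #   3. update the score, orthogonal to Z:    s_j = normalize(P_j X u_j - Z beta_j)
    #   4. update the basis by soft-thresholding X^T s_j so that ||u_j||_1 ~= t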

    def __init__(self, X, Z, k):
        # data dimension check
        n, p = X.shape
        n_z, m = Z.shape
        assert n == n_z, 'X and Z have unequal numbers of rows.'
        # variable initialization
        self.n, self.p, self.k = n, p, k          # number of samples / number of features / rank
        self.X = X                                # n x p data matrix of p features measured over n subjects
        self.Z = Z                                # n x m additional group membership variable
        self.U = np.random.uniform(0, 1, (p, k))  # p x k matrix of k orthonormal basis vectors
        self.S = np.random.uniform(0, 1, (n, k))  # n x k matrix of associated scores
        print('[%s] n = %d, p = %d, k = %d, X is %d x %d, Z is %d x %d.' %
              (arrow.now(), n, p, k, n, p, n, m), file=sys.stderr)

    def sog(self, t, tol=1e-1, verbose=True):
        '''
        Sparse Orthogonal to Subgroup (SOG) algorithm

        t is the target L1 norm of each u_j (it controls the sparsity of U),
        tol is the convergence tolerance for the changes in s_j and u_j.
        '''
        # update each column of matrices U and S
        for j in range(self.k):
            print('[%s] updating %d/%d ...' % (arrow.now(), j+1, self.k), file=sys.stderr)
            # stop when the changes in u_j and s_j are sufficiently small
            last_s_j = zeros(self.n)
            last_u_j = zeros(self.p)
            i = 0
            while self.change_measure(last_s_j, self.S[:, j], tol) or \
                  self.change_measure(last_u_j, self.U[:, j], tol):
                # keep the last s_j, u_j
                last_s_j = np.copy(self.S[:, j])
                last_u_j = np.copy(self.U[:, j])
                # P_j projects out the components captured by s_1, ..., s_{j-1}
                P_j = eye(self.n) - sum([ np.outer(self.S[:, l], self.S[:, l]) for l in range(j) ])
                beta_j = matmul(matmul(matmul(matmul(
                    inv(matmul(self.Z.T, self.Z)), self.Z.T), P_j), self.X), self.U[:, j])
                # update s_j
                # s_j is the j-th column of matrix S, where len(s_j) = n, j = 1, ..., k
                unnorm_s_j = matmul(matmul(P_j, self.X), self.U[:, j]) - matmul(self.Z, beta_j)
                self.S[:, j] = unnorm_s_j / (norm(unnorm_s_j) + 1e-10)
                # update u_j
                # u_j is the j-th column of matrix U, where len(u_j) = p, j = 1, ..., k
                XT_sj = matmul(self.X.T, self.S[:, j])
                self.U[:, j], theta, diff = self.U_j(XT_sj, t)
                # log information if verbose is true
                if verbose:
                    print('[%s] ---------------------------------' % arrow.now(), file=sys.stderr)
                    print('[%s] iter %d' % (arrow.now(), i), file=sys.stderr)
                    print('[%s]\t||u_j||_1 = %.3f, theta = %.3f, norm diff = %.3f' %
                          (arrow.now(), norm(self.U[:, j], ord=1), theta, diff),
                          file=sys.stderr)
                    print('[%s]\ts_j change is %.3f, u_j change is %.3f' %
                          (arrow.now(),
                           norm(last_s_j - self.S[:, j]),
                           norm(last_u_j - self.U[:, j])),
                          file=sys.stderr)
                    print('[%s]\tFrobenius measure is %.3f' %
                          (arrow.now(), norm(self.X - matmul(self.S, self.U.T), ord='fro')),
                          file=sys.stderr)
                i += 1

    def reconstruct(self):
        '''
        Reconstruct X_hat = S * U^T
        '''
        # d_j = s_j^T * X * u_j scales the j-th score vector
        # DS is the n x k matrix with columns d_j * s_j
        DS = np.array([
            (matmul(matmul(self.S[:, j].T, self.X), self.U[:, j]) * self.S[:, j]).tolist()
            for j in range(self.k) ]).T
        # U is the p x k sparse matrix with columns u_j, j = 1, ..., k
        # return X_hat = DS * U^T
        X_hat = matmul(DS, self.U.T)
        return X_hat

    def U_j(self, x, t, min_theta=0., max_theta=1., num_search=1000):
        '''
        Calculate u_j for the indicated t value.

        Search through the candidate thetas for the one that makes the
        difference | ||u_j||_1 - t | as small as possible, then use this theta
        and the input x to compute u_j with the soft threshold operator.
        '''
        # initialize the optimal theta, u_j and the minimal difference
        min_diff  = np.inf
        opt_theta = 0.
        opt_u_j   = zeros(len(x))
        # search through the candidate thetas
        for theta in np.linspace(min_theta, max_theta, num_search):
            S_theta_x = self.soft_threshold_operator(theta, x)
            u_j  = S_theta_x / norm(S_theta_x) if norm(S_theta_x) != 0 else zeros(len(x))
            diff = abs(norm(u_j, ord=1) - t)
            # keep the candidate with the minimal difference so far
            if diff < min_diff:
                opt_u_j   = u_j
                opt_theta = theta
                min_diff  = diff
            # stop the search early once the difference hits 0
            if diff == 0:
                break
        return opt_u_j, opt_theta, min_diff

    @staticmethod
    def soft_threshold_operator(theta, x):
        '''
        Soft threshold (shrinkage) operator: entries of x whose magnitudes are
        below theta are zeroed out, the rest are shrunk toward zero by theta.
        '''
        indicator = ones(len(x)) * (abs(x) >= theta)
        return multiply(multiply(np.sign(x), (abs(x) - theta)), indicator)
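
    # For example (illustrative values), soft_threshold_operator(0.5, np.array([1.0, -0.2, 0.7]))
    # returns array([0.5, 0., 0.2]): the entry whose magnitude is below the
    # threshold is zeroed and the others shrink toward zero by theta = 0.5.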

    @staticmethod
    def change_measure(vec_a, vec_b, tol):
        '''
        Return True if the L2 distance between vec_a and vec_b exceeds tol.
        '''
        return norm(vec_a - vec_b, ord=2) > tol


# Unit test on a simple example
if __name__ == '__main__':
    X = np.array([
        [1, 1, 0, 0.1, 0, 0],
        [1, 1, 0, 0, 0, 0],
        [0, 0, 1, 1, 0, 0],
        [0, 0, 1, 1, 0.1, 0],
        [0, 0, 0, 0, 1, 1]
    ])
    Z = np.array([
        [1, 1],
        [1, 1],
        [1, 1.1],
        [1.1, 1],
        [1, 1.1]
    ])
    k = 3
    odg = OrthoDataGen(X, Z, k)
    odg.sog(t=1.529, tol=1e-2)
    X_hat = odg.reconstruct()
    print(X_hat)
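    # A quick sanity check of the orthogonality constraint stated in the class
    # docstring: each column of X_hat should be near-orthogonal to each column
    # of Z, i.e. Z^T * X_hat should be close to the zero matrix (up to
    # numerical error).
    print('Z^T X_hat (should be near zero):', file=sys.stderr)
    print(matmul(Z.T, X_hat), file=sys.stderr)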