diff --git a/libs/models/som.py b/libs/models/som.py
index b7ebcf8..68a8ed5 100644
--- a/libs/models/som.py
+++ b/libs/models/som.py
@@ -6,7 +6,7 @@ class SOM:
-    def __init__(self, X, latent_dim, resolution, sigma_max, sigma_min, tau, init='random',metric="sqeuclidean"):
+    def __init__(self, X, latent_dim, resolution, sigma_max, sigma_min, tau, init='random', metric="sqeuclidean"):
         self.X = X
         self.N = self.X.shape[0]
@@ -28,22 +28,23 @@ def __init__(self, X, latent_dim, resolution, sigma_max, sigma_min, tau, init='r
         elif latent_dim == 2:
             if isinstance(init, str) and init == 'PCA':
                 comp1, comp2 = pca.singular_values_[0], pca.singular_values_[1]
-                zeta = np.meshgrid(np.linspace(-1, 1, resolution), np.linspace(-comp2/comp1, comp2/comp1, resolution))
+                zeta = np.meshgrid(np.linspace(-1, 1, resolution),
+                                   np.linspace(-comp2 / comp1, comp2 / comp1, resolution))
             else:
                 zeta = np.meshgrid(np.linspace(-1, 1, resolution), np.linspace(-1, 1, resolution))
-            self.Zeta = np.dstack(zeta).reshape(resolution**2, latent_dim)
+            self.Zeta = np.dstack(zeta).reshape(resolution ** 2, latent_dim)
         else:
             raise ValueError("invalid latent dimension: {}".format(latent_dim))

-        self.K = resolution**self.L
+        self.K = resolution ** self.L

         if isinstance(init, str) and init == 'random':
             self.Z = np.random.rand(self.N, latent_dim) * 2.0 - 1.0
         elif isinstance(init, str) and init == 'random_bmu':
             init_bmus = np.random.randint(0, self.Zeta.shape[0] - 1, self.N)
-            self.Z = self.Zeta[init_bmus,:]
+            self.Z = self.Zeta[init_bmus, :]
         elif isinstance(init, str) and init == 'PCA':
-            self.Z = pca.transform(X)/comp1
+            self.Z = pca.transform(X) / comp1
         elif isinstance(init, np.ndarray) and init.dtype == int:
             init_bmus = init.copy()
             self.Z = self.Zeta[init_bmus, :]
@@ -52,9 +53,9 @@ def __init__(self, X, latent_dim, resolution, sigma_max, sigma_min, tau, init='r
         else:
             raise ValueError("invalid init: {}".format(init))

-        #handle the metric option
+        # handle the metric option
         if metric == "sqeuclidean":
-            self.metric="sqeuclidean"
+            self.metric = "sqeuclidean"
         elif metric == "KLdivergence":
             self.metric = "KLdivergence"
@@ -78,28 +79,28 @@ def fit(self, nb_epoch=100, verbose=True):
             # cooperative process
             # compute the learning amounts
             # sigma = self.sigma_min + (self.sigma_max - self.sigma_min) * np.exp(-epoch / self.tau)  # set the neighborhood radius
-            sigma = max(self.sigma_min, self.sigma_max * ( 1 - (epoch / self.tau) ) )# set the neighborhood radius
+            sigma = max(self.sigma_min, self.sigma_max * (1 - (epoch / self.tau)))  # set the neighborhood radius
             Dist = dist.cdist(self.Zeta, self.Z, 'sqeuclidean')  # compute the KxN distance matrix
             # covers the distances between every node and every winner node
-            H = np.exp(-Dist / (2 * sigma * sigma)) # compute the KxN learning-amount matrix
+            H = np.exp(-Dist / (2 * sigma * sigma))  # compute the KxN learning-amount matrix

             # adaptive process
             # update the reference vectors
-            G = np.sum(H, axis=1)[:, np.newaxis] # Kx1 column vector of the total learning amount each node receives
-            Ginv = np.reciprocal(G) # elementwise reciprocal of G
-            R = H * Ginv # normalize so that each node's learning amounts sum to 1
-            self.Y = R @ self.X # reference vectors are the learning-amount-weighted means of the observed data
+            G = np.sum(H, axis=1)[:, np.newaxis]  # Kx1 column vector of the total learning amount each node receives
+            Ginv = np.reciprocal(G)  # elementwise reciprocal of G
+            R = H * Ginv  # normalize so that each node's learning amounts sum to 1
+            self.Y = R @ self.X  # reference vectors are the learning-amount-weighted means of the observed data

             # competitive process
-            if self.metric is "sqeuclidean": # winner determination by Euclidean distance
+            if self.metric == "sqeuclidean":  # winner determination by Euclidean distance
                 # compute the winner nodes
                 Dist = dist.cdist(self.X, self.Y)  # compute the NxK distance matrix
                 bmus = Dist.argmin(axis=1)  # Nx1 column vector of the winner node indices
                 # argmin(axis=1) finds the minimum of each row and returns its index
                 self.Z = self.Zeta[bmus, :]  # look up each winner node from its index
-            elif self.metric is "KLdivergence": # winner determination by KL divergence
+            elif self.metric == "KLdivergence":  # winner determination by KL divergence
                 Dist = np.sum(self.X[:, np.newaxis, :] * np.log(self.Y)[np.newaxis, :, :], axis=2)  # NxK matrix
                 # determine the winner indices
                 bmus = np.argmax(Dist, axis=1)
@@ -110,3 +111,11 @@ def fit(self, nb_epoch=100, verbose=True):
             self.history['z'][epoch] = self.Z
             self.history['y'][epoch] = self.Y
             self.history['sigma'][epoch] = sigma
+
+    def transform(self, X):
+        if self.metric == "sqeuclidean":
+            distance = dist.cdist(X, self.Y, self.metric)
+            return self.Zeta[distance.argmin(axis=1)]
+        elif self.metric == "KLdivergence":
+            divergence = -np.sum(X[:, np.newaxis, :] * np.log(self.Y[np.newaxis, :, :]), axis=2)  # NxK; use the argument X, not self.X
+            return self.Zeta[divergence.argmin(axis=1)]
diff --git a/libs/models/tsom_plus_som.py b/libs/models/tsom_plus_som.py
index 10e0609..7956dcb 100644
--- a/libs/models/tsom_plus_som.py
+++ b/libs/models/tsom_plus_som.py
@@ -5,13 +5,13 @@ class TSOMPlusSOM:
-    def __init__(self, member_features, index_members_of_group, params_tsom, params_som):
+    def __init__(self, member_features, group_features, params_tsom, params_som):
         self.params_tsom = params_tsom
         self.params_som = params_som
         self.params_tsom['X'] = member_features
-        self.index_members_of_group = index_members_of_group  # check the number of groups
-        self.group_num = len(self.index_members_of_group)
+        self.group_features = group_features  # check the number of groups
+        self.group_num = len(self.group_features)

     def fit(self, tsom_epoch_num, kernel_width, som_epoch_num):
         self._fit_1st_TSOM(tsom_epoch_num)
@@ -23,19 +23,37 @@ def _fit_1st_TSOM(self, tsom_epoch_num):
         self.tsom.fit(tsom_epoch_num)

     def _fit_KDE(self, kernel_width):  # build probability distributions over the learned latent space via KDE
-        prob_data = np.zeros((self.group_num, self.tsom.K1))  # n_groups x n_nodes
-        # apply KDE to each group
-        for i in range(self.group_num):
-            Dist = dist.cdist(self.tsom.Zeta1, self.tsom.Z1[self.index_members_of_group[i], :],
-                              'sqeuclidean')  # compute the KxN distance matrix
-            H = np.exp(-Dist / (2 * kernel_width * kernel_width))  # compute the KxN learning-amount matrix
-            prob = np.sum(H, axis=1)
-            prob_sum = np.sum(prob)
-            prob = prob / prob_sum
-            prob_data[i, :] = prob
+        prob_data = self._calculate_kde(group_features=self.group_features, kernel_width=kernel_width)
         self.params_som['X'] = prob_data
         self.params_som['metric'] = "KLdivergence"

+    def _calculate_kde(self, group_features, kernel_width):
+        # apply KDE to each group
+        if isinstance(group_features, np.ndarray) and group_features.ndim == 2:
+            # group_features is given as a bag of members
+            distance = dist.cdist(self.tsom.Zeta1, self.tsom.Z1, 'sqeuclidean')  # K1 x num_members
+            H = np.exp(-0.5 * distance / (kernel_width * kernel_width))  # KxN
+            prob_data = group_features @ H.T  # num_group x K1
+            prob_data = prob_data / prob_data.sum(axis=1)[:, None]
+        else:
+            # group_features is given as a list of lists or a list of arrays
+            prob_data = np.zeros((self.group_num, self.tsom.K1))  # n_groups x n_nodes
+            for i, one_group_features in enumerate(group_features):
+                Dist = dist.cdist(self.tsom.Zeta1,
+                                  self.tsom.Z1[one_group_features, :],
+                                  'sqeuclidean')  # compute the KxN distance matrix
+                H = np.exp(-Dist / (2 * kernel_width * kernel_width))  # compute the KxN kernel values
+                prob = np.sum(H, axis=1)
+                prob_sum = np.sum(prob)
+                prob = prob / prob_sum
+                prob_data[i, :] = prob
+        return prob_data
+
     def _fit_2nd_SOM(self, som_epoch_num):  # fit the upper SOM
         self.som = SOM(**self.params_som)
         self.som.fit(som_epoch_num)
+
+    def transform(self, group_features, kernel_width):
+        group_density = self._calculate_kde(group_features=group_features,
+                                            kernel_width=kernel_width)
+        return self.som.transform(X=group_density)
diff --git a/libs/models/unsupervised_kernel_regression.py b/libs/models/unsupervised_kernel_regression.py
new file mode 100644
index 0000000..e238df5
--- /dev/null
+++ b/libs/models/unsupervised_kernel_regression.py
@@ -0,0 +1,201 @@
+import numpy as np
+import scipy.spatial.distance as dist
+from tqdm import tqdm
+
+
+class UnsupervisedKernelRegression(object):
+    def __init__(self, X, n_components, bandwidth_gaussian_kernel=1.0,
+                 is_compact=False, lambda_=1.0,
+                 init='random', is_loocv=False, is_save_history=False):
+        self.X = X.copy()
+        self.n_samples = X.shape[0]
+        self.n_dimensions = X.shape[1]
+        self.n_components = n_components
+        self.bandwidth_gaussian_kernel = bandwidth_gaussian_kernel
+        self.precision = 1.0 / (bandwidth_gaussian_kernel * bandwidth_gaussian_kernel)
+        self.is_compact = is_compact
+        self.is_loocv = is_loocv
+        self.is_save_history = is_save_history
+
+        self.Z = None
+        if isinstance(init, str) and init == 'random':
+            self.Z = np.random.normal(0, 1.0, (self.n_samples, self.n_components)) * bandwidth_gaussian_kernel * 0.5
+        elif isinstance(init, np.ndarray) and init.shape == (self.n_samples, self.n_components):
+            self.Z = init.copy()
+        else:
+            raise ValueError("invalid init: {}".format(init))
+
+        self.lambda_ = lambda_
+
+        self._done_fit = False
+
+    def fit(self, nb_epoch=100, verbose=True, eta=0.5, expand_epoch=None):
+
+        K = self.X @ self.X.T
+
+        self.nb_epoch = nb_epoch
+
+        if self.is_save_history:
+            self.history = {}
+            self.history['z'] = np.zeros((nb_epoch, self.n_samples, self.n_components))
+            self.history['y'] = np.zeros((nb_epoch, self.n_samples, self.n_dimensions))
+            self.history['zvar'] = np.zeros((nb_epoch, self.n_components))
+            self.history['obj_func'] = np.zeros(nb_epoch)
+
+        if verbose:
+            bar = tqdm(range(nb_epoch))
+        else:
+            bar = range(nb_epoch)
+
+        for epoch in bar:
+            Delta = self.Z[:, None, :] - self.Z[None, :, :]
+            DistZ = np.sum(np.square(Delta), axis=2)
+            H = np.exp(-0.5 * self.precision * DistZ)  # Gaussian kernel matrix over the latent variables
+            if self.is_loocv:
+                H -= np.identity(H.shape[0])  # leave-one-out: drop each sample's own kernel weight
+
+            G = np.sum(H, axis=1)[:, None]
+            GInv = 1 / G
+            R = H * GInv  # normalized kernel weights (each row sums to 1)
+
+            Y = R @ self.X  # Nadaraya-Watson estimate of each sample
+            DeltaYX = Y[:, None, :] - self.X[None, :, :]
+            Error = Y - self.X
+            obj_func = np.sum(np.square(Error)) / self.n_samples + self.lambda_ * np.sum(np.square(self.Z))
+
+            A = self.precision * R * np.einsum('nd,nid->ni', Error, DeltaYX)
+            dFdZ = -2.0 * np.sum((A + A.T)[:, :, None] * Delta, axis=1) / self.n_samples
+
+            dFdZ -= self.lambda_ * self.Z  # descent direction including the regularizer term
+
+            self.Z += eta * dFdZ
+            if self.is_compact:
+                self.Z = np.clip(self.Z, -1.0, 1.0)
+            else:
+                self.Z -= self.Z.mean(axis=0)
+
+            if self.is_save_history:
+                self.history['z'][epoch] = self.Z
+                self.history['y'][epoch] = Y
+                self.history['zvar'][epoch] = np.mean(np.square(self.Z - self.Z.mean(axis=0)), axis=0)
+                self.history['obj_func'][epoch] = obj_func
+
+        self._done_fit = True
+        return self.history if self.is_save_history else None  # self.history only exists when history is saved
+
+    def calculation_history_of_mapping(self, resolution, size='auto'):
+        """
+        :param resolution: number of grid points per latent dimension
+        :param size: 'auto' to span the range of the fitted latent variables, or an array whose min/max define the grid range
+        :return: None (stores the mapping history in self.history['f'])
+        """
+        if not self._done_fit:
+            raise ValueError("fit is not done")
+
+        self.resolution = resolution
+        Zeta = create_zeta(-1, 1, self.n_components, resolution)
+        M = Zeta.shape[0]
+
+        self.history['f'] = np.zeros((self.nb_epoch, M, self.n_dimensions))
+
+        for epoch in range(self.nb_epoch):
+            Z = self.history['z'][epoch]
+            if size == 'auto':
+                Zeta = create_zeta(Z.min(), Z.max(), self.n_components, resolution)
+            else:
+                Zeta = create_zeta(size.min(), size.max(), self.n_components, resolution)
+
+            Dist = dist.cdist(Zeta, Z, 'sqeuclidean')
+
+            H = np.exp(-0.5 * self.precision * Dist)
+            G = np.sum(H, axis=1)[:, None]
+            GInv = np.reciprocal(G)
+            R = H * GInv
+
+            Y = np.dot(R, self.X)
+
+            self.history['f'][epoch] = Y
+
+    def transform(self, Xnew, nb_epoch_trans=100, eta_trans=0.5, verbose=True, constrained=True):
+        # Estimate the latent variables of new data by gradient descent.
+        # The objective function is the squared error E = ||f(z) - x||^2.
+
+        if not self._done_fit:
+            raise ValueError("fit is not done")
+
+        Nnew = Xnew.shape[0]
+
+        # initialize Znew using the latent variables of the observed data
+        Dist_Xnew_X = dist.cdist(Xnew, self.X)
+        BMS = np.argmin(Dist_Xnew_X, axis=1)  # calculate the best matching sample
+        Znew = self.Z[BMS, :]  # initialize Znew
+
+        if verbose:
+            bar = tqdm(range(nb_epoch_trans))
+        else:
+            bar = range(nb_epoch_trans)
+
+        for epoch in bar:
+            # calculate the gradient
+            Delta = self.Z[None, :, :] - Znew[:, None, :]  # shape = (Nnew, N, L)
+            Dist_Znew_Z = dist.cdist(Znew, self.Z, "sqeuclidean")  # shape = (Nnew, N)
+            H = np.exp(-0.5 * self.precision * Dist_Znew_Z)  # shape = (Nnew, N)
+            G = np.sum(H, axis=1)[:, None]  # shape = (Nnew, 1)
+            Ginv = np.reciprocal(G)  # shape = (Nnew, 1)
+            R = H * Ginv  # shape = (Nnew, N)
+            F = R @ self.X  # shape = (Nnew, D)
+
+            Delta_bar = np.einsum("kn,knl->kl", R, Delta)  # (Nnew, N) times (Nnew, N, L) -> (Nnew, L)
+            # Delta_bar = np.sum(R[:, :, None] * Delta, axis=1)  # same calculation
+            dRdZ = self.precision * R[:, :, None] * (Delta - Delta_bar[:, None, :])  # shape = (Nnew, N, L)
+
+            dFdZ = np.einsum("nd,knl->kdl", self.X, dRdZ)  # shape = (Nnew, D, L)
+            # dFdZ = np.sum(self.X[None, :, :, None] * dRdZ[:, :, None, :], axis=1)  # same calculation
+            dEdZ = 2.0 * np.einsum("kd,kdl->kl", F - Xnew, dFdZ)  # shape = (Nnew, L)
+            # update the latent variables
+            Znew -= eta_trans * dEdZ
+            if self.is_compact:
+                Znew = np.clip(Znew, -1.0, 1.0)
+            if constrained:
+                Znew = np.clip(Znew, self.Z.min(axis=0), self.Z.max(axis=0))
+
+        return Znew
+
+    def inverse_transform(self, Znew):
+        if not self._done_fit:
+            raise ValueError("fit is not done")
+        if Znew.shape[1] != self.n_components:
+            raise ValueError("Znew dimension must be {}".format(self.n_components))
+
+        Dist_Znew_Z = dist.cdist(Znew, self.Z, "sqeuclidean")  # shape = (Nnew, N)
+        H = np.exp(-0.5 * self.precision * Dist_Znew_Z)  # shape = (Nnew, N)
+        G = np.sum(H, axis=1)[:, None]  # shape = (Nnew, 1)
+        Ginv = np.reciprocal(G)  # shape = (Nnew, 1)
+        R = H * Ginv  # shape = (Nnew, N)
+        F = R @ self.X  # shape = (Nnew, D)
+
+        return F
+
+
+def create_zeta(zeta_min, zeta_max, latent_dim, resolution):
+    mesh1d, step = np.linspace(zeta_min, zeta_max, resolution, endpoint=False, retstep=True)
+    mesh1d += step / 2.0
+    if latent_dim == 1:
+        Zeta = mesh1d
+    elif latent_dim == 2:
+        Zeta = np.meshgrid(mesh1d, mesh1d)
+    else:
+        raise ValueError("invalid latent dim {}".format(latent_dim))
+    Zeta = np.dstack(Zeta).reshape(-1, latent_dim)
+    return Zeta
diff --git a/tests/plus_TSOM/allclose_plusTSOM.py b/tests/plus_TSOM/allclose_plusTSOM.py
index 9a8ee1d..c61b9d5 100644
--- a/tests/plus_TSOM/allclose_plusTSOM.py
+++ b/tests/plus_TSOM/allclose_plusTSOM.py
@@ -10,11 +10,10 @@ class TestTSOMPlusSOM(unittest.TestCase):
     def create_artficial_data(self,n_samples,n_features,n_groups,n_samples_per_group):
         x = np.random.normal(0.0,1.0,(n_samples,n_features))
         if isinstance(n_samples_per_group,int):
-            index_members_of_group = np.random.randint(0,n_samples,(n_groups,n_samples_per_group))
-        elif isinstance(n_samples_per_group,np.ndarray):
-            index_members_of_group = []
-            for n_samples_in_the_group in n_samples_per_group:
-                index_members_of_group.append(np.random.randint(0,n_samples,n_samples_in_the_group))
+            n_samples_per_group = np.ones(n_groups,int) * n_samples_per_group
+        index_members_of_group = []
+        for n_samples_in_the_group in n_samples_per_group:
+            index_members_of_group.append(np.random.randint(0,n_samples,n_samples_in_the_group))
         return x, index_members_of_group

     def test_plusTSOM_ishida_vs_test_plusTSOM_watanabe(self):
@@ -23,7 +22,7 @@ def test_plusTSOM_ishida_vs_test_plusTSOM_watanabe(self):
         n_samples = 1000
         n_groups = 10  # number of groups
         n_features = 3  # number of features per member
-        n_samples_per_group = 30  # how many members each group has
+        n_samples_per_group = np.random.randint(1,30,n_groups)  # how many members each group has
         member_features,index_members_of_group = self.create_artficial_data(n_samples,
                                                                             n_features,
                                                                             n_groups,
                                                                             n_samples_per_group)
@@ -52,7 +51,7 @@ def test_plusTSOM_ishida_vs_test_plusTSOM_watanabe(self):
         kernel_width = 0.3

         htsom_ishida = TSOMPlusSOM(member_features=member_features,
-                                   index_members_of_group=index_members_of_group,
+                                   group_features=index_members_of_group,
                                    params_tsom=params_tsom,
                                    params_som=params_som)
         htsom_watanabe = TSOMPlusSOMWatanabe(member_features=member_features,
@@ -74,5 +73,125 @@ def test_plusTSOM_ishida_vs_test_plusTSOM_watanabe(self):
         np.testing.assert_allclose(htsom_ishida.som.history['y'], htsom_watanabe.som.history['y'])
         np.testing.assert_allclose(htsom_ishida.som.history['z'], htsom_watanabe.som.history['z'])

+    def _transform_list_to_bag(self,list_of_indexes,num_members):
+        bag_of_members = np.empty((0,num_members))
+        for indexes in list_of_indexes:
+            one_hot_vectors = np.eye(num_members)[indexes]
+            one_bag = one_hot_vectors.sum(axis=0)[None,:]
+            bag_of_members = np.append(bag_of_members,one_bag,axis=0)
+        return bag_of_members
+
+    def test_matching_index_member_as_list_or_bag(self):
+        seed = 100
+        np.random.seed(seed)
+        n_members = 100
+        n_groups = 10  # number of groups
+        n_features = 3  # number of features per member
+        n_samples_per_group = np.random.randint(1,50,n_groups)  # how many members each group has
+        member_features,index_members_of_group = self.create_artficial_data(n_members,
+                                                                            n_features,
+                                                                            n_groups,
+                                                                            n_samples_per_group)
+        bag_of_members = self._transform_list_to_bag(index_members_of_group, n_members)
+
+        Z1 = np.random.rand(n_members, 2) * 2.0 - 1.0
+        Z2 = np.random.rand(n_features, 2) * 2.0 - 1.0
+        init_TSOM = [Z1, Z2]
+        init_SOM = np.random.rand(n_groups, 2) * 2.0 - 1.0
+        params_tsom = {'latent_dim': [2, 2],
+                       'resolution': [10, 10],
+                       'SIGMA_MAX': [1.0, 1.0],
+                       'SIGMA_MIN': [0.1, 0.1],
+                       'TAU': [50, 50],
+                       'init': init_TSOM}
+        params_som = {'latent_dim': 2,
+                      'resolution': 10,
+                      'sigma_max': 2.0,
+                      'sigma_min': 0.5,
+                      'tau': 50,
+                      'init': init_SOM}
+        tsom_epoch_num = 50
+        som_epoch_num = 50
+        kernel_width = 0.3
+
+        tsom_plus_som_input_list = TSOMPlusSOM(member_features=member_features,
+                                               group_features=index_members_of_group,
+                                               params_tsom=params_tsom,
+                                               params_som=params_som)
+        tsom_plus_som_input_bag = TSOMPlusSOM(member_features=member_features,
+                                              group_features=bag_of_members,
+                                              params_tsom=params_tsom,
+                                              params_som=params_som)
+
+        tsom_plus_som_input_list.fit(tsom_epoch_num=tsom_epoch_num,
+                                     kernel_width=kernel_width,
+                                     som_epoch_num=som_epoch_num)
+        tsom_plus_som_input_bag.fit(tsom_epoch_num=tsom_epoch_num,
+                                    kernel_width=kernel_width,
+                                    som_epoch_num=som_epoch_num)
+
+        np.testing.assert_allclose(tsom_plus_som_input_list.tsom.history['y'], tsom_plus_som_input_bag.tsom.history['y'])
+        np.testing.assert_allclose(tsom_plus_som_input_list.tsom.history['z1'], tsom_plus_som_input_bag.tsom.history['z1'])
+        np.testing.assert_allclose(tsom_plus_som_input_list.tsom.history['z2'], tsom_plus_som_input_bag.tsom.history['z2'])
+        np.testing.assert_allclose(tsom_plus_som_input_list.params_som['X'], tsom_plus_som_input_bag.params_som['X'])
+        np.testing.assert_allclose(tsom_plus_som_input_list.som.history['y'], tsom_plus_som_input_bag.som.history['y'])
+        np.testing.assert_allclose(tsom_plus_som_input_list.som.history['z'], tsom_plus_som_input_bag.som.history['z'])
+
+    def test_transform(self):
+        # prepare the dataset
+        seed = 100
+        np.random.seed(seed)
+        n_members = 1000
+        n_groups = 10  # number of groups
+        n_features = 3  # number of features per member
+        n_members_per_group = np.random.randint(1,30,n_groups)  # how many members each group has
+        member_features,index_members_of_group = self.create_artficial_data(n_members,
+                                                                            n_features,
+                                                                            n_groups,
+                                                                            n_members_per_group)
+        bag_of_members = self._transform_list_to_bag(index_members_of_group,num_members=n_members)
+
+        # prepare the parameters
+        Z1 = np.random.rand(n_members, 2) * 2.0 - 1.0
+        Z2 = np.random.rand(n_features, 2) * 2.0 - 1.0
+        init_TSOM = [Z1, Z2]
+        init_SOM = np.random.rand(n_groups, 2) * 2.0 - 1.0
+        params_tsom = {'latent_dim': [2, 2],
+                       'resolution': [10, 10],
+                       'SIGMA_MAX': [1.0, 1.0],
+                       'SIGMA_MIN': [0.1, 0.1],
+                       'TAU': [50, 50],
+                       'init': init_TSOM}
+        params_som = {'latent_dim': 2,
+                      'resolution': 10,
+                      'sigma_max': 2.0,
+                      'sigma_min': 0.5,
+                      'tau': 50,
+                      'init': init_SOM}
+        tsom_epoch_num = 50
+        kernel_width = 0.3
+        som_epoch_num = 50
+
+        # fit
+        htsom_bag = TSOMPlusSOM(member_features=member_features,
+                                group_features=bag_of_members,
+                                params_tsom=params_tsom,
+                                params_som=params_som)
+        htsom_bag.fit(tsom_epoch_num,kernel_width,som_epoch_num)
+        Z_fit_bag = htsom_bag.som.Z
+        Z_transformed_bag = htsom_bag.transform(group_features=bag_of_members,kernel_width=kernel_width)
+
+        htsom_list = TSOMPlusSOM(member_features=member_features,
+                                 group_features=index_members_of_group,
+                                 params_tsom=params_tsom,
+                                 params_som=params_som)
+        htsom_list.fit(tsom_epoch_num,kernel_width,som_epoch_num)
+        Z_fit_list = htsom_list.som.Z
+        Z_transformed_list = htsom_list.transform(group_features=index_members_of_group,kernel_width=kernel_width)
+
+        # compare the latent variables estimated in fit with those returned by transform
+        np.testing.assert_allclose(Z_fit_bag,Z_transformed_bag)
+        np.testing.assert_allclose(Z_fit_list,Z_transformed_list)
+

 if __name__ == "__main__":
     unittest.main()
diff --git a/tests/som/test_som.py b/tests/som/test_som.py
index 6c08da7..180bbdb 100644
--- a/tests/som/test_som.py
+++ b/tests/som/test_som.py
@@ -96,6 +96,37 @@ def test_init_pca(self):
         np.testing.assert_allclose(SOMResult, EVDResult/np.sqrt(Lambda.real.max()), rtol=1e-06)

+    def test_transform(self):
+        n_distribution = 100
+        n_category = 20
+
+        # create categorical distributions
+        X_categorical = np.random.rand(n_distribution,n_category)
+        X_categorical = X_categorical / X_categorical.sum(axis=1)[:,None]
+
+        np.testing.assert_allclose(X_categorical.sum(axis=1),np.ones(X_categorical.shape[0]))
+
+        # fit
+        som_categorical = SOM(X_categorical,latent_dim=2,resolution=50,sigma_max=2.0,sigma_min=0.3,tau=50,metric="KLdivergence")
+        som_categorical.fit(50)
+        Z_fit = som_categorical.Z
+        Z_transformed = som_categorical.transform(X_categorical)
+
+        np.testing.assert_allclose(Z_transformed,Z_fit)
+
+        # check with a multivariate dataset
+        n_samples = 100
+        n_features = 20
+
+        X_multi_variate = np.random.normal(0.0,1.0,(n_samples,n_features))
+
+        # fit
+        som_multi_variate = SOM(X_multi_variate,latent_dim=2,resolution=50,sigma_max=2.0,sigma_min=0.2,tau=50,metric="sqeuclidean")
+        som_multi_variate.fit(10)
+        Z_fit = som_multi_variate.Z
+        Z_transformed = som_multi_variate.transform(X_multi_variate)
+
+        np.testing.assert_allclose(Z_fit,Z_transformed)
diff --git a/tutorials/ukr/fitting_saddle_shape.py b/tutorials/ukr/fitting_saddle_shape.py
new file mode 100644
index 0000000..aac82d3
--- /dev/null
+++ b/tutorials/ukr/fitting_saddle_shape.py
@@ -0,0 +1,87 @@
+import numpy as np
+from libs.models.unsupervised_kernel_regression import UnsupervisedKernelRegression as UKR
+from libs.models.som import SOM
+from libs.datasets.artificial.kura import create_data
+import matplotlib.pyplot as plt
+from mpl_toolkits.mplot3d import Axes3D
+import matplotlib.animation as animation
+
+if __name__ == '__main__':
+    # create an artificial dataset
+    nb_samples = 500
+    seed = 1
+    np.random.seed(seed)
+    X = create_data(nb_samples)
+    x_sigma = 0.1
+    X += np.random.normal(0, x_sigma, X.shape)
+
+    # common parameters
+    n_components = 2
+    bandwidth_gaussian_kernel = 0.2
+    nb_epoch = 100
+
+    # UKR parameters
+    is_compact = True
+    is_save_history = True
+    lambda_ = 0.0
+    eta = 8.0
+
+    # SOM parameters
+    tau = nb_epoch
+    init_bandwidth = 2.0
+    resolution = 10
+
+    # train UKR and SOM
+    som = SOM(X, latent_dim=n_components, resolution=resolution,
+              sigma_max=init_bandwidth, sigma_min=bandwidth_gaussian_kernel, tau=tau)
+    ukr = UKR(X, n_components=n_components, bandwidth_gaussian_kernel=bandwidth_gaussian_kernel,
+              is_compact=is_compact, is_save_history=is_save_history, lambda_=lambda_)
+    som.fit(nb_epoch=nb_epoch)
+    ukr.fit(nb_epoch=nb_epoch, eta=eta)
+    ukr.calculation_history_of_mapping(resolution=30)
+
+    fig = plt.figure(figsize=[7, 8])
+    ax_latent_space_som = fig.add_subplot(2, 2, 1, aspect='equal')
+    ax_data_space_som = fig.add_subplot(2, 2, 2, aspect='equal', projection='3d')
+    ax_latent_space_ukr = fig.add_subplot(2, 2, 3, aspect='equal')
+    ax_data_space_ukr = fig.add_subplot(2, 2, 4, aspect='equal', projection='3d')
+
+
+    def plot(i):
+        ax_latent_space_som.cla()
+        ax_data_space_som.cla()
+        ax_data_space_ukr.cla()
+        ax_latent_space_ukr.cla()
+        ax_data_space_som.scatter(X[:, 0], X[:, 1], X[:, 2], s=3, c=X[:, 0], alpha=0.5)
+        ax_data_space_ukr.scatter(X[:, 0], X[:, 1], X[:, 2], s=3, c=X[:, 0], alpha=0.5)
+        mapping_2d_som = som.history['y'][i].reshape(resolution, resolution, X.shape[1])
+        ax_data_space_som.plot_wireframe(mapping_2d_som[:, :, 0],
+                                         mapping_2d_som[:, :, 1],
+                                         mapping_2d_som[:, :, 2])
+        mapping_2d_ukr = ukr.history['f'][i].reshape(30, 30, X.shape[1])
+        ax_data_space_ukr.plot_surface(mapping_2d_ukr[:, :, 0],
+                                       mapping_2d_ukr[:, :, 1],
+                                       mapping_2d_ukr[:, :, 2],
+                                       antialiased=False)
+        ith_z_som = som.history['z'][i]
+        ith_z_ukr = ukr.history['z'][i]
+        ax_latent_space_som.scatter(ith_z_som[:, 0], ith_z_som[:, 1], s=3, c=X[:, 0])
+        ax_latent_space_ukr.scatter(ith_z_ukr[:, 0], ith_z_ukr[:, 1], s=3, c=X[:, 0])
+        fig.suptitle("epoch {}".format(i))
+
+        ax_latent_space_som.set_xlim(-1.0, 1.0)
+        ax_latent_space_som.set_ylim(-1.0, 1.0)
+        ax_latent_space_ukr.set_xlim(-1.0, 1.0)
+        ax_latent_space_ukr.set_ylim(-1.0, 1.0)
+
+        ax_latent_space_som.set_title('som latent space')
+        ax_latent_space_ukr.set_title('ukr latent space')
+        ax_data_space_som.set_title('som data space')
+        ax_data_space_ukr.set_title('ukr data space')
+
+
+    ani = animation.FuncAnimation(fig, plot, frames=nb_epoch, interval=20, repeat=False)
+    plt.show()
+
+    # anime_learning_process_3d(X=ukr.X, Y_allepoch=ukr.history['f'])
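
Reviewer note: below is a minimal usage sketch of the transform APIs added in this diff (SOM.transform, UKR.transform, UKR.inverse_transform). It is not part of the patch; the random arrays and the hyperparameter values are stand-ins chosen for illustration, not recommended settings.

    import numpy as np
    from libs.models.som import SOM
    from libs.models.unsupervised_kernel_regression import UnsupervisedKernelRegression as UKR

    X_train = np.random.normal(0.0, 1.0, (100, 5))  # stand-in training data
    X_new = np.random.normal(0.0, 1.0, (10, 5))     # stand-in unseen data

    # SOM: map unseen data to their best-matching nodes on the learned grid
    som = SOM(X_train, latent_dim=2, resolution=10, sigma_max=2.0, sigma_min=0.3, tau=50)
    som.fit(nb_epoch=50)
    Z_som = som.transform(X_new)                    # (10, 2) node coordinates

    # UKR: estimate latent variables for unseen data by gradient descent,
    # then map them back to the data space
    ukr = UKR(X_train, n_components=2, bandwidth_gaussian_kernel=0.5, is_save_history=True)
    ukr.fit(nb_epoch=100, eta=0.5)
    Z_ukr = ukr.transform(X_new, nb_epoch_trans=100, verbose=False)
    X_hat = ukr.inverse_transform(Z_ukr)            # (10, 5) reconstruction

TSOMPlusSOM.transform follows the same pattern: it converts group_features (list of index arrays or a bag-of-members matrix) into group densities via _calculate_kde and delegates to the upper SOM's transform.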