diff --git a/docs/modules/preprocessing/dim_reduction.rst b/docs/modules/preprocessing/dim_reduction.rst index d78d8705c..7bf81dc4c 100644 --- a/docs/modules/preprocessing/dim_reduction.rst +++ b/docs/modules/preprocessing/dim_reduction.rst @@ -43,5 +43,6 @@ of the covariance between the two data blocks. .. autosummary:: :toctree: autosummary + skfda.preprocessing.dim_reduction.FDM skfda.preprocessing.dim_reduction.FPCA skfda.preprocessing.dim_reduction.FPLS \ No newline at end of file diff --git a/docs/refs.bib b/docs/refs.bib index 005bc0112..cc88606cc 100644 --- a/docs/refs.bib +++ b/docs/refs.bib @@ -31,6 +31,18 @@ @article{baillo+grane_2009_local keywords = {62G08,62G30,Cross-validation,Fourier expansion,Functional data,Kernel regression,Local linear regression,Nonparametric smoothing} } +@article{barroso++_2023_fdm, + author = {M. Barroso and C. M. Alaíz and J. L. Torrecilla and A. Fernández}, + title = {Functional diffusion maps}, + journal = {Statistics and Computing}, + year = {2023}, + volume = {34}, + number = {1}, + pages = {22}, + doi = {10.1007/s11222-023-10332-1}, + url = {https://doi.org/10.1007/s11222-023-10332-1} +} + @article{berrendero++_2016_mrmr, title = {The {{mRMR}} Variable Selection Method: A Comparative Study for Functional Data}, shorttitle = {The {{mRMR}} Variable Selection Method}, diff --git a/examples/plot_fdm.py b/examples/plot_fdm.py new file mode 100644 index 000000000..a75efdeef --- /dev/null +++ b/examples/plot_fdm.py @@ -0,0 +1,412 @@ +""" +Functional Diffusion Maps +============================================================================ +In this example, the use of the Functional Diffusion Map (FDM) technique is +shown over different datasets. +Firstly, an example of basic use of the technique is provided. +Later, two examples of parameter tuning are presented, for embedding spaces +of diferent dimensions. +Finally, an application of the method for a real, non-synthetic, dataset is +provided. +""" + +# Author: Eduardo Terrés Caballero +# License: MIT + +from itertools import product + +import matplotlib.pyplot as plt +import numpy as np +from matplotlib.colors import ListedColormap +from sklearn import datasets + +from skfda.datasets import fetch_phoneme +from skfda.misc.covariances import Gaussian +from skfda.preprocessing.dim_reduction import FDM +from skfda.representation import FDataGrid + +random_state = 0 + +#################################################################### +# Some examples shown here are further explained in the +# article :footcite:t:`barroso++_2023_fdm`. + + +#################################################################### +# **MOONS DATASET EXAMPLE** +# +# Firstly, a basic example of execution is presented using a functional version +# of the moons dataset, a dataset consisting of two dimentional coordinates +# representing the position of two different moons. +n_samples, n_grid_pts = 100, 50 +data_moons, y = datasets.make_moons( + n_samples=n_samples, + noise=0, + random_state=random_state, +) + +colors = ["blue", "orange"] +cmap = ListedColormap(colors) +plt.scatter(data_moons[:, 0], data_moons[:, 1], c=y, cmap=cmap) +plt.title("Moons data") +plt.show() + +#################################################################### +# Using a two elements basis, the functional observation corresponding +# to a multivariate observation is obtained by treating the coordinates +# as coefficients that multiply the elements of the basis. +# In other words, the multivariate vectors are interpreted as elements +# of the basis. +# Below is the code to generate the synthetic moons functional data. +grid = np.linspace(-np.pi, np.pi, n_grid_pts) +basis = np.array([np.sin(4 * grid), grid ** 2 + 2 * grid - 2]) +fd_moons = FDataGrid( + data_matrix=data_moons @ basis, + grid_points=grid, + dataset_name="Functional moons data", + argument_names=("x",), + coordinate_names=("f (x)",), +) +fd_moons.plot(linewidth=0.5, group=y, group_colors=colors) +plt.xlim((-np.pi, np.pi)) +plt.show() + +#################################################################### +# Once the functional data is available, it simply remains to choose +# the value of the parameters of the model. +# +# The FDM technique involves the use of a kernel operator, that acts +# as a measure of similarity for the data. In this case we will be using +# the Gaussian kernel, with a length scale parameter of 0.25. +fdm = FDM( + n_components=2, + kernel=Gaussian(length_scale=0.25), + alpha=0, + n_steps=1, +) +embedding = fdm.fit_transform(fd_moons) + +plt.scatter(embedding[:, 0], embedding[:, 1], c=y, cmap=cmap) +plt.title("Diffusion coordinates for the functional moons data") +plt.show() + +#################################################################### +# As we can see, the functional diffusion map has correctly interpreted +# the topological nature of the data, by successfully separating the +# coordinates associated to both moons. + + +#################################################################### +# **SPIRALS DATASET EXAMPLE** +# +# Next is an example of parameter tuning in the form of a grid +# search for a set of given values for the length_scale kernel parameter +# and the alpha parameter of the FDM method. +# We have two spirals, which are initially entangled and thus +# indistinguishable to the machine. +# +# Below is the code for the generation of the spiral data and its +# functional equivalent, following a similar dynamic as in the moons dataset. +n_samples, n_grid_pts = 100, 50 +t = (np.pi / 4 + np.linspace(0, 4, n_samples)) * np.pi +dx, dy = 10 * t * np.cos(t), 10 * t * np.sin(t) +y = np.array([0] * n_samples + [1] * n_samples) +data_spirals = np.column_stack(( + np.concatenate((dx, -dx)), np.concatenate((dy, -dy)), +)) + +colors = ["yellow", "purple"] +cmap = ListedColormap(colors) +plt.scatter(data_spirals[:, 0], data_spirals[:, 1], c=y, cmap=cmap) +plt.gca().set_aspect("equal", adjustable="box") +plt.title("Spirals data") +plt.show() + +# Define functional data object +grid = np.linspace(-np.pi, np.pi, n_grid_pts) +basis = np.array([grid * np.cos(grid) / 3, grid * np.sin(grid) / 3]) +fd_spirals = FDataGrid( + data_matrix=data_spirals @ basis, + grid_points=grid, + dataset_name="Functional spirals data", + argument_names=("x",), + coordinate_names=("f (x)",), +) +fd_spirals.plot(linewidth=0.5, group=y, group_colors=colors) +plt.show() + + +#################################################################### +# Once the functional data is ready, we will perform a grid search +# for the following values of the parameters, as well as plot +# the resulting embeddings for visual comparison. +alpha_set = [0, 0.33, 0.66, 1] +length_scale_set = [2.5, 3, 4.5, 7, 10, 11, 15] +param_grid = product(alpha_set, length_scale_set) + +#################################################################### +fig, axes = plt.subplots( + len(alpha_set), len(length_scale_set), figsize=(16, 8), +) + +for (alpha, length_scale), ax in zip(param_grid, axes.ravel()): + fdm = FDM( + n_components=2, + kernel=Gaussian(length_scale=length_scale), + alpha=alpha, + n_steps=1, + ) + embedding = fdm.fit_transform(fd_spirals) + + ax.scatter(embedding[:, 0], embedding[:, 1], c=y, cmap=cmap) + ax.set_xticklabels([]) + ax.set_yticklabels([]) + +for ax, alpha in zip(axes[:, 0], alpha_set): + ax.set_ylabel(f"$\\alpha$: {alpha}", size=20, rotation=0, ha="right") + +for ax, length_scale in zip(axes[0], length_scale_set): + ax.set_title(f"$len\_sc$: {length_scale}", size=20, va="bottom") + +plt.show() + +#################################################################### +# The first thing to notice is that the parameter length scale exerts +# a greater influence in the resulting embedding than the parameter alpha. +# In this sense, the figures of any given column are more similar than those +# of any given row. +# Thus, we shall set alpha equal to 0 because, by theory, it is +# equivalent to skipping a normalization step in the process +# +# Moreover, we can see that the optimal choice of the length scale parameter +# of the kernel is 4.5 because it visually presents the more clear separation +# between the trajectories of both spirals. +# Hence, for a length scale of the kernel function of 4.5 the method is able +# to understand the local geometry of the spirals dataset. +# For a small value of the kernel parameter (for example 1) contiguous points +# in the same arm of the spiral are not considered close because the kernel is +# too narrow, resulting in apparently random diffusion coordinates. +# For a large value of the kernel parameter (for example 15) the kernel is wide +# enough so that points in contiguous spiral arms, which belong to different +# trajectories, are considered similar. Hence the diffusion coordinates keep +# these relations by mantaining both trajectories entagled. +# In summary, for a value of length scale of 4.5 the kernel is wide enough so +# that points in the same arm of a trajectory are considered similar, +# but its not too wide so that points in contiguous arms of the spiral are +# also considered similar. + +#################################################################### +# For a reliable comparison between embeddings, it is advisable to use +# the same scale in all axis. +# To ilustrate this idea, next is a re-execution for the row alpha +# equals 0. +alpha_set = [0] +length_scale_set = [2.5, 3, 4.5, 7, 10, 11, 15] +param_grid = product(alpha_set, length_scale_set) + +fig, axes = plt.subplots( + len(alpha_set), + len(length_scale_set), + figsize=(16, 4), +) + +for (alpha, length_scale), ax in zip(param_grid, axes.ravel()): + fdm = FDM( + n_components=2, + kernel=Gaussian(length_scale=length_scale), + alpha=alpha, + n_steps=1, + ) + embedding = fdm.fit_transform(fd_spirals) + + ax.scatter(embedding[:, 0], embedding[:, 1], c=y, cmap=cmap) + ax.set_xlim((-0.4, 0.4)) + ax.set_ylim((-0.4, 0.4)) + +axes[0].set_ylabel( + f"$\\alpha$: {alpha_set[0]}", size=20, rotation=0, ha="right", +) + +for ax, length_scale in zip(axes, length_scale_set): + ax.set_title(f"$len\_sc$: {length_scale}", size=20, va="bottom") + +plt.show() + +#################################################################### +# **SWISS ROLL DATASET EXAMPLE** +# +# So far, the above examples have been computed with a value of the +# n_components parameter of 2. This implies that the resulting +# diffusion coordinate points belong to a two-dimensional space and thus we +# can provide a graphical representation. +# The aim of this new section is to explore further possibilities +# regarding n_components. +# +# We will now apply the method to a more complex example, the +# Swiss roll dataset. This dataset consists of three dimensional points +# that lay over a topological manifold shaped like a Swiss roll. +n_samples, n_grid_pts = 500, 100 +data_swiss, y = datasets.make_swiss_roll( + n_samples=n_samples, + noise=0, + random_state=random_state, +) +fig = plt.figure() +axis = fig.add_subplot(111, projection="3d") +axis.set_title("Swiss roll data") +axis.scatter(data_swiss[:, 0], data_swiss[:, 1], data_swiss[:, 2], c=y) +plt.show() + +#################################################################### +# Similarly to the previous examples, the functional data object is defined. +# In this case a three element base will be used, since the multivariate data +# points belong to a three-dimensional space. +# For clarity purposes, only the first fifty functional observations are +# plotted. +grid = np.linspace(-np.pi, np.pi, n_grid_pts) +basis = np.array([np.sin(4 * grid), np.cos(8 * grid), np.sin(12 * grid)]) +data_matrix = np.array(data_swiss) @ basis +fd_swiss = FDataGrid( + data_matrix=data_matrix, + grid_points=grid, + dataset_name="Functional Swiss roll data", + argument_names=("x",), + coordinate_names=("f (x)",), +) +fd_swiss[:50].plot(linewidth=0.5, group=y[:50]) +plt.show() + + +#################################################################### +# Now, the FDM method will be applied for different values of the +# parameters, again in the form of a grid search. +# Note that the diffusion coordinates will now consist of three components. +alpha_set = [0, 0.5, 1] +length_scale_set = [1.5, 2.5, 4, 5] +param_grid = product(alpha_set, length_scale_set) + +#################################################################### +fig, axes = plt.subplots( + len(alpha_set), + len(length_scale_set), + figsize=(16, 8), + subplot_kw={"projection": "3d"}, +) + +for (alpha, length_scale), ax in zip(param_grid, axes.ravel()): + fdm = FDM( + n_components=3, + kernel=Gaussian(length_scale=length_scale), + alpha=alpha, + n_steps=1, + ) + embedding = fdm.fit_transform(fd_swiss) + + ax.scatter(embedding[:, 0], embedding[:, 1], embedding[:, 2], c=y) + ax.set_xticklabels([]) + ax.set_yticklabels([]) + ax.set_zticklabels([]) + ax.set_title(f"$\\alpha$: {alpha} $len\_sc$: {length_scale}") + +plt.show() + +#################################################################### +# Let's take a closer look at the resulting embedding for a value +# of length_scale and alpha equal to 2.5 and 0, respectively. +alpha, length_scale = 0, 2.5 +fdm = FDM( + n_components=3, + kernel=Gaussian(length_scale=length_scale), + alpha=alpha, + n_steps=1, +) +embedding = fdm.fit_transform(fd_swiss) + +fig = plt.figure() +ax = fig.add_subplot(111, projection="3d") +ax.scatter(embedding[:, 0], embedding[:, 1], embedding[:, 2], c=y) +ax.set_title( + "Diffusion coordinates for \n" + f"$\\alpha$: {alpha} $len\_sc$: {length_scale}", +) +plt.show() + +#################################################################### +# The election of the optimal parameters is relative to the problem at hand. +# The goal behind choosing values of length_scale equal to 2.5 and alpha equal +# to 0 is to obtain a unrolled transformation to the Swiss roll. +# Note that in the roll there are pairs of points whose euclidean +# distance is small but whose shortest path contained in the +# manifold is significantly larger, since it must complete an entire loop. +# +# In this sense, the process happens to have taken into account the +# shortest path distance rather than the euclidean one. Thus, one may argue +# that the topological nature of the data has been respected. +# This new diffusion coordinates could be useful to gain more insights into +# the initial data through further analysis. + +######################################################################### +# **REAL DATASET: PHONEME** +# +# The aim of this section is to provide an example of application of +# the FDM method to a non-synthetic dataset. +# Below is an example of execution using the phoneme dataset, +# a dataset consisting of the computed log-periodogram for five distinct +# phonemes coming from recorded male speech from the TIMIT database. +n_samples = 300 +colors = ["C0", "C1", "C2", "C3", "C4"] +group_names = ["aa", "ao", "dcl", "iy", "sh"] + +# Fetch phoneme dataset +fd_phoneme, y = fetch_phoneme(return_X_y=True) +fd_phoneme, y = fd_phoneme[:n_samples], y[:n_samples] +fd_phoneme.plot( + linewidth=0.7, + group=y, + group_colors=colors, + group_names=group_names, + legend=True, +) +plt.show() + +#################################################################### +# The resulting diffusion coordinates in three dimensions will be +# plotted, using different views to better understand the plot. +cmap = ListedColormap(colors) +alpha, length_scale = 1, 10 +fdm = FDM( + n_components=3, + kernel=Gaussian(length_scale=length_scale), + alpha=alpha, + n_steps=1, +) +diffusion_coord = fdm.fit_transform(fd_phoneme) + +# Plot three views of the diffusion coordinates +view_points = [(30, 70), (28, 0), (10, -120)] + +fig, axes = plt.subplots( + 1, len(view_points), figsize=(18, 6), subplot_kw={"projection": "3d"}, +) + +for view, ax in zip(view_points, axes.ravel()): + ax.scatter( + diffusion_coord[:, 0], diffusion_coord[:, 1], diffusion_coord[:, 2], + c=y, cmap=cmap, + ) + ax.view_init(*view) + ax.set_title(f"View {view}", fontsize=26) +plt.show() + +#################################################################### +# We can see that the diffusion coordinates for the different phonemes +# have been clustered in the 3D space. This representation enables a +# more clear separation of the data into the different phoneme groups. +# In this way, the phonemes groups that are similar to each other, namely /aa/ +# and /ao/ are closer in the space. In fact, these two groups partly overlap +# (orange and blue). + + +############################################################################### +# **References:** +# .. footbibliography:: diff --git a/skfda/preprocessing/dim_reduction/__init__.py b/skfda/preprocessing/dim_reduction/__init__.py index 5c8ae8405..6aad2fca8 100644 --- a/skfda/preprocessing/dim_reduction/__init__.py +++ b/skfda/preprocessing/dim_reduction/__init__.py @@ -12,6 +12,7 @@ "variable_selection", ], submod_attrs={ + "_fdm": ["FDM"], "_fpca": ["FPCA"], "_fpls": ["FPLS"], "_neighbor_transforms": ["KNeighborsTransformer"], @@ -19,6 +20,7 @@ ) if TYPE_CHECKING: + from ._fdm import FDM as FDM from ._fpca import FPCA as FPCA from ._fpls import FPLS as FPLS from ._neighbor_transforms import ( diff --git a/skfda/preprocessing/dim_reduction/_fdm.py b/skfda/preprocessing/dim_reduction/_fdm.py new file mode 100644 index 000000000..490fbd604 --- /dev/null +++ b/skfda/preprocessing/dim_reduction/_fdm.py @@ -0,0 +1,230 @@ +"""Functional Diffusion Maps Module.""" +from __future__ import annotations + +from typing import Callable + +import numpy as np +import scipy +from sklearn.utils.extmath import svd_flip +from typing_extensions import Self + +from ..._utils._sklearn_adapter import BaseEstimator, InductiveTransformerMixin +from ...representation import FData +from ...typing._numpy import NDArrayFloat + +KernelFunction = Callable[[FData, FData | None], NDArrayFloat] + + +class FDM( + BaseEstimator, + InductiveTransformerMixin[FData, NDArrayFloat, object], +): + r""" + Functional diffusion maps. + + Class that implements functional diffusion maps for both + basis and grid representations of the data. + + Parameters: + n_components: Dimension of the space where the embedded + functional data belongs to. For visualization of the + data purposes, a value of 2 or 3 shall be used. + kernel: kernel function used over the functional observations. + It serves as a measure of connectivity or similitude between + points, where higher value means greater connectivity. + alpha: density parameter in the interval [0, 1] used in the + normalization step. A value of 0 means the data distribution + is not taken into account during the normalization step. + The opposite holds for a higher value of alpha. + n_steps: Number of steps in the random walk. + + + Attributes: + transition_matrix\_: trasition matrix computed from the data. + eigenvalues\_: highest n_components eigenvalues of transition_matrix\_ + in descending order starting from the second highest. + eigenvectors\_right\_: right eigenvectors of transition\_matrix\_ + corresponding to eigenvalues\_. + d\_alpha\_: vector of densities of the weigthed graph. + training\_dataset\_: dataset used for training the method. It is needed + for the transform method. + + Examples: + In this example we fetch the Canadian weather dataset and divide it + into train and test sets. We then obtain the diffusion coordinates + for the train set and predict these coordinates for the test set. + + >>> from skfda.datasets import fetch_weather + >>> from skfda.representation import FDataGrid + >>> from skfda.misc.covariances import Gaussian + >>> X, y = fetch_weather(return_X_y=True, as_frame=True) + >>> fd : FDataGrid = X.iloc[:, 0].values + >>> fd_train = fd[:25] + >>> fd_test = fd[25:] + >>> fdm = FDM( + ... n_components=2, + ... kernel=Gaussian(variance=1, length_scale=1), + ... alpha=1, + ... n_steps=1 + ... ) + >>> embedding_train = fdm.fit_transform(X=fd_train) + >>> embedding_test = fdm.transform(X=fd_test) + + Notes: + Performing fit and transform actions is not equivalent to performing + fit_transform. In the former case an approximation of the diffusion + coordinates is computed via the Nyström method. In the latter, the + true diffusion coordinates are computed. + """ + + def __init__( + self, + *, + n_components: int = 2, + kernel: KernelFunction, + alpha: float = 0, + n_steps: int = 1, + ) -> None: + self.n_components = n_components + self.kernel = kernel + self.alpha = alpha + self.n_steps = n_steps + + def fit( # noqa: WPS238 + self, + X: FData, + y: object = None, + ) -> Self: + """ + Compute the transition matrix and save it. + + Args: + X: Functional data for which to obtain diffusion coordinates. + y: Ignored. + + Returns: + self + + """ + # Parameter validation + if self.n_components < 1: + raise ValueError( + f'Embedding dimension ({self.n_components}) cannot ' + 'be less than 1. ', + ) + + if self.n_components >= X.n_samples: + raise ValueError( + f'Embedding dimension ({self.n_components}) cannot be ' + f'greater or equal to the number of samples ({X.n_samples}).', + ) + + if self.alpha < 0 or self.alpha > 1: + raise ValueError( + f'Parameter alpha (= {self.alpha}) must ' + 'be in the interval [0, 1].', + ) + + if self.n_steps < 0: + raise ValueError( + f'Parameter n_steps (= {self.n_steps}) must ', + 'be a greater than 0.', + ) + + # Construct the weighted graph + weighted_graph = self.kernel(X, X) + + # Compute density of each vertex by adding the rows + self.d_alpha_ = np.sum(weighted_graph, axis=1) ** self.alpha + + # Construct the normalized graph + norm_w_graph = weighted_graph / np.outer(self.d_alpha_, self.d_alpha_) + + rows_sum = np.sum(norm_w_graph, axis=1) + self.transition_matrix_ = norm_w_graph / rows_sum[:, np.newaxis] + + eigenvalues, eigenvectors_right_ = scipy.linalg.eig( + self.transition_matrix_, + ) + + # Remove highest eigenvalue and take the n_components + # highest eigenvalues in descending order + index_order = eigenvalues.argsort()[-2:-self.n_components - 2:-1] + eigenvalues = np.real(eigenvalues[index_order]) + eigenvectors_right, _ = svd_flip( + eigenvectors_right_[:, index_order], + np.zeros_like(eigenvectors_right_[:, index_order]).T, + ) + self.eigenvalues_ = eigenvalues + self.eigenvectors_right_ = eigenvectors_right + + self.training_dataset_ = X + + return self + + def fit_transform( + self, + X: FData, + y: object = None, + ) -> NDArrayFloat: + """ + Compute the diffusion coordinates for the functional data X. + + The diffusion coordinate corresponding to the i-th curve is stored + in the i-th row of the returning matrix. + Note that fit_transform is not equivalent to applying fit and + transform. + + Args: + X: Functional data for which to obtain diffusion coordinates. + y: Ignored. + + Returns: + Diffusion coordinates for the functional data X. + + """ + self.fit(X, y) + + # Compute and return the diffusion map + return np.real( + self.eigenvectors_right_ + * self.eigenvalues_ ** self.n_steps, + ) + + def transform( + self, + X: FData, + ) -> NDArrayFloat: + """ + Compute the diffusion coordinates for the functional data X. + + Compute the diffusion coordinates of out-of-sample data using the + eigenvectors and eigenvalues computed during the training. + + Args: + X: Functional data for which to predict diffusion coordinates. + + Returns: + Diffusion coordinates for the functional data X_out. + + """ + # Construct the weighted graph crossing the training + # and out-of-sample data + weighted_graph_out = self.kernel(X, self.training_dataset_) + + # Compute density of each vertex by adding the rows + h_alpha = np.sum(weighted_graph_out, axis=1) ** self.alpha + + # Construct the normalized graph + norm_w_graph_out = ( + weighted_graph_out / np.outer(h_alpha, self.d_alpha_) + ) + + # Define transition matrix by normalizing by rows + rows_sum = np.sum(norm_w_graph_out, axis=1) + transition_matrix_out = norm_w_graph_out / rows_sum[:, np.newaxis] + + return transition_matrix_out @ ( # type: ignore[no-any-return] + self.eigenvectors_right_ + * self.eigenvalues_ ** (self.n_steps - 1) + ) diff --git a/skfda/tests/test_fdm.py b/skfda/tests/test_fdm.py new file mode 100644 index 000000000..566686bc5 --- /dev/null +++ b/skfda/tests/test_fdm.py @@ -0,0 +1,351 @@ +"""Tests for FDM.""" +from typing import Tuple + +import numpy as np +import pytest +from sklearn import datasets + +from skfda import FData, FDataBasis, FDataGrid +from skfda.misc.covariances import Gaussian +from skfda.misc.metrics import PairwiseMetric, l1_distance, l2_distance +from skfda.preprocessing.dim_reduction import FDM +from skfda.representation.basis import MonomialBasis +from skfda.typing._numpy import NDArrayFloat + + +def _discretize_fdatabasis(fd_basis: FDataBasis) -> FDataGrid: + return fd_basis.to_grid( + np.linspace(*fd_basis.basis.domain_range[0], 300), + ) + + +def rbf_kernel(fd: FData, sigma: float) -> NDArrayFloat: + """Calculate the radial basis function kernel. + + Args: + fd: Functional data object + sigma: locality parameter + + Returns: + Computed real value + """ + norm_matrix = PairwiseMetric(l2_distance)(fd) + return np.exp( # type: ignore[no-any-return] + - np.square(norm_matrix) / (2 * sigma ** 2), + ) + + +def laplacian_kernel(fd: FData, sigma: float) -> NDArrayFloat: + """Calculate the radial basis function kernel. + + Args: + fd: Functional data object + sigma: locality parameter + + Returns: + Computed real value + """ + norm_matrix = PairwiseMetric(l1_distance)(fd) + return np.exp(- norm_matrix / sigma) # type: ignore[no-any-return] + + +############################################################################## +# Fixtures +############################################################################## +dummy_fdata = FDataGrid(data_matrix=[[0.9]]) + + +@pytest.fixture( + params=[ + FDM(kernel=Gaussian(), alpha=-1), + FDM(kernel=Gaussian(), n_components=0), + FDM(kernel=Gaussian(), n_components=2), + FDM(kernel=Gaussian(), n_steps=0), + ], +) +def data_grid_param_check(request: FDM) -> Tuple[FDM, FDataGrid]: + """Fixture for testing parameter checks.""" + return request.param, dummy_fdata + + +@pytest.fixture( + params=[ + FDataGrid( + data_matrix=[ + [52, 93, 15], + [72, 61, 21], + [83, 87, 75], + [75, 88, 24], + ], + grid_points=[0, 1 / 2, 1], + ), + ], +) +def precalculated_fdatagrid_example( + request: FDataGrid, +) -> FDataGrid: + """Fixture for loading a precalculated FDataGrid example.""" + return request.param # type: ignore[no-any-return] + + +@pytest.fixture( + params=[ + ( + FDataBasis( + basis=MonomialBasis(domain_range=(0, 2), n_basis=3), + coefficients=[ + [0, 0, 1], + [1, 1, 0], + [0, 2, 0], + ], + ), + FDataBasis( + basis=MonomialBasis(domain_range=(0, 2), n_basis=3), + coefficients=[ + [-1, 1, 0], + ], + ), + ), + ], +) +def precalculated_fdatabasis_example( + request: FDataBasis, +) -> Tuple[FDataBasis, FDataGrid, FDataBasis]: + """Load FDataBasis example. + + Fixture for loading a prealculated FDataBasis exmaple and discretizing + an FDataBasis into a FDataGrid object. + """ + fd_basis: FDataBasis + fd_out: FDataBasis + fd_basis, fd_out = request.param + return ( + fd_basis, + _discretize_fdatabasis(fd_basis), + fd_out, + ) + + +@pytest.fixture +def moons_functional_dataset() -> FDataGrid: + """Fixture for loading moons dataset example.""" + n_grid_pts, n_samples = 100, 20 + data_moon, _ = datasets.make_moons( + n_samples=n_samples, + noise=0.0, + random_state=0, + ) + grid = np.linspace(-np.pi, np.pi, n_grid_pts) + + # Basis + basis_1 = np.sin(4 * grid) + basis_2 = (grid ** 2 + 2 * grid - 2) + basis = np.array([basis_1, basis_2]) + + # Generate functional data object + data_matrix = np.array(data_moon) @ basis + return FDataGrid( + data_matrix=data_matrix, + grid_points=grid, + ) + +############################################################################## +# Tests +############################################################################## + + +def test_param_check(data_grid_param_check: Tuple[FDM, FDataGrid]) -> None: + """Check that invalid parameters in fit raise exception.""" + fdm, fd = data_grid_param_check + + pytest.raises( + ValueError, + fdm.fit, + fd, + ) + + +def test_precalculated_grid_example( + precalculated_fdatagrid_example: FDataGrid, +) -> None: + """Compare the embedding in grid against the fda package.""" + fd = precalculated_fdatagrid_example + fdm = FDM( + n_components=2, + kernel=Gaussian(length_scale=10), + alpha=0.9, + n_steps=2, + ) + embedding = fdm.fit_transform(fd) + + expected_embedding = [ + [-0.0513176, 0.4572292], + [0.6188756, -0.2537263], + [-0.5478710, -0.3488652], + [-0.0525144, 0.3176671], + ] + + np.testing.assert_allclose( + embedding, + expected_embedding, + atol=1e-7, + ) + + +def test_precalculated_basis_example( + precalculated_fdatabasis_example: Tuple[FDataBasis, FDataGrid, FDataBasis], +) -> None: + """Compare the embedding in basis and grid against the fda package.""" + fd_basis, fd_grid, _ = precalculated_fdatabasis_example + + fdm_basis = FDM( + n_components=2, + kernel=Gaussian(), + alpha=1, + n_steps=2, + ) + embedding_basis = fdm_basis.fit_transform(fd_basis) + + fdm_grid = FDM( + n_components=2, + kernel=Gaussian(), + alpha=1, + n_steps=2, + ) + embedding_grid = fdm_grid.fit_transform(fd_grid) + + expected_transition_matrix = [ + [0.52466, 0.20713, 0.26821], + [0.21187, 0.47341, 0.31472], + [0.27529, 0.31581, 0.40891], + ] + + # Check transition matrix + for tran_matrix in ( + fdm_basis.transition_matrix_, + fdm_grid.transition_matrix_, + ): + np.testing.assert_allclose( + tran_matrix, + expected_transition_matrix, + atol=1e-5, + ) + + # Check diffusion coordinates + expected_embedding = [ + [0.0687, -0.00285], + [-0.05426, -0.00648], + [-0.01607, 0.00944], + ] + + for embedding in (embedding_basis, embedding_grid): + np.testing.assert_allclose( + embedding, + expected_embedding, + atol=1e-5, + ) + + +def test_nystrom( + precalculated_fdatabasis_example: Tuple[FDataBasis, FDataGrid, FDataBasis], +) -> None: + """Test Nystrom method. + + Compare the embedding of out-of-sample points in basis and grid + against the fda package via the Nystrom method. + """ + fd_basis, fd_grid, fd_out = precalculated_fdatabasis_example + + fdm_basis = FDM( + n_components=2, + kernel=Gaussian(), + alpha=1, + n_steps=2, + ) + embedding_basis = fdm_basis.fit(fd_basis).transform(fd_out) + + fdm_grid = FDM( + n_components=2, + kernel=Gaussian(), + alpha=1, + n_steps=2, + ) + embedding_grid = fdm_grid.fit(fd_grid).transform( + _discretize_fdatabasis(fd_out), + ) + + expected_embedding = [ + [0.156125, -0.021132], + ] + + np.testing.assert_allclose( + embedding_basis, + expected_embedding, + atol=1e-3, + ) + + np.testing.assert_allclose( + embedding_grid, + expected_embedding, + atol=1e-3, + ) + + +def test_moons_dataset( + moons_functional_dataset: FDataGrid, +) -> None: + """Test the embedding for a small version of the moons dataset example.""" + fdata = moons_functional_dataset + alpha, sigma = (1.0, 2.5) + fdm = FDM( + n_components=2, + kernel=Gaussian(length_scale=sigma), + alpha=alpha, + n_steps=1, + ) + embedding = fdm.fit_transform(fdata) + + expected_embedding = [ + [-0.07094005, -0.19475867], + [0.04054447, -0.21670899], + [0.09244529, -0.19569411], + [0.07094005, -0.19475867], + [0.13443593, -0.12554232], + [-0.21055562, 0.01815767], + [0.27732511, 0.1802182], + [-0.26962763, 0.16043396], + [0.29220798, 0.22086707], + [0.18339034, -0.04185513], + [0.29344042, 0.22420163], + [-0.29220798, 0.22086707], + [-0.09244529, -0.19569411], + [0.21055562, 0.01815767], + [-0.27732511, 0.1802182], + [-0.04054447, -0.21670899], + [0.26962763, 0.16043396], + [-0.13443593, -0.12554232], + [-0.29344042, 0.22420163], + [-0.18339034, -0.04185513], + ] + + np.testing.assert_allclose( + embedding, + expected_embedding, + atol=1e-5, + ) + + +def test_gaussian_covariance( + moons_functional_dataset: FDataGrid, +) -> None: + """Test Gaussian covariance in covariance module against RBF kernel.""" + sigma = 0.1 + fdata = moons_functional_dataset + gaussian_cov_matrix = Gaussian(length_scale=sigma)(fdata) + rbf_kernel_matrix = rbf_kernel(fdata, sigma) + + np.testing.assert_allclose( + gaussian_cov_matrix, + rbf_kernel_matrix, + atol=1e-7, + )