diff --git a/docs/modules/datasets.rst b/docs/modules/datasets.rst index f5ed9f8da..b795df85b 100644 --- a/docs/modules/datasets.rst +++ b/docs/modules/datasets.rst @@ -47,6 +47,7 @@ The following functions are used to make synthetic functional datasets: .. autosummary:: :toctree: autosummary + skfda.datasets.euler_maruyama skfda.datasets.make_gaussian skfda.datasets.make_gaussian_process skfda.datasets.make_sinusoidal_process diff --git a/skfda/datasets/__init__.py b/skfda/datasets/__init__.py index 8abd0dfda..01f53197e 100644 --- a/skfda/datasets/__init__.py +++ b/skfda/datasets/__init__.py @@ -23,6 +23,7 @@ "fetch_bone_density", ], "_samples_generators": [ + "euler_maruyama", "make_gaussian", "make_gaussian_process", "make_multimodal_landmarks", @@ -51,6 +52,7 @@ fetch_weather as fetch_weather, ) from ._samples_generators import ( + euler_maruyama as euler_maruyama, make_gaussian as make_gaussian, make_gaussian_process as make_gaussian_process, make_multimodal_landmarks as make_multimodal_landmarks, diff --git a/skfda/datasets/_samples_generators.py b/skfda/datasets/_samples_generators.py index 65c13e6c1..0ae0e6490 100644 --- a/skfda/datasets/_samples_generators.py +++ b/skfda/datasets/_samples_generators.py @@ -1,11 +1,12 @@ from __future__ import annotations import itertools -from typing import Callable, Sequence, Union +from typing import Any, Callable, Sequence, Union import numpy as np import scipy.integrate from scipy.stats import multivariate_normal +from typing_extensions import Protocol from .._utils import _cartesian_product, _to_grid_points, normalize_warping from ..misc.covariances import Brownian, CovarianceLike, _execute_covariance @@ -13,12 +14,242 @@ from ..representation import FDataGrid from ..representation.interpolation import SplineInterpolation from ..typing._base import DomainRangeLike, GridPointsLike, RandomStateLike -from ..typing._numpy import NDArrayFloat +from ..typing._numpy import ArrayLike, NDArrayFloat MeanCallable = Callable[[np.ndarray], np.ndarray] CovarianceCallable = Callable[[np.ndarray, np.ndarray], np.ndarray] MeanLike = Union[float, NDArrayFloat, MeanCallable] +SDETerm = Callable[[float, NDArrayFloat], NDArrayFloat] + + +class InitialValueGenerator(Protocol): + """Class to represent SDE initial value generators. + + This is intended to be an interface compatible with the rvs method of + SciPy distributions. + """ + + def __call__( + self, + size: int, + random_state: RandomStateLike, + ) -> NDArrayFloat: + """Interface of initial value generator.""" + + +def euler_maruyama( # noqa: WPS210 + initial_condition: ArrayLike | InitialValueGenerator, + n_grid_points: int = 100, + drift: SDETerm | ArrayLike | None = None, + diffusion: SDETerm | ArrayLike | None = None, + n_samples: int | None = None, + start: float = 0.0, # noqa: WPS358 -- Distinguish float from integer + stop: float = 1.0, + diffusion_matricial_term: bool = True, + random_state: RandomStateLike = None, +) -> FDataGrid: + r"""Numerical integration of an Itô SDE using the Euler-Maruyama scheme. + + An SDE can be expressed with the following formula: + + .. math:: + + d\mathbf{X}(t) = \mathbf{F}(t, \mathbf{X}(t))dt + \mathbf{G}(t, + \mathbf{X}(t))d\mathbf{W}(t). + + In this equation, :math:`\mathbf{X} = (X^{(1)}, X^{(2)}, ... , X^{(q)}) + \in \mathbb{R}^q` is a vector that represents the state of the stochastic + process.
The function :math:`\mathbf{F}(t, \mathbf{X}) = (F^{(1)}(t, + \mathbf{X}), ..., F^{(q)}(t, \mathbf{X}))` is called drift and refers + to the deterministic component of the equation. The function + :math:`\mathbf{G}(t, \mathbf{X}) = (G^{i, j}(t, \mathbf{X}))_{i=1, j=1} + ^{q, m}` is denoted as the diffusion term and refers to the stochastic + component of the evolution. :math:`\mathbf{W}(t)` refers to a Wiener + process (standard Brownian motion) of dimension :math:`m`. Finally, + :math:`q` refers to the dimension of the variable :math:`\mathbf{X}` + (dimension of the codomain) and :math:`m` to the dimension of the noise. + + Euler-Maruyama's method computes the approximate solution using the + Markov chain + + .. math:: + + X_{n + 1}^{(i)} = X_n^{(i)} + F^{(i)}(t_n, \mathbf{X}_n)\Delta t_n + + \sum_{j=1}^m G^{i,j}(t_n, \mathbf{X}_n)\sqrt{\Delta t_n}\Delta Z_n^j, + + where :math:`X_n^{(i)}` is the approximate value of :math:`X^{(i)}(t_n)` + and the :math:`\mathbf{Z}_n` are independent, identically distributed + :math:`m`-dimensional standard normal random variables. + + Args: + initial_condition: Initial condition of the SDE. It can have one of + three formats: A starting initial value from which to + calculate *n_samples* trajectories. An array of initial values. + For each starting point a trajectory will be calculated. A + function that generates random numbers or vectors. It should + have two parameters called size and random_state and it should + return an array. + n_grid_points: The total number of points of evaluation. + drift: Drift coefficient (:math:`F(t,\mathbf{X})` in the equation). + diffusion: Diffusion coefficient (:math:`G(t,\mathbf{X})` in the + equation). + n_samples: Number of trajectories integrated. + start: Starting time of the trajectories. + stop: Ending time of the trajectories. + diffusion_matricial_term: True if the diffusion coefficient is a + matrix. + random_state: Random state. + + + Returns: + :class:`FDataGrid` object comprising all the trajectories. + + See also: + :func:`make_gaussian_process`: Simpler function for generating + Gaussian processes. + + Examples: + Example of the use of euler_maruyama for an Ornstein-Uhlenbeck process + that has the equation: + + .. math:: + + dX(t) = -A(X(t) - \mu)dt + BdW(t) + + >>> from scipy.stats import norm + >>> A = 1 + >>> mu = 3 + >>> B = 0.5 + >>> def ou_drift(t: float, x: np.ndarray) -> np.ndarray: + ... return -A * (x - mu) + >>> initial_condition = norm().rvs + >>> + >>> trajectories = euler_maruyama( + ... initial_condition=initial_condition, + ... n_samples=10, + ... drift=ou_drift, + ... diffusion=B, + ... ) + + """ + random_state = validate_random_state(random_state) + + if n_samples is None: + if callable(initial_condition): + raise ValueError( + "Invalid initial conditions. If a function is given, the " + "n_samples argument must be included.", + ) + + initial_values = np.atleast_1d(initial_condition) + n_samples = len(initial_values) + else: + if callable(initial_condition): + initial_values = initial_condition( + size=n_samples, + random_state=random_state, + ) + else: + initial_condition = np.atleast_1d(initial_condition) + dim_codomain = len(initial_condition) + initial_values = ( + initial_condition + * np.ones((n_samples, dim_codomain)) + ) + + if initial_values.ndim == 1: + initial_values = initial_values[:, np.newaxis] + elif initial_values.ndim > 2: + raise ValueError( + "Invalid initial conditions. 
Each of the starting points " + "must be a flat array.", + ) + (n_samples, dim_codomain) = initial_values.shape + + if dim_codomain == 1: + diffusion_matricial_term = False + + if drift is None: + drift = 0.0 # noqa: WPS358 -- Distinguish float from integer + + if callable(drift): + drift_function = drift + else: + def constant_drift( # noqa: WPS430 -- We need internal functions + t: float, + x: NDArrayFloat, + ) -> NDArrayFloat: + return np.atleast_1d(drift) + + drift_function = constant_drift + + if diffusion is None: + if diffusion_matricial_term: + diffusion = np.eye(dim_codomain) + else: + diffusion = 1.0 + + if callable(diffusion): + diffusion_function = diffusion + else: + def constant_diffusion( # noqa: WPS430 -- We need internal functions + t: float, + x: NDArrayFloat, + ) -> NDArrayFloat: + return np.atleast_1d(diffusion) + + diffusion_function = constant_diffusion + + def vector_diffusion_times_noise( # noqa: WPS430 We need internal functions + t_n: float, + x_n: NDArrayFloat, + noise: NDArrayFloat, + ) -> NDArrayFloat: + return diffusion_function(t_n, x_n) * noise + + def matrix_diffusion_times_noise( # noqa: WPS430 We need internal functions + t_n: float, + x_n: NDArrayFloat, + noise: NDArrayFloat, + ) -> Any: + return np.einsum( + '...dj, ...j -> ...d', + diffusion_function(t_n, x_n), + noise, + ) + + dim_noise = dim_codomain + + if diffusion_matricial_term: + diffusion_times_noise = matrix_diffusion_times_noise + dim_noise = diffusion_function(start, initial_values).shape[-1] + else: + diffusion_times_noise = vector_diffusion_times_noise + + data_matrix = np.zeros((n_samples, n_grid_points, dim_codomain)) + times = np.linspace(start, stop, n_grid_points) + delta_t = times[1:] - times[:-1] + noise = random_state.standard_normal( + size=(n_samples, n_grid_points - 1, dim_noise), + ) + data_matrix[:, 0] = initial_values + + for n in range(n_grid_points - 1): + t_n = times[n] + x_n = data_matrix[:, n] + + data_matrix[:, n + 1] = ( + x_n + + delta_t[n] * drift_function(t_n, x_n) + + diffusion_times_noise(t_n, x_n, noise[:, n]) + * np.sqrt(delta_t[n]) + ) + + return FDataGrid( + grid_points=times, + data_matrix=data_matrix, + ) def make_gaussian( @@ -148,7 +379,7 @@ def make_gaussian_process( ) -def make_sinusoidal_process( +def make_sinusoidal_process( # noqa: WPS211 n_samples: int = 15, n_features: int = 100, *, @@ -273,7 +504,7 @@ def make_multimodal_landmarks( return modes_location + variation -def make_multimodal_samples( +def make_multimodal_samples( # noqa: WPS211 n_samples: int = 15, *, n_modes: int = 1, @@ -371,7 +602,7 @@ def make_multimodal_samples( # Covariance matrix of the samples cov = mode_std * np.eye(dim_domain) - for i, j, k in itertools.product( + for i, j, k in itertools.product( # noqa: WPS440 range(n_samples), range(dim_codomain), range(n_modes), diff --git a/skfda/representation/irregular.py b/skfda/representation/irregular.py index 78350ba84..6b99d72b3 100644 --- a/skfda/representation/irregular.py +++ b/skfda/representation/irregular.py @@ -1409,14 +1409,15 @@ def __getitem__( ], ) - indices = np.cumsum(chunk_sizes) - chunk_sizes[0] + indices = np.concatenate([[0], np.cumsum(chunk_sizes)])[:-1] return self.copy( start_indices=indices.astype(int), points=arguments, values=values, - sample_names=self.sample_names[key], + sample_names=list(np.array(self.sample_names)[key]), ) + ##################################################################### # Numpy methods ##################################################################### diff --git
a/skfda/tests/test_euler_maruyama.py b/skfda/tests/test_euler_maruyama.py new file mode 100644 index 000000000..e6684add2 --- /dev/null +++ b/skfda/tests/test_euler_maruyama.py @@ -0,0 +1,687 @@ +"""Tests of Euler Maruyama.""" + +import numpy as np +from scipy.stats import multivariate_normal, norm + +from skfda.datasets import euler_maruyama +from skfda.typing._numpy import NDArrayFloat + + +def test_one_initial_point() -> None: + """Case 1 -> One initial point + n_samples > 0. + + 1.1: initial_condition = 0, n_samples = 2 + + """ + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) # noqa: WPS204 -- reproducibility + initial_float = 0 + + expected_result = np.array([[ + [0], + [0.81217268], + [0.50629448], + [0.2422086], + [-0.29427571], + ], + [[0], + [0.43270381], + [-0.71806553], + [0.15434035], + [-0.2262631], + ], + ]) + + fd = euler_maruyama( + initial_float, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + +def test_one_initial_point_monodimensional() -> None: + """Case 1.2 Starting point = float in an array format + n_samples. + + initial_condition = [0], n_samples = 2. + The result should be the same as 1.1 + """ + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + initial_float_in_list = [0] + + expected_result = np.array([[ + [0], + [0.81217268], + [0.50629448], + [0.2422086], + [-0.29427571], + ], + [[0], + [0.43270381], + [-0.71806553], + [0.15434035], + [-0.2262631], + ], + ]) + + fd = euler_maruyama( + initial_float_in_list, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + +def test_one_initial_point_multidimensional() -> None: + """Case 1.3 Starting point = array + n_samples. + + initial_condition = np.array([1, 0]), n_samples = 2. + """ + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + initial_array = np.array([1, 0]) + + expected_result = np.array([ + [[1, 0], + [1.81217268, -0.30587821], + [1.54808681, -0.84236252], + [1.98079062, -1.99313187], + [2.8531965, -2.37373532], + ], + [[1, 0], + [1.15951955, -0.12468519], + [1.89057352, -1.15475554], + [1.72936491, -1.34678272], + [2.29624964, -1.89672835], + ], + ]) + + fd = euler_maruyama( + initial_array, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + +def test_initial_data_generator() -> None: + """. + + Case 2 -> A function that generates random variables or vectors + + n_samples > 0. + + Tests: + - initial_condition = random_state.standard_normal, + n_samples = 2. + - initial_condition = random_state.standard_normal_2d, + n_samples = 2. 
+ """ + n_samples = 2 + n_grid_points = 5 + + # Case 1 Starting generator of floats + n_samples + random_state = np.random.RandomState(1) + + def standard_normal_generator( # noqa: WPS430 + size: int, + random_state: np.random.RandomState, + ) -> NDArrayFloat: + return ( + random_state.standard_normal(size) + ) + + expected_result = np.array([ + [[1.62434536], + [1.36025949], + [0.82377518], + [1.25647899], + [0.10570964], + ], + [[-0.61175641], + [0.26064947], + [-0.11995398], + [0.03956557], + [-0.08511962], + ], + ]) + + fd = euler_maruyama( + standard_normal_generator, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + # Case 2 Starting generator of vecotrs + n_samples + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + + def standard_normal_generator_2d( # noqa: WPS430 + size: int, + random_state: np.random.RandomState, + ) -> NDArrayFloat: + return random_state.standard_normal((size, 2)) + + expected_result = np.array([ + [[1.62434536, -0.61175641], + [2.05704918, -1.76252576], + [2.92945506, -2.14312921], + [3.08897461, -2.2678144], + [3.82002858, -3.29788476], + ], + [[-0.52817175, -1.07296862], + [-0.68938035, -1.2649958], + [-0.12249563, -1.81494143], + [-0.20870974, -2.25387064], + [-0.18760286, -1.96246304], + ], + ]) + + fd = euler_maruyama( + standard_normal_generator_2d, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + +def test_initial_distribution() -> None: + """. + + Case 3 -> A scipy.stats distribution that generates data with the + rvs method. + Tests: + - initial_condition = scipy.stats.norm().rvs, n_samples = 2 + - initial_condition = scipy.stats.multivariate_norm().rvs, + n_samples = 2. + """ + n_samples = 2 + n_grid_points = 5 + + # Case 1 Starting 1-d distribution + n_samples + # The result should be the same as with the generator function + random_state = np.random.RandomState(1) + normal_distribution = norm().rvs + + expected_result = np.array([ + [[1.62434536], + [1.36025949], + [0.82377518], + [1.25647899], + [0.10570964], + ], + [[-0.61175641], + [0.26064947], + [-0.11995398], + [0.03956557], + [-0.08511962], + ], + ]) + + fd = euler_maruyama( + normal_distribution, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + # Case 2 Starting n-d distribution + n_samples + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + mean = np.array([0, 1]) + covarianze_matrix = np.array([[1, 0], [0, 1]]) + + multivariate_normal_distribution = ( + multivariate_normal(mean, covarianze_matrix).rvs + ) + + expected_result = np.array([ + [[1.62434536, 0.38824359], + [2.05704918, -0.76252576], + [2.92945506, -1.14312921], + [3.08897461, -1.2678144], + [3.82002858, -2.29788476], + ], + [[-0.52817175, -0.07296862], + [-0.68938035, -0.2649958], + [-0.12249563, -0.81494143], + [-0.20870974, -1.25387064], + [-0.18760286, -0.96246304], + ], + ]) + + fd = euler_maruyama( + multivariate_normal_distribution, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + +def test_vector_initial_points() -> None: + """Caso 4 -> A vector of initial points. 
+ + Tests: + - initial_condition = 0 + - initial_condition = np.array([0, 1]) + - initial_condition = np.array([[0, 1], + [2, 0]]) + """ + n_grid_points = 5 + + # Case 1. One Starting 1d point = float + initial_float = 0 + random_state = np.random.RandomState(1) + expected_result = np.array([ + [[0], + [0.81217268], + [0.50629448], + [0.2422086], + [-0.29427571], + ], + ]) + + fd = euler_maruyama( + initial_float, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + # Case 2. Two Starting 1d points + n_grid_points = 5 + initial_array = np.array([0, 1]) + random_state = np.random.RandomState(1) + expected_result = np.array([ + [[0], + [0.81217268], + [0.50629448], + [0.2422086], + [-0.29427571], + ], + [[1], + [1.43270381], + [0.28193447], + [1.15434035], + [0.7737369], + ], + ]) + + fd = euler_maruyama( + initial_array, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + # Case 3. Starting vector + n_grid_points = 5 + initial_array = np.array([[0, 1], [2, 0]]) + random_state = np.random.RandomState(1) + expected_result = np.array([ + [[0, 1], + [0.81217268, 0.69412179], + [0.54808681, 0.15763748], + [0.98079062, -0.99313187], + [1.8531965, -1.37373532], + ], + [[2, 0], + [2.15951955, -0.12468519], + [2.89057352, -1.15475554], + [2.72936491, -1.34678272], + [3.29624964, -1.89672835], + ], + ]) + + fd = euler_maruyama( + initial_array, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + +def test_drift_cases() -> None: + """Test for different drift inputs.""" + initial_condition = np.array([0, 0]) # noqa: WPS204 + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + + def base_drift( # noqa: WPS430 + t: float, + x: NDArrayFloat, + ) -> float: + return 0 + + expected_result = np.array([ + [[0, 0], + [0.81217268, -0.30587821], + [0.54808681, -0.84236252], + [0.98079062, -1.99313187], + [1.8531965, -2.37373532], + ], + [[0, 0], + [0.15951955, -0.12468519], + [0.89057352, -1.15475554], + [0.72936491, -1.34678272], + [1.29624964, -1.89672835], + ], + ]) + + fd = euler_maruyama( + initial_condition, + n_grid_points=n_grid_points, + n_samples=n_samples, + drift=base_drift, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + initial_condition = np.array([0, 0]) + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + + def vector_drift( # noqa: WPS430 + t: float, + x: NDArrayFloat, + ) -> NDArrayFloat: + return np.array([0, 0]) + + expected_result = np.array([ + [[0, 0], + [0.81217268, -0.30587821], + [0.54808681, -0.84236252], + [0.98079062, -1.99313187], + [1.8531965, -2.37373532], + ], + [[0, 0], + [0.15951955, -0.12468519], + [0.89057352, -1.15475554], + [0.72936491, -1.34678272], + [1.29624964, -1.89672835], + ], + ]) + + fd = euler_maruyama( + initial_condition, + n_grid_points=n_grid_points, + n_samples=n_samples, + drift=vector_drift, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + +def test_diffusion_cases() -> None: + """Test for different diffusion inputs.""" + initial_condition = np.array([0, 0]) + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + + vector_diffusion = np.array([1, 2]) + + # The result should be the 
same as the previous test, but with + # the second column doubled + expected_result = np.array([ + [[0, 0], + [0.81217268, -0.30587821], + [0.54808681, -0.84236252], + [0.98079062, -1.99313187], + [1.8531965, -2.37373532], + ], + [[0, 0], + [0.15951955, -0.12468519], + [0.89057352, -1.15475554], + [0.72936491, -1.34678272], + [1.29624964, -1.89672835], + ], + ]) + expected_result[:, :, 1] *= 2 + + fd = euler_maruyama( + initial_condition, + n_grid_points=n_grid_points, + n_samples=n_samples, + drift=0, + diffusion=vector_diffusion, + diffusion_matricial_term=False, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + # The expected result should be the same than in test 2 of case 1 of + # initial_conditions tests but adding a second column that is the + # first doubled + + initial_condition = np.array([0, 0]) + n_samples = 2 + n_grid_points = 5 + random_state = np.random.RandomState(1) + + matrix_diffusion = np.array([[1], [2]]) + + expected_result = np.array([ + [[0], + [0.81217268], + [0.50629448], + [0.2422086], + [-0.29427571], + ], + [[0], + [0.43270381], + [-0.71806553], + [0.15434035], + [-0.2262631], + ], + ]) + + expected_result = np.concatenate( # noqa: WPS317 + (expected_result, 2 * expected_result), + axis=2, + ) + + fd = euler_maruyama( + initial_condition, + n_grid_points=n_grid_points, + n_samples=n_samples, + drift=0, + diffusion=matrix_diffusion, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) + + +def test_grid_points() -> None: + """Test for checking that the grid_points are correct.""" + random_state = np.random.RandomState(1) + initial_condition = np.array([0, 1]) + start = 0 + stop = 10 + n_grid_points = 105 + expected_grid_points = np.atleast_2d( + np.linspace(start, stop, n_grid_points), + ) + + fd = euler_maruyama( + initial_condition, + n_grid_points=n_grid_points, + n_samples=10, + start=start, + stop=stop, + random_state=random_state, + ) + + np.testing.assert_array_equal( + fd.grid_points, + expected_grid_points, + ) + + +def test_initial_condition_negative_cases() -> None: + """Test for checking initial_condition related error cases.""" + random_state = np.random.RandomState(1) + + # Case 1 Starting vector of points and n_samples + initial_condition = np.zeros((2, 2)) + n_samples = 3 + + with np.testing.assert_raises(ValueError): + euler_maruyama( + initial_condition, + n_samples=n_samples, + random_state=random_state, + ) + + # Case 2: Inital condition is an array of more than 2d + initial_condition = np.zeros((1, 1, 1)) + + with np.testing.assert_raises(ValueError): + euler_maruyama( + initial_condition, + random_state=random_state, + ) + + # Case 3: generator function without n_samples + initial_generator = norm().rvs + + with np.testing.assert_raises(ValueError): + euler_maruyama( + initial_generator, + random_state=random_state, + ) + + # Case 4: n_samples not greater than 0 + + with np.testing.assert_raises(ValueError): + euler_maruyama( + 0, + n_samples=-1, + random_state=random_state, + ) + + +def test_diffusion_negative_cases() -> None: + """Test for checking diffusion related error cases.""" + initial_condition = np.array([0, 0]) + n_samples = 2 + random_state = np.random.RandomState(1) + + vector_diffusion = np.array([1, 2]) + + initial_condition = np.array([0, 0, 0]) + + with np.testing.assert_raises(ValueError): + euler_maruyama( + initial_condition, + n_samples=n_samples, + diffusion=vector_diffusion, + 
random_state=random_state, + ) + + +def test_random_generator() -> None: + """Test using Generator instead of RandomState.""" + n_samples = 2 + n_grid_points = 5 + random_state = np.random.default_rng(seed=1) + normal_distribution = norm().rvs + + expected_result = np.array([ + [[0.34558419], + [0.51080273], + [-0.14077589], + [0.31190205], + [0.53508933], + ], + + [[0.82161814], + [0.55314153], + [0.84370058], + [1.02598678], + [1.17305302], + ], + ]) + + fd = euler_maruyama( + normal_distribution, + n_samples=n_samples, + n_grid_points=n_grid_points, + random_state=random_state, + ) + + np.testing.assert_almost_equal( + fd.data_matrix, + expected_result, + ) diff --git a/skfda/tests/test_irregular.py b/skfda/tests/test_irregular.py index a056efc6b..e268c50c4 100644 --- a/skfda/tests/test_irregular.py +++ b/skfda/tests/test_irregular.py @@ -294,6 +294,11 @@ def test_fdatairregular_getitem( assert len(fdatairregular[:NUM_CURVES:2]) == NUM_CURVES / 2 assert len(fdatairregular[:NUM_CURVES:2]) == NUM_CURVES / 2 + idxs = np.array((2, 0, 6)) + fd_subset = fdatairregular[idxs] + for i, idx in enumerate(idxs): + assert all(fd_subset[i] == fdatairregular[idx]) + def test_fdatairregular_coordinates( fdatairregular: FDataIrregular,
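Note (not part of the patch): a minimal usage sketch of the new skfda.datasets.euler_maruyama function, following the Ornstein-Uhlenbeck example from its docstring. The parameter values and the random seed below are illustrative assumptions, not taken from the diff.

import numpy as np
from skfda.datasets import euler_maruyama

# Ornstein-Uhlenbeck process: dX(t) = -A * (X(t) - mu) dt + B dW(t)
A, mu, B = 1.0, 3.0, 0.5

def ou_drift(t: float, x: np.ndarray) -> np.ndarray:
    # Deterministic pull of the process towards the long-run mean mu
    return -A * (x - mu)

fd = euler_maruyama(
    initial_condition=0.0,  # scalar starting value, so n_samples must be given
    n_samples=10,
    n_grid_points=100,
    drift=ou_drift,
    diffusion=B,  # scalar diffusion; with a 1-d codomain no matrix term is used
    random_state=np.random.RandomState(0),  # illustrative seed
)
print(fd.data_matrix.shape)  # (10, 100, 1): samples x grid points x codomain dim

The returned FDataGrid can then be plotted or passed to other scikit-fda tools in the same way as the output of make_gaussian_process.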
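Note (not part of the patch): the FDataIrregular.__getitem__ change replaces the start-index computation because the old expression only subtracted the first chunk size, which gives wrong offsets as soon as the selected samples have different numbers of points. A small NumPy check with illustrative chunk sizes:

import numpy as np

chunk_sizes = np.array([3, 1, 4])  # points per selected sample (illustrative)
old = np.cumsum(chunk_sizes) - chunk_sizes[0]  # [0, 1, 5] -- wrong start indices
new = np.concatenate([[0], np.cumsum(chunk_sizes)])[:-1]  # [0, 3, 4] -- correct
print(old, new)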