Skip to content

Commit

Permalink
Covariance_kernels pullrequest changes incorporated.
Browse files Browse the repository at this point in the history
  • Loading branch information
E105D104U125 committed Jul 4, 2024
1 parent 2ca5360 commit 81f6e58
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 58 deletions.
123 changes: 69 additions & 54 deletions skfda/misc/covariances.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
"""Covariances module."""
from __future__ import annotations

import abc
from typing import Callable, Sequence, Tuple, Union
from typing import Any, Callable, Sequence

import matplotlib.pyplot as plt
import numpy as np
import sklearn.gaussian_process.kernels as sklearn_kern
from matplotlib.figure import Figure
from numpy.typing import NDArray
from scipy.special import gamma, kv

from ..misc._math import inner_product_matrix
Expand All @@ -22,23 +24,20 @@ def _squared_norms(x: NDArrayFloat, y: NDArrayFloat) -> NDArrayFloat:
).sum(2)


CovarianceLike = Union[
float,
NDArrayFloat,
Callable[[ArrayLike, ArrayLike], NDArrayFloat],
]
CovarianceLike = (
float
| NDArrayFloat
| Callable[[ArrayLike, ArrayLike], NDArrayFloat]
)

InputAcceptable = Union[
np.ndarray,
FData,
]
Input = NDArray[Any] | FData


def _transform_to_2d(t: ArrayLike) -> NDArrayFloat:
"""Transform 1d arrays in column vectors."""
t = np.asfarray(t)

dim = len(t.shape)
dim = t.ndim
assert dim <= 2

if dim < 2:
Expand All @@ -58,39 +57,40 @@ def _execute_covariance(

if isinstance(covariance, (int, float)):
return np.array(covariance)

if callable(covariance):
result = covariance(x, y)
elif isinstance(covariance, np.ndarray):
result = covariance
else:
if callable(covariance):
result = covariance(x, y)
elif isinstance(covariance, np.ndarray):
result = covariance
else:
# GPy kernel
result = covariance.K(x, y)
# GPy kernel
result = covariance.K(x, y)

assert result.shape[0] == len(x)
assert result.shape[1] == len(y)
return result
assert result.shape[0] == len(x)
assert result.shape[1] == len(y)
return result


class Covariance(abc.ABC):
"""Abstract class for covariance functions."""

_parameters_str: Sequence[Tuple[str, str]]
_parameters_str: Sequence[tuple[str, str]]
_latex_formula: str

@abc.abstractmethod
def __call__(
self,
x: InputAcceptable,
y: InputAcceptable | None = None,
x: Input,
y: Input | None = None,
) -> NDArrayFloat:
"""Compute covariance function on input data."""
pass

def _param_check_and_transform(
self,
x: InputAcceptable,
y: InputAcceptable | None = None,
) -> Tuple[np.ndarray | FData, np.ndarray | FData]:
x: Input,
y: Input | None = None,
) -> tuple[Input, Input]:
# Param check
if y is None:
y = x
Expand All @@ -101,16 +101,15 @@ def _param_check_and_transform(
f'({type(x)}, {type(y)}).',
)

if isinstance(x, np.ndarray) and isinstance(y, np.ndarray):
x, y = np.array(x), np.array(y)
if not isinstance(x, FData) and not isinstance(y, FData):
if len(x.shape) < 2:
x = np.atleast_2d(x)
if len(y.shape) < 2:
y = np.atleast_2d(y)

return x, y

def heatmap(self, limits: Tuple[float, float] = (-1, 1)) -> Figure:
def heatmap(self, limits: tuple[float, float] = (-1, 1)) -> Figure:
"""Return a heatmap plot of the covariance function."""
from ..exploratory.visualization._utils import _create_figure

Expand Down Expand Up @@ -266,6 +265,7 @@ class Brownian(Covariance):
Brownian()
"""

_latex_formula = (
r"K(x, x') = \sigma^2 \frac{|x - \mathcal{O}| + "
r"|x' - \mathcal{O}| - |x - x'|}{2}"
Expand All @@ -282,14 +282,21 @@ def __init__(self, *, variance: float = 1, origin: float = 0) -> None:

def __call__(
self,
x: InputAcceptable | FData,
y: InputAcceptable | None = None,
x: NDArray[Any],
y: NDArray[Any] | None = None,
) -> NDArrayFloat:
"""Compute Brownian covariance function on input data."""
if isinstance(x, FData) or isinstance(y, FData):
raise ValueError('Not defined for FData objects.')
raise ValueError(
'Brownian covariance not defined for FData objects.',
)

xx: NDArray[Any]
yy: NDArray[Any]
xx, yy = self._param_check_and_transform(x, y)

x = _transform_to_2d(x) - self.origin
y = _transform_to_2d(y) - self.origin
xx = xx - self.origin
yy = yy - self.origin

sum_norms = np.add.outer(
np.linalg.norm(x, axis=-1),
Expand Down Expand Up @@ -363,9 +370,10 @@ def __init__(self, *, variance: float = 1, intercept: float = 0) -> None:

def __call__(
self,
x: InputAcceptable,
y: InputAcceptable | None = None,
x: Input,
y: Input | None = None,
) -> NDArrayFloat:
"""Compute linear covariance function on input data."""
x, y = self._param_check_and_transform(x, y)

x_y = inner_product_matrix(x, y)
Expand Down Expand Up @@ -445,12 +453,13 @@ def __init__(
self.intercept = intercept
self.slope = slope
self.degree = degree

def __call__(
self,
x: InputAcceptable,
y: InputAcceptable | None = None,
x: Input,
y: Input | None = None,
) -> NDArrayFloat:
"""Compute polynomial covariance function on input data."""
x, y = self._param_check_and_transform(x, y)

x_y = inner_product_matrix(x, y)
Expand Down Expand Up @@ -527,13 +536,16 @@ def __init__(self, *, variance: float = 1, length_scale: float = 1):

def __call__(
self,
x: InputAcceptable,
y: InputAcceptable | None = None,
x: Input,
y: Input | None = None,
) -> NDArrayFloat:
"""Compute Gaussian covariance function on input data."""
x, y = self._param_check_and_transform(x, y)

distance_x_y = PairwiseMetric(l2_distance)(x, y)
return self.variance * np.exp(-distance_x_y ** 2 / (2 * self.length_scale ** 2))
return self.variance * np.exp( # type: ignore[no-any-return]
-distance_x_y ** 2 / (2 * self.length_scale ** 2),
)

def to_sklearn(self) -> sklearn_kern.Kernel:
return (
Expand Down Expand Up @@ -606,9 +618,10 @@ def __init__(

def __call__(
self,
x: InputAcceptable,
y: InputAcceptable | None = None,
x: Input,
y: Input | None = None,
) -> NDArrayFloat:
"""Compute exponential covariance function on input data."""
x, y = self._param_check_and_transform(x, y)

distance_x_y = PairwiseMetric(l2_distance)(x, y)
Expand Down Expand Up @@ -680,9 +693,10 @@ def __init__(self, *, variance: float = 1):

def __call__(
self,
x: InputAcceptable,
y: InputAcceptable | None = None,
x: Input,
y: Input | None = None,
) -> NDArrayFloat:
"""Compute white noise covariance function on input data."""
if isinstance(x, FData) or isinstance(y, FData):
raise ValueError('Not defined for FData objects.')

Expand Down Expand Up @@ -767,9 +781,10 @@ def __init__(

def __call__(
self,
x: InputAcceptable,
y: InputAcceptable | None = None,
x: Input,
y: Input | None = None,
) -> NDArrayFloat:
"""Compute Matern covariance function on input data."""
x, y = self._param_check_and_transform(x, y)

distance_x_y = PairwiseMetric(l2_distance)(x, y)
Expand Down Expand Up @@ -864,12 +879,9 @@ def __init__(self, data: FData) -> None:

def __call__(
self,
x: InputAcceptable,
y: InputAcceptable | None = None,
x: Input,
y: Input | None = None,
) -> NDArrayFloat:
if isinstance(x, FData) or isinstance(y, FData):
raise ValueError('Not defined for FData objects.')

"""Evaluate the covariance function.
Args:
Expand All @@ -879,6 +891,9 @@ def __call__(
Returns:
Covariance function evaluated at the grid formed by x and y.
"""
if isinstance(x, FData) or isinstance(y, FData):
raise ValueError('Not defined for FData objects.')

return self.cov_fdata([x, y], grid=True)[0, ..., 0]


Expand Down
16 changes: 12 additions & 4 deletions skfda/tests/test_covariances.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ def covariances_raise_fixture(request: Any) -> Any:

@pytest.fixture
def fdatabasis_data() -> Tuple[FDataBasis, FDataBasis]:
"""Fixture for getting fdatabasis objects."""
"""Fixture for getting fdatabasis objects.
The dataset is used to test manual calculations of the covariance functions
against the implementation.
"""
basis = MonomialBasis(
n_basis=2,
domain_range=(-2, 2),
Expand Down Expand Up @@ -141,7 +145,7 @@ def test_covariances(
fetch_functional_data: FDataGrid,
covariances_fixture: cov.Covariance,
) -> None:
"""Check that invalid parameters in fit raise exception."""
"""Check that parameter conversion is done correctly."""
fd = fetch_functional_data
cov_kernel = covariances_fixture

Expand All @@ -160,11 +164,15 @@ def test_raises(
fetch_functional_data: FDataGrid,
covariances_raise_fixture: Any,
) -> None:
"""Check that it raises a ValueError exception."""
"""Check raises ValueError.
Check that non-functional kernels raise a ValueError exception
with functional data.
"""
fd = fetch_functional_data
cov_kernel = covariances_raise_fixture

np.testing.assert_raises(
pytest.raises(
ValueError,
cov_kernel,
fd,
Expand Down

0 comments on commit 81f6e58

Please sign in to comment.