diff --git a/examples/sklearnex/incremental_covariance_spmd.py b/examples/sklearnex/incremental_covariance_spmd.py new file mode 100644 index 0000000000..5cae45960a --- /dev/null +++ b/examples/sklearnex/incremental_covariance_spmd.py @@ -0,0 +1,58 @@ +# =============================================================================== +# Copyright 2024 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== + +import dpctl +import dpctl.tensor as dpt +import numpy as np +from mpi4py import MPI + +from sklearnex.spmd.covariance import IncrementalEmpiricalCovariance + + +def get_local_data(data, comm): + rank = comm.Get_rank() + num_ranks = comm.Get_size() + local_size = (data.shape[0] + num_ranks - 1) // num_ranks + return data[rank * local_size : (rank + 1) * local_size] + + +# We create SYCL queue and MPI communicator to perform computation on multiple GPUs + +q = dpctl.SyclQueue("gpu") +comm = MPI.COMM_WORLD + +num_batches = 2 +seed = 77 +num_samples, num_features = 3000, 3 +drng = np.random.default_rng(seed) +X = drng.random(size=(num_samples, num_features)) + +# Local data are obtained for each GPU and split into batches + +X_local = get_local_data(X, comm) +X_split = np.array_split(X_local, num_batches) + +cov = IncrementalEmpiricalCovariance() + +# Partial fit is called for each batch on each GPU + +for i in range(num_batches): + dpt_X = dpt.asarray(X_split[i], 
usm_type="device", sycl_queue=q) + cov.partial_fit(dpt_X) + +# Finalization of results is performed in a lazy way after requesting results like in non-SPMD incremental estimators. + +print(f"Computed covariance values on rank {comm.Get_rank()}:\n", cov.covariance_) diff --git a/onedal/covariance/covariance.cpp b/onedal/covariance/covariance.cpp index 789fac5755..ebd1c54226 100644 --- a/onedal/covariance/covariance.cpp +++ b/onedal/covariance/covariance.cpp @@ -175,9 +175,11 @@ ONEDAL_PY_INIT_MODULE(covariance) { using namespace dal::covariance; auto sub = m.def_submodule("covariance"); + #ifdef ONEDAL_DATA_PARALLEL_SPMD ONEDAL_PY_INSTANTIATE(init_compute_ops, sub, policy_spmd, task::compute); - #else + ONEDAL_PY_INSTANTIATE(init_finalize_compute_ops, sub, policy_spmd, task::compute); + #else ONEDAL_PY_INSTANTIATE(init_compute_ops, sub, policy_list, task::compute); ONEDAL_PY_INSTANTIATE(init_partial_compute_ops, sub, policy_list, task::compute); ONEDAL_PY_INSTANTIATE(init_finalize_compute_ops, sub, policy_list, task::compute); diff --git a/onedal/covariance/incremental_covariance.py b/onedal/covariance/incremental_covariance.py index d2737ce2b4..baa6d48163 100644 --- a/onedal/covariance/incremental_covariance.py +++ b/onedal/covariance/incremental_covariance.py @@ -15,8 +15,7 @@ # =============================================================================== import numpy as np -from daal4py.sklearn._utils import daal_check_version, get_dtype, make2d -from onedal import _backend +from daal4py.sklearn._utils import daal_check_version, get_dtype from ..datatypes import _convert_to_supported, from_table, to_table from ..utils import _check_array @@ -86,10 +85,11 @@ def partial_fit(self, X, y=None, queue=None): """ X = _check_array(X, dtype=[np.float64, np.float32], ensure_2d=True) - if not hasattr(self, "_policy"): - self._policy = self._get_policy(queue, X) + self._queue = queue - X = _convert_to_supported(self._policy, X) + policy = self._get_policy(queue, X) + + X = 
_convert_to_supported(policy, X) if not hasattr(self, "_dtype"): self._dtype = get_dtype(X) @@ -100,7 +100,7 @@ def partial_fit(self, X, y=None, queue=None): "covariance", None, "partial_compute", - self._policy, + policy, params, self._partial_result, table_X, @@ -114,7 +114,7 @@ def finalize_fit(self, queue=None): Parameters ---------- queue : dpctl.SyclQueue - Not used here, added for API conformance + If not None, use this queue for computations. Returns ------- @@ -122,11 +122,16 @@ def finalize_fit(self, queue=None): Returns the instance itself. """ params = self._get_onedal_params(self._dtype) + if queue is not None: + policy = self._get_policy(queue) + else: + policy = self._get_policy(self._queue) + result = self._get_backend( "covariance", None, "finalize_compute", - self._policy, + policy, params, self._partial_result, ) diff --git a/onedal/spmd/covariance/__init__.py b/onedal/spmd/covariance/__init__.py index 61b94adeff..e83495c684 100644 --- a/onedal/spmd/covariance/__init__.py +++ b/onedal/spmd/covariance/__init__.py @@ -15,5 +15,6 @@ # ============================================================================== from .covariance import EmpiricalCovariance +from .incremental_covariance import IncrementalEmpiricalCovariance -__all__ = ["EmpiricalCovariance"] +__all__ = ["EmpiricalCovariance", "IncrementalEmpiricalCovariance"] diff --git a/onedal/spmd/covariance/incremental_covariance.py b/onedal/spmd/covariance/incremental_covariance.py new file mode 100644 index 0000000000..e0840c3ac6 --- /dev/null +++ b/onedal/spmd/covariance/incremental_covariance.py @@ -0,0 +1,92 @@ +# ============================================================================== +# Copyright 2024 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +import numpy as np + +from daal4py.sklearn._utils import get_dtype + +from ...covariance import ( + IncrementalEmpiricalCovariance as base_IncrementalEmpiricalCovariance, +) +from ...datatypes import _convert_to_supported, to_table +from ...utils import _check_array +from .._base import BaseEstimatorSPMD + + +class IncrementalEmpiricalCovariance( + BaseEstimatorSPMD, base_IncrementalEmpiricalCovariance +): + """ + Incremental empirical covariance estimator for SPMD execution. + + ``_reset`` and ``partial_fit`` resolve ``_get_backend``/``_get_policy`` via + ``super(base_IncrementalEmpiricalCovariance, self)``, i.e. starting after the + base incremental estimator in the MRO, so ``BaseEstimatorSPMD`` is skipped. + """ + + def _reset(self): + """Reinitialize ``_partial_result`` from the backend.""" + self._partial_result = super( + base_IncrementalEmpiricalCovariance, self + )._get_backend("covariance", None, "partial_compute_result") + + def partial_fit(self, X, y=None, queue=None): + """ + Computes partial data for the covariance matrix + from data batch X and saves it to `_partial_result`. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Training data batch, where `n_samples` is the number of samples + in the batch, and `n_features` is the number of features. + + y : Ignored + Not used, present for API consistency by convention. + + queue : dpctl.SyclQueue + If not None, use this queue for computations. 
+ + Returns + ------- + self : object + Returns the instance itself. 
+ """ + X = _check_array(X, dtype=[np.float64, np.float32], ensure_2d=True) + + self._queue = queue + + policy = super(base_IncrementalEmpiricalCovariance, self)._get_policy(queue, X) + + X = _convert_to_supported(policy, X) + + if not hasattr(self, "_dtype"): + self._dtype = get_dtype(X) + + params = self._get_onedal_params(self._dtype) + table_X = to_table(X) + self._partial_result = super( + base_IncrementalEmpiricalCovariance, self + )._get_backend( + "covariance", + None, + "partial_compute", + policy, + params, + self._partial_result, + table_X, + ) + return self diff --git a/sklearnex/covariance/incremental_covariance.py b/sklearnex/covariance/incremental_covariance.py index 26c5acc054..630982cb9e 100644 --- a/sklearnex/covariance/incremental_covariance.py +++ b/sklearnex/covariance/incremental_covariance.py @@ -115,9 +115,9 @@ def _onedal_supported(self, method_name, *data): ) return patching_status - def _onedal_finalize_fit(self): + def _onedal_finalize_fit(self, queue=None): assert hasattr(self, "_onedal_estimator") - self._onedal_estimator.finalize_fit() + self._onedal_estimator.finalize_fit(queue=queue) self._need_to_finalize = False if not daal_check_version((2024, "P", 400)) and self.assume_centered: @@ -192,7 +192,7 @@ def _onedal_partial_fit(self, X, queue=None, check_input=True): else: self.n_samples_seen_ += X.shape[0] - self._onedal_estimator.partial_fit(X, queue) + self._onedal_estimator.partial_fit(X, queue=queue) finally: self._need_to_finalize = True @@ -326,7 +326,7 @@ def _onedal_fit(self, X, queue=None): X_batch = X[batch] self._onedal_partial_fit(X_batch, queue=queue, check_input=False) - self._onedal_finalize_fit() + self._onedal_finalize_fit(queue=queue) return self diff --git a/sklearnex/covariance/tests/test_incremental_covariance.py 
b/sklearnex/covariance/tests/test_incremental_covariance.py @@ -26,6 +26,7 @@ from sklearn.datasets import load_diabetes from sklearn.decomposition import PCA +from daal4py.sklearn._utils import daal_check_version from onedal.tests.utils._dataframes_support import ( _as_numpy, _convert_to_dataframe, @@ -37,6 +38,11 @@ @pytest.mark.parametrize("dtype", [np.float32, np.float64]) @pytest.mark.parametrize("assume_centered", [True, False]) def test_sklearnex_partial_fit_on_gold_data(dataframe, queue, dtype, assume_centered): + is_gpu = queue is not None and queue.sycl_device.is_gpu + if assume_centered and is_gpu and not daal_check_version((2025, "P", 0)): + pytest.skip( + "Due to a bug on oneDAL side, means are not set to zero when assume_centered=True" + ) from sklearnex.covariance import IncrementalEmpiricalCovariance X = np.array([[0, 1], [0, 1]]) @@ -143,6 +149,11 @@ def test_sklearnex_partial_fit_on_random_data( def test_sklearnex_fit_on_random_data( dataframe, queue, num_batches, row_count, column_count, dtype, assume_centered ): + is_gpu = queue is not None and queue.sycl_device.is_gpu + if assume_centered and is_gpu and not daal_check_version((2025, "P", 0)): + pytest.skip( + "Due to a bug on oneDAL side, means are not set to zero when assume_centered=True" + ) from sklearnex.covariance import IncrementalEmpiricalCovariance seed = 77 diff --git a/sklearnex/spmd/covariance/__init__.py b/sklearnex/spmd/covariance/__init__.py index 61b94adeff..e83495c684 100644 --- a/sklearnex/spmd/covariance/__init__.py +++ b/sklearnex/spmd/covariance/__init__.py @@ -15,5 +15,6 @@ # ============================================================================== from .covariance import EmpiricalCovariance +from .incremental_covariance import IncrementalEmpiricalCovariance -__all__ = ["EmpiricalCovariance"] +__all__ = ["EmpiricalCovariance", "IncrementalEmpiricalCovariance"] diff --git a/sklearnex/spmd/covariance/incremental_covariance.py 
b/sklearnex/spmd/covariance/incremental_covariance.py new file mode 100644 index 0000000000..1e18f8842b --- /dev/null +++ b/sklearnex/spmd/covariance/incremental_covariance.py @@ -0,0 +1,37 @@ +# ============================================================================== +# Copyright 2024 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== + +from onedal.spmd.covariance import ( + IncrementalEmpiricalCovariance as onedalSPMD_IncrementalEmpiricalCovariance, +) + +from ...covariance import ( + IncrementalEmpiricalCovariance as base_IncrementalEmpiricalCovariance, +) + + +class IncrementalEmpiricalCovariance(base_IncrementalEmpiricalCovariance): + """ + Incremental distributed estimator for covariance. + Allows distributed computation of the empirical covariance estimated by the + maximum likelihood method when data are split into batches. 
+ + API is the same as for `sklearnex.covariance.IncrementalEmpiricalCovariance` + """ + + _onedal_incremental_covariance = staticmethod( + onedalSPMD_IncrementalEmpiricalCovariance + ) diff --git a/sklearnex/spmd/covariance/tests/test_incremental_covariance_spmd.py b/sklearnex/spmd/covariance/tests/test_incremental_covariance_spmd.py new file mode 100644 index 0000000000..b371b67bb2 --- /dev/null +++ b/sklearnex/spmd/covariance/tests/test_incremental_covariance_spmd.py @@ -0,0 +1,184 @@ +# ============================================================================== +# Copyright 2024 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# ============================================================================== + +import numpy as np +import pytest +from numpy.testing import assert_allclose + +from onedal.tests.utils._dataframes_support import ( + _convert_to_dataframe, + get_dataframes_and_queues, +) +from sklearnex.tests._utils_spmd import ( + _generate_statistic_data, + _get_local_tensor, + _mpi_libs_and_gpu_available, +) + + +@pytest.mark.skipif( + not _mpi_libs_and_gpu_available, + reason="GPU device and MPI libs required for test", +) +@pytest.mark.parametrize( + "dataframe,queue", + get_dataframes_and_queues(dataframe_filter_="dpnp,dpctl", device_filter_="gpu"), +) +@pytest.mark.parametrize("assume_centered", [True, False]) +@pytest.mark.parametrize("dtype", [np.float32, np.float64]) +@pytest.mark.mpi +def test_incremental_covariance_fit_spmd_gold(dataframe, queue, assume_centered, dtype): + # Import spmd and batch algo + from sklearnex.covariance import IncrementalEmpiricalCovariance + from sklearnex.spmd.covariance import ( + IncrementalEmpiricalCovariance as IncrementalEmpiricalCovariance_SPMD, + ) + + # Create gold data and process into dpt + data = np.array( + [ + [0.0, 0.0, 0.0], + [0.0, 1.0, 2.0], + [0.0, 2.0, 4.0], + [0.0, 3.0, 8.0], + [0.0, 4.0, 16.0], + [0.0, 5.0, 32.0], + [0.0, 6.0, 64.0], + [0.0, 7.0, 128.0], + ], + dtype=dtype, + ) + + dpt_data = _convert_to_dataframe(data, sycl_queue=queue, target_df=dataframe) + + local_dpt_data = _convert_to_dataframe( + _get_local_tensor(data), sycl_queue=queue, target_df=dataframe + ) + + # ensure results of batch algo match spmd + spmd_result = IncrementalEmpiricalCovariance_SPMD( + assume_centered=assume_centered + ).fit(local_dpt_data) + non_spmd_result = IncrementalEmpiricalCovariance(assume_centered=assume_centered).fit( + dpt_data + ) + + assert_allclose(spmd_result.covariance_, non_spmd_result.covariance_) + assert_allclose(spmd_result.location_, non_spmd_result.location_) + + +@pytest.mark.skipif( + not 
_mpi_libs_and_gpu_available, + reason="GPU device and MPI libs required for test", +) +@pytest.mark.parametrize( + "dataframe,queue", + get_dataframes_and_queues(dataframe_filter_="dpnp,dpctl", device_filter_="gpu"), +) +@pytest.mark.parametrize("num_blocks", [1, 2]) +@pytest.mark.parametrize("assume_centered", [True, False]) +@pytest.mark.parametrize("dtype", [np.float32, np.float64]) +@pytest.mark.mpi +def test_incremental_covariance_partial_fit_spmd_gold( + dataframe, queue, num_blocks, assume_centered, dtype +): + # Import spmd and batch algo + from sklearnex.covariance import IncrementalEmpiricalCovariance + from sklearnex.spmd.covariance import ( + IncrementalEmpiricalCovariance as IncrementalEmpiricalCovariance_SPMD, + ) + + # Create gold data and process into dpt + data = np.array( + [ + [0.0, 0.0, 0.0], + [0.0, 1.0, 2.0], + [0.0, 2.0, 4.0], + [0.0, 3.0, 8.0], + [0.0, 4.0, 16.0], + [0.0, 5.0, 32.0], + [0.0, 6.0, 64.0], + [0.0, 7.0, 128.0], + ], + dtype=dtype, + ) + + dpt_data = _convert_to_dataframe(data, sycl_queue=queue, target_df=dataframe) + + local_data = _get_local_tensor(data) + split_local_data = np.array_split(local_data, num_blocks) + + inccov_spmd = IncrementalEmpiricalCovariance_SPMD(assume_centered=assume_centered) + inccov = IncrementalEmpiricalCovariance(assume_centered=assume_centered) + + for i in range(num_blocks): + local_dpt_data = _convert_to_dataframe( + split_local_data[i], sycl_queue=queue, target_df=dataframe + ) + inccov_spmd.partial_fit(local_dpt_data) + + inccov.fit(dpt_data) + + assert_allclose(inccov_spmd.covariance_, inccov.covariance_) + assert_allclose(inccov_spmd.location_, inccov.location_) + + +@pytest.mark.skipif( + not _mpi_libs_and_gpu_available, + reason="GPU device and MPI libs required for test", +) +@pytest.mark.parametrize("n_samples", [100, 10000]) +@pytest.mark.parametrize("n_features", [10, 100]) +@pytest.mark.parametrize("num_blocks", [1, 2]) +@pytest.mark.parametrize("assume_centered", [True, False]) 
+@pytest.mark.parametrize("dtype", [np.float32, np.float64]) +@pytest.mark.parametrize( + "dataframe,queue", + get_dataframes_and_queues(dataframe_filter_="dpnp,dpctl", device_filter_="gpu"), +) +@pytest.mark.mpi +def test_incremental_covariance_partial_fit_spmd_synthetic( + n_samples, n_features, num_blocks, assume_centered, dataframe, queue, dtype +): + # Import spmd and batch algo + from sklearnex.covariance import IncrementalEmpiricalCovariance + from sklearnex.spmd.covariance import ( + IncrementalEmpiricalCovariance as IncrementalEmpiricalCovariance_SPMD, + ) + + # Generate data and process into dpt + data = _generate_statistic_data(n_samples, n_features, dtype=dtype) + + dpt_data = _convert_to_dataframe(data, sycl_queue=queue, target_df=dataframe) + + local_data = _get_local_tensor(data) + split_local_data = np.array_split(local_data, num_blocks) + + inccov_spmd = IncrementalEmpiricalCovariance_SPMD(assume_centered=assume_centered) + inccov = IncrementalEmpiricalCovariance(assume_centered=assume_centered) + + for i in range(num_blocks): + local_dpt_data = _convert_to_dataframe( + split_local_data[i], sycl_queue=queue, target_df=dataframe + ) + inccov_spmd.partial_fit(local_dpt_data) + + inccov.fit(dpt_data) + + tol = 1e-7 + + assert_allclose(inccov_spmd.covariance_, inccov.covariance_, atol=tol) + assert_allclose(inccov_spmd.location_, inccov.location_, atol=tol) diff --git a/tests/run_examples.py b/tests/run_examples.py index d44a1bceb4..8f2f10ad01 100755 --- a/tests/run_examples.py +++ b/tests/run_examples.py @@ -146,6 +146,7 @@ def check_library(rule): req_device["covariance_spmd.py"] = ["gpu"] req_device["dbscan_spmd.py"] = ["gpu"] req_device["incremental_basic_statistics_dpctl.py"] = ["gpu"] +req_device["incremental_covariance_spmd.py"] = ["gpu"] req_device["incremental_linear_regression_dpctl.py"] = ["gpu"] req_device["incremental_pca_dpctl.py"] = ["gpu"] req_device["kmeans_spmd.py"] = ["gpu"] @@ -165,6 +166,7 @@ def check_library(rule): 
req_library["covariance_spmd.py"] = ["dpctl", "mpi4py"] req_library["dbscan_spmd.py"] = ["dpctl", "mpi4py"] req_library["incremental_basic_statistics_dpctl.py"] = ["dpctl"] +req_library["incremental_covariance_spmd.py"] = ["dpctl", "mpi4py"] req_library["incremental_linear_regression_dpctl.py"] = ["dpctl"] req_library["incremental_pca_dpctl.py"] = ["dpctl"] req_library["kmeans_spmd.py"] = ["dpctl", "mpi4py"]