ENH: SPMD interface for IncrementalLinearRegression (#1972)
olegkkruglov authored Sep 5, 2024
1 parent a8b7373 commit 45fc83d
Showing 9 changed files with 504 additions and 39 deletions.
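
Before the file-by-file diff, a minimal sketch of what this commit enables. Everything here besides the `sklearnex.spmd.linear_model.IncrementalLinearRegression` import is an assumption for illustration: the MPI launcher, the GPU queue setup, and the `local_batches` helper that yields each rank's share of the data.

# Hypothetical usage sketch -- not part of this commit.
# Run under an MPI launcher, e.g.: mpirun -n 4 python demo.py
import dpctl
import dpctl.tensor as dpt
from mpi4py import MPI
from sklearnex.spmd.linear_model import IncrementalLinearRegression

rank = MPI.COMM_WORLD.Get_rank()
queue = dpctl.SyclQueue("gpu")  # one device per rank

est = IncrementalLinearRegression()
for X_batch, y_batch in local_batches(rank):  # hypothetical data source
    # place each batch on the device; the estimator picks the queue up from the data
    X_d = dpt.asarray(X_batch, sycl_queue=queue)
    y_d = dpt.asarray(y_batch, sycl_queue=queue)
    est.partial_fit(X_d, y_d)

# the first predict (or coefficient access) appears to trigger the
# distributed finalization across ranks
print(est.coef_, est.intercept_)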
56 changes: 29 additions & 27 deletions onedal/linear_model/incremental_linear_model.py
@@ -43,13 +43,13 @@ class IncrementalLinearRegression(BaseLinearRegression):
     """
 
     def __init__(self, fit_intercept=True, copy_X=False, algorithm="norm_eq"):
-        module = self._get_backend("linear_model", "regression")
         super().__init__(fit_intercept=fit_intercept, copy_X=copy_X, algorithm=algorithm)
-        self._partial_result = module.partial_train_result()
+        self._reset()
 
     def _reset(self):
-        module = self._get_backend("linear_model", "regression")
-        self._partial_result = module.partial_train_result()
+        self._partial_result = self._get_backend(
+            "linear_model", "regression", "partial_train_result"
+        )
 
     def partial_fit(self, X, y, queue=None):
         """
@@ -74,26 +74,27 @@ def partial_fit(self, X, y, queue=None):
         """
         module = self._get_backend("linear_model", "regression")
 
-        if not hasattr(self, "_policy"):
-            self._policy = self._get_policy(queue, X)
+        self._queue = queue
+        policy = self._get_policy(queue, X)
 
-        X, y = _convert_to_supported(self._policy, X, y)
+        X, y = _convert_to_supported(policy, X, y)
 
         if not hasattr(self, "_dtype"):
            self._dtype = get_dtype(X)
            self._params = self._get_onedal_params(self._dtype)
 
-        y = np.asarray(y).astype(dtype=self._dtype)
-        self._y_ndim_1 = y.ndim == 1
+        y = np.asarray(y, dtype=self._dtype)
 
-        X, y = _check_X_y(X, y, dtype=[np.float64, np.float32], accept_2d_y=True)
+        X, y = _check_X_y(
+            X, y, dtype=[np.float64, np.float32], accept_2d_y=True, force_all_finite=False
+        )
 
         self.n_features_in_ = _num_features(X, fallback_1d=True)
         X_table, y_table = to_table(X, y)
         hparams = get_hyperparameters("linear_regression", "train")
         if hparams is not None and not hparams.is_default:
             self._partial_result = module.partial_train(
-                self._policy,
+                policy,
                 self._params,
                 hparams.backend,
                 self._partial_result,
@@ -102,7 +103,7 @@ def partial_fit(self, X, y, queue=None):
             )
         else:
             self._partial_result = module.partial_train(
-                self._policy, self._params, self._partial_result, X_table, y_table
+                policy, self._params, self._partial_result, X_table, y_table
             )
 
     def finalize_fit(self, queue=None):
@@ -113,36 +114,36 @@ def finalize_fit(self, queue=None):
         Parameters
         ----------
         queue : dpctl.SyclQueue
-            Not used here, added for API conformance
+            If not None, use this queue for computations.
         Returns
         -------
         self : object
             Returns the instance itself.
         """
 
+        if queue is not None:
+            policy = self._get_policy(queue)
+        else:
+            policy = self._get_policy(self._queue)
+
         module = self._get_backend("linear_model", "regression")
         hparams = get_hyperparameters("linear_regression", "train")
         if hparams is not None and not hparams.is_default:
             result = module.finalize_train(
-                self._policy, self._params, hparams.backend, self._partial_result
+                policy, self._params, hparams.backend, self._partial_result
             )
         else:
-            result = module.finalize_train(
-                self._policy, self._params, self._partial_result
-            )
+            result = module.finalize_train(policy, self._params, self._partial_result)
 
         self._onedal_model = result.model
 
         packed_coefficients = from_table(result.model.packed_coefficients)
         self.coef_, self.intercept_ = (
-            packed_coefficients[:, 1:],
-            packed_coefficients[:, 0],
+            packed_coefficients[:, 1:].squeeze(),
+            packed_coefficients[:, 0].squeeze(),
         )
 
-        if self.coef_.shape[0] == 1 and self._y_ndim_1:
-            self.coef_ = self.coef_.ravel()
-            self.intercept_ = self.intercept_[0]
-
         return self
 
@@ -203,8 +204,7 @@ def partial_fit(self, X, y, queue=None):
         """
         module = self._get_backend("linear_model", "regression")
 
-        if not hasattr(self, "_queue"):
-            self._queue = queue
+        self._queue = queue
         policy = self._get_policy(queue, X)
 
         X, y = _convert_to_supported(policy, X, y)
@@ -213,9 +213,11 @@ def partial_fit(self, X, y, queue=None):
             self._dtype = get_dtype(X)
             self._params = self._get_onedal_params(self._dtype)
 
-        y = np.asarray(y).astype(dtype=self._dtype)
+        y = np.asarray(y, dtype=self._dtype)
 
-        X, y = _check_X_y(X, y, dtype=[np.float64, np.float32], accept_2d_y=True)
+        X, y = _check_X_y(
+            X, y, dtype=[np.float64, np.float32], accept_2d_y=True, force_all_finite=False
+        )
 
         self.n_features_in_ = _num_features(X, fallback_1d=True)
         X_table, y_table = to_table(X, y)
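
The hunks above change two things worth noting: input validation now passes force_all_finite=False, and queue handling moves from a cached policy to a cached queue. A minimal sketch of the new queue fallback at the onedal level (hypothetical data; assumes dpctl is installed):

# Sketch of the new queue semantics -- hypothetical data.
import numpy as np
import dpctl
from onedal.linear_model import IncrementalLinearRegression

queue = dpctl.SyclQueue()  # default-selected device
est = IncrementalLinearRegression(fit_intercept=True)

X = np.array([[1.0], [2.0], [3.0]])
y = np.array([1.0, 2.0, 3.0])

est.partial_fit(X, y, queue=queue)  # the queue is remembered as self._queue
est.finalize_fit()                  # no queue given: falls back to the remembered one
print(est.coef_, est.intercept_)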
1 change: 1 addition & 0 deletions onedal/linear_model/linear_model.cpp
@@ -304,6 +304,7 @@ ONEDAL_PY_INIT_MODULE(linear_model) {
 #ifdef ONEDAL_DATA_PARALLEL_SPMD
     ONEDAL_PY_INSTANTIATE(init_train_ops, sub, policy_spmd, task_list);
     ONEDAL_PY_INSTANTIATE(init_infer_ops, sub, policy_spmd, task_list);
+    ONEDAL_PY_INSTANTIATE(init_finalize_train_ops, sub, policy_spmd, task_list);
 #else // ONEDAL_DATA_PARALLEL_SPMD
     ONEDAL_PY_INSTANTIATE(init_train_ops, sub, policy_list, task_list);
     ONEDAL_PY_INSTANTIATE(init_infer_ops, sub, policy_list, task_list);
3 changes: 2 additions & 1 deletion onedal/spmd/linear_model/__init__.py
@@ -14,7 +14,8 @@
 # limitations under the License.
 # ==============================================================================
 
+from .incremental_linear_model import IncrementalLinearRegression
 from .linear_model import LinearRegression
 from .logistic_regression import LogisticRegression
 
-__all__ = ["LinearRegression", "LogisticRegression"]
+__all__ = ["IncrementalLinearRegression", "LinearRegression", "LogisticRegression"]
97 changes: 97 additions & 0 deletions onedal/spmd/linear_model/incremental_linear_model.py
@@ -0,0 +1,97 @@
# ==============================================================================
# Copyright 2024 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

import numpy as np

from daal4py.sklearn._utils import get_dtype

from ...common.hyperparameters import get_hyperparameters
from ...datatypes import _convert_to_supported, to_table
from ...linear_model import (
    IncrementalLinearRegression as base_IncrementalLinearRegression,
)
from ...utils import _check_X_y, _num_features
from .._base import BaseEstimatorSPMD


class IncrementalLinearRegression(BaseEstimatorSPMD, base_IncrementalLinearRegression):
    """
    Distributed incremental Linear Regression oneDAL implementation.
    API is the same as for `onedal.linear_model.IncrementalLinearRegression`.
    """

    def _reset(self):
        self._partial_result = super(base_IncrementalLinearRegression, self)._get_backend(
            "linear_model", "regression", "partial_train_result"
        )

    def partial_fit(self, X, y, queue=None):
        """
        Computes partial data for linear regression
        from data batch X and saves it to `_partial_result`.
        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Training data batch, where `n_samples` is the number of samples
            in the batch, and `n_features` is the number of features.
        y: array-like of shape (n_samples,) or (n_samples, n_targets) in
            case of multiple targets
            Responses for training data.
        queue : dpctl.SyclQueue
            If not None, use this queue for computations.
        Returns
        -------
        self : object
            Returns the instance itself.
        """
        module = super(base_IncrementalLinearRegression, self)._get_backend(
            "linear_model", "regression"
        )

        self._queue = queue
        policy = super(base_IncrementalLinearRegression, self)._get_policy(queue, X)

        X, y = _convert_to_supported(policy, X, y)

        if not hasattr(self, "_dtype"):
            self._dtype = get_dtype(X)
            self._params = self._get_onedal_params(self._dtype)

        y = np.asarray(y, dtype=self._dtype)

        X, y = _check_X_y(
            X, y, dtype=[np.float64, np.float32], accept_2d_y=True, force_all_finite=False
        )

        self.n_features_in_ = _num_features(X, fallback_1d=True)
        X_table, y_table = to_table(X, y)
        hparams = get_hyperparameters("linear_regression", "train")
        if hparams is not None and not hparams.is_default:
            self._partial_result = module.partial_train(
                policy,
                self._params,
                hparams.backend,
                self._partial_result,
                X_table,
                y_table,
            )
        else:
            self._partial_result = module.partial_train(
                policy, self._params, self._partial_result, X_table, y_table
            )
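
The repeated `super(base_IncrementalLinearRegression, self)` calls above start the method-resolution-order search after the batch base class, so the partial steps appear to resolve to the regular (rank-local) backend and policy, while the inherited finalize_fit still calls plain self._get_backend and therefore goes through BaseEstimatorSPMD to the SPMD backend instantiated in linear_model.cpp. A toy model of that dispatch, with hypothetical names:

# Toy model (hypothetical names) of the super()-based dispatch used above.
class Base:
    def _get_backend(self):
        return "batch backend"

class SPMDBase(Base):  # stands in for BaseEstimatorSPMD
    def _get_backend(self):
        return "spmd backend"

class Batch(Base):  # stands in for the batch IncrementalLinearRegression
    def finalize_fit(self):
        return self._get_backend()  # resolves through the full MRO

class SPMD(SPMDBase, Batch):  # mirrors the class definition above
    def partial_fit(self):
        # start the MRO search *after* Batch: reaches Base, not SPMDBase
        return super(Batch, self)._get_backend()

est = SPMD()
print(est.partial_fit())   # "batch backend" -> partial steps stay rank-local
print(est.finalize_fit())  # "spmd backend"  -> finalization is distributed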
11 changes: 5 additions & 6 deletions sklearnex/linear_model/incremental_linear.py
@@ -147,7 +147,7 @@ def _onedal_predict(self, X, queue=None):
         assert hasattr(self, "_onedal_estimator")
         if self._need_to_finalize:
             self._onedal_finalize_fit()
-        return self._onedal_estimator.predict(X, queue)
+        return self._onedal_estimator.predict(X, queue=queue)
 
     def _onedal_score(self, X, y, sample_weight=None, queue=None):
         return r2_score(
@@ -194,17 +194,17 @@ def _onedal_partial_fit(self, X, y, check_input=True, queue=None):
         onedal_params = {"fit_intercept": self.fit_intercept, "copy_X": self.copy_X}
         if not hasattr(self, "_onedal_estimator"):
             self._onedal_estimator = self._onedal_incremental_linear(**onedal_params)
-        self._onedal_estimator.partial_fit(X, y, queue)
+        self._onedal_estimator.partial_fit(X, y, queue=queue)
         self._need_to_finalize = True
 
-    def _onedal_finalize_fit(self):
+    def _onedal_finalize_fit(self, queue=None):
         assert hasattr(self, "_onedal_estimator")
         is_underdetermined = self.n_samples_seen_ < self.n_features_in_ + int(
             self.fit_intercept
         )
         if is_underdetermined:
             raise ValueError("Not enough samples to finalize")
-        self._onedal_estimator.finalize_fit()
+        self._onedal_estimator.finalize_fit(queue=queue)
         self._need_to_finalize = False
 
     def _onedal_fit(self, X, y, queue=None):
@@ -263,8 +263,7 @@ def _onedal_fit(self, X, y, queue=None):
                 "Only one sample available. You may want to reshape your data array"
             )
 
-        self._onedal_finalize_fit()
-
+        self._onedal_finalize_fit(queue=queue)
         return self
 
     def get_intercept_(self):
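
These hunks thread the queue through the wrapper's lazy-finalization path: partial_fit only accumulates partial results, and the first predict (or coefficient access) finalizes once via _onedal_finalize_fit. A small batch-mode sketch of that behavior with hypothetical data:

# Lazy finalization sketch -- hypothetical data, batch (non-SPMD) mode.
import numpy as np
from sklearnex.linear_model import IncrementalLinearRegression

est = IncrementalLinearRegression()
est.partial_fit(np.array([[1.0], [2.0]]), np.array([1.0, 2.0]))
est.partial_fit(np.array([[3.0], [4.0]]), np.array([3.0, 4.0]))

# nothing finalized yet; predict finalizes once, then reuses the model
print(est.predict(np.array([[5.0]])))  # ~[5.0] for this y = x data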
8 changes: 4 additions & 4 deletions sklearnex/linear_model/tests/test_incremental_linear.py
@@ -34,7 +34,7 @@ def test_sklearnex_fit_on_gold_data(dataframe, queue, fit_intercept, macro_block
     X = np.array([[1], [2]])
     X = X.astype(dtype=dtype)
     X_df = _convert_to_dataframe(X, sycl_queue=queue, target_df=dataframe)
-    y = np.array([1, 2])
+    y = np.array([[1], [2]])
     y = y.astype(dtype=dtype)
     y_df = _convert_to_dataframe(y, sycl_queue=queue, target_df=dataframe)
 
@@ -185,16 +185,16 @@ def test_sklearnex_partial_fit_on_random_data(
         inclin.partial_fit(X_split_df, y_split_df)
 
     tol = 1e-4 if inclin.coef_.dtype == np.float32 else 1e-7
-    assert_allclose(coef, inclin.coef_.T, atol=tol)
+    assert_allclose(coef.T.squeeze(), inclin.coef_, atol=tol)
 
     if fit_intercept:
         assert_allclose(intercept, inclin.intercept_, atol=tol)
 
     X_test = gen.random(size=(num_samples, num_features), dtype=dtype)
     if fit_intercept:
-        expected_y_pred = X_test @ coef + intercept[np.newaxis, :]
+        expected_y_pred = (X_test @ coef + intercept[np.newaxis, :]).squeeze()
     else:
-        expected_y_pred = X_test @ coef
+        expected_y_pred = (X_test @ coef).squeeze()
 
     X_test_df = _convert_to_dataframe(X_test, sycl_queue=queue, target_df=dataframe)
 
3 changes: 2 additions & 1 deletion sklearnex/spmd/linear_model/__init__.py
@@ -14,7 +14,8 @@
 # limitations under the License.
 # ==============================================================================
 
+from .incremental_linear_model import IncrementalLinearRegression
 from .linear_model import LinearRegression
 from .logistic_regression import LogisticRegression
 
-__all__ = ["LinearRegression", "LogisticRegression"]
+__all__ = ["IncrementalLinearRegression", "LinearRegression", "LogisticRegression"]
35 changes: 35 additions & 0 deletions sklearnex/spmd/linear_model/incremental_linear_model.py
@@ -0,0 +1,35 @@
# ==============================================================================
# Copyright 2024 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================


from onedal.spmd.linear_model import (
    IncrementalLinearRegression as onedalSPMD_IncrementalLinearRegression,
)

from ...linear_model import (
    IncrementalLinearRegression as base_IncrementalLinearRegression,
)


class IncrementalLinearRegression(base_IncrementalLinearRegression):
    """
    Distributed incremental estimator for linear regression.
    Allows for distributed training of linear regression if data is split into batches.
    API is the same as for `sklearnex.linear_model.IncrementalLinearRegression`.
    """

    _onedal_incremental_linear = staticmethod(onedalSPMD_IncrementalLinearRegression)