
ENH: SPMD interface for IncrementalLinearRegression #1972

Merged
14 commits merged on Sep 5, 2024
56 changes: 29 additions & 27 deletions onedal/linear_model/incremental_linear_model.py
@@ -43,13 +43,13 @@ class IncrementalLinearRegression(BaseLinearRegression):
     """
 
     def __init__(self, fit_intercept=True, copy_X=False, algorithm="norm_eq"):
-        module = self._get_backend("linear_model", "regression")
         super().__init__(fit_intercept=fit_intercept, copy_X=copy_X, algorithm=algorithm)
-        self._partial_result = module.partial_train_result()
+        self._reset()
 
     def _reset(self):
-        module = self._get_backend("linear_model", "regression")
-        self._partial_result = module.partial_train_result()
+        self._partial_result = self._get_backend(
+            "linear_model", "regression", "partial_train_result"
+        )
     def partial_fit(self, X, y, queue=None):
         """
@@ -74,26 +74,27 @@ def partial_fit(self, X, y, queue=None):
         """
         module = self._get_backend("linear_model", "regression")
 
-        if not hasattr(self, "_policy"):
-            self._policy = self._get_policy(queue, X)
+        self._queue = queue
+        policy = self._get_policy(queue, X)
 
-        X, y = _convert_to_supported(self._policy, X, y)
+        X, y = _convert_to_supported(policy, X, y)
 
         if not hasattr(self, "_dtype"):
             self._dtype = get_dtype(X)
             self._params = self._get_onedal_params(self._dtype)
 
-        y = np.asarray(y).astype(dtype=self._dtype)
-        self._y_ndim_1 = y.ndim == 1
+        y = np.asarray(y, dtype=self._dtype)
 
-        X, y = _check_X_y(X, y, dtype=[np.float64, np.float32], accept_2d_y=True)
+        X, y = _check_X_y(
+            X, y, dtype=[np.float64, np.float32], accept_2d_y=True, force_all_finite=False
+        )
 
         self.n_features_in_ = _num_features(X, fallback_1d=True)
         X_table, y_table = to_table(X, y)
         hparams = get_hyperparameters("linear_regression", "train")
         if hparams is not None and not hparams.is_default:
             self._partial_result = module.partial_train(
-                self._policy,
+                policy,
                 self._params,
                 hparams.backend,
                 self._partial_result,
@@ -102,7 +103,7 @@ def partial_fit(self, X, y, queue=None):
             )
         else:
             self._partial_result = module.partial_train(
-                self._policy, self._params, self._partial_result, X_table, y_table
+                policy, self._params, self._partial_result, X_table, y_table
            )
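
For readers new to the incremental API: with `algorithm="norm_eq"`, `partial_train` only has to fold each batch's cross-products into running sums, and `finalize_train` performs a single solve at the end. A minimal numpy sketch of that split (an illustration of the normal-equations idea, not code from oneDAL):

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.random((300, 4))
y = X @ np.array([1.0, -2.0, 3.0, 0.5]) + 0.25

Xa = np.c_[np.ones(len(X)), X]  # prepend a column of ones for the intercept
xtx = np.zeros((5, 5))
xty = np.zeros(5)

# "partial_train": each batch only updates the running cross-products
for Xb, yb in zip(np.array_split(Xa, 3), np.array_split(y, 3)):
    xtx += Xb.T @ Xb
    xty += Xb.T @ yb

# "finalize_train": one normal-equations solve once all batches are seen
beta = np.linalg.solve(xtx, xty)
print(beta)  # ~[0.25, 1.0, -2.0, 3.0, 0.5] (intercept first)
```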

     def finalize_fit(self, queue=None):
@@ -113,36 +114,36 @@ def finalize_fit(self, queue=None):
         Parameters
         ----------
         queue : dpctl.SyclQueue
-            Not used here, added for API conformance
+            If not None, use this queue for computations.
 
         Returns
         -------
         self : object
             Returns the instance itself.
         """
 
+        if queue is not None:
+            policy = self._get_policy(queue)
+        else:
+            policy = self._get_policy(self._queue)
+
         module = self._get_backend("linear_model", "regression")
         hparams = get_hyperparameters("linear_regression", "train")
         if hparams is not None and not hparams.is_default:
             result = module.finalize_train(
-                self._policy, self._params, hparams.backend, self._partial_result
+                policy, self._params, hparams.backend, self._partial_result
             )
         else:
-            result = module.finalize_train(
-                self._policy, self._params, self._partial_result
-            )
+            result = module.finalize_train(policy, self._params, self._partial_result)
 
         self._onedal_model = result.model
 
         packed_coefficients = from_table(result.model.packed_coefficients)
         self.coef_, self.intercept_ = (
-            packed_coefficients[:, 1:],
-            packed_coefficients[:, 0],
+            packed_coefficients[:, 1:].squeeze(),
+            packed_coefficients[:, 0].squeeze(),
         )
 
-        if self.coef_.shape[0] == 1 and self._y_ndim_1:
-            self.coef_ = self.coef_.ravel()
-            self.intercept_ = self.intercept_[0]
-
         return self
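
For reviewers tracking the shape change: the packed coefficient table has one row per target, with the intercept in column 0, so `.squeeze()` replaces the old `_y_ndim_1`/`ravel()` bookkeeping by dropping size-1 axes directly. A small numpy sketch with made-up values:

```python
import numpy as np

# Assumed packed layout from the code above: one row per target,
# column 0 holds the intercept, columns 1: hold the coefficients.
packed = np.array([[0.5, 1.0, 2.0]])  # single target, two features

coef_old, intercept_old = packed[:, 1:], packed[:, 0]
print(coef_old.shape, intercept_old.shape)  # (1, 2) (1,)

coef_new, intercept_new = packed[:, 1:].squeeze(), packed[:, 0].squeeze()
print(coef_new.shape, intercept_new.shape)  # (2,) () -- size-1 axes dropped
```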


@@ -203,8 +204,7 @@ def partial_fit(self, X, y, queue=None):
         """
         module = self._get_backend("linear_model", "regression")
 
-        if not hasattr(self, "_queue"):
-            self._queue = queue
+        self._queue = queue
         policy = self._get_policy(queue, X)
 
         X, y = _convert_to_supported(policy, X, y)
@@ -213,9 +213,11 @@ def partial_fit(self, X, y, queue=None):
             self._dtype = get_dtype(X)
             self._params = self._get_onedal_params(self._dtype)
 
-        y = np.asarray(y).astype(dtype=self._dtype)
+        y = np.asarray(y, dtype=self._dtype)
 
-        X, y = _check_X_y(X, y, dtype=[np.float64, np.float32], accept_2d_y=True)
+        X, y = _check_X_y(
+            X, y, dtype=[np.float64, np.float32], accept_2d_y=True, force_all_finite=False
+        )
 
         self.n_features_in_ = _num_features(X, fallback_1d=True)
         X_table, y_table = to_table(X, y)
1 change: 1 addition & 0 deletions onedal/linear_model/linear_model.cpp
@@ -304,6 +304,7 @@ ONEDAL_PY_INIT_MODULE(linear_model) {
 #ifdef ONEDAL_DATA_PARALLEL_SPMD
     ONEDAL_PY_INSTANTIATE(init_train_ops, sub, policy_spmd, task_list);
     ONEDAL_PY_INSTANTIATE(init_infer_ops, sub, policy_spmd, task_list);
+    ONEDAL_PY_INSTANTIATE(init_finalize_train_ops, sub, policy_spmd, task_list);
 #else // ONEDAL_DATA_PARALLEL_SPMD
     ONEDAL_PY_INSTANTIATE(init_train_ops, sub, policy_list, task_list);
     ONEDAL_PY_INSTANTIATE(init_infer_ops, sub, policy_list, task_list);
3 changes: 2 additions & 1 deletion onedal/spmd/linear_model/__init__.py
@@ -14,7 +14,8 @@
 # limitations under the License.
 # ==============================================================================
 
+from .incremental_linear_model import IncrementalLinearRegression
 from .linear_model import LinearRegression
 from .logistic_regression import LogisticRegression
 
-__all__ = ["LinearRegression", "LogisticRegression"]
+__all__ = ["IncrementalLinearRegression", "LinearRegression", "LogisticRegression"]
97 changes: 97 additions & 0 deletions onedal/spmd/linear_model/incremental_linear_model.py
@@ -0,0 +1,97 @@
+# ==============================================================================
+# Copyright 2024 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+
+import numpy as np
+
+from daal4py.sklearn._utils import get_dtype
+
+from ...common.hyperparameters import get_hyperparameters
+from ...datatypes import _convert_to_supported, to_table
+from ...linear_model import (
+    IncrementalLinearRegression as base_IncrementalLinearRegression,
+)
+from ...utils import _check_X_y, _num_features
+from .._base import BaseEstimatorSPMD
+
+
+class IncrementalLinearRegression(BaseEstimatorSPMD, base_IncrementalLinearRegression):
+    """
+    Distributed incremental Linear Regression oneDAL implementation.
+
+    API is the same as for `onedal.linear_model.IncrementalLinearRegression`.
+    """
+
+    def _reset(self):
+        self._partial_result = super(base_IncrementalLinearRegression, self)._get_backend(
+            "linear_model", "regression", "partial_train_result"
+        )
+    def partial_fit(self, X, y, queue=None):

Reviewer (Contributor): I am still a little confused why this is re-implemented and cannot take the base estimator's - can you clarify?

Author (Contributor): We don't have an SPMD partial_fit on the C++ side; that is why it is reimplemented here, so that it uses the non-SPMD backend.
"""
Computes partial data for linear regression
from data batch X and saves it to `_partial_result`.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data batch, where `n_samples` is the number of samples
in the batch, and `n_features` is the number of features.

y: array-like of shape (n_samples,) or (n_samples, n_targets) in
case of multiple targets
Responses for training data.

queue : dpctl.SyclQueue
If not None, use this queue for computations.
Returns
-------
self : object
Returns the instance itself.
"""
module = super(base_IncrementalLinearRegression, self)._get_backend(
"linear_model", "regression"
)

self._queue = queue
policy = super(base_IncrementalLinearRegression, self)._get_policy(queue, X)

X, y = _convert_to_supported(policy, X, y)

if not hasattr(self, "_dtype"):
self._dtype = get_dtype(X)
self._params = self._get_onedal_params(self._dtype)

y = np.asarray(y, dtype=self._dtype)

X, y = _check_X_y(
X, y, dtype=[np.float64, np.float32], accept_2d_y=True, force_all_finite=False
)

self.n_features_in_ = _num_features(X, fallback_1d=True)
X_table, y_table = to_table(X, y)
hparams = get_hyperparameters("linear_regression", "train")
if hparams is not None and not hparams.is_default:
self._partial_result = module.partial_train(
ethanglaser marked this conversation as resolved.
Show resolved Hide resolved
policy,
self._params,
hparams.backend,
self._partial_result,
X_table,
y_table,
)
else:
self._partial_result = module.partial_train(
policy, self._params, self._partial_result, X_table, y_table
)
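
As context for the review thread above: the `super(base_IncrementalLinearRegression, self)` calls start the attribute lookup after the base estimator in the MRO, which (as far as I can tell) skips `BaseEstimatorSPMD`'s SPMD override so `partial_train` comes from the non-SPMD backend. A self-contained toy showing the lookup trick (hypothetical class names):

```python
class Backend:                   # plays the role of onedal's base estimator class
    def _get_backend(self):
        return "non-SPMD backend"

class BaseIncremental(Backend):  # plays base_IncrementalLinearRegression
    pass

class SPMDMixin(Backend):        # plays BaseEstimatorSPMD
    def _get_backend(self):
        return "SPMD backend"

class SPMDIncremental(SPMDMixin, BaseIncremental):
    def partial_fit(self):
        # MRO: SPMDIncremental -> SPMDMixin -> BaseIncremental -> Backend
        assert self._get_backend() == "SPMD backend"
        # Starting the lookup after BaseIncremental skips the SPMD override:
        assert super(BaseIncremental, self)._get_backend() == "non-SPMD backend"

SPMDIncremental().partial_fit()
```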
11 changes: 5 additions & 6 deletions sklearnex/linear_model/incremental_linear.py
@@ -147,7 +147,7 @@ def _onedal_predict(self, X, queue=None):
         assert hasattr(self, "_onedal_estimator")
         if self._need_to_finalize:
             self._onedal_finalize_fit()
-        return self._onedal_estimator.predict(X, queue)
+        return self._onedal_estimator.predict(X, queue=queue)
 
     def _onedal_score(self, X, y, sample_weight=None, queue=None):
         return r2_score(
@@ -194,17 +194,17 @@ def _onedal_partial_fit(self, X, y, check_input=True, queue=None):
         onedal_params = {"fit_intercept": self.fit_intercept, "copy_X": self.copy_X}
         if not hasattr(self, "_onedal_estimator"):
             self._onedal_estimator = self._onedal_incremental_linear(**onedal_params)
-        self._onedal_estimator.partial_fit(X, y, queue)
+        self._onedal_estimator.partial_fit(X, y, queue=queue)
         self._need_to_finalize = True
 
-    def _onedal_finalize_fit(self):
+    def _onedal_finalize_fit(self, queue=None):
         assert hasattr(self, "_onedal_estimator")
         is_underdetermined = self.n_samples_seen_ < self.n_features_in_ + int(
             self.fit_intercept
         )
         if is_underdetermined:
             raise ValueError("Not enough samples to finalize")
-        self._onedal_estimator.finalize_fit()
+        self._onedal_estimator.finalize_fit(queue=queue)
         self._need_to_finalize = False
 
     def _onedal_fit(self, X, y, queue=None):
@@ -263,8 +263,7 @@ def _onedal_fit(self, X, y, queue=None):
                 "Only one sample available. You may want to reshape your data array"
             )
 
-        self._onedal_finalize_fit()
-
+        self._onedal_finalize_fit(queue=queue)
         return self
 
     def get_intercept_(self):
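
The queue is threaded into `_onedal_finalize_fit` because finalization is deferred: `partial_fit` only accumulates and sets `_need_to_finalize`, and the solve runs later (for example, `_onedal_predict` above finalizes first). A stripped-down sketch of that pattern, with hypothetical names unrelated to the PR's classes:

```python
class LazyFinalizeEstimator:
    """Toy illustration of the _need_to_finalize pattern."""

    def __init__(self):
        self._acc = 0.0
        self._need_to_finalize = False

    def partial_fit(self, batch):
        # Cheap per-batch accumulation; no expensive solve here.
        self._acc += sum(batch)
        self._need_to_finalize = True
        return self

    def _finalize(self):
        # The expensive step runs once, however many batches arrived.
        self.model_ = self._acc
        self._need_to_finalize = False

    def predict(self):
        if self._need_to_finalize:
            self._finalize()
        return self.model_

est = LazyFinalizeEstimator().partial_fit([1.0, 2.0]).partial_fit([3.0])
print(est.predict())  # 6.0, finalized on first use
```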
8 changes: 4 additions & 4 deletions sklearnex/linear_model/tests/test_incremental_linear.py
@@ -34,7 +34,7 @@ def test_sklearnex_fit_on_gold_data(dataframe, queue, fit_intercept, macro_block
     X = np.array([[1], [2]])
     X = X.astype(dtype=dtype)
     X_df = _convert_to_dataframe(X, sycl_queue=queue, target_df=dataframe)
-    y = np.array([1, 2])
+    y = np.array([[1], [2]])
     y = y.astype(dtype=dtype)
     y_df = _convert_to_dataframe(y, sycl_queue=queue, target_df=dataframe)
 
@@ -185,16 +185,16 @@ def test_sklearnex_partial_fit_on_random_data(
         inclin.partial_fit(X_split_df, y_split_df)
 
     tol = 1e-4 if inclin.coef_.dtype == np.float32 else 1e-7
-    assert_allclose(coef, inclin.coef_.T, atol=tol)
+    assert_allclose(coef.T.squeeze(), inclin.coef_, atol=tol)
 
     if fit_intercept:
         assert_allclose(intercept, inclin.intercept_, atol=tol)
 
     X_test = gen.random(size=(num_samples, num_features), dtype=dtype)
     if fit_intercept:
-        expected_y_pred = X_test @ coef + intercept[np.newaxis, :]
+        expected_y_pred = (X_test @ coef + intercept[np.newaxis, :]).squeeze()
     else:
-        expected_y_pred = X_test @ coef
+        expected_y_pred = (X_test @ coef).squeeze()
 
     X_test_df = _convert_to_dataframe(X_test, sycl_queue=queue, target_df=dataframe)
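
For the gold-data update: the inputs encode the line y = x, so the expected fit is a coefficient of 1 and an intercept of 0. A quick cross-check with stock scikit-learn (not part of the PR's test suite; note scikit-learn keeps the unsqueezed 2-D shapes for 2-D y):

```python
import numpy as np
from sklearn.linear_model import LinearRegression

X = np.array([[1.0], [2.0]])
y = np.array([[1.0], [2.0]])  # one target, kept 2-D as in the updated test

lr = LinearRegression().fit(X, y)
print(lr.coef_, lr.intercept_)  # [[1.]] [0.] -> the line y = x
```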
3 changes: 2 additions & 1 deletion sklearnex/spmd/linear_model/__init__.py
@@ -14,7 +14,8 @@
 # limitations under the License.
 # ==============================================================================
 
+from .incremental_linear_model import IncrementalLinearRegression
 from .linear_model import LinearRegression
 from .logistic_regression import LogisticRegression
 
-__all__ = ["LinearRegression", "LogisticRegression"]
+__all__ = ["IncrementalLinearRegression", "LinearRegression", "LogisticRegression"]
35 changes: 35 additions & 0 deletions sklearnex/spmd/linear_model/incremental_linear_model.py
@@ -0,0 +1,35 @@
+# ==============================================================================
+# Copyright 2024 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==============================================================================
+
+
+from onedal.spmd.linear_model import (
+    IncrementalLinearRegression as onedalSPMD_IncrementalLinearRegression,
+)
+
+from ...linear_model import (
+    IncrementalLinearRegression as base_IncrementalLinearRegression,
+)
+
+
+class IncrementalLinearRegression(base_IncrementalLinearRegression):
+    """
+    Distributed incremental estimator for linear regression.
+    Allows for distributed training of linear regression if data is split into batches.
+
+    API is the same as for `sklearnex.linear_model.IncrementalLinearRegression`.
+    """
+
+    _onedal_incremental_linear = staticmethod(onedalSPMD_IncrementalLinearRegression)
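
A possible end-to-end usage sketch for the new estimator, under the assumptions that mpi4py and dpctl are installed, a SYCL GPU is visible to every rank, and the script is launched as, e.g., `mpirun -n 2 python demo.py` (the file name and data are illustrative, not from the PR):

```python
# Hypothetical usage sketch; not part of the PR.
import dpctl
import dpctl.tensor as dpt
import numpy as np
from mpi4py import MPI
from sklearnex.spmd.linear_model import IncrementalLinearRegression

rank = MPI.COMM_WORLD.Get_rank()
q = dpctl.SyclQueue("gpu")  # one device queue per MPI rank

rng = np.random.default_rng(seed=rank)  # each rank gets its own data slice
est = IncrementalLinearRegression()
for _ in range(3):
    X = rng.random((100, 5))
    y = X @ np.arange(1.0, 6.0) + 0.5
    # Device placement carries the queue; partial_fit stays rank-local.
    est.partial_fit(dpt.asarray(X, sycl_queue=q), dpt.asarray(y, sycl_queue=q))

# The collective SPMD finalize runs lazily before results are used;
# afterwards every rank holds the same combined model.
print(rank, est.coef_, est.intercept_)
```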