Dataflow changes #1018

Merged · 21 commits · Jan 28, 2025
2 changes: 1 addition & 1 deletion .github/workflows/run-forecast-unit-tests.yml
@@ -56,6 +56,6 @@ jobs:
$CONDA/bin/conda init
source /home/runner/.bashrc
pip install -r test-requirements-operators.txt
pip install "oracle-automlx[forecasting]>=24.4.0"
pip install "oracle-automlx[forecasting]>=24.4.1"
pip install pandas>=2.2.0
python -m pytest -v -p no:warnings --durations=5 tests/operators/forecast
27 changes: 13 additions & 14 deletions ads/opctl/config/merger.py
@@ -1,35 +1,33 @@
#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
# Copyright (c) 2022, 2024 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import os
from string import Template
from typing import Dict
import json

import yaml

from ads.common.auth import AuthType, ResourcePrincipal
from ads.opctl import logger
from ads.opctl.config.base import ConfigProcessor
from ads.opctl.config.utils import read_from_ini, _DefaultNoneDict
from ads.opctl.utils import is_in_notebook_session, get_service_pack_prefix
from ads.opctl.config.utils import _DefaultNoneDict, read_from_ini
from ads.opctl.constants import (
DEFAULT_PROFILE,
DEFAULT_OCI_CONFIG_FILE,
DEFAULT_CONDA_PACK_FOLDER,
DEFAULT_ADS_CONFIG_FOLDER,
ADS_JOBS_CONFIG_FILE_NAME,
ADS_CONFIG_FILE_NAME,
ADS_ML_PIPELINE_CONFIG_FILE_NAME,
ADS_DATAFLOW_CONFIG_FILE_NAME,
ADS_JOBS_CONFIG_FILE_NAME,
ADS_LOCAL_BACKEND_CONFIG_FILE_NAME,
ADS_ML_PIPELINE_CONFIG_FILE_NAME,
ADS_MODEL_DEPLOYMENT_CONFIG_FILE_NAME,
DEFAULT_NOTEBOOK_SESSION_CONDA_DIR,
BACKEND_NAME,
DEFAULT_ADS_CONFIG_FOLDER,
DEFAULT_CONDA_PACK_FOLDER,
DEFAULT_NOTEBOOK_SESSION_CONDA_DIR,
DEFAULT_OCI_CONFIG_FILE,
DEFAULT_PROFILE,
)
from ads.opctl.utils import get_service_pack_prefix, is_in_notebook_session


class ConfigMerger(ConfigProcessor):
@@ -41,8 +39,9 @@ class ConfigMerger(ConfigProcessor):
"""

def process(self, **kwargs) -> None:
config_string = Template(json.dumps(self.config)).safe_substitute(os.environ)
self.config = json.loads(config_string)
for key, value in self.config.items():
if isinstance(value, str): # Substitute only if the value is a string
Member: The comment is factual, but why only string values? My mind went to things like pulling a host and port out of the env, but wouldn't that break with only strings being pulled in? A comment explaining why only strings are kept would be useful.

Member Author: That's a great point. It's so that the validator doesn't try to substitute anything within the pandas DataFrame object. We were getting all sorts of issues with Cerberus on a pandas df, so as a first pass we're removing non-string values from the validator. Ultimately, most numbers get converted to strings within YAML anyway.

self.config[key] = Template(value).safe_substitute(os.environ)

if "runtime" not in self.config:
self.config["runtime"] = {}
8 changes: 4 additions & 4 deletions ads/opctl/operator/common/operator_config.py
@@ -1,7 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2023 Oracle and/or its affiliates.
# Copyright (c) 2023, 2024 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/


@@ -11,15 +10,16 @@
from typing import Any, Dict, List

from ads.common.serializer import DataClassSerializable

from ads.opctl.operator.common.utils import OperatorValidator
from ads.opctl.operator.common.errors import InvalidParameterError
from ads.opctl.operator.common.utils import OperatorValidator


@dataclass(repr=True)
class InputData(DataClassSerializable):
"""Class representing operator specification input data details."""

connect_args: Dict = None
data: Dict = None
format: str = None
columns: List[str] = None
url: str = None
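With the new data field on InputData, a caller can hand an in-memory frame to an operator instead of a URL. A short sketch under the assumption that the dataclass is constructed directly; the frame contents are made up, and note the field is annotated Dict but is used here to carry a DataFrame:

import pandas as pd

from ads.opctl.operator.common.operator_config import InputData

frame = pd.DataFrame(
    {"ds": pd.date_range("2024-01-01", periods=3), "y": [1.0, 2.0, 3.0]}
)
input_data = InputData(data=frame, format="pandas")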
17 changes: 12 additions & 5 deletions ads/opctl/operator/lowcode/common/transformations.py
@@ -1,10 +1,11 @@
#!/usr/bin/env python

# Copyright (c) 2023, 2024 Oracle and/or its affiliates.
# Copyright (c) 2023, 2025 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

from abc import ABC

import numpy as np
import pandas as pd

from ads.opctl import logger
@@ -209,18 +210,24 @@ def _outlier_treatment(self, df):
-------
A new Pandas DataFrame with treated outliers.
"""
df["z_score"] = (
return df
df["__z_score"] = (
df[self.target_column_name]
.groupby(DataColumns.Series)
.transform(lambda x: (x - x.mean()) / x.std())
)
outliers_mask = df["z_score"].abs() > 3
outliers_mask = df["__z_score"].abs() > 3

if pd.api.types.is_integer_dtype(df[self.target_column_name]):
df[self.target_column_name] = df[self.target_column_name].astype(float)

df.loc[outliers_mask, self.target_column_name] = (
df[self.target_column_name]
.groupby(DataColumns.Series)
.transform(lambda x: x.mean())
.transform(lambda x: np.median(x))
)
return df.drop("z_score", axis=1)
df_ret = df.drop("__z_score", axis=1)
return df_ret

def _check_historical_dataset(self, df):
expected_names = [self.target_column_name, self.dt_column_name] + (
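For reference, a toy illustration of the grouped z-score treatment above, which the new early return df currently short-circuits. The series/target column names stand in for DataColumns.Series and self.target_column_name, and the frame needs enough rows for a sample z-score to exceed 3 (with n points the largest attainable |z| is (n-1)/sqrt(n)):

import pandas as pd

df = pd.DataFrame({
    "series": ["A"] * 12,
    "target": [10.0] * 11 + [1000.0],  # the last point is the outlier
})

# per-series z-score, as in the diff
df["__z_score"] = df.groupby("series")["target"].transform(
    lambda x: (x - x.mean()) / x.std()
)
outliers_mask = df["__z_score"].abs() > 3  # flags only the 1000.0 row here

# replace flagged points with the per-series median (computed over all points)
df.loc[outliers_mask, "target"] = df.groupby("series")["target"].transform("median")
df = df.drop("__z_score", axis=1)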
16 changes: 11 additions & 5 deletions ads/opctl/operator/lowcode/common/utils.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python

# Copyright (c) 2024 Oracle and/or its affiliates.
# Copyright (c) 2024, 2025 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import logging
@@ -40,6 +40,7 @@ def load_data(data_spec, storage_options=None, **kwargs):
if data_spec is None:
raise InvalidParameterError("No details provided for this data source.")
filename = data_spec.url
data = data_spec.data
format = data_spec.format
columns = data_spec.columns
connect_args = data_spec.connect_args
@@ -51,9 +52,12 @@
default_signer() if ObjectStorageDetails.is_oci_path(filename) else {}
)
if vault_secret_id is not None and connect_args is None:
connect_args = dict()
connect_args = {}

if filename is not None:
if data is not None:
if format == "spark":
data = data.toPandas()
elif filename is not None:
if not format:
_, format = os.path.splitext(filename)
format = format[1:]
@@ -98,7 +102,7 @@ def load_data(data_spec, storage_options=None, **kwargs):
except Exception as e:
raise Exception(
f"Could not retrieve database credentials from vault {vault_secret_id}: {e}"
)
) from e

con = oracledb.connect(**connect_args)
if table_name is not None:
@@ -122,6 +126,7 @@


def write_data(data, filename, format, storage_options, index=False, **kwargs):
disable_print()
if not format:
_, format = os.path.splitext(filename)
format = format[1:]
@@ -130,7 +135,8 @@ def write_data(data, filename, format, storage_options, index=False, **kwargs):
return call_pandas_fsspec(
write_fn, filename, index=index, storage_options=storage_options, **kwargs
)
raise OperatorYamlContentError(
enable_print()
raise InvalidParameterError(
f"The format {format} is not currently supported for writing data. Please change the format parameter for the data output: {filename} ."
)

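The net effect of the load_data changes is that an in-memory frame takes precedence over any URL. Below is a hedged sketch that mirrors the new branching; resolve_input is a hypothetical stand-in for the relevant slice of load_data, not the real function:

from types import SimpleNamespace

import pandas as pd

def resolve_input(data_spec):
    # mirrors the branch order added in the diff: in-memory data first, then URL
    if data_spec.data is not None:
        if data_spec.format == "spark":
            return data_spec.data.toPandas()  # Spark DataFrame -> pandas
        return data_spec.data                 # assumed to already be pandas
    if data_spec.url is not None:
        raise NotImplementedError("file/URL loading elided from this sketch")
    raise ValueError("No details provided for this data source.")

spec = SimpleNamespace(
    url=None,
    format="pandas",
    data=pd.DataFrame({"y": [1.0, 2.0, 3.0]}),
)
print(resolve_input(spec).shape)  # (3, 1)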
36 changes: 20 additions & 16 deletions ads/opctl/operator/lowcode/forecast/model/automlx.py
@@ -82,22 +82,6 @@ def _build_model(self) -> pd.DataFrame:

from automlx import Pipeline, init

cpu_count = os.cpu_count()
try:
if cpu_count < 4:
engine = "local"
engine_opts = None
else:
engine = "ray"
engine_opts = ({"ray_setup": {"_temp_dir": "/tmp/ray-temp"}},)
init(
engine=engine,
engine_opts=engine_opts,
loglevel=logging.CRITICAL,
)
except Exception as e:
logger.info(f"Error. Has Ray already been initialized? Skipping. {e}")

full_data_dict = self.datasets.get_data_by_series()

self.models = {}
@@ -113,6 +97,26 @@ def _build_model(self) -> pd.DataFrame:
# Clean up kwargs for pass through
model_kwargs_cleaned, time_budget = self.set_kwargs()

cpu_count = os.cpu_count()
try:
engine_type = model_kwargs_cleaned.pop(
"engine", "local" if cpu_count <= 4 else "ray"
)
engine_opts = (
None
if engine_type == "local"
else ({"ray_setup": {"_temp_dir": "/tmp/ray-temp"}},)
)
init(
engine=engine_type,
engine_opts=engine_opts,
loglevel=logging.CRITICAL,
)
except Exception as e:
logger.info(
f"Error initializing automlx. Has Ray already been initialized? Skipping. {e}"
)

for s_id, df in full_data_dict.items():
try:
logger.debug(f"Running automlx on series {s_id}")
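Because init now pops an engine entry out of model_kwargs_cleaned, the engine choice can be pinned from the operator spec instead of being inferred from os.cpu_count(). A hedged example of a forecast spec fragment; the surrounding keys are typical operator boilerplate and are not part of this PR:

kind: operator
type: forecast
version: v1
spec:
  model: automlx
  model_kwargs:
    engine: local   # or "ray"; the default is "local" when os.cpu_count() <= 4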
10 changes: 9 additions & 1 deletion ads/opctl/operator/lowcode/forecast/model/base_model.py
@@ -44,6 +44,7 @@

from ..const import (
AUTO_SELECT,
BACKTEST_REPORT_NAME,
SUMMARY_METRICS_HORIZON_LIMIT,
SpeedAccuracyMode,
SupportedMetrics,
@@ -321,7 +322,14 @@ def generate_report(self):
forecast_plots = [forecast_text, forecast_sec]

yaml_appendix_title = rc.Heading("Reference: YAML File", level=2)
yaml_appendix = rc.Yaml(self.config.to_dict())
config_dict = self.config.to_dict()
# pop the data in case it isn't JSON serializable
config_dict["spec"]["historical_data"].pop("data")
if config_dict["spec"].get("additional_data"):
config_dict["spec"]["additional_data"].pop("data")
if config_dict["spec"].get("test_data"):
config_dict["spec"]["test_data"].pop("data")
yaml_appendix = rc.Yaml(config_dict)
report_sections = (
[summary]
+ backtest_sections
6 changes: 1 addition & 5 deletions ads/opctl/operator/lowcode/forecast/model/prophet.py
@@ -369,11 +369,7 @@ def _generate_report(self):
logger.debug(f"Full Traceback: {traceback.format_exc()}")

model_description = rc.Text(
"Prophet is a procedure for forecasting time series data based on an additive "
"model where non-linear trends are fit with yearly, weekly, and daily seasonality, "
"plus holiday effects. It works best with time series that have strong seasonal "
"effects and several seasons of historical data. Prophet is robust to missing "
"data and shifts in the trend, and typically handles outliers well."
"""Prophet is a procedure for forecasting time series data based on an additive model where non-linear trends are fit with yearly, weekly, and daily seasonality, plus holiday effects. It works best with time series that have strong seasonal effects and several seasons of historical data. Prophet is robust to missing data and shifts in the trend, and typically handles outliers well."""
Member: This is what DeepSeek thinks sounds better:

Prophet is a time-series forecasting tool that uses an additive model. It models non-linear trends and incorporates yearly, weekly, and daily seasonality, as well as the impact of holidays. Prophet is particularly effective for time series exhibiting strong seasonality and benefiting from multiple years of historical data. It's designed to be resilient to missing data, shifts in trends, and outliers.

)
other_sections = all_sections

12 changes: 12 additions & 0 deletions ads/opctl/operator/lowcode/forecast/schema.yaml
@@ -37,6 +37,9 @@ spec:
nullable: true
required: false
type: dict
data:
nullable: true
required: false
format:
allowed:
- csv
@@ -48,6 +51,7 @@
- sql_query
- hdf
- tsv
- pandas
required: false
type: string
columns:
@@ -92,6 +96,9 @@
nullable: true
required: false
type: dict
data:
nullable: true
required: false
format:
allowed:
- csv
@@ -103,6 +110,7 @@
- sql_query
- hdf
- tsv
- pandas
required: false
type: string
columns:
@@ -146,6 +154,9 @@
nullable: true
required: false
type: dict
data:
nullable: true
required: false
format:
allowed:
- csv
@@ -157,6 +168,7 @@
- sql_query
- hdf
- tsv
- pandas
required: false
type: string
columns:
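Taken together, the three schema additions (one each for historical_data, additional_data, and test_data) let a data block carry an in-memory payload. A hedged illustration of a fragment the updated schema should accept; data stays null in YAML because the DataFrame is attached programmatically:

spec:
  historical_data:
    format: pandas
    data: null   # populated in code with a pandas DataFrame before validation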
11 changes: 5 additions & 6 deletions pyproject.toml
@@ -157,27 +157,26 @@ forecast = [
"oci-cli",
"py-cpuinfo",
"rich",
"autots[additional]",
"autots",
"mlforecast",
"neuralprophet>=0.7.0",
"numpy<2.0.0",
"oci-cli",
"optuna",
"oracle-ads",
"pmdarima",
"prophet",
"shap",
"sktime",
"statsmodels",
"plotly",
"oracledb",
"report-creator==1.0.28",
"report-creator==1.0.32",
]
anomaly = [
"oracle_ads[opctl]",
"autots",
"oracledb",
"report-creator==1.0.28",
"report-creator==1.0.32",
"rrcf==0.4.4",
"scikit-learn<1.6.0",
"salesforce-merlion[all]==2.0.4"
@@ -186,7 +185,7 @@ recommender = [
"oracle_ads[opctl]",
"scikit-surprise",
"plotly",
"report-creator==1.0.28",
"report-creator==1.0.32",
]
feature-store-marketplace = [
"oracle-ads[opctl]",
@@ -202,7 +201,7 @@ pii = [
"scrubadub_spacy",
"spacy-transformers==1.2.5",
"spacy==3.6.1",
"report-creator==1.0.28",
"report-creator==1.0.32",
Member: Latest is 1.0.37.

]
llm = ["langchain>=0.2", "langchain-community", "langchain_openai", "pydantic>=2,<3", "evaluate>=0.4.0"]
aqua = ["jupyter_server"]