add data attribute and unit testing
ahosler committed Dec 16, 2024
1 parent 5cf2b6d commit 320aeeb
Showing 6 changed files with 170 additions and 26 deletions.
27 changes: 13 additions & 14 deletions ads/opctl/config/merger.py
@@ -1,35 +1,33 @@
#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2022, 2023 Oracle and/or its affiliates.
# Copyright (c) 2022, 2024 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

import os
from string import Template
from typing import Dict
import json

import yaml

from ads.common.auth import AuthType, ResourcePrincipal
from ads.opctl import logger
from ads.opctl.config.base import ConfigProcessor
from ads.opctl.config.utils import read_from_ini, _DefaultNoneDict
from ads.opctl.utils import is_in_notebook_session, get_service_pack_prefix
from ads.opctl.config.utils import _DefaultNoneDict, read_from_ini
from ads.opctl.constants import (
DEFAULT_PROFILE,
DEFAULT_OCI_CONFIG_FILE,
DEFAULT_CONDA_PACK_FOLDER,
DEFAULT_ADS_CONFIG_FOLDER,
ADS_JOBS_CONFIG_FILE_NAME,
ADS_CONFIG_FILE_NAME,
ADS_ML_PIPELINE_CONFIG_FILE_NAME,
ADS_DATAFLOW_CONFIG_FILE_NAME,
ADS_JOBS_CONFIG_FILE_NAME,
ADS_LOCAL_BACKEND_CONFIG_FILE_NAME,
ADS_ML_PIPELINE_CONFIG_FILE_NAME,
ADS_MODEL_DEPLOYMENT_CONFIG_FILE_NAME,
DEFAULT_NOTEBOOK_SESSION_CONDA_DIR,
BACKEND_NAME,
DEFAULT_ADS_CONFIG_FOLDER,
DEFAULT_CONDA_PACK_FOLDER,
DEFAULT_NOTEBOOK_SESSION_CONDA_DIR,
DEFAULT_OCI_CONFIG_FILE,
DEFAULT_PROFILE,
)
from ads.opctl.utils import get_service_pack_prefix, is_in_notebook_session


class ConfigMerger(ConfigProcessor):
@@ -41,8 +39,9 @@ class ConfigMerger(ConfigProcessor):
"""

def process(self, **kwargs) -> None:
config_string = Template(json.dumps(self.config)).safe_substitute(os.environ)
self.config = json.loads(config_string)
for key, value in self.config.items():
if isinstance(value, str): # Substitute only if the value is a string
self.config[key] = Template(value).safe_substitute(os.environ)

if "runtime" not in self.config:
self.config["runtime"] = {}
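Note: the rewritten ConfigMerger.process above substitutes environment variables per string value rather than JSON-round-tripping the whole config (the old Template(json.dumps(...)) approach), which would raise a TypeError once the config carries non-serializable values such as the in-memory DataFrames introduced in this commit. A minimal, self-contained sketch of the new pattern — the PROJECT_ID variable and config keys are illustrative, not from the source:

import os
from string import Template

os.environ["PROJECT_ID"] = "ocid1.project.oc1..example"  # illustrative value

config = {
    "project": "$PROJECT_ID",  # string values get env substitution
    "horizon": 10,             # non-strings are left untouched
}

for key, value in config.items():
    if isinstance(value, str):  # substitute only if the value is a string
        config[key] = Template(value).safe_substitute(os.environ)

print(config)  # {'project': 'ocid1.project.oc1..example', 'horizon': 10}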
8 changes: 4 additions & 4 deletions ads/opctl/operator/common/operator_config.py
@@ -1,7 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8; -*-

# Copyright (c) 2023 Oracle and/or its affiliates.
# Copyright (c) 2023, 2024 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/


@@ -11,15 +10,16 @@
from typing import Any, Dict, List

from ads.common.serializer import DataClassSerializable

from ads.opctl.operator.common.utils import OperatorValidator
from ads.opctl.operator.common.errors import InvalidParameterError
from ads.opctl.operator.common.utils import OperatorValidator


@dataclass(repr=True)
class InputData(DataClassSerializable):
"""Class representing operator specification input data details."""

connect_args: Dict = None
data: Dict = None
format: str = None
columns: List[str] = None
url: str = None
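The new data field on InputData lets an operator spec carry an in-memory dataset instead of a URL. A hypothetical usage sketch — the field is declared as Dict, but the tests later in this commit pass a pandas DataFrame:

import pandas as pd

from ads.opctl.operator.common.operator_config import InputData

df = pd.DataFrame({"Date": ["2024-01-01", "2024-01-02"], "Sales": [10, 12]})
spec = InputData(data=df, format="pandas")  # in-memory data, no url needed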
6 changes: 5 additions & 1 deletion ads/opctl/operator/lowcode/common/utils.py
@@ -40,6 +40,7 @@ def load_data(data_spec, storage_options=None, **kwargs):
if data_spec is None:
raise InvalidParameterError("No details provided for this data source.")
filename = data_spec.url
data = data_spec.data
format = data_spec.format
columns = data_spec.columns
connect_args = data_spec.connect_args
@@ -53,7 +54,10 @@
if vault_secret_id is not None and connect_args is None:
connect_args = dict()

if filename is not None:
if data is not None:
if format == "spark":
data = data.toPandas()
elif filename is not None:
if not format:
_, format = os.path.splitext(filename)
format = format[1:]
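In load_data, an in-memory data object now takes precedence over a file url: format == "spark" triggers a toPandas() conversion, and a pandas frame presumably passes through as-is. A condensed sketch of the dispatch — load_data_sketch and the SimpleNamespace spec are illustrative stand-ins, not the library API:

from types import SimpleNamespace

import pandas as pd

def load_data_sketch(data_spec):
    if data_spec.data is not None:
        if data_spec.format == "spark":
            return data_spec.data.toPandas()  # Spark DataFrame -> pandas
        return data_spec.data  # "pandas" format: already a DataFrame
    raise ValueError("the real code falls through to url/file handling here")

spec = SimpleNamespace(data=pd.DataFrame({"y": [1, 2]}), format="pandas", url=None)
df = load_data_sketch(spec)  # returns the DataFrame unchanged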
33 changes: 26 additions & 7 deletions ads/opctl/operator/lowcode/forecast/model/base_model.py
@@ -43,11 +43,11 @@

from ..const import (
AUTO_SELECT,
BACKTEST_REPORT_NAME,
SUMMARY_METRICS_HORIZON_LIMIT,
SpeedAccuracyMode,
SupportedMetrics,
SupportedModels,
BACKTEST_REPORT_NAME
)
from ..operator_config import ForecastOperatorConfig, ForecastOperatorSpec
from .forecast_datasets import ForecastDatasets
@@ -259,7 +259,11 @@ def generate_report(self):
output_dir = self.spec.output_directory.url
file_path = f"{output_dir}/{BACKTEST_REPORT_NAME}"
if self.spec.model == AUTO_SELECT:
backtest_sections.append(rc.Heading("Auto-Select Backtesting and Performance Metrics", level=2))
backtest_sections.append(
rc.Heading(
"Auto-Select Backtesting and Performance Metrics", level=2
)
)
if not os.path.exists(file_path):
failure_msg = rc.Text(
"auto-select could not be executed. Please check the "
@@ -268,15 +272,23 @@
backtest_sections.append(failure_msg)
else:
backtest_stats = pd.read_csv(file_path)
model_metric_map = backtest_stats.drop(columns=['metric', 'backtest'])
average_dict = {k: round(v, 4) for k, v in model_metric_map.mean().to_dict().items()}
model_metric_map = backtest_stats.drop(
columns=["metric", "backtest"]
)
average_dict = {
k: round(v, 4)
for k, v in model_metric_map.mean().to_dict().items()
}
best_model = min(average_dict, key=average_dict.get)
summary_text = rc.Text(
f"Overall, the average {self.spec.metric} scores for the models are {average_dict}, with"
f" {best_model} being identified as the top-performing model during backtesting.")
f" {best_model} being identified as the top-performing model during backtesting."
)
backtest_table = rc.DataTable(backtest_stats, index=True)
liner_plot = get_auto_select_plot(backtest_stats)
backtest_sections.extend([backtest_table, summary_text, liner_plot])
backtest_sections.extend(
[backtest_table, summary_text, liner_plot]
)

forecast_plots = []
if len(self.forecast_output.list_series_ids()) > 0:
@@ -301,7 +313,14 @@
forecast_plots = [forecast_text, forecast_sec]

yaml_appendix_title = rc.Heading("Reference: YAML File", level=2)
yaml_appendix = rc.Yaml(self.config.to_dict())
config_dict = self.config.to_dict()
# pop the data in case it isn't JSON serializable
config_dict["spec"]["historical_data"].pop("data")
if config_dict["spec"].get("additional_data"):
config_dict["spec"]["additional_data"].pop("data")
if config_dict["spec"].get("test_data"):
config_dict["spec"]["test_data"].pop("data")
yaml_appendix = rc.Yaml(config_dict)
report_sections = (
[summary]
+ backtest_sections
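The YAML appendix change above strips the in-memory data objects before rendering, since a DataFrame is not YAML-serializable. A self-contained sketch of the same guard; it uses pop("data", None) so a missing key is tolerated, a slight variation on the committed code, which pops historical_data unconditionally:

import pandas as pd
import yaml

config_dict = {
    "spec": {
        "historical_data": {"format": "pandas", "data": pd.DataFrame({"y": [1]})},
        "additional_data": None,  # optional sections may be absent
    }
}

# Drop in-memory frames before serializing; None-safe for optional sections.
config_dict["spec"]["historical_data"].pop("data", None)
for section in ("additional_data", "test_data"):
    if config_dict["spec"].get(section):
        config_dict["spec"][section].pop("data", None)

print(yaml.safe_dump(config_dict))  # now safe to render in the report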
12 changes: 12 additions & 0 deletions ads/opctl/operator/lowcode/forecast/schema.yaml
@@ -37,6 +37,9 @@ spec:
nullable: true
required: false
type: dict
data:
nullable: true
required: false
format:
allowed:
- csv
@@ -48,6 +51,7 @@
- sql_query
- hdf
- tsv
- pandas
required: false
type: string
columns:
@@ -92,6 +96,9 @@ spec:
nullable: true
required: false
type: dict
data:
nullable: true
required: false
format:
allowed:
- csv
@@ -103,6 +110,7 @@
- sql_query
- hdf
- tsv
- pandas
required: false
type: string
columns:
@@ -146,6 +154,9 @@
nullable: true
required: false
type: dict
data:
nullable: true
required: false
format:
allowed:
- csv
@@ -157,6 +168,7 @@
- sql_query
- hdf
- tsv
- pandas
required: false
type: string
columns:
110 changes: 110 additions & 0 deletions tests/operators/forecast/test_errors.py
@@ -708,6 +708,116 @@ def test_smape_error():
assert result == 0


@pytest.mark.parametrize("model", MODELS)
def test_pandas_historical_input(operator_setup, model):
from ads.opctl.operator.lowcode.forecast.__main__ import operate
from ads.opctl.operator.lowcode.forecast.model.forecast_datasets import (
ForecastDatasets,
)
from ads.opctl.operator.lowcode.forecast.operator_config import (
ForecastOperatorConfig,
)

tmpdirname = operator_setup
historical_data_path, additional_data_path = setup_small_rossman()
yaml_i, output_data_path = populate_yaml(
tmpdirname=tmpdirname,
historical_data_path=historical_data_path,
additional_data_path=additional_data_path,
)
yaml_i["spec"]["horizon"] = 10
yaml_i["spec"]["model"] = model
df = pd.read_csv(historical_data_path)
yaml_i["spec"]["historical_data"].pop("url")
yaml_i["spec"]["historical_data"]["data"] = df
yaml_i["spec"]["historical_data"]["format"] = "pandas"

operator_config = ForecastOperatorConfig.from_dict(yaml_i)
operate(operator_config)
assert pd.read_csv(additional_data_path)["Date"].equals(
pd.read_csv(f"{tmpdirname}/results/forecast.csv")["Date"]
)


@pytest.mark.parametrize("model", MODELS)
def test_pandas_additional_input(operator_setup, model):
from ads.opctl.operator.lowcode.forecast.__main__ import operate
from ads.opctl.operator.lowcode.forecast.model.forecast_datasets import (
ForecastDatasets,
)
from ads.opctl.operator.lowcode.forecast.operator_config import (
ForecastOperatorConfig,
)

tmpdirname = operator_setup
historical_data_path, additional_data_path = setup_small_rossman()
yaml_i, output_data_path = populate_yaml(
tmpdirname=tmpdirname,
historical_data_path=historical_data_path,
additional_data_path=additional_data_path,
)
yaml_i["spec"]["horizon"] = 10
yaml_i["spec"]["model"] = model
df = pd.read_csv(historical_data_path)
yaml_i["spec"]["historical_data"].pop("url")
yaml_i["spec"]["historical_data"]["data"] = df
yaml_i["spec"]["historical_data"]["format"] = "pandas"

df_add = pd.read_csv(additional_data_path)
yaml_i["spec"]["additional_data"].pop("url")
yaml_i["spec"]["additional_data"]["data"] = df_add
yaml_i["spec"]["additional_data"]["format"] = "pandas"

operator_config = ForecastOperatorConfig.from_dict(yaml_i)
operate(operator_config)
assert pd.read_csv(additional_data_path)["Date"].equals(
pd.read_csv(f"{tmpdirname}/results/forecast.csv")["Date"]
)


# def test_spark_additional_input(operator_setup):
# from ads.opctl.operator.lowcode.forecast.__main__ import operate
# from ads.opctl.operator.lowcode.forecast.model.forecast_datasets import ForecastDatasets
# from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig
# from pyspark.sql import SparkSession
# from pyspark import SparkContext

# spark = SparkSession.builder.getOrCreate()

# tmpdirname = operator_setup
# historical_data_path, additional_data_path = setup_small_rossman()
# yaml_i, output_data_path = populate_yaml(
# tmpdirname=tmpdirname,
# historical_data_path=historical_data_path,
# additional_data_path=additional_data_path,
# )
# yaml_i["spec"]["horizon"] = 10
# yaml_i["spec"]["model"] = "prophet"

# df = pd.read_csv(historical_data_path)
# spark_df = spark.createDataFrame(df)

# def _run_operator(df):
# yaml_i["spec"]["historical_data"].pop("url")
# yaml_i["spec"]["historical_data"]["data"] = spark_df
# yaml_i["spec"]["historical_data"]["format"] = "spark"
# operator_config = ForecastOperatorConfig.from_dict(yaml_i)
# operate(operator_config)

# # df_add = pd.read_csv(additional_data_path)
# # spark_df_add = spark.createDataFrame(df_add)
# # yaml_i["spec"]["additional_data"].pop("url")
# # yaml_i["spec"]["additional_data"]["data"] = spark_df_add
# # yaml_i["spec"]["additional_data"]["format"] = "spark"

# rdd_processed = spark_df.rdd.map(lambda x: _run_operator(x, broadcast_yaml_i))
# print(rdd_processed.collect())

# assert pd.read_csv(additional_data_path)["Date"].equals(
# pd.read_csv(f"{tmpdirname}/results/forecast.csv")["Date"]
# )


@pytest.mark.parametrize("model", MODELS)
def test_date_format(operator_setup, model):
tmpdirname = operator_setup
