diff --git a/numalogic/backtest/_prom.py b/numalogic/backtest/_prom.py
index a777a7b6..f55aa282 100644
--- a/numalogic/backtest/_prom.py
+++ b/numalogic/backtest/_prom.py
@@ -37,7 +37,7 @@
 from numalogic.connectors.prometheus import PrometheusFetcher
 from numalogic.tools.data import StreamingDataset, inverse_window
 from numalogic.tools.types import artifact_t
-from numalogic.udfs import UDFFactory, StreamConf
+from numalogic.udfs import UDFFactory, StreamConf, MLPipelineConf
 
 DEFAULT_OUTPUT_DIR = os.path.join(BASE_DIR, ".btoutput")
 LOGGER = logging.getLogger(__name__)
@@ -54,8 +54,12 @@ def _init_default_streamconf(metrics: list[str]) -> StreamConf:
     return StreamConf(
         source=ConnectorType.prometheus,
         window_size=DEFAULT_SEQUENCE_LEN,
-        metrics=metrics,
-        numalogic_conf=numalogic_cfg,
+        ml_pipelines={
+            "default": MLPipelineConf(
+                metrics=metrics,
+                numalogic_conf=numalogic_cfg,
+            )
+        },
     )
 
 
@@ -73,6 +77,7 @@ class PromUnivarBacktester:
         output_dir: Output directory
         test_ratio: Ratio of test data to total data
         stream_conf: Stream configuration
+        pipeline_id: ML pipeline ID from stream_conf
     """
 
     def __init__(
@@ -86,18 +91,20 @@ def __init__(
         output_dir: Union[str, Path] = DEFAULT_OUTPUT_DIR,
         test_ratio: float = 0.25,
         stream_conf: Optional[StreamConf] = None,
+        pipeline_id: Optional[str] = "default",
     ):
         self._url = url
         self.namespace = namespace
         self.appname = appname
         self.metric = metric
         self.conf = stream_conf or _init_default_streamconf([metric])
+        self.ml_pl_conf = self.conf.ml_pipelines[pipeline_id]
         self.test_ratio = test_ratio
         self.lookback_days = lookback_days
         self.return_labels = return_labels
 
         self._seq_len = self.conf.window_size
-        self._n_features = len(self.conf.metrics)
+        self._n_features = len(self.ml_pl_conf.metrics)
 
         self.out_dir = self.get_outdir(appname, metric, outdir=output_dir)
         self._datapath = os.path.join(self.out_dir, "data.csv")
@@ -170,13 +177,13 @@ def train_models(
         LOGGER.info("Training data shape: %s", x_train.shape)
 
         artifacts = UDFFactory.get_udf_cls("trainer").compute(
-            model=ModelFactory().get_instance(self.conf.numalogic_conf.model),
+            model=ModelFactory().get_instance(self.ml_pl_conf.numalogic_conf.model),
             input_=x_train,
             preproc_clf=PreprocessFactory().get_pipeline_instance(
-                self.conf.numalogic_conf.preprocess
+                self.ml_pl_conf.numalogic_conf.preprocess
            ),
-            threshold_clf=ThresholdFactory().get_instance(self.conf.numalogic_conf.threshold),
-            numalogic_cfg=self.conf.numalogic_conf,
+            threshold_clf=ThresholdFactory().get_instance(self.ml_pl_conf.numalogic_conf.threshold),
+            numalogic_cfg=self.ml_pl_conf.numalogic_conf,
         )
         artifacts_dict = {
             "model": artifacts["inference"].artifact,
@@ -233,10 +240,12 @@ def generate_scores(
         ds = StreamingDataset(x_scaled, seq_len=self.conf.window_size)
 
         anomaly_scores = np.zeros(
-            (len(ds), self.conf.window_size, len(self.conf.metrics)), dtype=np.float32
+            (len(ds), self.conf.window_size, len(self.ml_pl_conf.metrics)), dtype=np.float32
         )
         x_recon = np.zeros_like(anomaly_scores, dtype=np.float32)
-        postproc_func = PostprocessFactory().get_instance(self.conf.numalogic_conf.postprocess)
+        postproc_func = PostprocessFactory().get_instance(
+            self.ml_pl_conf.numalogic_conf.postprocess
+        )
 
         for idx, arr in enumerate(ds):
             x_recon[idx] = nn_udf.compute(model=artifacts["model"], input_=arr)
diff --git a/numalogic/udfs/__init__.py b/numalogic/udfs/__init__.py
index a77f3453..7c4f04be 100644
--- a/numalogic/udfs/__init__.py
+++ b/numalogic/udfs/__init__.py
@@ -5,7 +5,7 @@
 
 from numalogic._constants import BASE_DIR
 from numalogic.udfs._base import NumalogicUDF
-from numalogic.udfs._config import StreamConf, PipelineConf, load_pipeline_conf
+from numalogic.udfs._config import StreamConf, PipelineConf, MLPipelineConf, load_pipeline_conf
 from numalogic.udfs.factory import UDFFactory, ServerFactory
 from numalogic.udfs.inference import InferenceUDF
 from numalogic.udfs.postprocess import PostprocessUDF
@@ -34,6 +34,7 @@ def set_logger() -> None:
     "UDFFactory",
     "StreamConf",
     "PipelineConf",
+    "MLPipelineConf",
     "load_pipeline_conf",
     "ServerFactory",
     "set_logger",
diff --git a/numalogic/udfs/_base.py b/numalogic/udfs/_base.py
index f077294f..28ce254e 100644
--- a/numalogic/udfs/_base.py
+++ b/numalogic/udfs/_base.py
@@ -17,8 +17,7 @@
 
 from numalogic.tools.exceptions import ConfigNotFoundError
 from numalogic.tools.types import artifact_t
-from numalogic.udfs._config import PipelineConf, StreamConf
-
+from numalogic.udfs._config import StreamConf, PipelineConf, MLPipelineConf
 _DEFAULT_CONF_ID = "default"
 
 
@@ -61,7 +60,7 @@ def register_conf(self, config_id: str, conf: StreamConf) -> None:
         """
         self.pl_conf.stream_confs[config_id] = conf
 
-    def _get_default_conf(self, config_id) -> StreamConf:
+    def _get_default_stream_conf(self, config_id) -> StreamConf:
         """Get the default config."""
         try:
             return self.pl_conf.stream_confs[_DEFAULT_CONF_ID]
@@ -69,9 +68,20 @@ def _get_default_conf(self, config_id) -> StreamConf:
             err_msg = f"Config with ID {config_id} or {_DEFAULT_CONF_ID} not found!"
             raise ConfigNotFoundError(err_msg) from None
 
-    def get_conf(self, config_id: str) -> StreamConf:
+    def _get_default_ml_pipeline_conf(self, config_id, pipeline_id) -> MLPipelineConf:
+        """Get the default pipeline config."""
+        try:
+            return self.pl_conf.stream_confs[_DEFAULT_CONF_ID].ml_pipelines[_DEFAULT_CONF_ID]
+        except KeyError:
+            err_msg = (
+                f"Pipeline with ID {pipeline_id} or {_DEFAULT_CONF_ID} "
+                f"not found for config ID {config_id}!"
+            )
+            raise ConfigNotFoundError(err_msg) from None
+
+    def get_stream_conf(self, config_id: str) -> StreamConf:
         """
-        Get config with the given ID.
+        Get stream config with the given ID.
         If not found, return the default config.
 
         Args:
@@ -88,7 +98,28 @@ def get_conf(self, config_id: str) -> StreamConf:
         try:
             return self.pl_conf.stream_confs[config_id]
         except KeyError:
-            return self._get_default_conf(config_id)
+            return self._get_default_stream_conf(config_id)
+
+    def get_ml_pipeline_conf(self, config_id: str, pipeline_id: str) -> MLPipelineConf:
+        """
+        Get stream's pipeline config with the given ID.
+        If not found, return the default config.
+
+        Args:
+            config_id: Config ID
+
+        Returns
+        -------
+        MLPipelineConf object
+
+        Raises
+        ------
+        ConfigNotFoundError: If config with the given ID is not found
+        """
+        try:
+            return self.pl_conf.stream_confs[config_id].ml_pipelines[pipeline_id]
+        except KeyError:
+            return self._get_default_ml_pipeline_conf(config_id, pipeline_id)
 
     def exec(self, keys: list[str], datum: Datum) -> Messages:
         """
diff --git a/numalogic/udfs/_config.py b/numalogic/udfs/_config.py
index b668bee9..94163d85 100644
--- a/numalogic/udfs/_config.py
+++ b/numalogic/udfs/_config.py
@@ -17,14 +17,20 @@
 _logger = logging.getLogger(__name__)
 
 
+@dataclass
+class MLPipelineConf:
+    pipeline_id: str = "default"
+    metrics: list[str] = field(default_factory=list)
+    numalogic_conf: NumalogicConf = field(default_factory=lambda: NumalogicConf())
+
+
 @dataclass
 class StreamConf:
     config_id: str = "default"
     source: ConnectorType = ConnectorType.druid  # TODO: do not allow redis connector here
     window_size: int = 12
     composite_keys: list[str] = field(default_factory=list)
-    metrics: list[str] = field(default_factory=list)
-    numalogic_conf: NumalogicConf = field(default_factory=lambda: NumalogicConf())
+    ml_pipelines: dict[str, MLPipelineConf] = field(default_factory=dict)
 
 
 @dataclass
diff --git a/numalogic/udfs/entities.py b/numalogic/udfs/entities.py
index ba4f75bd..6f0fc6ce 100644
--- a/numalogic/udfs/entities.py
+++ b/numalogic/udfs/entities.py
@@ -33,6 +33,7 @@ class Header(str, Enum):
 class _BasePayload:
     uuid: str
     config_id: str
+    pipeline_id: str
     composite_keys: list[str]
 
 
diff --git a/numalogic/udfs/factory.py b/numalogic/udfs/factory.py
index cff84578..1ae2e87b 100644
--- a/numalogic/udfs/factory.py
+++ b/numalogic/udfs/factory.py
@@ -21,12 +21,14 @@ class UDFFactory:
     """Factory class to fetch the right UDF."""
 
     from numalogic.udfs import NumalogicUDF
+    from numalogic.udfs.pipeline import PipelineUDF
     from numalogic.udfs.inference import InferenceUDF
     from numalogic.udfs.postprocess import PostprocessUDF
     from numalogic.udfs.preprocess import PreprocessUDF
     from numalogic.udfs.trainer import DruidTrainerUDF, PromTrainerUDF
 
     _UDF_MAP: ClassVar[dict[str, type[NumalogicUDF]]] = {
+        "pipeline": PipelineUDF,
         "preprocess": PreprocessUDF,
         "inference": InferenceUDF,
         "postprocess": PostprocessUDF,
diff --git a/numalogic/udfs/inference.py b/numalogic/udfs/inference.py
index bc047d9b..31436be5 100644
--- a/numalogic/udfs/inference.py
+++ b/numalogic/udfs/inference.py
@@ -24,7 +24,7 @@
     UDF_TIME,
     _increment_counter,
 )
-from numalogic.udfs.tools import _load_artifact
+from numalogic.udfs.tools import _load_artifact, get_skeys
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -124,10 +124,11 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
             )
             return Messages(Message(keys=keys, value=payload.to_json()))
 
-        _conf = self.get_conf(payload.config_id)
+        _stream_conf = self.get_stream_conf(payload.config_id)
+        _conf = _stream_conf.ml_pipelines[payload.pipeline_id]
         artifact_data, payload = _load_artifact(
-            skeys=[_ckey for _, _ckey in zip(_conf.composite_keys, payload.composite_keys)],
+            skeys=get_skeys(payload, _stream_conf),
             dkeys=[_conf.numalogic_conf.model.name],
             payload=payload,
             model_registry=self.model_registry,
@@ -210,7 +211,9 @@ def is_model_stale(self, artifact_data: ArtifactData, payload: StreamPayload) -
        -------
         True if artifact is stale, False otherwise
         """
-        _conf = self.get_conf(payload.config_id)
+        _conf = self.get_ml_pipeline_conf(
+            config_id=payload.config_id, pipeline_id=payload.pipeline_id
+        )
         if (
             self.model_registry.is_artifact_stale(
                 artifact_data, _conf.numalogic_conf.trainer.retrain_freq_hr
diff --git a/numalogic/udfs/pipeline.py b/numalogic/udfs/pipeline.py
new file mode 100644
index 00000000..7a1985eb
--- /dev/null
+++ b/numalogic/udfs/pipeline.py
@@ -0,0 +1,78 @@
+import json
+import os
+import time
+import orjson
+import logging
+from typing import Optional
+
+from numpy import typing as npt
+from pynumaflow.mapper import Datum, Messages, Message
+
+from numalogic.tools.types import artifact_t
+from numalogic.udfs._metrics import UDF_TIME
+from numalogic.udfs import NumalogicUDF
+from numalogic.udfs._config import PipelineConf
+
+# TODO: move to config
+LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", "3600"))
+LOCAL_CACHE_SIZE = int(os.getenv("LOCAL_CACHE_SIZE", "10000"))
+LOAD_LATEST = os.getenv("LOAD_LATEST", "false").lower() == "true"
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class PipelineUDF(NumalogicUDF):
+    """
+    Pipeline UDF for Numalogic.
+
+    Args:
+        pl_conf: PipelineConf instance
+    """
+
+    @classmethod
+    def compute(cls, model: artifact_t, input_: npt.NDArray[float], **kwargs):
+        pass
+
+    def __init__(self, pl_conf: Optional[PipelineConf] = None):
+        super().__init__(pl_conf=pl_conf, _vtx="pipeline")
+
+    @UDF_TIME.time()
+    def exec(self, keys: list[str], datum: Datum) -> Messages:
+        """
+        The pipeline function here receives data from the data source.
+
+        Fan out the input payload to all the ML pipelines defined in the stream config.
+
+        Args:
+        -------
+        keys: List of keys
+        datum: Datum object
+
+        Returns
+        -------
+        Messages instance
+
+        """
+        _start_time = time.perf_counter()
+
+        # check message sanity
+        try:
+            data_payload = orjson.loads(datum.value)
+            _LOGGER.info("%s - Data payload: %s", data_payload["uuid"], data_payload)
+        except (orjson.JSONDecodeError, KeyError):  # catch json decode error only
+            _LOGGER.exception("Error while decoding input json")
+            return Messages(Message.to_drop())
+
+        _stream_conf = self.get_stream_conf(data_payload["config_id"])
+
+        # create a new message for each ML pipeline
+        messages = Messages()
+        for pipeline in _stream_conf.ml_pipelines:
+            data_payload["pipeline_id"] = pipeline
+            messages.append(Message(keys=keys, value=str.encode(json.dumps(data_payload))))
+
+        _LOGGER.debug(
+            "Time taken to execute Pipeline: %.4f sec",
+            time.perf_counter() - _start_time,
+        )
+        return messages
diff --git a/numalogic/udfs/postprocess.py b/numalogic/udfs/postprocess.py
index f53f33f2..df198fd9 100644
--- a/numalogic/udfs/postprocess.py
+++ b/numalogic/udfs/postprocess.py
@@ -23,7 +23,7 @@
 from numalogic.udfs import NumalogicUDF
 from numalogic.udfs._config import PipelineConf
 from numalogic.udfs.entities import StreamPayload, Header, Status, TrainerPayload, OutputPayload
-from numalogic.udfs.tools import _load_artifact
+from numalogic.udfs.tools import _load_artifact, get_skeys
 
 # TODO: move to config
 LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", "3600"))
@@ -91,13 +91,14 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         )
 
         # load configs
-        _conf = self.get_conf(payload.config_id)
+        _stream_conf = self.get_stream_conf(payload.config_id)
+        _conf = _stream_conf.ml_pipelines[payload.pipeline_id]
         thresh_cfg = _conf.numalogic_conf.threshold
         postprocess_cfg = _conf.numalogic_conf.postprocess
 
         # load artifact
         thresh_artifact, payload = _load_artifact(
-            skeys=[_ckey for _, _ckey in zip(_conf.composite_keys, payload.composite_keys)],
+            skeys=get_skeys(payload, _stream_conf),
             dkeys=[thresh_cfg.name],
             payload=payload,
             model_registry=self.model_registry,
@@ -141,6 +142,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         out_payload = OutputPayload(
             uuid=payload.uuid,
             config_id=payload.config_id,
+            pipeline_id=payload.pipeline_id,
             composite_keys=payload.composite_keys,
             timestamp=payload.end_ts,
             unified_anomaly=np.max(anomaly_scores),
@@ -157,13 +159,13 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
 
         # Forward payload if a training request is tagged
         if payload.header == Header.TRAIN_REQUEST or payload.status == Status.ARTIFACT_STALE:
-            _conf = self.get_conf(payload.config_id)
-            ckeys = [_ckey for _, _ckey in zip(_conf.composite_keys, payload.composite_keys)]
+            ckeys = [_ckey for _, _ckey in zip(_stream_conf.composite_keys, payload.composite_keys)]
             train_payload = TrainerPayload(
                 uuid=payload.uuid,
                 composite_keys=ckeys,
                 metrics=payload.metrics,
                 config_id=payload.config_id,
+                pipeline_id=payload.pipeline_id,
             )
             messages.append(Message(keys=keys, value=train_payload.to_json(), tags=["train"]))
         _LOGGER.debug(
diff --git a/numalogic/udfs/preprocess.py b/numalogic/udfs/preprocess.py
index 0b216555..2f7def38 100644
--- a/numalogic/udfs/preprocess.py
+++ b/numalogic/udfs/preprocess.py
@@ -26,7 +26,7 @@
 from numalogic.udfs import NumalogicUDF
 from numalogic.udfs._config import PipelineConf
 from numalogic.udfs.entities import Status, Header
-from numalogic.udfs.tools import make_stream_payload, get_df, _load_artifact
+from numalogic.udfs.tools import make_stream_payload, get_df, _load_artifact, get_skeys
 
 # TODO: move to config
 LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", "3600"))
@@ -95,17 +95,16 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         except (orjson.JSONDecodeError, KeyError):  # catch json decode error only
             _LOGGER.exception("Error while decoding input json")
             return Messages(Message.to_drop())
-        raw_df, timestamps = get_df(
-            data_payload=data_payload, stream_conf=self.get_conf(data_payload["config_id"])
-        )
+
+        _stream_conf = self.get_stream_conf(data_payload["config_id"])
+        _conf = _stream_conf.ml_pipelines[data_payload["pipeline_id"]]
+        raw_df, timestamps = get_df(data_payload=data_payload, stream_conf=_stream_conf)
 
         _metric_label_values = (self._vtx, ":".join(keys), data_payload["config_id"])
         _increment_counter(counter=MSG_IN_COUNTER, labels=_metric_label_values)
 
         # Drop message if dataframe shape conditions are not met
-        if raw_df.shape[0] < self.get_conf(data_payload["config_id"]).window_size or raw_df.shape[
-            1
-        ] != len(self.get_conf(data_payload["config_id"]).metrics):
+        if raw_df.shape[0] < _stream_conf.window_size or raw_df.shape[1] != len(_conf.metrics):
             _LOGGER.error("Dataframe shape: (%f, %f) error ", raw_df.shape[0], raw_df.shape[1])
             _increment_counter(
                 counter=DATASHAPE_ERROR_COUNTER,
@@ -121,11 +120,9 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         payload = make_stream_payload(data_payload, raw_df, timestamps, keys)
 
         # Check if model will be present in registry
-
-        _conf = self.get_conf(payload.config_id)
         if any(_cfg.stateful for _cfg in _conf.numalogic_conf.preprocess):
             preproc_artifact, payload = _load_artifact(
-                skeys=[_ckey for _, _ckey in zip(_conf.composite_keys, payload.composite_keys)],
+                skeys=get_skeys(payload, _stream_conf),
                 dkeys=[_cfg.name for _cfg in _conf.numalogic_conf.preprocess],
                 payload=payload,
                 model_registry=self.model_registry,
diff --git a/numalogic/udfs/tools.py b/numalogic/udfs/tools.py
index ba7f6022..aff733ca 100644
--- a/numalogic/udfs/tools.py
+++ b/numalogic/udfs/tools.py
@@ -33,6 +33,21 @@ class _DedupMetadata(NamedTuple):
     msg_train_records: Optional[str]
 
 
+def get_skeys(payload: StreamPayload, stream_conf: StreamConf):
+    """Get the skeys for the given payload.
+
+    Args:
+        payload: StreamPayload object
+        stream_conf: stream configuration.
+
+    Returns
+    -------
+    skeys formed with composite keys and pipeline id
+    """
+    composite_keys = [_ckey for _, _ckey in zip(stream_conf.composite_keys, payload.composite_keys)]
+    return [*composite_keys, payload.pipeline_id]
+
+
 def get_df(
     data_payload: dict, stream_conf: StreamConf, fill_value: float = 0.0
 ) -> tuple[DataFrame, list[int]]:
@@ -47,7 +62,8 @@ def get_df(
     -------
     dataframe and timestamps
     """
-    features = stream_conf.metrics
+    _conf = stream_conf.ml_pipelines[data_payload["pipeline_id"]]
+    features = _conf.metrics
     df = (
         pd.DataFrame(data_payload["data"], columns=["timestamp", *features])
         .fillna(fill_value)
@@ -74,6 +90,7 @@ def make_stream_payload(
     return StreamPayload(
         uuid=data_payload["uuid"],
         config_id=data_payload["config_id"],
+        pipeline_id=data_payload["pipeline_id"],
         composite_keys=keys,
         data=np.ascontiguousarray(raw_df, dtype=np.float32),
         raw_data=np.ascontiguousarray(raw_df, dtype=np.float32),
diff --git a/numalogic/udfs/trainer/_base.py b/numalogic/udfs/trainer/_base.py
index 71bfaa6c..4af72ff8 100644
--- a/numalogic/udfs/trainer/_base.py
+++ b/numalogic/udfs/trainer/_base.py
@@ -19,7 +19,7 @@
 from numalogic.tools.exceptions import ConfigNotFoundError, RedisRegistryError
 from numalogic.tools.types import redis_client_t, artifact_t, KEYS, KeyedArtifact
 from numalogic.udfs import NumalogicUDF
-from numalogic.udfs._config import StreamConf, PipelineConf
+from numalogic.udfs._config import PipelineConf, MLPipelineConf
 from numalogic.udfs.entities import TrainerPayload
 from numalogic.udfs._metrics import (
     REDIS_ERROR_COUNTER,
@@ -146,7 +146,10 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         # Construct payload object
         payload = TrainerPayload(**orjson.loads(datum.value))
         _metric_label_values = (":".join(payload.composite_keys), payload.config_id)
-        _conf = self.get_conf(payload.config_id)
+
+        _conf = self.get_ml_pipeline_conf(
+            config_id=payload.config_id, pipeline_id=payload.pipeline_id
+        )
         _increment_counter(
             counter=MSG_IN_COUNTER,
             labels=(self._vtx, ":".join(payload.composite_keys), payload.config_id),
@@ -220,7 +223,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         )
 
         # Save artifacts
-        skeys = payload.composite_keys
+        skeys = [*payload.composite_keys, payload.pipeline_id]
 
         self.artifacts_to_save(
             skeys=skeys,
@@ -247,7 +250,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
         )
         return Messages(Message.to_drop())
 
-    def _construct_preproc_clf(self, _conf: StreamConf) -> Optional[artifact_t]:
+    def _construct_preproc_clf(self, _conf: MLPipelineConf) -> Optional[artifact_t]:
         preproc_clfs = []
         for _cfg in _conf.numalogic_conf.preprocess:
             _clf = self._preproc_factory.get_instance(_cfg)
@@ -304,7 +307,9 @@ def artifacts_to_save(
         _LOGGER.info("%s - Artifact saved with with versions: %s", payload.uuid, ver_dict)
 
     def _is_data_sufficient(self, payload: TrainerPayload, df: pd.DataFrame) -> bool:
-        _conf = self.get_conf(payload.config_id)
+        _conf = self.get_ml_pipeline_conf(
+            config_id=payload.config_id, pipeline_id=payload.pipeline_id
+        )
         if len(df) < _conf.numalogic_conf.trainer.min_train_size:
             _ = self.train_msg_deduplicator.ack_insufficient_data(
                 key=payload.composite_keys, uuid=payload.uuid, train_records=len(df)
diff --git a/numalogic/udfs/trainer/_druid.py b/numalogic/udfs/trainer/_druid.py
index 9563d4a5..6977a1ba 100644
--- a/numalogic/udfs/trainer/_druid.py
+++ b/numalogic/udfs/trainer/_druid.py
@@ -46,7 +46,9 @@ def __init__(
         except AttributeError as err:
             raise ConfigNotFoundError("Druid config not found!") from err
 
-    def register_druid_fetcher_conf(self, config_id: str, conf: DruidFetcherConf) -> None:
+    def register_druid_fetcher_conf(
+        self, config_id: str, pipeline_id: str, conf: DruidFetcherConf
+    ) -> None:
         """
         Register DruidFetcherConf with the UDF.
 
@@ -54,9 +56,10 @@ def register_druid_fetcher_conf(self, config_id: str, conf: DruidFetcherConf) -
            config_id: Config ID
            conf: DruidFetcherConf object
         """
-        self.pl_conf.druid_conf.id_fetcher[config_id] = conf
+        fetcher_id = f"{config_id}-{pipeline_id}"
+        self.pl_conf.druid_conf.id_fetcher[fetcher_id] = conf
 
-    def get_druid_fetcher_conf(self, config_id: str) -> DruidFetcherConf:
+    def get_druid_fetcher_conf(self, config_id: str, pipeline_id: str) -> DruidFetcherConf:
         """
         Get DruidFetcherConf with the given ID.
 
@@ -71,10 +74,11 @@ def get_druid_fetcher_conf(self, config_id: str) -> DruidFetcherConf:
         ------
         ConfigNotFoundError: If config with the given ID is not found
         """
+        fetcher_id = f"{config_id}-{pipeline_id}"
         try:
-            return self.pl_conf.druid_conf.id_fetcher[config_id]
+            return self.pl_conf.druid_conf.id_fetcher[fetcher_id]
         except KeyError as err:
-            raise ConfigNotFoundError(f"Config with ID {config_id} not found!") from err
+            raise ConfigNotFoundError(f"Config with ID {fetcher_id} not found!") from err
 
     def fetch_data(self, payload: TrainerPayload) -> pd.DataFrame:
         """
@@ -89,21 +93,25 @@ def fetch_data(self, payload: TrainerPayload) -> pd.DataFrame:
         """
         _start_time = time.perf_counter()
         _metric_label_values = (":".join(payload.composite_keys), payload.config_id)
-        _conf = self.get_conf(payload.config_id)
+        _stream_conf = self.get_stream_conf(payload.config_id)
+        _conf = _stream_conf.ml_pipelines[payload.pipeline_id]
         _fetcher_conf = self.dataconn_conf.fetcher or (
-            self.get_druid_fetcher_conf(payload.config_id)
+            self.get_druid_fetcher_conf(
+                config_id=payload.config_id, pipeline_id=payload.pipeline_id
+            )
             if self.dataconn_conf.id_fetcher
             else None
         )
         if not _fetcher_conf:
             raise ConfigNotFoundError(
-                f"Druid fetcher config not found for config_id: {payload.config_id}!"
+                f"Druid fetcher config not found for config_id: {payload.config_id},"
+                f" pipeline_id: {payload.pipeline_id}!"
             )
         try:
             _df = self.data_fetcher.fetch(
                 datasource=_fetcher_conf.datasource,
-                filter_keys=_conf.composite_keys,
+                filter_keys=_stream_conf.composite_keys,
                 filter_values=payload.composite_keys,
                 dimensions=list(_fetcher_conf.dimensions),
                 delay=self.dataconn_conf.delay_hrs,
diff --git a/numalogic/udfs/trainer/_prom.py b/numalogic/udfs/trainer/_prom.py
index e09c9db2..f59aac7e 100644
--- a/numalogic/udfs/trainer/_prom.py
+++ b/numalogic/udfs/trainer/_prom.py
@@ -61,7 +61,8 @@ def fetch_data(self, payload: TrainerPayload) -> pd.DataFrame:
         """
         _start_time = time.perf_counter()
         _metric_label_values = (":".join(payload.composite_keys), payload.config_id)
-        _conf = self.get_conf(payload.config_id)
+        _stream_conf = self.get_stream_conf(payload.config_id)
+        _conf = _stream_conf.ml_pipelines[payload.pipeline_id]
 
         end_dt = datetime.now(pytz.utc)
         start_dt = end_dt - timedelta(hours=_conf.numalogic_conf.trainer.train_hours)
@@ -75,7 +76,7 @@ def fetch_data(self, payload: TrainerPayload) -> pd.DataFrame:
                 return_labels=["rollouts_pod_template_hash"],
                 filters={
                     "numalogic": "true",
-                    **dict(zip(_conf.composite_keys, payload.composite_keys)),
+                    **dict(zip(_stream_conf.composite_keys, payload.composite_keys)),
                 },
             )
         except Exception:
diff --git a/tests/udfs/resources/_config.yaml b/tests/udfs/resources/_config.yaml
index 673d4039..10995004 100644
--- a/tests/udfs/resources/_config.yaml
+++ b/tests/udfs/resources/_config.yaml
@@ -3,23 +3,45 @@ stream_confs:
     config_id: "druid-config"
     source: "druid"
     composite_keys: [ 'service-mesh', '1', '2' ]
-    metrics: [ "col1" , "col2" ]
     window_size: 10
-    numalogic_conf:
-      model:
-        name: "VanillaAE"
-        conf:
-          seq_len: 10
-          n_features: 2
-      preprocess:
-        - name: "LogTransformer"
-          stateful: false
-          conf:
-            add_factor: 5
-      threshold:
-        name: "StdDevThreshold"
-        conf:
-          min_threshold: 0.1
+    ml_pipelines:
+      pipeline1:
+        pipeline_id: "pipeline1"
+        metrics: [ "col1" , "col2" ]
+        numalogic_conf:
+          model:
+            name: "VanillaAE"
+            conf:
+              seq_len: 10
+              n_features: 2
+          preprocess:
+            - name: "LogTransformer"
+              stateful: false
+              conf:
+                add_factor: 5
+          threshold:
+            name: "StdDevThreshold"
+            conf:
+              min_threshold: 0.1
+      pipeline2:
+        pipeline_id: "pipeline2"
+        metrics: [ "col1" , "col2" ]
+        numalogic_conf:
+          model:
+            name: "VanillaAE"
+            conf:
+              seq_len: 10
+              n_features: 2
+          preprocess:
+            - name: "LogTransformer"
+              stateful: false
+              conf:
+                add_factor: 5
+          threshold:
+            name: "StdDevThreshold"
+            conf:
+              min_threshold: 0.1
+
 redis_conf:
   url: "isbsvc-redis-isbs-redis-svc.oss-analytics-numalogicosamfci-usw2-e2e.svc"
   port: 26379
@@ -29,9 +51,10 @@ redis_conf:
 druid_conf:
   url: "druid-endpoint"
   endpoint: "endpoint"
-  fetcher:
-    dimensions: [ "col1" ]
-    datasource: "table-name"
-    group_by: [ "timestamp", "col1" ]
-    pivot:
-      columns: [ "col2" ]
\ No newline at end of file
+  id_fetcher:
+    druid-config-pipeline1:
+      dimensions: [ "col1" ]
+      datasource: "table-name"
+      group_by: [ "timestamp", "col1" ]
+      pivot:
+        columns: [ "col2" ]
\ No newline at end of file
diff --git a/tests/udfs/resources/_config2.yaml b/tests/udfs/resources/_config2.yaml
index cf130f78..c525fff0 100644
--- a/tests/udfs/resources/_config2.yaml
+++ b/tests/udfs/resources/_config2.yaml
@@ -3,25 +3,29 @@ stream_confs:
     config_id: "druid-config"
     source: "druid"
     composite_keys: [ 'service-mesh', '1', '2' ]
-    metrics: [ "col1" , "col2" ]
     window_size: 10
-    numalogic_conf:
-      model:
-        name: "VanillaAE"
-        conf:
-          seq_len: 10
-          n_features: 2
-      preprocess:
-        - name: "LogTransformer"
-          stateful: false
-          conf:
-            add_factor: 5
-        - name: "StandardScaler"
-          stateful: true
-      threshold:
-        name: "MahalanobisThreshold"
-        conf:
-          max_outlier_prob: 0.08
+    ml_pipelines:
+      pipeline1:
+        pipeline_id: "pipeline1"
+        metrics: [ "col1" , "col2" ]
+        numalogic_conf:
+          model:
+            name: "VanillaAE"
+            conf:
+              seq_len: 10
+              n_features: 2
+          preprocess:
+            - name: "LogTransformer"
+              stateful: false
+              conf:
+                add_factor: 5
+            - name: "StandardScaler"
+              stateful: true
+          threshold:
+            name: "MahalanobisThreshold"
+            conf:
+              max_outlier_prob: 0.08
+
 redis_conf:
   url: "isbsvc-redis-isbs-redis-svc.oss-analytics-numalogicosamfci-usw2-e2e.svc"
   port: 26379
@@ -32,7 +36,7 @@ druid_conf:
   url: "druid-endpoint"
   endpoint: "endpoint"
   id_fetcher:
-    druid-config:
+    druid-config-pipeline1:
       dimensions: [ "col1" ]
       datasource: "table-name"
      group_by: [ "timestamp", "col1" ]
diff --git a/tests/udfs/resources/_config3.yaml b/tests/udfs/resources/_config3.yaml
index 86ba6198..ae7aa58f 100644
--- a/tests/udfs/resources/_config3.yaml
+++ b/tests/udfs/resources/_config3.yaml
@@ -3,47 +3,53 @@ stream_confs:
     config_id: "odl-graphql"
     source: "prometheus"
     composite_keys: [ "namespace", "app" ]
-    metrics: [ "namespace_app_rollouts_cpu_utilization", "namespace_app_rollouts_http_request_error_rate", "namespace_app_rollouts_memory_utilization" ]
     window_size: 12
-    numalogic_conf:
-      model:
-        name: "Conv1dVAE"
-        conf:
-          seq_len: 12
-          n_features: 3
-          latent_dim: 1
-      preprocess:
-        - name: "StandardScaler"
-      threshold:
-        name: "MahalanobisThreshold"
-      trainer:
-        train_hours: 3
-        min_train_size: 100
-        pltrainer_conf:
-          accelerator: cpu
-          max_epochs: 5
+    ml_pipelines:
+      pipeline1:
+        pipeline_id: "pipeline1"
+        metrics: [ "namespace_app_rollouts_cpu_utilization", "namespace_app_rollouts_http_request_error_rate", "namespace_app_rollouts_memory_utilization" ]
+        numalogic_conf:
+          model:
+            name: "Conv1dVAE"
+            conf:
+              seq_len: 12
+              n_features: 3
+              latent_dim: 1
+          preprocess:
+            - name: "StandardScaler"
+          threshold:
+            name: "MahalanobisThreshold"
+          trainer:
+            train_hours: 3
+            min_train_size: 100
+            pltrainer_conf:
+              accelerator: cpu
+              max_epochs: 5
   default:
     config_id: "default"
     source: "prometheus"
     composite_keys: [ "namespace", "app" ]
-    metrics: ["namespace_app_rollouts_http_request_error_rate"]
     window_size: 12
-    numalogic_conf:
-      model:
-        name: "SparseVanillaAE"
-        conf:
-          seq_len: 12
-          n_features: 1
-      preprocess:
-        - name: "StandardScaler"
-      threshold:
-        name: "StdDevThreshold"
-      trainer:
-        train_hours: 1
-        min_train_size: 100
-        pltrainer_conf:
-          accelerator: cpu
-          max_epochs: 5
+    ml_pipelines:
+      default:
+        pipeline_id: "default"
+        metrics: ["namespace_app_rollouts_http_request_error_rate"]
+        numalogic_conf:
+          model:
+            name: "SparseVanillaAE"
+            conf:
+              seq_len: 12
+              n_features: 1
+          preprocess:
+            - name: "StandardScaler"
+          threshold:
+            name: "StdDevThreshold"
+          trainer:
+            train_hours: 1
+            min_train_size: 100
+            pltrainer_conf:
+              accelerator: cpu
+              max_epochs: 5
 
 redis_conf:
   url: "http://localhost:6379"
diff --git a/tests/udfs/resources/_config4.yaml b/tests/udfs/resources/_config4.yaml
index 0fc17a20..0c8380b6 100644
--- a/tests/udfs/resources/_config4.yaml
+++ b/tests/udfs/resources/_config4.yaml
@@ -3,25 +3,28 @@ stream_confs:
     config_id: "mycustomconf"
     source: "prometheus"
     composite_keys: [ "namespace", "app" ]
-    metrics: [ "namespace_app_rollouts_cpu_utilization", "namespace_app_rollouts_http_request_error_rate", "namespace_app_rollouts_memory_utilization" ]
     window_size: 12
-    numalogic_conf:
-      model:
-        name: "Conv1dVAE"
-        conf:
-          seq_len: 12
-          n_features: 3
-          latent_dim: 1
-      preprocess:
-        - name: "StandardScaler"
-      threshold:
-        name: "MahalanobisThreshold"
-      trainer:
-        train_hours: 3
-        min_train_size: 100
-        pltrainer_conf:
-          accelerator: cpu
-          max_epochs: 5
+    ml_pipelines:
+      pipeline1:
+        pipeline_id: "pipeline1"
+        metrics: [ "namespace_app_rollouts_cpu_utilization", "namespace_app_rollouts_http_request_error_rate", "namespace_app_rollouts_memory_utilization" ]
+        numalogic_conf:
+          model:
+            name: "Conv1dVAE"
+            conf:
+              seq_len: 12
+              n_features: 3
+              latent_dim: 1
+          preprocess:
+            - name: "StandardScaler"
+          threshold:
+            name: "MahalanobisThreshold"
+          trainer:
+            train_hours: 3
+            min_train_size: 100
+            pltrainer_conf:
+              accelerator: cpu
+              max_epochs: 5
 redis_conf:
   url: "http://localhost:6222"
   port: 26379
diff --git a/tests/udfs/resources/data/stream.json b/tests/udfs/resources/data/stream.json
index 7b1859bc..a277c707 100644
--- a/tests/udfs/resources/data/stream.json
+++ b/tests/udfs/resources/data/stream.json
@@ -1,6 +1,7 @@
 {
   "uuid": "dd7dfb43-532b-49a3-906e-f78f82ad9c4b",
   "config_id": "druid-config",
+  "pipeline_id": "pipeline1",
   "data": [
     {
       "degraded": 14,
diff --git a/tests/udfs/test_inference.py b/tests/udfs/test_inference.py
index efb714a5..f38bc95d 100644
--- a/tests/udfs/test_inference.py
+++ b/tests/udfs/test_inference.py
@@ -12,7 +12,7 @@
 from numalogic.models.autoencoder.variants import VanillaAE
 from numalogic.registry import RedisRegistry, ArtifactData
 from numalogic.tools.exceptions import RedisRegistryError
-from numalogic.udfs import StreamConf, InferenceUDF
+from numalogic.udfs import StreamConf, InferenceUDF, MLPipelineConf
 from numalogic.udfs.entities import StreamPayload, Header, Status
 
 REDIS_CLIENT = FakeStrictRedis(server=FakeServer())
@@ -24,6 +24,7 @@
 DATA = {
     "uuid": "dd7dfb43-532b-49a3-906e-f78f82ad9c4b",
     "config_id": "conf1",
+    "pipeline_id": "pipeline1",
     "composite_keys": ["service-mesh", "1", "2"],
     "data": [
         [4.801275, 1.4581239],
@@ -86,10 +87,17 @@ def setUp(self) -> None:
         self.udf.register_conf(
             "conf1",
             StreamConf(
-                numalogic_conf=NumalogicConf(
-                    model=ModelInfo(name="VanillaAE", conf={"seq_len": 12, "n_features": 2}),
-                    trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
-                )
+                ml_pipelines={
+                    "pipeline1": MLPipelineConf(
+                        pipeline_id="pipeline1",
+                        numalogic_conf=NumalogicConf(
+                            model=ModelInfo(
+                                name="VanillaAE", conf={"seq_len": 12, "n_features": 2}
+                            ),
+                            trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
+                        ),
+                    )
+                }
             ),
         )
 
diff --git a/tests/udfs/test_pipeline.py b/tests/udfs/test_pipeline.py
new file mode 100644
index 00000000..47282b48
--- /dev/null
+++ b/tests/udfs/test_pipeline.py
@@ -0,0 +1,55 @@
+import logging
+import os
+import unittest
+from datetime import datetime
+from fakeredis import FakeServer, FakeStrictRedis
+from omegaconf import OmegaConf
+from orjson import orjson
+
+from numalogic._constants import TESTS_DIR
+from numalogic.udfs._config import PipelineConf
+from numalogic.udfs.pipeline import PipelineUDF
+from tests.udfs.utility import input_json_from_file
+
+logging.basicConfig(level=logging.DEBUG)
+REDIS_CLIENT = FakeStrictRedis(server=FakeServer())
+KEYS = ["service-mesh", "1", "2"]
+DATUM = input_json_from_file(os.path.join(TESTS_DIR, "udfs", "resources", "data", "stream.json"))
+
+DATUM_KW = {
+    "event_time": datetime.now(),
+    "watermark": datetime.now(),
+}
+
+
+class TestPipelineUDF(unittest.TestCase):
+    def setUp(self) -> None:
+        _given_conf = OmegaConf.load(os.path.join(TESTS_DIR, "udfs", "resources", "_config.yaml"))
+        _given_conf_2 = OmegaConf.load(
+            os.path.join(TESTS_DIR, "udfs", "resources", "_config2.yaml")
+        )
+        schema = OmegaConf.structured(PipelineConf)
+        pl_conf = PipelineConf(**OmegaConf.merge(schema, _given_conf))
+        pl_conf_2 = PipelineConf(**OmegaConf.merge(schema, _given_conf_2))
+        self.udf1 = PipelineUDF(pl_conf=pl_conf)
+        self.udf2 = PipelineUDF(pl_conf=pl_conf_2)
+        self.udf1.register_conf("druid-config", pl_conf.stream_confs["druid-config"])
+        self.udf2.register_conf("druid-config", pl_conf_2.stream_confs["druid-config"])
+
+    def test_pipeline_1(self):
+        msgs = self.udf1(KEYS, DATUM)
+        self.assertEqual(2, len(msgs))
+        for msg in msgs:
+            data_payload = orjson.loads(msg.value)
+            self.assertTrue(data_payload["pipeline_id"])
+
+    def test_pipeline_2(self):
+        msgs = self.udf2(KEYS, DATUM)
+        self.assertEqual(1, len(msgs))
+        for msg in msgs:
+            data_payload = orjson.loads(msg.value)
+            self.assertTrue(data_payload["pipeline_id"])
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/udfs/test_postprocess.py b/tests/udfs/test_postprocess.py
index 6519f683..33e13adb 100644
--- a/tests/udfs/test_postprocess.py
+++ b/tests/udfs/test_postprocess.py
@@ -29,6 +29,7 @@
 DATA = {
     "uuid": "dd7dfb43-532b-49a3-906e-f78f82ad9c4b",
     "config_id": "druid-config",
+    "pipeline_id": "pipeline1",
     "composite_keys": ["service-mesh", "1", "2"],
     "data": [
         [2.055191, 2.205468],
@@ -113,7 +114,9 @@ def test_postprocess_infer_model_stale(self):
         data["status"] = Status.ARTIFACT_STALE
         data["header"] = Header.MODEL_INFERENCE
         self.registry.save(
-            KEYS, ["StdDevThreshold"], StdDevThreshold().fit(np.asarray([[0, 1], [1, 2]]))
+            [*KEYS, "pipeline1"],
+            ["StdDevThreshold"],
+            StdDevThreshold().fit(np.asarray([[0, 1], [1, 2]])),
         )
         msg = self.udf(KEYS, Datum(keys=KEYS, value=orjson.dumps(data), **DATUM_KW))
         self.assertEqual(2, len(msg))
@@ -123,7 +126,9 @@ def test_postprocess_all_model_present_01(self):
         data["status"] = Status.ARTIFACT_FOUND
         data["header"] = Header.MODEL_INFERENCE
         self.registry.save(
-            KEYS, ["StdDevThreshold"], StdDevThreshold().fit(np.asarray([[0, 1], [1, 2]]))
+            [*KEYS, "pipeline1"],
+            ["StdDevThreshold"],
+            StdDevThreshold().fit(np.asarray([[0, 1], [1, 2]])),
         )
         msg = self.udf(KEYS, Datum(keys=KEYS, value=orjson.dumps(data), **DATUM_KW))
 
@@ -146,7 +151,9 @@ def test_postprocess_all_model_present_02(self):
             "tags": {"asset_alias": "data", "asset_id": "123456789", "env": "prd"},
         }
         self.registry.save(
-            KEYS, ["MahalanobisThreshold"], MahalanobisThreshold().fit(np.asarray([[0, 1], [1, 2]]))
+            [*KEYS, "pipeline1"],
+            ["MahalanobisThreshold"],
+            MahalanobisThreshold().fit(np.asarray([[0, 1], [1, 2]])),
         )
         msg = udf(KEYS, Datum(keys=KEYS, value=orjson.dumps(data), **DATUM_KW))
 
diff --git a/tests/udfs/test_trainer.py b/tests/udfs/test_trainer.py
index 6f17fd11..1083e5e1 100644
--- a/tests/udfs/test_trainer.py
+++ b/tests/udfs/test_trainer.py
@@ -19,7 +19,7 @@
 from numalogic.connectors import RedisConf, DruidConf, DruidFetcherConf
 from numalogic.connectors.druid import DruidFetcher
 from numalogic.tools.exceptions import ConfigNotFoundError
-from numalogic.udfs import StreamConf, PipelineConf
+from numalogic.udfs import StreamConf, PipelineConf, MLPipelineConf
 from numalogic.udfs.tools import TrainMsgDeduplicator
 from numalogic.udfs.trainer import DruidTrainerUDF, PromTrainerUDF
 
@@ -43,6 +43,7 @@ def setUp(self):
         payload = {
             "uuid": "some-uuid",
            "config_id": "druid-config",
+            "pipeline_id": "pipeline1",
             "composite_keys": ["5984175597303660107"],
             "metrics": ["failed", "degraded"],
         }
@@ -72,13 +73,20 @@ def test_trainer_01(self):
         self.udf1.register_conf(
             "druid-config",
             StreamConf(
-                numalogic_conf=NumalogicConf(
-                    model=ModelInfo(
-                        name="VanillaAE", stateful=True, conf={"seq_len": 12, "n_features": 2}
-                    ),
-                    preprocess=[ModelInfo(name="LogTransformer", stateful=True, conf={})],
-                    trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
-                )
+                ml_pipelines={
+                    "pipeline1": MLPipelineConf(
+                        pipeline_id="pipeline1",
+                        numalogic_conf=NumalogicConf(
+                            model=ModelInfo(
+                                name="VanillaAE",
+                                stateful=True,
+                                conf={"seq_len": 12, "n_features": 2},
+                            ),
+                            preprocess=[ModelInfo(name="LogTransformer", stateful=True, conf={})],
+                            trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
+                        ),
+                    )
+                }
             ),
         )
         self.udf1(self.keys, self.datum)
@@ -86,8 +94,8 @@
         self.assertEqual(
             2,
             REDIS_CLIENT.exists(
-                b"5984175597303660107::VanillaAE::LATEST",
-                b"5984175597303660107::StdDevThreshold::LATEST",
+                b"5984175597303660107:pipeline1::VanillaAE::LATEST",
+                b"5984175597303660107:pipeline1::StdDevThreshold::LATEST",
             ),
         )
 
@@ -96,20 +104,27 @@ def test_trainer_02(self):
         self.udf1.register_conf(
             "druid-config",
             StreamConf(
-                numalogic_conf=NumalogicConf(
-                    model=ModelInfo(name="VanillaAE", conf={"seq_len": 12, "n_features": 2}),
-                    preprocess=[ModelInfo(name="StandardScaler", conf={})],
-                    trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
-                )
+                ml_pipelines={
+                    "pipeline1": MLPipelineConf(
+                        pipeline_id="pipeline1",
+                        numalogic_conf=NumalogicConf(
+                            model=ModelInfo(
+                                name="VanillaAE", conf={"seq_len": 12, "n_features": 2}
+                            ),
+                            preprocess=[ModelInfo(name="StandardScaler", conf={})],
+                            trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
+                        ),
+                    )
+                }
             ),
         )
         self.udf1(self.keys, self.datum)
         self.assertEqual(
             3,
             REDIS_CLIENT.exists(
-                b"5984175597303660107::VanillaAE::LATEST",
-                b"5984175597303660107::StdDevThreshold::LATEST",
-                b"5984175597303660107::StandardScaler::LATEST",
+                b"5984175597303660107:pipeline1::VanillaAE::LATEST",
+                b"5984175597303660107:pipeline1::StdDevThreshold::LATEST",
+                b"5984175597303660107:pipeline1::StandardScaler::LATEST",
             ),
         )
 
@@ -118,20 +133,30 @@ def test_trainer_03(self):
         self.udf1.register_conf(
             "druid-config",
             StreamConf(
-                numalogic_conf=NumalogicConf(
-                    model=ModelInfo(name="VanillaAE", conf={"seq_len": 12, "n_features": 2}),
-                    preprocess=[ModelInfo(name="LogTransformer"), ModelInfo(name="StandardScaler")],
-                    trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
-                )
+                ml_pipelines={
+                    "pipeline1": MLPipelineConf(
+                        pipeline_id="pipeline1",
+                        numalogic_conf=NumalogicConf(
+                            model=ModelInfo(
+                                name="VanillaAE", conf={"seq_len": 12, "n_features": 2}
+                            ),
+                            preprocess=[
+                                ModelInfo(name="LogTransformer"),
+                                ModelInfo(name="StandardScaler"),
+                            ],
+                            trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
+                        ),
+                    )
+                }
             ),
         )
         self.udf1(self.keys, self.datum)
         self.assertEqual(
             3,
             REDIS_CLIENT.exists(
-                b"5984175597303660107::VanillaAE::LATEST",
-                b"5984175597303660107::StdDevThreshold::LATEST",
-                b"5984175597303660107::LogTransformer:StandardScaler::LATEST",
+                b"5984175597303660107:pipeline1::VanillaAE::LATEST",
+                b"5984175597303660107:pipeline1::StdDevThreshold::LATEST",
+                b"5984175597303660107:pipeline1::LogTransformer:StandardScaler::LATEST",
             ),
         )
 
@@ -140,11 +165,21 @@ def test_trainer_do_train(self):
         self.udf1.register_conf(
             "druid-config",
             StreamConf(
-                numalogic_conf=NumalogicConf(
-                    model=ModelInfo(name="VanillaAE", conf={"seq_len": 12, "n_features": 2}),
-                    preprocess=[ModelInfo(name="LogTransformer"), ModelInfo(name="StandardScaler")],
-                    trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
-                )
+                ml_pipelines={
+                    "pipeline1": MLPipelineConf(
+                        pipeline_id="pipeline1",
+                        numalogic_conf=NumalogicConf(
+                            model=ModelInfo(
+                                name="VanillaAE", conf={"seq_len": 12, "n_features": 2}
+                            ),
+                            preprocess=[
+                                ModelInfo(name="LogTransformer"),
+                                ModelInfo(name="StandardScaler"),
+                            ],
+                            trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
+                        ),
+                    )
+                }
             ),
         )
         time.time()
@@ -154,17 +189,17 @@
         self.udf1(self.keys, self.datum)
         self.assertEqual(
             3,
             REDIS_CLIENT.exists(
-                b"5984175597303660107::VanillaAE::LATEST",
-                b"5984175597303660107::StdDevThreshold::LATEST",
-                b"5984175597303660107::LogTransformer:StandardScaler::LATEST",
+                b"5984175597303660107:pipeline1::VanillaAE::LATEST",
+                b"5984175597303660107:pipeline1::StdDevThreshold::LATEST",
+                b"5984175597303660107:pipeline1::LogTransformer:StandardScaler::LATEST",
             ),
         )
         self.assertEqual(
             3,
             REDIS_CLIENT.exists(
-                b"5984175597303660107::VanillaAE::1",
-                b"5984175597303660107::StdDevThreshold::1",
-                b"5984175597303660107::LogTransformer:StandardScaler::1",
+                b"5984175597303660107:pipeline1::VanillaAE::1",
+                b"5984175597303660107:pipeline1::StdDevThreshold::1",
+                b"5984175597303660107:pipeline1::LogTransformer:StandardScaler::1",
             ),
         )
 
@@ -173,11 +208,21 @@ def test_trainer_do_not_train_1(self):
         self.udf1.register_conf(
             "druid-config",
             StreamConf(
-                numalogic_conf=NumalogicConf(
-                    model=ModelInfo(name="VanillaAE", conf={"seq_len": 12, "n_features": 2}),
-                    preprocess=[ModelInfo(name="LogTransformer"), ModelInfo(name="StandardScaler")],
-                    trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
-                )
+                ml_pipelines={
+                    "pipeline1": MLPipelineConf(
+                        pipeline_id="pipeline1",
+                        numalogic_conf=NumalogicConf(
+                            model=ModelInfo(
+                                name="VanillaAE", conf={"seq_len": 12, "n_features": 2}
+                            ),
+                            preprocess=[
+                                ModelInfo(name="LogTransformer"),
+                                ModelInfo(name="StandardScaler"),
+                            ],
+                            trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
+                        ),
+                    )
+                }
             ),
         )
         self.udf1(self.keys, self.datum)
@@ -185,17 +230,17 @@
 
         self.assertEqual(
             3,
             REDIS_CLIENT.exists(
-                b"5984175597303660107::VanillaAE::LATEST",
-                b"5984175597303660107::StdDevThreshold::LATEST",
-                b"5984175597303660107::LogTransformer:StandardScaler::LATEST",
+                b"5984175597303660107:pipeline1::VanillaAE::LATEST",
+                b"5984175597303660107:pipeline1::StdDevThreshold::LATEST",
+                b"5984175597303660107:pipeline1::LogTransformer:StandardScaler::LATEST",
             ),
         )
         self.assertEqual(
             0,
             REDIS_CLIENT.exists(
-                b"5984175597303660107::VanillaAE::1",
-                b"5984175597303660107::StdDevThreshold::1",
-                b"5984175597303660107::LogTransformer:StandardScaler::1",
+                b"5984175597303660107:pipeline1::VanillaAE::1",
+                b"5984175597303660107:pipeline1::StdDevThreshold::1",
+                b"5984175597303660107:pipeline1::LogTransformer:StandardScaler::1",
             ),
         )
 
@@ -204,11 +249,21 @@ def test_trainer_do_not_train_2(self):
         self.udf1.register_conf(
             "druid-config",
             StreamConf(
-                numalogic_conf=NumalogicConf(
-                    model=ModelInfo(name="VanillaAE", conf={"seq_len": 12, "n_features": 2}),
-                    preprocess=[ModelInfo(name="LogTransformer"), ModelInfo(name="StandardScaler")],
-                    trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
-                )
+                ml_pipelines={
+                    "pipeline1": MLPipelineConf(
+                        pipeline_id="pipeline1",
+                        numalogic_conf=NumalogicConf(
+                            model=ModelInfo(
+                                name="VanillaAE", conf={"seq_len": 12, "n_features": 2}
+                            ),
+                            preprocess=[
+                                ModelInfo(name="LogTransformer"),
+                                ModelInfo(name="StandardScaler"),
+                            ],
+                            trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
+                        ),
+                    )
+                }
             ),
         )
         self.udf1(self.keys, self.datum)
@@ -220,9 +275,9 @@
 
         self.assertEqual(
             0,
             REDIS_CLIENT.exists(
-                b"5984175597303660107::VanillaAE::1",
-                b"5984175597303660107::StdDevThreshold::1",
-                b"5984175597303660107::LogTransformer:StandardScaler::1",
+                b"5984175597303660107:pipeline1::VanillaAE::1",
+                b"5984175597303660107:pipeline1::StdDevThreshold::1",
+                b"5984175597303660107:pipeline1::LogTransformer:StandardScaler::1",
             ),
         )
 
@@ -231,11 +286,21 @@ def test_trainer_do_not_train_3(self):
         self.udf1.register_conf(
             "druid-config",
             StreamConf(
-                numalogic_conf=NumalogicConf(
-                    model=ModelInfo(name="VanillaAE", conf={"seq_len": 12, "n_features": 2}),
-                    preprocess=[ModelInfo(name="LogTransformer"), ModelInfo(name="StandardScaler")],
-                    trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
-                )
+                ml_pipelines={
+                    "pipeline1": MLPipelineConf(
+                        pipeline_id="pipeline1",
+                        numalogic_conf=NumalogicConf(
+                            model=ModelInfo(
+                                name="VanillaAE", conf={"seq_len": 12, "n_features": 2}
+                            ),
+                            preprocess=[
+                                ModelInfo(name="LogTransformer"),
+                                ModelInfo(name="StandardScaler"),
+                            ],
+                            trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
+                        ),
+                    )
+                }
             ),
         )
         TrainMsgDeduplicator(REDIS_CLIENT).ack_read(self.keys, "some-uuid")
@@ -245,9 +310,9 @@
 
         self.assertEqual(
             0,
             REDIS_CLIENT.exists(
-                b"5984175597303660107::VanillaAE::0",
-                b"5984175597303660107::StdDevThreshold::0",
-                b"5984175597303660107::LogTransformer:StandardScaler::0",
+                b"5984175597303660107:pipeline1::VanillaAE::0",
+                b"5984175597303660107:pipeline1::StdDevThreshold::0",
+                b"5984175597303660107:pipeline1::LogTransformer:StandardScaler::0",
             ),
         )
 
@@ -256,11 +321,21 @@ def test_trainer_do_not_train_4(self):
         self.udf1.register_conf(
             "druid-config",
             StreamConf(
-                numalogic_conf=NumalogicConf(
-                    model=ModelInfo(name="VanillaAE", conf={"seq_len": 12, "n_features": 2}),
-                    preprocess=[ModelInfo(name="LogTransformer"), ModelInfo(name="StandardScaler")],
-                    trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
-                )
+                ml_pipelines={
+                    "pipeline1": MLPipelineConf(
+                        pipeline_id="pipeline1",
+                        numalogic_conf=NumalogicConf(
+                            model=ModelInfo(
+                                name="VanillaAE", conf={"seq_len": 12, "n_features": 2}
+                            ),
+                            preprocess=[
+                                ModelInfo(name="LogTransformer"),
+                                ModelInfo(name="StandardScaler"),
+                            ],
+                            trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
+                        ),
+                    )
+                }
             ),
         )
         self.udf1(self.keys, self.datum)
@@ -278,19 +353,26 @@ def test_trainer_data_insufficient(self):
         self.udf1.register_conf(
             "druid-config",
             StreamConf(
-                numalogic_conf=NumalogicConf(
-                    model=ModelInfo(name="VanillaAE", conf={"seq_len": 12, "n_features": 2}),
-                    preprocess=[ModelInfo(name="StandardScaler", conf={})],
-                    trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
-                )
+                ml_pipelines={
+                    "pipeline1": MLPipelineConf(
+                        pipeline_id="pipeline1",
+                        numalogic_conf=NumalogicConf(
+                            model=ModelInfo(
+                                name="VanillaAE", conf={"seq_len": 12, "n_features": 2}
+                            ),
+                            preprocess=[ModelInfo(name="StandardScaler", conf={})],
+                            trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
+                        ),
+                    )
+                }
             ),
         )
         self.udf1(self.keys, self.datum)
         self.assertFalse(
             REDIS_CLIENT.exists(
-                b"5984175597303660107::VanillaAE::LATEST",
-                b"5984175597303660107::StdDevThreshold::LATEST",
-                b"5984175597303660107::StandardScaler::LATEST",
+                b"5984175597303660107:pipeline1::VanillaAE::LATEST",
+                b"5984175597303660107:pipeline1::StdDevThreshold::LATEST",
+                b"5984175597303660107:pipeline1::StandardScaler::LATEST",
             )
         )
 
@@ -299,19 +381,26 @@ def test_trainer_datafetcher_err(self):
         self.udf1.register_conf(
             "druid-config",
             StreamConf(
-                numalogic_conf=NumalogicConf(
-                    model=ModelInfo(name="VanillaAE", conf={"seq_len": 12, "n_features": 2}),
-                    preprocess=[ModelInfo(name="StandardScaler", conf={})],
-                    trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
-                )
+                ml_pipelines={
+                    "pipeline1": MLPipelineConf(
+                        pipeline_id="pipeline1",
+                        numalogic_conf=NumalogicConf(
+                            model=ModelInfo(
+                                name="VanillaAE", conf={"seq_len": 12, "n_features": 2}
+                            ),
+                            preprocess=[ModelInfo(name="StandardScaler", conf={})],
+                            trainer=TrainerConf(pltrainer_conf=LightningTrainerConf(max_epochs=1)),
+                        ),
+                    )
+                }
             ),
         )
         self.udf1(self.keys, self.datum)
         self.assertFalse(
             REDIS_CLIENT.exists(
-                b"5984175597303660107::VanillaAE::LATEST",
-                b"5984175597303660107::StdDevThreshold::LATEST",
-                b"5984175597303660107::StandardScaler::LATEST",
+                b"5984175597303660107:pipeline1::VanillaAE::LATEST",
+                b"5984175597303660107:pipeline1::StdDevThreshold::LATEST",
+                b"5984175597303660107:pipeline1::StandardScaler::LATEST",
            )
         )
 
@@ -358,15 +447,22 @@ def test_druid_from_config_missing(self):
         pl_conf = PipelineConf(
             stream_confs={
                 "druid-config": StreamConf(
-                    numalogic_conf=NumalogicConf(
-                        model=ModelInfo(name="VanillaAE", conf={"seq_len": 12, "n_features": 2}),
-                        preprocess=[
-                            ModelInfo(name="LogTransformer"),
-                        ],
-                        trainer=TrainerConf(
-                            pltrainer_conf=LightningTrainerConf(max_epochs=1),
-                        ),
-                    )
+                    ml_pipelines={
+                        "pipeline1": MLPipelineConf(
+                            pipeline_id="pipeline1",
+                            numalogic_conf=NumalogicConf(
+                                model=ModelInfo(
+                                    name="VanillaAE", conf={"seq_len": 12, "n_features": 2}
+                                ),
+                                preprocess=[
+                                    ModelInfo(name="LogTransformer"),
+                                ],
+                                trainer=TrainerConf(
+                                    pltrainer_conf=LightningTrainerConf(max_epochs=1),
+                                ),
+                            ),
+                        )
+                    }
                 )
             },
             druid_conf=DruidConf(url="some-url", endpoint="druid/v2", delay_hrs=3),
@@ -379,15 +475,22 @@ def test_druid_get_config_error(self):
         pl_conf = PipelineConf(
             stream_confs={
                 "druid-config": StreamConf(
-                    numalogic_conf=NumalogicConf(
-                        model=ModelInfo(name="VanillaAE", conf={"seq_len": 12, "n_features": 2}),
-                        preprocess=[
-                            ModelInfo(name="LogTransformer"),
-                        ],
-                        trainer=TrainerConf(
-                            pltrainer_conf=LightningTrainerConf(max_epochs=1),
-                        ),
-                    )
+                    ml_pipelines={
+                        "pipeline1": MLPipelineConf(
+                            pipeline_id="pipeline1",
+                            numalogic_conf=NumalogicConf(
+                                model=ModelInfo(
+                                    name="VanillaAE", conf={"seq_len": 12, "n_features": 2}
+                                ),
+                                preprocess=[
+                                    ModelInfo(name="LogTransformer"),
+                                ],
+                                trainer=TrainerConf(
+                                    pltrainer_conf=LightningTrainerConf(max_epochs=1),
+                                ),
+                            ),
+                        )
+                    }
                 )
             },
             druid_conf=DruidConf(
@@ -395,7 +498,7 @@ def test_druid_get_config_error(self):
                 endpoint="druid/v2",
                 delay_hrs=3,
                 id_fetcher={
-                    "some-id": DruidFetcherConf(
+                    "some-id-pipeline1": DruidFetcherConf(
                         datasource="some-datasource", dimensions=["some-dimension"]
                     )
                 },
@@ -403,9 +506,11 @@ def test_druid_get_config_error(self):
         )
         udf3 = DruidTrainerUDF(REDIS_CLIENT, pl_conf=pl_conf)
         udf3.register_conf("druid-config", pl_conf.stream_confs["druid-config"])
-        udf3.register_druid_fetcher_conf("some-id", pl_conf.druid_conf.id_fetcher["some-id"])
+        udf3.register_druid_fetcher_conf(
+            "some-id", "pipeline1", pl_conf.druid_conf.id_fetcher["some-id-pipeline1"]
+        )
         with self.assertRaises(ConfigNotFoundError):
-            udf3.get_druid_fetcher_conf("different-config")
+            udf3.get_druid_fetcher_conf("different-config", "pipeline1")
         with self.assertRaises(ConfigNotFoundError):
             udf3(self.keys, self.datum)
 
@@ -430,6 +535,7 @@ def setUp(self):
         payload = {
             "uuid": "some-uuid",
             "config_id": "odl-graphql",
+            "pipeline_id": "pipeline1",
             "composite_keys": ["odl-odlgraphql-usw2-e2e", "odl-graphql"],
             "metrics": [
                 "namespace_app_rollouts_cpu_utilization",
@@ -454,9 +560,9 @@ def test_trainer_01(self):
         self.assertEqual(
             3,
             REDIS_CLIENT.exists(
-                b"odl-odlgraphql-usw2-e2e:odl-graphql::StandardScaler::LATEST",
-                b"odl-odlgraphql-usw2-e2e:odl-graphql::Conv1dVAE::LATEST",
-                b"odl-odlgraphql-usw2-e2e:odl-graphql::MahalanobisThreshold::LATEST",
+                b"odl-odlgraphql-usw2-e2e:odl-graphql:pipeline1::StandardScaler::LATEST",
+                b"odl-odlgraphql-usw2-e2e:odl-graphql:pipeline1::Conv1dVAE::LATEST",
+                b"odl-odlgraphql-usw2-e2e:odl-graphql:pipeline1::MahalanobisThreshold::LATEST",
             ),
         )
 
@@ -469,6 +575,7 @@ def test_trainer_02(self):
             {
                 "uuid": "some-uuid",
                 "config_id": "myconf",
+                "pipeline_id": "pipeline1",
                 "composite_keys": [
                     "dev-devx-druidreverseproxy-usw2-qal",
                     "druid-reverse-proxy",
@@ -485,9 +592,9 @@ def test_trainer_02(self):
         self.assertEqual(
             3,
             REDIS_CLIENT.exists(
-                b"dev-devx-druidreverseproxy-usw2-qal:druid-reverse-proxy::StandardScaler::LATEST",
-                b"dev-devx-druidreverseproxy-usw2-qal:druid-reverse-proxy::SparseVanillaAE::LATEST",
-                b"dev-devx-druidreverseproxy-usw2-qal:druid-reverse-proxy::StdDevThreshold::LATEST",
+                b"dev-devx-druidreverseproxy-usw2-qal:druid-reverse-proxy:pipeline1::StandardScaler::LATEST",
+                b"dev-devx-druidreverseproxy-usw2-qal:druid-reverse-proxy:pipeline1::SparseVanillaAE::LATEST",
+                b"dev-devx-druidreverseproxy-usw2-qal:druid-reverse-proxy:pipeline1::StdDevThreshold::LATEST",
             ),
         )
 
diff --git a/tests/udfs/utility.py b/tests/udfs/utility.py
index cb2b8e63..b1d56a3c 100644
--- a/tests/udfs/utility.py
+++ b/tests/udfs/utility.py
@@ -36,26 +36,22 @@ def input_json_from_file(data_path: str) -> Datum:
 
 def store_in_redis(pl_conf, registry):
     """Store preprocess artifacts in redis."""
-    preproc_clfs = []
-    preproc_factory = PreprocessFactory()
-    for _cfg in pl_conf.stream_confs["druid-config"].numalogic_conf.preprocess:
-        _clf = preproc_factory.get_instance(_cfg)
-        preproc_clfs.append(_clf)
-    if any(
-        _conf.stateful for _conf in pl_conf.stream_confs["druid-config"].numalogic_conf.preprocess
-    ):
-        preproc_clf = make_pipeline(*preproc_clfs)
-        preproc_clf.fit(np.asarray([[1, 3], [4, 6]]))
-    registry.save_multiple(
-        skeys=pl_conf.stream_confs["druid-config"].composite_keys,
-        dict_artifacts={
-            "inference": KeyedArtifact(dkeys=["AE"], artifact=VanillaAE(10)),
-            "preproc": KeyedArtifact(
-                dkeys=[
-                    _conf.name
-                    for _conf in pl_conf.stream_confs["druid-config"].numalogic_conf.preprocess
-                ],
-                artifact=preproc_clf,
-            ),
-        },
-    )
+    for _pipeline_id, _ml_conf in pl_conf.stream_confs["druid-config"].ml_pipelines.items():
+        preproc_clfs = []
+        preproc_factory = PreprocessFactory()
+        for _cfg in _ml_conf.numalogic_conf.preprocess:
+            _clf = preproc_factory.get_instance(_cfg)
+            preproc_clfs.append(_clf)
+        if any(_conf.stateful for _conf in _ml_conf.numalogic_conf.preprocess):
+            preproc_clf = make_pipeline(*preproc_clfs)
+            preproc_clf.fit(np.asarray([[1, 3], [4, 6]]))
+        registry.save_multiple(
+            skeys=[*pl_conf.stream_confs["druid-config"].composite_keys, _pipeline_id],
+            dict_artifacts={
+                "inference": KeyedArtifact(dkeys=["AE"], artifact=VanillaAE(10)),
+                "preproc": KeyedArtifact(
+                    dkeys=[_conf.name for _conf in _ml_conf.numalogic_conf.preprocess],
+                    artifact=preproc_clf,
+                ),
+            },
+        )
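
Note (not part of the patch): a minimal sketch of how the new per-pipeline config is resolved, using only the classes and factory keys introduced in this diff; the config IDs and metric names below are illustrative, not taken from any real deployment.

    from numalogic.udfs import MLPipelineConf, PipelineConf, StreamConf, UDFFactory

    # Hypothetical stream config: one source stream fanned out to two ML pipelines.
    stream_conf = StreamConf(
        config_id="druid-config",
        composite_keys=["service-mesh", "1", "2"],
        window_size=10,
        ml_pipelines={
            "pipeline1": MLPipelineConf(pipeline_id="pipeline1", metrics=["col1", "col2"]),
            "pipeline2": MLPipelineConf(pipeline_id="pipeline2", metrics=["col1", "col2"]),
        },
    )
    pl_conf = PipelineConf(stream_confs={"druid-config": stream_conf})

    # PipelineUDF emits one message per entry in ml_pipelines, stamping each payload
    # with its pipeline_id; downstream UDFs then look up the per-pipeline conf.
    udf = UDFFactory.get_udf_cls("pipeline")(pl_conf=pl_conf)
    ml_conf = udf.get_ml_pipeline_conf(config_id="druid-config", pipeline_id="pipeline1")
    assert ml_conf.metrics == ["col1", "col2"]

Because get_ml_pipeline_conf falls back to the "default" stream and pipeline entries when an ID is missing, the skeys produced by get_skeys (composite keys plus pipeline_id) keep each pipeline's artifacts isolated in the registry, which is what the test_trainer key assertions above exercise.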