Skip to content

Commit

Permalink
feat: ml pipelines
Browse files Browse the repository at this point in the history
Signed-off-by: Nandita Koppisetty <[email protected]>
  • Loading branch information
nkoppisetty committed Dec 11, 2023
1 parent 28fa28f commit d34b3d2
Show file tree
Hide file tree
Showing 24 changed files with 663 additions and 292 deletions.
29 changes: 19 additions & 10 deletions numalogic/backtest/_prom.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from numalogic.connectors.prometheus import PrometheusFetcher
from numalogic.tools.data import StreamingDataset, inverse_window
from numalogic.tools.types import artifact_t
from numalogic.udfs import UDFFactory, StreamConf
from numalogic.udfs import UDFFactory, StreamConf, MLPipelineConf

DEFAULT_OUTPUT_DIR = os.path.join(BASE_DIR, ".btoutput")
LOGGER = logging.getLogger(__name__)
Expand All @@ -54,8 +54,12 @@ def _init_default_streamconf(metrics: list[str]) -> StreamConf:
return StreamConf(
source=ConnectorType.prometheus,
window_size=DEFAULT_SEQUENCE_LEN,
metrics=metrics,
numalogic_conf=numalogic_cfg,
ml_pipelines={
"default": MLPipelineConf(
metrics=metrics,
numalogic_conf=numalogic_cfg,
)
},
)


Expand All @@ -73,6 +77,7 @@ class PromUnivarBacktester:
output_dir: Output directory
test_ratio: Ratio of test data to total data
stream_conf: Stream configuration
pipeline_id: ml pipeline id from stream_conf
"""

def __init__(
Expand All @@ -86,18 +91,20 @@ def __init__(
output_dir: Union[str, Path] = DEFAULT_OUTPUT_DIR,
test_ratio: float = 0.25,
stream_conf: Optional[StreamConf] = None,
pipeline_id: Optional[str] = "default",
):
self._url = url
self.namespace = namespace
self.appname = appname
self.metric = metric
self.conf = stream_conf or _init_default_streamconf([metric])
self.ml_pl_conf = self.conf.ml_pipelines[pipeline_id]
self.test_ratio = test_ratio
self.lookback_days = lookback_days
self.return_labels = return_labels

self._seq_len = self.conf.window_size
self._n_features = len(self.conf.metrics)
self._n_features = len(self.ml_pl_conf.metrics)

self.out_dir = self.get_outdir(appname, metric, outdir=output_dir)
self._datapath = os.path.join(self.out_dir, "data.csv")
Expand Down Expand Up @@ -170,13 +177,13 @@ def train_models(
LOGGER.info("Training data shape: %s", x_train.shape)

artifacts = UDFFactory.get_udf_cls("trainer").compute(
model=ModelFactory().get_instance(self.conf.numalogic_conf.model),
model=ModelFactory().get_instance(self.ml_pl_conf.numalogic_conf.model),
input_=x_train,
preproc_clf=PreprocessFactory().get_pipeline_instance(
self.conf.numalogic_conf.preprocess
self.ml_pl_conf.numalogic_conf.preprocess
),
threshold_clf=ThresholdFactory().get_instance(self.conf.numalogic_conf.threshold),
numalogic_cfg=self.conf.numalogic_conf,
threshold_clf=ThresholdFactory().get_instance(self.ml_pl_conf.numalogic_conf.threshold),
numalogic_cfg=self.ml_pl_conf.numalogic_conf,
)
artifacts_dict = {
"model": artifacts["inference"].artifact,
Expand Down Expand Up @@ -233,10 +240,12 @@ def generate_scores(

ds = StreamingDataset(x_scaled, seq_len=self.conf.window_size)
anomaly_scores = np.zeros(
(len(ds), self.conf.window_size, len(self.conf.metrics)), dtype=np.float32
(len(ds), self.conf.window_size, len(self.ml_pl_conf.metrics)), dtype=np.float32
)
x_recon = np.zeros_like(anomaly_scores, dtype=np.float32)
postproc_func = PostprocessFactory().get_instance(self.conf.numalogic_conf.postprocess)
postproc_func = PostprocessFactory().get_instance(
self.ml_pl_conf.numalogic_conf.postprocess
)

for idx, arr in enumerate(ds):
x_recon[idx] = nn_udf.compute(model=artifacts["model"], input_=arr)
Expand Down
3 changes: 2 additions & 1 deletion numalogic/udfs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from numalogic._constants import BASE_DIR
from numalogic.udfs._base import NumalogicUDF
from numalogic.udfs._config import StreamConf, PipelineConf, load_pipeline_conf
from numalogic.udfs._config import StreamConf, PipelineConf, MLPipelineConf, load_pipeline_conf
from numalogic.udfs.factory import UDFFactory, ServerFactory
from numalogic.udfs.inference import InferenceUDF
from numalogic.udfs.postprocess import PostprocessUDF
Expand Down Expand Up @@ -34,6 +34,7 @@ def set_logger() -> None:
"UDFFactory",
"StreamConf",
"PipelineConf",
"MLPipelineConf",
"load_pipeline_conf",
"ServerFactory",
"set_logger",
Expand Down
43 changes: 37 additions & 6 deletions numalogic/udfs/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

from numalogic.tools.exceptions import ConfigNotFoundError
from numalogic.tools.types import artifact_t
from numalogic.udfs._config import PipelineConf, StreamConf

from numalogic.udfs._config import StreamConf, PipelineConf, MLPipelineConf

_DEFAULT_CONF_ID = "default"

Expand Down Expand Up @@ -61,17 +60,28 @@ def register_conf(self, config_id: str, conf: StreamConf) -> None:
"""
self.pl_conf.stream_confs[config_id] = conf

def _get_default_conf(self, config_id) -> StreamConf:
def _get_default_stream_conf(self, config_id) -> StreamConf:
"""Get the default config."""
try:
return self.pl_conf.stream_confs[_DEFAULT_CONF_ID]
except KeyError:
err_msg = f"Config with ID {config_id} or {_DEFAULT_CONF_ID} not found!"
raise ConfigNotFoundError(err_msg) from None

def get_conf(self, config_id: str) -> StreamConf:
def _get_default_ml_pipeline_conf(self, config_id, pipeline_id) -> MLPipelineConf:
"""Get the default pipeline config."""
try:
return self.pl_conf.stream_confs[_DEFAULT_CONF_ID].ml_pipelines[_DEFAULT_CONF_ID]
except KeyError:
err_msg = (

Check warning on line 76 in numalogic/udfs/_base.py

View check run for this annotation

Codecov / codecov/patch

numalogic/udfs/_base.py#L75-L76

Added lines #L75 - L76 were not covered by tests
f"Pipeline with ID {pipeline_id} or {_DEFAULT_CONF_ID} "
f"not found for config ID {config_id}!"
)
raise ConfigNotFoundError(err_msg) from None

Check warning on line 80 in numalogic/udfs/_base.py

View check run for this annotation

Codecov / codecov/patch

numalogic/udfs/_base.py#L80

Added line #L80 was not covered by tests

def get_stream_conf(self, config_id: str) -> StreamConf:
"""
Get config with the given ID.
Get stream config with the given ID.
If not found, return the default config.
Args:
Expand All @@ -88,7 +98,28 @@ def get_conf(self, config_id: str) -> StreamConf:
try:
return self.pl_conf.stream_confs[config_id]
except KeyError:
return self._get_default_conf(config_id)
return self._get_default_stream_conf(config_id)

Check warning on line 101 in numalogic/udfs/_base.py

View check run for this annotation

Codecov / codecov/patch

numalogic/udfs/_base.py#L101

Added line #L101 was not covered by tests

def get_ml_pipeline_conf(self, config_id: str, pipeline_id: str) -> MLPipelineConf:
"""
Get stream's pipeline config with the given ID.
If not found, return the default config.
Args:
config_id: Config ID
Returns
-------
StreamConf object
Raises
------
ConfigNotFoundError: If config with the given ID is not found
"""
try:
return self.pl_conf.stream_confs[config_id].ml_pipelines[pipeline_id]
except KeyError:
return self._get_default_ml_pipeline_conf(config_id, pipeline_id)

def exec(self, keys: list[str], datum: Datum) -> Messages:
"""
Expand Down
10 changes: 8 additions & 2 deletions numalogic/udfs/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,20 @@
_logger = logging.getLogger(__name__)


@dataclass
class MLPipelineConf:
pipeline_id: str = "default"
metrics: list[str] = field(default_factory=list)
numalogic_conf: NumalogicConf = field(default_factory=lambda: NumalogicConf())


@dataclass
class StreamConf:
config_id: str = "default"
source: ConnectorType = ConnectorType.druid # TODO: do not allow redis connector here
window_size: int = 12
composite_keys: list[str] = field(default_factory=list)
metrics: list[str] = field(default_factory=list)
numalogic_conf: NumalogicConf = field(default_factory=lambda: NumalogicConf())
ml_pipelines: dict[str, MLPipelineConf] = field(default_factory=dict)


@dataclass
Expand Down
1 change: 1 addition & 0 deletions numalogic/udfs/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Header(str, Enum):
class _BasePayload:
uuid: str
config_id: str
pipeline_id: str
composite_keys: list[str]


Expand Down
2 changes: 2 additions & 0 deletions numalogic/udfs/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ class UDFFactory:
"""Factory class to fetch the right UDF."""

from numalogic.udfs import NumalogicUDF
from numalogic.udfs.pipeline import PipelineUDF
from numalogic.udfs.inference import InferenceUDF
from numalogic.udfs.postprocess import PostprocessUDF
from numalogic.udfs.preprocess import PreprocessUDF
from numalogic.udfs.trainer import DruidTrainerUDF, PromTrainerUDF

_UDF_MAP: ClassVar[dict[str, type[NumalogicUDF]]] = {
"pipeline": PipelineUDF,
"preprocess": PreprocessUDF,
"inference": InferenceUDF,
"postprocess": PostprocessUDF,
Expand Down
11 changes: 7 additions & 4 deletions numalogic/udfs/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
UDF_TIME,
_increment_counter,
)
from numalogic.udfs.tools import _load_artifact
from numalogic.udfs.tools import _load_artifact, get_skeys

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -124,10 +124,11 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
)
return Messages(Message(keys=keys, value=payload.to_json()))

_conf = self.get_conf(payload.config_id)
_stream_conf = self.get_stream_conf(payload.config_id)
_conf = _stream_conf.ml_pipelines[payload.pipeline_id]

artifact_data, payload = _load_artifact(
skeys=[_ckey for _, _ckey in zip(_conf.composite_keys, payload.composite_keys)],
skeys=get_skeys(payload, _stream_conf),
dkeys=[_conf.numalogic_conf.model.name],
payload=payload,
model_registry=self.model_registry,
Expand Down Expand Up @@ -210,7 +211,9 @@ def is_model_stale(self, artifact_data: ArtifactData, payload: StreamPayload) ->
-------
True if artifact is stale, False otherwise
"""
_conf = self.get_conf(payload.config_id)
_conf = self.get_ml_pipeline_conf(
config_id=payload.config_id, pipeline_id=payload.pipeline_id
)
if (
self.model_registry.is_artifact_stale(
artifact_data, _conf.numalogic_conf.trainer.retrain_freq_hr
Expand Down
78 changes: 78 additions & 0 deletions numalogic/udfs/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import json
import os
import time
import orjson
import logging
from typing import Optional

from numpy import typing as npt
from pynumaflow.mapper import Datum, Messages, Message

from numalogic.tools.types import artifact_t
from numalogic.udfs._metrics import UDF_TIME
from numalogic.udfs import NumalogicUDF
from numalogic.udfs._config import PipelineConf

# TODO: move to config
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", "3600"))
LOCAL_CACHE_SIZE = int(os.getenv("LOCAL_CACHE_SIZE", "10000"))
LOAD_LATEST = os.getenv("LOAD_LATEST", "false").lower() == "true"

_LOGGER = logging.getLogger(__name__)


class PipelineUDF(NumalogicUDF):
"""
Pipeline UDF for Numalogic.
Args:
pl_conf: PipelineConf instance
"""

@classmethod
def compute(cls, model: artifact_t, input_: npt.NDArray[float], **kwargs):
pass

Check warning on line 34 in numalogic/udfs/pipeline.py

View check run for this annotation

Codecov / codecov/patch

numalogic/udfs/pipeline.py#L34

Added line #L34 was not covered by tests

def __init__(self, pl_conf: Optional[PipelineConf] = None):
super().__init__(pl_conf=pl_conf, _vtx="pipeline")

@UDF_TIME.time()
def exec(self, keys: list[str], datum: Datum) -> Messages:
"""
The preprocess function here receives data from the data source.
Perform preprocess on the input data.
Args:
-------
keys: List of keys
datum: Datum object
Returns
-------
Messages instance
"""
_start_time = time.perf_counter()

# check message sanity
try:
data_payload = orjson.loads(datum.value)
_LOGGER.info("%s - Data payload: %s", data_payload["uuid"], data_payload)
except (orjson.JSONDecodeError, KeyError): # catch json decode error only
_LOGGER.exception("Error while decoding input json")
return Messages(Message.to_drop())

Check warning on line 64 in numalogic/udfs/pipeline.py

View check run for this annotation

Codecov / codecov/patch

numalogic/udfs/pipeline.py#L62-L64

Added lines #L62 - L64 were not covered by tests

_stream_conf = self.get_stream_conf(data_payload["config_id"])

# create a new message for each ML pipeline
messages = Messages()
for pipeline in _stream_conf.ml_pipelines:
data_payload["pipeline_id"] = pipeline
messages.append(Message(keys=keys, value=str.encode(json.dumps(data_payload))))

_LOGGER.debug(
"Time taken to execute Pipeline: %.4f sec",
time.perf_counter() - _start_time,
)
return messages
12 changes: 7 additions & 5 deletions numalogic/udfs/postprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from numalogic.udfs import NumalogicUDF
from numalogic.udfs._config import PipelineConf
from numalogic.udfs.entities import StreamPayload, Header, Status, TrainerPayload, OutputPayload
from numalogic.udfs.tools import _load_artifact
from numalogic.udfs.tools import _load_artifact, get_skeys

# TODO: move to config
LOCAL_CACHE_TTL = int(os.getenv("LOCAL_CACHE_TTL", "3600"))
Expand Down Expand Up @@ -91,13 +91,14 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
)

# load configs
_conf = self.get_conf(payload.config_id)
_stream_conf = self.get_stream_conf(payload.config_id)
_conf = _stream_conf.ml_pipelines[payload.pipeline_id]
thresh_cfg = _conf.numalogic_conf.threshold
postprocess_cfg = _conf.numalogic_conf.postprocess

# load artifact
thresh_artifact, payload = _load_artifact(
skeys=[_ckey for _, _ckey in zip(_conf.composite_keys, payload.composite_keys)],
skeys=get_skeys(payload, _stream_conf),
dkeys=[thresh_cfg.name],
payload=payload,
model_registry=self.model_registry,
Expand Down Expand Up @@ -141,6 +142,7 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:
out_payload = OutputPayload(
uuid=payload.uuid,
config_id=payload.config_id,
pipeline_id=payload.pipeline_id,
composite_keys=payload.composite_keys,
timestamp=payload.end_ts,
unified_anomaly=np.max(anomaly_scores),
Expand All @@ -157,13 +159,13 @@ def exec(self, keys: list[str], datum: Datum) -> Messages:

# Forward payload if a training request is tagged
if payload.header == Header.TRAIN_REQUEST or payload.status == Status.ARTIFACT_STALE:
_conf = self.get_conf(payload.config_id)
ckeys = [_ckey for _, _ckey in zip(_conf.composite_keys, payload.composite_keys)]
ckeys = [_ckey for _, _ckey in zip(_stream_conf.composite_keys, payload.composite_keys)]
train_payload = TrainerPayload(
uuid=payload.uuid,
composite_keys=ckeys,
metrics=payload.metrics,
config_id=payload.config_id,
pipeline_id=payload.pipeline_id,
)
messages.append(Message(keys=keys, value=train_payload.to_json(), tags=["train"]))
_LOGGER.debug(
Expand Down
Loading

0 comments on commit d34b3d2

Please sign in to comment.