Skip to content

Commit

Permalink
[Model Monitoring] Create TSDB schema for batch infer [1.5.x]
Browse files Browse the repository at this point in the history
Backport mlrun#4546 to 1.5.x
  • Loading branch information
jond01 committed Nov 12, 2023
1 parent 3603adc commit 7d4013f
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 0 deletions.
10 changes: 10 additions & 0 deletions mlrun/model_monitoring/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import v3io
import v3io.dataplane
import v3io_frames
from v3io_frames.frames_pb2 import IGNORE

import mlrun.common.helpers
import mlrun.common.model_monitoring.helpers
Expand Down Expand Up @@ -592,6 +593,15 @@ def _initialize_v3io_configurations(self):
container=self.tsdb_container,
token=self.v3io_access_key,
)
logger.info(
"Creating table in TSDB if it does not already exist", table=self.tsdb_path
)
self.frames.create(
backend="tsdb",
table=self.tsdb_path,
if_exists=IGNORE,
rate="1/s",
)

def post_init(self):
"""
Expand Down
75 changes: 75 additions & 0 deletions tests/system/model_monitoring/test_model_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import mlrun.serving.routers
from mlrun.errors import MLRunNotFoundError
from mlrun.model import BaseMetadata
from mlrun.model_monitoring.writer import _TSDB_BE, ModelMonitoringWriter
from mlrun.runtimes import BaseRuntime
from mlrun.utils.v3io_clients import get_frames_client
from tests.system.base import TestMLRunSystem
Expand Down Expand Up @@ -959,3 +960,77 @@ def test_model_monitoring_with_kafka_stream(self):

assert model_endpoint.status.metrics["generic"]["latency_avg_5m"] > 0
assert model_endpoint.status.metrics["generic"]["predictions_count_5m"] > 0


@TestMLRunSystem.skip_test_if_env_not_configured
@pytest.mark.enterprise
class TestModelInferenceTSDBRecord(TestMLRunSystem):
"""
Test that batch inference records results to V3IO TSDB when tracking is
enabled and the selected model does not have a serving endpoint.
"""

project_name = "infer-model-tsdb"
name_prefix = "infer-model-only"

@classmethod
def custom_setup_class(cls) -> None:
dataset = load_iris()
cls.train_set = pd.DataFrame(
dataset.data, # pyright: ignore[reportGeneralTypeIssues]
columns=[
"sepal_length_cm",
"sepal_width_cm",
"petal_length_cm",
"petal_width_cm",
],
)
cls.model_name = "clf_model"

cls.infer_results_df = cls.train_set.copy()
cls.infer_results_df[
mlrun.common.schemas.EventFieldType.TIMESTAMP
] = datetime.utcnow()

def _log_model(self) -> str:
model = self.project.log_model( # pyright: ignore[reportOptionalMemberAccess]
self.model_name,
model_dir=os.path.relpath(self.assets_path),
model_file="model.pkl",
training_set=self.train_set,
artifact_path=f"v3io:///projects/{self.project_name}",
)
return model.uri

@classmethod
def _test_v3io_tsdb_record(cls) -> None:
frames = ModelMonitoringWriter._get_v3io_frames_client(
v3io_container="users",
)
df: pd.DataFrame = frames.read(
backend=_TSDB_BE,
table=f"pipelines/{cls.project_name}/model-endpoints/events",
start="now-5m",
)
assert len(df) == 1, "Expected a single record in the TSDB"
assert {
"endpoint_id",
"record_type",
"hellinger_mean",
"kld_mean",
"tvd_mean",
} == set(df.columns), "Unexpected columns in the TSDB record"

def test_record(self) -> None:
model_uri = self._log_model()
mlrun.model_monitoring.api.record_results(
project=self.project_name,
infer_results_df=self.infer_results_df,
model_path=model_uri,
trigger_monitoring_job=True,
model_endpoint_name=f"{self.name_prefix}-test",
context=mlrun.get_or_create_ctx(
name=f"{self.name_prefix}-context"
), # pyright: ignore[reportGeneralTypeIssues]
)
self._test_v3io_tsdb_record()

0 comments on commit 7d4013f

Please sign in to comment.