diff --git a/mlrun/model_monitoring/batch.py b/mlrun/model_monitoring/batch.py
index dfb3bc5047cc..7bffa5b93aed 100644
--- a/mlrun/model_monitoring/batch.py
+++ b/mlrun/model_monitoring/batch.py
@@ -27,6 +27,7 @@
 import v3io
 import v3io.dataplane
 import v3io_frames
+from v3io_frames.frames_pb2 import IGNORE
 
 import mlrun.common.helpers
 import mlrun.common.model_monitoring.helpers
@@ -592,6 +593,15 @@ def _initialize_v3io_configurations(self):
             container=self.tsdb_container,
             token=self.v3io_access_key,
         )
+        logger.info(
+            "Creating table in TSDB if it does not already exist", table=self.tsdb_path
+        )
+        self.frames.create(
+            backend="tsdb",
+            table=self.tsdb_path,
+            if_exists=IGNORE,
+            rate="1/s",
+        )
 
     def post_init(self):
         """
diff --git a/tests/system/model_monitoring/test_model_monitoring.py b/tests/system/model_monitoring/test_model_monitoring.py
index 2d2f680de0c8..10d4a6bf70e1 100644
--- a/tests/system/model_monitoring/test_model_monitoring.py
+++ b/tests/system/model_monitoring/test_model_monitoring.py
@@ -34,6 +34,7 @@
 import mlrun.serving.routers
 from mlrun.errors import MLRunNotFoundError
 from mlrun.model import BaseMetadata
+from mlrun.model_monitoring.writer import _TSDB_BE, ModelMonitoringWriter
 from mlrun.runtimes import BaseRuntime
 from mlrun.utils.v3io_clients import get_frames_client
 from tests.system.base import TestMLRunSystem
@@ -959,3 +960,77 @@ def test_model_monitoring_with_kafka_stream(self):
 
         assert model_endpoint.status.metrics["generic"]["latency_avg_5m"] > 0
         assert model_endpoint.status.metrics["generic"]["predictions_count_5m"] > 0
+
+
+@TestMLRunSystem.skip_test_if_env_not_configured
+@pytest.mark.enterprise
+class TestModelInferenceTSDBRecord(TestMLRunSystem):
+    """
+    Test that batch inference records results to V3IO TSDB when tracking is
+    enabled and the selected model does not have a serving endpoint.
+    """
+
+    project_name = "infer-model-tsdb"
+    name_prefix = "infer-model-only"
+
+    @classmethod
+    def custom_setup_class(cls) -> None:
+        dataset = load_iris()
+        cls.train_set = pd.DataFrame(
+            dataset.data,  # pyright: ignore[reportGeneralTypeIssues]
+            columns=[
+                "sepal_length_cm",
+                "sepal_width_cm",
+                "petal_length_cm",
+                "petal_width_cm",
+            ],
+        )
+        cls.model_name = "clf_model"
+
+        cls.infer_results_df = cls.train_set.copy()
+        cls.infer_results_df[
+            mlrun.common.schemas.EventFieldType.TIMESTAMP
+        ] = datetime.utcnow()
+
+    def _log_model(self) -> str:
+        model = self.project.log_model(  # pyright: ignore[reportOptionalMemberAccess]
+            self.model_name,
+            model_dir=os.path.relpath(self.assets_path),
+            model_file="model.pkl",
+            training_set=self.train_set,
+            artifact_path=f"v3io:///projects/{self.project_name}",
+        )
+        return model.uri
+
+    @classmethod
+    def _test_v3io_tsdb_record(cls) -> None:
+        frames = ModelMonitoringWriter._get_v3io_frames_client(
+            v3io_container="users",
+        )
+        df: pd.DataFrame = frames.read(
+            backend=_TSDB_BE,
+            table=f"pipelines/{cls.project_name}/model-endpoints/events",
+            start="now-5m",
+        )
+        assert len(df) == 1, "Expected a single record in the TSDB"
+        assert {
+            "endpoint_id",
+            "record_type",
+            "hellinger_mean",
+            "kld_mean",
+            "tvd_mean",
+        } == set(df.columns), "Unexpected columns in the TSDB record"
+
+    def test_record(self) -> None:
+        model_uri = self._log_model()
+        mlrun.model_monitoring.api.record_results(
+            project=self.project_name,
+            infer_results_df=self.infer_results_df,
+            model_path=model_uri,
+            trigger_monitoring_job=True,
+            model_endpoint_name=f"{self.name_prefix}-test",
+            context=mlrun.get_or_create_ctx(
+                name=f"{self.name_prefix}-context"
+            ),  # pyright: ignore[reportGeneralTypeIssues]
+        )
+        self._test_v3io_tsdb_record()