diff --git a/tests/system/model_monitoring/test_model_monitoring.py b/tests/system/model_monitoring/test_model_monitoring.py
index 2d2f680de0c8..60088fc0aaaf 100644
--- a/tests/system/model_monitoring/test_model_monitoring.py
+++ b/tests/system/model_monitoring/test_model_monitoring.py
@@ -14,11 +14,12 @@
 #
 import json
 import os
+import pickle
 import string
 from datetime import datetime, timedelta, timezone
 from random import choice, randint, uniform
 from time import monotonic, sleep
-from typing import Optional
+from typing import Optional, Union
 
 import fsspec
 import numpy as np
@@ -26,6 +27,8 @@
 import pytest
 import v3iofs
 from sklearn.datasets import load_diabetes, load_iris
+from sklearn.model_selection import train_test_split
+from sklearn.svm import SVC
 
 import mlrun.artifacts.model
 import mlrun.common.schemas.model_monitoring
@@ -306,7 +309,6 @@ def test_basic_model_monitoring(self):
         )
 
     def _assert_model_endpoint_metrics(self):
-
         endpoints_list = mlrun.get_run_db().list_model_endpoints(
             self.project_name, metrics=["predictions_per_second"]
         )
@@ -763,7 +765,8 @@ def _check_kv_schema_file(self):
 @pytest.mark.enterprise
 class TestBatchDrift(TestMLRunSystem):
     """Record monitoring parquet results and trigger the monitoring batch drift job analysis. This flow tests
-    the monitoring process of the batch infer job function that can be imported from the functions hub."""
+    the monitoring process of the batch infer job function that can be imported from the functions hub.
+    """
 
     project_name = "pr-batch-drift"
 
@@ -959,3 +962,88 @@ def test_model_monitoring_with_kafka_stream(self):
 
         assert model_endpoint.status.metrics["generic"]["latency_avg_5m"] > 0
         assert model_endpoint.status.metrics["generic"]["predictions_count_5m"] > 0
+
+
+@TestMLRunSystem.skip_test_if_env_not_configured
+@pytest.mark.enterprise
+class TestInferenceWithSpecialChars(TestMLRunSystem):
+    project_name = "pr-infer-special-chars"
+    name_prefix = "infer-monitoring"
+
+    def __init__(self) -> None:
+        self.classif = SVC()
+        self.model_name = "classif_model"
+        self.columns = ["feat 1", "b (C)", "Last for df "]
+        self.y_name = "class (0-4) "
+        self.num_rows = 20
+        self.num_cols = len(self.columns)
+        self.num_classes = 5
+        self.x_train, self.x_test, self.y_train, self.y_test = self._generate_data()
+        self.training_set = self.x_train.join(self.y_train)
+        self.test_set = self.x_test.join(self.y_test)
+        self.infer_results_df = self.test_set
+        self.infer_results_df[
+            mlrun.common.schemas.EventFieldType.TIMESTAMP
+        ] = datetime.utcnow()
+        self.endpoint_id = "5d6ce0e704442c0ac59a933cb4d238baba83bb5d"
+        self.function_name = f"{self.name_prefix}-function"
+        self._train()
+
+    def _generate_data(self) -> list[Union[pd.DataFrame, pd.Series]]:
+        rng = np.random.default_rng(seed=23)
+        x = pd.DataFrame(
+            rng.random((self.num_rows, self.num_cols)), columns=self.columns
+        )
+        y = pd.Series(np.arange(self.num_rows) % self.num_classes, name=self.y_name)
+        assert self.num_rows > self.num_classes
+        return train_test_split(x, y, train_size=0.6, random_state=4)
+
+    def _train(self) -> None:
+        self.classif.fit(
+            self.x_train, self.y_train  # pyright: ignore[reportGeneralTypeIssues]
+        )
+
+    def _get_monitoring_feature_set(self) -> mlrun.feature_store.FeatureSet:
+        model_endpoint = mlrun.get_run_db().get_model_endpoint(
+            project=self.project_name,
+            endpoint_id=self.endpoint_id,
+        )
+        return mlrun.feature_store.get_feature_set(
+            model_endpoint.status.monitoring_feature_set_uri
+        )
+
+    def _test_feature_names(self) -> None:
+        feature_set = self._get_monitoring_feature_set()
+        features = feature_set.spec.features
+        feature_names = [feat.name for feat in features]
+        assert feature_names == [
+            mlrun.feature_store.api.norm_column_name(feat)
+            for feat in self.columns + [self.y_name]
+        ]
+
+    def test_batch_drift(self) -> None:
+        project = mlrun.get_run_db().get_project(self.project_name)
+        context = mlrun.get_or_create_ctx(name=f"{self.name_prefix}-context")
+        project.log_model(
+            self.model_name,
+            body=pickle.dumps(self.classif),
+            model_file="classif.pkl",
+            framework="sklearn",
+            training_set=self.training_set,
+            label_column=self.y_name,
+        )
+
+        mlrun.model_monitoring.api.record_results(
+            project=self.project_name,
+            model_path=project.get_artifact_uri(
+                key=self.model_name, category="model", tag="latest"
+            ),
+            model_endpoint_name=f"{self.name_prefix}-test",
+            function_name=self.function_name,
+            endpoint_id=self.endpoint_id,
+            context=context,
+            infer_results_df=self.infer_results_df,
+            trigger_monitoring_job=True,
+        )
+
+        self._test_feature_names()