Skip to content

Commit

Permalink
Add a system test
Browse files Browse the repository at this point in the history
  • Loading branch information
jond01 committed Oct 1, 2023
1 parent 294fd9b commit caf6368
Showing 1 changed file with 91 additions and 3 deletions.
94 changes: 91 additions & 3 deletions tests/system/model_monitoring/test_model_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,21 @@
#
import json
import os
import pickle
import string
from datetime import datetime, timedelta, timezone
from random import choice, randint, uniform
from time import monotonic, sleep
from typing import Optional
from typing import Optional, Union

import fsspec
import numpy as np
import pandas as pd
import pytest
import v3iofs
from sklearn.datasets import load_diabetes, load_iris
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC

import mlrun.artifacts.model
import mlrun.common.schemas.model_monitoring
Expand Down Expand Up @@ -306,7 +309,6 @@ def test_basic_model_monitoring(self):
)

def _assert_model_endpoint_metrics(self):

endpoints_list = mlrun.get_run_db().list_model_endpoints(
self.project_name, metrics=["predictions_per_second"]
)
Expand Down Expand Up @@ -763,7 +765,8 @@ def _check_kv_schema_file(self):
@pytest.mark.enterprise
class TestBatchDrift(TestMLRunSystem):
"""Record monitoring parquet results and trigger the monitoring batch drift job analysis. This flow tests
the monitoring process of the batch infer job function that can be imported from the functions hub."""
the monitoring process of the batch infer job function that can be imported from the functions hub.
"""

project_name = "pr-batch-drift"

Expand Down Expand Up @@ -959,3 +962,88 @@ def test_model_monitoring_with_kafka_stream(self):

assert model_endpoint.status.metrics["generic"]["latency_avg_5m"] > 0
assert model_endpoint.status.metrics["generic"]["predictions_count_5m"] > 0


@TestMLRunSystem.skip_test_if_env_not_configured
@pytest.mark.enterprise
class TestInferenceWithSpecialChars(TestMLRunSystem):
project_name = "pr-infer-special-chars"
name_prefix = "infer-monitoring"

def __init__(self) -> None:
self.classif = SVC()
self.model_name = "classif_model"
self.columns = ["feat 1", "b (C)", "Last for df "]
self.y_name = "class (0-4) "
self.num_rows = 20
self.num_cols = len(self.columns)
self.num_classes = 5
self.x_train, self.x_test, self.y_train, self.y_test = self._generate_data()
self.training_set = self.x_train.join(self.y_train)
self.test_set = self.x_test.join(self.y_test)
self.infer_results_df = self.test_set
self.infer_results_df[
mlrun.common.schemas.EventFieldType.TIMESTAMP
] = datetime.utcnow()
self.endpoint_id = "5d6ce0e704442c0ac59a933cb4d238baba83bb5d"
self.function_name = f"{self.name_prefix}-function"
self._train()

def _generate_data(self) -> list[Union[pd.DataFrame, pd.Series]]:
rng = np.random.default_rng(seed=23)
x = pd.DataFrame(
rng.random((self.num_rows, self.num_cols)), columns=self.columns
)
y = pd.Series(np.arange(self.num_rows) % self.num_classes, name=self.y_name)
assert self.num_rows > self.num_classes
return train_test_split(x, y, train_size=0.6, random_state=4)

def _train(self) -> None:
self.classif.fit(
self.x_train, self.y_train # pyright: ignore[reportGeneralTypeIssues]
)

def _get_monitoring_feature_set(self) -> mlrun.feature_store.FeatureSet:
model_endpoint = mlrun.get_run_db().get_model_endpoint(
project=self.project_name,
endpoint_id=self.endpoint_id,
)
return mlrun.feature_store.get_feature_set(
model_endpoint.status.monitoring_feature_set_uri
)

def _test_feature_names(self) -> None:
feature_set = self._get_monitoring_feature_set()
features = feature_set.spec.features
feature_names = [feat.name for feat in features]
assert feature_names == [
mlrun.feature_store.api.norm_column_name(feat)
for feat in self.columns + [self.y_name]
]

def test_batch_drift(self) -> None:
project = mlrun.get_run_db().get_project(self.project_name)
context = mlrun.get_or_create_ctx(name=f"{self.name_prefix}-context")
project.log_model(
self.model_name,
body=pickle.dumps(self.classif),
model_file="classif.pkl",
framework="sklearn",
training_set=self.training_set,
label_column=self.y_name,
)

mlrun.model_monitoring.api.record_results(
project=self.project_name,
model_path=project.get_artifact_uri(
key=self.model_name, category="model", tag="latest"
),
model_endpoint_name=f"{self.name_prefix}-test",
function_name=self.function_name,
endpoint_id=self.endpoint_id,
context=context,
infer_results_df=self.infer_results_df,
trigger_monitoring_job=True,
)

self._test_feature_names()

0 comments on commit caf6368

Please sign in to comment.