Skip to content

Commit

Permalink
[Model Monitoring] Fix TDEngine case-insensitivity in the table name …
Browse files Browse the repository at this point in the history
…[1.7.x] (mlrun#6383)
  • Loading branch information
jond01 authored Sep 23, 2024
1 parent 7691271 commit 5b4b714
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 35 deletions.
2 changes: 1 addition & 1 deletion dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def extra_requirements() -> dict[str, list[str]]:
"distributed~=2023.12.1",
],
"alibaba-oss": ["ossfs==2023.12.0", "oss2==2.18.1"],
"tdengine": ["taos-ws-py~=0.3.2"],
"tdengine": ["taos-ws-py~=0.3.3"],
"snowflake": ["snowflake-connector-python~=3.7"],
}

Expand Down
4 changes: 2 additions & 2 deletions extras-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,5 @@ databricks-sdk~=0.13.0
sqlalchemy~=1.4
dask~=2023.12.1
distributed~=2023.12.1
taos-ws-py~=0.3.2
snowflake-connector-python~=3.7
taos-ws-py~=0.3.3
snowflake-connector-python~=3.7
10 changes: 5 additions & 5 deletions mlrun/model_monitoring/db/tsdb/tdengine/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,20 @@ def _create_super_table_query(self) -> str:
tags = ", ".join(f"{col} {val}" for col, val in self.tags.items())
return f"CREATE STABLE if NOT EXISTS {self.database}.{self.super_table} ({columns}) TAGS ({tags});"

def _create_subtable_query(
def _create_subtable_sql(
self,
subtable: str,
values: dict[str, Union[str, int, float, datetime.datetime]],
) -> str:
try:
values = ", ".join(f"'{values[val]}'" for val in self.tags)
tags = ", ".join(f"'{values[val]}'" for val in self.tags)
except KeyError:
raise mlrun.errors.MLRunInvalidArgumentError(
f"values must contain all tags: {self.tags.keys()}"
)
return f"CREATE TABLE if NOT EXISTS {self.database}.{subtable} USING {self.super_table} TAGS ({values});"
return f"CREATE TABLE if NOT EXISTS {self.database}.{subtable} USING {self.super_table} TAGS ({tags});"

def _insert_subtable_query(
def _insert_subtable_stmt(
self,
connection: taosws.Connection,
subtable: str,
Expand All @@ -116,7 +116,7 @@ def _insert_subtable_query(
stmt = connection.statement()
question_marks = ", ".join("?" * len(self.columns))
stmt.prepare(f"INSERT INTO ? VALUES ({question_marks});")
stmt.set_tbname_tags(subtable, [])
stmt.set_tbname(subtable)

bind_params = []

Expand Down
25 changes: 13 additions & 12 deletions mlrun/model_monitoring/db/tsdb/tdengine/tdengine_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ def write_application_event(
self,
event: dict,
kind: mm_schemas.WriterEventKind = mm_schemas.WriterEventKind.RESULT,
):
) -> None:
"""
Write a single result or metric to TSDB.
"""
Expand All @@ -113,17 +113,21 @@ def write_application_event(
# Write a new result
table = self.tables[mm_schemas.TDEngineSuperTables.APP_RESULTS]
table_name = (
f"{table_name}_" f"{event[mm_schemas.ResultData.RESULT_NAME]}"
f"{table_name}_{event[mm_schemas.ResultData.RESULT_NAME]}"
).replace("-", "_")
event.pop(mm_schemas.ResultData.CURRENT_STATS, None)

else:
# Write a new metric
table = self.tables[mm_schemas.TDEngineSuperTables.METRICS]
table_name = (
f"{table_name}_" f"{event[mm_schemas.MetricData.METRIC_NAME]}"
f"{table_name}_{event[mm_schemas.MetricData.METRIC_NAME]}"
).replace("-", "_")

# Escape the table name for case-sensitivity (ML-7908)
# https://github.com/taosdata/taos-connector-python/issues/260
table_name = f"`{table_name}`"

# Convert the datetime strings to datetime objects
event[mm_schemas.WriterEvent.END_INFER_TIME] = self._convert_to_datetime(
val=event[mm_schemas.WriterEvent.END_INFER_TIME]
Expand All @@ -132,15 +136,11 @@ def write_application_event(
val=event[mm_schemas.WriterEvent.START_INFER_TIME]
)

create_table_query = table._create_subtable_query(
subtable=table_name, values=event
)
self.connection.execute(create_table_query)
create_table_sql = table._create_subtable_sql(subtable=table_name, values=event)
self.connection.execute(create_table_sql)

insert_statement = table._insert_subtable_query(
self.connection,
subtable=table_name,
values=event,
insert_statement = table._insert_subtable_stmt(
self.connection, subtable=table_name, values=event
)
insert_statement.add_batch()
insert_statement.execute()
Expand Down Expand Up @@ -280,6 +280,7 @@ def _get_records(
timestamp_column=timestamp_column,
database=self.database,
)
logger.debug("Querying TDEngine", query=full_query)
try:
query_result = self.connection.query(full_query)
except taosws.QueryError as e:
Expand Down Expand Up @@ -336,7 +337,7 @@ def read_metrics_data(

metrics_condition = " OR ".join(
[
f"({mm_schemas.WriterEvent.APPLICATION_NAME} = '{metric.app}' AND {name} = '{metric.name}')"
f"({mm_schemas.WriterEvent.APPLICATION_NAME}='{metric.app}' AND {name}='{metric.name}')"
for metric in metrics
]
)
Expand Down
24 changes: 12 additions & 12 deletions tests/model_monitoring/db/tsdb/tdengine/test_tdengine_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@

import os
import uuid
from datetime import datetime
from collections.abc import Iterator
from datetime import datetime, timezone

import pytest
import pytz
import taosws

from mlrun.common.schemas.model_monitoring import (
Expand All @@ -31,16 +31,16 @@
database = "test_tdengine_connector_" + uuid.uuid4().hex


def drop_database(connection):
def drop_database(connection: taosws.Connection) -> None:
connection.execute(f"DROP DATABASE IF EXISTS {database}")


def is_tdengine_defined():
return connection_string and connection_string.startswith("taosws://")
def is_tdengine_defined() -> bool:
return connection_string is not None and connection_string.startswith("taosws://")


@pytest.fixture
def connector() -> TDEngineConnector:
def connector() -> Iterator[TDEngineConnector]:
connection = taosws.connect()
drop_database(connection)
conn = TDEngineConnector(
Expand All @@ -53,13 +53,13 @@ def connector() -> TDEngineConnector:


@pytest.mark.skipif(not is_tdengine_defined(), reason="TDEngine is not defined")
def test_write_application_event(connector):
def test_write_application_event(connector: TDEngineConnector) -> None:
endpoint_id = "1"
app_name = "my_app"
result_name = "my_result"
result_name = "my_Result"
result_kind = 0
start_infer_time = datetime(2024, 1, 1, tzinfo=pytz.UTC)
end_infer_time = datetime(2024, 1, 1, second=1, tzinfo=pytz.UTC)
start_infer_time = datetime(2024, 1, 1, tzinfo=timezone.utc)
end_infer_time = datetime(2024, 1, 1, second=1, tzinfo=timezone.utc)
result_status = 0
result_value = 123
data = {
Expand All @@ -86,7 +86,7 @@ def test_write_application_event(connector):
ModelEndpointMonitoringMetric(
project=project,
app=app_name,
name="my_result",
name=result_name,
full_name=f"{project}.{app_name}.result.{result_name}",
type=ModelEndpointMonitoringMetricType.RESULT,
),
Expand All @@ -96,9 +96,9 @@ def test_write_application_event(connector):
assert len(read_back_results) == 1
read_back_result = read_back_results[0]
assert read_back_result.full_name == f"{project}.{app_name}.result.{result_name}"
assert read_back_result.data
assert read_back_result.result_kind.value == result_kind
assert read_back_result.type == "result"
assert read_back_result.data
assert len(read_back_result.values) == 1
read_back_values = read_back_result.values[0]
assert read_back_values.timestamp == end_infer_time
Expand Down
5 changes: 2 additions & 3 deletions tests/model_monitoring/test_tdengine.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import datetime
from typing import Union

Expand Down Expand Up @@ -75,15 +74,15 @@ def test_create_sub_table(
remove_tag: bool,
):
assert (
super_table._create_subtable_query(subtable=subtable, values=values)
super_table._create_subtable_sql(subtable=subtable, values=values)
== f"CREATE TABLE if NOT EXISTS {_MODEL_MONITORING_DATABASE}.{subtable} "
f"USING {super_table.super_table} TAGS ('{values['tag1']}', '{values['tag2']}');"
)
if remove_tag:
# test with missing tag
values.pop("tag1")
with pytest.raises(mlrun.errors.MLRunInvalidArgumentError):
super_table._create_subtable_query(subtable=subtable, values=values)
super_table._create_subtable_sql(subtable=subtable, values=values)

@pytest.mark.parametrize(
("subtable", "remove_tag"), [("subtable_1", False), ("subtable_2", True)]
Expand Down

0 comments on commit 5b4b714

Please sign in to comment.