From 5b4b7141b3b1d4aa7df8f7146ab095c5913fb983 Mon Sep 17 00:00:00 2001 From: Jonathan Daniel <36337649+jond01@users.noreply.github.com> Date: Mon, 23 Sep 2024 10:56:10 +0300 Subject: [PATCH] [Model Monitoring] Fix TDEngine case-insensitivity in the table name [1.7.x] (#6383) --- dependencies.py | 2 +- extras-requirements.txt | 4 +-- .../db/tsdb/tdengine/schemas.py | 10 ++++---- .../db/tsdb/tdengine/tdengine_connector.py | 25 ++++++++++--------- .../tsdb/tdengine/test_tdengine_connector.py | 24 +++++++++--------- tests/model_monitoring/test_tdengine.py | 5 ++-- 6 files changed, 35 insertions(+), 35 deletions(-) diff --git a/dependencies.py b/dependencies.py index bfead386ab30..8cb40a4b65c1 100644 --- a/dependencies.py +++ b/dependencies.py @@ -76,7 +76,7 @@ def extra_requirements() -> dict[str, list[str]]: "distributed~=2023.12.1", ], "alibaba-oss": ["ossfs==2023.12.0", "oss2==2.18.1"], - "tdengine": ["taos-ws-py~=0.3.2"], + "tdengine": ["taos-ws-py~=0.3.3"], "snowflake": ["snowflake-connector-python~=3.7"], } diff --git a/extras-requirements.txt b/extras-requirements.txt index bbc6a6f5d2c9..9caa6a2184c0 100644 --- a/extras-requirements.txt +++ b/extras-requirements.txt @@ -47,5 +47,5 @@ databricks-sdk~=0.13.0 sqlalchemy~=1.4 dask~=2023.12.1 distributed~=2023.12.1 -taos-ws-py~=0.3.2 -snowflake-connector-python~=3.7 \ No newline at end of file +taos-ws-py~=0.3.3 +snowflake-connector-python~=3.7 diff --git a/mlrun/model_monitoring/db/tsdb/tdengine/schemas.py b/mlrun/model_monitoring/db/tsdb/tdengine/schemas.py index 281724225da1..885edc3cd2c1 100644 --- a/mlrun/model_monitoring/db/tsdb/tdengine/schemas.py +++ b/mlrun/model_monitoring/db/tsdb/tdengine/schemas.py @@ -94,20 +94,20 @@ def _create_super_table_query(self) -> str: tags = ", ".join(f"{col} {val}" for col, val in self.tags.items()) return f"CREATE STABLE if NOT EXISTS {self.database}.{self.super_table} ({columns}) TAGS ({tags});" - def _create_subtable_query( + def _create_subtable_sql( self, subtable: str, values: dict[str, Union[str, int, float, datetime.datetime]], ) -> str: try: - values = ", ".join(f"'{values[val]}'" for val in self.tags) + tags = ", ".join(f"'{values[val]}'" for val in self.tags) except KeyError: raise mlrun.errors.MLRunInvalidArgumentError( f"values must contain all tags: {self.tags.keys()}" ) - return f"CREATE TABLE if NOT EXISTS {self.database}.{subtable} USING {self.super_table} TAGS ({values});" + return f"CREATE TABLE if NOT EXISTS {self.database}.{subtable} USING {self.super_table} TAGS ({tags});" - def _insert_subtable_query( + def _insert_subtable_stmt( self, connection: taosws.Connection, subtable: str, @@ -116,7 +116,7 @@ def _insert_subtable_query( stmt = connection.statement() question_marks = ", ".join("?" * len(self.columns)) stmt.prepare(f"INSERT INTO ? VALUES ({question_marks});") - stmt.set_tbname_tags(subtable, []) + stmt.set_tbname(subtable) bind_params = [] diff --git a/mlrun/model_monitoring/db/tsdb/tdengine/tdengine_connector.py b/mlrun/model_monitoring/db/tsdb/tdengine/tdengine_connector.py index 3e26de1d7d4d..6a3b85bfe52f 100644 --- a/mlrun/model_monitoring/db/tsdb/tdengine/tdengine_connector.py +++ b/mlrun/model_monitoring/db/tsdb/tdengine/tdengine_connector.py @@ -97,7 +97,7 @@ def write_application_event( self, event: dict, kind: mm_schemas.WriterEventKind = mm_schemas.WriterEventKind.RESULT, - ): + ) -> None: """ Write a single result or metric to TSDB. """ @@ -113,7 +113,7 @@ def write_application_event( # Write a new result table = self.tables[mm_schemas.TDEngineSuperTables.APP_RESULTS] table_name = ( - f"{table_name}_" f"{event[mm_schemas.ResultData.RESULT_NAME]}" + f"{table_name}_{event[mm_schemas.ResultData.RESULT_NAME]}" ).replace("-", "_") event.pop(mm_schemas.ResultData.CURRENT_STATS, None) @@ -121,9 +121,13 @@ def write_application_event( # Write a new metric table = self.tables[mm_schemas.TDEngineSuperTables.METRICS] table_name = ( - f"{table_name}_" f"{event[mm_schemas.MetricData.METRIC_NAME]}" + f"{table_name}_{event[mm_schemas.MetricData.METRIC_NAME]}" ).replace("-", "_") + # Escape the table name for case-sensitivity (ML-7908) + # https://github.com/taosdata/taos-connector-python/issues/260 + table_name = f"`{table_name}`" + # Convert the datetime strings to datetime objects event[mm_schemas.WriterEvent.END_INFER_TIME] = self._convert_to_datetime( val=event[mm_schemas.WriterEvent.END_INFER_TIME] @@ -132,15 +136,11 @@ def write_application_event( val=event[mm_schemas.WriterEvent.START_INFER_TIME] ) - create_table_query = table._create_subtable_query( - subtable=table_name, values=event - ) - self.connection.execute(create_table_query) + create_table_sql = table._create_subtable_sql(subtable=table_name, values=event) + self.connection.execute(create_table_sql) - insert_statement = table._insert_subtable_query( - self.connection, - subtable=table_name, - values=event, + insert_statement = table._insert_subtable_stmt( + self.connection, subtable=table_name, values=event ) insert_statement.add_batch() insert_statement.execute() @@ -280,6 +280,7 @@ def _get_records( timestamp_column=timestamp_column, database=self.database, ) + logger.debug("Querying TDEngine", query=full_query) try: query_result = self.connection.query(full_query) except taosws.QueryError as e: @@ -336,7 +337,7 @@ def read_metrics_data( metrics_condition = " OR ".join( [ - f"({mm_schemas.WriterEvent.APPLICATION_NAME} = '{metric.app}' AND {name} = '{metric.name}')" + f"({mm_schemas.WriterEvent.APPLICATION_NAME}='{metric.app}' AND {name}='{metric.name}')" for metric in metrics ] ) diff --git a/tests/model_monitoring/db/tsdb/tdengine/test_tdengine_connector.py b/tests/model_monitoring/db/tsdb/tdengine/test_tdengine_connector.py index ba81a2e08eec..2f3b78b91187 100644 --- a/tests/model_monitoring/db/tsdb/tdengine/test_tdengine_connector.py +++ b/tests/model_monitoring/db/tsdb/tdengine/test_tdengine_connector.py @@ -14,10 +14,10 @@ import os import uuid -from datetime import datetime +from collections.abc import Iterator +from datetime import datetime, timezone import pytest -import pytz import taosws from mlrun.common.schemas.model_monitoring import ( @@ -31,16 +31,16 @@ database = "test_tdengine_connector_" + uuid.uuid4().hex -def drop_database(connection): +def drop_database(connection: taosws.Connection) -> None: connection.execute(f"DROP DATABASE IF EXISTS {database}") -def is_tdengine_defined(): - return connection_string and connection_string.startswith("taosws://") +def is_tdengine_defined() -> bool: + return connection_string is not None and connection_string.startswith("taosws://") @pytest.fixture -def connector() -> TDEngineConnector: +def connector() -> Iterator[TDEngineConnector]: connection = taosws.connect() drop_database(connection) conn = TDEngineConnector( @@ -53,13 +53,13 @@ def connector() -> TDEngineConnector: @pytest.mark.skipif(not is_tdengine_defined(), reason="TDEngine is not defined") -def test_write_application_event(connector): +def test_write_application_event(connector: TDEngineConnector) -> None: endpoint_id = "1" app_name = "my_app" - result_name = "my_result" + result_name = "my_Result" result_kind = 0 - start_infer_time = datetime(2024, 1, 1, tzinfo=pytz.UTC) - end_infer_time = datetime(2024, 1, 1, second=1, tzinfo=pytz.UTC) + start_infer_time = datetime(2024, 1, 1, tzinfo=timezone.utc) + end_infer_time = datetime(2024, 1, 1, second=1, tzinfo=timezone.utc) result_status = 0 result_value = 123 data = { @@ -86,7 +86,7 @@ def test_write_application_event(connector): ModelEndpointMonitoringMetric( project=project, app=app_name, - name="my_result", + name=result_name, full_name=f"{project}.{app_name}.result.{result_name}", type=ModelEndpointMonitoringMetricType.RESULT, ), @@ -96,9 +96,9 @@ def test_write_application_event(connector): assert len(read_back_results) == 1 read_back_result = read_back_results[0] assert read_back_result.full_name == f"{project}.{app_name}.result.{result_name}" + assert read_back_result.data assert read_back_result.result_kind.value == result_kind assert read_back_result.type == "result" - assert read_back_result.data assert len(read_back_result.values) == 1 read_back_values = read_back_result.values[0] assert read_back_values.timestamp == end_infer_time diff --git a/tests/model_monitoring/test_tdengine.py b/tests/model_monitoring/test_tdengine.py index 0a2c2703ae7b..b229c78a6999 100644 --- a/tests/model_monitoring/test_tdengine.py +++ b/tests/model_monitoring/test_tdengine.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - import datetime from typing import Union @@ -75,7 +74,7 @@ def test_create_sub_table( remove_tag: bool, ): assert ( - super_table._create_subtable_query(subtable=subtable, values=values) + super_table._create_subtable_sql(subtable=subtable, values=values) == f"CREATE TABLE if NOT EXISTS {_MODEL_MONITORING_DATABASE}.{subtable} " f"USING {super_table.super_table} TAGS ('{values['tag1']}', '{values['tag2']}');" ) @@ -83,7 +82,7 @@ def test_create_sub_table( # test with missing tag values.pop("tag1") with pytest.raises(mlrun.errors.MLRunInvalidArgumentError): - super_table._create_subtable_query(subtable=subtable, values=values) + super_table._create_subtable_sql(subtable=subtable, values=values) @pytest.mark.parametrize( ("subtable", "remove_tag"), [("subtable_1", False), ("subtable_2", True)]