diff --git a/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_to_delta.py b/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_to_delta.py
index ce36b921d..e783f11fa 100644
--- a/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_to_delta.py
+++ b/src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_to_delta.py
@@ -29,7 +29,6 @@ class ValueTypeConstants():
     FLOAT_VALUE = "ValueType = 'float'"
     STRING_VALUE = "ValueType = 'string'"
 
-
 class SparkPCDMToDeltaDestination(DestinationInterface):
     '''
     The Process Control Data Model written to Delta
@@ -123,71 +122,78 @@ def _get_eventdate_string(self, df: DataFrame) -> str:
         dates_list = list(dates_df.toPandas()["EventDate"])
         return str(dates_list).replace('[','').replace(']','')
 
-    def _write_delta_batch(self, df: DataFrame, destination: str):
+    def _write_delta_merge(self, df: DataFrame, destination: str):
+        df = df.select("EventDate", "TagName", "EventTime", "Status", "Value", "ChangeType")
+        when_matched_update_list = [
+            DeltaMergeConditionValues(
+                condition="(source.ChangeType IN ('insert', 'update', 'upsert')) AND ((source.Status != target.Status) OR (source.Value != target.Value))",
+                values={
+                    "EventDate": "source.EventDate",
+                    "TagName": "source.TagName",
+                    "EventTime": "source.EventTime",
+                    "Status": "source.Status",
+                    "Value": "source.Value"
+                }
+            )
+        ]
+        when_matched_delete_list = [
+            DeltaMergeCondition(
+                condition="source.ChangeType = 'delete'"
+            )
+        ]
+        when_not_matched_insert_list = [
+            DeltaMergeConditionValues(
+                condition="(source.ChangeType IN ('insert', 'update', 'upsert'))",
+                values={
+                    "EventDate": "source.EventDate",
+                    "TagName": "source.TagName",
+                    "EventTime": "source.EventTime",
+                    "Status": "source.Status",
+                    "Value": "source.Value"
+                }
+            )
+        ]
+
+        merge_condition = "source.EventDate = target.EventDate AND source.TagName = target.TagName AND source.EventTime = target.EventTime"
-        if self.merge == True:
-            df = df.select("EventDate", "TagName", "EventTime", "Status", "Value", "ChangeType")
-            when_matched_update_list = [
-                DeltaMergeConditionValues(
-                    condition="(source.ChangeType IN ('insert', 'update', 'upsert')) AND ((source.Status != target.Status) OR (source.Value != target.Value))",
-                    values={
-                        "EventDate": "source.EventDate",
-                        "TagName": "source.TagName",
-                        "EventTime": "source.EventTime",
-                        "Status": "source.Status",
-                        "Value": "source.Value"
-                    }
-                )
-            ]
-            when_matched_delete_list = [
-                DeltaMergeCondition(
-                    condition="source.ChangeType = 'delete'"
-                )
-            ]
-            when_not_matched_insert_list = [
-                DeltaMergeConditionValues(
-                    condition="(source.ChangeType IN ('insert', 'update', 'upsert'))",
-                    values={
-                        "EventDate": "source.EventDate",
-                        "TagName": "source.TagName",
-                        "EventTime": "source.EventTime",
-                        "Status": "source.Status",
-                        "Value": "source.Value"
-                    }
-                )
-            ]
+        perform_merge = True
+        if self.try_broadcast_join != True:
+            eventdate_string = self._get_eventdate_string(df)
+            if eventdate_string == None or eventdate_string == "":
+                perform_merge = False
+            else:
+                merge_condition = "target.EventDate in ({}) AND ".format(eventdate_string) + merge_condition
 
-            merge_condition = "source.EventDate = target.EventDate AND source.TagName = target.TagName AND source.EventTime = target.EventTime"
-
-            perform_merge = True
-            if self.try_broadcast_join != True:
-                eventdate_string = self._get_eventdate_string(df)
-                if eventdate_string == None or eventdate_string == "":
-                    perform_merge = False
-                else:
-                    merge_condition = "target.EventDate in ({}) AND ".format(eventdate_string) + merge_condition
+        if perform_merge == True:
+            SparkDeltaMergeDestination(
+                spark=self.spark,
+                data=df,
+                destination=destination,
+                options=self.options,
+                merge_condition=merge_condition,
+                when_matched_update_list=when_matched_update_list,
+                when_matched_delete_list=when_matched_delete_list,
+                when_not_matched_insert_list=when_not_matched_insert_list,
+                try_broadcast_join=self.try_broadcast_join,
+                trigger=self.trigger,
+                query_name=self.query_name
+            ).write_batch()
 
-            if perform_merge == True:
-                delta = SparkDeltaMergeDestination(
-                    spark=self.spark,
-                    data=df,
-                    destination=destination,
-                    options=self.options,
-                    merge_condition=merge_condition,
-                    when_matched_update_list=when_matched_update_list,
-                    when_matched_delete_list=when_matched_delete_list,
-                    when_not_matched_insert_list=when_not_matched_insert_list,
-                    try_broadcast_join=self.try_broadcast_join
-                )
+    def _write_delta_batch(self, df: DataFrame, destination: str):
+
+        if self.merge == True:
+            self._write_delta_merge(df.filter(col("ChangeType").isin('insert', 'update', 'upsert')), destination)
+            self._write_delta_merge(df.filter(col("ChangeType") == 'delete'), destination)
         else:
             df = df.select("TagName", "EventTime", "Status", "Value")
-            delta = SparkDeltaDestination(
+            SparkDeltaDestination(
                 data=df,
                 destination=destination,
-                options=self.options
-            )
-
-        delta.write_batch()
+                options=self.options,
+                mode=self.mode,
+                trigger=self.trigger,
+                query_name=self.query_name
+            ).write_batch()
 
     def _write_data_by_type(self, df: DataFrame):
         if self.merge == True:
@@ -197,7 +203,7 @@ def _write_data_by_type(self, df: DataFrame):
         df = df.withColumn("EventTime", (floor(col("EventTime").cast("double")*1000)/1000).cast("timestamp"))
 
         if self.remove_duplicates == True:
-            df = df.drop_duplicates(["TagName", "EventTime"])
+            df = df.drop_duplicates(["TagName", "EventTime", "ChangeType"])
 
         float_df = (
             df
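
The reshaped `_write_delta_batch` above routes upsert-type rows and delete rows through `_write_delta_merge` as two separate merges, so a single Delta MERGE never receives two source rows (say, an 'upsert' and a 'delete' for the same key) matching one target row. A minimal sketch of that two-pass pattern using the plain delta-spark API; `merge_changes` and `table_name` are illustrative names, not the SDK's `SparkDeltaMergeDestination` internals:

```python
# Sketch only: assumes a Delta table registered as `table_name` with the PCDM
# columns (EventDate, TagName, EventTime, Status, Value) and a source DataFrame
# carrying a ChangeType column, mirroring the diff's merge conditions.
from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col

def merge_changes(spark: SparkSession, source: DataFrame, table_name: str) -> None:
    target = DeltaTable.forName(spark, table_name)
    condition = ("source.EventDate = target.EventDate AND "
                 "source.TagName = target.TagName AND "
                 "source.EventTime = target.EventTime")
    upserts = source.filter(col("ChangeType").isin("insert", "update", "upsert"))
    deletes = source.filter(col("ChangeType") == "delete")
    for subset in (upserts, deletes):  # two passes, one per change family
        (target.alias("target")
            .merge(subset.alias("source"), condition)
            .whenMatchedUpdate(
                condition="source.ChangeType IN ('insert', 'update', 'upsert')",
                set={"Status": "source.Status", "Value": "source.Value"})
            .whenMatchedDelete(condition="source.ChangeType = 'delete'")
            .whenNotMatchedInsert(
                condition="source.ChangeType IN ('insert', 'update', 'upsert')",
                values={"EventDate": "source.EventDate",
                        "TagName": "source.TagName",
                        "EventTime": "source.EventTime",
                        "Status": "source.Status",
                        "Value": "source.Value"})
            .execute())
```
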
diff --git a/src/sdk/python/rtdip_sdk/queries/metadata.py b/src/sdk/python/rtdip_sdk/queries/metadata.py
index a5d779fe0..194058b93 100644
--- a/src/sdk/python/rtdip_sdk/queries/metadata.py
+++ b/src/sdk/python/rtdip_sdk/queries/metadata.py
@@ -48,6 +48,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
         cursor.execute(query)
         df = cursor.fetch_all()
         cursor.close()
+        connection.close()
         return df
     except Exception as e:
         logging.exception('error returning dataframe')
diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/_query_builder.py b/src/sdk/python/rtdip_sdk/queries/time_series/_query_builder.py
index f4fe7c734..682e583d0 100644
--- a/src/sdk/python/rtdip_sdk/queries/time_series/_query_builder.py
+++ b/src/sdk/python/rtdip_sdk/queries/time_series/_query_builder.py
@@ -105,7 +105,10 @@ def _sample_query(parameters_dict: dict) -> tuple:
         ",date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp(\"{{ start_date }}\"), \"{{ time_zone }}\"), from_utc_timestamp(to_timestamp(\"{{ end_date }}\"), \"{{ time_zone }}\"), INTERVAL '{{ time_interval_rate + ' ' + time_interval_unit }}')) AS timestamp_array, explode(array('{{ tag_names | join('\\', \\'') }}')) AS TagName) "
         ",window_buckets AS (SELECT timestamp_array AS window_start ,TagName ,LEAD(timestamp_array) OVER (ORDER BY timestamp_array) AS window_end FROM date_array) "
         ",project_resample_results AS (SELECT d.window_start ,d.window_end ,d.TagName ,{{ agg_method }}(e.Value) OVER (PARTITION BY d.TagName, d.window_start ORDER BY e.EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS Value FROM window_buckets d INNER JOIN raw_events e ON e.EventTime >= d.window_start AND e.EventTime < d.window_end AND e.TagName = d.TagName) "
-        "SELECT window_start AS EventTime ,TagName ,Value FROM project_resample_results GROUP BY window_start ,TagName ,Value ORDER BY TagName, EventTime "
+        "SELECT window_start AS EventTime ,TagName ,Value FROM project_resample_results GROUP BY window_start ,TagName ,Value "
+        "{% if is_resample is defined and is_resample == true %}"
+        "ORDER BY TagName, EventTime "
+        "{% endif %}"
     )
 
     sample_parameters = {
@@ -121,7 +124,8 @@ def _sample_query(parameters_dict: dict) -> tuple:
         "time_interval_rate": parameters_dict['time_interval_rate'],
         "time_interval_unit": parameters_dict['time_interval_unit'],
         "agg_method": parameters_dict['agg_method'],
-        "time_zone": parameters_dict["time_zone"]
+        "time_zone": parameters_dict["time_zone"],
+        "is_resample": True
     }
 
     sql_template = Template(sample_query)
@@ -148,10 +152,10 @@ def _interpolation_query(parameters_dict: dict, sample_query: str, sample_parame
         ",linear_interpolation_calculations AS (SELECT coalesce(a.TagName, b.TagName) as TagName, coalesce(a.EventTime, b.EventTime) as EventTime, a.EventTime as Requested_EventTime, b.EventTime as Found_EventTime, b.Value, "
         "last_value(b.EventTime, true) OVER (PARTITION BY a.TagName ORDER BY a.EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Last_EventTime, last_value(b.Value, true) OVER (PARTITION BY a.TagName ORDER BY a.EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Last_Value, "
         "first_value(b.EventTime, true) OVER (PARTITION BY a.TagName ORDER BY a.EventTime ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS Next_EventTime, first_value(b.Value, true) OVER (PARTITION BY a.TagName ORDER BY a.EventTime ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS Next_Value, "
-        "CASE WHEN b.Value is NULL THEN Last_Value + (unix_timestamp(a.EventTime) - unix_timestamp(Last_EventTime)) * ((Next_Value - Last_Value)) / ((unix_timestamp(Next_EventTime) - unix_timestamp(Last_EventTime))) ELSE b.Value END AS linear_interpolated_value FROM date_array a FULL OUTER JOIN resample b ON a.EventTime = b.EventTime AND a.TagName = b.TagName ORDER BY a.EventTime, b.TagName) "
-        "SELECT EventTime, TagName, linear_interpolated_value AS Value FROM linear_interpolation_calculations "
+        "CASE WHEN b.Value is NULL THEN Last_Value + (unix_timestamp(a.EventTime) - unix_timestamp(Last_EventTime)) * ((Next_Value - Last_Value)) / ((unix_timestamp(Next_EventTime) - unix_timestamp(Last_EventTime))) ELSE b.Value END AS linear_interpolated_value FROM date_array a FULL OUTER JOIN resample b ON a.EventTime = b.EventTime AND a.TagName = b.TagName) "
+        "SELECT EventTime, TagName, linear_interpolated_value AS Value FROM linear_interpolation_calculations ORDER BY TagName, EventTime "
         "{% else %}"
-        "SELECT * FROM resample "
+        "SELECT * FROM resample ORDER BY TagName, EventTime "
         "{% endif %}"
     )
 
@@ -307,6 +311,7 @@ def _query_builder(parameters_dict: dict, query_type: str) -> str:
 
     if query_type == "interpolate":
         sample_prepared_query, sample_query, sample_parameters = _sample_query(parameters_dict)
+        sample_parameters["is_resample"] = False
         return _interpolation_query(parameters_dict, sample_query, sample_parameters)
 
     if query_type == "time_weighted_average":
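
The `is_resample` flag introduced above keeps the `ORDER BY` out of the sample query when it is embedded as the `resample` CTE of the interpolation query (sorting a CTE is wasted work; only the outer query's ordering matters), while standalone resample calls still return sorted output. The toggle reduces to this Jinja2 pattern:

```python
# Minimal illustration of the template toggle; not the SDK's full query template.
from jinja2 import Template

template = Template(
    "SELECT window_start AS EventTime ,TagName ,Value FROM project_resample_results "
    "{% if is_resample is defined and is_resample == true %}"
    "ORDER BY TagName, EventTime"
    "{% endif %}"
)

print(template.render(is_resample=True))   # standalone resample: ordered
print(template.render(is_resample=False))  # embedded as a CTE: no ORDER BY
```
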
diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/interpolate.py b/src/sdk/python/rtdip_sdk/queries/time_series/interpolate.py
index ef3af55cf..de10aa1fe 100644
--- a/src/sdk/python/rtdip_sdk/queries/time_series/interpolate.py
+++ b/src/sdk/python/rtdip_sdk/queries/time_series/interpolate.py
@@ -16,7 +16,6 @@
 import pandas as pd
 from ._query_builder import _query_builder
 
-
 def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
     '''
     An RTDIP interpolation function that is intertwined with the RTDIP Resampling function.
@@ -68,6 +67,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
         cursor.execute(query)
         df = cursor.fetch_all()
         cursor.close()
+        connection.close()
         return df
     except Exception as e:
         logging.exception('error returning dataframe')
diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/interpolation_at_time.py b/src/sdk/python/rtdip_sdk/queries/time_series/interpolation_at_time.py
index 4a73815e1..ab3072125 100644
--- a/src/sdk/python/rtdip_sdk/queries/time_series/interpolation_at_time.py
+++ b/src/sdk/python/rtdip_sdk/queries/time_series/interpolation_at_time.py
@@ -54,6 +54,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
         cursor.execute(query)
         df = cursor.fetch_all()
         cursor.close()
+        connection.close()
         return df
     except Exception as e:
         logging.exception('error returning dataframe')
diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/raw.py b/src/sdk/python/rtdip_sdk/queries/time_series/raw.py
index bbb40bae1..b363d49fd 100644
--- a/src/sdk/python/rtdip_sdk/queries/time_series/raw.py
+++ b/src/sdk/python/rtdip_sdk/queries/time_series/raw.py
@@ -52,6 +52,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
         cursor.execute(query)
         df = cursor.fetch_all()
         cursor.close()
+        connection.close()
         return df
     except Exception as e:
         logging.exception('error returning dataframe')
diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/resample.py b/src/sdk/python/rtdip_sdk/queries/time_series/resample.py
index c1aa00d11..c12de10cb 100644
--- a/src/sdk/python/rtdip_sdk/queries/time_series/resample.py
+++ b/src/sdk/python/rtdip_sdk/queries/time_series/resample.py
@@ -68,6 +68,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
         cursor.execute(query)
         df = cursor.fetch_all()
         cursor.close()
+        connection.close()
         return df
     except Exception as e:
         logging.exception('error returning dataframe')
diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/time_weighted_average.py b/src/sdk/python/rtdip_sdk/queries/time_series/time_weighted_average.py
index 169fcdf89..53318932e 100644
--- a/src/sdk/python/rtdip_sdk/queries/time_series/time_weighted_average.py
+++ b/src/sdk/python/rtdip_sdk/queries/time_series/time_weighted_average.py
@@ -60,6 +60,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
         cursor.execute(query)
         df = cursor.fetch_all()
         cursor.close()
+        connection.close()
         return df
     except Exception as e:
         logging.exception('error returning dataframe')
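
Each query module now closes the underlying connection as well as the cursor once the result is fetched. Worth noting: the `connection.close()` sits inside the `try`, so it is skipped when `execute` or `fetch_all` raises. A `try`/`finally` variant, sketched here as an alternative rather than what the diff does, would release both handles unconditionally:

```python
# Hedged sketch of an always-close variant of the shared get() pattern.
# `connection` is duck-typed here exactly as in the query modules.
import logging
import pandas as pd

def get(connection: object, query: str) -> pd.DataFrame:
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        return cursor.fetch_all()
    except Exception:
        logging.exception('error returning dataframe')
        raise
    finally:
        cursor.close()        # runs on success and on failure
        connection.close()    # ditto, unlike a close() inside the try block
```
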
to_date(to_timestamp("2011-01-02T23:59:59+00:00")) AND EventTime BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND TagName in (\'MOCKED-TAGNAME\') AND Status = \'Good\' ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS timestamp_array, explode(array(\'MOCKED-TAGNAME\')) AS TagName) ,window_buckets AS (SELECT timestamp_array AS window_start ,TagName ,LEAD(timestamp_array) OVER (ORDER BY timestamp_array) AS window_end FROM date_array) ,project_resample_results AS (SELECT d.window_start ,d.window_end ,d.TagName ,avg(e.Value) OVER (PARTITION BY d.TagName, d.window_start ORDER BY e.EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS Value FROM window_buckets d INNER JOIN raw_events e ON e.EventTime >= d.window_start AND e.EventTime < d.window_end AND e.TagName = d.TagName) SELECT window_start AS EventTime ,TagName ,Value FROM project_resample_results GROUP BY window_start ,TagName ,Value ORDER BY TagName, EventTime ),date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS EventTime, explode(array(\'MOCKED-TAGNAME\')) AS TagName) SELECT a.EventTime, a.TagName, last_value(b.Value, true) OVER (PARTITION BY a.TagName ORDER BY a.EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Value FROM date_array a LEFT OUTER JOIN resample b ON a.EventTime = b.EventTime AND a.TagName = b.TagName ORDER BY a.TagName, a.EventTime ' +MOCKED_QUERY= 'WITH resample AS (WITH raw_events AS (SELECT DISTINCT from_utc_timestamp(to_timestamp(date_format(EventTime, \'yyyy-MM-dd HH:mm:ss.SSS\')), "+0000") as EventTime, TagName, Status, Value FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE EventDate BETWEEN to_date(to_timestamp("2011-01-01T00:00:00+00:00")) AND to_date(to_timestamp("2011-01-02T23:59:59+00:00")) AND EventTime BETWEEN to_timestamp("2011-01-01T00:00:00+00:00") AND to_timestamp("2011-01-02T23:59:59+00:00") AND TagName in (\'MOCKED-TAGNAME\') AND Status = \'Good\' ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS timestamp_array, explode(array(\'MOCKED-TAGNAME\')) AS TagName) ,window_buckets AS (SELECT timestamp_array AS window_start ,TagName ,LEAD(timestamp_array) OVER (ORDER BY timestamp_array) AS window_end FROM date_array) ,project_resample_results AS (SELECT d.window_start ,d.window_end ,d.TagName ,avg(e.Value) OVER (PARTITION BY d.TagName, d.window_start ORDER BY e.EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS Value FROM window_buckets d INNER JOIN raw_events e ON e.EventTime >= d.window_start AND e.EventTime < d.window_end AND e.TagName = d.TagName) SELECT window_start AS EventTime ,TagName ,Value FROM project_resample_results GROUP BY window_start ,TagName ,Value ),date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS EventTime, explode(array(\'MOCKED-TAGNAME\')) AS TagName) SELECT a.EventTime, a.TagName, 
 MOCKED_PARAMETER_DICT = {
     "business_unit": "mocked-buiness-unit",
     "region": "mocked-region",
@@ -46,6 +46,7 @@ def test_interpolate(mocker: MockerFixture):
     mocked_cursor = mocker.spy(MockedDBConnection, "cursor")
+    mocked_connection_close = mocker.spy(MockedDBConnection, "close")
     mocked_execute = mocker.spy(MockedCursor, "execute")
     mocked_fetch_all = mocker.patch.object(MockedCursor, "fetchall_arrow", return_value = pa.Table.from_pandas(pd.DataFrame(data={'a': [1], 'b': [2], 'c': [3], 'd': [4]})))
     mocked_close = mocker.spy(MockedCursor, "close")
@@ -56,6 +57,7 @@ def test_interpolate(mocker: MockerFixture):
     actual = interpolate_get(mocked_connection, MOCKED_PARAMETER_DICT)
 
     mocked_cursor.assert_called_once()
+    mocked_connection_close.assert_called_once()
     mocked_execute.assert_called_once_with(mocker.ANY, query=MOCKED_QUERY)
     mocked_fetch_all.assert_called_once()
     mocked_close.assert_called_once()
@@ -64,7 +66,9 @@ def test_interpolate_sample_rate_unit(mocker: MockerFixture):
     MOCKED_PARAMETER_DICT["sample_rate"] = "15"
     MOCKED_PARAMETER_DICT["sample_unit"] = "minute"
+
     mocked_cursor = mocker.spy(MockedDBConnection, "cursor")
+    mocked_connection_close = mocker.spy(MockedDBConnection, "close")
     mocked_execute = mocker.spy(MockedCursor, "execute")
     mocked_fetch_all = mocker.patch.object(MockedCursor, "fetchall_arrow", return_value = pa.Table.from_pandas(pd.DataFrame(data={'a': [1], 'b': [2], 'c': [3], 'd': [4]})))
     mocked_close = mocker.spy(MockedCursor, "close")
@@ -75,6 +79,7 @@ def test_interpolate_sample_rate_unit(mocker: MockerFixture):
     actual = interpolate_get(mocked_connection, MOCKED_PARAMETER_DICT)
 
     mocked_cursor.assert_called_once()
+    mocked_connection_close.assert_called_once()
     mocked_execute.assert_called_once_with(mocker.ANY, query=MOCKED_QUERY)
     mocked_fetch_all.assert_called_once()
     mocked_close.assert_called_once()
@@ -82,6 +87,7 @@ def test_interpolate_fails(mocker: MockerFixture):
     mocker.spy(MockedDBConnection, "cursor")
+    mocker.spy(MockedDBConnection, "close")
     mocker.spy(MockedCursor, "execute")
     mocker.patch.object(MockedCursor, "fetchall_arrow", side_effect=Exception)
     mocker.spy(MockedCursor, "close")
@@ -95,6 +101,7 @@ def test_interpolate_tag_name_not_list_fails(mocker: MockerFixture):
     MOCKED_PARAMETER_DICT["tag_names"] = "abc"
     mocker.spy(MockedDBConnection, "cursor")
+    mocker.spy(MockedDBConnection, "close")
     mocker.spy(MockedCursor, "execute")
     mocker.patch.object(MockedCursor, "fetchall_arrow", side_effect=Exception)
     mocker.spy(MockedCursor, "close")
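
On the new `mocked_connection_close` spies: mock assertions only assert when invoked, so `assert_called_once` without parentheses is a truthy attribute lookup that can never fail, which is why the tests above call it with parentheses. A self-contained illustration, using a stand-in class rather than the SDK's actual test double:

```python
# Why the parentheses matter on mock assertions: without them the expression
# merely references the assert method and the test always passes.
from pytest_mock import MockerFixture

class MockedDBConnection:  # stand-in for the test double used above
    def close(self) -> None:
        pass

def test_close_is_asserted(mocker: MockerFixture):
    spy = mocker.spy(MockedDBConnection, "close")
    MockedDBConnection().close()
    spy.assert_called_once      # no-op: attribute access, cannot fail
    spy.assert_called_once()    # real assertion: fails unless called exactly once
```
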
"fetchall_arrow", return_value = pa.Table.from_pandas(pd.DataFrame(data={'a': [1], 'b': [2], 'c': [3], 'd': [4]}))) mocked_close = mocker.spy(MockedCursor, "close") @@ -52,6 +53,7 @@ def test_interpolation_at_time(mocker: MockerFixture): actual = interpolation_at_time_get(mocked_connection, MOCKED_PARAMETER_DICT) mocked_cursor.assert_called_once() + mocked_connection_close.assert_called_once() mocked_execute.assert_called_once_with(mocker.ANY, query=MOCKED_QUERY) mocked_fetch_all.assert_called_once() mocked_close.assert_called_once() @@ -59,6 +61,7 @@ def test_interpolation_at_time(mocker: MockerFixture): def test_interpolation_at_time_fails(mocker: MockerFixture): mocker.spy(MockedDBConnection, "cursor") + mocker.spy(MockedDBConnection, "close") mocker.spy(MockedCursor, "execute") mocker.patch.object(MockedCursor, "fetchall_arrow", side_effect=Exception) mocker.spy(MockedCursor, "close") diff --git a/tests/sdk/python/rtdip_sdk/queries/test_metadata.py b/tests/sdk/python/rtdip_sdk/queries/test_metadata.py index be521ea86..1f4f03298 100644 --- a/tests/sdk/python/rtdip_sdk/queries/test_metadata.py +++ b/tests/sdk/python/rtdip_sdk/queries/test_metadata.py @@ -47,6 +47,7 @@ def test_metadata(mocker: MockerFixture): mocked_cursor = mocker.spy(MockedDBConnection, "cursor") + mocked_connection_close = mocker.spy(MockedDBConnection, "close") mocked_execute = mocker.spy(MockedCursor, "execute") mocked_fetch_all = mocker.patch.object(MockedCursor, "fetchall_arrow", return_value = pa.Table.from_pandas(pd.DataFrame(data={'a': [1], 'b': [2], 'c': [3], 'd': [4]}))) mocked_close = mocker.spy(MockedCursor, "close") @@ -57,6 +58,7 @@ def test_metadata(mocker: MockerFixture): actual = metadata_raw(mocked_connection, MOCKED_PARAMETER_DICT) mocked_cursor.assert_called_once() + mocked_connection_close.assert_called_once() mocked_execute.assert_called_once_with(mocker.ANY, query=MOCKED_QUERY) mocked_fetch_all.assert_called_once() mocked_close.assert_called_once() @@ -64,6 +66,7 @@ def test_metadata(mocker: MockerFixture): def test_no_tag_metadata(mocker: MockerFixture): mocked_cursor = mocker.spy(MockedDBConnection, "cursor") + mocked_connection_close = mocker.spy(MockedDBConnection, "close") mocked_execute = mocker.spy(MockedCursor, "execute") mocked_fetch_all = mocker.patch.object(MockedCursor, "fetchall_arrow", return_value = pa.Table.from_pandas(pd.DataFrame(data={'a': [1], 'b': [2], 'c': [3], 'd': [4]}))) mocked_close = mocker.spy(MockedCursor, "close") @@ -74,6 +77,7 @@ def test_no_tag_metadata(mocker: MockerFixture): actual = metadata_raw(mocked_connection, MOCKED_PARAMETER_NO_TAGS_DICT) mocked_cursor.assert_called_once() + mocked_connection_close.assert_called_once() mocked_execute.assert_called_once_with(mocker.ANY, query=MOCKED_NO_TAG_QUERY) mocked_fetch_all.assert_called_once() mocked_close.assert_called_once() @@ -81,6 +85,7 @@ def test_no_tag_metadata(mocker: MockerFixture): def test_metadata_fails(mocker: MockerFixture): mocker.spy(MockedDBConnection, "cursor") + mocker.spy(MockedDBConnection, "close") mocker.spy(MockedCursor, "execute") mocker.patch.object(MockedCursor, "fetchall_arrow", side_effect=Exception) mocker.spy(MockedCursor, "close") diff --git a/tests/sdk/python/rtdip_sdk/queries/test_raw.py b/tests/sdk/python/rtdip_sdk/queries/test_raw.py index 5bfe1d66b..fe1d677db 100644 --- a/tests/sdk/python/rtdip_sdk/queries/test_raw.py +++ b/tests/sdk/python/rtdip_sdk/queries/test_raw.py @@ -43,6 +43,7 @@ def test_raw(mocker: MockerFixture): mocked_cursor = mocker.spy(MockedDBConnection, 
"cursor") + mocked_connection_close = mocker.spy(MockedDBConnection, "close") mocked_execute = mocker.spy(MockedCursor, "execute") mocked_fetch_all = mocker.patch.object(MockedCursor, "fetchall_arrow", return_value = pa.Table.from_pandas(pd.DataFrame(data={'EventTime': [pd.to_datetime("2022-01-01 00:10:00+00:00")], 'TagName': ["MOCKED-TAGNAME"], 'Status': ["Good"], 'Value':[177.09220]}))) mocked_close = mocker.spy(MockedCursor, "close") @@ -53,6 +54,7 @@ def test_raw(mocker: MockerFixture): actual = raw_get(mocked_connection, MOCKED_PARAMETER_DICT) mocked_cursor.assert_called_once() + mocked_connection_close.assert_called_once() mocked_execute.assert_called_once_with(mocker.ANY, query=MOCKED_QUERY) mocked_fetch_all.assert_called_once() mocked_close.assert_called_once() @@ -60,6 +62,7 @@ def test_raw(mocker: MockerFixture): def test_raw_fails(mocker: MockerFixture): mocker.spy(MockedDBConnection, "cursor") + mocker.spy(MockedDBConnection, "close") mocker.spy(MockedCursor, "execute") mocker.patch.object(MockedCursor, "fetchall_arrow", side_effect=Exception) mocker.spy(MockedCursor, "close") diff --git a/tests/sdk/python/rtdip_sdk/queries/test_resample.py b/tests/sdk/python/rtdip_sdk/queries/test_resample.py index b1057b6a0..50b5ff7d0 100644 --- a/tests/sdk/python/rtdip_sdk/queries/test_resample.py +++ b/tests/sdk/python/rtdip_sdk/queries/test_resample.py @@ -45,6 +45,7 @@ def test_resample(mocker: MockerFixture): mocked_cursor = mocker.spy(MockedDBConnection, "cursor") + mocked_connection_close = mocker.spy(MockedDBConnection, "close") mocked_execute = mocker.spy(MockedCursor, "execute") mocked_fetch_all = mocker.patch.object(MockedCursor, "fetchall_arrow", return_value = pa.Table.from_pandas(pd.DataFrame(data={'a': [1], 'b': [2], 'c': [3], 'd': [4]}))) mocked_close = mocker.spy(MockedCursor, "close") @@ -55,6 +56,7 @@ def test_resample(mocker: MockerFixture): actual = resample_get(mocked_connection, MOCKED_PARAMETER_DICT) mocked_cursor.assert_called_once() + mocked_connection_close.assert_called_once() mocked_execute.assert_called_once_with(mocker.ANY, query=MOCKED_QUERY) mocked_fetch_all.assert_called_once() mocked_close.assert_called_once() @@ -64,6 +66,7 @@ def test_resample_sample_rate_unit(mocker: MockerFixture): MOCKED_PARAMETER_DICT["sample_rate"] = "15" MOCKED_PARAMETER_DICT["sample_unit"] = "minute" mocked_cursor = mocker.spy(MockedDBConnection, "cursor") + mocked_connection_close = mocker.spy(MockedDBConnection, "close") mocked_execute = mocker.spy(MockedCursor, "execute") mocked_fetch_all = mocker.patch.object(MockedCursor, "fetchall_arrow", return_value = pa.Table.from_pandas(pd.DataFrame(data={'a': [1], 'b': [2], 'c': [3], 'd': [4]}))) mocked_close = mocker.spy(MockedCursor, "close") @@ -74,6 +77,7 @@ def test_resample_sample_rate_unit(mocker: MockerFixture): actual = resample_get(mocked_connection, MOCKED_PARAMETER_DICT) mocked_cursor.assert_called_once() + mocked_connection_close.assert_called_once() mocked_execute.assert_called_once_with(mocker.ANY, query=MOCKED_QUERY) mocked_fetch_all.assert_called_once() mocked_close.assert_called_once() @@ -81,6 +85,7 @@ def test_resample_sample_rate_unit(mocker: MockerFixture): def test_resample_fails(mocker: MockerFixture): mocker.spy(MockedDBConnection, "cursor") + mocker.spy(MockedDBConnection, "close") mocker.spy(MockedCursor, "execute") mocker.patch.object(MockedCursor, "fetchall_arrow", side_effect=Exception) mocker.spy(MockedCursor, "close") @@ -94,6 +99,7 @@ def test_resample_fails(mocker: MockerFixture): def 
test_resample_tag_name_not_list_fails(mocker: MockerFixture): MOCKED_PARAMETER_DICT["tag_names"] = "abc" mocker.spy(MockedDBConnection, "cursor") + mocker.spy(MockedDBConnection, "close") mocker.spy(MockedCursor, "execute") mocker.patch.object(MockedCursor, "fetchall_arrow", side_effect=Exception) mocker.spy(MockedCursor, "close") diff --git a/tests/sdk/python/rtdip_sdk/queries/test_time_weighted_average.py b/tests/sdk/python/rtdip_sdk/queries/test_time_weighted_average.py index fdee603fa..d0bd9241c 100644 --- a/tests/sdk/python/rtdip_sdk/queries/test_time_weighted_average.py +++ b/tests/sdk/python/rtdip_sdk/queries/test_time_weighted_average.py @@ -47,6 +47,7 @@ def test_time_weighted_average(mocker: MockerFixture): mocked_cursor = mocker.spy(MockedDBConnection, "cursor") + mocked_connection_close = mocker.spy(MockedDBConnection, "close") mocked_execute = mocker.spy(MockedCursor, "execute") mocked_fetch_all = mocker.patch.object(MockedCursor, "fetchall_arrow", return_value = pa.Table.from_pandas(pd.DataFrame(data={'a': [1], 'b': [2], 'c': [3], 'd': [4]}))) mocked_close = mocker.spy(MockedCursor, "close") @@ -57,6 +58,7 @@ def test_time_weighted_average(mocker: MockerFixture): actual = time_weighted_average_get(mocked_connection, MOCKED_PARAMETER_DICT) mocked_cursor.assert_called_once() + mocked_connection_close.assert_called_once() mocked_execute.assert_called_once_with(mocker.ANY, query=MOCKED_QUERY) mocked_fetch_all.assert_called_once() mocked_close.assert_called_once() @@ -65,6 +67,7 @@ def test_time_weighted_average(mocker: MockerFixture): def test_time_weighted_average_with_window_size_mins(mocker: MockerFixture): MOCKED_PARAMETER_DICT["window_size_mins"] = 15 mocked_cursor = mocker.spy(MockedDBConnection, "cursor") + mocked_connection_close = mocker.spy(MockedDBConnection, "close") mocked_execute = mocker.spy(MockedCursor, "execute") mocked_fetch_all = mocker.patch.object(MockedCursor, "fetchall_arrow", return_value = pa.Table.from_pandas(pd.DataFrame(data={'a': [1], 'b': [2], 'c': [3], 'd': [4]}))) mocked_close = mocker.spy(MockedCursor, "close") @@ -75,6 +78,7 @@ def test_time_weighted_average_with_window_size_mins(mocker: MockerFixture): actual = time_weighted_average_get(mocked_connection, MOCKED_PARAMETER_DICT) mocked_cursor.assert_called_once() + mocked_connection_close.assert_called_once() mocked_execute.assert_called_once_with(mocker.ANY, query=MOCKED_QUERY) mocked_fetch_all.assert_called_once() mocked_close.assert_called_once() @@ -84,6 +88,7 @@ def test_time_weighted_average_with_window_size_mins(mocker: MockerFixture): def test_time_weighted_average_metadata_step(mocker: MockerFixture): MOCKED_PARAMETER_DICT["step"] = "metadata" mocked_cursor = mocker.spy(MockedDBConnection, "cursor") + mocked_connection_close = mocker.spy(MockedDBConnection, "close") mocked_execute = mocker.spy(MockedCursor, "execute") mocked_fetch_all = mocker.patch.object(MockedCursor, "fetchall_arrow", return_value = pa.Table.from_pandas(pd.DataFrame(data={'a': [1], 'b': [2], 'c': [3], 'd': [4]}))) mocked_close = mocker.spy(MockedCursor, "close") @@ -94,6 +99,7 @@ def test_time_weighted_average_metadata_step(mocker: MockerFixture): actual = time_weighted_average_get(mocked_connection, MOCKED_PARAMETER_DICT) mocked_cursor.assert_called_once() + mocked_connection_close.assert_called_once() mocked_execute.assert_called_once_with(mocker.ANY, query=METADATA_MOCKED_QUERY) mocked_fetch_all.assert_called_once() mocked_close.assert_called_once() @@ -101,6 +107,7 @@ def 
test_time_weighted_average_metadata_step(mocker: MockerFixture): def test_time_weighted_average_fails(mocker: MockerFixture): mocker.spy(MockedDBConnection, "cursor") + mocker.spy(MockedDBConnection, "close") mocker.spy(MockedCursor, "execute") mocker.patch.object(MockedCursor, "fetchall_arrow", side_effect=Exception) mocker.spy(MockedCursor, "close") @@ -114,6 +121,7 @@ def test_time_weighted_average_fails(mocker: MockerFixture): def test_time_weighted_average_tag_name_not_list_fails(mocker: MockerFixture): MOCKED_PARAMETER_DICT["tag_names"] = "abc" mocker.spy(MockedDBConnection, "cursor") + mocker.spy(MockedDBConnection, "close") mocker.spy(MockedCursor, "execute") mocker.patch.object(MockedCursor, "fetchall_arrow", side_effect=Exception) mocker.spy(MockedCursor, "close")