From ead4316938234b648fda1355dbe782f50f25f2e8 Mon Sep 17 00:00:00 2001 From: cching95 <73163191+cching95@users.noreply.github.com> Date: Wed, 2 Aug 2023 14:08:41 +0100 Subject: [PATCH] Add metadata Step to the Time Weighted Average Function, Unit test and Documentation updates. (#408) * updates Signed-off-by: cching95 <73163191+cching95@users.noreply.github.com> * add metadata step to time weighted average query Signed-off-by: cching95 <73163191+cching95@users.noreply.github.com> * Update Unit Tests for new metadata Step Signed-off-by: cching95 <73163191+cching95@users.noreply.github.com> --------- Signed-off-by: cching95 <73163191+cching95@users.noreply.github.com> --- src/api/v1/models.py | 2 +- .../queries/time_series/_query_builder.py | 11 ++++++++++ .../time_series/time_weighted_average.py | 2 +- .../queries/test_time_weighted_average.py | 22 ++++++++++++++++++- 4 files changed, 34 insertions(+), 3 deletions(-) diff --git a/src/api/v1/models.py b/src/api/v1/models.py index af5cff90b..ac68e2e73 100644 --- a/src/api/v1/models.py +++ b/src/api/v1/models.py @@ -175,7 +175,7 @@ def __init__( time_interval_rate: str = Query(..., description="Time Interval Rate as a numeric input", examples=[5]), time_interval_unit: str = Query(..., description="Time Interval Unit can be one of the options: [second, minute, day, hour]", examples=["second", "minute", "hour", "day"]), window_length: int = Query(..., description="Window Length in days", examples=[1]), - step: str = Query(..., description="Step can be true or false", examples=["true", "false"]) + step: str = Query(..., description="Step can be \"true\", \"false\" or \"metadata\". \"metadata\" will retrieve the step value from the metadata table.", examples=["true", "false", "metadata"]) ): self.window_size_mins = window_size_mins self.time_interval_rate = time_interval_rate diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/_query_builder.py b/src/sdk/python/rtdip_sdk/queries/time_series/_query_builder.py index 085e94bed..f505b827f 100644 --- a/src/sdk/python/rtdip_sdk/queries/time_series/_query_builder.py +++ b/src/sdk/python/rtdip_sdk/queries/time_series/_query_builder.py @@ -226,16 +226,27 @@ def _time_weighted_average_query(parameters_dict: dict) -> str: time_weighted_average_query = ( "WITH raw_events AS (SELECT DISTINCT EventDate, TagName, from_utc_timestamp(to_timestamp(date_format(EventTime, 'yyyy-MM-dd HH:mm:ss.SSS')), \"{{ time_zone }}\") as EventTime, Status, Value FROM `{{ business_unit }}`.`sensors`.`{{ asset }}_{{ data_security_level }}_events_{{ data_type }}` WHERE EventDate BETWEEN date_sub(to_date(to_timestamp(\"{{ start_date }}\")), {{ window_length }}) AND date_add(to_date(to_timestamp(\"{{ end_date }}\")), {{ window_length }}) AND TagName in ('{{ tag_names | join('\\', \\'') }}') " "{% if include_bad_data is defined and include_bad_data == false %} AND Status = 'Good' {% endif %}) " + "{% if step is defined and step == \"metadata\" %} " + ",meta_data AS (SELECT TagName, IFNULL(Step, false) AS Step FROM `downstream`.`sensors`.`pernis_restricted_metadata` )" + "{% endif %}" ",date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp(\"{{ start_date }}\"), \"{{ time_zone }}\"), from_utc_timestamp(to_timestamp(\"{{ end_date }}\"), \"{{ time_zone }}\"), INTERVAL '{{ time_interval_rate + ' ' + time_interval_unit }}')) AS EventTime, explode(array('{{ tag_names | join('\\', \\'') }}')) AS TagName) " ",window_events AS (SELECT coalesce(a.TagName, b.TagName) AS TagName, coalesce(a.EventTime, b.EventTime) as EventTime, window(coalesce(a.EventTime, b.EventTime), '{{ time_interval_rate + ' ' + time_interval_unit }}').start WindowEventTime, b.Status, b.Value FROM date_array a " "FULL OUTER JOIN raw_events b ON CAST(a.EventTime AS long) = CAST(b.EventTime AS long) AND a.TagName = b.TagName) " ",fill_status AS (SELECT *, last_value(Status, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as Fill_Status, CASE WHEN Fill_Status = \"Good\" THEN Value ELSE null END AS Good_Value FROM window_events) " ",fill_value AS (SELECT *, last_value(Good_Value, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Fill_Value FROM fill_status) " + "{% if step is defined and step == \"metadata\" %} " + ",twa_calculations AS (SELECT f.TagName, f.EventTime, f.WindowEventTime, m.Step, f.Status, f.Value, f.Fill_Status, f.Fill_Value, lead(f.EventTime) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) AS Next_EventTime, lead(f.Fill_Status) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) AS Next_Status " + ",CASE WHEN Next_Status = \"Good\" OR (f.Fill_Status = \"Good\" AND Next_Status = \"Bad\") THEN lead(f.Fill_Value) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) ELSE f.Value END AS Next_Value_For_Status " + ",CASE WHEN f.Fill_Status = \"Good\" THEN Next_Value_For_Status ELSE 0 END AS Next_Value " + ",CASE WHEN f.Fill_Status = \"Good\" and Next_Status = \"Good\" THEN ((cast(Next_EventTime as double) - cast(f.EventTime as double)) / 60) WHEN f.Fill_Status = \"Good\" and Next_Status != \"Good\" THEN ((cast(Next_EventTime as integer) - cast(f.EventTime as double)) / 60) ELSE 0 END AS good_minutes " + ",CASE WHEN m.Step == false THEN ((f.Fill_Value + Next_Value) * 0.5) * good_minutes ELSE (f.Fill_Value * good_minutes) END AS twa_value FROM fill_value f LEFT JOIN meta_data m ON f.TagName = m.TagName) " + "{% else %} " ",twa_calculations AS (SELECT TagName, EventTime, WindowEventTime, {{ step }} AS Step, Status, Value, Fill_Status, Fill_Value, lead(EventTime) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_EventTime, lead(Fill_Status) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_Status " ",CASE WHEN Next_Status = \"Good\" OR (Fill_Status = \"Good\" AND Next_Status = \"Bad\") THEN lead(Fill_Value) OVER (PARTITION BY TagName ORDER BY EventTime) ELSE Value END AS Next_Value_For_Status " ",CASE WHEN Fill_Status = \"Good\" THEN Next_Value_For_Status ELSE 0 END AS Next_Value " ",CASE WHEN Fill_Status = \"Good\" and Next_Status = \"Good\" THEN ((cast(Next_EventTime as double) - cast(EventTime as double)) / 60) WHEN Fill_Status = \"Good\" and Next_Status != \"Good\" THEN ((cast(Next_EventTime as integer) - cast(EventTime as double)) / 60) ELSE 0 END AS good_minutes " ",CASE WHEN Step == false THEN ((Fill_Value + Next_Value) * 0.5) * good_minutes ELSE (Fill_Value * good_minutes) END AS twa_value FROM fill_value) " + "{% endif %} " ",project_result AS (SELECT TagName, WindowEventTime AS EventTime, sum(twa_value) / sum(good_minutes) AS Value from twa_calculations GROUP BY TagName, WindowEventTime) " "SELECT * FROM project_result WHERE EventTime BETWEEN to_timestamp(\"{{ start_datetime }}\") AND to_timestamp(\"{{ end_datetime }}\") ORDER BY TagName, EventTime " ) diff --git a/src/sdk/python/rtdip_sdk/queries/time_series/time_weighted_average.py b/src/sdk/python/rtdip_sdk/queries/time_series/time_weighted_average.py index 9a17b0d3e..169fcdf89 100644 --- a/src/sdk/python/rtdip_sdk/queries/time_series/time_weighted_average.py +++ b/src/sdk/python/rtdip_sdk/queries/time_series/time_weighted_average.py @@ -40,7 +40,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame: time_interval_unit (str): The time interval unit (second, minute, day, hour) window_length (int): Add longer window time in days for the start or end of specified date to cater for edge cases. include_bad_data (bool): Include "Bad" data points with True or remove "Bad" data points with False - step (str): data points with step "enabled" or "disabled". The options for step are "metadata", "true" or "false". "metadata" will get the step requirements from the metadata table if applicable. + step (str): data points with step "enabled" or "disabled". The options for step are "true", "false" or "metadata". "metadata" will retrieve the step value from the metadata table. Returns: DataFrame: A dataframe containing the time weighted averages. ''' diff --git a/tests/sdk/python/rtdip_sdk/queries/test_time_weighted_average.py b/tests/sdk/python/rtdip_sdk/queries/test_time_weighted_average.py index 9f3881c7a..fdee603fa 100644 --- a/tests/sdk/python/rtdip_sdk/queries/test_time_weighted_average.py +++ b/tests/sdk/python/rtdip_sdk/queries/test_time_weighted_average.py @@ -27,7 +27,8 @@ ACCESS_TOKEN = "mock_databricks_token" DATABRICKS_SQL_CONNECT = 'databricks.sql.connect' DATABRICKS_SQL_CONNECT_CURSOR = 'databricks.sql.connect.cursor' -MOCKED_QUERY= 'WITH raw_events AS (SELECT DISTINCT EventDate, TagName, from_utc_timestamp(to_timestamp(date_format(EventTime, \'yyyy-MM-dd HH:mm:ss.SSS\')), "+0000") as EventTime, Status, Value FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE EventDate BETWEEN date_sub(to_date(to_timestamp("2011-01-01T00:00:00+00:00")), 1) AND date_add(to_date(to_timestamp("2011-01-02T23:59:59+00:00")), 1) AND TagName in (\'MOCKED-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS EventTime, explode(array(\'MOCKED-TAGNAME\')) AS TagName) ,window_events AS (SELECT coalesce(a.TagName, b.TagName) AS TagName, coalesce(a.EventTime, b.EventTime) as EventTime, window(coalesce(a.EventTime, b.EventTime), \'15 minute\').start WindowEventTime, b.Status, b.Value FROM date_array a FULL OUTER JOIN raw_events b ON CAST(a.EventTime AS long) = CAST(b.EventTime AS long) AND a.TagName = b.TagName) ,fill_status AS (SELECT *, last_value(Status, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as Fill_Status, CASE WHEN Fill_Status = "Good" THEN Value ELSE null END AS Good_Value FROM window_events) ,fill_value AS (SELECT *, last_value(Good_Value, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Fill_Value FROM fill_status) ,twa_calculations AS (SELECT TagName, EventTime, WindowEventTime, false AS Step, Status, Value, Fill_Status, Fill_Value, lead(EventTime) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_EventTime, lead(Fill_Status) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_Status ,CASE WHEN Next_Status = "Good" OR (Fill_Status = "Good" AND Next_Status = "Bad") THEN lead(Fill_Value) OVER (PARTITION BY TagName ORDER BY EventTime) ELSE Value END AS Next_Value_For_Status ,CASE WHEN Fill_Status = "Good" THEN Next_Value_For_Status ELSE 0 END AS Next_Value ,CASE WHEN Fill_Status = "Good" and Next_Status = "Good" THEN ((cast(Next_EventTime as double) - cast(EventTime as double)) / 60) WHEN Fill_Status = "Good" and Next_Status != "Good" THEN ((cast(Next_EventTime as integer) - cast(EventTime as double)) / 60) ELSE 0 END AS good_minutes ,CASE WHEN Step == false THEN ((Fill_Value + Next_Value) * 0.5) * good_minutes ELSE (Fill_Value * good_minutes) END AS twa_value FROM fill_value) ,project_result AS (SELECT TagName, WindowEventTime AS EventTime, sum(twa_value) / sum(good_minutes) AS Value from twa_calculations GROUP BY TagName, WindowEventTime) SELECT * FROM project_result WHERE EventTime BETWEEN to_timestamp("2011-01-01T00:00:00") AND to_timestamp("2011-01-02T23:59:59") ORDER BY TagName, EventTime ' +MOCKED_QUERY= 'WITH raw_events AS (SELECT DISTINCT EventDate, TagName, from_utc_timestamp(to_timestamp(date_format(EventTime, \'yyyy-MM-dd HH:mm:ss.SSS\')), "+0000") as EventTime, Status, Value FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE EventDate BETWEEN date_sub(to_date(to_timestamp("2011-01-01T00:00:00+00:00")), 1) AND date_add(to_date(to_timestamp("2011-01-02T23:59:59+00:00")), 1) AND TagName in (\'MOCKED-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS EventTime, explode(array(\'MOCKED-TAGNAME\')) AS TagName) ,window_events AS (SELECT coalesce(a.TagName, b.TagName) AS TagName, coalesce(a.EventTime, b.EventTime) as EventTime, window(coalesce(a.EventTime, b.EventTime), \'15 minute\').start WindowEventTime, b.Status, b.Value FROM date_array a FULL OUTER JOIN raw_events b ON CAST(a.EventTime AS long) = CAST(b.EventTime AS long) AND a.TagName = b.TagName) ,fill_status AS (SELECT *, last_value(Status, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as Fill_Status, CASE WHEN Fill_Status = "Good" THEN Value ELSE null END AS Good_Value FROM window_events) ,fill_value AS (SELECT *, last_value(Good_Value, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Fill_Value FROM fill_status) ,twa_calculations AS (SELECT TagName, EventTime, WindowEventTime, false AS Step, Status, Value, Fill_Status, Fill_Value, lead(EventTime) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_EventTime, lead(Fill_Status) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_Status ,CASE WHEN Next_Status = "Good" OR (Fill_Status = "Good" AND Next_Status = "Bad") THEN lead(Fill_Value) OVER (PARTITION BY TagName ORDER BY EventTime) ELSE Value END AS Next_Value_For_Status ,CASE WHEN Fill_Status = "Good" THEN Next_Value_For_Status ELSE 0 END AS Next_Value ,CASE WHEN Fill_Status = "Good" and Next_Status = "Good" THEN ((cast(Next_EventTime as double) - cast(EventTime as double)) / 60) WHEN Fill_Status = "Good" and Next_Status != "Good" THEN ((cast(Next_EventTime as integer) - cast(EventTime as double)) / 60) ELSE 0 END AS good_minutes ,CASE WHEN Step == false THEN ((Fill_Value + Next_Value) * 0.5) * good_minutes ELSE (Fill_Value * good_minutes) END AS twa_value FROM fill_value) ,project_result AS (SELECT TagName, WindowEventTime AS EventTime, sum(twa_value) / sum(good_minutes) AS Value from twa_calculations GROUP BY TagName, WindowEventTime) SELECT * FROM project_result WHERE EventTime BETWEEN to_timestamp("2011-01-01T00:00:00") AND to_timestamp("2011-01-02T23:59:59") ORDER BY TagName, EventTime ' +METADATA_MOCKED_QUERY = 'WITH raw_events AS (SELECT DISTINCT EventDate, TagName, from_utc_timestamp(to_timestamp(date_format(EventTime, \'yyyy-MM-dd HH:mm:ss.SSS\')), "+0000") as EventTime, Status, Value FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE EventDate BETWEEN date_sub(to_date(to_timestamp("2011-01-01T00:00:00+00:00")), 1) AND date_add(to_date(to_timestamp("2011-01-02T23:59:59+00:00")), 1) AND TagName in (\'MOCKED-TAGNAME\') ) ,meta_data AS (SELECT TagName, IFNULL(Step, false) AS Step FROM `downstream`.`sensors`.`pernis_restricted_metadata` ),date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS EventTime, explode(array(\'MOCKED-TAGNAME\')) AS TagName) ,window_events AS (SELECT coalesce(a.TagName, b.TagName) AS TagName, coalesce(a.EventTime, b.EventTime) as EventTime, window(coalesce(a.EventTime, b.EventTime), \'15 minute\').start WindowEventTime, b.Status, b.Value FROM date_array a FULL OUTER JOIN raw_events b ON CAST(a.EventTime AS long) = CAST(b.EventTime AS long) AND a.TagName = b.TagName) ,fill_status AS (SELECT *, last_value(Status, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as Fill_Status, CASE WHEN Fill_Status = "Good" THEN Value ELSE null END AS Good_Value FROM window_events) ,fill_value AS (SELECT *, last_value(Good_Value, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Fill_Value FROM fill_status) ,twa_calculations AS (SELECT f.TagName, f.EventTime, f.WindowEventTime, m.Step, f.Status, f.Value, f.Fill_Status, f.Fill_Value, lead(f.EventTime) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) AS Next_EventTime, lead(f.Fill_Status) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) AS Next_Status ,CASE WHEN Next_Status = "Good" OR (f.Fill_Status = "Good" AND Next_Status = "Bad") THEN lead(f.Fill_Value) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) ELSE f.Value END AS Next_Value_For_Status ,CASE WHEN f.Fill_Status = "Good" THEN Next_Value_For_Status ELSE 0 END AS Next_Value ,CASE WHEN f.Fill_Status = "Good" and Next_Status = "Good" THEN ((cast(Next_EventTime as double) - cast(f.EventTime as double)) / 60) WHEN f.Fill_Status = "Good" and Next_Status != "Good" THEN ((cast(Next_EventTime as integer) - cast(f.EventTime as double)) / 60) ELSE 0 END AS good_minutes ,CASE WHEN m.Step == false THEN ((f.Fill_Value + Next_Value) * 0.5) * good_minutes ELSE (f.Fill_Value * good_minutes) END AS twa_value FROM fill_value f LEFT JOIN meta_data m ON f.TagName = m.TagName) ,project_result AS (SELECT TagName, WindowEventTime AS EventTime, sum(twa_value) / sum(good_minutes) AS Value from twa_calculations GROUP BY TagName, WindowEventTime) SELECT * FROM project_result WHERE EventTime BETWEEN to_timestamp("2011-01-01T00:00:00") AND to_timestamp("2011-01-02T23:59:59") ORDER BY TagName, EventTime ' MOCKED_PARAMETER_DICT = { "business_unit": "mocked-buiness-unit", "region": "mocked-region", @@ -79,6 +80,25 @@ def test_time_weighted_average_with_window_size_mins(mocker: MockerFixture): mocked_close.assert_called_once() assert isinstance(actual, pd.DataFrame) + +def test_time_weighted_average_metadata_step(mocker: MockerFixture): + MOCKED_PARAMETER_DICT["step"] = "metadata" + mocked_cursor = mocker.spy(MockedDBConnection, "cursor") + mocked_execute = mocker.spy(MockedCursor, "execute") + mocked_fetch_all = mocker.patch.object(MockedCursor, "fetchall_arrow", return_value = pa.Table.from_pandas(pd.DataFrame(data={'a': [1], 'b': [2], 'c': [3], 'd': [4]}))) + mocked_close = mocker.spy(MockedCursor, "close") + mocker.patch(DATABRICKS_SQL_CONNECT, return_value = MockedDBConnection()) + + mocked_connection = DatabricksSQLConnection(SERVER_HOSTNAME, HTTP_PATH, ACCESS_TOKEN) + + actual = time_weighted_average_get(mocked_connection, MOCKED_PARAMETER_DICT) + + mocked_cursor.assert_called_once() + mocked_execute.assert_called_once_with(mocker.ANY, query=METADATA_MOCKED_QUERY) + mocked_fetch_all.assert_called_once() + mocked_close.assert_called_once() + assert isinstance(actual, pd.DataFrame) + def test_time_weighted_average_fails(mocker: MockerFixture): mocker.spy(MockedDBConnection, "cursor") mocker.spy(MockedCursor, "execute")