Skip to content

Commit

Permalink
Add metadata Step to the Time Weighted Average Function, Unit test an…
Browse files Browse the repository at this point in the history
…d Documentation updates. (#408)

* updates

Signed-off-by: cching95 <[email protected]>

* add metadata step to time weighted average query

Signed-off-by: cching95 <[email protected]>

* Update Unit Tests for new metadata Step

Signed-off-by: cching95 <[email protected]>

---------

Signed-off-by: cching95 <[email protected]>
  • Loading branch information
cching95 authored Aug 2, 2023
1 parent 7c59c62 commit ead4316
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/api/v1/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def __init__(
time_interval_rate: str = Query(..., description="Time Interval Rate as a numeric input", examples=[5]),
time_interval_unit: str = Query(..., description="Time Interval Unit can be one of the options: [second, minute, day, hour]", examples=["second", "minute", "hour", "day"]),
window_length: int = Query(..., description="Window Length in days", examples=[1]),
step: str = Query(..., description="Step can be true or false", examples=["true", "false"])
step: str = Query(..., description="Step can be \"true\", \"false\" or \"metadata\". \"metadata\" will retrieve the step value from the metadata table.", examples=["true", "false", "metadata"])
):
self.window_size_mins = window_size_mins
self.time_interval_rate = time_interval_rate
Expand Down
11 changes: 11 additions & 0 deletions src/sdk/python/rtdip_sdk/queries/time_series/_query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,16 +226,27 @@ def _time_weighted_average_query(parameters_dict: dict) -> str:
time_weighted_average_query = (
"WITH raw_events AS (SELECT DISTINCT EventDate, TagName, from_utc_timestamp(to_timestamp(date_format(EventTime, 'yyyy-MM-dd HH:mm:ss.SSS')), \"{{ time_zone }}\") as EventTime, Status, Value FROM `{{ business_unit }}`.`sensors`.`{{ asset }}_{{ data_security_level }}_events_{{ data_type }}` WHERE EventDate BETWEEN date_sub(to_date(to_timestamp(\"{{ start_date }}\")), {{ window_length }}) AND date_add(to_date(to_timestamp(\"{{ end_date }}\")), {{ window_length }}) AND TagName in ('{{ tag_names | join('\\', \\'') }}') "
"{% if include_bad_data is defined and include_bad_data == false %} AND Status = 'Good' {% endif %}) "
"{% if step is defined and step == \"metadata\" %} "
",meta_data AS (SELECT TagName, IFNULL(Step, false) AS Step FROM `downstream`.`sensors`.`pernis_restricted_metadata` )"
"{% endif %}"
",date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp(\"{{ start_date }}\"), \"{{ time_zone }}\"), from_utc_timestamp(to_timestamp(\"{{ end_date }}\"), \"{{ time_zone }}\"), INTERVAL '{{ time_interval_rate + ' ' + time_interval_unit }}')) AS EventTime, explode(array('{{ tag_names | join('\\', \\'') }}')) AS TagName) "
",window_events AS (SELECT coalesce(a.TagName, b.TagName) AS TagName, coalesce(a.EventTime, b.EventTime) as EventTime, window(coalesce(a.EventTime, b.EventTime), '{{ time_interval_rate + ' ' + time_interval_unit }}').start WindowEventTime, b.Status, b.Value FROM date_array a "
"FULL OUTER JOIN raw_events b ON CAST(a.EventTime AS long) = CAST(b.EventTime AS long) AND a.TagName = b.TagName) "
",fill_status AS (SELECT *, last_value(Status, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as Fill_Status, CASE WHEN Fill_Status = \"Good\" THEN Value ELSE null END AS Good_Value FROM window_events) "
",fill_value AS (SELECT *, last_value(Good_Value, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Fill_Value FROM fill_status) "
"{% if step is defined and step == \"metadata\" %} "
",twa_calculations AS (SELECT f.TagName, f.EventTime, f.WindowEventTime, m.Step, f.Status, f.Value, f.Fill_Status, f.Fill_Value, lead(f.EventTime) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) AS Next_EventTime, lead(f.Fill_Status) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) AS Next_Status "
",CASE WHEN Next_Status = \"Good\" OR (f.Fill_Status = \"Good\" AND Next_Status = \"Bad\") THEN lead(f.Fill_Value) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) ELSE f.Value END AS Next_Value_For_Status "
",CASE WHEN f.Fill_Status = \"Good\" THEN Next_Value_For_Status ELSE 0 END AS Next_Value "
",CASE WHEN f.Fill_Status = \"Good\" and Next_Status = \"Good\" THEN ((cast(Next_EventTime as double) - cast(f.EventTime as double)) / 60) WHEN f.Fill_Status = \"Good\" and Next_Status != \"Good\" THEN ((cast(Next_EventTime as integer) - cast(f.EventTime as double)) / 60) ELSE 0 END AS good_minutes "
",CASE WHEN m.Step == false THEN ((f.Fill_Value + Next_Value) * 0.5) * good_minutes ELSE (f.Fill_Value * good_minutes) END AS twa_value FROM fill_value f LEFT JOIN meta_data m ON f.TagName = m.TagName) "
"{% else %} "
",twa_calculations AS (SELECT TagName, EventTime, WindowEventTime, {{ step }} AS Step, Status, Value, Fill_Status, Fill_Value, lead(EventTime) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_EventTime, lead(Fill_Status) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_Status "
",CASE WHEN Next_Status = \"Good\" OR (Fill_Status = \"Good\" AND Next_Status = \"Bad\") THEN lead(Fill_Value) OVER (PARTITION BY TagName ORDER BY EventTime) ELSE Value END AS Next_Value_For_Status "
",CASE WHEN Fill_Status = \"Good\" THEN Next_Value_For_Status ELSE 0 END AS Next_Value "
",CASE WHEN Fill_Status = \"Good\" and Next_Status = \"Good\" THEN ((cast(Next_EventTime as double) - cast(EventTime as double)) / 60) WHEN Fill_Status = \"Good\" and Next_Status != \"Good\" THEN ((cast(Next_EventTime as integer) - cast(EventTime as double)) / 60) ELSE 0 END AS good_minutes "
",CASE WHEN Step == false THEN ((Fill_Value + Next_Value) * 0.5) * good_minutes ELSE (Fill_Value * good_minutes) END AS twa_value FROM fill_value) "
"{% endif %} "
",project_result AS (SELECT TagName, WindowEventTime AS EventTime, sum(twa_value) / sum(good_minutes) AS Value from twa_calculations GROUP BY TagName, WindowEventTime) "
"SELECT * FROM project_result WHERE EventTime BETWEEN to_timestamp(\"{{ start_datetime }}\") AND to_timestamp(\"{{ end_datetime }}\") ORDER BY TagName, EventTime "
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
time_interval_unit (str): The time interval unit (second, minute, day, hour)
window_length (int): Add longer window time in days for the start or end of specified date to cater for edge cases.
include_bad_data (bool): Include "Bad" data points with True or remove "Bad" data points with False
step (str): data points with step "enabled" or "disabled". The options for step are "metadata", "true" or "false". "metadata" will get the step requirements from the metadata table if applicable.
step (str): data points with step "enabled" or "disabled". The options for step are "true", "false" or "metadata". "metadata" will retrieve the step value from the metadata table.
Returns:
DataFrame: A dataframe containing the time weighted averages.
'''
Expand Down
22 changes: 21 additions & 1 deletion tests/sdk/python/rtdip_sdk/queries/test_time_weighted_average.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
ACCESS_TOKEN = "mock_databricks_token"
DATABRICKS_SQL_CONNECT = 'databricks.sql.connect'
DATABRICKS_SQL_CONNECT_CURSOR = 'databricks.sql.connect.cursor'
MOCKED_QUERY= 'WITH raw_events AS (SELECT DISTINCT EventDate, TagName, from_utc_timestamp(to_timestamp(date_format(EventTime, \'yyyy-MM-dd HH:mm:ss.SSS\')), "+0000") as EventTime, Status, Value FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE EventDate BETWEEN date_sub(to_date(to_timestamp("2011-01-01T00:00:00+00:00")), 1) AND date_add(to_date(to_timestamp("2011-01-02T23:59:59+00:00")), 1) AND TagName in (\'MOCKED-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS EventTime, explode(array(\'MOCKED-TAGNAME\')) AS TagName) ,window_events AS (SELECT coalesce(a.TagName, b.TagName) AS TagName, coalesce(a.EventTime, b.EventTime) as EventTime, window(coalesce(a.EventTime, b.EventTime), \'15 minute\').start WindowEventTime, b.Status, b.Value FROM date_array a FULL OUTER JOIN raw_events b ON CAST(a.EventTime AS long) = CAST(b.EventTime AS long) AND a.TagName = b.TagName) ,fill_status AS (SELECT *, last_value(Status, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as Fill_Status, CASE WHEN Fill_Status = "Good" THEN Value ELSE null END AS Good_Value FROM window_events) ,fill_value AS (SELECT *, last_value(Good_Value, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Fill_Value FROM fill_status) ,twa_calculations AS (SELECT TagName, EventTime, WindowEventTime, false AS Step, Status, Value, Fill_Status, Fill_Value, lead(EventTime) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_EventTime, lead(Fill_Status) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_Status ,CASE WHEN Next_Status = "Good" OR (Fill_Status = "Good" AND Next_Status = "Bad") THEN lead(Fill_Value) OVER (PARTITION BY TagName ORDER BY EventTime) ELSE Value END AS 
Next_Value_For_Status ,CASE WHEN Fill_Status = "Good" THEN Next_Value_For_Status ELSE 0 END AS Next_Value ,CASE WHEN Fill_Status = "Good" and Next_Status = "Good" THEN ((cast(Next_EventTime as double) - cast(EventTime as double)) / 60) WHEN Fill_Status = "Good" and Next_Status != "Good" THEN ((cast(Next_EventTime as integer) - cast(EventTime as double)) / 60) ELSE 0 END AS good_minutes ,CASE WHEN Step == false THEN ((Fill_Value + Next_Value) * 0.5) * good_minutes ELSE (Fill_Value * good_minutes) END AS twa_value FROM fill_value) ,project_result AS (SELECT TagName, WindowEventTime AS EventTime, sum(twa_value) / sum(good_minutes) AS Value from twa_calculations GROUP BY TagName, WindowEventTime) SELECT * FROM project_result WHERE EventTime BETWEEN to_timestamp("2011-01-01T00:00:00") AND to_timestamp("2011-01-02T23:59:59") ORDER BY TagName, EventTime '
MOCKED_QUERY= 'WITH raw_events AS (SELECT DISTINCT EventDate, TagName, from_utc_timestamp(to_timestamp(date_format(EventTime, \'yyyy-MM-dd HH:mm:ss.SSS\')), "+0000") as EventTime, Status, Value FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE EventDate BETWEEN date_sub(to_date(to_timestamp("2011-01-01T00:00:00+00:00")), 1) AND date_add(to_date(to_timestamp("2011-01-02T23:59:59+00:00")), 1) AND TagName in (\'MOCKED-TAGNAME\') ) ,date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS EventTime, explode(array(\'MOCKED-TAGNAME\')) AS TagName) ,window_events AS (SELECT coalesce(a.TagName, b.TagName) AS TagName, coalesce(a.EventTime, b.EventTime) as EventTime, window(coalesce(a.EventTime, b.EventTime), \'15 minute\').start WindowEventTime, b.Status, b.Value FROM date_array a FULL OUTER JOIN raw_events b ON CAST(a.EventTime AS long) = CAST(b.EventTime AS long) AND a.TagName = b.TagName) ,fill_status AS (SELECT *, last_value(Status, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as Fill_Status, CASE WHEN Fill_Status = "Good" THEN Value ELSE null END AS Good_Value FROM window_events) ,fill_value AS (SELECT *, last_value(Good_Value, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Fill_Value FROM fill_status) ,twa_calculations AS (SELECT TagName, EventTime, WindowEventTime, false AS Step, Status, Value, Fill_Status, Fill_Value, lead(EventTime) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_EventTime, lead(Fill_Status) OVER (PARTITION BY TagName ORDER BY EventTime) AS Next_Status ,CASE WHEN Next_Status = "Good" OR (Fill_Status = "Good" AND Next_Status = "Bad") THEN lead(Fill_Value) OVER (PARTITION BY TagName ORDER BY EventTime) ELSE Value END AS 
Next_Value_For_Status ,CASE WHEN Fill_Status = "Good" THEN Next_Value_For_Status ELSE 0 END AS Next_Value ,CASE WHEN Fill_Status = "Good" and Next_Status = "Good" THEN ((cast(Next_EventTime as double) - cast(EventTime as double)) / 60) WHEN Fill_Status = "Good" and Next_Status != "Good" THEN ((cast(Next_EventTime as integer) - cast(EventTime as double)) / 60) ELSE 0 END AS good_minutes ,CASE WHEN Step == false THEN ((Fill_Value + Next_Value) * 0.5) * good_minutes ELSE (Fill_Value * good_minutes) END AS twa_value FROM fill_value) ,project_result AS (SELECT TagName, WindowEventTime AS EventTime, sum(twa_value) / sum(good_minutes) AS Value from twa_calculations GROUP BY TagName, WindowEventTime) SELECT * FROM project_result WHERE EventTime BETWEEN to_timestamp("2011-01-01T00:00:00") AND to_timestamp("2011-01-02T23:59:59") ORDER BY TagName, EventTime '
METADATA_MOCKED_QUERY = 'WITH raw_events AS (SELECT DISTINCT EventDate, TagName, from_utc_timestamp(to_timestamp(date_format(EventTime, \'yyyy-MM-dd HH:mm:ss.SSS\')), "+0000") as EventTime, Status, Value FROM `mocked-buiness-unit`.`sensors`.`mocked-asset_mocked-data-security-level_events_mocked-data-type` WHERE EventDate BETWEEN date_sub(to_date(to_timestamp("2011-01-01T00:00:00+00:00")), 1) AND date_add(to_date(to_timestamp("2011-01-02T23:59:59+00:00")), 1) AND TagName in (\'MOCKED-TAGNAME\') ) ,meta_data AS (SELECT TagName, IFNULL(Step, false) AS Step FROM `downstream`.`sensors`.`pernis_restricted_metadata` ),date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp("2011-01-01T00:00:00+00:00"), "+0000"), from_utc_timestamp(to_timestamp("2011-01-02T23:59:59+00:00"), "+0000"), INTERVAL \'15 minute\')) AS EventTime, explode(array(\'MOCKED-TAGNAME\')) AS TagName) ,window_events AS (SELECT coalesce(a.TagName, b.TagName) AS TagName, coalesce(a.EventTime, b.EventTime) as EventTime, window(coalesce(a.EventTime, b.EventTime), \'15 minute\').start WindowEventTime, b.Status, b.Value FROM date_array a FULL OUTER JOIN raw_events b ON CAST(a.EventTime AS long) = CAST(b.EventTime AS long) AND a.TagName = b.TagName) ,fill_status AS (SELECT *, last_value(Status, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as Fill_Status, CASE WHEN Fill_Status = "Good" THEN Value ELSE null END AS Good_Value FROM window_events) ,fill_value AS (SELECT *, last_value(Good_Value, true) OVER (PARTITION BY TagName ORDER BY EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Fill_Value FROM fill_status) ,twa_calculations AS (SELECT f.TagName, f.EventTime, f.WindowEventTime, m.Step, f.Status, f.Value, f.Fill_Status, f.Fill_Value, lead(f.EventTime) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) AS Next_EventTime, lead(f.Fill_Status) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) AS Next_Status ,CASE WHEN Next_Status = 
"Good" OR (f.Fill_Status = "Good" AND Next_Status = "Bad") THEN lead(f.Fill_Value) OVER (PARTITION BY f.TagName ORDER BY f.EventTime) ELSE f.Value END AS Next_Value_For_Status ,CASE WHEN f.Fill_Status = "Good" THEN Next_Value_For_Status ELSE 0 END AS Next_Value ,CASE WHEN f.Fill_Status = "Good" and Next_Status = "Good" THEN ((cast(Next_EventTime as double) - cast(f.EventTime as double)) / 60) WHEN f.Fill_Status = "Good" and Next_Status != "Good" THEN ((cast(Next_EventTime as integer) - cast(f.EventTime as double)) / 60) ELSE 0 END AS good_minutes ,CASE WHEN m.Step == false THEN ((f.Fill_Value + Next_Value) * 0.5) * good_minutes ELSE (f.Fill_Value * good_minutes) END AS twa_value FROM fill_value f LEFT JOIN meta_data m ON f.TagName = m.TagName) ,project_result AS (SELECT TagName, WindowEventTime AS EventTime, sum(twa_value) / sum(good_minutes) AS Value from twa_calculations GROUP BY TagName, WindowEventTime) SELECT * FROM project_result WHERE EventTime BETWEEN to_timestamp("2011-01-01T00:00:00") AND to_timestamp("2011-01-02T23:59:59") ORDER BY TagName, EventTime '
MOCKED_PARAMETER_DICT = {
"business_unit": "mocked-buiness-unit",
"region": "mocked-region",
Expand Down Expand Up @@ -79,6 +80,25 @@ def test_time_weighted_average_with_window_size_mins(mocker: MockerFixture):
mocked_close.assert_called_once()
assert isinstance(actual, pd.DataFrame)


def test_time_weighted_average_metadata_step(mocker: MockerFixture):
    """Check the query executed when ``step`` is set to ``"metadata"``.

    The time weighted average ``get`` is run against a mocked Databricks SQL
    connection, and the test asserts that the cursor executes
    ``METADATA_MOCKED_QUERY`` exactly once, that results are fetched, that the
    cursor is closed, and that a pandas DataFrame is returned.
    """
    # Build a per-test copy with the "step" override instead of assigning into
    # the module-level MOCKED_PARAMETER_DICT: writing to the shared dict would
    # leave step="metadata" in place for every test that runs afterwards and
    # make their outcome depend on execution order.
    parameters = {**MOCKED_PARAMETER_DICT, "step": "metadata"}

    mocked_cursor = mocker.spy(MockedDBConnection, "cursor")
    mocked_execute = mocker.spy(MockedCursor, "execute")
    # fetchall_arrow is patched to hand back a small Arrow table so the code
    # under test has something to turn into a DataFrame.
    mocked_fetch_all = mocker.patch.object(
        MockedCursor,
        "fetchall_arrow",
        return_value=pa.Table.from_pandas(
            pd.DataFrame(data={'a': [1], 'b': [2], 'c': [3], 'd': [4]})
        ),
    )
    mocked_close = mocker.spy(MockedCursor, "close")
    mocker.patch(DATABRICKS_SQL_CONNECT, return_value=MockedDBConnection())

    mocked_connection = DatabricksSQLConnection(SERVER_HOSTNAME, HTTP_PATH, ACCESS_TOKEN)

    actual = time_weighted_average_get(mocked_connection, parameters)

    mocked_cursor.assert_called_once()
    mocked_execute.assert_called_once_with(mocker.ANY, query=METADATA_MOCKED_QUERY)
    mocked_fetch_all.assert_called_once()
    mocked_close.assert_called_once()
    assert isinstance(actual, pd.DataFrame)

def test_time_weighted_average_fails(mocker: MockerFixture):
mocker.spy(MockedDBConnection, "cursor")
mocker.spy(MockedCursor, "execute")
Expand Down

0 comments on commit ead4316

Please sign in to comment.