Commit
Merge pull request #433 from rtdip/develop
v0.6.6
GBBBAS authored Aug 9, 2023
2 parents e850d04 + 8521cd6 commit a141ca9
Showing 14 changed files with 115 additions and 67 deletions.
126 changes: 66 additions & 60 deletions src/sdk/python/rtdip_sdk/pipelines/destinations/spark/pcdm_to_delta.py
@@ -29,7 +29,6 @@ class ValueTypeConstants():
FLOAT_VALUE = "ValueType = 'float'"
STRING_VALUE = "ValueType = 'string'"


class SparkPCDMToDeltaDestination(DestinationInterface):
'''
The Process Control Data Model written to Delta
@@ -123,71 +122,78 @@ def _get_eventdate_string(self, df: DataFrame) -> str:
dates_list = list(dates_df.toPandas()["EventDate"])
return str(dates_list).replace('[','').replace(']','')

def _write_delta_batch(self, df: DataFrame, destination: str):
def _write_delta_merge(self, df: DataFrame, destination: str):
df = df.select("EventDate", "TagName", "EventTime", "Status", "Value", "ChangeType")
when_matched_update_list = [
DeltaMergeConditionValues(
condition="(source.ChangeType IN ('insert', 'update', 'upsert')) AND ((source.Status != target.Status) OR (source.Value != target.Value))",
values={
"EventDate": "source.EventDate",
"TagName": "source.TagName",
"EventTime": "source.EventTime",
"Status": "source.Status",
"Value": "source.Value"
}
)
]
when_matched_delete_list = [
DeltaMergeCondition(
condition="source.ChangeType = 'delete'"
)
]
when_not_matched_insert_list = [
DeltaMergeConditionValues(
condition="(source.ChangeType IN ('insert', 'update', 'upsert'))",
values={
"EventDate": "source.EventDate",
"TagName": "source.TagName",
"EventTime": "source.EventTime",
"Status": "source.Status",
"Value": "source.Value"
}
)
]

merge_condition = "source.EventDate = target.EventDate AND source.TagName = target.TagName AND source.EventTime = target.EventTime"

if self.merge == True:
df = df.select("EventDate", "TagName", "EventTime", "Status", "Value", "ChangeType")
when_matched_update_list = [
DeltaMergeConditionValues(
condition="(source.ChangeType IN ('insert', 'update', 'upsert')) AND ((source.Status != target.Status) OR (source.Value != target.Value))",
values={
"EventDate": "source.EventDate",
"TagName": "source.TagName",
"EventTime": "source.EventTime",
"Status": "source.Status",
"Value": "source.Value"
}
)
]
when_matched_delete_list = [
DeltaMergeCondition(
condition="source.ChangeType = 'delete'"
)
]
when_not_matched_insert_list = [
DeltaMergeConditionValues(
condition="(source.ChangeType IN ('insert', 'update', 'upsert'))",
values={
"EventDate": "source.EventDate",
"TagName": "source.TagName",
"EventTime": "source.EventTime",
"Status": "source.Status",
"Value": "source.Value"
}
)
]
perform_merge = True
if self.try_broadcast_join != True:
eventdate_string = self._get_eventdate_string(df)
if eventdate_string == None or eventdate_string == "":
perform_merge = False
else:
merge_condition = "target.EventDate in ({}) AND ".format(eventdate_string) + merge_condition

merge_condition = "source.EventDate = target.EventDate AND source.TagName = target.TagName AND source.EventTime = target.EventTime"

perform_merge = True
if self.try_broadcast_join != True:
eventdate_string = self._get_eventdate_string(df)
if eventdate_string == None or eventdate_string == "":
perform_merge = False
else:
merge_condition = "target.EventDate in ({}) AND ".format(eventdate_string) + merge_condition
if perform_merge == True:
SparkDeltaMergeDestination(
spark=self.spark,
data=df,
destination=destination,
options=self.options,
merge_condition=merge_condition,
when_matched_update_list=when_matched_update_list,
when_matched_delete_list=when_matched_delete_list,
when_not_matched_insert_list=when_not_matched_insert_list,
try_broadcast_join=self.try_broadcast_join,
trigger=self.trigger,
query_name=self.query_name
).write_batch()

if perform_merge == True:
delta = SparkDeltaMergeDestination(
spark=self.spark,
data=df,
destination=destination,
options=self.options,
merge_condition=merge_condition,
when_matched_update_list=when_matched_update_list,
when_matched_delete_list=when_matched_delete_list,
when_not_matched_insert_list=when_not_matched_insert_list,
try_broadcast_join=self.try_broadcast_join
)
def _write_delta_batch(self, df: DataFrame, destination: str):

if self.merge == True:
self._write_delta_merge(df.filter(col("ChangeType").isin('insert', 'update', 'upsert')), destination)
self._write_delta_merge(df.filter(col("ChangeType") == 'delete'), destination)
else:
df = df.select("TagName", "EventTime", "Status", "Value")
delta = SparkDeltaDestination(
SparkDeltaDestination(
data=df,
destination=destination,
options=self.options
)

delta.write_batch()
options=self.options,
mode=self.mode,
trigger=self.trigger,
query_name=self.query_name
).write_batch()

def _write_data_by_type(self, df: DataFrame):
if self.merge == True:
@@ -197,7 +203,7 @@ def _write_data_by_type(self, df: DataFrame):
df = df.withColumn("EventTime", (floor(col("EventTime").cast("double")*1000)/1000).cast("timestamp"))

if self.remove_duplicates == True:
df = df.drop_duplicates(["TagName", "EventTime"])
df = df.drop_duplicates(["TagName", "EventTime", "ChangeType"])

float_df = (
df
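This file's change splits the old `_write_delta_batch` into two methods: `_write_delta_merge`, which builds the Delta merge condition lists and writes through `SparkDeltaMergeDestination(...).write_batch()`, and a slimmer `_write_delta_batch` that, when `merge` is enabled, calls `_write_delta_merge` twice: once for `insert`/`update`/`upsert` rows and once for `delete` rows. A minimal sketch of that dispatch pattern follows; the `_apply_merge` helper and the sample rows are hypothetical stand-ins, not RTDIP code.

```python
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Hypothetical PCDM-style micro-batch: ChangeType decides how each row is merged.
df = spark.createDataFrame(
    [("TAG1", "2023-08-09 00:00:00", "Good", 1.0, "upsert"),
     ("TAG1", "2023-08-09 00:00:10", "Good", 2.0, "delete")],
    ["TagName", "EventTime", "Status", "Value", "ChangeType"],
)

def _apply_merge(batch_df: DataFrame, label: str) -> None:
    # Stand-in for _write_delta_merge: the real method builds the
    # DeltaMergeCondition lists and calls SparkDeltaMergeDestination(...).write_batch().
    print(label, batch_df.count())

# Same routing as the new _write_delta_batch: upserts and deletes are
# merged in separate passes, so each pass carries one kind of ChangeType.
_apply_merge(df.filter(col("ChangeType").isin("insert", "update", "upsert")), "upserts")
_apply_merge(df.filter(col("ChangeType") == "delete"), "deletes")
```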
1 change: 1 addition & 0 deletions src/sdk/python/rtdip_sdk/queries/metadata.py
@@ -48,6 +48,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
cursor.execute(query)
df = cursor.fetch_all()
cursor.close()
connection.close()
return df
except Exception as e:
logging.exception('error returning dataframe')
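The change to `metadata.py`, repeated in the time-series query modules below (interpolate, raw, resample and others), is a one-line fix: the database connection is now closed along with the cursor once the DataFrame has been fetched. A hedged sketch of the resulting pattern, with a generic `connection` object standing in for the RTDIP connector:

```python
import logging

def fetch_dataframe(connection, query: str):
    # Hypothetical helper mirroring the get() functions touched in this commit:
    # fetch all results first, then release the cursor and the connection itself.
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        df = cursor.fetch_all()  # RTDIP cursor API as used in the diff
        cursor.close()
        connection.close()  # the line added by this commit
        return df
    except Exception:
        logging.exception("error returning dataframe")
        raise
```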
15 changes: 10 additions & 5 deletions src/sdk/python/rtdip_sdk/queries/time_series/_query_builder.py
@@ -105,7 +105,10 @@ def _sample_query(parameters_dict: dict) -> tuple:
",date_array AS (SELECT explode(sequence(from_utc_timestamp(to_timestamp(\"{{ start_date }}\"), \"{{ time_zone }}\"), from_utc_timestamp(to_timestamp(\"{{ end_date }}\"), \"{{ time_zone }}\"), INTERVAL '{{ time_interval_rate + ' ' + time_interval_unit }}')) AS timestamp_array, explode(array('{{ tag_names | join('\\', \\'') }}')) AS TagName) "
",window_buckets AS (SELECT timestamp_array AS window_start ,TagName ,LEAD(timestamp_array) OVER (ORDER BY timestamp_array) AS window_end FROM date_array) "
",project_resample_results AS (SELECT d.window_start ,d.window_end ,d.TagName ,{{ agg_method }}(e.Value) OVER (PARTITION BY d.TagName, d.window_start ORDER BY e.EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS Value FROM window_buckets d INNER JOIN raw_events e ON e.EventTime >= d.window_start AND e.EventTime < d.window_end AND e.TagName = d.TagName) "
"SELECT window_start AS EventTime ,TagName ,Value FROM project_resample_results GROUP BY window_start ,TagName ,Value ORDER BY TagName, EventTime "
"SELECT window_start AS EventTime ,TagName ,Value FROM project_resample_results GROUP BY window_start ,TagName ,Value "
"{% if is_resample is defined and is_resample == true %}"
"ORDER BY TagName, EventTime "
"{% endif %}"
)

sample_parameters = {
@@ -121,7 +124,8 @@
"time_interval_rate": parameters_dict['time_interval_rate'],
"time_interval_unit": parameters_dict['time_interval_unit'],
"agg_method": parameters_dict['agg_method'],
"time_zone": parameters_dict["time_zone"]
"time_zone": parameters_dict["time_zone"],
"is_resample": True
}

sql_template = Template(sample_query)
@@ -148,10 +152,10 @@ def _interpolation_query(parameters_dict: dict, sample_query: str, sample_parame
",linear_interpolation_calculations AS (SELECT coalesce(a.TagName, b.TagName) as TagName, coalesce(a.EventTime, b.EventTime) as EventTime, a.EventTime as Requested_EventTime, b.EventTime as Found_EventTime, b.Value, "
"last_value(b.EventTime, true) OVER (PARTITION BY a.TagName ORDER BY a.EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Last_EventTime, last_value(b.Value, true) OVER (PARTITION BY a.TagName ORDER BY a.EventTime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS Last_Value, "
"first_value(b.EventTime, true) OVER (PARTITION BY a.TagName ORDER BY a.EventTime ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS Next_EventTime, first_value(b.Value, true) OVER (PARTITION BY a.TagName ORDER BY a.EventTime ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS Next_Value, "
"CASE WHEN b.Value is NULL THEN Last_Value + (unix_timestamp(a.EventTime) - unix_timestamp(Last_EventTime)) * ((Next_Value - Last_Value)) / ((unix_timestamp(Next_EventTime) - unix_timestamp(Last_EventTime))) ELSE b.Value END AS linear_interpolated_value FROM date_array a FULL OUTER JOIN resample b ON a.EventTime = b.EventTime AND a.TagName = b.TagName ORDER BY a.EventTime, b.TagName) "
"SELECT EventTime, TagName, linear_interpolated_value AS Value FROM linear_interpolation_calculations "
"CASE WHEN b.Value is NULL THEN Last_Value + (unix_timestamp(a.EventTime) - unix_timestamp(Last_EventTime)) * ((Next_Value - Last_Value)) / ((unix_timestamp(Next_EventTime) - unix_timestamp(Last_EventTime))) ELSE b.Value END AS linear_interpolated_value FROM date_array a FULL OUTER JOIN resample b ON a.EventTime = b.EventTime AND a.TagName = b.TagName) "
"SELECT EventTime, TagName, linear_interpolated_value AS Value FROM linear_interpolation_calculations ORDER BY TagName, EventTime "
"{% else %}"
"SELECT * FROM resample "
"SELECT * FROM resample ORDER BY TagName, EventTime "
"{% endif %}"
)

@@ -307,6 +311,7 @@ def _query_builder(parameters_dict: dict, query_type: str) -> str:

if query_type == "interpolate":
sample_prepared_query, sample_query, sample_parameters = _sample_query(parameters_dict)
sample_parameters["is_resample"] = False
return _interpolation_query(parameters_dict, sample_query, sample_parameters)

if query_type == "time_weighted_average":
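The template change above moves the trailing `ORDER BY` behind an `is_resample` flag: a plain resample query (which sets `is_resample` to `True` in its parameters) still sorts its own output, while the interpolate path sets `sample_parameters["is_resample"] = False` in `_query_builder` and instead sorts in the outer interpolation SELECT. A small illustrative sketch of how the Jinja2 flag toggles the clause (the table and column names are lifted from the template above):

```python
from jinja2 import Template

sample_query = (
    "SELECT window_start AS EventTime, TagName, Value "
    "FROM project_resample_results GROUP BY window_start, TagName, Value "
    "{% if is_resample is defined and is_resample == true %}"
    "ORDER BY TagName, EventTime "
    "{% endif %}"
)

template = Template(sample_query)
print(template.render(is_resample=True))   # resample path: ORDER BY is emitted
print(template.render(is_resample=False))  # interpolate path: ordering is left to the outer query
```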
@@ -16,7 +16,6 @@
import pandas as pd
from ._query_builder import _query_builder


def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
'''
An RTDIP interpolation function that is intertwined with the RTDIP Resampling function.
@@ -68,6 +67,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
cursor.execute(query)
df = cursor.fetch_all()
cursor.close()
connection.close()
return df
except Exception as e:
logging.exception('error returning dataframe')
@@ -54,6 +54,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
cursor.execute(query)
df = cursor.fetch_all()
cursor.close()
connection.close()
return df
except Exception as e:
logging.exception('error returning dataframe')
1 change: 1 addition & 0 deletions src/sdk/python/rtdip_sdk/queries/time_series/raw.py
@@ -52,6 +52,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
cursor.execute(query)
df = cursor.fetch_all()
cursor.close()
connection.close()
return df
except Exception as e:
logging.exception('error returning dataframe')
1 change: 1 addition & 0 deletions src/sdk/python/rtdip_sdk/queries/time_series/resample.py
@@ -68,6 +68,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
cursor.execute(query)
df = cursor.fetch_all()
cursor.close()
connection.close()
return df
except Exception as e:
logging.exception('error returning dataframe')
@@ -60,6 +60,7 @@ def get(connection: object, parameters_dict: dict) -> pd.DataFrame:
cursor.execute(query)
df = cursor.fetch_all()
cursor.close()
connection.close()
return df
except Exception as e:
logging.exception('error returning dataframe')