Skip to content

Commit

Permalink
more controlable way to get latest data
Browse files Browse the repository at this point in the history
  • Loading branch information
eslamdyab21 committed Feb 18, 2025
1 parent 5919dcb commit e6ae09b
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions spark_etls/wh_dim_battery_power_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,25 @@ def load_2_iceberg(spark):


spark.sql("""
WITH battery_info AS (
WITH battery_readings_15m AS (
SELECT
timestamp,
TIMESTAMP(FLOOR(UNIX_MICROS(timestamp) / (15 * 60 * 1000000)) * (15 * 60)) AS timestamp_15min,
CAST(SPLIT_PART(battery_name, '_', 2) AS INT) as battery_id,
battery.battery_name,
battery.capacity_kwh,
battery.max_charge_speed_w,
battery.max_output_w
FROM SolarX_Raw_Transactions.battery_readings battery
ORDER BY timestamp desc
LIMIT 3
battery_name,
capacity_kwh,
max_charge_speed_w,
max_output_w,
ROW_NUMBER() OVER (PARTITION BY TIMESTAMP(FLOOR(UNIX_MICROS(timestamp) / (15 * 60 * 1000000)) * (15 * 60)), battery_name ORDER BY timestamp DESC) AS row_num
FROM SolarX_Raw_Transactions.battery_readings
),
battery_info AS (
SELECT * FROM (
SELECT * FROM battery_readings_15m
WHERE row_num = 1
ORDER BY timestamp_15min DESC
LIMIT 3
)
)
Expand All @@ -35,17 +43,25 @@ def load_2_iceberg(spark):


spark.sql("""
WITH battery_info AS (
WITH battery_readings_15m AS (
SELECT
timestamp,
TIMESTAMP(FLOOR(UNIX_MICROS(timestamp) / (15 * 60 * 1000000)) * (15 * 60)) AS timestamp_15min,
CAST(SPLIT_PART(battery_name, '_', 2) AS INT) as battery_id,
battery.battery_name,
battery.capacity_kwh,
battery.max_charge_speed_w,
battery.max_output_w
FROM SolarX_Raw_Transactions.battery_readings battery
ORDER BY timestamp desc
LIMIT 3
battery_name,
capacity_kwh,
max_charge_speed_w,
max_output_w,
ROW_NUMBER() OVER (PARTITION BY TIMESTAMP(FLOOR(UNIX_MICROS(timestamp) / (15 * 60 * 1000000)) * (15 * 60)), battery_name ORDER BY timestamp DESC) AS row_num
FROM SolarX_Raw_Transactions.battery_readings
),
battery_info AS (
SELECT * FROM (
SELECT * FROM battery_readings_15m
WHERE row_num = 1
ORDER BY timestamp_15min DESC
LIMIT 3
)
)
Expand All @@ -65,7 +81,7 @@ def load_2_iceberg(spark):
end_date,
current_flag
) VALUES (
CAST(CONCAT(battery_raw.battery_id, date_format(timestamp, 'yyyyMMdd')) AS INT),
CAST(CONCAT(battery_raw.battery_id, date_format(NOW(), 'yyyyMMdd')) AS INT),
battery_raw.battery_id,
battery_raw.battery_name,
battery_raw.capacity_kwh,
Expand Down

0 comments on commit e6ae09b

Please sign in to comment.