Skip to content

Commit

Permalink
warehouse home power etl script
Browse files Browse the repository at this point in the history
  • Loading branch information
eslamdyab21 committed Jan 24, 2025
1 parent a39ac2a commit 17200b7
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 0 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1086,3 +1086,8 @@ so that workers can resolve the master service name correctly instead of accessi
```bash
./bin/spark-submit --master spark://spark-master-service:7077 --conf spark.driver.host=spark-master-service --conf spark.driver.bindAddress=0.0.0.0 --conf spark.driver.port=5000 --conf spark.broadcast.port=5001 --conf spark.replClassServer.port=5002 --conf spark.blockManager.port=5003 --num-executors 4 --executor-cores 1 --executor-memory 512M /home/iceberg/etl_scripts/raw_solar_panel_power_readings_etl.py 2013-01-01
```

- Wh home power ETL script
```bash
./bin/spark-submit --master spark://spark-master-service:7077 --conf spark.driver.host=spark-master-service --conf spark.driver.bindAddress=0.0.0.0 --conf spark.driver.port=5000 --conf spark.broadcast.port=5001 --conf spark.replClassServer.port=5002 --conf spark.blockManager.port=5003 --num-executors 4 --executor-cores 1 --executor-memory 512M /home/iceberg/etl_scripts/wh_home_power_etl.py
```
77 changes: 77 additions & 0 deletions lakehouse/spark/home_appliances_consumption.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{
"Refrigerator":
{
"consumption": [300,1500],
"time": "00:00-24:00"
},
"Electric Oven":
{
"consumption": [2000,5000],
"time": "16:00-16:30,21:00-21:30"
},
"Electric Kettle":
{
"consumption": [1500,1500],
"time": "07:00-07:15,12:00-12:15,16:30-17:00,21:30-22:00"
},
"Air Conditioner":
{
"consumption": [500,3000],
"time": "00:00-24:00"
},
"Incandescent Light Bulbs":
{
"consumption": [60,60],
"time":"00:00-24:00"
},
"LED Light Bulbs":
{
"consumption": [10,10],
"time": "00:00-24:00"
},
"Laptop":
{
"consumption": [50,100],
"time":"00:00-24:00"
},
"Computer":
{
"consumption": [100, 600],
"time": "00:00-24:00"
},
"LCD Monitor":
{
"consumption": [50,300],
"time": "10:00-12:00,16:00-20:00,22:00-24:00"
},
"Router":
{
"consumption": [5,20],
"time":"00:00-24:00"
},
"Smartphone Charger":
{
"consumption": [5,5],
"time":"00:00-24:00"
},
"Blow Dryer":
{
"consumption": [800,1800],
"time":"07:00-07:30,15:00-15:30"
},
"Iron":
{
"consumption": [1000,1000],
"time":"07:00-07:30,15:00-15:30"
},
"Washing Machine" :
{
"consumption": [500,1000],
"time":"17:00-19:00"
},
"Water Heater" :
{
"consumption": [1000,4000],
"time":"12:00-16:00"
}
}
70 changes: 70 additions & 0 deletions spark_etls/wh_home_power_etl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from pyspark.sql import SparkSession
import pandas as pd
import json
import logging


def get_home_appliances_df(path='/home/iceberg/warehouse/home_appliances_consumption.json'):
    """Build the home-appliances dimension DataFrame from a consumption JSON file.

    The JSON maps each appliance name to an object with a two-element
    ``consumption`` list ``[min_watts, max_watts]`` and a ``time`` string
    of usage windows (e.g. ``"07:00-07:15,12:00-12:15"``).

    Parameters
    ----------
    path : str, optional
        Location of the appliances consumption JSON file. Defaults to the
        warehouse path used by the Spark ETL job.

    Returns
    -------
    pandas.DataFrame
        Columns: ``home_appliance_key`` (1-based surrogate key),
        ``home_key`` (constant 1 — single home), ``name``,
        ``min_consumption_rating``, ``max_consumption_rating``,
        ``usage_time``.
    """
    with open(path) as f:
        home_usage_power = json.load(f)
    # NOTE: the original had a stray no-op `HOME_USAGE_POWER.items()` here;
    # it has been removed.

    df = pd.DataFrame([
        {
            "home_key": 1,
            "name": name,
            "min_consumption_rating": info["consumption"][0],
            "max_consumption_rating": info["consumption"][1],
            "usage_time": info["time"],
        }
        for name, info in home_usage_power.items()
    ])
    # Shift to a 1-based index and expose it as the dimension surrogate key.
    df.index += 1
    df.index.name = 'home_appliance_key'
    return df.reset_index()



def main():
    """Load the home-appliances dimension into the Iceberg warehouse.

    Builds the appliances DataFrame, registers it as a temp view, and
    MERGEs it into ``SolarX_WH.dim_home_appliances`` so the job is
    idempotent: reruns update rows whose consumption ratings changed and
    insert appliances not yet present.
    """
    spark = (
        SparkSession
        .builder
        # Fixed: the app name was copy-pasted from the solar-panel ETL
        # ("create raw solar panel power") and mislabeled this job.
        .appName("wh home power etl")
        .getOrCreate()
    )

    home_appliances_df = spark.createDataFrame(get_home_appliances_df())
    home_appliances_df.createOrReplaceTempView("temp_view")

    # Upsert keyed on home_appliance_key; only consumption-rating changes
    # trigger an update, new appliances are inserted wholesale.
    spark.sql("""
    MERGE INTO SolarX_WH.dim_home_appliances dim_app
    USING
        (SELECT home_appliance_key as home_appliance_key,
                home_key as home_key,
                name as appliance,
                min_consumption_rating as min_consumption_power_wh,
                max_consumption_rating as max_consumption_power_wh,
                usage_time as usage_time
         FROM temp_view) tmp
    ON dim_app.home_appliance_key = tmp.home_appliance_key
    WHEN MATCHED AND (
        dim_app.min_consumption_power_wh != tmp.min_consumption_power_wh OR
        dim_app.max_consumption_power_wh != tmp.max_consumption_power_wh
    ) THEN UPDATE SET
        dim_app.min_consumption_power_wh = tmp.min_consumption_power_wh,
        dim_app.max_consumption_power_wh = tmp.max_consumption_power_wh
    WHEN NOT MATCHED THEN INSERT *
    """)

    # Fixed: message had a duplicated word ("power-power") and used an
    # f-string with no placeholders; lazy logging args are not needed here.
    logging.info("wh-home-power-etl -> Load dim_home_appliances successfully into iceberg")

    spark.stop()


if __name__ == "__main__":
    logging.basicConfig(level="INFO")
    main()

0 comments on commit 17200b7

Please sign in to comment.