# SolarX-Lakehouse
# Introduction

**SolarX Lakehouse** is a **big data platform** designed to handle and analyze energy consumption, solar energy production, and battery energy storage. The project follows a **modern lakehouse architecture**, integrating **Apache Spark, Iceberg, Kafka, Docker, and Kubernetes** to enable scalable, real-time monitoring and batch processing.

The data powering **SolarX Lakehouse** comes from the **Solar-X Kafka logged data**.
- **Spark Structured Streaming** processes live data efficiently.
- Supports **batch ETL processing** for historical analysis.
2. **Scalable Storage & Data Modeling:**
- Uses **Apache Iceberg** to store raw and transformed data.
- Implements **a structured Data Warehouse layer** for optimized analytics.
- Supports **time-travel queries**, schema evolution, and transactional consistency (a quick time-travel sketch follows this list).
3. **Big Data Computation with Apache Spark:**
- **Processed WH structured data** → cleaned and optimized for analytics & BI tools.
5. **Containerization & Orchestration (Docker + Kubernetes):**
- **Docker and Kubernetes** containerize all components (Spark, Kafka, Iceberg) for easy deployment and orchestration of scalable workloads.
- Enables **high availability, fault tolerance, and resource optimization** in a cloud-native setup.
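
A hedged sketch of what a time-travel query looks like (Spark 3.3+ syntax with Iceberg; the table name is one defined later in this README, and the timestamp is illustrative):
```python
# Query the Iceberg table as it existed at a past point in time.
# "TIMESTAMP AS OF" is the Spark SQL time-travel syntax supported by Iceberg.
spark.sql("""
    SELECT *
    FROM SolarX_Raw_Transactions.battery_readings TIMESTAMP AS OF '2025-02-14 00:00:00'
""").show()
```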

### **Tech Stack**
- **Processing:** Apache Spark, PySpark
- **Storage:** MinIO object storage and Apache Iceberg
```bash
docker cp kafka:/kafka_log_solar_energy_data_2025-02-13.log ~/Documents/projects
```

An example of battery log data:
```log
{"time_stamp": "2025-02-14 02:08:59", "batteries": {"battery_1": {"capacity_kwh": 12, "max_charge_speed_w": 1, "current_energy_wh": 9860, "is_charging": 0, "status": "ideal", "max_output_w": 3.33}, "battery_2": {"capacity_kwh": 12, "max_charge_speed_w": 1, "current_energy_wh": 8600, "is_charging": 0, "status": "ideal", "max_output_w": 3.33}, "battery_3": {"capacity_kwh": 12, "max_charge_speed_w": 1, "current_energy_wh": 9993.67, "is_charging": 0, "status": "discharging", "max_output_w": 3.33}}}
{"time_stamp": "2025-02-14 02:08:59", "batteries": {"battery_1": {"capacity_kwh": 12, "max_charge_speed_w": 1, "current_energy_wh": 9860, "is_charging": 0, "status": "ideal", "max_output_w": 3.33}, "battery_2": {"capacity_kwh": 12, "max_charge_speed_w": 1, "current_energy_wh": 8600, "is_charging": 0, "status": "ideal", "max_output_w": 3.33}, "battery_3": {"capacity_kwh": 12, "max_charge_speed_w": 1, "current_energy_wh": 9993.67, "is_charging": 0, "status": "discharging", "max_output_w": 3.33}}
{"time_stamp": "2025-02-14 02:09:00", "batteries": {"battery_1": {"capacity_kwh": 12, "max_charge_speed_w": 1, "current_energy_wh": 9860, "is_charging": 0, "status": "ideal", "max_output_w": 3.33}, "battery_2": {"capacity_kwh": 12, "max_charge_speed_w": 1, "current_energy_wh": 8600, "is_charging": 0, "status": "ideal", "max_output_w": 3.33}, "battery_3": {"capacity_kwh": 12, "max_charge_speed_w": 1, "current_energy_wh": 9992.87, "is_charging": 0, "status": "discharging", "max_output_w": 3.33}}
```

```sql
GROUP BY day
```


<br/>
<br/>

## Battery Tables
The code for the battery tables, along with the solar and home tables above, built from the SolarX Kafka logged data, lives in `notebooks/raw_kafka_log_data_iceberg.ipynb`; it will later become a `.py` file alongside the others so the pipeline can run with `Airflow`.

The solar and home raw schemas for the Kafka logged data are the same as above; here we only add the raw `battery_readings` table.
```sql
%%sql

CREATE TABLE SolarX_Raw_Transactions.battery_readings(
timestamp TIMESTAMP NOT NULL,
15_minutes_interval INT NOT NULL,
battery_name VARCHAR(15) NOT NULL,
capacity_kwh FLOAT NOT NULL,
max_charge_speed_w FLOAT NOT NULL,
current_energy_wh FLOAT NOT NULL,
is_charging FLOAT NOT NULL,
status VARCHAR(15) NOT NULL,
max_output_w FLOAT NOT NULL
)
USING iceberg
PARTITIONED BY (DAY(timestamp), battery_name, 15_minutes_interval);
```
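
As a rough, hedged sketch (not the project's notebook code) of how a raw log line like the one above can be flattened into this table with PySpark — the log path and the exact meaning of `15_minutes_interval` (quarter-hour index of the day) are assumptions here:
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (FloatType, MapType, StringType,
                               StructField, StructType)

# Assumes a Spark session already configured with the Iceberg catalog,
# as elsewhere in this README.
spark = SparkSession.builder.appName("battery-raw-sketch").getOrCreate()

battery_schema = StructType([
    StructField("capacity_kwh", FloatType()),
    StructField("max_charge_speed_w", FloatType()),
    StructField("current_energy_wh", FloatType()),
    StructField("is_charging", FloatType()),
    StructField("status", StringType()),
    StructField("max_output_w", FloatType()),
])
log_schema = StructType([
    StructField("time_stamp", StringType()),
    StructField("batteries", MapType(StringType(), battery_schema)),
])

# Hypothetical log path; one JSON object per line, as in the example above.
raw_df = spark.read.text("kafka_log_battery_data_2025-02-14.log")

flat_df = (
    raw_df
    .select(F.from_json("value", log_schema).alias("j"))
    # Explode the batteries map into one row per battery.
    .select(
        F.col("j.time_stamp").cast("timestamp").alias("timestamp"),
        F.explode("j.batteries").alias("battery_name", "b"),
    )
    # Assumed meaning: index of the quarter-hour of the day (0-95).
    .withColumn(
        "15_minutes_interval",
        (F.hour("timestamp") * 4 + F.floor(F.minute("timestamp") / 15)).cast("int"),
    )
    .select(
        "timestamp", "15_minutes_interval", "battery_name",
        "b.capacity_kwh", "b.max_charge_speed_w", "b.current_energy_wh",
        "b.is_charging", "b.status", "b.max_output_w",
    )
)

flat_df.writeTo("SolarX_Raw_Transactions.battery_readings").append()
```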




<br/>
<br/>
<br/>
```sql
PARTITIONED BY (MONTH(date_key), solar_panel_id)
```



<br/>

### Battery Dimension
```sql
%%sql

CREATE TABLE SolarX_WH.dim_battery(
battery_key INT NOT NULL,
battery_id SMALLINT NOT NULL,
name VARCHAR(15) NOT NULL,
capacity_kwh FLOAT NOT NULL,
max_charge_speed_w FLOAT NOT NULL,
max_output_w FLOAT NOT NULL,

-- scd type2
start_date TIMESTAMP NOT NULL,
end_date TIMESTAMP,

current_flag BOOLEAN
)
USING iceberg;
```
We model `capacity_kwh`, `max_charge_speed_w`, and `max_output_w` as Type 2 slowly changing dimensions so that battery changes can be tracked historically.
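If, say, `battery_1` is upgraded to a larger capacity, its current row is closed out (`end_date` set, `current_flag = FALSE`) and a new row with the updated `capacity_kwh` and `current_flag = TRUE` is inserted, as the merge statements in the ETL section below show.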

<br/>

### Battery Fact
```sql
%%sql

CREATE TABLE SolarX_WH.fact_battery_power_readings(
battery_key INT NOT NULL, -- REFERENCES dim_battery(battery_key)
date_key TIMESTAMP NOT NULL, -- REFERENCES dim_date(date_key)

battery_id SMALLINT NOT NULL,
current_energy_wh FLOAT NOT NULL,
is_charging SMALLINT NOT NULL,
status VARCHAR(15) NOT NULL
)

USING iceberg
PARTITIONED BY (MONTH(date_key), battery_id)
```


<br/>

### Date Dimension
```sql
ON target.solar_panel_id = source.panel_id AND target.date_key = source.truncate
```




<br/>


### Battery Dimension
We use `SolarX_Raw_Transactions.battery_readings` as the source data here and load it into `dim_battery`.

A CTE, `battery_info`, fetches the most recently updated values for batteries 1 to 3.
- Update part of the `SCD2` merge
```sql
%%sql

WITH battery_info AS (
SELECT
timestamp,
CAST(SPLIT_PART(battery_name, '_', 2) AS INT) as battery_id,
battery.battery_name,
battery.capacity_kwh,
battery.max_charge_speed_w,
battery.max_output_w
FROM SolarX_Raw_Transactions.battery_readings battery
ORDER BY timestamp desc
LIMIT 3
)

MERGE INTO SolarX_WH.dim_battery dim_battery
USING battery_info battery_raw
ON dim_battery.battery_id = battery_raw.battery_id AND dim_battery.current_flag = TRUE

WHEN MATCHED AND (
dim_battery.capacity_kwh != battery_raw.capacity_kwh OR
dim_battery.max_charge_speed_w != battery_raw.max_charge_speed_w OR
dim_battery.max_output_w != battery_raw.max_output_w
) THEN UPDATE SET
dim_battery.end_date = NOW(),
dim_battery.current_flag = FALSE;
```
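
Running this update merge first closes out any changed current rows (setting `end_date` and `current_flag = FALSE`), so the insert merge below finds no current row for them and adds the new version.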

- Insert part of the `SCD2` merge
```sql
%%sql

WITH battery_info AS (
SELECT
timestamp,
CAST(SPLIT_PART(battery_name, '_', 2) AS INT) as battery_id,
battery.battery_name,
battery.capacity_kwh,
battery.max_charge_speed_w,
battery.max_output_w
FROM SolarX_Raw_Transactions.battery_readings battery
ORDER BY timestamp desc
LIMIT 3
)

MERGE INTO SolarX_WH.dim_battery dim_battery
USING battery_info battery_raw
ON dim_battery.battery_id = battery_raw.battery_id AND dim_battery.current_flag = TRUE

WHEN NOT MATCHED THEN
INSERT (
battery_key,
battery_id,
name,
capacity_kwh,
max_charge_speed_w,
max_output_w,
start_date,
end_date,
current_flag
) VALUES (
CAST(CONCAT(battery_raw.battery_id, date_format(timestamp, 'yyyyMMdd')) AS INT),
battery_raw.battery_id,
battery_raw.battery_name,
battery_raw.capacity_kwh,
battery_raw.max_charge_speed_w,
battery_raw.max_output_w,
NOW(),
NULL,
TRUE
);
```
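
For example, a new version of battery 2 whose latest reading falls on 2025-02-14 gets `battery_key = 220250214`: the battery id concatenated with the reading date in `yyyyMMdd` form.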


<br/>

### Battery Reading Fact
We use the high-rate raw `SolarX_Raw_Transactions.battery_readings` table as the source here and load it into `fact_battery_power_readings` after extracting the relevant fields.


Like the solar one, this `ETL` differs a bit from the others because it involves a `join` between `dim_battery` and the staging table of raw data, to look up the `battery_key` for each inserted battery power reading.


In this merge a `broadcast` join ships the smaller table (the dimension table) to the staging table's partitions to avoid shuffle latency. To do so, both the staging table and the dimension table are loaded into two PySpark DataFrames and joined in PySpark, because the broadcast join can't be leveraged in plain inline SQL with Iceberg.


- The idea of the `ETL` in plain inline SQL with Iceberg
![](images/battery_etl_idea.png)
- With a Spark broadcast join (full code in `notebooks/wh_etl.ipynb` or `wh_fact_battery_power_readings_etl.py`)
```python
from pyspark.sql.functions import broadcast

staging_df = spark.sql(staging_query)
dimension_df = spark.sql(dim_battery_current_query)

# Broadcast the smaller dimension table for the join
joined_df = staging_df.join(
broadcast(dimension_df),
(staging_df.battery_id == dimension_df.dim_battery_id),
"left"
)


joined_df.createOrReplaceTempView("staging_temp_view")
```


```sql
%%sql

MERGE INTO SolarX_WH.fact_battery_power_readings AS target
USING staging_temp_view AS source
ON target.battery_id = source.battery_id AND target.date_key = source.timestamp_15min

WHEN NOT MATCHED THEN
INSERT (battery_key,
date_key,
battery_id,
current_energy_wh,
is_charging,
status
)
VALUES (source.battery_key,
source.timestamp_15min,
source.battery_id,
source.current_energy_wh,
source.is_charging,
source.status
);
```
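
Because rows are inserted only when no fact exists for the same `battery_id` and `date_key`, re-running the load over the same window is idempotent.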

![](images/battery_wh_etl2.png)




<br/>
<br/>
