Skip to content

Commit

Permalink
Edit the Airflow workflow to align with the new bash wrapper script (airflow_bash_wrapper.sh)
Browse files (browse the repository at this point in the history)
  • Loading branch information
eslamdyab21 committed Jan 12, 2025
1 parent 697841f commit 2862e14
Showing 1 changed file with 6 additions and 7 deletions.
airflow/dags/etl_workflow_dag.py: 13 changes (6 additions & 7 deletions)
Original file line number | Diff line number | Diff line change
Expand Up @@ -48,7 +48,7 @@ def process_output_from_check_wh_schema_exists(ti):
# ------------- schema ------------------
check_schema_exists = SSHOperator(
task_id='check_schema_exists',
command='/opt/spark/bin/spark-submit --master spark://spark-master:7077 --num-executors 6 --executor-cores 1 --executor-memory 512M /home/iceberg/etl_scripts/check_schema_exists.py',
command='/bin/bash /home/iceberg/etl_scripts/airflow_bash_wrapper.sh check_schema_exists.py',
ssh_conn_id='spark_master_ssh',
dag=dag,
do_xcom_push=True,
Expand All @@ -70,7 +70,7 @@ def process_output_from_check_wh_schema_exists(ti):

create_raw_schema = SSHOperator(
task_id='create_raw_schema',
command='/opt/spark/bin/spark-submit --master spark://spark-master:7077 --num-executors 6 --executor-cores 1 --executor-memory 512M /home/iceberg/etl_scripts/create_raw_schema.py',
command='/bin/bash /home/iceberg/etl_scripts/airflow_bash_wrapper.sh create_raw_schema.py',
ssh_conn_id='spark_master_ssh',
dag=dag,
)
Expand All @@ -97,29 +97,28 @@ def process_output_from_check_wh_schema_exists(ti):
# ------------- raw etl ------------------
raw_home_power_readings_etl = SSHOperator(
task_id='raw_home_power_readings_etl',
command=f"""/opt/spark/bin/spark-submit --master spark://spark-master:7077 --num-executors 6 --executor-cores 1 --executor-memory 512M /home/iceberg/etl_scripts/raw_home_power_readings_etl.py {date} || true""",
command=f"""/bin/bash /home/iceberg/etl_scripts/airflow_bash_wrapper.sh raw_home_power_readings_etl.py {date}""",
ssh_conn_id='spark_master_ssh',
dag=dag,
)


raw_solar_panel_power_etl = SSHOperator(
task_id='raw_solar_panel_power_etl',
command='/opt/spark/bin/spark-submit --master spark://spark-master:7077 --num-executors 6 --executor-cores 1 --executor-memory 512M /home/iceberg/etl_scripts/raw_solar_panel_power_etl.py || true',
command='/bin/bash /home/iceberg/etl_scripts/airflow_bash_wrapper.sh raw_solar_panel_power_etl.py',
ssh_conn_id='spark_master_ssh',
dag=dag,
)

raw_solar_panel_power_readings_etl = SSHOperator(
task_id='raw_solar_panel_power_readings_etl',
command=f"""/opt/spark/bin/spark-submit --master spark://spark-master:7077 --num-executors 6 --executor-cores 1 --executor-memory 512M /home/iceberg/etl_scripts/raw_solar_panel_power_readings_etl.py {date} || true""",
command=f"""/bin/bash /home/iceberg/etl_scripts/airflow_bash_wrapper.sh raw_solar_panel_power_readings_etl.py {date}""",
ssh_conn_id='spark_master_ssh',
dag=dag,
)





# Task ordering. In Airflow, `a >> b` makes b a downstream task of a.
# The schema check runs first and feeds check_raw_schema_exists_output.
check_schema_exists >> check_raw_schema_exists_output
# skip_create_raw_schema and create_raw_schema both sit between the check
# output and merge_task. NOTE(review): presumably only one of the two runs per
# DAG run (branch on whether the schema exists) -- confirm against the task
# definitions, which are outside this view.
check_raw_schema_exists_output >> [skip_create_raw_schema, create_raw_schema] >> merge_task
# After merge_task the three raw ETL tasks are chained one after another.
merge_task >> raw_home_power_readings_etl >> raw_solar_panel_power_etl >> raw_solar_panel_power_readings_etl

0 comments on commit 2862e14

Please sign in to comment.