From 96ad27b584ba27acbb9dbe54f418426fd47764f7 Mon Sep 17 00:00:00 2001 From: David Siklosi Date: Sat, 13 Apr 2024 20:59:23 +0200 Subject: [PATCH 1/4] Moving to python3.9; Upgrading airflow version; removing legacy postgres operator --- Makefile | 6 ++-- .../redshift_load_creator.py | 5 ++- .../redshift_transform_creator.py | 6 ++-- .../redshift_unload_creator.py | 7 +++-- .../airflow/operators/postgres_operator.py | 2 +- reqs/dev.txt | 31 +++++++++---------- reqs/test.txt | 2 +- setup.py | 3 +- 8 files changed, 30 insertions(+), 32 deletions(-) diff --git a/Makefile b/Makefile index 02daa9e..8872dba 100644 --- a/Makefile +++ b/Makefile @@ -96,15 +96,15 @@ install: clean ## install the package to the active Python's site-packages install-dev: clean ## install the package to the active Python's site-packages - virtualenv -p python3 venv; \ + virtualenv -p python3.9 venv; \ source venv/bin/activate; \ python -m pip install --upgrade pip; \ python setup.py install; \ pip install -e . ; \ - pip install -r reqs/dev.txt -r reqs/test.txt + SYSTEM_VERSION_COMPAT=0 CFLAGS='-std=c++20' pip install -r reqs/dev.txt -r reqs/test.txt install-test: clean ## install the package to the active Python's site-packages - virtualenv -p python3 venv; \ + virtualenv -p python3.9 venv; \ source venv/bin/activate; \ python -m pip install --upgrade pip; \ pip install -r reqs/test.txt -r reqs/base.txt diff --git a/dagger/dag_creator/airflow/operator_creators/redshift_load_creator.py b/dagger/dag_creator/airflow/operator_creators/redshift_load_creator.py index 8d14182..f1576f4 100644 --- a/dagger/dag_creator/airflow/operator_creators/redshift_load_creator.py +++ b/dagger/dag_creator/airflow/operator_creators/redshift_load_creator.py @@ -2,9 +2,8 @@ from typing import Optional from dagger.dag_creator.airflow.operator_creator import OperatorCreator -from dagger.dag_creator.airflow.operators.redshift_sql_operator import ( - RedshiftSQLOperator, -) +from dagger.dag_creator.airflow.operators.redshift_sql_operator import RedshiftSQLOperator + class RedshiftLoadCreator(OperatorCreator): diff --git a/dagger/dag_creator/airflow/operator_creators/redshift_transform_creator.py b/dagger/dag_creator/airflow/operator_creators/redshift_transform_creator.py index 0218a6f..c8eb8dd 100644 --- a/dagger/dag_creator/airflow/operator_creators/redshift_transform_creator.py +++ b/dagger/dag_creator/airflow/operator_creators/redshift_transform_creator.py @@ -1,7 +1,7 @@ from os.path import join from dagger.dag_creator.airflow.operator_creator import OperatorCreator -from dagger.dag_creator.airflow.operators.postgres_operator import PostgresOperator +from dagger.dag_creator.airflow.operators.redshift_sql_operator import RedshiftSQLOperator class RedshiftTransformCreator(OperatorCreator): @@ -22,11 +22,11 @@ def _read_sql(directory, file_path): def _create_operator(self, **kwargs): sql_string = self._read_sql(self._task.pipeline.directory, self._task.sql_file) - redshift_op = PostgresOperator( + redshift_op = RedshiftSQLOperator( dag=self._dag, task_id=self._task.name, sql=sql_string, - postgres_conn_id=self._task.postgres_conn_id, + redshift_conn_id=self._task.postgres_conn_id, params=self._template_parameters, **kwargs, ) diff --git a/dagger/dag_creator/airflow/operator_creators/redshift_unload_creator.py b/dagger/dag_creator/airflow/operator_creators/redshift_unload_creator.py index 7fd74d7..cb7be04 100644 --- a/dagger/dag_creator/airflow/operator_creators/redshift_unload_creator.py +++ b/dagger/dag_creator/airflow/operator_creators/redshift_unload_creator.py @@ -1,7 +1,7 @@ from os.path import join from dagger.dag_creator.airflow.operator_creator import OperatorCreator -from dagger.dag_creator.airflow.operators.postgres_operator import PostgresOperator +from dagger.dag_creator.airflow.operators.redshift_sql_operator import RedshiftSQLOperator REDSHIFT_UNLOAD_CMD = """ unload ('{sql_string}') @@ -58,12 +58,13 @@ def _create_operator(self, **kwargs): unload_cmd = self._get_unload_command(sql_string) - redshift_op = PostgresOperator( + redshift_op = RedshiftSQLOperator( dag=self._dag, task_id=self._task.name, sql=unload_cmd, - postgres_conn_id=self._task.postgres_conn_id, + redshift_conn_id=self._task.postgres_conn_id, params=self._template_parameters, + autocommit=True, **kwargs, ) diff --git a/dagger/dag_creator/airflow/operators/postgres_operator.py b/dagger/dag_creator/airflow/operators/postgres_operator.py index c01b255..ce90250 100644 --- a/dagger/dag_creator/airflow/operators/postgres_operator.py +++ b/dagger/dag_creator/airflow/operators/postgres_operator.py @@ -1,6 +1,6 @@ from typing import Iterable, Mapping, Optional, Union -from airflow.hooks.postgres_hook import PostgresHook +from airflow.providers.postgres.hooks.postgres import PostgresHook from airflow.utils.decorators import apply_defaults from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator diff --git a/reqs/dev.txt b/reqs/dev.txt index 806d6c5..c52136a 100644 --- a/reqs/dev.txt +++ b/reqs/dev.txt @@ -1,19 +1,18 @@ -apache-airflow[amazon,postgres,s3,statsd]==2.3.4 +pip==24.0 +apache-airflow[amazon,postgres,s3,statsd]==2.9.0 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.0/constraints-3.9.txt" black==22.10.0 -boto3==1.26.16 +boto3==1.34.82 bumpversion==0.6.0 -coverage==6.5.0 -elasticsearch==7.17.7 -flake8==5.0.4 -neo4j==5.2.1 -numpydoc==1.5.0 -pip==22.3.1 +coverage==7.4.4 +#elasticsearch==7.17.7 +flake8==7.0.0 +#neo4j==5.19.0 +numpydoc==1.7.0 pre-commit==2.20.0 -sphinx-rtd-theme==1.1.1 -Sphinx==4.3.2 -SQLAlchemy==1.4.44 -tox==3.27.1 -twine==4.0.1 -watchdog==2.1.9 -Werkzeug==2.2.2 -wheel==0.38.4 +sphinx-rtd-theme==2.0.0 +Sphinx==7.2.6 +SQLAlchemy +tox==4.14.2 +twine==5.0.0 +watchdog==4.0.0 +Werkzeug diff --git a/reqs/test.txt b/reqs/test.txt index c568f77..195932d 100644 --- a/reqs/test.txt +++ b/reqs/test.txt @@ -1,3 +1,3 @@ -apache-airflow[amazon,postgres,s3,statsd]==2.3.4 +apache-airflow[amazon,postgres,s3,statsd]==2.9.0 pytest-cov==4.0.0 pytest==7.2.0 diff --git a/setup.py b/setup.py index 3f80fe3..080a5bb 100644 --- a/setup.py +++ b/setup.py @@ -45,8 +45,7 @@ def reqs(*f): classifiers=[ "Development Status :: 2 - Pre-Alpha", "Intended Audience :: Developers", - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.9", ], description="Config Driven ETL", entry_points={"console_scripts": ["dagger=dagger.main:cli"]}, From 3e62d78068b396779e89b2ddedd32c87eff3cf57 Mon Sep 17 00:00:00 2001 From: David Siklosi Date: Mon, 15 Apr 2024 11:00:22 +0200 Subject: [PATCH 2/4] Making sensor default args more flexible --- dagger/conf.py | 4 +--- dagger/dag_creator/airflow/dag_creator.py | 8 ++------ dagger/dagger_config.yaml | 8 +++++--- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/dagger/conf.py b/dagger/conf.py index 667c207..6b5488f 100644 --- a/dagger/conf.py +++ b/dagger/conf.py @@ -21,9 +21,7 @@ # Airflow parameters airflow_config = config.get('airflow', None) or {} WITH_DATA_NODES = airflow_config.get('with_data_nodes', False) -EXTERNAL_SENSOR_POKE_INTERVAL = airflow_config.get('external_sensor_poke_interval', 600) -EXTERNAL_SENSOR_TIMEOUT = airflow_config.get('external_sensor_timeout', 28800) -EXTERNAL_SENSOR_MODE = airflow_config.get('external_sensor_mode', 'reschedule') +EXTERNAL_SENSOR_DEFAULT_ARGS = airflow_config.get('external_sensor_default_args', {}) IS_DUMMY_OPERATOR_SHORT_CIRCUIT = airflow_config.get('is_dummy_operator_short_circuit', False) # Neo4j parameters diff --git a/dagger/dag_creator/airflow/dag_creator.py b/dagger/dag_creator/airflow/dag_creator.py index 2b208b9..031a3a4 100644 --- a/dagger/dag_creator/airflow/dag_creator.py +++ b/dagger/dag_creator/airflow/dag_creator.py @@ -72,12 +72,7 @@ def _get_external_task_sensor(self, from_task_id: str, to_task_id: str, follow_e to_pipe_id = self._task_graph.get_node(to_task_id).obj.pipeline.name - - extra_args = { - 'mode': conf.EXTERNAL_SENSOR_MODE, - 'poke_interval': conf.EXTERNAL_SENSOR_POKE_INTERVAL, - 'timeout': conf.EXTERNAL_SENSOR_TIMEOUT, - } + extra_args = conf.EXTERNAL_SENSOR_DEFAULT_ARGS.copy() extra_args.update(follow_external_dependency) return ExternalTaskSensor( @@ -141,6 +136,7 @@ def _create_edge_without_data(self, from_task_id: str, to_task_ids: list, node: to_task_ids: The IDs of the tasks to which the edge connects. node: The current node in a task graph. """ + from_pipe = ( self._task_graph.get_node(from_task_id).obj.pipeline_name if from_task_id else None ) diff --git a/dagger/dagger_config.yaml b/dagger/dagger_config.yaml index 9eac6ff..3366828 100644 --- a/dagger/dagger_config.yaml +++ b/dagger/dagger_config.yaml @@ -1,8 +1,10 @@ airflow: + external_sensor_default_args: + poll_interval: 30 + timeout: 28800 + mode: reschedule + deferrable: true with_data_node: false - external_sensor_poke_interval: 600 - external_sensor_timeout: 28800 - external_sensor_mode: reschedule is_dummy_operator_short_circuit: false From 41f15544787651da6b9a2b3e085a766656f93226 Mon Sep 17 00:00:00 2001 From: David Siklosi Date: Mon, 15 Apr 2024 14:15:44 +0200 Subject: [PATCH 3/4] Upgrading python in CI --- .github/workflows/ci-data.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci-data.yml b/.github/workflows/ci-data.yml index 599d325..bef5bed 100644 --- a/.github/workflows/ci-data.yml +++ b/.github/workflows/ci-data.yml @@ -17,10 +17,10 @@ jobs: with: persist-credentials: false - - name: Set up Python 3.7 + - name: Set up Python 3.9 uses: actions/setup-python@v2 with: - python-version: 3.7 + python-version: 3.9 - name: Install dependencies run: | From d8145ab6b0ccda834b58f107ef3fd7ed5a1a83a0 Mon Sep 17 00:00:00 2001 From: David Siklosi Date: Mon, 15 Apr 2024 14:25:25 +0200 Subject: [PATCH 4/4] Adding graphviz dependency to test --- reqs/dev.txt | 4 ++-- reqs/test.txt | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/reqs/dev.txt b/reqs/dev.txt index c52136a..238b2e2 100644 --- a/reqs/dev.txt +++ b/reqs/dev.txt @@ -4,9 +4,9 @@ black==22.10.0 boto3==1.34.82 bumpversion==0.6.0 coverage==7.4.4 -#elasticsearch==7.17.7 +elasticsearch==7.17.7 flake8==7.0.0 -#neo4j==5.19.0 +neo4j==5.19.0 numpydoc==1.7.0 pre-commit==2.20.0 sphinx-rtd-theme==2.0.0 diff --git a/reqs/test.txt b/reqs/test.txt index 195932d..7bdc89f 100644 --- a/reqs/test.txt +++ b/reqs/test.txt @@ -1,3 +1,4 @@ apache-airflow[amazon,postgres,s3,statsd]==2.9.0 pytest-cov==4.0.0 pytest==7.2.0 +graphviz