Skip to content

Commit

Permalink
Merge pull request #33 from chocoapp/feature/DATA-1791_defferable_sen…
Browse files Browse the repository at this point in the history
…sors

Feature/data 1791 defferable sensors
  • Loading branch information
siklosid authored Apr 15, 2024
2 parents 0e9dd3b + d8145ab commit 7b9bd16
Show file tree
Hide file tree
Showing 12 changed files with 40 additions and 45 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci-data.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ jobs:
with:
persist-credentials: false

- name: Set up Python 3.7
- name: Set up Python 3.9
uses: actions/setup-python@v2
with:
python-version: 3.7
python-version: 3.9

- name: Install dependencies
run: |
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ install: clean ## install the package to the active Python's site-packages


install-dev: clean ## install the package to the active Python's site-packages
virtualenv -p python3 venv; \
virtualenv -p python3.9 venv; \
source venv/bin/activate; \
python -m pip install --upgrade pip; \
python setup.py install; \
pip install -e . ; \
pip install -r reqs/dev.txt -r reqs/test.txt
SYSTEM_VERSION_COMPAT=0 CFLAGS='-std=c++20' pip install -r reqs/dev.txt -r reqs/test.txt

install-test: clean ## install the package to the active Python's site-packages
virtualenv -p python3 venv; \
virtualenv -p python3.9 venv; \
source venv/bin/activate; \
python -m pip install --upgrade pip; \
pip install -r reqs/test.txt -r reqs/base.txt
Expand Down
4 changes: 1 addition & 3 deletions dagger/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
# Airflow parameters
airflow_config = config.get('airflow', None) or {}
WITH_DATA_NODES = airflow_config.get('with_data_nodes', False)
EXTERNAL_SENSOR_POKE_INTERVAL = airflow_config.get('external_sensor_poke_interval', 600)
EXTERNAL_SENSOR_TIMEOUT = airflow_config.get('external_sensor_timeout', 28800)
EXTERNAL_SENSOR_MODE = airflow_config.get('external_sensor_mode', 'reschedule')
EXTERNAL_SENSOR_DEFAULT_ARGS = airflow_config.get('external_sensor_default_args', {})
IS_DUMMY_OPERATOR_SHORT_CIRCUIT = airflow_config.get('is_dummy_operator_short_circuit', False)

# Neo4j parameters
Expand Down
8 changes: 2 additions & 6 deletions dagger/dag_creator/airflow/dag_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,7 @@ def _get_external_task_sensor(self, from_task_id: str, to_task_id: str, follow_e

to_pipe_id = self._task_graph.get_node(to_task_id).obj.pipeline.name


extra_args = {
'mode': conf.EXTERNAL_SENSOR_MODE,
'poke_interval': conf.EXTERNAL_SENSOR_POKE_INTERVAL,
'timeout': conf.EXTERNAL_SENSOR_TIMEOUT,
}
extra_args = conf.EXTERNAL_SENSOR_DEFAULT_ARGS.copy()
extra_args.update(follow_external_dependency)

return ExternalTaskSensor(
Expand Down Expand Up @@ -141,6 +136,7 @@ def _create_edge_without_data(self, from_task_id: str, to_task_ids: list, node:
to_task_ids: The IDs of the tasks to which the edge connects.
node: The current node in a task graph.
"""

from_pipe = (
self._task_graph.get_node(from_task_id).obj.pipeline_name if from_task_id else None
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
from typing import Optional

from dagger.dag_creator.airflow.operator_creator import OperatorCreator
from dagger.dag_creator.airflow.operators.redshift_sql_operator import (
RedshiftSQLOperator,
)
from dagger.dag_creator.airflow.operators.redshift_sql_operator import RedshiftSQLOperator



class RedshiftLoadCreator(OperatorCreator):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from os.path import join

from dagger.dag_creator.airflow.operator_creator import OperatorCreator
from dagger.dag_creator.airflow.operators.postgres_operator import PostgresOperator
from dagger.dag_creator.airflow.operators.redshift_sql_operator import RedshiftSQLOperator


class RedshiftTransformCreator(OperatorCreator):
Expand All @@ -22,11 +22,11 @@ def _read_sql(directory, file_path):
def _create_operator(self, **kwargs):
sql_string = self._read_sql(self._task.pipeline.directory, self._task.sql_file)

redshift_op = PostgresOperator(
redshift_op = RedshiftSQLOperator(
dag=self._dag,
task_id=self._task.name,
sql=sql_string,
postgres_conn_id=self._task.postgres_conn_id,
redshift_conn_id=self._task.postgres_conn_id,
params=self._template_parameters,
**kwargs,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from os.path import join

from dagger.dag_creator.airflow.operator_creator import OperatorCreator
from dagger.dag_creator.airflow.operators.postgres_operator import PostgresOperator
from dagger.dag_creator.airflow.operators.redshift_sql_operator import RedshiftSQLOperator

REDSHIFT_UNLOAD_CMD = """
unload ('{sql_string}')
Expand Down Expand Up @@ -58,12 +58,13 @@ def _create_operator(self, **kwargs):

unload_cmd = self._get_unload_command(sql_string)

redshift_op = PostgresOperator(
redshift_op = RedshiftSQLOperator(
dag=self._dag,
task_id=self._task.name,
sql=unload_cmd,
postgres_conn_id=self._task.postgres_conn_id,
redshift_conn_id=self._task.postgres_conn_id,
params=self._template_parameters,
autocommit=True,
**kwargs,
)

Expand Down
2 changes: 1 addition & 1 deletion dagger/dag_creator/airflow/operators/postgres_operator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Iterable, Mapping, Optional, Union

from airflow.hooks.postgres_hook import PostgresHook
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.decorators import apply_defaults
from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator

Expand Down
8 changes: 5 additions & 3 deletions dagger/dagger_config.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
airflow:
external_sensor_default_args:
poll_interval: 30
timeout: 28800
mode: reschedule
deferrable: true
with_data_node: false
external_sensor_poke_interval: 600
external_sensor_timeout: 28800
external_sensor_mode: reschedule
is_dummy_operator_short_circuit: false


Expand Down
29 changes: 14 additions & 15 deletions reqs/dev.txt
Original file line number Diff line number Diff line change
@@ -1,19 +1,18 @@
apache-airflow[amazon,postgres,s3,statsd]==2.3.4
pip==24.0
apache-airflow[amazon,postgres,s3,statsd]==2.9.0 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.0/constraints-3.9.txt"
black==22.10.0
boto3==1.26.16
boto3==1.34.82
bumpversion==0.6.0
coverage==6.5.0
coverage==7.4.4
elasticsearch==7.17.7
flake8==5.0.4
neo4j==5.2.1
numpydoc==1.5.0
pip==22.3.1
flake8==7.0.0
neo4j==5.19.0
numpydoc==1.7.0
pre-commit==2.20.0
sphinx-rtd-theme==1.1.1
Sphinx==4.3.2
SQLAlchemy==1.4.44
tox==3.27.1
twine==4.0.1
watchdog==2.1.9
Werkzeug==2.2.2
wheel==0.38.4
sphinx-rtd-theme==2.0.0
Sphinx==7.2.6
SQLAlchemy
tox==4.14.2
twine==5.0.0
watchdog==4.0.0
Werkzeug
3 changes: 2 additions & 1 deletion reqs/test.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
apache-airflow[amazon,postgres,s3,statsd]==2.3.4
apache-airflow[amazon,postgres,s3,statsd]==2.9.0
pytest-cov==4.0.0
pytest==7.2.0
graphviz
3 changes: 1 addition & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ def reqs(*f):
classifiers=[
"Development Status :: 2 - Pre-Alpha",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.9",
],
description="Config Driven ETL",
entry_points={"console_scripts": ["dagger=dagger.main:cli"]},
Expand Down

0 comments on commit 7b9bd16

Please sign in to comment.