Feature/data 1791 defferable sensors #33

Merged · 4 commits · Apr 15, 2024
6 changes: 3 additions & 3 deletions Makefile
@@ -96,15 +96,15 @@ install: clean ## install the package to the active Python's site-packages


 install-dev: clean ## install the package to the active Python's site-packages
-	virtualenv -p python3 venv; \
+	virtualenv -p python3.9 venv; \
 	source venv/bin/activate; \
 	python -m pip install --upgrade pip; \
-	python setup.py install; \
-	pip install -r reqs/dev.txt -r reqs/test.txt
+	pip install -e . ; \
+	SYSTEM_VERSION_COMPAT=0 CFLAGS='-std=c++20' pip install -r reqs/dev.txt -r reqs/test.txt

 install-test: clean ## install the package to the active Python's site-packages
-	virtualenv -p python3 venv; \
+	virtualenv -p python3.9 venv; \
 	source venv/bin/activate; \
 	python -m pip install --upgrade pip; \
 	pip install -r reqs/test.txt -r reqs/base.txt
4 changes: 1 addition & 3 deletions dagger/conf.py
@@ -21,9 +21,7 @@
 # Airflow parameters
 airflow_config = config.get('airflow', None) or {}
 WITH_DATA_NODES = airflow_config.get('with_data_nodes', False)
-EXTERNAL_SENSOR_POKE_INTERVAL = airflow_config.get('external_sensor_poke_interval', 600)
-EXTERNAL_SENSOR_TIMEOUT = airflow_config.get('external_sensor_timeout', 28800)
-EXTERNAL_SENSOR_MODE = airflow_config.get('external_sensor_mode', 'reschedule')
+EXTERNAL_SENSOR_DEFAULT_ARGS = airflow_config.get('external_sensor_default_args', {})
 IS_DUMMY_OPERATOR_SHORT_CIRCUIT = airflow_config.get('is_dummy_operator_short_circuit', False)

 # Neo4j parameters
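One behavioural consequence worth noting: the three removed scalars each carried a hard-coded fallback (600, 28800, 'reschedule'), whereas the new dict falls back to an empty {} when the key is absent, so sensors then run on Airflow's own defaults. A minimal illustration of the two lookup styles (the empty airflow_config below stands in for a dagger config file with no airflow section):

    # Sketch only: contrasts the old and new fallback behaviour of dagger/conf.py.
    airflow_config = {}  # stands in for a config with no airflow section

    # Old style: every setting had an explicit fallback value.
    poke_interval = airflow_config.get('external_sensor_poke_interval', 600)  # -> 600

    # New style: a single dict that is simply empty when unset, leaving poke
    # interval, timeout and mode to Airflow's defaults downstream.
    default_args = airflow_config.get('external_sensor_default_args', {})     # -> {}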
8 changes: 2 additions & 6 deletions dagger/dag_creator/airflow/dag_creator.py
@@ -72,12 +72,7 @@ def _get_external_task_sensor(self, from_task_id: str, to_task_id: str, follow_e

         to_pipe_id = self._task_graph.get_node(to_task_id).obj.pipeline.name

-
-        extra_args = {
-            'mode': conf.EXTERNAL_SENSOR_MODE,
-            'poke_interval': conf.EXTERNAL_SENSOR_POKE_INTERVAL,
-            'timeout': conf.EXTERNAL_SENSOR_TIMEOUT,
-        }
+        extra_args = conf.EXTERNAL_SENSOR_DEFAULT_ARGS.copy()
         extra_args.update(follow_external_dependency)

         return ExternalTaskSensor(

@@ -141,6 +136,7 @@ def _create_edge_without_data(self, from_task_id: str, to_task_ids: list, node:
             to_task_ids: The IDs of the tasks to which the edge connects.
             node: The current node in a task graph.
         """
+
         from_pipe = (
             self._task_graph.get_node(from_task_id).obj.pipeline_name if from_task_id else None
         )
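The net effect of this change: whatever sits under external_sensor_default_args is forwarded verbatim to ExternalTaskSensor, with per-dependency settings from follow_external_dependency taking precedence. A minimal sketch, assuming the defaults shipped in dagger_config.yaml further down and a hypothetical per-dependency timeout override (the task and DAG ids are placeholders):

    # Sketch of the merge logic; to be read as if inside a DAG definition.
    from airflow.sensors.external_task import ExternalTaskSensor

    EXTERNAL_SENSOR_DEFAULT_ARGS = {   # mirrors the shipped dagger_config.yaml
        'poll_interval': 30,
        'timeout': 28800,
        'mode': 'reschedule',
        'deferrable': True,
    }
    follow_external_dependency = {'timeout': 3600}    # hypothetical per-edge override

    extra_args = EXTERNAL_SENSOR_DEFAULT_ARGS.copy()  # .copy() keeps the shared dict intact
    extra_args.update(follow_external_dependency)     # override wins: timeout becomes 3600

    sensor = ExternalTaskSensor(
        task_id='wait_for_upstream',          # placeholder ids
        external_dag_id='upstream_pipeline',
        **extra_args,                         # poll_interval, timeout, mode, deferrable
    )

With deferrable=True the wait is handed off to the triggerer instead of occupying a worker slot between checks, which is what makes these external sensors cheap at scale.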
@@ -2,9 +2,8 @@
 from typing import Optional

 from dagger.dag_creator.airflow.operator_creator import OperatorCreator
-from dagger.dag_creator.airflow.operators.redshift_sql_operator import (
-    RedshiftSQLOperator,
-)
+from dagger.dag_creator.airflow.operators.redshift_sql_operator import RedshiftSQLOperator



 class RedshiftLoadCreator(OperatorCreator):
@@ -1,7 +1,7 @@
 from os.path import join

 from dagger.dag_creator.airflow.operator_creator import OperatorCreator
-from dagger.dag_creator.airflow.operators.postgres_operator import PostgresOperator
+from dagger.dag_creator.airflow.operators.redshift_sql_operator import RedshiftSQLOperator


 class RedshiftTransformCreator(OperatorCreator):

@@ -22,11 +22,11 @@ def _read_sql(directory, file_path):
     def _create_operator(self, **kwargs):
         sql_string = self._read_sql(self._task.pipeline.directory, self._task.sql_file)

-        redshift_op = PostgresOperator(
+        redshift_op = RedshiftSQLOperator(
             dag=self._dag,
             task_id=self._task.name,
             sql=sql_string,
-            postgres_conn_id=self._task.postgres_conn_id,
+            redshift_conn_id=self._task.postgres_conn_id,
             params=self._template_parameters,
             **kwargs,
         )
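dagger's own redshift_sql_operator module is not included in this diff; one plausible shape for it, assuming it is a thin wrapper over the Amazon provider's operator (the subclass body here is an assumption, not the repository's actual code):

    # Hypothetical sketch -- the real
    # dagger.dag_creator.airflow.operators.redshift_sql_operator is not shown in this PR.
    from airflow.providers.amazon.aws.operators.redshift_sql import (
        RedshiftSQLOperator as AmazonRedshiftSQLOperator,
    )


    class RedshiftSQLOperator(AmazonRedshiftSQLOperator):
        """Thin dagger wrapper: provider behaviour unchanged, stable in-repo import path."""

Note that the creator still reads self._task.postgres_conn_id and simply forwards it as redshift_conn_id, so existing task configs need no change.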
@@ -1,7 +1,7 @@
 from os.path import join

 from dagger.dag_creator.airflow.operator_creator import OperatorCreator
-from dagger.dag_creator.airflow.operators.postgres_operator import PostgresOperator
+from dagger.dag_creator.airflow.operators.redshift_sql_operator import RedshiftSQLOperator

 REDSHIFT_UNLOAD_CMD = """
 unload ('{sql_string}')

@@ -58,12 +58,13 @@ def _create_operator(self, **kwargs):

         unload_cmd = self._get_unload_command(sql_string)

-        redshift_op = PostgresOperator(
+        redshift_op = RedshiftSQLOperator(
             dag=self._dag,
             task_id=self._task.name,
             sql=unload_cmd,
-            postgres_conn_id=self._task.postgres_conn_id,
+            redshift_conn_id=self._task.postgres_conn_id,
             params=self._template_parameters,
+            autocommit=True,
             **kwargs,
         )
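The rest of REDSHIFT_UNLOAD_CMD is collapsed in this view; only the unload ('{sql_string}') opening is visible. A sketch of how such a template could render, where the s3_path and iam_role fields are hypothetical stand-ins for whatever the collapsed portion actually contains:

    # Hypothetical rendering of the (partially elided) unload template.
    REDSHIFT_UNLOAD_CMD = """
    unload ('{sql_string}')
    to '{s3_path}'             -- hypothetical field, not shown in the diff
    iam_role '{iam_role}'      -- hypothetical field, not shown in the diff
    """

    sql_string = "select 1"  # single quotes inside would need escaping for unload
    unload_cmd = REDSHIFT_UNLOAD_CMD.format(
        sql_string=sql_string,
        s3_path="s3://my-bucket/exports/demo_",
        iam_role="arn:aws:iam::123456789012:role/redshift-unload",
    )
    print(unload_cmd)

The added autocommit=True presumably lets the UNLOAD statement execute outside an explicitly wrapped transaction.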
@@ -1,6 +1,6 @@
 from typing import Iterable, Mapping, Optional, Union

-from airflow.hooks.postgres_hook import PostgresHook
+from airflow.providers.postgres.hooks.postgres import PostgresHook
 from airflow.utils.decorators import apply_defaults
 from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator
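airflow.hooks.postgres_hook is the long-deprecated pre-provider import path; the replacement ships with the postgres provider package. A minimal usage sketch of the new import (the connection id below is Airflow's stock placeholder, not something this repo configures):

    from airflow.providers.postgres.hooks.postgres import PostgresHook

    # "postgres_default" is Airflow's stock connection id; swap in your own.
    hook = PostgresHook(postgres_conn_id="postgres_default")
    records = hook.get_records("select 1;")  # runs the query and fetches all rows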
8 changes: 5 additions & 3 deletions dagger/dagger_config.yaml
@@ -1,8 +1,10 @@
 airflow:
+  external_sensor_default_args:
+    poll_interval: 30
+    timeout: 28800
+    mode: reschedule
+    deferrable: true
   with_data_node: false
-  external_sensor_poke_interval: 600
-  external_sensor_timeout: 28800
-  external_sensor_mode: reschedule
   is_dummy_operator_short_circuit: false

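A quick sanity check that the new block parses into the dict dagger/conf.py now expects; a minimal sketch assuming PyYAML is on hand, with the inline YAML mirroring the file above:

    import yaml  # PyYAML, assumed available in the dev environment

    config = yaml.safe_load("""
    airflow:
      external_sensor_default_args:
        poll_interval: 30
        timeout: 28800
        mode: reschedule
        deferrable: true
    """)

    airflow_config = config.get('airflow', None) or {}
    # Mirrors dagger/conf.py: one dict instead of three scalar settings.
    EXTERNAL_SENSOR_DEFAULT_ARGS = airflow_config.get('external_sensor_default_args', {})
    assert EXTERNAL_SENSOR_DEFAULT_ARGS['deferrable'] is True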
31 changes: 15 additions & 16 deletions reqs/dev.txt
@@ -1,19 +1,18 @@
-apache-airflow[amazon,postgres,s3,statsd]==2.3.4
+pip==24.0
+apache-airflow[amazon,postgres,s3,statsd]==2.9.0 --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.0/constraints-3.9.txt"
 black==22.10.0
-boto3==1.26.16
+boto3==1.34.82
 bumpversion==0.6.0
-coverage==6.5.0
-elasticsearch==7.17.7
-flake8==5.0.4
-neo4j==5.2.1
-numpydoc==1.5.0
-pip==22.3.1
+coverage==7.4.4
+#elasticsearch==7.17.7
+flake8==7.0.0
+#neo4j==5.19.0
+numpydoc==1.7.0
 pre-commit==2.20.0
-sphinx-rtd-theme==1.1.1
-Sphinx==4.3.2
-SQLAlchemy==1.4.44
-tox==3.27.1
-twine==4.0.1
-watchdog==2.1.9
-Werkzeug==2.2.2
-wheel==0.38.4
+sphinx-rtd-theme==2.0.0
+Sphinx==7.2.6
+SQLAlchemy
+tox==4.14.2
+twine==5.0.0
+watchdog==4.0.0
+Werkzeug
2 changes: 1 addition & 1 deletion reqs/test.txt
@@ -1,3 +1,3 @@
-apache-airflow[amazon,postgres,s3,statsd]==2.3.4
+apache-airflow[amazon,postgres,s3,statsd]==2.9.0
 pytest-cov==4.0.0
 pytest==7.2.0
3 changes: 1 addition & 2 deletions setup.py
@@ -45,8 +45,7 @@ def reqs(*f):
     classifiers=[
         "Development Status :: 2 - Pre-Alpha",
         "Intended Audience :: Developers",
-        "Programming Language :: Python :: 3.6",
-        "Programming Language :: Python :: 3.7",
+        "Programming Language :: Python :: 3.9",
     ],
     description="Config Driven ETL",
     entry_points={"console_scripts": ["dagger=dagger.main:cli"]},