Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: _get_external_task_sensor #21

Merged
merged 4 commits into from
Jul 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions dagger/dag_creator/airflow/dag_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,23 @@ def execution_date_fn(execution_date, **kwargs):

return execution_date_fn

def _get_external_task_sensor_name(self, from_task_id: str) -> str:
def _get_external_task_sensor_name_dict(self, from_task_id: str) -> dict:
from_pipeline_name = self._task_graph.get_node(from_task_id).obj.pipeline_name
from_task_name = self._task_graph.get_node(from_task_id).obj.name
return f"{from_pipeline_name}-{from_task_name}-sensor"
return {
"from_pipeline_name": from_pipeline_name,
"from_task_name": from_task_name,
"external_sensor_name": f"{from_pipeline_name}-{from_task_name}-sensor",
}

def _get_external_task_sensor(self, from_task_id: str, to_task_id: str) -> ExternalTaskSensor:
"""
create an object of external task sensor for a specific from_task_id and to_task_id
"""
external_sensor_name = self._get_external_task_sensor_name(from_task_id)
from_pipeline_name = external_sensor_name.split("-")[0]
from_task_name = external_sensor_name.split("-")[1]
external_task_sensor_name_dict = self._get_external_task_sensor_name_dict(from_task_id)
external_sensor_name = external_task_sensor_name_dict["external_sensor_name"]
from_pipeline_name = external_task_sensor_name_dict["from_pipeline_name"]
from_task_name = external_task_sensor_name_dict["from_task_name"]

from_pipeline_schedule = self._task_graph.get_node(from_task_id).obj.pipeline.schedule
to_pipeline_schedule = self._task_graph.get_node(to_task_id).obj.pipeline.schedule
Expand Down Expand Up @@ -117,9 +122,7 @@ def _create_data_task(self, pipe_id, node):

dataset_id = node.obj.airflow_name
if dataset_id not in self._data_tasks[pipe_id]:
self._data_tasks[pipe_id][
dataset_id
] = self._operator_factory.create_dataset_operator(
self._data_tasks[pipe_id][dataset_id] = self._operator_factory.create_dataset_operator(
re.sub("[^0-9a-zA-Z-_]+", "_", dataset_id), self._dags[pipe_id]
)

Expand All @@ -133,9 +136,7 @@ def _create_edge_without_data(self, from_task_id: str, to_task_ids: list, node:
node: The current node in a task graph.
"""
from_pipe = (
self._task_graph.get_node(from_task_id).obj.pipeline_name
if from_task_id
else None
self._task_graph.get_node(from_task_id).obj.pipeline_name if from_task_id else None
)
for to_task_id in to_task_ids:
edge_properties = self._task_graph.get_edge(node.obj.alias(), to_task_id)
Expand All @@ -146,7 +147,9 @@ def _create_edge_without_data(self, from_task_id: str, to_task_ids: list, node:
from_schedule = self._task_graph.get_node(from_task_id).obj.pipeline.schedule
to_schedule = self._task_graph.get_node(to_task_id).obj.pipeline.schedule
if not from_schedule.startswith("@") and not to_schedule.startswith("@"):
external_task_sensor_name = self._get_external_task_sensor_name(from_task_id)
external_task_sensor_name = self._get_external_task_sensor_name_dict(
from_task_id
)["external_sensor_name"]
if (
external_task_sensor_name
not in self._sensor_dict.get(to_pipe, dict()).keys()
Expand All @@ -167,9 +170,7 @@ def _create_edge_without_data(self, from_task_id: str, to_task_ids: list, node:

def _create_edge_with_data(self, from_task_id, to_task_ids, node):
from_pipe = (
self._task_graph.get_node(from_task_id).obj.pipeline_name
if from_task_id
else None
self._task_graph.get_node(from_task_id).obj.pipeline_name if from_task_id else None
)
data_id = node.obj.airflow_name
if from_pipe:
Expand Down
Loading