Replies: 2 comments
-
I think the only way you can try to achieve it is by introducing a custom cluster policy https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/cluster-policies.html -> generally speaking, the queue is not overridable at runtime by the user, because it is part of the DAG definition. However, "task_instance_mutation_hook" is executed before the task instance is queued, so it can - I think - modify the queue based on any of the parameters passed. It has to be done carefully of course, and such a DAG run will not respect some of the assertions that the scheduler calculates (for example, the scheduler will not schedule tasks for execution if their queue is full). But assuming this is something you want to "override" and you want to rely on the user knowing what they are doing, I think it might be doable.
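For illustration, a minimal sketch of what such a policy could look like in airflow_local_settings.py. It assumes the hook is picked up under its documented name task_instance_mutation_hook, that the dag_run relationship (and its conf) is accessible on the task instance at the point the hook fires, and that "queue" is the conf key passed at trigger time; none of this is verified against a running deployment.

```python
# $AIRFLOW_HOME/config/airflow_local_settings.py -- minimal sketch, assumptions noted above
from airflow.models.taskinstance import TaskInstance


def task_instance_mutation_hook(task_instance: TaskInstance) -> None:
    # Called just before the task instance is queued; rewrite the queue
    # if the user supplied one in the dag_run conf at trigger time.
    dag_run = task_instance.dag_run
    conf = (dag_run.conf or {}) if dag_run else {}
    requested_queue = conf.get("queue")
    if requested_queue:
        task_instance.queue = requested_queue
```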
-
@potiuk

```python
# $AIRFLOW_HOME/dags/dynamic_pipelines.py
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'queue': 'queueA',
    'retries': 1,
}

with DAG(
    dag_id='example_dynamic_dag',
    default_args=default_args,
    schedule=None,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    params={'queue': ''},
) as dag:
    task1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )
```

I added the following airflow_local_settings.py separately in Data Center A and Data Center B.

```python
# $AIRFLOW_HOME/config/airflow_local_settings.py
from airflow.policies import HookImplementations
from airflow.models.taskinstance import TaskInstance


def reroute_tasks(task_instance: TaskInstance):
    if task_instance.dag_id == 'example_dynamic_dag':
        # .get() avoids a KeyError when the run is triggered without --conf
        if (task_instance.dag_run.conf or {}).get('queue') == 'queueB':
            task_instance.queue = 'queueB'


def apply_cluster_policies():
    HookImplementations.cluster_policies = {
        'task_instance_mutation_hook': [reroute_tasks],
    }


apply_cluster_policies()
```

When I execute this command, it will be executed by the worker in Data Center A.

```
airflow dags trigger example_dynamic_dag
```

When I execute another command, it will be executed by the worker in Data Center B.

```
airflow dags trigger example_dynamic_dag --conf '{"queue" : "queueB"}'
```

Is this correct?
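As an aside, the same conf could also be sent through the stable REST API rather than the CLI; a sketch assuming the webserver is reachable at localhost:8080 and the basic_auth API auth backend is enabled (both are assumptions about the deployment):

```python
# Sketch: trigger example_dynamic_dag with a queue in conf via the stable REST API.
# URL, credentials, and auth backend are assumptions; adjust for your deployment.
import requests

response = requests.post(
    "http://localhost:8080/api/v1/dags/example_dynamic_dag/dagRuns",
    auth=("admin", "admin"),  # assumes the basic_auth API auth backend
    json={"conf": {"queue": "queueB"}},
)
response.raise_for_status()
print(response.json())
```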
-
I am using Airflow version 2.6.2 with Celery as the executor. I have multiple data centers that are isolated from each other. Each data center plans to deploy an Airflow Celery worker that listens to a different queue; for example, the worker in Data Center A listens to queueA, and the worker in Data Center B listens to queueB. The DAGs to be executed in each data center are the same. When triggering a DAG run, I want to specify a different queue so that the run is executed on the corresponding worker in the respective data center. Is it possible to achieve this? How should I implement it? Or are there other ways to achieve similar functionality?
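For reference, the per-queue worker layout described above would be set up by starting each data center's worker with the queue it should consume; a sketch of the commands, assuming the standard Celery executor CLI:

```
# Data Center A: worker consumes only queueA
airflow celery worker --queues queueA

# Data Center B: worker consumes only queueB
airflow celery worker --queues queueB
```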