Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename execution_date to logical_date across codebase #43902

Merged
merged 15 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
4 changes: 2 additions & 2 deletions airflow/api/client/local_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ def __init__(self, auth=None, session: httpx.Client | None = None):
self._session.auth = auth

def trigger_dag(
self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True
self, dag_id, run_id=None, conf=None, logical_date=None, replace_microseconds=True
) -> dict | None:
dag_run = trigger_dag.trigger_dag(
dag_id=dag_id,
triggered_by=DagRunTriggeredByType.CLI,
run_id=run_id,
conf=conf,
execution_date=execution_date,
logical_date=logical_date,
replace_microseconds=replace_microseconds,
)
if dag_run:
Expand Down
122 changes: 61 additions & 61 deletions airflow/api/common/mark_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ def _create_dagruns(
:param dag: The DAG to create runs for.
:param infos: List of logical dates and data intervals to evaluate.
:param state: The state to set the dag run to
:param run_type: The prefix will be used to construct dag run id: ``{run_id_prefix}__{execution_date}``.
:return: Newly created and existing dag runs for the execution dates supplied.
:param run_type: The prefix will be used to construct dag run id: ``{run_id_prefix}__{logical_date}``.
:return: Newly created and existing dag runs for the logical dates supplied.
"""
# Find out existing DAG runs that we don't need to create.
dag_runs = {
run.logical_date: run
for run in DagRun.find(dag_id=dag.dag_id, execution_date=[info.logical_date for info in infos])
for run in DagRun.find(dag_id=dag.dag_id, logical_date=[info.logical_date for info in infos])
}

for info in infos:
if info.logical_date not in dag_runs:
dag_runs[info.logical_date] = dag.create_dagrun(
execution_date=info.logical_date,
logical_date=info.logical_date,
data_interval=info.data_interval,
start_date=timezone.utcnow(),
external_trigger=False,
Expand All @@ -87,7 +87,7 @@ def set_state(
*,
tasks: Collection[Operator | tuple[Operator, int]],
run_id: str | None = None,
execution_date: datetime | None = None,
logical_date: datetime | None = None,
upstream: bool = False,
downstream: bool = False,
future: bool = False,
Expand All @@ -107,11 +107,11 @@ def set_state(
:param tasks: the iterable of tasks or (task, map_index) tuples from which to work.
``task.dag`` needs to be set
:param run_id: the run_id of the dagrun to start looking from
:param execution_date: the execution date from which to start looking (deprecated)
:param logical_date: the logical date from which to start looking (deprecated)
Lee-W marked this conversation as resolved.
Show resolved Hide resolved
:param upstream: Mark all parents (upstream tasks)
:param downstream: Mark all siblings (downstream tasks) of task_id
:param future: Mark all future tasks on the interval of the dag up until
last execution date.
last logical date.
:param past: Retroactively mark all tasks starting from start_date of the DAG
:param state: State to which the tasks need to be set
:param commit: Commit tasks to be altered to the database
Expand All @@ -121,11 +121,11 @@ def set_state(
if not tasks:
return []

if not exactly_one(execution_date, run_id):
raise ValueError("Exactly one of dag_run_id and execution_date must be set")
if not exactly_one(logical_date, run_id):
raise ValueError("Exactly one of dag_run_id and logical_date must be set")

if execution_date and not timezone.is_localized(execution_date):
raise ValueError(f"Received non-localized date {execution_date}")
if logical_date and not timezone.is_localized(logical_date):
raise ValueError(f"Received non-localized date {logical_date}")

task_dags = {task[0].dag if isinstance(task, tuple) else task.dag for task in tasks}
if len(task_dags) > 1:
Expand All @@ -134,8 +134,8 @@ def set_state(
if dag is None:
raise ValueError("Received tasks with no DAG")

if execution_date:
run_id = dag.get_dagrun(execution_date=execution_date, session=session).run_id
if logical_date:
run_id = dag.get_dagrun(logical_date=logical_date, session=session).run_id
if not run_id:
raise ValueError("Received tasks with no run_id")

Expand Down Expand Up @@ -200,26 +200,26 @@ def find_task_relatives(tasks, downstream, upstream):


@provide_session
def get_execution_dates(
dag: DAG, execution_date: datetime, future: bool, past: bool, *, session: SASession = NEW_SESSION
def get_logical_dates(
dag: DAG, logical_date: datetime, future: bool, past: bool, *, session: SASession = NEW_SESSION
) -> list[datetime]:
"""Return DAG execution dates."""
latest_execution_date = dag.get_latest_execution_date(session=session)
if latest_execution_date is None:
raise ValueError(f"Received non-localized date {execution_date}")
execution_date = timezone.coerce_datetime(execution_date)
"""Return DAG logical dates."""
latest_logical_date = dag.get_latest_logical_date(session=session)
if latest_logical_date is None:
raise ValueError(f"Received non-localized date {logical_date}")
logical_date = timezone.coerce_datetime(logical_date)
# determine date range of dag runs and tasks to consider
end_date = latest_execution_date if future else execution_date
end_date = latest_logical_date if future else logical_date
if dag.start_date:
start_date = dag.start_date
else:
start_date = execution_date
start_date = execution_date if not past else start_date
start_date = logical_date
start_date = logical_date if not past else start_date
if not dag.timetable.can_be_scheduled:
# If the DAG never schedules, need to look at existing DagRun if the user wants future or
# past runs.
dag_runs = dag.get_dagruns_between(start_date=start_date, end_date=end_date)
dates = sorted({d.execution_date for d in dag_runs})
dates = sorted({d.logical_date for d in dag_runs})
elif not dag.timetable.periodic:
dates = [start_date]
else:
Expand All @@ -235,7 +235,7 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASess
last_dagrun = dag.get_last_dagrun(include_externally_triggered=True, session=session)
current_dagrun = dag.get_dagrun(run_id=run_id, session=session)
first_dagrun = session.scalar(
select(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.execution_date.asc()).limit(1)
select(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.logical_date.asc()).limit(1)
)

if last_dagrun is None:
Expand All @@ -255,7 +255,7 @@ def get_run_ids(dag: DAG, run_id: str, future: bool, past: bool, session: SASess
dates = [
info.logical_date for info in dag.iter_dagrun_infos_between(start_date, end_date, align=False)
]
run_ids = [dr.run_id for dr in DagRun.find(dag_id=dag.dag_id, execution_date=dates, session=session)]
run_ids = [dr.run_id for dr in DagRun.find(dag_id=dag.dag_id, logical_date=dates, session=session)]
return run_ids


Expand All @@ -279,37 +279,37 @@ def _set_dag_run_state(dag_id: str, run_id: str, state: DagRunState, session: SA
def set_dag_run_state_to_success(
*,
dag: DAG,
execution_date: datetime | None = None,
logical_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
) -> list[TaskInstance]:
"""
Set the dag run's state to success.

Set for a specific execution date and its task instances to success.
Set for a specific logical date and its task instances to success.

:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking(deprecated)
:param logical_date: the logical date from which to start looking(deprecated)
Lee-W marked this conversation as resolved.
Show resolved Hide resolved
:param run_id: the run_id to start looking from
:param commit: commit DAG and tasks to be altered to the database
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
:raises: ValueError if dag or execution_date is invalid
:raises: ValueError if dag or logical_date is invalid
"""
if not exactly_one(execution_date, run_id):
if not exactly_one(logical_date, run_id):
return []

if not dag:
return []

if execution_date:
if not timezone.is_localized(execution_date):
raise ValueError(f"Received non-localized date {execution_date}")
dag_run = dag.get_dagrun(execution_date=execution_date)
if logical_date:
if not timezone.is_localized(logical_date):
raise ValueError(f"Received non-localized date {logical_date}")
dag_run = dag.get_dagrun(logical_date=logical_date)
if not dag_run:
raise ValueError(f"DagRun with execution_date: {execution_date} not found")
raise ValueError(f"DagRun with logical_date: {logical_date} not found")
run_id = dag_run.run_id
if not run_id:
raise ValueError(f"Invalid dag_run_id: {run_id}")
Expand All @@ -333,36 +333,36 @@ def set_dag_run_state_to_success(
def set_dag_run_state_to_failed(
*,
dag: DAG,
execution_date: datetime | None = None,
logical_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
) -> list[TaskInstance]:
"""
Set the dag run's state to failed.

Set for a specific execution date and its task instances to failed.
Set for a specific logical date and its task instances to failed.

:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking(deprecated)
:param logical_date: the logical date from which to start looking(deprecated)
:param run_id: the DAG run_id to start looking from
:param commit: commit DAG and tasks to be altered to the database
:param session: database session
:return: If commit is true, list of tasks that have been updated,
otherwise list of tasks that will be updated
:raises: AssertionError if dag or execution_date is invalid
:raises: AssertionError if dag or logical_date is invalid
"""
if not exactly_one(execution_date, run_id):
if not exactly_one(logical_date, run_id):
return []
if not dag:
return []

if execution_date:
if not timezone.is_localized(execution_date):
raise ValueError(f"Received non-localized date {execution_date}")
dag_run = dag.get_dagrun(execution_date=execution_date)
if logical_date:
if not timezone.is_localized(logical_date):
raise ValueError(f"Received non-localized date {logical_date}")
dag_run = dag.get_dagrun(logical_date=logical_date)
if not dag_run:
raise ValueError(f"DagRun with execution_date: {execution_date} not found")
raise ValueError(f"DagRun with logical_date: {logical_date} not found")
run_id = dag_run.run_id

if not run_id:
Expand Down Expand Up @@ -429,16 +429,16 @@ def __set_dag_run_state_to_running_or_queued(
*,
new_state: DagRunState,
dag: DAG,
execution_date: datetime | None = None,
logical_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession,
) -> list[TaskInstance]:
"""
Set the dag run for a specific execution date to running.
Set the dag run for a specific logical date to running.

:param dag: the DAG of which to alter state
:param execution_date: the execution date from which to start looking
:param logical_date: the logical date from which to start looking
:param run_id: the id of the DagRun
:param commit: commit DAG and tasks to be altered to the database
:param session: database session
Expand All @@ -447,18 +447,18 @@ def __set_dag_run_state_to_running_or_queued(
"""
res: list[TaskInstance] = []

if not exactly_one(execution_date, run_id):
if not exactly_one(logical_date, run_id):
return res

if not dag:
return res

if execution_date:
if not timezone.is_localized(execution_date):
raise ValueError(f"Received non-localized date {execution_date}")
dag_run = dag.get_dagrun(execution_date=execution_date)
if logical_date:
if not timezone.is_localized(logical_date):
raise ValueError(f"Received non-localized date {logical_date}")
dag_run = dag.get_dagrun(logical_date=logical_date)
if not dag_run:
raise ValueError(f"DagRun with execution_date: {execution_date} not found")
raise ValueError(f"DagRun with logical_date: {logical_date} not found")
run_id = dag_run.run_id
if not run_id:
raise ValueError(f"DagRun with run_id: {run_id} not found")
Expand All @@ -474,20 +474,20 @@ def __set_dag_run_state_to_running_or_queued(
def set_dag_run_state_to_running(
*,
dag: DAG,
execution_date: datetime | None = None,
logical_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
) -> list[TaskInstance]:
"""
Set the dag run's state to running.

Set for a specific execution date and its task instances to running.
Set for a specific logical date and its task instances to running.
"""
return __set_dag_run_state_to_running_or_queued(
new_state=DagRunState.RUNNING,
dag=dag,
execution_date=execution_date,
logical_date=logical_date,
run_id=run_id,
commit=commit,
session=session,
Expand All @@ -498,20 +498,20 @@ def set_dag_run_state_to_running(
def set_dag_run_state_to_queued(
*,
dag: DAG,
execution_date: datetime | None = None,
logical_date: datetime | None = None,
run_id: str | None = None,
commit: bool = False,
session: SASession = NEW_SESSION,
) -> list[TaskInstance]:
"""
Set the dag run's state to queued.

Set for a specific execution date and its task instances to queued.
Set for a specific logical date and its task instances to queued.
"""
return __set_dag_run_state_to_running_or_queued(
new_state=DagRunState.QUEUED,
dag=dag,
execution_date=execution_date,
logical_date=logical_date,
run_id=run_id,
commit=commit,
session=session,
Expand Down
Loading