diff --git a/tests/models/test_backfill.py b/tests/models/test_backfill.py index f906a305061d1..b370db79d1f78 100644 --- a/tests/models/test_backfill.py +++ b/tests/models/test_backfill.py @@ -185,9 +185,12 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session): def test_reprocess_behavior(reprocess_behavior, run_counts, dag_maker, session): """ We have two modes whereby when there's an existing run(s) in the range - of the backfill, we clear the dagrun to be rerun. + of the backfill, we will create a new run. + This test might need to be altered if we change the behavior re multiple + runs of same execution date. But for now, it verifies the current behavior. """ - with dag_maker(schedule="@daily") as dag: + dag_id = "backfill-test-reprocess-behavior" + with dag_maker(schedule="@daily", dag_id=dag_id) as dag: PythonOperator(task_id="hi", python_callable=print) for num, (date, state) in enumerate( @@ -237,10 +240,15 @@ def test_reprocess_behavior(reprocess_behavior, run_counts, dag_maker, session): .where(BackfillDagRun.backfill_id == b.id) .order_by(BackfillDagRun.sort_ordinal) ) - dag_runs = session.scalars(query).all() - assert all(x.run_type == DagRunType.BACKFILL_JOB for x in dag_runs) - assert all(x.triggered_by == DagRunTriggeredByType.BACKFILL for x in dag_runs) - backfill_dates = [str(x.logical_date.date()) for x in dag_runs] + # these are all the dag runs that are part of this backfill + dag_runs_in_b = session.scalars(query).all() + # verify they all have the right run type + assert all(x.run_type == DagRunType.BACKFILL_JOB for x in dag_runs_in_b) + # verify they all have the right triggered by type + assert all(x.triggered_by == DagRunTriggeredByType.BACKFILL for x in dag_runs_in_b) + + # verify that we see the dates and counts expected + backfill_dates = [str(x.logical_date.date()) for x in dag_runs_in_b] assert Counter(backfill_dates) == run_counts def _get_bdr(date): @@ -263,7 +271,8 @@ def _get_bdr(date): elif reprocess_behavior is ReprocessBehavior.REPROCESS_NONE: assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS - assert all(x.state == DagRunState.QUEUED for x in dag_runs) + # all the runs created by the backfill should have state queued + assert all(x.state == DagRunState.QUEUED for x in dag_runs_in_b) def test_params_stored_correctly(dag_maker, session):