Skip to content

Commit

Permalink
test notes
Browse files Browse the repository at this point in the history
  • Loading branch information
dstandish committed Oct 26, 2024
1 parent db2dd25 commit 34282d0
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions tests/models/test_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,12 @@ def test_create_backfill_simple(reverse, existing, dag_maker, session):
def test_reprocess_behavior(reprocess_behavior, run_counts, dag_maker, session):
"""
We have two modes whereby when there's an existing run(s) in the range
of the backfill, we clear the dagrun to be rerun.
of the backfill, we will create a new run.
This test might need to be altered if we change the behavior re multiple
runs of same execution date. But for now, it verifies the current behavior.
"""
with dag_maker(schedule="@daily") as dag:
dag_id = "backfill-test-reprocess-behavior"
with dag_maker(schedule="@daily", dag_id=dag_id) as dag:
PythonOperator(task_id="hi", python_callable=print)

for num, (date, state) in enumerate(
Expand Down Expand Up @@ -237,10 +240,15 @@ def test_reprocess_behavior(reprocess_behavior, run_counts, dag_maker, session):
.where(BackfillDagRun.backfill_id == b.id)
.order_by(BackfillDagRun.sort_ordinal)
)
dag_runs = session.scalars(query).all()
assert all(x.run_type == DagRunType.BACKFILL_JOB for x in dag_runs)
assert all(x.triggered_by == DagRunTriggeredByType.BACKFILL for x in dag_runs)
backfill_dates = [str(x.logical_date.date()) for x in dag_runs]
# these are all the dag runs that are part of this backfill
dag_runs_in_b = session.scalars(query).all()
# verify they all have the right run type
assert all(x.run_type == DagRunType.BACKFILL_JOB for x in dag_runs_in_b)
# verify they all have the right triggered by type
assert all(x.triggered_by == DagRunTriggeredByType.BACKFILL for x in dag_runs_in_b)

# verify that we see the dates and counts expected
backfill_dates = [str(x.logical_date.date()) for x in dag_runs_in_b]
assert Counter(backfill_dates) == run_counts

def _get_bdr(date):
Expand All @@ -263,7 +271,8 @@ def _get_bdr(date):
elif reprocess_behavior is ReprocessBehavior.REPROCESS_NONE:
assert actual_reason == BackfillDagRunExceptionReason.ALREADY_EXISTS

assert all(x.state == DagRunState.QUEUED for x in dag_runs)
# all the runs created by the backfill should have state queued
assert all(x.state == DagRunState.QUEUED for x in dag_runs_in_b)


def test_params_stored_correctly(dag_maker, session):
Expand Down

0 comments on commit 34282d0

Please sign in to comment.