Skip to content

Commit

Permalink
Added condition to check if it is a scheduled save or rerun (#43453)
Browse files Browse the repository at this point in the history
* Aadded condition to check if it is a scheduled save or rerun

* Fix key name in context of task

* I added unit tests to the condition to check if it is a scheduled save or rerun

---------

Co-authored-by: RBAMOUSER\kubisk1 <[email protected]>
Co-authored-by: krzysztof-kubis <[email protected]>
Co-authored-by: krzysztof-kubis <[email protected]/>
  • Loading branch information
4 people authored Nov 9, 2024
1 parent 2a9bded commit 340a70b
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 1 deletion.
4 changes: 3 additions & 1 deletion providers/src/airflow/providers/dbt/cloud/operators/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,14 +149,16 @@ def execute(self, context: Context):
self.run_id = non_terminal_runs[0]["id"]
job_run_url = non_terminal_runs[0]["href"]

is_retry = context["ti"].try_number != 1

if not self.reuse_existing_run or not non_terminal_runs:
trigger_job_response = self.hook.trigger_job_run(
account_id=self.account_id,
job_id=self.job_id,
cause=self.trigger_reason,
steps_override=self.steps_override,
schema_override=self.schema_override,
retry_from_failure=self.retry_from_failure,
retry_from_failure=is_retry and self.retry_from_failure,
additional_run_config=self.additional_run_config,
)
self.run_id = trigger_job_response.json()["data"]["id"]
Expand Down
78 changes: 78 additions & 0 deletions providers/tests/dbt/cloud/operators/test_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@
),
}
}
JOB_RUN_ERROR_RESPONSE = {
"data": [
{
"id": RUN_ID,
"href": EXPECTED_JOB_RUN_OP_EXTRA_LINK.format(
account_id=ACCOUNT_ID, project_id=PROJECT_ID, run_id=RUN_ID
),
"status": DbtCloudJobRunStatus.ERROR.value,
}
]
}


def mock_response_json(response: dict):
Expand Down Expand Up @@ -421,6 +432,73 @@ def test_execute_retry_from_failure(self, mock_run_job, conn_id, account_id):
additional_run_config=self.config["additional_run_config"],
)

@patch.object(DbtCloudHook, "_run_and_get_response")
@pytest.mark.parametrize(
"conn_id, account_id",
[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
def test_execute_retry_from_failure_run(self, mock_run_req, conn_id, account_id):
operator = DbtCloudRunJobOperator(
task_id=TASK_ID,
dbt_cloud_conn_id=conn_id,
account_id=account_id,
trigger_reason=None,
dag=self.dag,
retry_from_failure=True,
**self.config,
)
self.mock_context["ti"].try_number = 1

assert operator.dbt_cloud_conn_id == conn_id
assert operator.job_id == self.config["job_id"]
assert operator.account_id == account_id
assert operator.check_interval == self.config["check_interval"]
assert operator.timeout == self.config["timeout"]
assert operator.retry_from_failure
assert operator.steps_override == self.config["steps_override"]
assert operator.schema_override == self.config["schema_override"]
assert operator.additional_run_config == self.config["additional_run_config"]

operator.execute(context=self.mock_context)

mock_run_req.assert_called()

@patch.object(
DbtCloudHook, "_run_and_get_response", return_value=mock_response_json(JOB_RUN_ERROR_RESPONSE)
)
@patch.object(DbtCloudHook, "retry_failed_job_run")
@pytest.mark.parametrize(
"conn_id, account_id",
[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
def test_execute_retry_from_failure_rerun(self, mock_run_req, mock_rerun_req, conn_id, account_id):
operator = DbtCloudRunJobOperator(
task_id=TASK_ID,
dbt_cloud_conn_id=conn_id,
account_id=account_id,
trigger_reason=None,
dag=self.dag,
retry_from_failure=True,
**self.config,
)
self.mock_context["ti"].try_number = 2

assert operator.dbt_cloud_conn_id == conn_id
assert operator.job_id == self.config["job_id"]
assert operator.account_id == account_id
assert operator.check_interval == self.config["check_interval"]
assert operator.timeout == self.config["timeout"]
assert operator.retry_from_failure
assert operator.steps_override == self.config["steps_override"]
assert operator.schema_override == self.config["schema_override"]
assert operator.additional_run_config == self.config["additional_run_config"]

operator.execute(context=self.mock_context)

mock_rerun_req.assert_called_once()

@patch.object(DbtCloudHook, "trigger_job_run")
@pytest.mark.parametrize(
"conn_id, account_id",
Expand Down

0 comments on commit 340a70b

Please sign in to comment.