
Commit

Added unit tests for the condition that checks whether the job is a scheduled run or a rerun.
krzysztof-kubis committed Nov 8, 2024
1 parent 571eaca commit d988fa2
Showing 1 changed file with 78 additions and 0 deletions.
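For context, the two new tests below exercise the branch between a first (e.g. scheduled) run and a task retry. The following is a minimal, hypothetical sketch of that decision: the helper name and its string return values are illustrative only and not part of the dbt Cloud provider, but the branch mirrors what the tests assert (a normal trigger on try 1, a rerun of the failed run on a later try with retry_from_failure enabled).

# Hypothetical helper illustrating the condition under test; not provider code.
def expected_hook_call(retry_from_failure: bool, try_number: int) -> str:
    """Return which hook call the operator is expected to make."""
    if retry_from_failure and try_number > 1:
        # Task instance is retrying: rerun the previously failed job run.
        return "retry_failed_job_run"
    # First try (e.g. a scheduled run): trigger a fresh job run.
    return "trigger_job_run"

assert expected_hook_call(retry_from_failure=True, try_number=1) == "trigger_job_run"
assert expected_hook_call(retry_from_failure=True, try_number=2) == "retry_failed_job_run"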
providers/tests/dbt/cloud/operators/test_dbt.py (78 additions, 0 deletions)
@@ -64,6 +64,17 @@
        ),
    }
}
JOB_RUN_ERROR_RESPONSE = {
    "data": [
        {
            "id": RUN_ID,
            "href": EXPECTED_JOB_RUN_OP_EXTRA_LINK.format(
                account_id=ACCOUNT_ID, project_id=PROJECT_ID, run_id=RUN_ID
            ),
            "status": DbtCloudJobRunStatus.ERROR.value,
        }
    ]
}


def mock_response_json(response: dict):
@@ -421,6 +432,73 @@ def test_execute_retry_from_failure(self, mock_run_job, conn_id, account_id):
            additional_run_config=self.config["additional_run_config"],
        )

    @patch.object(DbtCloudHook, "_run_and_get_response")
    @pytest.mark.parametrize(
        "conn_id, account_id",
        [(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
        ids=["default_account", "explicit_account"],
    )
    def test_execute_retry_from_failure_run(self, mock_run_req, conn_id, account_id):
        operator = DbtCloudRunJobOperator(
            task_id=TASK_ID,
            dbt_cloud_conn_id=conn_id,
            account_id=account_id,
            trigger_reason=None,
            dag=self.dag,
            retry_from_failure=True,
            **self.config,
        )
        self.mock_context["ti"].try_number = 1

        assert operator.dbt_cloud_conn_id == conn_id
        assert operator.job_id == self.config["job_id"]
        assert operator.account_id == account_id
        assert operator.check_interval == self.config["check_interval"]
        assert operator.timeout == self.config["timeout"]
        assert operator.retry_from_failure
        assert operator.steps_override == self.config["steps_override"]
        assert operator.schema_override == self.config["schema_override"]
        assert operator.additional_run_config == self.config["additional_run_config"]

        operator.execute(context=self.mock_context)

        mock_run_req.assert_called()

    @patch.object(
        DbtCloudHook, "_run_and_get_response", return_value=mock_response_json(JOB_RUN_ERROR_RESPONSE)
    )
    @patch.object(DbtCloudHook, "retry_failed_job_run")
    @pytest.mark.parametrize(
        "conn_id, account_id",
        [(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
        ids=["default_account", "explicit_account"],
    )
    def test_execute_retry_from_failure_rerun(self, mock_rerun_req, mock_run_req, conn_id, account_id):
        # patch decorators are applied bottom-up, so the retry_failed_job_run
        # mock is the first mock argument.
        operator = DbtCloudRunJobOperator(
            task_id=TASK_ID,
            dbt_cloud_conn_id=conn_id,
            account_id=account_id,
            trigger_reason=None,
            dag=self.dag,
            retry_from_failure=True,
            **self.config,
        )
        self.mock_context["ti"].try_number = 2

        assert operator.dbt_cloud_conn_id == conn_id
        assert operator.job_id == self.config["job_id"]
        assert operator.account_id == account_id
        assert operator.check_interval == self.config["check_interval"]
        assert operator.timeout == self.config["timeout"]
        assert operator.retry_from_failure
        assert operator.steps_override == self.config["steps_override"]
        assert operator.schema_override == self.config["schema_override"]
        assert operator.additional_run_config == self.config["additional_run_config"]

        operator.execute(context=self.mock_context)

        mock_rerun_req.assert_called_once()

    @patch.object(DbtCloudHook, "trigger_job_run")
    @pytest.mark.parametrize(
        "conn_id, account_id",
