Use rerun endpoint only when ti.try_number is greater than 1
boraberke committed Apr 28, 2024
1 parent a26397e commit 8050381
Showing 4 changed files with 68 additions and 7 deletions.
12 changes: 6 additions & 6 deletions airflow/providers/dbt/cloud/hooks/dbt.py
@@ -429,12 +429,12 @@ def trigger_job_run(

if retry_from_failure:
return self.retry_failed_job_run(job_id, account_id)
else:
return self._run_and_get_response(
method="POST",
endpoint=f"{account_id}/jobs/{job_id}/run/",
payload=json.dumps(payload),
)

return self._run_and_get_response(
method="POST",
endpoint=f"{account_id}/jobs/{job_id}/run/",
payload=json.dumps(payload),
)

@fallback_to_default_account
def list_job_runs(
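For context on the hook-level change above, the rerun path is only taken when retry_from_failure is truthy; otherwise the standard run endpoint is used. Below is a minimal sketch of calling the hook directly, assuming a provider release that includes the retry_from_failure parameter; the connection id, job id, and cause string are placeholders, not taken from this commit.

from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook

# Placeholder connection id (the provider's default name) and job id.
hook = DbtCloudHook(dbt_cloud_conn_id="dbt_cloud_default")

# With retry_from_failure left False, this POSTs to the standard
# {account_id}/jobs/{job_id}/run/ endpoint; passing True instead routes
# the call through retry_failed_job_run().
response = hook.trigger_job_run(
    job_id=12345,
    cause="Triggered from a maintenance script",
    retry_from_failure=False,
)
print(response.json()["data"]["id"])  # id of the triggered run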
6 changes: 5 additions & 1 deletion airflow/providers/dbt/cloud/operators/dbt.py
@@ -75,6 +75,10 @@ class DbtCloudRunJobOperator(BaseOperator):
request when triggering the job.
:param reuse_existing_run: Flag to determine whether to reuse an existing non-terminal job run. If set to
True and a non-terminal job run is found, the latest such run is used instead of triggering a new job run.
:param retry_from_failure: Flag to determine whether to retry the job run from the point of failure. If
set to True and the task instance's ``try_number`` is greater than 1, the failed job run is retried from
the point of failure. For more information on the retry logic, reference the
`dbt Cloud API documentation <https://docs.getdbt.com/dbt-cloud/api-v2#/operations/Retry%20Failed%20Job>`__.
:param deferrable: Run operator in the deferrable mode
:return: The ID of the triggered dbt Cloud job run.
"""
@@ -152,7 +156,7 @@ def execute(self, context: Context):
cause=self.trigger_reason,
steps_override=self.steps_override,
schema_override=self.schema_override,
retry_from_failure=self.retry_from_failure,
retry_from_failure=self.retry_from_failure and context["ti"].try_number > 1,
additional_run_config=self.additional_run_config,
)
self.run_id = trigger_job_response.json()["data"]["id"]
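The changed line above gates the rerun behavior on the task instance's attempt number. A minimal illustration of that guard, with the task-instance lookup replaced by plain arguments (the helper name is hypothetical, not part of the provider):

def should_retry_from_failure(retry_from_failure: bool, try_number: int) -> bool:
    # Mirrors the expression used in DbtCloudRunJobOperator.execute:
    # only ask dbt Cloud to rerun from failure when the user opted in
    # and this is not the first attempt of the task instance.
    return retry_from_failure and try_number > 1

assert should_retry_from_failure(True, 1) is False   # first attempt: fresh run
assert should_retry_from_failure(True, 2) is True    # Airflow retry: rerun from failure
assert should_retry_from_failure(False, 3) is False  # flag off: always a fresh run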
4 changes: 4 additions & 0 deletions docs/apache-airflow-providers-dbt-cloud/operators.rst
@@ -51,6 +51,10 @@ resource utilization while the job is running.
When ``wait_for_termination`` is False and ``deferrable`` is False, we just submit the job and can only
track the job status with the :class:`~airflow.providers.dbt.cloud.sensors.dbt.DbtCloudJobRunSensor`.

When ``retry_from_failure`` is True and the Task Instance ``try_number`` is greater than 1, the job is
retried from the point of failure of its latest run, if that run failed; otherwise a new run is triggered.
For more information on the retry logic, reference the
`API documentation <https://docs.getdbt.com/dbt-cloud/api-v2#/operations/Retry%20Failed%20Job>`__.
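As a usage sketch of the behavior described above (the DAG id, connection id, job id, and retry count are placeholders, not taken from this commit):

import pendulum
from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

with DAG(
    dag_id="dbt_cloud_retry_example",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,
    catchup=False,
):
    run_dbt_job = DbtCloudRunJobOperator(
        task_id="run_dbt_job",
        dbt_cloud_conn_id="dbt_cloud_default",
        job_id=12345,
        retry_from_failure=True,
        retries=3,  # Airflow retries; attempts after the first have try_number > 1
    )

# Attempt 1 triggers a fresh run. If it fails, each Airflow retry asks dbt Cloud
# to rerun the job from the point of failure instead of starting over.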

While ``schema_override`` and ``steps_override`` are explicit, optional parameters for the
``DbtCloudRunJobOperator``, custom run configurations can also be passed to the operator using the
53 changes: 53 additions & 0 deletions tests/providers/dbt/cloud/operators/test_dbt.py
@@ -251,6 +251,7 @@ def test_execute_wait_for_termination(
cause=f"Triggered via Apache Airflow by task {TASK_ID!r} in the {self.dag.dag_id} DAG.",
steps_override=self.config["steps_override"],
schema_override=self.config["schema_override"],
retry_from_failure=False,
additional_run_config=self.config["additional_run_config"],
)

@@ -299,6 +300,7 @@ def test_execute_no_wait_for_termination(self, mock_run_job, conn_id, account_id
cause=f"Triggered via Apache Airflow by task {TASK_ID!r} in the {self.dag.dag_id} DAG.",
steps_override=self.config["steps_override"],
schema_override=self.config["schema_override"],
retry_from_failure=False,
additional_run_config=self.config["additional_run_config"],
)

@@ -361,6 +363,56 @@ def test_execute_no_wait_for_termination_and_reuse_existing_run(

mock_get_job_run.assert_not_called()

@patch.object(DbtCloudHook, "trigger_job_run")
@pytest.mark.parametrize(
"try_number, expected_retry_from_failure",
[
(1, False),
(2, True),
(3, True),
],
)
@pytest.mark.parametrize(
"conn_id, account_id",
[(ACCOUNT_ID_CONN, None), (NO_ACCOUNT_ID_CONN, ACCOUNT_ID)],
ids=["default_account", "explicit_account"],
)
def test_execute_retry_from_failure(
self, mock_run_job, try_number, expected_retry_from_failure, conn_id, account_id
):
operator = DbtCloudRunJobOperator(
task_id=TASK_ID,
dbt_cloud_conn_id=conn_id,
account_id=account_id,
trigger_reason=None,
dag=self.dag,
retry_from_failure=True,
**self.config,
)

assert operator.dbt_cloud_conn_id == conn_id
assert operator.job_id == self.config["job_id"]
assert operator.account_id == account_id
assert operator.check_interval == self.config["check_interval"]
assert operator.timeout == self.config["timeout"]
assert operator.retry_from_failure
assert operator.steps_override == self.config["steps_override"]
assert operator.schema_override == self.config["schema_override"]
assert operator.additional_run_config == self.config["additional_run_config"]

self.mock_ti.try_number = try_number
operator.execute(context={"ti": self.mock_ti})

mock_run_job.assert_called_once_with(
account_id=account_id,
job_id=JOB_ID,
cause=f"Triggered via Apache Airflow by task {TASK_ID!r} in the {self.dag.dag_id} DAG.",
steps_override=self.config["steps_override"],
schema_override=self.config["schema_override"],
retry_from_failure=expected_retry_from_failure,
additional_run_config=self.config["additional_run_config"],
)

@patch.object(DbtCloudHook, "trigger_job_run")
@pytest.mark.parametrize(
"conn_id, account_id",
@@ -393,6 +445,7 @@ def test_custom_trigger_reason(self, mock_run_job, conn_id, account_id):
cause=custom_trigger_reason,
steps_override=self.config["steps_override"],
schema_override=self.config["schema_override"],
retry_from_failure=False,
additional_run_config=self.config["additional_run_config"],
)

