Skip to content

Commit

Permalink
feat: retry failed query jobs in result() (#837)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)

Fixes #539  🦕

Previously, we only retried failed API requests. Now, we retry failed jobs (according to the predicate of the `Retry` object passed to `job.result()`).
  • Loading branch information
jimfulton committed Aug 11, 2021
1 parent 93d15e2 commit 519d99c
Show file tree
Hide file tree
Showing 6 changed files with 518 additions and 39 deletions.
110 changes: 81 additions & 29 deletions google/cloud/bigquery/client.py
Expand Up @@ -86,7 +86,7 @@
from google.cloud.bigquery.model import ModelReference
from google.cloud.bigquery.model import _model_arg_to_model_ref
from google.cloud.bigquery.query import _QueryResults
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY
from google.cloud.bigquery.routine import Routine
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
Expand Down Expand Up @@ -3163,6 +3163,7 @@ def query(
project: str = None,
retry: retries.Retry = DEFAULT_RETRY,
timeout: float = None,
job_retry: retries.Retry = DEFAULT_JOB_RETRY,
) -> job.QueryJob:
"""Run a SQL query.
Expand Down Expand Up @@ -3192,30 +3193,59 @@ def query(
Project ID of the project of where to run the job. Defaults
to the client's project.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
How to retry the RPC. This only applies to making RPC
calls. It isn't used to retry failed jobs. This has
a reasonable default that should only be overridden
with care.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
job_retry (Optional[google.api_core.retry.Retry]):
How to retry failed jobs. The default retries
rate-limit-exceeded errors. Passing ``None`` disables
job retry.
Not all jobs can be retried. If ``job_id`` is
provided, then the job returned by the query will not
be retryable, and an exception will be raised if a
non-``None`` (and non-default) value for ``job_retry``
is also provided.
Note that errors aren't detected until ``result()`` is
called on the job returned. The ``job_retry``
specified here becomes the default ``job_retry`` for
``result()``, where it can also be specified.
Returns:
google.cloud.bigquery.job.QueryJob: A new query job instance.
Raises:
TypeError:
If ``job_config`` is not an instance of :class:`~google.cloud.bigquery.job.QueryJobConfig`
class.
If ``job_config`` is not an instance of
:class:`~google.cloud.bigquery.job.QueryJobConfig`
class, or if both ``job_id`` and non-``None`` non-default
``job_retry`` are provided.
"""
job_id_given = job_id is not None
job_id = _make_job_id(job_id, job_id_prefix)
if (
job_id_given
and job_retry is not None
and job_retry is not DEFAULT_JOB_RETRY
):
raise TypeError(
"`job_retry` was provided, but the returned job is"
" not retryable, because a custom `job_id` was"
" provided."
)

job_id_save = job_id

if project is None:
project = self.project

if location is None:
location = self.location

job_config = copy.deepcopy(job_config)

if self._default_query_job_config:
if job_config:
_verify_job_config_type(
Expand All @@ -3225,6 +3255,8 @@ def query(
# that is in the default,
# should be filled in with the default
# the incoming therefore has precedence
#
# Note that _fill_from_default doesn't mutate the receiver
job_config = job_config._fill_from_default(
self._default_query_job_config
)
Expand All @@ -3233,34 +3265,54 @@ def query(
self._default_query_job_config,
google.cloud.bigquery.job.QueryJobConfig,
)
job_config = copy.deepcopy(self._default_query_job_config)
job_config = self._default_query_job_config

job_ref = job._JobReference(job_id, project=project, location=location)
query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config)
# Note that we haven't modified the original job_config (or
# _default_query_job_config) up to this point.
job_config_save = job_config

try:
query_job._begin(retry=retry, timeout=timeout)
except core_exceptions.Conflict as create_exc:
# The thought is if someone is providing their own job IDs and they get
# their job ID generation wrong, this could end up returning results for
# the wrong query. We thus only try to recover if job ID was not given.
if job_id_given:
raise create_exc
def do_query():
# Make a copy now, so that original doesn't get changed by the process
# below and to facilitate retry
job_config = copy.deepcopy(job_config_save)

job_id = _make_job_id(job_id_save, job_id_prefix)
job_ref = job._JobReference(job_id, project=project, location=location)
query_job = job.QueryJob(job_ref, query, client=self, job_config=job_config)

try:
query_job = self.get_job(
job_id,
project=project,
location=location,
retry=retry,
timeout=timeout,
)
except core_exceptions.GoogleAPIError: # (includes RetryError)
raise create_exc
query_job._begin(retry=retry, timeout=timeout)
except core_exceptions.Conflict as create_exc:
# The thought is if someone is providing their own job IDs and they get
# their job ID generation wrong, this could end up returning results for
# the wrong query. We thus only try to recover if job ID was not given.
if job_id_given:
raise create_exc

try:
query_job = self.get_job(
job_id,
project=project,
location=location,
retry=retry,
timeout=timeout,
)
except core_exceptions.GoogleAPIError: # (includes RetryError)
raise create_exc
else:
return query_job
else:
return query_job
else:
return query_job

future = do_query()
# The future might be in a failed state now, but if it's
# unrecoverable, we'll find out when we ask for its result, at which
# point, we may retry.
if not job_id_given:
future._retry_do_query = do_query # in case we have to retry later
future._job_retry = job_retry

return future

def insert_rows(
self,
Expand Down
84 changes: 74 additions & 10 deletions google/cloud/bigquery/job/query.py
Expand Up @@ -36,7 +36,7 @@
from google.cloud.bigquery.query import ScalarQueryParameter
from google.cloud.bigquery.query import StructQueryParameter
from google.cloud.bigquery.query import UDFResource
from google.cloud.bigquery.retry import DEFAULT_RETRY
from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.table import _EmptyRowIterator
from google.cloud.bigquery.table import RangePartitioning
Expand Down Expand Up @@ -1260,6 +1260,7 @@ def result(
retry: "retries.Retry" = DEFAULT_RETRY,
timeout: float = None,
start_index: int = None,
job_retry: "retries.Retry" = DEFAULT_JOB_RETRY,
) -> Union["RowIterator", _EmptyRowIterator]:
"""Start the job and wait for it to complete and get the result.
Expand All @@ -1270,16 +1271,30 @@ def result(
max_results (Optional[int]):
The maximum total number of rows from this request.
retry (Optional[google.api_core.retry.Retry]):
How to retry the call that retrieves rows. If the job state is
``DONE``, retrying is aborted early even if the results are not
available, as this will not change anymore.
How to retry the call that retrieves rows. This only
applies to making RPC calls. It isn't used to retry
failed jobs. This has a reasonable default that
should only be overridden with care. If the job state
is ``DONE``, retrying is aborted early even if the
results are not available, as this will not change
anymore.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
If multiple requests are made under the hood, ``timeout``
applies to each individual request.
start_index (Optional[int]):
The zero-based index of the starting row to read.
job_retry (Optional[google.api_core.retry.Retry]):
How to retry failed jobs. The default retries
rate-limit-exceeded errors. Passing ``None`` disables
job retry.
Not all jobs can be retried. If ``job_id`` was
provided to the query that created this job, then the
job returned by the query will not be retryable, and
an exception will be raised if non-``None``
non-default ``job_retry`` is also provided.
Returns:
google.cloud.bigquery.table.RowIterator:
Expand All @@ -1295,17 +1310,66 @@ def result(
Raises:
google.cloud.exceptions.GoogleAPICallError:
If the job failed.
If the job failed and retries aren't successful.
concurrent.futures.TimeoutError:
If the job did not complete in the given timeout.
TypeError:
If non-``None`` and non-default ``job_retry`` is
provided and the job is not retryable.
"""
try:
super(QueryJob, self).result(retry=retry, timeout=timeout)
retry_do_query = getattr(self, "_retry_do_query", None)
if retry_do_query is not None:
if job_retry is DEFAULT_JOB_RETRY:
job_retry = self._job_retry
else:
if job_retry is not None and job_retry is not DEFAULT_JOB_RETRY:
raise TypeError(
"`job_retry` was provided, but this job is"
" not retryable, because a custom `job_id` was"
" provided to the query that created this job."
)

first = True

def do_get_result():
nonlocal first

if first:
first = False
else:
# Note that we won't get here if retry_do_query is
# None, because we won't use a retry.

# The original job has failed. Create a new one.
job = retry_do_query()

# If it's already failed, we might as well stop:
if job.done() and job.exception() is not None:
raise job.exception()

# Become the new job:
self.__dict__.clear()
self.__dict__.update(job.__dict__)

# This shouldn't be necessary, because once we have a good
# job, it should stay good, and we shouldn't have to retry.
# But let's be paranoid. :)
self._retry_do_query = retry_do_query
self._job_retry = job_retry

super(QueryJob, self).result(retry=retry, timeout=timeout)

# Since the job could already be "done" (e.g. got a finished job
# via client.get_job), the superclass call to done() might not
# set the self._query_results cache.
self._reload_query_results(retry=retry, timeout=timeout)

if retry_do_query is not None and job_retry is not None:
do_get_result = job_retry(do_get_result)

do_get_result()

# Since the job could already be "done" (e.g. got a finished job
# via client.get_job), the superclass call to done() might not
# set the self._query_results cache.
self._reload_query_results(retry=retry, timeout=timeout)
except exceptions.GoogleAPICallError as exc:
exc.message += self._format_for_exception(self.query, self.job_id)
exc.query_job = self
Expand Down
20 changes: 20 additions & 0 deletions google/cloud/bigquery/retry.py
Expand Up @@ -32,6 +32,8 @@
auth_exceptions.TransportError,
)

# Value passed as ``deadline`` when building ``DEFAULT_JOB_RETRY``:
# 60 * 10 = 600 seconds, i.e. ten minutes.
_DEFAULT_JOB_DEADLINE = 60.0 * 10.0  # seconds


def _should_retry(exc):
"""Predicate for determining when to retry.
Expand All @@ -56,3 +58,21 @@ def _should_retry(exc):
on ``DEFAULT_RETRY``. For example, to change the deadline to 30 seconds,
pass ``retry=bigquery.DEFAULT_RETRY.with_deadline(30)``.
"""

# Error ``reason`` values for which a failed job is considered retryable.
job_retry_reasons = "rateLimitExceeded", "backendError"


def _job_should_retry(exc):
    """Predicate for determining whether a failed job should be retried.

    Only the first entry of ``exc.errors`` is inspected.

    Args:
        exc (Exception): The exception raised for the failed job.

    Returns:
        bool: ``True`` if the first entry of ``exc.errors`` has a ``reason``
        listed in ``job_retry_reasons``. ``False`` otherwise, including when
        ``errors`` is missing, ``None`` or empty, or when the first entry
        carries no ``reason``.
    """
    # Exceptions without structured error details can't be classified, so
    # they are never retried. ``getattr`` + truthiness also covers the case
    # where the attribute exists but is ``None``.
    errors = getattr(exc, "errors", None)
    if not errors:
        return False

    # A predicate must not raise: a malformed error payload should simply
    # mean "don't retry", so the caller still sees the job's real exception.
    try:
        reason = errors[0]["reason"]
    except (KeyError, TypeError, IndexError):
        return False

    return reason in job_retry_reasons


# Retry policy for failed query jobs (as opposed to failed API requests).
# ``_job_should_retry`` decides which failures qualify, and
# ``_DEFAULT_JOB_DEADLINE`` (in seconds) is passed as the retry deadline.
DEFAULT_JOB_RETRY = retry.Retry(
    predicate=_job_should_retry, deadline=_DEFAULT_JOB_DEADLINE
)
"""
The default job retry object.
"""
72 changes: 72 additions & 0 deletions tests/system/test_job_retry.py
@@ -0,0 +1,72 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import threading
import time

import google.api_core.exceptions
import google.cloud.bigquery
import pytest


def thread(func):
    """Start ``func`` on a new daemon thread and return that thread.

    Usable as a decorator: the decorated name is then bound to the running
    ``threading.Thread`` (not to the function), so callers can ``join()`` it.
    """
    worker = threading.Thread(target=func, daemon=True)
    worker.start()
    return worker


@pytest.mark.parametrize("job_retry_on_query", [True, False])
def test_query_retry_539(bigquery_client, dataset_id, job_retry_on_query):
    """
    Test job_retry

    See: https://github.com/googleapis/python-bigquery/issues/539

    The query targets a table that doesn't exist yet, so the first job fails
    with ``NotFound``. A background thread creates the table about a second
    later, so a retried job can succeed. ``job_retry_on_query`` selects
    whether the custom ``job_retry`` is passed to ``query()`` or to
    ``result()``.
    """
    from google.api_core import exceptions
    from google.api_core.retry import if_exception_type, Retry

    table_name = f"{dataset_id}.t539"

    # Without a custom retry, we fail:
    with pytest.raises(google.api_core.exceptions.NotFound):
        bigquery_client.query(f"select count(*) from {table_name}").result()

    retry_notfound = Retry(predicate=if_exception_type(exceptions.NotFound))

    job_retry = dict(job_retry=retry_notfound) if job_retry_on_query else {}
    job = bigquery_client.query(f"select count(*) from {table_name}", **job_retry)
    job_id = job.job_id

    # We can already know that the job failed, but we're not supposed
    # to find out until we call result, which is where retry happens
    assert job.done()
    assert job.exception() is not None

    # Decorating with ``thread`` starts this function immediately on a
    # background thread and binds ``create_table`` to the Thread object.
    @thread
    def create_table():
        time.sleep(1)  # Give the first retry attempt time to fail.
        with contextlib.closing(google.cloud.bigquery.Client()) as client:
            client.query(f"create table {table_name} (id int64)").result()

    try:
        job_retry = {} if job_retry_on_query else dict(job_retry=retry_notfound)
        [[count]] = list(job.result(**job_retry))
        assert count == 0

        # The job was retried, and thus got a new job id
        assert job.job_id != job_id
    finally:
        # Clean up even when the retry or an assertion above fails, so we
        # never leave the thread or the table behind. ``if exists`` keeps
        # the cleanup from masking the original failure when the table was
        # never created.
        create_table.join()
        bigquery_client.query(f"drop table if exists {table_name}").result()

0 comments on commit 519d99c

Please sign in to comment.