Skip to content

Commit

Permalink
fix: QueryJob.exception() *returns* the errors, not raises them (#467)
Browse files Browse the repository at this point in the history
* fix: QueryJob.exception() should *return* errors

* Reload query job on error, raise any reload errors

* Catch errors on reloading failed query jobs

* Add additional unit test

* Increase retry deadline to mitigate test flakiness

* Store the more informative exception in done()
  • Loading branch information
plamut committed Feb 25, 2021
1 parent 699498c commit d763279
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 7 deletions.
26 changes: 22 additions & 4 deletions google/cloud/bigquery/job/query.py
Expand Up @@ -989,25 +989,43 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
unfinished jobs before checking. Default ``True``.
Returns:
bool: True if the job is complete, False otherwise.
bool: ``True`` if the job is complete or if fetching its status resulted in
an error, ``False`` otherwise.
"""
# Do not refresh if the state is already done, as the job will not
# change once complete.
is_done = self.state == _DONE_STATE
if not reload or is_done:
return is_done

self._reload_query_results(retry=retry, timeout=timeout)

# If an explicit timeout is not given, fall back to the transport timeout
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout

try:
self._reload_query_results(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
# Reloading also updates error details on self, thus no need for an
# explicit self.set_exception() call if reloading succeeds.
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError:
# Use the query results reload exception, as it generally contains
# much more useful error information.
self.set_exception(exc)
return True
else:
return self.state == _DONE_STATE

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if self._query_results.complete:
self.reload(retry=retry, timeout=transport_timeout)
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
self.set_exception(exc)
return True

return self.state == _DONE_STATE

Expand Down
2 changes: 1 addition & 1 deletion tests/unit/job/test_base.py
Expand Up @@ -967,7 +967,7 @@ def test_result_w_retry_wo_state(self):
custom_predicate = mock.Mock()
custom_predicate.return_value = True
custom_retry = google.api_core.retry.Retry(
predicate=custom_predicate, initial=0.001, maximum=0.001, deadline=0.001,
predicate=custom_predicate, initial=0.001, maximum=0.001, deadline=0.1,
)
self.assertIs(job.result(retry=custom_retry), job)

Expand Down
83 changes: 81 additions & 2 deletions tests/unit/job/test_query.py
Expand Up @@ -16,6 +16,7 @@
import copy
import http
import textwrap
import types

import freezegun
from google.api_core import exceptions
Expand Down Expand Up @@ -308,7 +309,7 @@ def test_cancelled(self):

self.assertTrue(job.cancelled())

def test_done(self):
def test_done_job_complete(self):
client = _make_client(project=self.PROJECT)
resource = self._make_resource(ended=True)
job = self._get_target_class().from_api_repr(resource, client)
Expand Down Expand Up @@ -356,6 +357,84 @@ def test_done_w_timeout_and_longer_internal_api_timeout(self):
call_args = fake_reload.call_args
self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout)

def test_done_w_query_results_error_reload_ok_job_finished(self):
client = _make_client(project=self.PROJECT)
bad_request_error = exceptions.BadRequest("Error in query")
client._get_query_results = mock.Mock(side_effect=bad_request_error)

resource = self._make_resource(ended=False)
job = self._get_target_class().from_api_repr(resource, client)
job._exception = None

def fake_reload(self, *args, **kwargs):
self._properties["status"]["state"] = "DONE"
self.set_exception(copy.copy(bad_request_error))

fake_reload_method = types.MethodType(fake_reload, job)

with mock.patch.object(job, "reload", new=fake_reload_method):
is_done = job.done()

assert is_done
assert isinstance(job._exception, exceptions.BadRequest)

def test_done_w_query_results_error_reload_ok_job_still_running(self):
client = _make_client(project=self.PROJECT)
retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError)
client._get_query_results = mock.Mock(side_effect=retry_error)

resource = self._make_resource(ended=False)
job = self._get_target_class().from_api_repr(resource, client)
job._exception = None

def fake_reload(self, *args, **kwargs):
self._properties["status"]["state"] = "RUNNING"

fake_reload_method = types.MethodType(fake_reload, job)

with mock.patch.object(job, "reload", new=fake_reload_method):
is_done = job.done()

assert not is_done
assert job._exception is None

def test_done_w_query_results_error_reload_error(self):
client = _make_client(project=self.PROJECT)
bad_request_error = exceptions.BadRequest("Error in query")
client._get_query_results = mock.Mock(side_effect=bad_request_error)

resource = self._make_resource(ended=False)
job = self._get_target_class().from_api_repr(resource, client)
reload_error = exceptions.DataLoss("Oops, sorry!")
job.reload = mock.Mock(side_effect=reload_error)
job._exception = None

is_done = job.done()

assert is_done
assert job._exception is bad_request_error

def test_done_w_job_query_results_ok_reload_error(self):
client = _make_client(project=self.PROJECT)
query_results = google.cloud.bigquery.query._QueryResults(
properties={
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": "12345"},
}
)
client._get_query_results = mock.Mock(return_value=query_results)

resource = self._make_resource(ended=False)
job = self._get_target_class().from_api_repr(resource, client)
retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError)
job.reload = mock.Mock(side_effect=retry_error)
job._exception = None

is_done = job.done()

assert is_done
assert job._exception is retry_error

def test_query_plan(self):
from google.cloud._helpers import _RFC3339_MICROS
from google.cloud.bigquery.job import QueryPlanEntry
Expand Down Expand Up @@ -973,7 +1052,7 @@ def test_result_w_retry(self):
initial=0.001,
maximum=0.001,
multiplier=1.0,
deadline=0.001,
deadline=0.1,
predicate=custom_predicate,
)

Expand Down

0 comments on commit d763279

Please sign in to comment.