Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: QueryJob.exception() *returns* the errors, not raises them #467

Merged
merged 8 commits into from Feb 25, 2021
24 changes: 20 additions & 4 deletions google/cloud/bigquery/job/query.py
Expand Up @@ -988,25 +988,41 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
unfinished jobs before checking. Default ``True``.

Returns:
bool: True if the job is complete, False otherwise.
bool: ``True`` if the job is complete or if fetching its status resulted in
an error, ``False`` otherwise.
"""
# Do not refresh if the state is already done, as the job will not
# change once complete.
is_done = self.state == _DONE_STATE
if not reload or is_done:
return is_done

self._reload_query_results(retry=retry, timeout=timeout)

# If an explicit timeout is not given, fall back to the transport timeout
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout

try:
self._reload_query_results(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError:
# Reloading also updates error details on self, thus no need for an
# explicit self.set_exception() call if reloading succeeds.
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
self.set_exception(exc)
plamut marked this conversation as resolved.
Show resolved Hide resolved
return True
else:
return self.state == _DONE_STATE

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if self._query_results.complete:
self.reload(retry=retry, timeout=transport_timeout)
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
self.set_exception(exc)
return True

return self.state == _DONE_STATE

Expand Down
2 changes: 1 addition & 1 deletion tests/unit/job/test_base.py
Expand Up @@ -943,7 +943,7 @@ def test_result_w_retry_wo_state(self):
custom_predicate = mock.Mock()
custom_predicate.return_value = True
custom_retry = google.api_core.retry.Retry(
predicate=custom_predicate, initial=0.001, maximum=0.001, deadline=0.001,
predicate=custom_predicate, initial=0.001, maximum=0.001, deadline=0.1,
)
self.assertIs(job.result(retry=custom_retry), job)

Expand Down
83 changes: 81 additions & 2 deletions tests/unit/job/test_query.py
Expand Up @@ -16,6 +16,7 @@
import copy
import http
import textwrap
import types

import freezegun
from google.api_core import exceptions
Expand Down Expand Up @@ -308,7 +309,7 @@ def test_cancelled(self):

self.assertTrue(job.cancelled())

def test_done(self):
def test_done_job_complete(self):
client = _make_client(project=self.PROJECT)
resource = self._make_resource(ended=True)
job = self._get_target_class().from_api_repr(resource, client)
Expand Down Expand Up @@ -356,6 +357,84 @@ def test_done_w_timeout_and_longer_internal_api_timeout(self):
call_args = fake_reload.call_args
self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout)

def test_done_w_query_results_error_reload_ok_job_finished(self):
client = _make_client(project=self.PROJECT)
bad_request_error = exceptions.BadRequest("Error in query")
client._get_query_results = mock.Mock(side_effect=bad_request_error)

resource = self._make_resource(ended=False)
job = self._get_target_class().from_api_repr(resource, client)
job._exception = None

def fake_reload(self, *args, **kwargs):
self._properties["status"]["state"] = "DONE"
self.set_exception(copy.copy(bad_request_error))

fake_reload_method = types.MethodType(fake_reload, job)

with mock.patch.object(job, "reload", new=fake_reload_method):
is_done = job.done()

assert is_done
assert isinstance(job._exception, exceptions.BadRequest)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why use the private property in this and the other tests? any objections to calling job.exception() here and the other tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reasoning was that job.exception() can execute additional logic, and errors in that method would make the tests for done() fail, too, even if there was nothing wrong with the done() method itself.

One could argue that the chosen unit of test is too small, and that the class itself should represent a unit as opposed to its individual methods, but addressing that would require quite some refactoring (we already tinker with internal _properties , for instance).

Here, practicality beats purity IMHO, thus the "cheating" by examining the internal state of the class. Or do you have a strong opinion on this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works for me. I agree that ideally we'd have higher-level tests than this, but makes sense to stay with existing conventions, especially given our 100% coverage requirement.


def test_done_w_query_results_error_reload_ok_job_still_running(self):
client = _make_client(project=self.PROJECT)
retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError)
client._get_query_results = mock.Mock(side_effect=retry_error)

resource = self._make_resource(ended=False)
job = self._get_target_class().from_api_repr(resource, client)
job._exception = None

def fake_reload(self, *args, **kwargs):
self._properties["status"]["state"] = "RUNNING"

fake_reload_method = types.MethodType(fake_reload, job)

with mock.patch.object(job, "reload", new=fake_reload_method):
is_done = job.done()

assert not is_done
assert job._exception is None

def test_done_w_query_results_error_reload_error(self):
client = _make_client(project=self.PROJECT)
bad_request_error = exceptions.BadRequest("Error in query")
client._get_query_results = mock.Mock(side_effect=bad_request_error)

resource = self._make_resource(ended=False)
job = self._get_target_class().from_api_repr(resource, client)
reload_error = exceptions.DataLoss("Oops, sorry!")
job.reload = mock.Mock(side_effect=reload_error)
job._exception = None

is_done = job.done()

assert is_done
assert job._exception is reload_error

def test_done_w_job_query_results_ok_reload_error(self):
client = _make_client(project=self.PROJECT)
query_results = google.cloud.bigquery.query._QueryResults(
properties={
"jobComplete": True,
"jobReference": {"projectId": self.PROJECT, "jobId": "12345"},
}
)
client._get_query_results = mock.Mock(return_value=query_results)

resource = self._make_resource(ended=False)
job = self._get_target_class().from_api_repr(resource, client)
retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError)
job.reload = mock.Mock(side_effect=retry_error)
job._exception = None

is_done = job.done()

assert is_done
assert job._exception is retry_error

def test_query_plan(self):
from google.cloud._helpers import _RFC3339_MICROS
from google.cloud.bigquery.job import QueryPlanEntry
Expand Down Expand Up @@ -973,7 +1052,7 @@ def test_result_w_retry(self):
initial=0.001,
maximum=0.001,
multiplier=1.0,
deadline=0.001,
deadline=0.1,
predicate=custom_predicate,
)

Expand Down