Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make QueryJob.done() method more performant #544

Merged
merged 1 commit into from Mar 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
91 changes: 35 additions & 56 deletions google/cloud/bigquery/job/query.py
Expand Up @@ -19,6 +19,7 @@
import re

from google.api_core import exceptions
from google.api_core.future import polling as polling_future
import requests

from google.cloud.bigquery.dataset import Dataset
Expand All @@ -42,7 +43,6 @@
from google.cloud.bigquery._tqdm_helpers import wait_for_query

from google.cloud.bigquery.job.base import _AsyncJob
from google.cloud.bigquery.job.base import _DONE_STATE
from google.cloud.bigquery.job.base import _JobConfig
from google.cloud.bigquery.job.base import _JobReference

Expand Down Expand Up @@ -974,61 +974,6 @@ def estimated_bytes_processed(self):
result = int(result)
return result

def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True):
"""Refresh the job and checks if it is complete.

Args:
retry (Optional[google.api_core.retry.Retry]):
How to retry the call that retrieves query results. If the job state is
``DONE``, retrying is aborted early, as the job will not change anymore.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
reload (Optional[bool]):
If ``True``, make an API call to refresh the job state of
unfinished jobs before checking. Default ``True``.

Returns:
bool: ``True`` if the job is complete or if fetching its status resulted in
an error, ``False`` otherwise.
"""
# Do not refresh if the state is already done, as the job will not
# change once complete.
is_done = self.state == _DONE_STATE
if not reload or is_done:
return is_done

# If an explicit timeout is not given, fall back to the transport timeout
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout

try:
self._reload_query_results(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
# Reloading also updates error details on self, thus no need for an
# explicit self.set_exception() call if reloading succeeds.
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError:
# Use the query results reload exception, as it generally contains
# much more useful error information.
self.set_exception(exc)
return True
else:
return self.state == _DONE_STATE

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if self._query_results.complete:
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
self.set_exception(exc)
return True

return self.state == _DONE_STATE

def _blocking_poll(self, timeout=None, **kwargs):
self._done_timeout = timeout
self._transport_timeout = timeout
Expand Down Expand Up @@ -1130,6 +1075,40 @@ def _reload_query_results(self, retry=DEFAULT_RETRY, timeout=None):
timeout=transport_timeout,
)

def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None):
"""Check if the query has finished running and raise if it's not.

If the query has finished, also reload the job itself.
"""
# If an explicit timeout is not given, fall back to the transport timeout
# stored in _blocking_poll() in the process of polling for job completion.
transport_timeout = timeout if timeout is not None else self._transport_timeout

try:
self._reload_query_results(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
# Reloading also updates error details on self, thus no need for an
# explicit self.set_exception() call if reloading succeeds.
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError:
# Use the query results reload exception, as it generally contains
# much more useful error information.
self.set_exception(exc)
finally:
return

# Only reload the job once we know the query is complete.
# This will ensure that fields such as the destination table are
# correctly populated.
if not self._query_results.complete:
raise polling_future._OperationNotComplete()
else:
try:
self.reload(retry=retry, timeout=transport_timeout)
except exceptions.GoogleAPIError as exc:
self.set_exception(exc)

def result(
self,
page_size=None,
Expand Down
54 changes: 10 additions & 44 deletions tests/unit/job/test_query.py
Expand Up @@ -309,24 +309,15 @@ def test_cancelled(self):

self.assertTrue(job.cancelled())

def test_done_job_complete(self):
client = _make_client(project=self.PROJECT)
resource = self._make_resource(ended=True)
job = self._get_target_class().from_api_repr(resource, client)
job._query_results = google.cloud.bigquery.query._QueryResults.from_api_repr(
{"jobComplete": True, "jobReference": resource["jobReference"]}
)
self.assertTrue(job.done())

def test_done_w_timeout(self):
def test__done_or_raise_w_timeout(self):
client = _make_client(project=self.PROJECT)
resource = self._make_resource(ended=False)
job = self._get_target_class().from_api_repr(resource, client)

with mock.patch.object(
client, "_get_query_results"
) as fake_get_results, mock.patch.object(job, "reload") as fake_reload:
job.done(timeout=42)
job._done_or_raise(timeout=42)

fake_get_results.assert_called_once()
call_args = fake_get_results.call_args
Expand All @@ -335,7 +326,7 @@ def test_done_w_timeout(self):
call_args = fake_reload.call_args
self.assertEqual(call_args.kwargs.get("timeout"), 42)

def test_done_w_timeout_and_longer_internal_api_timeout(self):
def test__done_or_raise_w_timeout_and_longer_internal_api_timeout(self):
client = _make_client(project=self.PROJECT)
resource = self._make_resource(ended=False)
job = self._get_target_class().from_api_repr(resource, client)
Expand All @@ -344,7 +335,7 @@ def test_done_w_timeout_and_longer_internal_api_timeout(self):
with mock.patch.object(
client, "_get_query_results"
) as fake_get_results, mock.patch.object(job, "reload") as fake_reload:
job.done(timeout=5.5)
job._done_or_raise(timeout=5.5)

# The expected timeout used is simply the given timeout, as the latter
# is shorter than the job's internal done timeout.
Expand All @@ -357,7 +348,7 @@ def test_done_w_timeout_and_longer_internal_api_timeout(self):
call_args = fake_reload.call_args
self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout)

def test_done_w_query_results_error_reload_ok_job_finished(self):
def test__done_or_raise_w_query_results_error_reload_ok(self):
client = _make_client(project=self.PROJECT)
bad_request_error = exceptions.BadRequest("Error in query")
client._get_query_results = mock.Mock(side_effect=bad_request_error)
Expand All @@ -373,32 +364,11 @@ def fake_reload(self, *args, **kwargs):
fake_reload_method = types.MethodType(fake_reload, job)

with mock.patch.object(job, "reload", new=fake_reload_method):
is_done = job.done()
job._done_or_raise()

assert is_done
assert isinstance(job._exception, exceptions.BadRequest)

def test_done_w_query_results_error_reload_ok_job_still_running(self):
client = _make_client(project=self.PROJECT)
retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError)
client._get_query_results = mock.Mock(side_effect=retry_error)

resource = self._make_resource(ended=False)
job = self._get_target_class().from_api_repr(resource, client)
job._exception = None

def fake_reload(self, *args, **kwargs):
self._properties["status"]["state"] = "RUNNING"

fake_reload_method = types.MethodType(fake_reload, job)

with mock.patch.object(job, "reload", new=fake_reload_method):
is_done = job.done()

assert not is_done
assert job._exception is None

def test_done_w_query_results_error_reload_error(self):
def test__done_or_raise_w_query_results_error_reload_error(self):
client = _make_client(project=self.PROJECT)
bad_request_error = exceptions.BadRequest("Error in query")
client._get_query_results = mock.Mock(side_effect=bad_request_error)
Expand All @@ -409,12 +379,11 @@ def test_done_w_query_results_error_reload_error(self):
job.reload = mock.Mock(side_effect=reload_error)
job._exception = None

is_done = job.done()
job._done_or_raise()

assert is_done
assert job._exception is bad_request_error

def test_done_w_job_query_results_ok_reload_error(self):
def test__done_or_raise_w_job_query_results_ok_reload_error(self):
client = _make_client(project=self.PROJECT)
query_results = google.cloud.bigquery.query._QueryResults(
properties={
Expand All @@ -430,9 +399,8 @@ def test_done_w_job_query_results_ok_reload_error(self):
job.reload = mock.Mock(side_effect=retry_error)
job._exception = None

is_done = job.done()
job._done_or_raise()

assert is_done
assert job._exception is retry_error

def test_query_plan(self):
Expand Down Expand Up @@ -1905,8 +1873,6 @@ def test_reload_w_timeout(self):
)

def test_iter(self):
import types

begun_resource = self._make_resource()
query_resource = {
"jobComplete": True,
Expand Down