diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 5c1118500..491983f8e 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -19,6 +19,7 @@ import re from google.api_core import exceptions +from google.api_core.future import polling as polling_future import requests from google.cloud.bigquery.dataset import Dataset @@ -42,7 +43,6 @@ from google.cloud.bigquery._tqdm_helpers import wait_for_query from google.cloud.bigquery.job.base import _AsyncJob -from google.cloud.bigquery.job.base import _DONE_STATE from google.cloud.bigquery.job.base import _JobConfig from google.cloud.bigquery.job.base import _JobReference @@ -974,61 +974,6 @@ def estimated_bytes_processed(self): result = int(result) return result - def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True): - """Refresh the job and checks if it is complete. - - Args: - retry (Optional[google.api_core.retry.Retry]): - How to retry the call that retrieves query results. If the job state is - ``DONE``, retrying is aborted early, as the job will not change anymore. - timeout (Optional[float]): - The number of seconds to wait for the underlying HTTP transport - before using ``retry``. - reload (Optional[bool]): - If ``True``, make an API call to refresh the job state of - unfinished jobs before checking. Default ``True``. - - Returns: - bool: ``True`` if the job is complete or if fetching its status resulted in - an error, ``False`` otherwise. - """ - # Do not refresh if the state is already done, as the job will not - # change once complete. - is_done = self.state == _DONE_STATE - if not reload or is_done: - return is_done - - # If an explicit timeout is not given, fall back to the transport timeout - # stored in _blocking_poll() in the process of polling for job completion. - transport_timeout = timeout if timeout is not None else self._transport_timeout - - try: - self._reload_query_results(retry=retry, timeout=transport_timeout) - except exceptions.GoogleAPIError as exc: - # Reloading also updates error details on self, thus no need for an - # explicit self.set_exception() call if reloading succeeds. - try: - self.reload(retry=retry, timeout=transport_timeout) - except exceptions.GoogleAPIError: - # Use the query results reload exception, as it generally contains - # much more useful error information. - self.set_exception(exc) - return True - else: - return self.state == _DONE_STATE - - # Only reload the job once we know the query is complete. - # This will ensure that fields such as the destination table are - # correctly populated. - if self._query_results.complete: - try: - self.reload(retry=retry, timeout=transport_timeout) - except exceptions.GoogleAPIError as exc: - self.set_exception(exc) - return True - - return self.state == _DONE_STATE - def _blocking_poll(self, timeout=None, **kwargs): self._done_timeout = timeout self._transport_timeout = timeout @@ -1130,6 +1075,40 @@ def _reload_query_results(self, retry=DEFAULT_RETRY, timeout=None): timeout=transport_timeout, ) + def _done_or_raise(self, retry=DEFAULT_RETRY, timeout=None): + """Check if the query has finished running and raise if it's not. + + If the query has finished, also reload the job itself. + """ + # If an explicit timeout is not given, fall back to the transport timeout + # stored in _blocking_poll() in the process of polling for job completion. + transport_timeout = timeout if timeout is not None else self._transport_timeout + + try: + self._reload_query_results(retry=retry, timeout=transport_timeout) + except exceptions.GoogleAPIError as exc: + # Reloading also updates error details on self, thus no need for an + # explicit self.set_exception() call if reloading succeeds. + try: + self.reload(retry=retry, timeout=transport_timeout) + except exceptions.GoogleAPIError: + # Use the query results reload exception, as it generally contains + # much more useful error information. + self.set_exception(exc) + finally: + return + + # Only reload the job once we know the query is complete. + # This will ensure that fields such as the destination table are + # correctly populated. + if not self._query_results.complete: + raise polling_future._OperationNotComplete() + else: + try: + self.reload(retry=retry, timeout=transport_timeout) + except exceptions.GoogleAPIError as exc: + self.set_exception(exc) + def result( self, page_size=None, diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py index 655a121e6..4665933ea 100644 --- a/tests/unit/job/test_query.py +++ b/tests/unit/job/test_query.py @@ -309,16 +309,7 @@ def test_cancelled(self): self.assertTrue(job.cancelled()) - def test_done_job_complete(self): - client = _make_client(project=self.PROJECT) - resource = self._make_resource(ended=True) - job = self._get_target_class().from_api_repr(resource, client) - job._query_results = google.cloud.bigquery.query._QueryResults.from_api_repr( - {"jobComplete": True, "jobReference": resource["jobReference"]} - ) - self.assertTrue(job.done()) - - def test_done_w_timeout(self): + def test__done_or_raise_w_timeout(self): client = _make_client(project=self.PROJECT) resource = self._make_resource(ended=False) job = self._get_target_class().from_api_repr(resource, client) @@ -326,7 +317,7 @@ def test_done_w_timeout(self): with mock.patch.object( client, "_get_query_results" ) as fake_get_results, mock.patch.object(job, "reload") as fake_reload: - job.done(timeout=42) + job._done_or_raise(timeout=42) fake_get_results.assert_called_once() call_args = fake_get_results.call_args @@ -335,7 +326,7 @@ def test_done_w_timeout(self): call_args = fake_reload.call_args self.assertEqual(call_args.kwargs.get("timeout"), 42) - def test_done_w_timeout_and_longer_internal_api_timeout(self): + def test__done_or_raise_w_timeout_and_longer_internal_api_timeout(self): client = _make_client(project=self.PROJECT) resource = self._make_resource(ended=False) job = self._get_target_class().from_api_repr(resource, client) @@ -344,7 +335,7 @@ def test_done_w_timeout_and_longer_internal_api_timeout(self): with mock.patch.object( client, "_get_query_results" ) as fake_get_results, mock.patch.object(job, "reload") as fake_reload: - job.done(timeout=5.5) + job._done_or_raise(timeout=5.5) # The expected timeout used is simply the given timeout, as the latter # is shorter than the job's internal done timeout. @@ -357,7 +348,7 @@ def test_done_w_timeout_and_longer_internal_api_timeout(self): call_args = fake_reload.call_args self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout) - def test_done_w_query_results_error_reload_ok_job_finished(self): + def test__done_or_raise_w_query_results_error_reload_ok(self): client = _make_client(project=self.PROJECT) bad_request_error = exceptions.BadRequest("Error in query") client._get_query_results = mock.Mock(side_effect=bad_request_error) @@ -373,32 +364,11 @@ def fake_reload(self, *args, **kwargs): fake_reload_method = types.MethodType(fake_reload, job) with mock.patch.object(job, "reload", new=fake_reload_method): - is_done = job.done() + job._done_or_raise() - assert is_done assert isinstance(job._exception, exceptions.BadRequest) - def test_done_w_query_results_error_reload_ok_job_still_running(self): - client = _make_client(project=self.PROJECT) - retry_error = exceptions.RetryError("Too many retries", cause=TimeoutError) - client._get_query_results = mock.Mock(side_effect=retry_error) - - resource = self._make_resource(ended=False) - job = self._get_target_class().from_api_repr(resource, client) - job._exception = None - - def fake_reload(self, *args, **kwargs): - self._properties["status"]["state"] = "RUNNING" - - fake_reload_method = types.MethodType(fake_reload, job) - - with mock.patch.object(job, "reload", new=fake_reload_method): - is_done = job.done() - - assert not is_done - assert job._exception is None - - def test_done_w_query_results_error_reload_error(self): + def test__done_or_raise_w_query_results_error_reload_error(self): client = _make_client(project=self.PROJECT) bad_request_error = exceptions.BadRequest("Error in query") client._get_query_results = mock.Mock(side_effect=bad_request_error) @@ -409,12 +379,11 @@ def test_done_w_query_results_error_reload_error(self): job.reload = mock.Mock(side_effect=reload_error) job._exception = None - is_done = job.done() + job._done_or_raise() - assert is_done assert job._exception is bad_request_error - def test_done_w_job_query_results_ok_reload_error(self): + def test__done_or_raise_w_job_query_results_ok_reload_error(self): client = _make_client(project=self.PROJECT) query_results = google.cloud.bigquery.query._QueryResults( properties={ @@ -430,9 +399,8 @@ def test_done_w_job_query_results_ok_reload_error(self): job.reload = mock.Mock(side_effect=retry_error) job._exception = None - is_done = job.done() + job._done_or_raise() - assert is_done assert job._exception is retry_error def test_query_plan(self): @@ -1905,8 +1873,6 @@ def test_reload_w_timeout(self): ) def test_iter(self): - import types - begun_resource = self._make_resource() query_resource = { "jobComplete": True,