diff --git a/google/cloud/bigquery/job.py b/google/cloud/bigquery/job.py index 204c5f774..e2e7e839a 100644 --- a/google/cloud/bigquery/job.py +++ b/google/cloud/bigquery/job.py @@ -819,8 +819,9 @@ def result(self, retry=DEFAULT_RETRY, timeout=None): """ if self.state is None: self._begin(retry=retry, timeout=timeout) - # TODO: modify PollingFuture so it can pass a retry argument to done(). - return super(_AsyncJob, self).result(timeout=timeout) + + kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry} + return super(_AsyncJob, self).result(timeout=timeout, **kwargs) def cancelled(self): """Check if the job has been cancelled. @@ -1845,7 +1846,7 @@ def destination(self): """ return TableReference.from_api_repr( _helpers._get_sub_prop( - self._properties, ["configuration", "copy", "destinationTable"], + self._properties, ["configuration", "copy", "destinationTable"] ) ) @@ -2043,10 +2044,7 @@ def __init__(self, job_id, source, destination_uris, client, job_config=None): self._configuration = job_config if source: - source_ref = { - "projectId": source.project, - "datasetId": source.dataset_id, - } + source_ref = {"projectId": source.project, "datasetId": source.dataset_id} if isinstance(source, (Table, TableListItem, TableReference)): source_ref["tableId"] = source.table_id @@ -3138,10 +3136,10 @@ def done(self, retry=DEFAULT_RETRY, timeout=None, reload=True): return self.state == _DONE_STATE - def _blocking_poll(self, timeout=None): + def _blocking_poll(self, timeout=None, **kwargs): self._done_timeout = timeout self._transport_timeout = timeout - super(QueryJob, self)._blocking_poll(timeout=timeout) + super(QueryJob, self)._blocking_poll(timeout=timeout, **kwargs) @staticmethod def _format_for_exception(query, job_id): diff --git a/setup.py b/setup.py index c7410601e..548ceac09 100644 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ # 'Development Status :: 5 - Production/Stable' release_status = "Development Status :: 5 - Production/Stable" dependencies = [ - "google-api-core[grpc] >= 1.22.2, < 2.0.0dev", + "google-api-core[grpc] >= 1.23.0, < 2.0.0dev", "proto-plus >= 1.10.0", "google-cloud-core >= 1.4.1, < 2.0dev", "google-resumable-media >= 0.6.0, < 2.0dev", diff --git a/testing/constraints-3.6.txt b/testing/constraints-3.6.txt index cea0ed84e..91a507a5c 100644 --- a/testing/constraints-3.6.txt +++ b/testing/constraints-3.6.txt @@ -1,4 +1,4 @@ -google-api-core==1.22.2 +google-api-core==1.23.0 google-cloud-bigquery-storage==2.0.0 google-cloud-core==1.4.1 google-resumable-media==0.6.0 diff --git a/tests/unit/test_job.py b/tests/unit/test_job.py index 2d1e8fec8..8590e0576 100644 --- a/tests/unit/test_job.py +++ b/tests/unit/test_job.py @@ -864,7 +864,7 @@ def test_cancel_w_custom_retry(self): job = self._set_properties_job() api_request_patcher = mock.patch.object( - job._client._connection, "api_request", side_effect=[ValueError, response], + job._client._connection, "api_request", side_effect=[ValueError, response] ) retry = DEFAULT_RETRY.with_deadline(1).with_predicate( lambda exc: isinstance(exc, ValueError) @@ -885,7 +885,7 @@ def test_cancel_w_custom_retry(self): [ mock.call(method="POST", path=api_path, query_params={}, timeout=7.5), mock.call( - method="POST", path=api_path, query_params={}, timeout=7.5, + method="POST", path=api_path, query_params={}, timeout=7.5 ), # was retried once ], ) @@ -1034,7 +1034,6 @@ def test_result_w_retry_wo_state(self): custom_predicate = mock.Mock() custom_predicate.return_value = True custom_retry = google.api_core.retry.Retry(predicate=custom_predicate) - self.assertIs(job.result(retry=custom_retry), job) begin_call = mock.call( @@ -2757,7 +2756,7 @@ def test_cancel_w_bound_client(self): final_attributes.assert_called_with({"path": PATH}, client, job) conn.api_request.assert_called_once_with( - method="POST", path=PATH, query_params={}, timeout=None, + method="POST", path=PATH, query_params={}, timeout=None ) self._verifyResourceProperties(job, RESOURCE) @@ -2779,7 +2778,7 @@ def test_cancel_w_alternate_client(self): conn1.api_request.assert_not_called() conn2.api_request.assert_called_once_with( - method="POST", path=PATH, query_params={}, timeout=None, + method="POST", path=PATH, query_params={}, timeout=None ) self._verifyResourceProperties(job, RESOURCE) @@ -3205,7 +3204,7 @@ def test_exists_miss_w_bound_client(self): final_attributes.assert_called_with({"path": PATH}, client, job) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"}, timeout=None, + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None ) def test_exists_hit_w_alternate_client(self): @@ -3620,7 +3619,7 @@ def test_exists_miss_w_bound_client(self): final_attributes.assert_called_with({"path": PATH}, client, job) conn.api_request.assert_called_once_with( - method="GET", path=PATH, query_params={"fields": "id"}, timeout=None, + method="GET", path=PATH, query_params={"fields": "id"}, timeout=None ) def test_exists_hit_w_alternate_client(self): @@ -4812,6 +4811,60 @@ def test_result_with_max_results(self): tabledata_list_request[1]["query_params"]["maxResults"], max_results ) + def test_result_w_retry(self): + from google.cloud.bigquery.table import RowIterator + + query_resource = { + "jobComplete": False, + "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, + } + query_resource_done = { + "jobComplete": True, + "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, + "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, + "totalRows": "2", + } + job_resource = self._make_resource(started=True) + job_resource_done = self._make_resource(started=True, ended=True) + job_resource_done["configuration"]["query"]["destinationTable"] = { + "projectId": "dest-project", + "datasetId": "dest_dataset", + "tableId": "dest_table", + } + + connection = _make_connection( + exceptions.NotFound("not normally retriable"), + query_resource, + exceptions.NotFound("not normally retriable"), + query_resource_done, + exceptions.NotFound("not normally retriable"), + job_resource_done, + ) + client = _make_client(self.PROJECT, connection=connection) + job = self._get_target_class().from_api_repr(job_resource, client) + + custom_predicate = mock.Mock() + custom_predicate.return_value = True + custom_retry = google.api_core.retry.Retry(predicate=custom_predicate) + + self.assertIsInstance(job.result(retry=custom_retry), RowIterator) + query_results_call = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/queries/{self.JOB_ID}", + query_params={"maxResults": 0}, + timeout=None, + ) + reload_call = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}", + query_params={}, + timeout=None, + ) + + connection.api_request.assert_has_calls( + [query_results_call, query_results_call, reload_call] + ) + def test_result_w_empty_schema(self): from google.cloud.bigquery.table import _EmptyRowIterator