From c9823d932205f128b673b05d6086ca783c85c354 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 26 Oct 2020 10:04:34 -0500 Subject: [PATCH] test: make `_AsyncJob` tests mock at a lower layer (#340) This is intented to make the `_AsyncJob` tests more robust to changes in retry behavior. It also more explicitly tests the retry behavior by observing API calls rather than calls to certain methods. --- google/cloud/bigquery/client.py | 1 - google/cloud/bigquery/job.py | 7 +- tests/unit/test_job.py | 121 ++++++++++++++++++++++++-------- 3 files changed, 95 insertions(+), 34 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index e4b5b22ab..57df9455e 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -625,7 +625,6 @@ def create_table(self, table, exists_ok=False, retry=DEFAULT_RETRY, timeout=None def _call_api( self, retry, span_name=None, span_attributes=None, job_ref=None, **kwargs ): - call = functools.partial(self._connection.api_request, **kwargs) if retry: call = retry(call) diff --git a/google/cloud/bigquery/job.py b/google/cloud/bigquery/job.py index 766db1d42..6cb138acf 100644 --- a/google/cloud/bigquery/job.py +++ b/google/cloud/bigquery/job.py @@ -529,9 +529,8 @@ def state(self): Optional[str]: the state (None until set from the server). """ - status = self._properties.get("status") - if status is not None: - return status.get("state") + status = self._properties.get("status", {}) + return status.get("state") def _set_properties(self, api_response): """Update properties from resource in body of ``api_response`` @@ -588,7 +587,7 @@ def _check_resource_config(cls, resource): def to_api_repr(self): """Generate a resource for the job.""" - raise NotImplementedError("Abstract") + return copy.deepcopy(self._properties) _build_resource = to_api_repr # backward-compatibility alias diff --git a/tests/unit/test_job.py b/tests/unit/test_job.py index 75212ae95..f577b08bd 100644 --- a/tests/unit/test_job.py +++ b/tests/unit/test_job.py @@ -20,6 +20,8 @@ import warnings import freezegun +from google.api_core import exceptions +import google.api_core.retry import mock import pytest import requests @@ -70,6 +72,12 @@ def _make_connection(*responses): return mock_conn +def _make_retriable_exception(): + return exceptions.TooManyRequests( + "retriable exception", errors=[{"reason": "rateLimitExceeded"}] + ) + + def _make_job_resource( creation_time_ms=1437767599006, started_time_ms=1437767600007, @@ -84,6 +92,7 @@ def _make_job_resource( user_email="bq-user@example.com", ): resource = { + "status": {"state": "PENDING"}, "configuration": {job_type: {}}, "statistics": {"creationTime": creation_time_ms, job_type: {}}, "etag": etag, @@ -97,9 +106,11 @@ def _make_job_resource( if started or ended: resource["statistics"]["startTime"] = started_time_ms + resource["status"]["state"] = "RUNNING" if ended: resource["statistics"]["endTime"] = ended_time_ms + resource["status"]["state"] = "DONE" if job_type == "query": resource["configuration"]["query"]["destinationTable"] = { @@ -555,14 +566,14 @@ def test__check_resource_config_ok(self): def test__build_resource(self): client = _make_client(project=self.PROJECT) job = self._make_one(self.JOB_ID, client) - with self.assertRaises(NotImplementedError): - job._build_resource() + resource = job._build_resource() + assert resource["jobReference"]["jobId"] == self.JOB_ID def test_to_api_repr(self): client = _make_client(project=self.PROJECT) job = self._make_one(self.JOB_ID, client) - with self.assertRaises(NotImplementedError): - job.to_api_repr() + resource = job.to_api_repr() + assert resource["jobReference"]["jobId"] == self.JOB_ID def test__begin_already(self): job = self._set_properties_job() @@ -965,43 +976,95 @@ def test_done_already(self): self.assertTrue(job.done()) - @mock.patch("google.api_core.future.polling.PollingFuture.result") - def test_result_default_wo_state(self, result): - from google.cloud.bigquery.retry import DEFAULT_RETRY - - client = _make_client(project=self.PROJECT) + def test_result_default_wo_state(self): + begun_job_resource = _make_job_resource( + job_id=self.JOB_ID, project_id=self.PROJECT, started=True + ) + done_job_resource = _make_job_resource( + job_id=self.JOB_ID, project_id=self.PROJECT, started=True, ended=True + ) + conn = _make_connection( + _make_retriable_exception(), + begun_job_resource, + _make_retriable_exception(), + done_job_resource, + ) + client = _make_client(project=self.PROJECT, connection=conn) job = self._make_one(self.JOB_ID, client) - begin = job._begin = mock.Mock() - self.assertIs(job.result(), result.return_value) + self.assertIs(job.result(), job) - begin.assert_called_once_with(retry=DEFAULT_RETRY, timeout=None) - result.assert_called_once_with(timeout=None) + begin_call = mock.call( + method="POST", + path=f"/projects/{self.PROJECT}/jobs", + data={"jobReference": {"jobId": self.JOB_ID, "projectId": self.PROJECT}}, + timeout=None, + ) + reload_call = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}", + query_params={}, + timeout=None, + ) + conn.api_request.assert_has_calls( + [begin_call, begin_call, reload_call, reload_call] + ) - @mock.patch("google.api_core.future.polling.PollingFuture.result") - def test_result_w_retry_wo_state(self, result): - client = _make_client(project=self.PROJECT) + def test_result_w_retry_wo_state(self): + begun_job_resource = _make_job_resource( + job_id=self.JOB_ID, project_id=self.PROJECT, started=True + ) + done_job_resource = _make_job_resource( + job_id=self.JOB_ID, project_id=self.PROJECT, started=True, ended=True + ) + conn = _make_connection( + exceptions.NotFound("not normally retriable"), + begun_job_resource, + # The call to done() / reload() does not get the custom retry + # policy passed to it, so we don't throw a non-retriable + # exception here. See: + # https://github.com/googleapis/python-bigquery/issues/24 + _make_retriable_exception(), + done_job_resource, + ) + client = _make_client(project=self.PROJECT, connection=conn) job = self._make_one(self.JOB_ID, client) - begin = job._begin = mock.Mock() - retry = mock.Mock() + custom_predicate = mock.Mock() + custom_predicate.return_value = True + custom_retry = google.api_core.retry.Retry(predicate=custom_predicate) - self.assertIs(job.result(retry=retry), result.return_value) + self.assertIs(job.result(retry=custom_retry), job) - begin.assert_called_once_with(retry=retry, timeout=None) - result.assert_called_once_with(timeout=None) + begin_call = mock.call( + method="POST", + path=f"/projects/{self.PROJECT}/jobs", + data={"jobReference": {"jobId": self.JOB_ID, "projectId": self.PROJECT}}, + timeout=None, + ) + reload_call = mock.call( + method="GET", + path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}", + query_params={}, + timeout=None, + ) + conn.api_request.assert_has_calls( + [begin_call, begin_call, reload_call, reload_call] + ) - @mock.patch("google.api_core.future.polling.PollingFuture.result") - def test_result_explicit_w_state(self, result): - client = _make_client(project=self.PROJECT) + def test_result_explicit_w_state(self): + conn = _make_connection() + client = _make_client(project=self.PROJECT, connection=conn) job = self._make_one(self.JOB_ID, client) - job._properties["status"] = {"state": "DONE"} - begin = job._begin = mock.Mock() + # Use _set_properties() instead of directly modifying _properties so + # that the result state is set properly. + job_resource = job._properties + job_resource["status"] = {"state": "DONE"} + job._set_properties(job_resource) timeout = 1 - self.assertIs(job.result(timeout=timeout), result.return_value) + self.assertIs(job.result(timeout=timeout), job) - begin.assert_not_called() - result.assert_called_once_with(timeout=timeout) + conn.api_request.assert_not_called() def test_cancelled_wo_error_result(self): client = _make_client(project=self.PROJECT)