Skip to content

Commit

Permalink
test: make _AsyncJob tests mock at a lower layer (#340)
Browse files Browse the repository at this point in the history
This is intented to make the `_AsyncJob` tests more robust
to changes in retry behavior. It also more explicitly tests
the retry behavior by observing API calls rather than calls
to certain methods.
  • Loading branch information
tswast committed Oct 26, 2020
1 parent fd08255 commit c9823d9
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 34 deletions.
1 change: 0 additions & 1 deletion google/cloud/bigquery/client.py
Expand Up @@ -625,7 +625,6 @@ def create_table(self, table, exists_ok=False, retry=DEFAULT_RETRY, timeout=None
def _call_api(
self, retry, span_name=None, span_attributes=None, job_ref=None, **kwargs
):

call = functools.partial(self._connection.api_request, **kwargs)
if retry:
call = retry(call)
Expand Down
7 changes: 3 additions & 4 deletions google/cloud/bigquery/job.py
Expand Up @@ -529,9 +529,8 @@ def state(self):
Optional[str]:
the state (None until set from the server).
"""
status = self._properties.get("status")
if status is not None:
return status.get("state")
status = self._properties.get("status", {})
return status.get("state")

def _set_properties(self, api_response):
"""Update properties from resource in body of ``api_response``
Expand Down Expand Up @@ -588,7 +587,7 @@ def _check_resource_config(cls, resource):

def to_api_repr(self):
"""Generate a resource for the job."""
raise NotImplementedError("Abstract")
return copy.deepcopy(self._properties)

_build_resource = to_api_repr # backward-compatibility alias

Expand Down
121 changes: 92 additions & 29 deletions tests/unit/test_job.py
Expand Up @@ -20,6 +20,8 @@
import warnings

import freezegun
from google.api_core import exceptions
import google.api_core.retry
import mock
import pytest
import requests
Expand Down Expand Up @@ -70,6 +72,12 @@ def _make_connection(*responses):
return mock_conn


def _make_retriable_exception():
return exceptions.TooManyRequests(
"retriable exception", errors=[{"reason": "rateLimitExceeded"}]
)


def _make_job_resource(
creation_time_ms=1437767599006,
started_time_ms=1437767600007,
Expand All @@ -84,6 +92,7 @@ def _make_job_resource(
user_email="bq-user@example.com",
):
resource = {
"status": {"state": "PENDING"},
"configuration": {job_type: {}},
"statistics": {"creationTime": creation_time_ms, job_type: {}},
"etag": etag,
Expand All @@ -97,9 +106,11 @@ def _make_job_resource(

if started or ended:
resource["statistics"]["startTime"] = started_time_ms
resource["status"]["state"] = "RUNNING"

if ended:
resource["statistics"]["endTime"] = ended_time_ms
resource["status"]["state"] = "DONE"

if job_type == "query":
resource["configuration"]["query"]["destinationTable"] = {
Expand Down Expand Up @@ -555,14 +566,14 @@ def test__check_resource_config_ok(self):
def test__build_resource(self):
client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, client)
with self.assertRaises(NotImplementedError):
job._build_resource()
resource = job._build_resource()
assert resource["jobReference"]["jobId"] == self.JOB_ID

def test_to_api_repr(self):
client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, client)
with self.assertRaises(NotImplementedError):
job.to_api_repr()
resource = job.to_api_repr()
assert resource["jobReference"]["jobId"] == self.JOB_ID

def test__begin_already(self):
job = self._set_properties_job()
Expand Down Expand Up @@ -965,43 +976,95 @@ def test_done_already(self):

self.assertTrue(job.done())

@mock.patch("google.api_core.future.polling.PollingFuture.result")
def test_result_default_wo_state(self, result):
from google.cloud.bigquery.retry import DEFAULT_RETRY

client = _make_client(project=self.PROJECT)
def test_result_default_wo_state(self):
begun_job_resource = _make_job_resource(
job_id=self.JOB_ID, project_id=self.PROJECT, started=True
)
done_job_resource = _make_job_resource(
job_id=self.JOB_ID, project_id=self.PROJECT, started=True, ended=True
)
conn = _make_connection(
_make_retriable_exception(),
begun_job_resource,
_make_retriable_exception(),
done_job_resource,
)
client = _make_client(project=self.PROJECT, connection=conn)
job = self._make_one(self.JOB_ID, client)
begin = job._begin = mock.Mock()

self.assertIs(job.result(), result.return_value)
self.assertIs(job.result(), job)

begin.assert_called_once_with(retry=DEFAULT_RETRY, timeout=None)
result.assert_called_once_with(timeout=None)
begin_call = mock.call(
method="POST",
path=f"/projects/{self.PROJECT}/jobs",
data={"jobReference": {"jobId": self.JOB_ID, "projectId": self.PROJECT}},
timeout=None,
)
reload_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}",
query_params={},
timeout=None,
)
conn.api_request.assert_has_calls(
[begin_call, begin_call, reload_call, reload_call]
)

@mock.patch("google.api_core.future.polling.PollingFuture.result")
def test_result_w_retry_wo_state(self, result):
client = _make_client(project=self.PROJECT)
def test_result_w_retry_wo_state(self):
begun_job_resource = _make_job_resource(
job_id=self.JOB_ID, project_id=self.PROJECT, started=True
)
done_job_resource = _make_job_resource(
job_id=self.JOB_ID, project_id=self.PROJECT, started=True, ended=True
)
conn = _make_connection(
exceptions.NotFound("not normally retriable"),
begun_job_resource,
# The call to done() / reload() does not get the custom retry
# policy passed to it, so we don't throw a non-retriable
# exception here. See:
# https://github.com/googleapis/python-bigquery/issues/24
_make_retriable_exception(),
done_job_resource,
)
client = _make_client(project=self.PROJECT, connection=conn)
job = self._make_one(self.JOB_ID, client)
begin = job._begin = mock.Mock()
retry = mock.Mock()
custom_predicate = mock.Mock()
custom_predicate.return_value = True
custom_retry = google.api_core.retry.Retry(predicate=custom_predicate)

self.assertIs(job.result(retry=retry), result.return_value)
self.assertIs(job.result(retry=custom_retry), job)

begin.assert_called_once_with(retry=retry, timeout=None)
result.assert_called_once_with(timeout=None)
begin_call = mock.call(
method="POST",
path=f"/projects/{self.PROJECT}/jobs",
data={"jobReference": {"jobId": self.JOB_ID, "projectId": self.PROJECT}},
timeout=None,
)
reload_call = mock.call(
method="GET",
path=f"/projects/{self.PROJECT}/jobs/{self.JOB_ID}",
query_params={},
timeout=None,
)
conn.api_request.assert_has_calls(
[begin_call, begin_call, reload_call, reload_call]
)

@mock.patch("google.api_core.future.polling.PollingFuture.result")
def test_result_explicit_w_state(self, result):
client = _make_client(project=self.PROJECT)
def test_result_explicit_w_state(self):
conn = _make_connection()
client = _make_client(project=self.PROJECT, connection=conn)
job = self._make_one(self.JOB_ID, client)
job._properties["status"] = {"state": "DONE"}
begin = job._begin = mock.Mock()
# Use _set_properties() instead of directly modifying _properties so
# that the result state is set properly.
job_resource = job._properties
job_resource["status"] = {"state": "DONE"}
job._set_properties(job_resource)
timeout = 1

self.assertIs(job.result(timeout=timeout), result.return_value)
self.assertIs(job.result(timeout=timeout), job)

begin.assert_not_called()
result.assert_called_once_with(timeout=timeout)
conn.api_request.assert_not_called()

def test_cancelled_wo_error_result(self):
client = _make_client(project=self.PROJECT)
Expand Down

0 comments on commit c9823d9

Please sign in to comment.