Skip to content

Commit

Permalink
feat: add 'retry'/'timeout' to 'Query.fetch' and iterator.
Browse files Browse the repository at this point in the history
Toward #3
  • Loading branch information
tseaver committed Aug 11, 2020
1 parent bf3a417 commit ed2061e
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 7 deletions.
47 changes: 45 additions & 2 deletions google/cloud/datastore/query.py
Expand Up @@ -344,6 +344,8 @@ def fetch(
end_cursor=None,
client=None,
eventual=False,
retry=None,
timeout=None,
):
"""Execute the Query; return an iterator for the matching entities.
Expand Down Expand Up @@ -380,6 +382,17 @@ def fetch(
but cannot be used inside a transaction or
will raise ValueError.
:type retry: :class:`google.api_core.retry.Retry`
:param retry:
A retry object used to retry requests. If ``None`` is specified,
requests will be retried using a default configuration.
:type timeout: float
:param timeout:
Time, in seconds, to wait for the request to complete.
Note that if ``retry`` is specified, the timeout applies
to each individual attempt.
:rtype: :class:`Iterator`
:returns: The iterator for the query.
"""
Expand All @@ -394,6 +407,8 @@ def fetch(
start_cursor=start_cursor,
end_cursor=end_cursor,
eventual=eventual,
retry=retry,
timeout=timeout,
)


Expand Down Expand Up @@ -427,6 +442,17 @@ class Iterator(page_iterator.Iterator):
Setting True will use eventual consistency,
but cannot be used inside a transaction or
will raise ValueError.
:type retry: :class:`google.api_core.retry.Retry`
:param retry:
A retry object used to retry requests. If ``None`` is specified,
requests will be retried using a default configuration.
:type timeout: float
:param timeout:
Time, in seconds, to wait for the request to complete.
Note that if ``retry`` is specified, the timeout applies
to each individual attempt.
"""

next_page_token = None
Expand All @@ -440,6 +466,8 @@ def __init__(
start_cursor=None,
end_cursor=None,
eventual=False,
retry=None,
timeout=None,
):
super(Iterator, self).__init__(
client=client,
Expand All @@ -451,6 +479,8 @@ def __init__(
self._offset = offset
self._end_cursor = end_cursor
self._eventual = eventual
self._retry = retry
self._timeout = timeout
# The attributes below will change over the life of the iterator.
self._more_results = True
self._skipped_results = 0
Expand Down Expand Up @@ -536,8 +566,17 @@ def _next_page(self):
partition_id = entity_pb2.PartitionId(
project_id=self._query.project, namespace_id=self._query.namespace
)

kwargs = {}

if self._retry is not None:
kwargs["retry"] = self._retry

if self._timeout is not None:
kwargs["timeout"] = self._timeout

response_pb = self.client._datastore_api.run_query(
self._query.project, partition_id, read_options, query=query_pb
self._query.project, partition_id, read_options, query=query_pb, **kwargs
)

while (
Expand All @@ -551,7 +590,11 @@ def _next_page(self):
query_pb.start_cursor = response_pb.batch.skipped_cursor
query_pb.offset -= response_pb.batch.skipped_results
response_pb = self.client._datastore_api.run_query(
self._query.project, partition_id, read_options, query=query_pb
self._query.project,
partition_id,
read_options,
query=query_pb,
**kwargs
)

entity_pbs = self._process_query_results(response_pb)
Expand Down
46 changes: 41 additions & 5 deletions tests/unit/test_query.py
Expand Up @@ -332,26 +332,37 @@ def test_fetch_defaults_w_client_attr(self):

client = self._make_client()
query = self._make_one(client)

iterator = query.fetch()

self.assertIsInstance(iterator, Iterator)
self.assertIs(iterator._query, query)
self.assertIs(iterator.client, client)
self.assertIsNone(iterator.max_results)
self.assertEqual(iterator._offset, 0)
self.assertIsNone(iterator._retry)
self.assertIsNone(iterator._timeout)

def test_fetch_w_explicit_client(self):
def test_fetch_w_explicit_client_w_retry_w_timeout(self):
from google.cloud.datastore.query import Iterator

client = self._make_client()
other_client = self._make_client()
query = self._make_one(client)
iterator = query.fetch(limit=7, offset=8, client=other_client)
retry = mock.Mock()
timeout = 100000

iterator = query.fetch(
limit=7, offset=8, client=other_client, retry=retry, timeout=timeout
)

self.assertIsInstance(iterator, Iterator)
self.assertIs(iterator._query, query)
self.assertIs(iterator.client, other_client)
self.assertEqual(iterator.max_results, 7)
self.assertEqual(iterator._offset, 8)
self.assertEqual(iterator._retry, retry)
self.assertEqual(iterator._timeout, timeout)


class TestIterator(unittest.TestCase):
Expand All @@ -367,6 +378,7 @@ def _make_one(self, *args, **kw):
def test_constructor_defaults(self):
query = object()
client = object()

iterator = self._make_one(query, client)

self.assertFalse(iterator._started)
Expand All @@ -379,6 +391,8 @@ def test_constructor_defaults(self):
self.assertIsNone(iterator._offset)
self.assertIsNone(iterator._end_cursor)
self.assertTrue(iterator._more_results)
self.assertIsNone(iterator._retry)
self.assertIsNone(iterator._timeout)

def test_constructor_explicit(self):
query = object()
Expand All @@ -387,13 +401,18 @@ def test_constructor_explicit(self):
offset = 9
start_cursor = b"8290\xff"
end_cursor = b"so20rc\ta"
retry = mock.Mock()
timeout = 100000

iterator = self._make_one(
query,
client,
limit=limit,
offset=offset,
start_cursor=start_cursor,
end_cursor=end_cursor,
retry=retry,
timeout=timeout,
)

self.assertFalse(iterator._started)
Expand All @@ -406,6 +425,8 @@ def test_constructor_explicit(self):
self.assertEqual(iterator._offset, offset)
self.assertEqual(iterator._end_cursor, end_cursor)
self.assertTrue(iterator._more_results)
self.assertEqual(iterator._retry, retry)
self.assertEqual(iterator._timeout, timeout)

def test__build_protobuf_empty(self):
from google.cloud.datastore_v1.proto import query_pb2
Expand Down Expand Up @@ -513,7 +534,7 @@ def test__process_query_results_bad_enum(self):
with self.assertRaises(ValueError):
iterator._process_query_results(response_pb)

def _next_page_helper(self, txn_id=None):
def _next_page_helper(self, txn_id=None, retry=None, timeout=None):
from google.api_core import page_iterator
from google.cloud.datastore_v1.proto import datastore_pb2
from google.cloud.datastore_v1.proto import entity_pb2
Expand All @@ -531,9 +552,18 @@ def _next_page_helper(self, txn_id=None):
client = _Client(project, datastore_api=ds_api, transaction=transaction)

query = Query(client)
iterator = self._make_one(query, client)
kwargs = {}

if retry is not None:
kwargs["retry"] = retry

if timeout is not None:
kwargs["timeout"] = timeout

iterator = self._make_one(query, client, **kwargs)

page = iterator._next_page()

self.assertIsInstance(page, page_iterator.Page)
self.assertIs(page._parent, iterator)

Expand All @@ -544,12 +574,18 @@ def _next_page_helper(self, txn_id=None):
read_options = datastore_pb2.ReadOptions(transaction=txn_id)
empty_query = query_pb2.Query()
ds_api.run_query.assert_called_once_with(
project, partition_id, read_options, query=empty_query
project, partition_id, read_options, query=empty_query, **kwargs
)

def test__next_page(self):
self._next_page_helper()

def test__next_page_w_retry(self):
self._next_page_helper(retry=mock.Mock())

def test__next_page_w_timeout(self):
self._next_page_helper(timeout=100000)

def test__next_page_in_transaction(self):
txn_id = b"1xo1md\xe2\x98\x83"
self._next_page_helper(txn_id)
Expand Down

0 comments on commit ed2061e

Please sign in to comment.