Skip to content

Commit

Permalink
feat: add retry/timeout to 'query.Query.stream'
Browse files Browse the repository at this point in the history
Toward #221
  • Loading branch information
tseaver committed Oct 13, 2020
1 parent 8038cce commit 2d413df
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 4 deletions.
18 changes: 17 additions & 1 deletion google/cloud/firestore_v1/query.py
Expand Up @@ -18,6 +18,9 @@
a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be
a more common way to create a query than direct usage of the constructor.
"""

from google.api_core import retry as retries # type: ignore

from google.cloud.firestore_v1.base_query import (
BaseCollectionGroup,
BaseQuery,
Expand Down Expand Up @@ -153,7 +156,7 @@ def get(self, transaction=None) -> list:
return list(result)

def stream(
self, transaction=None
self, transaction=None, retry: retries.Retry = None, timeout: float = None,
) -> Generator[document.DocumentSnapshot, Any, None]:
"""Read the documents in the collection that match this query.
Expand All @@ -176,6 +179,9 @@ def stream(
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (float): The timeout for this request.
Yields:
:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`:
Expand All @@ -188,13 +194,23 @@ def stream(
)

parent_path, expected_prefix = self._parent._parent_info()

kwargs = {}

if retry is not None:
kwargs["retry"] = retry

if timeout is not None:
kwargs["timeout"] = timeout

response_iterator = self._client._firestore_api.run_query(
request={
"parent": parent_path,
"structured_query": self._to_protobuf(),
"transaction": _helpers.get_transaction_id(transaction),
},
metadata=self._client._rpc_metadata,
**kwargs,
)

for response in response_iterator:
Expand Down
27 changes: 24 additions & 3 deletions tests/unit/v1/test_query.py
Expand Up @@ -105,7 +105,7 @@ def test_get_limit_to_last(self):
# Execute the query and check the response.
query = self._make_one(parent)
query = query.order_by(
u"snooze", direction=firestore.Query.DESCENDING
"snooze", direction=firestore.Query.DESCENDING
).limit_to_last(2)
returned = query.get()

Expand Down Expand Up @@ -134,7 +134,7 @@ def test_get_limit_to_last(self):
metadata=client._rpc_metadata,
)

def test_stream_simple(self):
def _stream_helper(self, retry=None, timeout=None):
# Create a minimal fake GAPIC.
firestore_api = mock.Mock(spec=["run_query"])

Expand All @@ -154,7 +154,17 @@ def test_stream_simple(self):

# Execute the query and check the response.
query = self._make_one(parent)
get_response = query.stream()

kwargs = {}

if retry is not None:
kwargs["retry"] = retry

if timeout is not None:
kwargs["timeout"] = timeout

get_response = query.stream(**kwargs)

self.assertIsInstance(get_response, types.GeneratorType)
returned = list(get_response)
self.assertEqual(len(returned), 1)
Expand All @@ -171,8 +181,19 @@ def test_stream_simple(self):
"transaction": None,
},
metadata=client._rpc_metadata,
**kwargs,
)

def test_stream_simple(self):
self._stream_helper()

def test_stream_w_retry_timeout(self):
from google.api_core.retry import Retry

retry = Retry(predicate=object())
timeout = 123.0
self._stream_helper(retry=retry, timeout=timeout)

def test_stream_with_limit_to_last(self):
# Attach the fake GAPIC to a real client.
client = _make_client()
Expand Down

0 comments on commit 2d413df

Please sign in to comment.