Skip to content

Commit

Permalink
feat: add retry/timeout to 'async_collection.AsyncCollectionReference'
Browse files Browse the repository at this point in the history
Methods affected:

- 'add'
- 'get'
- 'list_documents'
- 'stream'

Towards #221
  • Loading branch information
tseaver committed Oct 14, 2020
1 parent ec8002c commit c81cd8c
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 53 deletions.
69 changes: 45 additions & 24 deletions google/cloud/firestore_v1/async_collection.py
Expand Up @@ -13,9 +13,11 @@
# limitations under the License.

"""Classes for representing collections for the Google Cloud Firestore API."""

from google.api_core import retry as retries # type: ignore

from google.cloud.firestore_v1.base_collection import (
BaseCollectionReference,
_auto_id,
_item_to_document_ref,
)
from google.cloud.firestore_v1 import (
Expand Down Expand Up @@ -70,7 +72,11 @@ def _query(self) -> async_query.AsyncQuery:
return async_query.AsyncQuery(self)

async def add(
self, document_data: dict, document_id: str = None
self,
document_data: dict,
document_id: str = None,
retry: retries.Retry = None,
timeout: float = None,
) -> Tuple[Any, Any]:
"""Create a document in the Firestore database with the provided data.
Expand All @@ -82,6 +88,9 @@ async def add(
automatically assigned by the server (the assigned ID will be
a random 20 character string composed of digits,
uppercase and lowercase letters).
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (float): The timeout for this request.
Returns:
Tuple[:class:`google.protobuf.timestamp_pb2.Timestamp`, \
Expand All @@ -95,44 +104,45 @@ async def add(
~google.cloud.exceptions.Conflict: If ``document_id`` is provided
and the document already exists.
"""
if document_id is None:
document_id = _auto_id()

document_ref = self.document(document_id)
write_result = await document_ref.create(document_data)
document_ref, kwargs = self._prep_add(
document_data, document_id, retry, timeout,
)
write_result = await document_ref.create(document_data, **kwargs)
return write_result.update_time, document_ref

async def list_documents(
self, page_size: int = None
self, page_size: int = None, retry: retries.Retry = None, timeout: float = None,
) -> AsyncGenerator[DocumentReference, None]:
"""List all subdocuments of the current collection.
Args:
page_size (Optional[int]]): The maximum number of documents
in each page of results from this request. Non-positive values
are ignored. Defaults to a sensible value set by the API.
in each page of results from this request. Non-positive values
are ignored. Defaults to a sensible value set by the API.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (float): The timeout for this request.
Returns:
Sequence[:class:`~google.cloud.firestore_v1.collection.DocumentReference`]:
iterator of subdocuments of the current collection. If the
collection does not exist at the time of `snapshot`, the
iterator will be empty
"""
parent, _ = self._parent_info()
request, kwargs = self._prep_list_documents(page_size, retry, timeout)

iterator = await self._client._firestore_api.list_documents(
request={
"parent": parent,
"collection_id": self.id,
"page_size": page_size,
"show_missing": True,
},
metadata=self._client._rpc_metadata,
request=request, metadata=self._client._rpc_metadata, **kwargs,
)
async for i in iterator:
yield _item_to_document_ref(self, i)

async def get(self, transaction: Transaction = None) -> list:
async def get(
self,
transaction: Transaction = None,
retry: retries.Retry = None,
timeout: float = None,
) -> list:
"""Read the documents in this collection.
This sends a ``RunQuery`` RPC and returns a list of documents
Expand All @@ -142,6 +152,9 @@ async def get(self, transaction: Transaction = None) -> list:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (float): The timeout for this request.
If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
Expand All @@ -150,11 +163,15 @@ async def get(self, transaction: Transaction = None) -> list:
Returns:
list: The documents in this collection that match the query.
"""
query = self._query()
return await query.get(transaction=transaction)
query, kwargs = self._prep_get_or_stream(retry, timeout)

return await query.get(transaction=transaction, **kwargs)

async def stream(
self, transaction: Transaction = None
self,
transaction: Transaction = None,
retry: retries.Retry = None,
timeout: float = None,
) -> AsyncIterator[async_document.DocumentSnapshot]:
"""Read the documents in this collection.
Expand All @@ -177,11 +194,15 @@ async def stream(
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\
Transaction`]):
An existing transaction that the query will run in.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried.
timeout (float): The timeout for this request.
Yields:
:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`:
The next document that fulfills the query.
"""
query = async_query.AsyncQuery(self)
async for d in query.stream(transaction=transaction):
query, kwargs = self._prep_get_or_stream(retry, timeout)

async for d in query.stream(transaction=transaction, **kwargs):
yield d # pytype: disable=name-error
40 changes: 40 additions & 0 deletions google/cloud/firestore_v1/base_collection.py
Expand Up @@ -148,6 +148,22 @@ def _parent_info(self) -> Tuple[Any, str]:
expected_prefix = _helpers.DOCUMENT_PATH_DELIMITER.join((parent_path, self.id))
return parent_path, expected_prefix

def _prep_add(
self,
document_data: dict,
document_id: str = None,
retry: retries.Retry = None,
timeout: float = None,
) -> Tuple[DocumentReference, dict]:
"""Shared setup for async / sync :method:`add`"""
if document_id is None:
document_id = _auto_id()

document_ref = self.document(document_id)
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

return document_ref, kwargs

def add(
self,
document_data: dict,
Expand All @@ -157,6 +173,21 @@ def add(
) -> Union[Tuple[Any, Any], Coroutine[Any, Any, Tuple[Any, Any]]]:
raise NotImplementedError

def _prep_list_documents(
self, page_size: int = None, retry: retries.Retry = None, timeout: float = None,
) -> Tuple[dict, dict]:
"""Shared setup for async / sync :method:`list_documents`"""
parent, _ = self._parent_info()
request = {
"parent": parent,
"collection_id": self.id,
"page_size": page_size,
"show_missing": True,
}
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

return request, kwargs

def list_documents(
self, page_size: int = None, retry: retries.Retry = None, timeout: float = None,
) -> Union[
Expand Down Expand Up @@ -379,6 +410,15 @@ def end_at(
query = self._query()
return query.end_at(document_fields)

def _prep_get_or_stream(
self, retry: retries.Retry = None, timeout: float = None,
) -> Tuple[Any, dict]:
"""Shared setup for async / sync :meth:`get` / :meth:`stream`"""
query = self._query()
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

return query, kwargs

def get(
self,
transaction: Transaction = None,
Expand Down
30 changes: 9 additions & 21 deletions google/cloud/firestore_v1/collection.py
Expand Up @@ -18,13 +18,11 @@

from google.cloud.firestore_v1.base_collection import (
BaseCollectionReference,
_auto_id,
_item_to_document_ref,
)
from google.cloud.firestore_v1 import query as query_mod
from google.cloud.firestore_v1.watch import Watch
from google.cloud.firestore_v1 import document
from google.cloud.firestore_v1 import _helpers
from typing import Any, Callable, Generator, Tuple

# Types needed only for Type Hints
Expand Down Expand Up @@ -101,11 +99,9 @@ def add(
~google.cloud.exceptions.Conflict: If ``document_id`` is provided
and the document already exists.
"""
if document_id is None:
document_id = _auto_id()

document_ref = self.document(document_id)
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
document_ref, kwargs = self._prep_add(
document_data, document_id, retry, timeout,
)
write_result = document_ref.create(document_data, **kwargs)
return write_result.update_time, document_ref

Expand All @@ -128,18 +124,10 @@ def list_documents(
collection does not exist at the time of `snapshot`, the
iterator will be empty
"""
parent, _ = self._parent_info()
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
request, kwargs = self._prep_list_documents(page_size, retry, timeout)

iterator = self._client._firestore_api.list_documents(
request={
"parent": parent,
"collection_id": self.id,
"page_size": page_size,
"show_missing": True,
},
metadata=self._client._rpc_metadata,
**kwargs,
request=request, metadata=self._client._rpc_metadata, **kwargs,
)
return (_item_to_document_ref(self, i) for i in iterator)

Expand Down Expand Up @@ -169,8 +157,8 @@ def get(
Returns:
list: The documents in this collection that match the query.
"""
query = query_mod.Query(self)
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
query, kwargs = self._prep_get_or_stream(retry, timeout)

return query.get(transaction=transaction, **kwargs)

def stream(
Expand Down Expand Up @@ -208,8 +196,8 @@ def stream(
:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`:
The next document that fulfills the query.
"""
query = query_mod.Query(self)
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
query, kwargs = self._prep_get_or_stream(retry, timeout)

return query.stream(transaction=transaction, **kwargs)

def on_snapshot(self, callback: Callable) -> Watch:
Expand Down

0 comments on commit c81cd8c

Please sign in to comment.