Skip to content

Commit

Permalink
feat: add retry/timeout to manual surface (#222)
Browse files Browse the repository at this point in the history
Closes #221
  • Loading branch information
tseaver committed Oct 21, 2020
1 parent 1d09f21 commit db5f286
Show file tree
Hide file tree
Showing 32 changed files with 1,656 additions and 609 deletions.
16 changes: 15 additions & 1 deletion google/cloud/firestore_v1/_helpers.py
Expand Up @@ -16,13 +16,14 @@

import datetime

from google.api_core.datetime_helpers import DatetimeWithNanoseconds # type: ignore
from google.api_core import gapic_v1 # type: ignore
from google.protobuf import struct_pb2
from google.type import latlng_pb2 # type: ignore
import grpc # type: ignore

from google.cloud import exceptions # type: ignore
from google.cloud._helpers import _datetime_to_pb_timestamp # type: ignore
from google.api_core.datetime_helpers import DatetimeWithNanoseconds # type: ignore
from google.cloud.firestore_v1.types.write import DocumentTransform
from google.cloud.firestore_v1 import transforms
from google.cloud.firestore_v1 import types
Expand Down Expand Up @@ -1042,3 +1043,16 @@ def modify_write(self, write, **unused_kwargs) -> None:
"""
current_doc = types.Precondition(exists=self._exists)
write._pb.current_document.CopyFrom(current_doc._pb)


def make_retry_timeout_kwargs(retry, timeout) -> dict:
"""Helper fo API methods which take optional 'retry' / 'timeout' args."""
kwargs = {}

if retry is not gapic_v1.method.DEFAULT:
kwargs["retry"] = retry

if timeout is not None:
kwargs["timeout"] = timeout

return kwargs
23 changes: 16 additions & 7 deletions google/cloud/firestore_v1/async_batch.py
Expand Up @@ -15,6 +15,9 @@
"""Helpers for batch requests to the Google Cloud Firestore API."""


from google.api_core import gapic_v1 # type: ignore
from google.api_core import retry as retries # type: ignore

from google.cloud.firestore_v1.base_batch import BaseWriteBatch


Expand All @@ -33,27 +36,33 @@ class AsyncWriteBatch(BaseWriteBatch):
def __init__(self, client) -> None:
super(AsyncWriteBatch, self).__init__(client=client)

async def commit(self) -> list:
async def commit(
self, retry: retries.Retry = gapic_v1.method.DEFAULT, timeout: float = None,
) -> list:
"""Commit the changes accumulated in this batch.
Args:
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
Returns:
List[:class:`google.cloud.proto.firestore.v1.write.WriteResult`, ...]:
The write results corresponding to the changes committed, returned
in the same order as the changes were applied to this batch. A
write result contains an ``update_time`` field.
"""
request, kwargs = self._prep_commit(retry, timeout)

commit_response = await self._client._firestore_api.commit(
request={
"database": self._client._database_string,
"writes": self._write_pbs,
"transaction": None,
},
metadata=self._client._rpc_metadata,
request=request, metadata=self._client._rpc_metadata, **kwargs,
)

self._write_pbs = []
self.write_results = results = list(commit_response.write_results)
self.commit_time = commit_response.commit_time

return results

async def __aenter__(self):
Expand Down
53 changes: 32 additions & 21 deletions google/cloud/firestore_v1/async_client.py
Expand Up @@ -24,17 +24,17 @@
:class:`~google.cloud.firestore_v1.async_document.AsyncDocumentReference`
"""

from google.api_core import gapic_v1 # type: ignore
from google.api_core import retry as retries # type: ignore

from google.cloud.firestore_v1.base_client import (
BaseClient,
DEFAULT_DATABASE,
_CLIENT_INFO,
_reference_info, # type: ignore
_parse_batch_get, # type: ignore
_get_doc_mask,
_path_helper,
)

from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.async_query import AsyncCollectionGroup
from google.cloud.firestore_v1.async_batch import AsyncWriteBatch
from google.cloud.firestore_v1.async_collection import AsyncCollectionReference
Expand Down Expand Up @@ -208,7 +208,12 @@ def document(self, *document_path: Tuple[str]) -> AsyncDocumentReference:
)

async def get_all(
self, references: list, field_paths: Iterable[str] = None, transaction=None,
self,
references: list,
field_paths: Iterable[str] = None,
transaction=None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
) -> AsyncGenerator[DocumentSnapshot, Any]:
"""Retrieve a batch of documents.
Expand Down Expand Up @@ -239,48 +244,54 @@ async def get_all(
transaction (Optional[:class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`]):
An existing transaction that these ``references`` will be
retrieved in.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
Yields:
.DocumentSnapshot: The next document snapshot that fulfills the
query, or :data:`None` if the document does not exist.
"""
document_paths, reference_map = _reference_info(references)
mask = _get_doc_mask(field_paths)
request, reference_map, kwargs = self._prep_get_all(
references, field_paths, transaction, retry, timeout
)

response_iterator = await self._firestore_api.batch_get_documents(
request={
"database": self._database_string,
"documents": document_paths,
"mask": mask,
"transaction": _helpers.get_transaction_id(transaction),
},
metadata=self._rpc_metadata,
request=request, metadata=self._rpc_metadata, **kwargs,
)

async for get_doc_response in response_iterator:
yield _parse_batch_get(get_doc_response, reference_map, self)

async def collections(self) -> AsyncGenerator[AsyncCollectionReference, Any]:
async def collections(
self, retry: retries.Retry = gapic_v1.method.DEFAULT, timeout: float = None,
) -> AsyncGenerator[AsyncCollectionReference, Any]:
"""List top-level collections of the client's database.
Args:
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
Returns:
Sequence[:class:`~google.cloud.firestore_v1.async_collection.AsyncCollectionReference`]:
iterator of subcollections of the current document.
"""
request, kwargs = self._prep_collections(retry, timeout)
iterator = await self._firestore_api.list_collection_ids(
request={"parent": "{}/documents".format(self._database_string)},
metadata=self._rpc_metadata,
request=request, metadata=self._rpc_metadata, **kwargs,
)

while True:
for i in iterator.collection_ids:
yield self.collection(i)
if iterator.next_page_token:
next_request = request.copy()
next_request["page_token"] = iterator.next_page_token
iterator = await self._firestore_api.list_collection_ids(
request={
"parent": "{}/documents".format(self._database_string),
"page_token": iterator.next_page_token,
},
metadata=self._rpc_metadata,
request=next_request, metadata=self._rpc_metadata, **kwargs,
)
else:
return
Expand Down
77 changes: 53 additions & 24 deletions google/cloud/firestore_v1/async_collection.py
Expand Up @@ -13,9 +13,12 @@
# limitations under the License.

"""Classes for representing collections for the Google Cloud Firestore API."""

from google.api_core import gapic_v1 # type: ignore
from google.api_core import retry as retries # type: ignore

from google.cloud.firestore_v1.base_collection import (
BaseCollectionReference,
_auto_id,
_item_to_document_ref,
)
from google.cloud.firestore_v1 import (
Expand Down Expand Up @@ -70,7 +73,11 @@ def _query(self) -> async_query.AsyncQuery:
return async_query.AsyncQuery(self)

async def add(
self, document_data: dict, document_id: str = None
self,
document_data: dict,
document_id: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
) -> Tuple[Any, Any]:
"""Create a document in the Firestore database with the provided data.
Expand All @@ -82,6 +89,10 @@ async def add(
automatically assigned by the server (the assigned ID will be
a random 20 character string composed of digits,
uppercase and lowercase letters).
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
Returns:
Tuple[:class:`google.protobuf.timestamp_pb2.Timestamp`, \
Expand All @@ -95,44 +106,49 @@ async def add(
~google.cloud.exceptions.Conflict: If ``document_id`` is provided
and the document already exists.
"""
if document_id is None:
document_id = _auto_id()

document_ref = self.document(document_id)
write_result = await document_ref.create(document_data)
document_ref, kwargs = self._prep_add(
document_data, document_id, retry, timeout,
)
write_result = await document_ref.create(document_data, **kwargs)
return write_result.update_time, document_ref

async def list_documents(
self, page_size: int = None
self,
page_size: int = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
) -> AsyncGenerator[DocumentReference, None]:
"""List all subdocuments of the current collection.
Args:
page_size (Optional[int]]): The maximum number of documents
in each page of results from this request. Non-positive values
are ignored. Defaults to a sensible value set by the API.
in each page of results from this request. Non-positive values
are ignored. Defaults to a sensible value set by the API.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
Returns:
Sequence[:class:`~google.cloud.firestore_v1.collection.DocumentReference`]:
iterator of subdocuments of the current collection. If the
collection does not exist at the time of `snapshot`, the
iterator will be empty
"""
parent, _ = self._parent_info()
request, kwargs = self._prep_list_documents(page_size, retry, timeout)

iterator = await self._client._firestore_api.list_documents(
request={
"parent": parent,
"collection_id": self.id,
"page_size": page_size,
"show_missing": True,
},
metadata=self._client._rpc_metadata,
request=request, metadata=self._client._rpc_metadata, **kwargs,
)
async for i in iterator:
yield _item_to_document_ref(self, i)

async def get(self, transaction: Transaction = None) -> list:
async def get(
self,
transaction: Transaction = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
) -> list:
"""Read the documents in this collection.
This sends a ``RunQuery`` RPC and returns a list of documents
Expand All @@ -142,6 +158,10 @@ async def get(self, transaction: Transaction = None) -> list:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
Expand All @@ -150,11 +170,15 @@ async def get(self, transaction: Transaction = None) -> list:
Returns:
list: The documents in this collection that match the query.
"""
query = self._query()
return await query.get(transaction=transaction)
query, kwargs = self._prep_get_or_stream(retry, timeout)

return await query.get(transaction=transaction, **kwargs)

async def stream(
self, transaction: Transaction = None
self,
transaction: Transaction = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
) -> AsyncIterator[async_document.DocumentSnapshot]:
"""Read the documents in this collection.
Expand All @@ -177,11 +201,16 @@ async def stream(
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\
Transaction`]):
An existing transaction that the query will run in.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
Yields:
:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`:
The next document that fulfills the query.
"""
query = async_query.AsyncQuery(self)
async for d in query.stream(transaction=transaction):
query, kwargs = self._prep_get_or_stream(retry, timeout)

async for d in query.stream(transaction=transaction, **kwargs):
yield d # pytype: disable=name-error

0 comments on commit db5f286

Please sign in to comment.