Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add retry/timeout to manual surface #222

Merged
merged 41 commits into from Oct 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
8d03921
feat: add retry/timeout to 'client.Client.get_all'
tseaver Oct 13, 2020
e7d2119
feat: add retry/timeout to 'client.Client.collections'
tseaver Oct 13, 2020
6bfe32f
feat: add retry/timeout to 'batch.Batch.commit'
tseaver Oct 13, 2020
6ba6d21
feat: add retry/timeout to 'document.DocumentReference.get'
tseaver Oct 13, 2020
e6ad4a1
feat: add retry/timeout to 'query.Query.get'
tseaver Oct 13, 2020
6e806b0
feat: add retry/timeout to 'query.CollectionGroup.get_partitions'
tseaver Oct 13, 2020
00736fe
feat: add retry/timeout to 'collection.CollectionReference.add'
tseaver Oct 13, 2020
2b80b91
feat: add retry/timeout to 'collection.CollectionReference.list_docum…
tseaver Oct 13, 2020
49d3b03
feat: add retry/timeout to 'collection.CollectionReference.get'
tseaver Oct 13, 2020
16a1e34
feat: add retry/timeout to 'collection.CollectionReference.stream'
tseaver Oct 13, 2020
a9547e6
feat: add retry/timeout to 'document.DocumentReference.collections'
tseaver Oct 13, 2020
d6df19c
feat: add retry/timeout to 'document.DocumentReference.delete'
tseaver Oct 13, 2020
6eae0e7
feat: add retry/timeout to 'document.DocumentReference.create'
tseaver Oct 13, 2020
559f2eb
feat: add retry/timeout to 'document.DocumentReference.set'
tseaver Oct 13, 2020
8038cce
feat: add retry/timeout to 'document.DocumentReference.update'
tseaver Oct 13, 2020
2d413df
feat: add retry/timeout to 'query.Query.stream'
tseaver Oct 13, 2020
e15b8f6
feat: add retry/timeout to 'transaction.Transaction.get_all'
tseaver Oct 13, 2020
9f5bbb4
feat: add retry/timeout to 'transaction.Transaction.get'
tseaver Oct 13, 2020
5a1ef50
feat: add retry/timeout to base class signatures
tseaver Oct 13, 2020
6dec6f3
fix: un-break docs build
tseaver Oct 13, 2020
812c41f
chore: factor out helper for computing retry / timeout kwargs
tseaver Oct 14, 2020
8c67138
chore: factor out common prep for 'collections'/'get_all' to base class
tseaver Oct 14, 2020
bad75e1
chore: factor out test helper for 'collections'
tseaver Oct 14, 2020
db01b59
feat: add retry/timeout to 'async_client.AsyncClient.{collections.get…
tseaver Oct 14, 2020
4090a00
chore: use factored-out helper to build retry/timeout kwargs
tseaver Oct 14, 2020
40fae96
chore: clean up tests for 'client.Client.get_all'
tseaver Oct 14, 2020
df615e4
chore: lint
tseaver Oct 14, 2020
a557a15
feat: add retry/timeout to 'async_batch.AsyncBatch.commit'
tseaver Oct 14, 2020
4e3be50
feat: add retry/timeout to 'async_document.AsyncDocument` methods
tseaver Oct 14, 2020
ec8002c
feat: add retry/timeout to 'async_query.Async{Query,CollectionGroup}'
tseaver Oct 14, 2020
c81cd8c
feat: add retry/timeout to 'async_collection.AsyncCollectionReference'
tseaver Oct 14, 2020
1acbde3
fix: typo
tseaver Oct 14, 2020
b998db2
chore: rename testcases/helper for clarity
tseaver Oct 14, 2020
46f27e6
feat: add retry/timeout to 'async_transaction.AsyncTransaction' methods
tseaver Oct 14, 2020
9b4707a
fix: typo
tseaver Oct 14, 2020
7a976a5
chore: appease pytype
tseaver Oct 14, 2020
4b1ec26
fix: actually test retry / timeout
tseaver Oct 14, 2020
c5e4056
fix: document system-specified defaults for 'retry' / 'timeout'
tseaver Oct 21, 2020
2360eac
fix: use gapic's 'DEFAULT' sentinel for 'retry'
tseaver Oct 21, 2020
f15a523
Merge branch 'master' into 221-retry-timeout
tseaver Oct 21, 2020
3feb63b
chore: lint
tseaver Oct 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 15 additions & 1 deletion google/cloud/firestore_v1/_helpers.py
Expand Up @@ -16,13 +16,14 @@

import datetime

from google.api_core.datetime_helpers import DatetimeWithNanoseconds # type: ignore
from google.api_core import gapic_v1 # type: ignore
from google.protobuf import struct_pb2
from google.type import latlng_pb2 # type: ignore
import grpc # type: ignore

from google.cloud import exceptions # type: ignore
from google.cloud._helpers import _datetime_to_pb_timestamp # type: ignore
from google.api_core.datetime_helpers import DatetimeWithNanoseconds # type: ignore
from google.cloud.firestore_v1.types.write import DocumentTransform
from google.cloud.firestore_v1 import transforms
from google.cloud.firestore_v1 import types
Expand Down Expand Up @@ -1042,3 +1043,16 @@ def modify_write(self, write, **unused_kwargs) -> None:
"""
current_doc = types.Precondition(exists=self._exists)
write._pb.current_document.CopyFrom(current_doc._pb)


def make_retry_timeout_kwargs(retry, timeout) -> dict:
"""Helper fo API methods which take optional 'retry' / 'timeout' args."""
kwargs = {}

if retry is not gapic_v1.method.DEFAULT:
kwargs["retry"] = retry

if timeout is not None:
kwargs["timeout"] = timeout

return kwargs
23 changes: 16 additions & 7 deletions google/cloud/firestore_v1/async_batch.py
Expand Up @@ -15,6 +15,9 @@
"""Helpers for batch requests to the Google Cloud Firestore API."""


from google.api_core import gapic_v1 # type: ignore
from google.api_core import retry as retries # type: ignore

from google.cloud.firestore_v1.base_batch import BaseWriteBatch


Expand All @@ -33,27 +36,33 @@ class AsyncWriteBatch(BaseWriteBatch):
def __init__(self, client) -> None:
super(AsyncWriteBatch, self).__init__(client=client)

async def commit(self) -> list:
async def commit(
self, retry: retries.Retry = gapic_v1.method.DEFAULT, timeout: float = None,
) -> list:
"""Commit the changes accumulated in this batch.

Args:
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.

Returns:
List[:class:`google.cloud.proto.firestore.v1.write.WriteResult`, ...]:
The write results corresponding to the changes committed, returned
in the same order as the changes were applied to this batch. A
write result contains an ``update_time`` field.
"""
request, kwargs = self._prep_commit(retry, timeout)

commit_response = await self._client._firestore_api.commit(
request={
"database": self._client._database_string,
"writes": self._write_pbs,
"transaction": None,
},
metadata=self._client._rpc_metadata,
request=request, metadata=self._client._rpc_metadata, **kwargs,
)

self._write_pbs = []
self.write_results = results = list(commit_response.write_results)
self.commit_time = commit_response.commit_time

return results

async def __aenter__(self):
Expand Down
53 changes: 32 additions & 21 deletions google/cloud/firestore_v1/async_client.py
Expand Up @@ -24,17 +24,17 @@
:class:`~google.cloud.firestore_v1.async_document.AsyncDocumentReference`
"""

from google.api_core import gapic_v1 # type: ignore
from google.api_core import retry as retries # type: ignore

from google.cloud.firestore_v1.base_client import (
BaseClient,
DEFAULT_DATABASE,
_CLIENT_INFO,
_reference_info, # type: ignore
_parse_batch_get, # type: ignore
_get_doc_mask,
_path_helper,
)

from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.async_query import AsyncCollectionGroup
from google.cloud.firestore_v1.async_batch import AsyncWriteBatch
from google.cloud.firestore_v1.async_collection import AsyncCollectionReference
Expand Down Expand Up @@ -208,7 +208,12 @@ def document(self, *document_path: Tuple[str]) -> AsyncDocumentReference:
)

async def get_all(
self, references: list, field_paths: Iterable[str] = None, transaction=None,
self,
references: list,
field_paths: Iterable[str] = None,
transaction=None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
) -> AsyncGenerator[DocumentSnapshot, Any]:
"""Retrieve a batch of documents.

Expand Down Expand Up @@ -239,48 +244,54 @@ async def get_all(
transaction (Optional[:class:`~google.cloud.firestore_v1.async_transaction.AsyncTransaction`]):
An existing transaction that these ``references`` will be
retrieved in.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.

Yields:
.DocumentSnapshot: The next document snapshot that fulfills the
query, or :data:`None` if the document does not exist.
"""
document_paths, reference_map = _reference_info(references)
mask = _get_doc_mask(field_paths)
request, reference_map, kwargs = self._prep_get_all(
references, field_paths, transaction, retry, timeout
)

response_iterator = await self._firestore_api.batch_get_documents(
request={
"database": self._database_string,
"documents": document_paths,
"mask": mask,
"transaction": _helpers.get_transaction_id(transaction),
},
metadata=self._rpc_metadata,
request=request, metadata=self._rpc_metadata, **kwargs,
)

async for get_doc_response in response_iterator:
yield _parse_batch_get(get_doc_response, reference_map, self)

async def collections(self) -> AsyncGenerator[AsyncCollectionReference, Any]:
async def collections(
self, retry: retries.Retry = gapic_v1.method.DEFAULT, timeout: float = None,
) -> AsyncGenerator[AsyncCollectionReference, Any]:
"""List top-level collections of the client's database.

Args:
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.

Returns:
Sequence[:class:`~google.cloud.firestore_v1.async_collection.AsyncCollectionReference`]:
iterator of subcollections of the current document.
"""
request, kwargs = self._prep_collections(retry, timeout)
iterator = await self._firestore_api.list_collection_ids(
request={"parent": "{}/documents".format(self._database_string)},
metadata=self._rpc_metadata,
request=request, metadata=self._rpc_metadata, **kwargs,
)

while True:
for i in iterator.collection_ids:
yield self.collection(i)
if iterator.next_page_token:
next_request = request.copy()
next_request["page_token"] = iterator.next_page_token
iterator = await self._firestore_api.list_collection_ids(
request={
"parent": "{}/documents".format(self._database_string),
"page_token": iterator.next_page_token,
},
metadata=self._rpc_metadata,
request=next_request, metadata=self._rpc_metadata, **kwargs,
)
else:
return
Expand Down
77 changes: 53 additions & 24 deletions google/cloud/firestore_v1/async_collection.py
Expand Up @@ -13,9 +13,12 @@
# limitations under the License.

"""Classes for representing collections for the Google Cloud Firestore API."""

from google.api_core import gapic_v1 # type: ignore
from google.api_core import retry as retries # type: ignore

from google.cloud.firestore_v1.base_collection import (
BaseCollectionReference,
_auto_id,
_item_to_document_ref,
)
from google.cloud.firestore_v1 import (
Expand Down Expand Up @@ -70,7 +73,11 @@ def _query(self) -> async_query.AsyncQuery:
return async_query.AsyncQuery(self)

async def add(
self, document_data: dict, document_id: str = None
self,
document_data: dict,
document_id: str = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
) -> Tuple[Any, Any]:
"""Create a document in the Firestore database with the provided data.

Expand All @@ -82,6 +89,10 @@ async def add(
automatically assigned by the server (the assigned ID will be
a random 20 character string composed of digits,
uppercase and lowercase letters).
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.

Returns:
Tuple[:class:`google.protobuf.timestamp_pb2.Timestamp`, \
Expand All @@ -95,44 +106,49 @@ async def add(
~google.cloud.exceptions.Conflict: If ``document_id`` is provided
and the document already exists.
"""
if document_id is None:
document_id = _auto_id()

document_ref = self.document(document_id)
write_result = await document_ref.create(document_data)
document_ref, kwargs = self._prep_add(
document_data, document_id, retry, timeout,
)
write_result = await document_ref.create(document_data, **kwargs)
return write_result.update_time, document_ref

async def list_documents(
self, page_size: int = None
self,
page_size: int = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
) -> AsyncGenerator[DocumentReference, None]:
"""List all subdocuments of the current collection.

Args:
page_size (Optional[int]]): The maximum number of documents
in each page of results from this request. Non-positive values
are ignored. Defaults to a sensible value set by the API.
in each page of results from this request. Non-positive values
are ignored. Defaults to a sensible value set by the API.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.

Returns:
Sequence[:class:`~google.cloud.firestore_v1.collection.DocumentReference`]:
iterator of subdocuments of the current collection. If the
collection does not exist at the time of `snapshot`, the
iterator will be empty
"""
parent, _ = self._parent_info()
request, kwargs = self._prep_list_documents(page_size, retry, timeout)

iterator = await self._client._firestore_api.list_documents(
request={
"parent": parent,
"collection_id": self.id,
"page_size": page_size,
"show_missing": True,
},
metadata=self._client._rpc_metadata,
request=request, metadata=self._client._rpc_metadata, **kwargs,
)
async for i in iterator:
yield _item_to_document_ref(self, i)

async def get(self, transaction: Transaction = None) -> list:
async def get(
self,
transaction: Transaction = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
) -> list:
"""Read the documents in this collection.

This sends a ``RunQuery`` RPC and returns a list of documents
Expand All @@ -142,6 +158,10 @@ async def get(self, transaction: Transaction = None) -> list:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.

If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
Expand All @@ -150,11 +170,15 @@ async def get(self, transaction: Transaction = None) -> list:
Returns:
list: The documents in this collection that match the query.
"""
query = self._query()
return await query.get(transaction=transaction)
query, kwargs = self._prep_get_or_stream(retry, timeout)

return await query.get(transaction=transaction, **kwargs)

async def stream(
self, transaction: Transaction = None
self,
transaction: Transaction = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
) -> AsyncIterator[async_document.DocumentSnapshot]:
"""Read the documents in this collection.

Expand All @@ -177,11 +201,16 @@ async def stream(
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.\
Transaction`]):
An existing transaction that the query will run in.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.

Yields:
:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`:
The next document that fulfills the query.
"""
query = async_query.AsyncQuery(self)
async for d in query.stream(transaction=transaction):
query, kwargs = self._prep_get_or_stream(retry, timeout)

async for d in query.stream(transaction=transaction, **kwargs):
yield d # pytype: disable=name-error