feat: add bulk writer #396

Merged · 5 commits · Aug 11, 2021
13 changes: 13 additions & 0 deletions google/cloud/firestore_v1/async_client.py
@@ -96,6 +96,19 @@ def __init__(
client_options=client_options,
)

def _to_sync_copy(self):
from google.cloud.firestore_v1.client import Client

if not getattr(self, "_sync_copy", None):
self._sync_copy = Client(
project=self.project,
credentials=self._credentials,
database=self._database,
client_info=self._client_info,
client_options=self._client_options,
)
return self._sync_copy

@property
def _firestore_api(self):
"""Lazy-loading getter GAPIC Firestore API.
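The sync-copy hook above exists so that async surfaces can delegate to machinery that only talks to the blocking `Client` (presumably so the new `BulkWriter` can reuse one synchronous code path). A minimal runnable sketch of the caching pattern, with hypothetical stand-in classes so no credentials are needed:

```python
# Sketch of the cached "sync copy" pattern in _to_sync_copy above.
# AsyncStandIn / SyncStandIn are hypothetical stand-ins for AsyncClient / Client.
class SyncStandIn:
    def __init__(self, project):
        self.project = project


class AsyncStandIn:
    def __init__(self, project):
        self.project = project

    def _to_sync_copy(self):
        # Build the synchronous twin on first use; reuse the cached copy after.
        if not getattr(self, "_sync_copy", None):
            self._sync_copy = SyncStandIn(project=self.project)
        return self._sync_copy


client = AsyncStandIn(project="demo-project")
assert client._to_sync_copy() is client._to_sync_copy()  # same cached instance
```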
44 changes: 33 additions & 11 deletions google/cloud/firestore_v1/base_batch.py
@@ -14,16 +14,16 @@

"""Helpers for batch requests to the Google Cloud Firestore API."""


-from google.cloud.firestore_v1 import _helpers
import abc
from typing import Dict, Union

-# Types needed only for Type Hints
-from google.cloud.firestore_v1.document import DocumentReference

-from typing import Union
from google.api_core import retry as retries  # type: ignore
from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.base_document import BaseDocumentReference


-class BaseWriteBatch(object):
class BaseBatch(metaclass=abc.ABCMeta):
"""Accumulate write operations to be sent in a batch.

This has the same set of methods for write operations that
@@ -38,9 +38,16 @@ class BaseWriteBatch(object):
def __init__(self, client) -> None:
self._client = client
self._write_pbs = []
self._document_references: Dict[str, BaseDocumentReference] = {}
self.write_results = None
self.commit_time = None

def __len__(self):
return len(self._document_references)

def __contains__(self, reference: BaseDocumentReference):
return reference._document_path in self._document_references

def _add_write_pbs(self, write_pbs: list) -> None:
"""Add `Write`` protobufs to this transaction.

@@ -52,7 +59,13 @@ def _add_write_pbs(self, write_pbs: list) -> None:
"""
self._write_pbs.extend(write_pbs)

-def create(self, reference: DocumentReference, document_data: dict) -> None:
@abc.abstractmethod
def commit(self):
"""Sends all accumulated write operations to the server. The details of this
write depend on the implementing class."""
raise NotImplementedError()

def create(self, reference: BaseDocumentReference, document_data: dict) -> None:
"""Add a "change" to this batch to create a document.

If the document given by ``reference`` already exists, then this
@@ -65,11 +78,12 @@ def create(self, reference: DocumentReference, document_data: dict) -> None:
creating a document.
"""
write_pbs = _helpers.pbs_for_create(reference._document_path, document_data)
self._document_references[reference._document_path] = reference
self._add_write_pbs(write_pbs)

def set(
self,
-reference: DocumentReference,
reference: BaseDocumentReference,
document_data: dict,
merge: Union[bool, list] = False,
) -> None:
@@ -98,11 +112,12 @@ def set(
reference._document_path, document_data
)

self._document_references[reference._document_path] = reference
self._add_write_pbs(write_pbs)

def update(
self,
-reference: DocumentReference,
reference: BaseDocumentReference,
field_updates: dict,
option: _helpers.WriteOption = None,
) -> None:
@@ -126,10 +141,11 @@ def update(
write_pbs = _helpers.pbs_for_update(
reference._document_path, field_updates, option
)
self._document_references[reference._document_path] = reference
self._add_write_pbs(write_pbs)

def delete(
-self, reference: DocumentReference, option: _helpers.WriteOption = None
self, reference: BaseDocumentReference, option: _helpers.WriteOption = None
) -> None:
"""Add a "change" to delete a document.

@@ -146,9 +162,15 @@
state of the document before applying changes.
"""
write_pb = _helpers.pb_for_delete(reference._document_path, option)
self._document_references[reference._document_path] = reference
self._add_write_pbs([write_pb])

-def _prep_commit(self, retry, timeout):

class BaseWriteBatch(BaseBatch):
"""Base class for a/sync implementations of the `commit` RPC. `commit` is useful
for lower volumes or when the order of write operations is important."""

def _prep_commit(self, retry: retries.Retry, timeout: float):
"""Shared setup for async/sync :meth:`commit`."""
request = {
"database": self._client._database_string,
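The `_document_references` mapping added to `BaseBatch` is what backs the new `__len__` and `__contains__`, and it gives callers an O(1) way to notice a second write to a document already in the batch. A runnable sketch of just that bookkeeping, with `FakeReference` as a hypothetical stand-in for `BaseDocumentReference`:

```python
# Sketch of BaseBatch's duplicate-detection bookkeeping from this PR.
class FakeReference:
    def __init__(self, path):
        self._document_path = path


class SketchBatch:
    def __init__(self):
        self._document_references = {}

    def set(self, reference, document_data):
        # Every write records its reference, keyed by the full document path.
        self._document_references[reference._document_path] = reference

    def __len__(self):
        return len(self._document_references)

    def __contains__(self, reference):
        return reference._document_path in self._document_references


batch = SketchBatch()
ref = FakeReference("projects/p/databases/(default)/documents/users/alice")
batch.set(ref, {"name": "Alice"})
assert ref in batch and len(batch) == 1  # a repeat write to `ref` is detectable
```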
20 changes: 19 additions & 1 deletion google/cloud/firestore_v1/base_client.py
@@ -37,7 +37,10 @@
from google.cloud.firestore_v1 import __version__
from google.cloud.firestore_v1 import types
from google.cloud.firestore_v1.base_document import DocumentSnapshot

from google.cloud.firestore_v1.bulk_writer import (
BulkWriter,
BulkWriterOptions,
)
from google.cloud.firestore_v1.field_path import render_field_path
from typing import (
Any,
@@ -278,6 +281,21 @@ def _get_collection_reference(self, collection_id: str) -> BaseCollectionReferen
def document(self, *document_path) -> BaseDocumentReference:
raise NotImplementedError

def bulk_writer(self, options: Optional[BulkWriterOptions] = None) -> BulkWriter:
"""Get a BulkWriter instance from this client.

Args:
options (Optional[:class:`~google.cloud.firestore_v1.bulk_writer.BulkWriterOptions`]):
Optional control parameters for the
:class:`~google.cloud.firestore_v1.bulk_writer.BulkWriter` returned.

Returns:
:class:`~google.cloud.firestore_v1.bulk_writer.BulkWriter`:
A utility to efficiently create and save many `BulkWriteBatch` instances
to the server.
"""
return BulkWriter(client=self, options=options)

def _document_path_helper(self, *document_path) -> List[str]:
"""Standardize the format of path to tuple of path segments and strip the database string from path if present.

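Returning to the new `bulk_writer()` entry point above: a hedged usage sketch. `BulkWriter` itself is not part of this diff, so the `create()` and `close()` calls below are assumptions about its surface rather than API shown here:

```python
# Hypothetical usage of client.bulk_writer(); BulkWriter's create()/close()
# are assumed, not defined in this diff.
from google.cloud import firestore

client = firestore.Client()
bw = client.bulk_writer()  # a BulkWriterOptions instance may be passed here

for i in range(1000):
    bw.create(client.collection("users").document(f"user-{i}"), {"n": i})

bw.close()  # assumed to flush any partially filled batch before returning
```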
4 changes: 3 additions & 1 deletion google/cloud/firestore_v1/batch.py
@@ -21,7 +21,9 @@


class WriteBatch(BaseWriteBatch):
"""Accumulate write operations to be sent in a batch.
"""Accumulate write operations to be sent in a batch. Use this over
`BulkWriteBatch` for lower volumes or when the order of operations
within a given batch is important.

This has the same set of methods for write operations that
:class:`~google.cloud.firestore_v1.document.DocumentReference` does,
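The ordering caveat is the whole reason `WriteBatch` survives alongside the new bulk machinery: writes in a given batch land in sequence, so dependent operations on one document are safe here but would be rejected by `BulkWriteBatch`. For example:

```python
# Order-dependent writes belong in WriteBatch: the delete must land before
# the set, and a BulkWriteBatch would not accept two writes to one document.
from google.cloud import firestore

client = firestore.Client()
doc = client.collection("config").document("flags")

batch = client.batch()    # WriteBatch: writes applied in order
batch.delete(doc)         # 1) drop the old document...
batch.set(doc, {"v": 2})  # 2) ...then write its replacement
batch.commit()
```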
89 changes: 89 additions & 0 deletions google/cloud/firestore_v1/bulk_batch.py
@@ -0,0 +1,89 @@
# Copyright 2021 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for batch requests to the Google Cloud Firestore API."""
from google.api_core import gapic_v1 # type: ignore
from google.api_core import retry as retries # type: ignore

from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.base_batch import BaseBatch
from google.cloud.firestore_v1.types.firestore import BatchWriteResponse


class BulkWriteBatch(BaseBatch):
"""Accumulate write operations to be sent in a batch. Use this over
`WriteBatch` for higher volumes (e.g., via `BulkWriter`) and when the order
of operations within a given batch is unimportant.

Because the order in which individual write operations are applied to the database
is not guaranteed, `batch_write` RPCs can never contain multiple operations
to the same document. If calling code detects a second write operation to a
known document reference, it should first cut off the previous batch and
send it, then create a new batch starting with the latest write operation.
In practice, the [Async]BulkWriter classes handle this.

This has the same set of methods for write operations that
:class:`~google.cloud.firestore_v1.document.DocumentReference` does,
e.g. :meth:`~google.cloud.firestore_v1.document.DocumentReference.create`.

Args:
client (:class:`~google.cloud.firestore_v1.client.Client`):
The client that created this batch.
"""

def __init__(self, client) -> None:
super(BulkWriteBatch, self).__init__(client=client)

def commit(
self, retry: retries.Retry = gapic_v1.method.DEFAULT, timeout: float = None
) -> BatchWriteResponse:
"""Writes the changes accumulated in this batch.

Write operations are not guaranteed to be applied in order and must not
contain multiple writes to any given document. Preferred over `WriteBatch.commit`
for performance reasons when these conditions are acceptable.

Args:
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.

Returns:
:class:`google.cloud.proto.firestore.v1.write.BatchWriteResponse`:
Container holding the write results corresponding to the changes
committed, returned in the same order as the changes were applied to
this batch. An individual write result contains an ``update_time``
field.
"""
request, kwargs = self._prep_commit(retry, timeout)

_api = self._client._firestore_api
save_response: BatchWriteResponse = _api.batch_write(
request=request, metadata=self._client._rpc_metadata, **kwargs,
)

self._write_pbs = []
self.write_results = list(save_response.write_results)

return save_response

def _prep_commit(self, retry: retries.Retry, timeout: float):
request = {
"database": self._client._database_string,
"writes": self._write_pbs,
"labels": None,
}
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)
return request, kwargs
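Putting it together, a minimal sketch of the rule from the `BulkWriteBatch` docstring: cut off and send the running batch as soon as a repeated reference appears. The `[Async]BulkWriter` classes automate this; the loop below drives `BulkWriteBatch` by hand:

```python
# Hand-rolled version of the "cut off the batch on a repeat reference" rule;
# BulkWriter normally does this bookkeeping for you.
from google.cloud import firestore
from google.cloud.firestore_v1.bulk_batch import BulkWriteBatch

client = firestore.Client()
batch = BulkWriteBatch(client)

for name, data in [("a", {"x": 1}), ("b", {"x": 2}), ("a", {"x": 3})]:
    ref = client.collection("items").document(name)
    if ref in batch:                    # BaseBatch.__contains__ from this PR
        batch.commit()                  # send the full batch first...
        batch = BulkWriteBatch(client)  # ...then start a fresh one
    batch.set(ref, data)

if len(batch):                          # BaseBatch.__len__ from this PR
    batch.commit()
```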