feat: add recursive delete (#420)
* feat: add recursive delete

* made chunkify private

Co-authored-by: Christopher Wilcox <crwilcox@google.com>
craiglabenz and crwilcox committed Aug 16, 2021
1 parent 0923c95 commit 813a57b
Showing 17 changed files with 1,046 additions and 146 deletions.
84 changes: 83 additions & 1 deletion google/cloud/firestore_v1/async_client.py
@@ -43,13 +43,17 @@
DocumentSnapshot,
)
from google.cloud.firestore_v1.async_transaction import AsyncTransaction
from google.cloud.firestore_v1.field_path import FieldPath
from google.cloud.firestore_v1.services.firestore import (
async_client as firestore_client,
)
from google.cloud.firestore_v1.services.firestore.transports import (
grpc_asyncio as firestore_grpc_transport,
)
from typing import Any, AsyncGenerator, Iterable, List
from typing import Any, AsyncGenerator, Iterable, List, Optional, Union, TYPE_CHECKING

if TYPE_CHECKING:
from google.cloud.firestore_v1.bulk_writer import BulkWriter # pragma: NO COVER


class AsyncClient(BaseClient):
@@ -300,6 +304,84 @@ async def collections(
async for collection_id in iterator:
yield self.collection(collection_id)

async def recursive_delete(
self,
reference: Union[AsyncCollectionReference, AsyncDocumentReference],
*,
bulk_writer: Optional["BulkWriter"] = None,
chunk_size: Optional[int] = 5000,
) -> int:
"""Deletes documents and their subcollections, regardless of collection
name.
Passing an AsyncCollectionReference leads to each document in the
collection getting deleted, as well as all of their descendents.
Passing an AsyncDocumentReference deletes that one document and all of
its descendents.
Args:
reference (Union[
:class:`@google.cloud.firestore_v1.async_collection.CollectionReference`,
:class:`@google.cloud.firestore_v1.async_document.DocumentReference`,
])
The reference to be deleted.
bulk_writer (Optional[:class:`@google.cloud.firestore_v1.bulk_writer.BulkWriter`])
The BulkWriter used to delete all matching documents. Supply this
if you want to override the default throttling behavior.
"""
return await self._recursive_delete(
reference, bulk_writer=bulk_writer, chunk_size=chunk_size,
)

async def _recursive_delete(
self,
reference: Union[AsyncCollectionReference, AsyncDocumentReference],
*,
bulk_writer: Optional["BulkWriter"] = None, # type: ignore
chunk_size: Optional[int] = 5000,
depth: Optional[int] = 0,
) -> int:
"""Recursion helper for `recursive_delete."""
from google.cloud.firestore_v1.bulk_writer import BulkWriter

bulk_writer = bulk_writer or BulkWriter()

num_deleted: int = 0

if isinstance(reference, AsyncCollectionReference):
chunk: List[DocumentSnapshot]
async for chunk in reference.recursive().select(
[FieldPath.document_id()]
)._chunkify(chunk_size):
doc_snap: DocumentSnapshot
for doc_snap in chunk:
num_deleted += 1
bulk_writer.delete(doc_snap.reference)

elif isinstance(reference, AsyncDocumentReference):
col_ref: AsyncCollectionReference
async for col_ref in reference.collections():
num_deleted += await self._recursive_delete(
col_ref,
bulk_writer=bulk_writer,
depth=depth + 1,
chunk_size=chunk_size,
)
num_deleted += 1
bulk_writer.delete(reference)

else:
raise TypeError(
f"Unexpected type for reference: {reference.__class__.__name__}"
)

if depth == 0:
bulk_writer.close()

return num_deleted

def batch(self) -> AsyncWriteBatch:
"""Get a batch instance from this client.
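A usage sketch (not part of the commit): driving the new async API from an event loop, assuming default credentials and a hypothetical "users" collection.

import asyncio

from google.cloud import firestore


async def main():
    client = firestore.AsyncClient()

    # Delete every document in the (hypothetical) "users" collection and
    # all of their subcollections, fetching 500 document IDs per chunk.
    num_deleted = await client.recursive_delete(
        client.collection("users"), chunk_size=500
    )
    print(f"Deleted {num_deleted} documents")


asyncio.run(main())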
4 changes: 4 additions & 0 deletions google/cloud/firestore_v1/async_collection.py
@@ -72,6 +72,10 @@ def _query(self) -> async_query.AsyncQuery:
"""
return async_query.AsyncQuery(self)

async def _chunkify(self, chunk_size: int):
async for page in self._query()._chunkify(chunk_size):
yield page

async def add(
self,
document_data: dict,
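For illustration only (the helper was deliberately made private, per the commit message): iterating a collection's `_chunkify` yields lists of `DocumentSnapshot`s, each no longer than the requested chunk size. The "users" collection is hypothetical.

import asyncio

from google.cloud import firestore


async def show_chunks():
    client = firestore.AsyncClient()
    # _chunkify is private; it is called here only to illustrate the
    # traversal that recursive_delete builds on.
    async for chunk in client.collection("users")._chunkify(200):
        print(f"fetched {len(chunk)} snapshots")


asyncio.run(show_chunks())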
44 changes: 43 additions & 1 deletion google/cloud/firestore_v1/async_query.py
@@ -33,7 +33,8 @@
)

from google.cloud.firestore_v1 import async_document
from typing import AsyncGenerator, Type
from google.cloud.firestore_v1.base_document import DocumentSnapshot
from typing import AsyncGenerator, List, Optional, Type

# Types needed only for Type Hints
from google.cloud.firestore_v1.transaction import Transaction
@@ -126,6 +127,47 @@ def __init__(
recursive=recursive,
)

async def _chunkify(
self, chunk_size: int
) -> AsyncGenerator[List[DocumentSnapshot], None]:
# Catch the edge case where a developer writes the following:
# `my_query.limit(500)._chunkify(1000)`, which ultimately nullifies any
# need to yield chunks.
if self._limit and chunk_size > self._limit:
yield await self.get()
return

max_to_return: Optional[int] = self._limit
num_returned: int = 0
original: AsyncQuery = self._copy()
last_document: Optional[DocumentSnapshot] = None

while True:
# Optionally trim the `chunk_size` down to honor a previously
# applied limit as set by `self.limit()`
_chunk_size: int = original._resolve_chunk_size(num_returned, chunk_size)

# Apply the optionally pruned limit and the cursor, if we are past
# the first page.
_q = original.limit(_chunk_size)
if last_document:
_q = _q.start_after(last_document)

snapshots = await _q.get()

# An empty page (e.g. an empty collection) would otherwise raise an
# IndexError on `snapshots[-1]`.
if not snapshots:
    yield snapshots
    return

last_document = snapshots[-1]
num_returned += len(snapshots)

yield snapshots

# Terminate the iterator if we have reached either of two end
# conditions:
# 1. There are no more documents, or
# 2. We have reached the desired overall limit
if len(snapshots) < _chunk_size or (
max_to_return and num_returned >= max_to_return
):
return

async def get(
self,
transaction: Transaction = None,
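The pagination pattern `_chunkify` implements (repeatedly applying `limit()` and cursoring past the last snapshot with `start_after()` until a short page comes back) can be sketched as a standalone helper; `paginate` below is hypothetical and separate from the commit.

from typing import AsyncGenerator

from google.cloud.firestore_v1.async_query import AsyncQuery
from google.cloud.firestore_v1.base_document import DocumentSnapshot


async def paginate(
    query: AsyncQuery, page_size: int
) -> AsyncGenerator[DocumentSnapshot, None]:
    """Yield documents one page at a time via a start_after cursor."""
    last_doc = None
    while True:
        page_query = query.limit(page_size)
        if last_doc is not None:
            page_query = page_query.start_after(last_doc)

        snapshots = await page_query.get()
        for snapshot in snapshots:
            yield snapshot

        # A short page means the result set is exhausted.
        if len(snapshots) < page_size:
            return
        last_doc = snapshots[-1]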
13 changes: 9 additions & 4 deletions google/cloud/firestore_v1/base_client.py
@@ -37,11 +37,9 @@
from google.cloud.firestore_v1 import __version__
from google.cloud.firestore_v1 import types
from google.cloud.firestore_v1.base_document import DocumentSnapshot
from google.cloud.firestore_v1.bulk_writer import (
BulkWriter,
BulkWriterOptions,
)

from google.cloud.firestore_v1.field_path import render_field_path
from google.cloud.firestore_v1.bulk_writer import BulkWriter, BulkWriterOptions
from typing import (
Any,
AsyncGenerator,
Expand Down Expand Up @@ -312,6 +310,13 @@ def _document_path_helper(self, *document_path) -> List[str]:
joined_path = joined_path[len(base_path) :]
return joined_path.split(_helpers.DOCUMENT_PATH_DELIMITER)

def recursive_delete(
self,
reference: Union[BaseCollectionReference, BaseDocumentReference],
bulk_writer: Optional["BulkWriter"] = None, # type: ignore
) -> int:
raise NotImplementedError

@staticmethod
def field_path(*field_names: str) -> str:
"""Create a **field path** from a list of nested field names.
4 changes: 2 additions & 2 deletions google/cloud/firestore_v1/base_document.py
@@ -315,10 +315,10 @@ def _prep_collections(

def collections(
self, page_size: int = None, retry: retries.Retry = None, timeout: float = None,
) -> NoReturn:
) -> None:
raise NotImplementedError

def on_snapshot(self, callback) -> NoReturn:
def on_snapshot(self, callback) -> None:
raise NotImplementedError


6 changes: 6 additions & 0 deletions google/cloud/firestore_v1/base_query.py
@@ -424,6 +424,12 @@ def limit_to_last(self, count: int) -> "BaseQuery":
"""
return self._copy(limit=count, limit_to_last=True)

def _resolve_chunk_size(self, num_loaded: int, chunk_size: int) -> int:
"""Utility function for chunkify."""
if self._limit is not None and (num_loaded + chunk_size) > self._limit:
return max(self._limit - num_loaded, 0)
return chunk_size

def offset(self, num_to_skip: int) -> "BaseQuery":
"""Skip to an offset in a query.
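The arithmetic is easiest to see with concrete numbers; the standalone mirror below is hypothetical and reproduces the helper's logic for illustration.

from typing import Optional


def resolve_chunk_size(limit: Optional[int], num_loaded: int, chunk_size: int) -> int:
    # Mirrors BaseQuery._resolve_chunk_size, with the query's limit passed in.
    if limit is not None and (num_loaded + chunk_size) > limit:
        return max(limit - num_loaded, 0)
    return chunk_size


assert resolve_chunk_size(1000, 900, 500) == 100  # final chunk trimmed to honor the limit
assert resolve_chunk_size(None, 900, 500) == 500  # no limit applied: full chunk
assert resolve_chunk_size(1000, 1000, 500) == 0   # limit already reached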
88 changes: 87 additions & 1 deletion google/cloud/firestore_v1/client.py
@@ -39,17 +39,22 @@
from google.cloud.firestore_v1.batch import WriteBatch
from google.cloud.firestore_v1.collection import CollectionReference
from google.cloud.firestore_v1.document import DocumentReference
from google.cloud.firestore_v1.field_path import FieldPath
from google.cloud.firestore_v1.transaction import Transaction
from google.cloud.firestore_v1.services.firestore import client as firestore_client
from google.cloud.firestore_v1.services.firestore.transports import (
grpc as firestore_grpc_transport,
)
from typing import Any, Generator, Iterable
from typing import Any, Generator, Iterable, List, Optional, Union, TYPE_CHECKING

# Types needed only for Type Hints
from google.cloud.firestore_v1.base_document import DocumentSnapshot


if TYPE_CHECKING:
from google.cloud.firestore_v1.bulk_writer import BulkWriter # pragma: NO COVER


class Client(BaseClient):
"""Client for interacting with Google Cloud Firestore API.
@@ -286,6 +291,87 @@ def collections(
for collection_id in iterator:
yield self.collection(collection_id)

def recursive_delete(
self,
reference: Union[CollectionReference, DocumentReference],
*,
bulk_writer: Optional["BulkWriter"] = None,
chunk_size: Optional[int] = 5000,
) -> int:
"""Deletes documents and their subcollections, regardless of collection
name.
Passing a CollectionReference leads to each document in the collection
getting deleted, as well as all of their descendents.
Passing a DocumentReference deletes that one document and all of its
descendents.
Args:
reference (Union[
:class:`@google.cloud.firestore_v1.collection.CollectionReference`,
:class:`@google.cloud.firestore_v1.document.DocumentReference`,
])
The reference to be deleted.
bulk_writer (Optional[:class:`@google.cloud.firestore_v1.bulk_writer.BulkWriter`])
The BulkWriter used to delete all matching documents. Supply this
if you want to override the default throttling behavior.
"""
return self._recursive_delete(
reference, bulk_writer=bulk_writer, chunk_size=chunk_size,
)

def _recursive_delete(
self,
reference: Union[CollectionReference, DocumentReference],
*,
bulk_writer: Optional["BulkWriter"] = None,
chunk_size: Optional[int] = 5000,
depth: Optional[int] = 0,
) -> int:
"""Recursion helper for `recursive_delete."""
from google.cloud.firestore_v1.bulk_writer import BulkWriter

bulk_writer = bulk_writer or BulkWriter()

num_deleted: int = 0

if isinstance(reference, CollectionReference):
chunk: List[DocumentSnapshot]
for chunk in (
reference.recursive()
.select([FieldPath.document_id()])
._chunkify(chunk_size)
):
doc_snap: DocumentSnapshot
for doc_snap in chunk:
num_deleted += 1
bulk_writer.delete(doc_snap.reference)

elif isinstance(reference, DocumentReference):
col_ref: CollectionReference
for col_ref in reference.collections():
num_deleted += self._recursive_delete(
col_ref,
bulk_writer=bulk_writer,
chunk_size=chunk_size,
depth=depth + 1,
)
num_deleted += 1
bulk_writer.delete(reference)

else:
raise TypeError(
f"Unexpected type for reference: {reference.__class__.__name__}"
)

if depth == 0:
bulk_writer.close()

return num_deleted

def batch(self) -> WriteBatch:
"""Get a batch instance from this client.
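A synchronous usage sketch (not part of the commit) showing the `bulk_writer` override the docstring mentions. The "users" collection is hypothetical, and the `client.bulk_writer()` factory and `BulkWriterOptions` fields are assumptions about this library's surface.

from google.cloud import firestore
from google.cloud.firestore_v1.bulk_writer import BulkWriterOptions

client = firestore.Client()

# Supplying a BulkWriter overrides the default throttling behavior;
# recursive_delete closes it once the top-level call (depth == 0) returns.
bw = client.bulk_writer(
    options=BulkWriterOptions(initial_ops_per_second=50, max_ops_per_second=250)
)
num_deleted = client.recursive_delete(client.collection("users"), bulk_writer=bw)
print(f"Deleted {num_deleted} documents")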
3 changes: 3 additions & 0 deletions google/cloud/firestore_v1/collection.py
@@ -137,6 +137,9 @@ def list_documents(
)
return (_item_to_document_ref(self, i) for i in iterator)

def _chunkify(self, chunk_size: int):
return self._query()._chunkify(chunk_size)

def get(
self,
transaction: Transaction = None,
