feat: add recursive delete #420

Merged · 6 commits · Aug 16, 2021
84 changes: 83 additions & 1 deletion google/cloud/firestore_v1/async_client.py
@@ -43,13 +43,17 @@
    DocumentSnapshot,
)
from google.cloud.firestore_v1.async_transaction import AsyncTransaction
from google.cloud.firestore_v1.field_path import FieldPath
from google.cloud.firestore_v1.services.firestore import (
    async_client as firestore_client,
)
from google.cloud.firestore_v1.services.firestore.transports import (
    grpc_asyncio as firestore_grpc_transport,
)
from typing import Any, AsyncGenerator, Iterable, List
from typing import Any, AsyncGenerator, Iterable, List, Optional, Union, TYPE_CHECKING

if TYPE_CHECKING:
    from google.cloud.firestore_v1.bulk_writer import BulkWriter  # pragma: NO COVER


class AsyncClient(BaseClient):
@@ -300,6 +304,84 @@ async def collections(
        async for collection_id in iterator:
            yield self.collection(collection_id)

    async def recursive_delete(
        self,
        reference: Union[AsyncCollectionReference, AsyncDocumentReference],
        *,
        bulk_writer: Optional["BulkWriter"] = None,
        chunk_size: Optional[int] = 5000,
    ):
        """Deletes documents and their subcollections, regardless of collection
        name.

        Passing an AsyncCollectionReference leads to each document in the
        collection getting deleted, as well as all of their descendants.

        Passing an AsyncDocumentReference deletes that one document and all of
        its descendants.

        Args:
            reference (Union[
                :class:`~google.cloud.firestore_v1.async_collection.CollectionReference`,
                :class:`~google.cloud.firestore_v1.async_document.DocumentReference`,
            ])
                The reference to be deleted.

            bulk_writer (Optional[:class:`~google.cloud.firestore_v1.bulk_writer.BulkWriter`])
                The BulkWriter used to delete all matching documents. Supply this
                if you want to override the default throttling behavior.
        """
        return await self._recursive_delete(
            reference, bulk_writer=bulk_writer, chunk_size=chunk_size,
        )

    async def _recursive_delete(
        self,
        reference: Union[AsyncCollectionReference, AsyncDocumentReference],
        *,
        bulk_writer: Optional["BulkWriter"] = None,  # type: ignore
        chunk_size: Optional[int] = 5000,
        depth: Optional[int] = 0,
    ) -> int:
        """Recursion helper for `recursive_delete`."""
        from google.cloud.firestore_v1.bulk_writer import BulkWriter

        bulk_writer = bulk_writer or BulkWriter()

        num_deleted: int = 0

        if isinstance(reference, AsyncCollectionReference):
            chunk: List[DocumentSnapshot]
            async for chunk in reference.recursive().select(
                [FieldPath.document_id()]
            )._chunkify(chunk_size):
                doc_snap: DocumentSnapshot
                for doc_snap in chunk:
                    num_deleted += 1
                    bulk_writer.delete(doc_snap.reference)

        elif isinstance(reference, AsyncDocumentReference):
            col_ref: AsyncCollectionReference
            async for col_ref in reference.collections():
                num_deleted += await self._recursive_delete(
                    col_ref,
                    bulk_writer=bulk_writer,
                    depth=depth + 1,
                    chunk_size=chunk_size,
                )
            num_deleted += 1
            bulk_writer.delete(reference)

        else:
            raise TypeError(
                f"Unexpected type for reference: {reference.__class__.__name__}"
            )

        if depth == 0:
            bulk_writer.close()

        return num_deleted

    def batch(self) -> AsyncWriteBatch:
        """Get a batch instance from this client.

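A minimal usage sketch for the async method added above; the project ID and the `users` collection name are hypothetical, for illustration only:

import asyncio

from google.cloud import firestore


async def main() -> None:
    # Hypothetical project and collection names.
    client = firestore.AsyncClient(project="my-project")

    # Deletes every document under `users`, including documents in
    # nested subcollections, issuing deletes in chunks of 1000.
    count = await client.recursive_delete(
        client.collection("users"), chunk_size=1000
    )
    print(f"Deleted {count} documents.")


asyncio.run(main())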
4 changes: 4 additions & 0 deletions google/cloud/firestore_v1/async_collection.py
@@ -72,6 +72,10 @@ def _query(self) -> async_query.AsyncQuery:
        """
        return async_query.AsyncQuery(self)

    async def _chunkify(self, chunk_size: int):
        async for page in self._query()._chunkify(chunk_size):
            yield page

    async def add(
        self,
        document_data: dict,
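`_chunkify` is private and is normally reached only through `recursive_delete`, but here is a sketch of how it streams a collection page by page, assuming an existing `AsyncCollectionReference` named `col`:

async def count_documents(col) -> int:
    # Each page is a list of at most 500 DocumentSnapshots, so the
    # whole collection never has to sit in memory at once.
    total = 0
    async for page in col._chunkify(500):
        total += len(page)
    return total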
44 changes: 43 additions & 1 deletion google/cloud/firestore_v1/async_query.py
@@ -33,7 +33,8 @@
)

from google.cloud.firestore_v1 import async_document
from typing import AsyncGenerator, Type
from google.cloud.firestore_v1.base_document import DocumentSnapshot
from typing import AsyncGenerator, List, Optional, Type

# Types needed only for Type Hints
from google.cloud.firestore_v1.transaction import Transaction
@@ -126,6 +127,47 @@ def __init__(
            recursive=recursive,
        )

    async def _chunkify(
        self, chunk_size: int
    ) -> AsyncGenerator[List[DocumentSnapshot], None]:
        # Catch the edge case where a developer writes the following:
        # `my_query.limit(500)._chunkify(1000)`, which ultimately nullifies any
        # need to yield chunks.
        if self._limit and chunk_size > self._limit:
            yield await self.get()
            return

        max_to_return: Optional[int] = self._limit
        num_returned: int = 0
        original: AsyncQuery = self._copy()
        last_document: Optional[DocumentSnapshot] = None

        while True:
            # Optionally trim the `chunk_size` down to honor a previously
            # applied limit as set by `self.limit()`
            _chunk_size: int = original._resolve_chunk_size(num_returned, chunk_size)

            # Apply the optionally pruned limit and the cursor, if we are past
            # the first page.
            _q = original.limit(_chunk_size)
            if last_document:
                _q = _q.start_after(last_document)

            snapshots = await _q.get()

            # Guard against an empty page: indexing `snapshots[-1]` would
            # raise an IndexError, and an empty page also means we are done.
            if snapshots:
                last_document = snapshots[-1]

            num_returned += len(snapshots)

            yield snapshots

            # Terminate the iterator if we have reached either of two end
            # conditions:
            #   1. There are no more documents, or
            #   2. We have reached the desired overall limit
            if len(snapshots) < _chunk_size or (
                max_to_return and num_returned >= max_to_return
            ):
                return

    async def get(
        self,
        transaction: Transaction = None,
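The `while True` loop in `_chunkify` is ordinary cursor pagination: cap each request with `limit`, resume after the last snapshot seen via `start_after`, and stop on a short page. A standalone sketch of the same pattern using only the public query API, where the `query` argument is assumed to be an existing `AsyncQuery`:

from typing import AsyncGenerator, List

from google.cloud.firestore_v1.base_document import DocumentSnapshot


async def pages(query, page_size: int) -> AsyncGenerator[List[DocumentSnapshot], None]:
    last = None
    while True:
        page_query = query.limit(page_size)
        if last is not None:
            # Resume immediately after the final document of the prior page.
            page_query = page_query.start_after(last)

        snapshots = await page_query.get()
        if snapshots:
            last = snapshots[-1]
        yield snapshots

        # A short (or empty) page means the result set is exhausted.
        if len(snapshots) < page_size:
            return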
13 changes: 9 additions & 4 deletions google/cloud/firestore_v1/base_client.py
@@ -37,11 +37,9 @@
from google.cloud.firestore_v1 import __version__
from google.cloud.firestore_v1 import types
from google.cloud.firestore_v1.base_document import DocumentSnapshot
from google.cloud.firestore_v1.bulk_writer import (
    BulkWriter,
    BulkWriterOptions,
)

from google.cloud.firestore_v1.field_path import render_field_path
from google.cloud.firestore_v1.bulk_writer import BulkWriter, BulkWriterOptions
from typing import (
    Any,
    AsyncGenerator,
@@ -312,6 +310,13 @@ def _document_path_helper(self, *document_path) -> List[str]:
            joined_path = joined_path[len(base_path) :]
        return joined_path.split(_helpers.DOCUMENT_PATH_DELIMITER)

    def recursive_delete(
        self,
        reference: Union[BaseCollectionReference, BaseDocumentReference],
        bulk_writer: Optional["BulkWriter"] = None,  # type: ignore
    ) -> int:
        raise NotImplementedError

    @staticmethod
    def field_path(*field_names: str) -> str:
        """Create a **field path** from a list of nested field names.
4 changes: 2 additions & 2 deletions google/cloud/firestore_v1/base_document.py
@@ -315,10 +315,10 @@ def _prep_collections(

    def collections(
        self, page_size: int = None, retry: retries.Retry = None, timeout: float = None,
    ) -> NoReturn:
    ) -> None:
        raise NotImplementedError

    def on_snapshot(self, callback) -> NoReturn:
    def on_snapshot(self, callback) -> None:
        raise NotImplementedError


6 changes: 6 additions & 0 deletions google/cloud/firestore_v1/base_query.py
@@ -424,6 +424,12 @@ def limit_to_last(self, count: int) -> "BaseQuery":
        """
        return self._copy(limit=count, limit_to_last=True)

    def _resolve_chunk_size(self, num_loaded: int, chunk_size: int) -> int:
        """Utility function for chunkify."""
        if self._limit is not None and (num_loaded + chunk_size) > self._limit:
            return max(self._limit - num_loaded, 0)
        return chunk_size

    def offset(self, num_to_skip: int) -> "BaseQuery":
        """Skip to an offset in a query.

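A quick worked example of the arithmetic in `_resolve_chunk_size`, assuming `query` is any existing `BaseQuery`: for a query built with `.limit(10)` that has already loaded 8 documents, a requested chunk of 5 would overshoot the limit (8 + 5 > 10), so the method trims it to `max(10 - 8, 0) = 2`; with no limit set, `chunk_size` passes through unchanged.

# query.limit(10): trims the final chunk to the 2 documents still allowed.
assert query.limit(10)._resolve_chunk_size(num_loaded=8, chunk_size=5) == 2
# Still under the limit: the requested chunk size passes through untouched.
assert query.limit(10)._resolve_chunk_size(num_loaded=0, chunk_size=5) == 5
# Limit already exhausted: clamps to zero rather than going negative.
assert query.limit(10)._resolve_chunk_size(num_loaded=12, chunk_size=5) == 0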
88 changes: 87 additions & 1 deletion google/cloud/firestore_v1/client.py
@@ -39,17 +39,22 @@
from google.cloud.firestore_v1.batch import WriteBatch
from google.cloud.firestore_v1.collection import CollectionReference
from google.cloud.firestore_v1.document import DocumentReference
from google.cloud.firestore_v1.field_path import FieldPath
from google.cloud.firestore_v1.transaction import Transaction
from google.cloud.firestore_v1.services.firestore import client as firestore_client
from google.cloud.firestore_v1.services.firestore.transports import (
    grpc as firestore_grpc_transport,
)
from typing import Any, Generator, Iterable
from typing import Any, Generator, Iterable, List, Optional, Union, TYPE_CHECKING

# Types needed only for Type Hints
from google.cloud.firestore_v1.base_document import DocumentSnapshot


if TYPE_CHECKING:
    from google.cloud.firestore_v1.bulk_writer import BulkWriter  # pragma: NO COVER


class Client(BaseClient):
    """Client for interacting with Google Cloud Firestore API.

@@ -286,6 +291,87 @@ def collections(
        for collection_id in iterator:
            yield self.collection(collection_id)

    def recursive_delete(
        self,
        reference: Union[CollectionReference, DocumentReference],
        *,
        bulk_writer: Optional["BulkWriter"] = None,
        chunk_size: Optional[int] = 5000,
    ) -> int:
        """Deletes documents and their subcollections, regardless of collection
        name.

        Passing a CollectionReference leads to each document in the collection
        getting deleted, as well as all of their descendants.

        Passing a DocumentReference deletes that one document and all of its
        descendants.

        Args:
            reference (Union[
                :class:`~google.cloud.firestore_v1.collection.CollectionReference`,
                :class:`~google.cloud.firestore_v1.document.DocumentReference`,
            ])
                The reference to be deleted.

            bulk_writer (Optional[:class:`~google.cloud.firestore_v1.bulk_writer.BulkWriter`])
                The BulkWriter used to delete all matching documents. Supply this
                if you want to override the default throttling behavior.

        """
        return self._recursive_delete(
            reference, bulk_writer=bulk_writer, chunk_size=chunk_size,
        )

    def _recursive_delete(
        self,
        reference: Union[CollectionReference, DocumentReference],
        *,
        bulk_writer: Optional["BulkWriter"] = None,
        chunk_size: Optional[int] = 5000,
        depth: Optional[int] = 0,
    ) -> int:
        """Recursion helper for `recursive_delete`."""
        from google.cloud.firestore_v1.bulk_writer import BulkWriter

        bulk_writer = bulk_writer or BulkWriter()

        num_deleted: int = 0

        if isinstance(reference, CollectionReference):
            chunk: List[DocumentSnapshot]
            for chunk in (
                reference.recursive()
                .select([FieldPath.document_id()])
                ._chunkify(chunk_size)
            ):
                doc_snap: DocumentSnapshot
                for doc_snap in chunk:
                    num_deleted += 1
                    bulk_writer.delete(doc_snap.reference)

        elif isinstance(reference, DocumentReference):
            col_ref: CollectionReference
            for col_ref in reference.collections():
                num_deleted += self._recursive_delete(
                    col_ref,
                    bulk_writer=bulk_writer,
                    chunk_size=chunk_size,
                    depth=depth + 1,
                )
            num_deleted += 1
            bulk_writer.delete(reference)

        else:
            raise TypeError(
                f"Unexpected type for reference: {reference.__class__.__name__}"
            )

        if depth == 0:
            bulk_writer.close()

        return num_deleted

    def batch(self) -> WriteBatch:
        """Get a batch instance from this client.

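A sketch of the synchronous entry point with an explicitly supplied BulkWriter, the override the docstring above suggests for custom throttling. The project and collection names are hypothetical, and the rate-option field names assume the `BulkWriterOptions` dataclass from `bulk_writer.py`:

from google.cloud import firestore
from google.cloud.firestore_v1.bulk_writer import BulkWriterOptions

client = firestore.Client(project="my-project")  # hypothetical project

# Supplying a BulkWriter overrides recursive_delete's default throttling;
# recursive_delete closes it for us once the top-level call finishes.
bw = client.bulk_writer(
    options=BulkWriterOptions(initial_ops_per_second=100, max_ops_per_second=250)
)

deleted = client.recursive_delete(client.collection("users"), bulk_writer=bw)
print(f"Deleted {deleted} documents.")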
3 changes: 3 additions & 0 deletions google/cloud/firestore_v1/collection.py
@@ -137,6 +137,9 @@ def list_documents(
        )
        return (_item_to_document_ref(self, i) for i in iterator)

    def _chunkify(self, chunk_size: int):
        return self._query()._chunkify(chunk_size)

    def get(
        self,
        transaction: Transaction = None,
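And a synchronous counterpart to the async helper sketched earlier, again illustrative only since `_chunkify` is private; it assumes an existing `CollectionReference` named `col`:

def count_documents(col) -> int:
    # Consume the collection 500 snapshots at a time.
    return sum(len(page) for page in col._chunkify(500))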