Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: partition queries #210

Merged
merged 2 commits into from Oct 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions google/cloud/firestore.py
Expand Up @@ -26,6 +26,7 @@
from google.cloud.firestore_v1 import AsyncTransaction
from google.cloud.firestore_v1 import AsyncWriteBatch
from google.cloud.firestore_v1 import Client
from google.cloud.firestore_v1 import CollectionGroup
from google.cloud.firestore_v1 import CollectionReference
from google.cloud.firestore_v1 import DELETE_FIELD
from google.cloud.firestore_v1 import DocumentReference
Expand Down Expand Up @@ -61,6 +62,7 @@
"AsyncTransaction",
"AsyncWriteBatch",
"Client",
"CollectionGroup",
"CollectionReference",
"DELETE_FIELD",
"DocumentReference",
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/firestore_v1/__init__.py
Expand Up @@ -40,6 +40,7 @@
from google.cloud.firestore_v1.client import Client
from google.cloud.firestore_v1.collection import CollectionReference
from google.cloud.firestore_v1.document import DocumentReference
from google.cloud.firestore_v1.query import CollectionGroup
from google.cloud.firestore_v1.query import Query
from google.cloud.firestore_v1.transaction import Transaction
from google.cloud.firestore_v1.transaction import transactional
Expand Down Expand Up @@ -115,6 +116,7 @@
"AsyncTransaction",
"AsyncWriteBatch",
"Client",
"CollectionGroup",
"CollectionReference",
"DELETE_FIELD",
"DocumentReference",
Expand Down
10 changes: 4 additions & 6 deletions google/cloud/firestore_v1/async_client.py
Expand Up @@ -35,7 +35,7 @@
)

from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.async_query import AsyncQuery
from google.cloud.firestore_v1.async_query import AsyncCollectionGroup
from google.cloud.firestore_v1.async_batch import AsyncWriteBatch
from google.cloud.firestore_v1.async_collection import AsyncCollectionReference
from google.cloud.firestore_v1.async_document import (
Expand Down Expand Up @@ -150,7 +150,7 @@ def collection(self, *collection_path) -> AsyncCollectionReference:
"""
return AsyncCollectionReference(*_path_helper(collection_path), client=self)

def collection_group(self, collection_id) -> AsyncQuery:
def collection_group(self, collection_id) -> AsyncCollectionGroup:
"""
Creates and returns a new AsyncQuery that includes all documents in the
database that are contained in a collection or subcollection with the
Expand All @@ -167,12 +167,10 @@ def collection_group(self, collection_id) -> AsyncQuery:
path will be included. Cannot contain a slash.

Returns:
:class:`~google.cloud.firestore_v1.async_query.AsyncQuery`:
:class:`~google.cloud.firestore_v1.async_query.AsyncCollectionGroup`:
The created AsyncQuery.
"""
return AsyncQuery(
self._get_collection_reference(collection_id), all_descendants=True
)
return AsyncCollectionGroup(self._get_collection_reference(collection_id))

def document(self, *document_path) -> AsyncDocumentReference:
"""Get a reference to a document in a collection.
Expand Down
82 changes: 82 additions & 0 deletions google/cloud/firestore_v1/async_query.py
Expand Up @@ -19,7 +19,9 @@
a more common way to create a query than direct usage of the constructor.
"""
from google.cloud.firestore_v1.base_query import (
BaseCollectionGroup,
BaseQuery,
QueryPartition,
_query_response_to_snapshot,
_collection_group_query_response_to_snapshot,
_enum_from_direction,
Expand Down Expand Up @@ -207,3 +209,83 @@ async def stream(
)
if snapshot is not None:
yield snapshot


class AsyncCollectionGroup(AsyncQuery, BaseCollectionGroup):
    """Collection group query for the asynchronous Firestore client.

    Specializes :class:`.AsyncQuery` to cover all documents in the database
    that are contained in a collection or subcollection of the given parent,
    and adds :meth:`get_partitions` for splitting the query into ranges.

    Args:
        parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
            The collection that this query applies to.

    Every other constructor argument is forwarded unchanged to the parent
    initializer; ``all_descendants`` defaults to ``True``.
    """

    def __init__(
        self,
        parent,
        projection=None,
        field_filters=(),
        orders=(),
        limit=None,
        limit_to_last=False,
        offset=None,
        start_at=None,
        end_at=None,
        all_descendants=True,
    ) -> None:
        # Collect the query state once and hand it up the MRO as keywords.
        query_state = {
            "parent": parent,
            "projection": projection,
            "field_filters": field_filters,
            "orders": orders,
            "limit": limit,
            "limit_to_last": limit_to_last,
            "offset": offset,
            "start_at": start_at,
            "end_at": end_at,
            "all_descendants": all_descendants,
        }
        super().__init__(**query_state)

    async def get_partitions(
        self, partition_count
    ) -> AsyncGenerator[QueryPartition, None]:
        """Split this query into partitions that can be run in parallel.

        Asks the backend for partition cursors and turns them into
        consecutive :class:`QueryPartition` ranges. The cursors are split
        points usable as start/end bounds for the query results.

        Args:
            partition_count (int): The desired maximum number of partition
                points. The number must be strictly positive. The actual
                number of partitions returned may be fewer.

        Yields:
            QueryPartition: One partition per cursor received, bounded by the
            previous cursor (``None`` for the first) and that cursor, followed
            by a last partition that starts at the final cursor and has no
            end bound.

        Raises:
            ValueError: Propagated from ``_validate_partition_query`` when the
                query cannot be partitioned.
        """
        self._validate_partition_query()

        # The request is built from a plain AsyncQuery that keeps only the
        # parent, the fixed partition ordering, the cursors and the
        # all_descendants flag of this query.
        partition_source = AsyncQuery(
            self._parent,
            orders=self._PARTITION_QUERY_ORDER,
            start_at=self._start_at,
            end_at=self._end_at,
            all_descendants=self._all_descendants,
        )
        parent_path, _ = self._parent._parent_info()

        firestore_api = self._client._firestore_api
        request = {
            "parent": parent_path,
            "structured_query": partition_source._to_protobuf(),
            "partition_count": partition_count,
        }
        pager = await firestore_api.partition_query(
            request=request, metadata=self._client._rpc_metadata,
        )

        # Each cursor closes the current partition and opens the next one.
        lower_bound = None
        async for cursor_pb in pager:
            upper_bound = self._client.document(cursor_pb.values[0].reference_value)
            yield QueryPartition(self, lower_bound, upper_bound)
            lower_bound = upper_bound

        # Trailing partition: from the last cursor to the end of the results.
        yield QueryPartition(self, lower_bound, None)
112 changes: 112 additions & 0 deletions google/cloud/firestore_v1/base_query.py
Expand Up @@ -1020,3 +1020,115 @@ def _collection_group_query_response_to_snapshot(
update_time=response_pb._pb.document.update_time,
)
return snapshot


class BaseCollectionGroup(BaseQuery):
    """Represents a Collection Group in the Firestore API.

    This is a specialization of :class:`.Query` that includes all documents in the
    database that are contained in a collection or subcollection of the given
    parent.

    Args:
        parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
            The collection that this query applies to.

    The remaining constructor arguments are forwarded unchanged to
    :class:`BaseQuery`. ``all_descendants`` must be truthy.

    Raises:
        ValueError: If ``all_descendants`` is falsy.
    """

    # Ordering applied to partition queries: a single ascending order on the
    # document ID field path.
    _PARTITION_QUERY_ORDER = (
        BaseQuery._make_order(
            field_path_module.FieldPath.document_id(), BaseQuery.ASCENDING,
        ),
    )

    def __init__(
        self,
        parent,
        projection=None,
        field_filters=(),
        orders=(),
        limit=None,
        limit_to_last=False,
        offset=None,
        start_at=None,
        end_at=None,
        all_descendants=True,
    ) -> None:
        if not all_descendants:
            raise ValueError("all_descendants must be True for collection group query.")

        super(BaseCollectionGroup, self).__init__(
            parent=parent,
            projection=projection,
            field_filters=field_filters,
            orders=orders,
            limit=limit,
            limit_to_last=limit_to_last,
            offset=offset,
            start_at=start_at,
            end_at=end_at,
            all_descendants=all_descendants,
        )

    def _validate_partition_query(self):
        """Check that this query is eligible for partitioning.

        Raises:
            ValueError: If the query has field filters, a projection, a limit
                or a non-zero offset.
        """
        if self._field_filters:
            raise ValueError("Can't partition query with filters.")

        # Projection and limit default to None, so compare against None rather
        # than relying on truthiness: an explicitly set but falsy value (for
        # example a limit of 0) must not be mistaken for "unset", because the
        # partition queries would silently drop it.
        if self._projection is not None:
            raise ValueError("Can't partition query with projection.")

        if self._limit is not None:
            raise ValueError("Can't partition query with limit.")

        # Truthiness is kept here on purpose: an offset of 0 skips nothing,
        # which is the same as having no offset at all.
        if self._offset:
            raise ValueError("Can't partition query with offset.")


class QueryPartition:
    """Represents a bounded partition of a collection group query.

    Contains cursors that can be used in a query as a starting and/or end point for the
    collection group query. The cursors may only be used in a query that matches the
    constraints of the query that produced this partition.

    Args:
        query (BaseQuery): The original query that this is a partition of.
        start_at (Optional[cursor]):
            Cursor for first query result to include. If `None`, the partition starts at
            the beginning of the result set.
        end_at (Optional[cursor]):
            Cursor for first query result after the last result included in the
            partition. If `None`, the partition runs to the end of the result set.

    NOTE(review): the collection group ``get_partitions`` implementations pass
    the objects returned by ``client.document(...)`` as cursors, i.e. document
    references rather than snapshots -- confirm the intended cursor type.
    """

    def __init__(self, query, start_at, end_at):
        self._query = query
        self._start_at = start_at
        self._end_at = end_at

    @property
    def start_at(self):
        """Cursor at which this partition starts, or ``None`` if unbounded."""
        return self._start_at

    @property
    def end_at(self):
        """Cursor at which this partition ends, or ``None`` if unbounded."""
        return self._end_at

    def query(self):
        """Generate a new query using this partition's bounds.

        The new query is built with the same class as the original query and
        reuses its parent, its ``all_descendants`` flag and its
        ``_PARTITION_QUERY_ORDER``; no other state of the original query is
        copied.

        Returns:
            BaseQuery: Copy of the original query with start and end bounds set by the
            cursors from this partition.
        """
        query = self._query

        # A bound is "unset" only when it is None, as documented on the class;
        # testing truthiness would also discard a set-but-falsy cursor.
        # Each bound is passed as a ([cursor], True) pair.
        # NOTE(review): the True flag is presumably BaseQuery's "before"
        # marker for cursors -- confirm against BaseQuery.
        start_at = ([self.start_at], True) if self.start_at is not None else None
        end_at = ([self.end_at], True) if self.end_at is not None else None

        return type(query)(
            query._parent,
            all_descendants=query._all_descendants,
            orders=query._PARTITION_QUERY_ORDER,
            start_at=start_at,
            end_at=end_at,
        )
10 changes: 4 additions & 6 deletions google/cloud/firestore_v1/client.py
Expand Up @@ -35,7 +35,7 @@
)

from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.query import Query
from google.cloud.firestore_v1.query import CollectionGroup
from google.cloud.firestore_v1.batch import WriteBatch
from google.cloud.firestore_v1.collection import CollectionReference
from google.cloud.firestore_v1.document import DocumentReference
Expand Down Expand Up @@ -145,7 +145,7 @@ def collection(self, *collection_path) -> CollectionReference:
"""
return CollectionReference(*_path_helper(collection_path), client=self)

def collection_group(self, collection_id) -> Query:
def collection_group(self, collection_id) -> CollectionGroup:
"""
Creates and returns a new Query that includes all documents in the
database that are contained in a collection or subcollection with the
Expand All @@ -162,12 +162,10 @@ def collection_group(self, collection_id) -> Query:
path will be included. Cannot contain a slash.

Returns:
:class:`~google.cloud.firestore_v1.query.Query`:
:class:`~google.cloud.firestore_v1.query.CollectionGroup`:
The created Query.
"""
return Query(
self._get_collection_reference(collection_id), all_descendants=True
)
return CollectionGroup(self._get_collection_reference(collection_id))

def document(self, *document_path) -> DocumentReference:
"""Get a reference to a document in a collection.
Expand Down
80 changes: 80 additions & 0 deletions google/cloud/firestore_v1/query.py
Expand Up @@ -19,7 +19,9 @@
a more common way to create a query than direct usage of the constructor.
"""
from google.cloud.firestore_v1.base_query import (
BaseCollectionGroup,
BaseQuery,
QueryPartition,
_query_response_to_snapshot,
_collection_group_query_response_to_snapshot,
_enum_from_direction,
Expand Down Expand Up @@ -239,3 +241,81 @@ def on_snapshot(docs, changes, read_time):
return Watch.for_query(
self, callback, document.DocumentSnapshot, document.DocumentReference
)


class CollectionGroup(Query, BaseCollectionGroup):
    """Collection group query for the synchronous Firestore client.

    Specializes :class:`.Query` to cover all documents in the database that
    are contained in a collection or subcollection of the given parent, and
    adds :meth:`get_partitions` for splitting the query into ranges.

    Args:
        parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
            The collection that this query applies to.

    Every other constructor argument is forwarded unchanged to the parent
    initializer; ``all_descendants`` defaults to ``True``.
    """

    def __init__(
        self,
        parent,
        projection=None,
        field_filters=(),
        orders=(),
        limit=None,
        limit_to_last=False,
        offset=None,
        start_at=None,
        end_at=None,
        all_descendants=True,
    ) -> None:
        # Collect the query state once and hand it up the MRO as keywords.
        query_state = {
            "parent": parent,
            "projection": projection,
            "field_filters": field_filters,
            "orders": orders,
            "limit": limit,
            "limit_to_last": limit_to_last,
            "offset": offset,
            "start_at": start_at,
            "end_at": end_at,
            "all_descendants": all_descendants,
        }
        super().__init__(**query_state)

    def get_partitions(self, partition_count) -> Generator[QueryPartition, None, None]:
        """Split this query into partitions that can be run in parallel.

        Asks the backend for partition cursors and turns them into
        consecutive :class:`QueryPartition` ranges. The cursors are split
        points usable as start/end bounds for the query results.

        Args:
            partition_count (int): The desired maximum number of partition
                points. The number must be strictly positive. The actual
                number of partitions returned may be fewer.

        Yields:
            QueryPartition: One partition per cursor received, bounded by the
            previous cursor (``None`` for the first) and that cursor, followed
            by a last partition that starts at the final cursor and has no
            end bound.

        Raises:
            ValueError: Propagated from ``_validate_partition_query`` when the
                query cannot be partitioned.
        """
        self._validate_partition_query()

        # The request is built from a plain Query that keeps only the parent,
        # the fixed partition ordering, the cursors and the all_descendants
        # flag of this query.
        partition_source = Query(
            self._parent,
            orders=self._PARTITION_QUERY_ORDER,
            start_at=self._start_at,
            end_at=self._end_at,
            all_descendants=self._all_descendants,
        )
        parent_path, _ = self._parent._parent_info()

        firestore_api = self._client._firestore_api
        request = {
            "parent": parent_path,
            "structured_query": partition_source._to_protobuf(),
            "partition_count": partition_count,
        }
        pager = firestore_api.partition_query(
            request=request, metadata=self._client._rpc_metadata,
        )

        # Each cursor closes the current partition and opens the next one.
        lower_bound = None
        for cursor_pb in pager:
            upper_bound = self._client.document(cursor_pb.values[0].reference_value)
            yield QueryPartition(self, lower_bound, upper_bound)
            lower_bound = upper_bound

        # Trailing partition: from the last cursor to the end of the results.
        yield QueryPartition(self, lower_bound, None)