Skip to content

Commit

Permalink
feat: partition queries (#210)
Browse files Browse the repository at this point in the history
Implement the new partition queries feature for Firestore.
  • Loading branch information
Chris Rossi committed Oct 6, 2020
1 parent 1fb3914 commit 4f75a75
Show file tree
Hide file tree
Showing 12 changed files with 739 additions and 50 deletions.
2 changes: 2 additions & 0 deletions google/cloud/firestore.py
Expand Up @@ -26,6 +26,7 @@
from google.cloud.firestore_v1 import AsyncTransaction
from google.cloud.firestore_v1 import AsyncWriteBatch
from google.cloud.firestore_v1 import Client
from google.cloud.firestore_v1 import CollectionGroup
from google.cloud.firestore_v1 import CollectionReference
from google.cloud.firestore_v1 import DELETE_FIELD
from google.cloud.firestore_v1 import DocumentReference
Expand Down Expand Up @@ -61,6 +62,7 @@
"AsyncTransaction",
"AsyncWriteBatch",
"Client",
"CollectionGroup",
"CollectionReference",
"DELETE_FIELD",
"DocumentReference",
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/firestore_v1/__init__.py
Expand Up @@ -40,6 +40,7 @@
from google.cloud.firestore_v1.client import Client
from google.cloud.firestore_v1.collection import CollectionReference
from google.cloud.firestore_v1.document import DocumentReference
from google.cloud.firestore_v1.query import CollectionGroup
from google.cloud.firestore_v1.query import Query
from google.cloud.firestore_v1.transaction import Transaction
from google.cloud.firestore_v1.transaction import transactional
Expand Down Expand Up @@ -115,6 +116,7 @@
"AsyncTransaction",
"AsyncWriteBatch",
"Client",
"CollectionGroup",
"CollectionReference",
"DELETE_FIELD",
"DocumentReference",
Expand Down
10 changes: 4 additions & 6 deletions google/cloud/firestore_v1/async_client.py
Expand Up @@ -35,7 +35,7 @@
)

from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.async_query import AsyncQuery
from google.cloud.firestore_v1.async_query import AsyncCollectionGroup
from google.cloud.firestore_v1.async_batch import AsyncWriteBatch
from google.cloud.firestore_v1.async_collection import AsyncCollectionReference
from google.cloud.firestore_v1.async_document import (
Expand Down Expand Up @@ -150,7 +150,7 @@ def collection(self, *collection_path) -> AsyncCollectionReference:
"""
return AsyncCollectionReference(*_path_helper(collection_path), client=self)

def collection_group(self, collection_id) -> AsyncQuery:
def collection_group(self, collection_id) -> AsyncCollectionGroup:
"""
Creates and returns a new AsyncQuery that includes all documents in the
database that are contained in a collection or subcollection with the
Expand All @@ -167,12 +167,10 @@ def collection_group(self, collection_id) -> AsyncQuery:
path will be included. Cannot contain a slash.
Returns:
:class:`~google.cloud.firestore_v1.async_query.AsyncQuery`:
:class:`~google.cloud.firestore_v1.async_query.AsyncCollectionGroup`:
The created AsyncQuery.
"""
return AsyncQuery(
self._get_collection_reference(collection_id), all_descendants=True
)
return AsyncCollectionGroup(self._get_collection_reference(collection_id))

def document(self, *document_path) -> AsyncDocumentReference:
"""Get a reference to a document in a collection.
Expand Down
82 changes: 82 additions & 0 deletions google/cloud/firestore_v1/async_query.py
Expand Up @@ -19,7 +19,9 @@
a more common way to create a query than direct usage of the constructor.
"""
from google.cloud.firestore_v1.base_query import (
BaseCollectionGroup,
BaseQuery,
QueryPartition,
_query_response_to_snapshot,
_collection_group_query_response_to_snapshot,
_enum_from_direction,
Expand Down Expand Up @@ -207,3 +209,83 @@ async def stream(
)
if snapshot is not None:
yield snapshot


class AsyncCollectionGroup(AsyncQuery, BaseCollectionGroup):
"""Represents a Collection Group in the Firestore API.
This is a specialization of :class:`.AsyncQuery` that includes all documents in the
database that are contained in a collection or subcollection of the given
parent.
Args:
parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
The collection that this query applies to.
"""

def __init__(
self,
parent,
projection=None,
field_filters=(),
orders=(),
limit=None,
limit_to_last=False,
offset=None,
start_at=None,
end_at=None,
all_descendants=True,
) -> None:
super(AsyncCollectionGroup, self).__init__(
parent=parent,
projection=projection,
field_filters=field_filters,
orders=orders,
limit=limit,
limit_to_last=limit_to_last,
offset=offset,
start_at=start_at,
end_at=end_at,
all_descendants=all_descendants,
)

async def get_partitions(
self, partition_count
) -> AsyncGenerator[QueryPartition, None]:
"""Partition a query for parallelization.
Partitions a query by returning partition cursors that can be used to run the
query in parallel. The returned partition cursors are split points that can be
used as starting/end points for the query results.
Args:
partition_count (int): The desired maximum number of partition points. The
number must be strictly positive. The actual number of partitions
returned may be fewer.
"""
self._validate_partition_query()
query = AsyncQuery(
self._parent,
orders=self._PARTITION_QUERY_ORDER,
start_at=self._start_at,
end_at=self._end_at,
all_descendants=self._all_descendants,
)

parent_path, expected_prefix = self._parent._parent_info()
pager = await self._client._firestore_api.partition_query(
request={
"parent": parent_path,
"structured_query": query._to_protobuf(),
"partition_count": partition_count,
},
metadata=self._client._rpc_metadata,
)

start_at = None
async for cursor_pb in pager:
cursor = self._client.document(cursor_pb.values[0].reference_value)
yield QueryPartition(self, start_at, cursor)
start_at = cursor

yield QueryPartition(self, start_at, None)
112 changes: 112 additions & 0 deletions google/cloud/firestore_v1/base_query.py
Expand Up @@ -1020,3 +1020,115 @@ def _collection_group_query_response_to_snapshot(
update_time=response_pb._pb.document.update_time,
)
return snapshot


class BaseCollectionGroup(BaseQuery):
"""Represents a Collection Group in the Firestore API.
This is a specialization of :class:`.Query` that includes all documents in the
database that are contained in a collection or subcollection of the given
parent.
Args:
parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
The collection that this query applies to.
"""

_PARTITION_QUERY_ORDER = (
BaseQuery._make_order(
field_path_module.FieldPath.document_id(), BaseQuery.ASCENDING,
),
)

def __init__(
self,
parent,
projection=None,
field_filters=(),
orders=(),
limit=None,
limit_to_last=False,
offset=None,
start_at=None,
end_at=None,
all_descendants=True,
) -> None:
if not all_descendants:
raise ValueError("all_descendants must be True for collection group query.")

super(BaseCollectionGroup, self).__init__(
parent=parent,
projection=projection,
field_filters=field_filters,
orders=orders,
limit=limit,
limit_to_last=limit_to_last,
offset=offset,
start_at=start_at,
end_at=end_at,
all_descendants=all_descendants,
)

def _validate_partition_query(self):
if self._field_filters:
raise ValueError("Can't partition query with filters.")

if self._projection:
raise ValueError("Can't partition query with projection.")

if self._limit:
raise ValueError("Can't partition query with limit.")

if self._offset:
raise ValueError("Can't partition query with offset.")


class QueryPartition:
"""Represents a bounded partition of a collection group query.
Contains cursors that can be used in a query as a starting and/or end point for the
collection group query. The cursors may only be used in a query that matches the
constraints of the query that produced this partition.
Args:
query (BaseQuery): The original query that this is a partition of.
start_at (Optional[~google.cloud.firestore_v1.document.DocumentSnapshot]):
Cursor for first query result to include. If `None`, the partition starts at
the beginning of the result set.
end_at (Optional[~google.cloud.firestore_v1.document.DocumentSnapshot]):
Cursor for first query result after the last result included in the
partition. If `None`, the partition runs to the end of the result set.
"""

def __init__(self, query, start_at, end_at):
self._query = query
self._start_at = start_at
self._end_at = end_at

@property
def start_at(self):
return self._start_at

@property
def end_at(self):
return self._end_at

def query(self):
"""Generate a new query using this partition's bounds.
Returns:
BaseQuery: Copy of the original query with start and end bounds set by the
cursors from this partition.
"""
query = self._query
start_at = ([self.start_at], True) if self.start_at else None
end_at = ([self.end_at], True) if self.end_at else None

return type(query)(
query._parent,
all_descendants=query._all_descendants,
orders=query._PARTITION_QUERY_ORDER,
start_at=start_at,
end_at=end_at,
)
10 changes: 4 additions & 6 deletions google/cloud/firestore_v1/client.py
Expand Up @@ -35,7 +35,7 @@
)

from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.query import Query
from google.cloud.firestore_v1.query import CollectionGroup
from google.cloud.firestore_v1.batch import WriteBatch
from google.cloud.firestore_v1.collection import CollectionReference
from google.cloud.firestore_v1.document import DocumentReference
Expand Down Expand Up @@ -145,7 +145,7 @@ def collection(self, *collection_path) -> CollectionReference:
"""
return CollectionReference(*_path_helper(collection_path), client=self)

def collection_group(self, collection_id) -> Query:
def collection_group(self, collection_id) -> CollectionGroup:
"""
Creates and returns a new Query that includes all documents in the
database that are contained in a collection or subcollection with the
Expand All @@ -162,12 +162,10 @@ def collection_group(self, collection_id) -> Query:
path will be included. Cannot contain a slash.
Returns:
:class:`~google.cloud.firestore_v1.query.Query`:
:class:`~google.cloud.firestore_v1.query.CollectionGroup`:
The created Query.
"""
return Query(
self._get_collection_reference(collection_id), all_descendants=True
)
return CollectionGroup(self._get_collection_reference(collection_id))

def document(self, *document_path) -> DocumentReference:
"""Get a reference to a document in a collection.
Expand Down
80 changes: 80 additions & 0 deletions google/cloud/firestore_v1/query.py
Expand Up @@ -19,7 +19,9 @@
a more common way to create a query than direct usage of the constructor.
"""
from google.cloud.firestore_v1.base_query import (
BaseCollectionGroup,
BaseQuery,
QueryPartition,
_query_response_to_snapshot,
_collection_group_query_response_to_snapshot,
_enum_from_direction,
Expand Down Expand Up @@ -239,3 +241,81 @@ def on_snapshot(docs, changes, read_time):
return Watch.for_query(
self, callback, document.DocumentSnapshot, document.DocumentReference
)


class CollectionGroup(Query, BaseCollectionGroup):
"""Represents a Collection Group in the Firestore API.
This is a specialization of :class:`.Query` that includes all documents in the
database that are contained in a collection or subcollection of the given
parent.
Args:
parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
The collection that this query applies to.
"""

def __init__(
self,
parent,
projection=None,
field_filters=(),
orders=(),
limit=None,
limit_to_last=False,
offset=None,
start_at=None,
end_at=None,
all_descendants=True,
) -> None:
super(CollectionGroup, self).__init__(
parent=parent,
projection=projection,
field_filters=field_filters,
orders=orders,
limit=limit,
limit_to_last=limit_to_last,
offset=offset,
start_at=start_at,
end_at=end_at,
all_descendants=all_descendants,
)

def get_partitions(self, partition_count) -> Generator[QueryPartition, None, None]:
"""Partition a query for parallelization.
Partitions a query by returning partition cursors that can be used to run the
query in parallel. The returned partition cursors are split points that can be
used as starting/end points for the query results.
Args:
partition_count (int): The desired maximum number of partition points. The
number must be strictly positive. The actual number of partitions
returned may be fewer.
"""
self._validate_partition_query()
query = Query(
self._parent,
orders=self._PARTITION_QUERY_ORDER,
start_at=self._start_at,
end_at=self._end_at,
all_descendants=self._all_descendants,
)

parent_path, expected_prefix = self._parent._parent_info()
pager = self._client._firestore_api.partition_query(
request={
"parent": parent_path,
"structured_query": query._to_protobuf(),
"partition_count": partition_count,
},
metadata=self._client._rpc_metadata,
)

start_at = None
for cursor_pb in pager:
cursor = self._client.document(cursor_pb.values[0].reference_value)
yield QueryPartition(self, start_at, cursor)
start_at = cursor

yield QueryPartition(self, start_at, None)

0 comments on commit 4f75a75

Please sign in to comment.