/
async_query.py
360 lines (308 loc) · 13.7 KB
/
async_query.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
# Copyright 2020 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Classes for representing queries for the Google Cloud Firestore API.
A :class:`~google.cloud.firestore_v1.query.Query` can be created directly from
a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be
a more common way to create a query than direct usage of the constructor.
"""
from google.api_core import gapic_v1 # type: ignore
from google.api_core import retry as retries # type: ignore
from google.cloud import firestore_v1
from google.cloud.firestore_v1.base_query import (
BaseCollectionGroup,
BaseQuery,
QueryPartition,
_query_response_to_snapshot,
_collection_group_query_response_to_snapshot,
_enum_from_direction,
)
from google.cloud.firestore_v1 import async_document
from google.cloud.firestore_v1.base_document import DocumentSnapshot
from typing import AsyncGenerator, List, Optional, Type
# Types needed only for Type Hints
from google.cloud.firestore_v1.transaction import Transaction
class AsyncQuery(BaseQuery):
"""Represents a query to the Firestore API.
Instances of this class are considered immutable: all methods that
would modify an instance instead return a new instance.
Args:
parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
The collection that this query applies to.
projection (Optional[:class:`google.cloud.proto.firestore.v1.\
query.StructuredQuery.Projection`]):
A projection of document fields to limit the query results to.
field_filters (Optional[Tuple[:class:`google.cloud.proto.firestore.v1.\
query.StructuredQuery.FieldFilter`, ...]]):
The filters to be applied in the query.
orders (Optional[Tuple[:class:`google.cloud.proto.firestore.v1.\
query.StructuredQuery.Order`, ...]]):
The "order by" entries to use in the query.
limit (Optional[int]):
The maximum number of documents the query is allowed to return.
offset (Optional[int]):
The number of results to skip.
start_at (Optional[Tuple[dict, bool]]):
Two-tuple of :
* a mapping of fields. Any field that is present in this mapping
must also be present in ``orders``
* an ``after`` flag
The fields and the flag combine to form a cursor used as
a starting point in a query result set. If the ``after``
flag is :data:`True`, the results will start just after any
documents which have fields matching the cursor, otherwise
any matching documents will be included in the result set.
When the query is formed, the document values
will be used in the order given by ``orders``.
end_at (Optional[Tuple[dict, bool]]):
Two-tuple of:
* a mapping of fields. Any field that is present in this mapping
must also be present in ``orders``
* a ``before`` flag
The fields and the flag combine to form a cursor used as
an ending point in a query result set. If the ``before``
flag is :data:`True`, the results will end just before any
documents which have fields matching the cursor, otherwise
any matching documents will be included in the result set.
When the query is formed, the document values
will be used in the order given by ``orders``.
all_descendants (Optional[bool]):
When false, selects only collections that are immediate children
of the `parent` specified in the containing `RunQueryRequest`.
When true, selects all descendant collections.
recursive (Optional[bool]):
When true, returns all documents and all documents in any subcollections
below them. Defaults to false.
"""
def __init__(
self,
parent,
projection=None,
field_filters=(),
orders=(),
limit=None,
limit_to_last=False,
offset=None,
start_at=None,
end_at=None,
all_descendants=False,
recursive=False,
) -> None:
super(AsyncQuery, self).__init__(
parent=parent,
projection=projection,
field_filters=field_filters,
orders=orders,
limit=limit,
limit_to_last=limit_to_last,
offset=offset,
start_at=start_at,
end_at=end_at,
all_descendants=all_descendants,
recursive=recursive,
)
async def _chunkify(
self, chunk_size: int
) -> AsyncGenerator[List[DocumentSnapshot], None]:
# Catch the edge case where a developer writes the following:
# `my_query.limit(500)._chunkify(1000)`, which ultimately nullifies any
# need to yield chunks.
if self._limit and chunk_size > self._limit:
yield await self.get()
return
max_to_return: Optional[int] = self._limit
num_returned: int = 0
original: AsyncQuery = self._copy()
last_document: Optional[DocumentSnapshot] = None
while True:
# Optionally trim the `chunk_size` down to honor a previously
# applied limit as set by `self.limit()`
_chunk_size: int = original._resolve_chunk_size(num_returned, chunk_size)
# Apply the optionally pruned limit and the cursor, if we are past
# the first page.
_q = original.limit(_chunk_size)
if last_document:
_q = _q.start_after(last_document)
snapshots = await _q.get()
last_document = snapshots[-1]
num_returned += len(snapshots)
yield snapshots
# Terminate the iterator if we have reached either of two end
# conditions:
# 1. There are no more documents, or
# 2. We have reached the desired overall limit
if len(snapshots) < _chunk_size or (
max_to_return and num_returned >= max_to_return
):
return
async def get(
self,
transaction: Transaction = None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
) -> list:
"""Read the documents in the collection that match this query.
This sends a ``RunQuery`` RPC and returns a list of documents
returned in the stream of ``RunQueryResponse`` messages.
Args:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
allowed).
Returns:
list: The documents in the collection that match this query.
"""
is_limited_to_last = self._limit_to_last
if self._limit_to_last:
# In order to fetch up to `self._limit` results from the end of the
# query flip the defined ordering on the query to start from the
# end, retrieving up to `self._limit` results from the backend.
for order in self._orders:
order.direction = _enum_from_direction(
self.DESCENDING
if order.direction == self.ASCENDING
else self.ASCENDING
)
self._limit_to_last = False
result = self.stream(transaction=transaction, retry=retry, timeout=timeout)
result = [d async for d in result]
if is_limited_to_last:
result = list(reversed(result))
return result
async def stream(
self,
transaction=None,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
) -> AsyncGenerator[async_document.DocumentSnapshot, None]:
"""Read the documents in the collection that match this query.
This sends a ``RunQuery`` RPC and then returns an iterator which
consumes each document returned in the stream of ``RunQueryResponse``
messages.
.. note::
The underlying stream of responses will time out after
the ``max_rpc_timeout_millis`` value set in the GAPIC
client configuration for the ``RunQuery`` API. Snapshots
not consumed from the iterator before that point will be lost.
If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
allowed).
Args:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
Yields:
:class:`~google.cloud.firestore_v1.async_document.DocumentSnapshot`:
The next document that fulfills the query.
"""
request, expected_prefix, kwargs = self._prep_stream(
transaction, retry, timeout,
)
response_iterator = await self._client._firestore_api.run_query(
request=request, metadata=self._client._rpc_metadata, **kwargs,
)
async for response in response_iterator:
if self._all_descendants:
snapshot = _collection_group_query_response_to_snapshot(
response, self._parent
)
else:
snapshot = _query_response_to_snapshot(
response, self._parent, expected_prefix
)
if snapshot is not None:
yield snapshot
@staticmethod
def _get_collection_reference_class() -> Type[
"firestore_v1.async_collection.AsyncCollectionReference"
]:
from google.cloud.firestore_v1.async_collection import AsyncCollectionReference
return AsyncCollectionReference
class AsyncCollectionGroup(AsyncQuery, BaseCollectionGroup):
"""Represents a Collection Group in the Firestore API.
This is a specialization of :class:`.AsyncQuery` that includes all documents in the
database that are contained in a collection or subcollection of the given
parent.
Args:
parent (:class:`~google.cloud.firestore_v1.collection.CollectionReference`):
The collection that this query applies to.
"""
def __init__(
self,
parent,
projection=None,
field_filters=(),
orders=(),
limit=None,
limit_to_last=False,
offset=None,
start_at=None,
end_at=None,
all_descendants=True,
recursive=False,
) -> None:
super(AsyncCollectionGroup, self).__init__(
parent=parent,
projection=projection,
field_filters=field_filters,
orders=orders,
limit=limit,
limit_to_last=limit_to_last,
offset=offset,
start_at=start_at,
end_at=end_at,
all_descendants=all_descendants,
recursive=recursive,
)
@staticmethod
def _get_query_class():
return AsyncQuery
async def get_partitions(
self,
partition_count,
retry: retries.Retry = gapic_v1.method.DEFAULT,
timeout: float = None,
) -> AsyncGenerator[QueryPartition, None]:
"""Partition a query for parallelization.
Partitions a query by returning partition cursors that can be used to run the
query in parallel. The returned partition cursors are split points that can be
used as starting/end points for the query results.
Args:
partition_count (int): The desired maximum number of partition points. The
number must be strictly positive. The actual number of partitions
returned may be fewer.
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
"""
request, kwargs = self._prep_get_partitions(partition_count, retry, timeout)
pager = await self._client._firestore_api.partition_query(
request=request, metadata=self._client._rpc_metadata, **kwargs,
)
start_at = None
async for cursor_pb in pager:
cursor = self._client.document(cursor_pb.values[0].reference_value)
yield QueryPartition(self, start_at, cursor)
start_at = cursor
yield QueryPartition(self, start_at, None)