/
collection.py
245 lines (200 loc) · 9.64 KB
/
collection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# Copyright 2017 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Classes for representing collections for the Google Cloud Firestore API."""
from google.api_core import gapic_v1 # type: ignore
from google.api_core import retry as retries # type: ignore
from google.cloud.firestore_v1.base_collection import (
BaseCollectionReference,
_item_to_document_ref,
)
from google.cloud.firestore_v1 import query as query_mod
from google.cloud.firestore_v1.watch import Watch
from google.cloud.firestore_v1 import document
from typing import Any, Callable, Generator, Tuple
# Types needed only for Type Hints
from google.cloud.firestore_v1.transaction import Transaction
class CollectionReference(BaseCollectionReference):
    """Synchronous handle on a single collection in a Firestore database.

    The referenced collection does not need to exist yet: documents can be
    created inside it through this object.

    Args:
        path (Tuple[str, ...]): The components in the collection path.
            This is a series of strings representing each collection and
            sub-collection ID, as well as the document IDs for any documents
            that contain a sub-collection.
        kwargs (dict): The keyword arguments for the constructor. The only
            supported keyword is ``client`` and it must be a
            :class:`~google.cloud.firestore_v1.client.Client` if provided. It
            represents the client that created this collection reference.

    Raises:
        ValueError: if

            * the ``path`` is empty
            * there are an even number of elements
            * a collection ID in ``path`` is not a string
            * a document ID in ``path`` is not a string
        TypeError: If a keyword other than ``client`` is used.
    """

    def __init__(self, *path, **kwargs) -> None:
        # Nothing is added here; construction is handled entirely by the
        # base class.
        super().__init__(*path, **kwargs)

    def _query(self) -> query_mod.Query:
        """Build a new query rooted at this collection.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`
        """
        collection_query = query_mod.Query(self)
        return collection_query

    def add(
        self,
        document_data: dict,
        document_id: str = None,
        retry: retries.Retry = gapic_v1.method.DEFAULT,
        timeout: float = None,
    ) -> Tuple[Any, Any]:
        """Create a new document in this collection from the given data.

        Args:
            document_data (dict): Property names and values to use for
                creating the document.
            document_id (Optional[str]): The document identifier within the
                current collection. If not provided, an ID will be
                automatically assigned by the server (the assigned ID will be
                a random 20 character string composed of digits,
                uppercase and lowercase letters).
            retry (google.api_core.retry.Retry): Designation of what errors,
                if any, should be retried. Defaults to a system-specified
                policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Returns:
            Tuple[:class:`google.protobuf.timestamp_pb2.Timestamp`,
            :class:`~google.cloud.firestore_v1.document.DocumentReference`]:
            Pair of

            * The ``update_time`` when the document was created/overwritten.
            * A document reference for the created document.

        Raises:
            ~google.cloud.exceptions.Conflict: If ``document_id`` is provided
                and the document already exists.
        """
        # The shared helper yields the target reference together with the
        # keyword arguments to forward to ``create``.
        new_doc_ref, create_kwargs = self._prep_add(
            document_data, document_id, retry, timeout
        )
        created = new_doc_ref.create(document_data, **create_kwargs)
        return (created.update_time, new_doc_ref)

    def list_documents(
        self,
        page_size: int = None,
        retry: retries.Retry = gapic_v1.method.DEFAULT,
        timeout: float = None,
    ) -> Generator[Any, Any, None]:
        """List all subdocuments of the current collection.

        Args:
            page_size (Optional[int]): The maximum number of documents
                in each page of results from this request. Non-positive
                values are ignored. Defaults to a sensible value set by the
                API.
            retry (google.api_core.retry.Retry): Designation of what errors,
                if any, should be retried. Defaults to a system-specified
                policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Returns:
            Generator[:class:`~google.cloud.firestore_v1.document.DocumentReference`, Any, None]:
            lazy iterator over the subdocuments of the current collection.
            If the collection does not exist, the iterator will be empty.
        """
        list_request, rpc_kwargs = self._prep_list_documents(
            page_size, retry, timeout
        )
        client = self._client
        list_rpc = client._firestore_api.list_documents
        raw_items = list_rpc(
            request=list_request, metadata=client._rpc_metadata, **rpc_kwargs
        )
        # Each raw item is converted to a document reference only as the
        # caller consumes the generator.
        return (_item_to_document_ref(self, raw) for raw in raw_items)

    def _chunkify(self, chunk_size: int):
        """Delegate chunked iteration to a query over this collection."""
        base_query = self._query()
        return base_query._chunkify(chunk_size)

    def get(
        self,
        transaction: Transaction = None,
        retry: retries.Retry = gapic_v1.method.DEFAULT,
        timeout: float = None,
    ) -> list:
        """Read the documents in this collection.

        This sends a ``RunQuery`` RPC and returns a list of documents
        returned in the stream of ``RunQueryResponse`` messages.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that this query will run in.
            retry (google.api_core.retry.Retry): Designation of what errors,
                if any, should be retried. Defaults to a system-specified
                policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Returns:
            list: The documents in this collection that match the query.
        """
        prepared_query, call_kwargs = self._prep_get_or_stream(retry, timeout)
        documents = prepared_query.get(transaction=transaction, **call_kwargs)
        return documents

    def stream(
        self,
        transaction: Transaction = None,
        retry: retries.Retry = gapic_v1.method.DEFAULT,
        timeout: float = None,
    ) -> Generator[document.DocumentSnapshot, Any, None]:
        """Read the documents in this collection.

        This sends a ``RunQuery`` RPC and then returns an iterator which
        consumes each document returned in the stream of ``RunQueryResponse``
        messages.

        .. note::

           The underlying stream of responses will time out after
           the ``max_rpc_timeout_millis`` value set in the GAPIC
           client configuration for the ``RunQuery`` API. Snapshots
           not consumed from the iterator before that point will be lost.

        If a ``transaction`` is used and it already has write operations
        added, this method cannot be used (i.e. read-after-write is not
        allowed).

        Args:
            transaction
                (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
                An existing transaction that the query will run in.
            retry (google.api_core.retry.Retry): Designation of what errors,
                if any, should be retried. Defaults to a system-specified
                policy.
            timeout (float): The timeout for this request. Defaults to a
                system-specified value.

        Yields:
            :class:`~google.cloud.firestore_v1.document.DocumentSnapshot`:
            The next document that fulfills the query.
        """
        prepared_query, call_kwargs = self._prep_get_or_stream(retry, timeout)
        snapshots = prepared_query.stream(transaction=transaction, **call_kwargs)
        return snapshots

    def on_snapshot(self, callback: Callable) -> Watch:
        """Monitor the documents in this collection.

        This starts a watch on this collection using a background thread. The
        provided callback is run on the snapshot of the documents.

        Args:
            callback (Callable): a callback to run when a change occurs. In
                the example below it accepts three positional arguments: the
                collection snapshot, the changes, and the read time.

        Example:
            from google.cloud import firestore_v1

            db = firestore_v1.Client()
            collection_ref = db.collection('users')

            def on_snapshot(collection_snapshot, changes, read_time):
                # NOTE(review): confirm against ``Watch`` whether the first
                # argument exposes ``.documents`` or is itself a list of
                # document snapshots.
                for doc in collection_snapshot.documents:
                    print('{} => {}'.format(doc.id, doc.to_dict()))

            # Watch this collection
            collection_watch = collection_ref.on_snapshot(on_snapshot)

            # Terminate this watch
            collection_watch.unsubscribe()
        """
        make_watch = Watch.for_query
        watched_query = self._query()
        return make_watch(
            watched_query,
            callback,
            document.DocumentSnapshot,
            document.DocumentReference,
        )