/
writer.py
404 lines (338 loc) · 15.1 KB
/
writer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import itertools
import logging
import queue
import time
import threading
from typing import Callable, Optional, Sequence, Tuple
from google.api_core import bidi
from google.api_core.future import polling as polling_future
from google.api_core import exceptions
import google.api_core.retry
import grpc
from google.cloud.bigquery_storage_v1beta2 import exceptions as bqstorage_exceptions
from google.cloud.bigquery_storage_v1beta2 import types as gapic_types
from google.cloud.bigquery_storage_v1beta2.services import big_query_write
# Module-level logger, named after this module per stdlib logging convention.
_LOGGER = logging.getLogger(__name__)

# Names for the helper threads spawned during shutdown, so they are easy to
# identify in thread dumps.
# NOTE(review): _REGULAR_SHUTDOWN_THREAD_NAME is not referenced anywhere in
# this file — possibly kept for parity with similar managers; verify before
# removing.
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
# Used by ``_on_rpc_done`` for the thread that runs ``_shutdown``.
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"
# open() takes between 0.25 and 0.4 seconds to be ready. Wait each loop before
# checking again. This interval was chosen to result in about 3 loops.
_WRITE_OPEN_INTERVAL = 0.08
# Use a default timeout that is quite long to avoid potential infinite loops,
# but still work for all expected requests
_DEFAULT_TIMEOUT = 600
def _wrap_as_exception(maybe_exception) -> Exception:
    """Coerce an arbitrary object into a Python exception.

    gRPC errors are translated into their richer ``google.api_core``
    equivalents, objects that are already exceptions pass through untouched,
    and anything else is wrapped in a plain ``Exception``.

    Args:
        maybe_exception (Any): The object to wrap, usually a gRPC exception
            class.

    Returns:
        The argument itself if an instance of ``BaseException``, otherwise
        the argument represented as an instance of ``Exception`` (sub)class.
    """
    # The RpcError check must come first: gRPC errors are themselves
    # BaseException subclasses, but we prefer the translated form for them.
    if isinstance(maybe_exception, grpc.RpcError):
        return exceptions.from_grpc_error(maybe_exception)
    if isinstance(maybe_exception, BaseException):
        return maybe_exception
    return Exception(maybe_exception)
class AppendRowsStream(object):
    """A manager object which can append rows to a stream."""

    def __init__(
        self,
        client: big_query_write.BigQueryWriteClient,
        initial_request_template: gapic_types.AppendRowsRequest,
        metadata: Sequence[Tuple[str, str]] = (),
    ):
        """Construct a stream manager.

        Args:
            client:
                Client responsible for making requests.
            initial_request_template:
                Data to include in the first request sent to the stream. This
                must contain
                :attr:`google.cloud.bigquery_storage_v1beta2.types.AppendRowsRequest.write_stream`
                and
                :attr:`google.cloud.bigquery_storage_v1beta2.types.AppendRowsRequest.ProtoData.writer_schema`.
            metadata:
                Extra headers to include when sending the streaming request.
        """
        self._client = client
        # Guards the shutdown sequence so close() is idempotent even when
        # called concurrently from multiple threads.
        self._closing = threading.Lock()
        self._closed = False
        self._close_callbacks = []
        # One future per in-flight request. Responses arrive in request
        # order, so FIFO pairing of futures with responses is correct.
        self._futures_queue = queue.Queue()
        # Fixed typo: this attribute was previously misspelled
        # ``_inital_request_template``.
        self._initial_request_template = initial_request_template
        self._metadata = metadata
        self._rpc = None
        self._stream_name = None

        # The threads created in ``.open()``.
        self._consumer = None

    @property
    def is_active(self) -> bool:
        """bool: True if this manager is actively streaming.

        Note that ``False`` does not indicate this is complete shut down,
        just that it stopped getting new messages.
        """
        return self._consumer is not None and self._consumer.is_active

    def add_close_callback(self, callback: Callable):
        """Schedules a callable when the manager closes.

        Args:
            callback (Callable): The method to call.
        """
        self._close_callbacks.append(callback)

    def open(
        self,
        initial_request: gapic_types.AppendRowsRequest,
        timeout: float = _DEFAULT_TIMEOUT,
    ) -> "AppendRowsFuture":
        """Open an append rows stream.

        This is automatically called by the first call to the
        :attr:`google.cloud.bigquery_storage_v1beta2.writer.AppendRowsStream.send`
        method.

        Args:
            initial_request:
                The initial request to start the stream. Must have
                :attr:`google.cloud.bigquery_storage_v1beta2.types.AppendRowsRequest.write_stream`
                and ``proto_rows.writer_schema.proto_descriptor`` properties
                populated.
            timeout:
                How long (in seconds) to wait for the stream to be ready.

        Returns:
            A future, which can be used to process the response to the initial
            request when it arrives.

        Raises:
            ValueError: If the manager is already open.
            bqstorage_exceptions.StreamClosedError: If the manager was closed.
        """
        if self.is_active:
            raise ValueError("This manager is already open.")

        if self._closed:
            raise bqstorage_exceptions.StreamClosedError(
                "This manager has been closed and can not be re-used."
            )

        start_time = time.monotonic()
        # Merge the caller's first request on top of the template so the
        # stream name and writer schema supplied at construction time are
        # included in the initial request.
        request = gapic_types.AppendRowsRequest()
        gapic_types.AppendRowsRequest.copy_from(
            request, self._initial_request_template
        )
        request._pb.MergeFrom(initial_request._pb)
        self._stream_name = request.write_stream

        # Fixed typo: this local was previously misspelled
        # ``inital_response_future``.
        initial_response_future = AppendRowsFuture(self)
        self._futures_queue.put(initial_response_future)

        self._rpc = bidi.BidiRpc(
            self._client.append_rows,
            initial_request=request,
            # TODO: pass in retry and timeout. Blocked by
            # https://github.com/googleapis/python-api-core/issues/262
            metadata=tuple(
                itertools.chain(
                    self._metadata,
                    # This header is required so that the BigQuery Storage API
                    # knows which region to route the request to.
                    (("x-goog-request-params", f"write_stream={self._stream_name}"),),
                )
            ),
        )
        self._rpc.add_done_callback(self._on_rpc_done)

        self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response)
        self._consumer.start()

        # Make sure RPC has started before returning.
        # Without this, consumers may get:
        #
        # ValueError: Can not send() on an RPC that has never been open()ed.
        #
        # when they try to send a request.
        while not self._rpc.is_active and self._consumer.is_active:
            # Avoid 100% CPU while waiting for RPC to be ready.
            time.sleep(_WRITE_OPEN_INTERVAL)
            # TODO: Check retry.deadline instead of (per-request) timeout.
            # Blocked by
            # https://github.com/googleapis/python-api-core/issues/262
            if timeout is None:
                continue
            current_time = time.monotonic()
            if current_time - start_time > timeout:
                break

        # Something went wrong when opening the RPC.
        if not self._consumer.is_active:
            # TODO: Share the exception from _rpc.open(). Blocked by
            # https://github.com/googleapis/python-api-core/issues/268
            request_exception = exceptions.Unknown(
                "There was a problem opening the stream. "
                "Try turning on DEBUG level logs to see the error."
            )
            self.close(reason=request_exception)
            raise request_exception

        return initial_response_future

    def send(self, request: gapic_types.AppendRowsRequest) -> "AppendRowsFuture":
        """Send an append rows request to the open stream.

        Args:
            request:
                The request to add to the stream.

        Returns:
            A future, which can be used to process the response when it
            arrives.

        Raises:
            bqstorage_exceptions.StreamClosedError: If the manager was closed.
        """
        if self._closed:
            raise bqstorage_exceptions.StreamClosedError(
                "This manager has been closed and can not be used."
            )

        # If the manager hasn't been opened yet, automatically open it.
        if not self.is_active:
            return self.open(request)

        # For each request, we expect exactly one response (in order). Add a
        # future to the queue so that when the response comes, the callback can
        # pull it off and notify completion.
        future = AppendRowsFuture(self)
        self._futures_queue.put(future)
        self._rpc.send(request)
        return future

    def _on_response(self, response: gapic_types.AppendRowsResponse):
        """Process a response from a consumer callback."""
        # If the stream has closed, but somehow we still got a response message
        # back, discard it. The response futures queue has been drained, with
        # an exception reported.
        if self._closed:
            raise bqstorage_exceptions.StreamClosedError(
                f"Stream closed before receiving response: {response}"
            )

        # Since we have 1 response per request, if we get here from a response
        # callback, the queue should never be empty.
        future: AppendRowsFuture = self._futures_queue.get_nowait()
        if response.error.code:
            exc = exceptions.from_grpc_status(
                response.error.code, response.error.message
            )
            future.set_exception(exc)
        else:
            future.set_result(response)

    def close(self, reason: Optional[Exception] = None):
        """Stop consuming messages and shutdown all helper threads.

        This method is idempotent. Additional calls will have no effect.

        Args:
            reason: The reason to close this. If ``None``, this is considered
                an "intentional" shutdown. This is passed to the callbacks
                specified via :meth:`add_close_callback`.
        """
        self._shutdown(reason=reason)

    def _shutdown(self, reason: Optional[Exception] = None):
        """Run the actual shutdown sequence (stop the stream and all helper threads).

        Args:
            reason:
                The reason to close the stream. If ``None``, this is
                considered an "intentional" shutdown.
        """
        with self._closing:
            if self._closed:
                return

            # Stop consuming messages.
            if self.is_active:
                _LOGGER.debug("Stopping consumer.")
                self._consumer.stop()
            self._consumer = None

            if self._rpc is not None:
                self._rpc.close()
            self._rpc = None
            self._closed = True
            _LOGGER.debug("Finished stopping manager.")

            # We know that no new items will be added to the queue because
            # we've marked the stream as closed.
            while not self._futures_queue.empty():
                # Mark each future as failed. Since the consumer thread has
                # stopped (or at least is attempting to stop), we won't get
                # response callbacks to populate the remaining futures.
                future = self._futures_queue.get_nowait()
                if reason is None:
                    exc = bqstorage_exceptions.StreamClosedError(
                        "Stream closed before receiving a response."
                    )
                else:
                    exc = reason
                future.set_exception(exc)

            for callback in self._close_callbacks:
                callback(self, reason)

    def _on_rpc_done(self, future):
        """Triggered whenever the underlying RPC terminates without recovery.

        This is typically triggered from one of two threads: the background
        consumer thread (when calling ``recv()`` produces a non-recoverable
        error) or the grpc management thread (when cancelling the RPC).

        This method is *non-blocking*. It will start another thread to deal
        with shutting everything down. This is to prevent blocking in the
        background consumer and preventing it from being ``joined()``.
        """
        _LOGGER.info("RPC termination has signaled streaming pull manager shutdown.")
        error = _wrap_as_exception(future)
        thread = threading.Thread(
            name=_RPC_ERROR_THREAD_NAME, target=self._shutdown, kwargs={"reason": error}
        )
        thread.daemon = True
        thread.start()
class AppendRowsFuture(polling_future.PollingFuture):
    """Encapsulation of the asynchronous execution of an action.

    This object is returned from long-running BigQuery Storage API calls, and
    is the interface to determine the status of those calls.

    This object should not be created directly, but is returned by other
    methods in this library.
    """

    def __init__(self, manager: AppendRowsStream):
        super().__init__()
        # The stream manager that produced this future; used by cancel().
        self.__manager = manager
        # Cancellation is tracked separately from the base future's state —
        # see the NOTE in cancel() below.
        self.__cancelled = False
        # Set to True by set_result/set_exception, *after* the result is
        # stored, so done() never reports True with no result available.
        self._is_done = False

    def cancel(self):
        """Stops pulling messages and shutdowns the background thread consuming
        messages.

        The method does not block, it just triggers the shutdown and returns
        immediately. To block until the background stream is terminated, call
        :meth:`result()` after cancelling the future.
        """
        # NOTE: We circumvent the base future's self._state to track the cancellation
        # state, as this state has different meaning with streaming pull futures.
        # See: https://github.com/googleapis/python-pubsub/pull/397
        self.__cancelled = True
        return self.__manager.close()

    def cancelled(self):
        """
        Returns:
            bool: ``True`` if the write stream has been cancelled.
        """
        return self.__cancelled

    def done(self, retry: Optional[google.api_core.retry.Retry] = None) -> bool:
        """Check the status of the future.

        Args:
            retry:
                Not used. Included for compatibility with base class. Future
                status is updated by a background thread.

        Returns:
            ``True`` if the request has finished, otherwise ``False``.
        """
        # Consumer should call set_result or set_exception method, where this
        # gets set to True *after* first setting _result.
        #
        # Consumer runs in a background thread, but this access is thread-safe:
        # https://docs.python.org/3/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe
        return self._is_done

    def set_exception(self, exception):
        """Set the result of the future as being the given exception.

        Do not use this method, it should only be used internally by the library and its
        unit tests.
        """
        return_value = super().set_exception(exception=exception)
        # Flip the done flag only after the exception is stored in the base
        # future, so done() implies the exception is retrievable.
        self._is_done = True
        return return_value

    def set_result(self, result):
        """Set the return value of work associated with the future.

        Do not use this method, it should only be used internally by the library and its
        unit tests.
        """
        return_value = super().set_result(result=result)
        # Flip the done flag only after the result is stored in the base
        # future, so done() implies the result is retrievable.
        self._is_done = True
        return return_value