From 87b063eaaa75430196d9910911cebfa38c7e3735 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 20 Sep 2021 13:35:54 -0500 Subject: [PATCH 1/2] fix: avoid opening write stream more than once, make open method private --- google/cloud/bigquery_storage_v1beta2/writer.py | 12 +++++++----- tests/unit/test_writer_v1beta2.py | 12 +++++++----- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/google/cloud/bigquery_storage_v1beta2/writer.py b/google/cloud/bigquery_storage_v1beta2/writer.py index 3a77a07e..afa4957f 100644 --- a/google/cloud/bigquery_storage_v1beta2/writer.py +++ b/google/cloud/bigquery_storage_v1beta2/writer.py @@ -36,7 +36,7 @@ _REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown" _RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated" -# open() takes between 0.25 and 0.4 seconds to be ready. Wait each loop before +# _open() takes between 0.25 and 0.4 seconds to be ready. Wait each loop before # checking again. This interval was chosen to result in about 3 loops. _WRITE_OPEN_INTERVAL = 0.08 @@ -91,10 +91,11 @@ def __init__( self._futures_queue = queue.Queue() self._inital_request_template = initial_request_template self._metadata = metadata + self._opening = threading.Lock() self._rpc = None self._stream_name = None - # The threads created in ``.open()``. + # The threads created in ``._open()``. self._consumer = None @property @@ -113,7 +114,7 @@ def add_close_callback(self, callback: Callable): """ self._close_callbacks.append(callback) - def open( + def _open( self, initial_request: gapic_types.AppendRowsRequest, timeout: float = _DEFAULT_TIMEOUT, @@ -222,8 +223,9 @@ def send(self, request: gapic_types.AppendRowsRequest) -> "AppendRowsFuture": ) # If the manager hasn't been openned yet, automatically open it. - if not self.is_active: - return self.open(request) + with self._opening: + if not self.is_active: + return self._open(request) # For each request, we expect exactly one response (in order). 
Add a # future to the queue so that when the response comes, the callback can diff --git a/tests/unit/test_writer_v1beta2.py b/tests/unit/test_writer_v1beta2.py index 7da7c66a..0992f3aa 100644 --- a/tests/unit/test_writer_v1beta2.py +++ b/tests/unit/test_writer_v1beta2.py @@ -46,7 +46,7 @@ def test_constructor_and_default_state(module_under_test): @mock.patch("google.api_core.bidi.BidiRpc", autospec=True) @mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True) -def test_open(background_consumer, bidi_rpc, module_under_test): +def test_initial_send(background_consumer, bidi_rpc, module_under_test): mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient) request_template = gapic_types.AppendRowsRequest( write_stream="stream-name-from-REQUEST_TEMPLATE", @@ -69,7 +69,7 @@ def test_open(background_consumer, bidi_rpc, module_under_test): proto_rows=gapic_types.AppendRowsRequest.ProtoData(rows=proto_rows), ) - future = manager.open(initial_request) + future = manager.send(initial_request) assert isinstance(future, module_under_test.AppendRowsFuture) background_consumer.assert_called_once_with(manager._rpc, manager._on_response) @@ -109,7 +109,7 @@ def test_open(background_consumer, bidi_rpc, module_under_test): @mock.patch("google.api_core.bidi.BidiRpc", autospec=True) @mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True) -def test_open_with_timeout(background_consumer, bidi_rpc, module_under_test): +def test_initial_send_with_timeout(background_consumer, bidi_rpc, module_under_test): mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient) manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE) type(bidi_rpc.return_value).is_active = mock.PropertyMock(return_value=False) @@ -120,8 +120,10 @@ def test_open_with_timeout(background_consumer, bidi_rpc, module_under_test): write_stream="this-is-a-stream-resource-path" ) - with pytest.raises(exceptions.Unknown), 
freezegun.freeze_time(auto_tick_seconds=1): - manager.open(initial_request, timeout=0.5) + with pytest.raises(exceptions.Unknown), freezegun.freeze_time( + auto_tick_seconds=module_under_test._DEFAULT_TIMEOUT + 1 + ): + manager.send(initial_request) def test_future_done_false(module_under_test): From e524ac799724e37f847de701575beb57ef2f1d42 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 20 Sep 2021 14:12:07 -0500 Subject: [PATCH 2/2] comment purpose of _opening lock --- google/cloud/bigquery_storage_v1beta2/writer.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/google/cloud/bigquery_storage_v1beta2/writer.py b/google/cloud/bigquery_storage_v1beta2/writer.py index 22ee851c..9e82ab60 100644 --- a/google/cloud/bigquery_storage_v1beta2/writer.py +++ b/google/cloud/bigquery_storage_v1beta2/writer.py @@ -91,7 +91,10 @@ def __init__( self._futures_queue = queue.Queue() self._inital_request_template = initial_request_template self._metadata = metadata + + # Only one call to `send()` should attempt to open the RPC. self._opening = threading.Lock() + self._rpc = None self._stream_name = None @@ -222,7 +225,11 @@ def send(self, request: gapic_types.AppendRowsRequest) -> "AppendRowsFuture": "This manager has been closed and can not be used." ) - # If the manager hasn't been openned yet, automatically open it. + # If the manager hasn't been opened yet, automatically open it. Only + # one call to `send()` should attempt to open the RPC. After `_open()`, + # the stream is active, unless something went wrong with the first call + # to open, in which case this send will fail anyway due to a closed + # RPC. with self._opening: if not self.is_active: return self._open(request)