From 58ec8444420d29c2915ec5b148de780a36eaf3e2 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 20 Sep 2021 14:24:29 -0500 Subject: [PATCH] fix: avoid opening write stream more than once, make open method private (#305) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery-storage/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes # 🦕 --- .../cloud/bigquery_storage_v1beta2/writer.py | 21 +++++++++++++------ tests/unit/test_writer_v1beta2.py | 12 ++++++----- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/google/cloud/bigquery_storage_v1beta2/writer.py b/google/cloud/bigquery_storage_v1beta2/writer.py index d6d4b583..9e82ab60 100644 --- a/google/cloud/bigquery_storage_v1beta2/writer.py +++ b/google/cloud/bigquery_storage_v1beta2/writer.py @@ -36,7 +36,7 @@ _REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown" _RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated" -# open() takes between 0.25 and 0.4 seconds to be ready. Wait each loop before +# _open() takes between 0.25 and 0.4 seconds to be ready. Wait each loop before # checking again. This interval was chosen to result in about 3 loops. _WRITE_OPEN_INTERVAL = 0.08 @@ -91,10 +91,14 @@ def __init__( self._futures_queue = queue.Queue() self._inital_request_template = initial_request_template self._metadata = metadata + + # Only one call to `send()` should attempt to open the RPC. + self._opening = threading.Lock() + self._rpc = None self._stream_name = None - # The threads created in ``.open()``. + # The threads created in ``._open()``. self._consumer = None @property @@ -113,7 +117,7 @@ def add_close_callback(self, callback: Callable): """ self._close_callbacks.append(callback) - def open( + def _open( self, initial_request: gapic_types.AppendRowsRequest, timeout: float = _DEFAULT_TIMEOUT, @@ -221,9 +225,14 @@ def send(self, request: gapic_types.AppendRowsRequest) -> "AppendRowsFuture": "This manager has been closed and can not be used." ) - # If the manager hasn't been openned yet, automatically open it. - if not self.is_active: - return self.open(request) + # If the manager hasn't been openned yet, automatically open it. Only + # one call to `send()` should attempt to open the RPC. After `_open()`, + # the stream is active, unless something went wrong with the first call + # to open, in which case this send will fail anyway due to a closed + # RPC. + with self._opening: + if not self.is_active: + return self._open(request) # For each request, we expect exactly one response (in order). Add a # future to the queue so that when the response comes, the callback can diff --git a/tests/unit/test_writer_v1beta2.py b/tests/unit/test_writer_v1beta2.py index 2f4ada71..66d01fe0 100644 --- a/tests/unit/test_writer_v1beta2.py +++ b/tests/unit/test_writer_v1beta2.py @@ -55,7 +55,7 @@ def test_close_before_open(module_under_test): @mock.patch("google.api_core.bidi.BidiRpc", autospec=True) @mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True) -def test_open(background_consumer, bidi_rpc, module_under_test): +def test_initial_send(background_consumer, bidi_rpc, module_under_test): mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient) request_template = gapic_types.AppendRowsRequest( write_stream="stream-name-from-REQUEST_TEMPLATE", @@ -78,7 +78,7 @@ def test_open(background_consumer, bidi_rpc, module_under_test): proto_rows=gapic_types.AppendRowsRequest.ProtoData(rows=proto_rows), ) - future = manager.open(initial_request) + future = manager.send(initial_request) assert isinstance(future, module_under_test.AppendRowsFuture) background_consumer.assert_called_once_with(manager._rpc, manager._on_response) @@ -118,7 +118,7 @@ def test_open(background_consumer, bidi_rpc, module_under_test): @mock.patch("google.api_core.bidi.BidiRpc", autospec=True) @mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True) -def test_open_with_timeout(background_consumer, bidi_rpc, module_under_test): +def test_initial_send_with_timeout(background_consumer, bidi_rpc, module_under_test): mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient) manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE) type(bidi_rpc.return_value).is_active = mock.PropertyMock(return_value=False) @@ -129,8 +129,10 @@ def test_open_with_timeout(background_consumer, bidi_rpc, module_under_test): write_stream="this-is-a-stream-resource-path" ) - with pytest.raises(exceptions.Unknown), freezegun.freeze_time(auto_tick_seconds=1): - manager.open(initial_request, timeout=0.5) + with pytest.raises(exceptions.Unknown), freezegun.freeze_time( + auto_tick_seconds=module_under_test._DEFAULT_TIMEOUT + 1 + ): + manager.send(initial_request) def test_future_done_false(module_under_test):