From 9f145f87d6a54e757044ff4110d2cafd57ce08fa Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 20 Sep 2021 13:43:18 -0500 Subject: [PATCH] fix: avoid failure if closing `AppendRowsStream` before opening (#304) * fix: avoid failure if closing AppendRowsStream before opening * don't create a thread if we're going to block anyway --- google/cloud/bigquery_storage_v1beta2/writer.py | 14 +++----------- tests/unit/test_writer_v1beta2.py | 9 +++++++++ 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/google/cloud/bigquery_storage_v1beta2/writer.py b/google/cloud/bigquery_storage_v1beta2/writer.py index 3a77a07e..d6d4b583 100644 --- a/google/cloud/bigquery_storage_v1beta2/writer.py +++ b/google/cloud/bigquery_storage_v1beta2/writer.py @@ -259,21 +259,12 @@ def close(self, reason: Optional[Exception] = None): This method is idempotent. Additional calls will have no effect. - The method does not block, it delegates the shutdown operations to a background - thread. - Args: reason: The reason to close this. If ``None``, this is considered an "intentional" shutdown. This is passed to the callbacks specified via :meth:`add_close_callback`. """ - self._regular_shutdown_thread = threading.Thread( - name=_REGULAR_SHUTDOWN_THREAD_NAME, - daemon=True, - target=self._shutdown, - kwargs={"reason": reason}, - ) - self._regular_shutdown_thread.start() + self._shutdown(reason=reason) def _shutdown(self, reason: Optional[Exception] = None): """Run the actual shutdown sequence (stop the stream and all helper threads). @@ -293,7 +284,8 @@ def _shutdown(self, reason: Optional[Exception] = None): self._consumer.stop() self._consumer = None - self._rpc.close() + if self._rpc is not None: + self._rpc.close() self._rpc = None self._closed = True _LOGGER.debug("Finished stopping manager.") diff --git a/tests/unit/test_writer_v1beta2.py b/tests/unit/test_writer_v1beta2.py index 7da7c66a..2f4ada71 100644 --- a/tests/unit/test_writer_v1beta2.py +++ b/tests/unit/test_writer_v1beta2.py @@ -20,6 +20,7 @@ from google.api_core import exceptions from google.cloud.bigquery_storage_v1beta2.services import big_query_write from google.cloud.bigquery_storage_v1beta2 import types as gapic_types +from google.cloud.bigquery_storage_v1beta2 import exceptions as bqstorage_exceptions from google.protobuf import descriptor_pb2 @@ -44,6 +45,14 @@ def test_constructor_and_default_state(module_under_test): assert manager._client is mock_client +def test_close_before_open(module_under_test): + mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient) + manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE) + manager.close() + with pytest.raises(bqstorage_exceptions.StreamClosedError): + manager.send(object()) + + @mock.patch("google.api_core.bidi.BidiRpc", autospec=True) @mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True) def test_open(background_consumer, bidi_rpc, module_under_test):