Skip to content

Commit

Permalink
fix: avoid failure if closing AppendRowsStream before opening (#304)
Browse files Browse the repository at this point in the history
* fix: avoid failure if closing AppendRowsStream before opening

* don't create a thread if we're going to block anyway
  • Loading branch information
tswast committed Sep 20, 2021
1 parent 69e3fb8 commit 9f145f8
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 11 deletions.
14 changes: 3 additions & 11 deletions google/cloud/bigquery_storage_v1beta2/writer.py
Expand Up @@ -259,21 +259,12 @@ def close(self, reason: Optional[Exception] = None):
This method is idempotent. Additional calls will have no effect.
The method does not block, it delegates the shutdown operations to a background
thread.
Args:
reason: The reason to close this. If ``None``, this is considered
an "intentional" shutdown. This is passed to the callbacks
specified via :meth:`add_close_callback`.
"""
self._regular_shutdown_thread = threading.Thread(
name=_REGULAR_SHUTDOWN_THREAD_NAME,
daemon=True,
target=self._shutdown,
kwargs={"reason": reason},
)
self._regular_shutdown_thread.start()
self._shutdown(reason=reason)

def _shutdown(self, reason: Optional[Exception] = None):
"""Run the actual shutdown sequence (stop the stream and all helper threads).
Expand All @@ -293,7 +284,8 @@ def _shutdown(self, reason: Optional[Exception] = None):
self._consumer.stop()
self._consumer = None

self._rpc.close()
if self._rpc is not None:
self._rpc.close()
self._rpc = None
self._closed = True
_LOGGER.debug("Finished stopping manager.")
Expand Down
9 changes: 9 additions & 0 deletions tests/unit/test_writer_v1beta2.py
Expand Up @@ -20,6 +20,7 @@
from google.api_core import exceptions
from google.cloud.bigquery_storage_v1beta2.services import big_query_write
from google.cloud.bigquery_storage_v1beta2 import types as gapic_types
from google.cloud.bigquery_storage_v1beta2 import exceptions as bqstorage_exceptions
from google.protobuf import descriptor_pb2


Expand All @@ -44,6 +45,14 @@ def test_constructor_and_default_state(module_under_test):
assert manager._client is mock_client


def test_close_before_open(module_under_test):
mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
manager.close()
with pytest.raises(bqstorage_exceptions.StreamClosedError):
manager.send(object())


@mock.patch("google.api_core.bidi.BidiRpc", autospec=True)
@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True)
def test_open(background_consumer, bidi_rpc, module_under_test):
Expand Down

0 comments on commit 9f145f8

Please sign in to comment.