Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: avoid opening write stream more than once, make open method priv…
…ate (#305)

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery-storage/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕
  • Loading branch information
tswast committed Sep 20, 2021
1 parent 9f145f8 commit 58ec844
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 11 deletions.
21 changes: 15 additions & 6 deletions google/cloud/bigquery_storage_v1beta2/writer.py
Expand Up @@ -36,7 +36,7 @@
_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown"
_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated"

# open() takes between 0.25 and 0.4 seconds to be ready. Wait each loop before
# _open() takes between 0.25 and 0.4 seconds to be ready. Wait each loop before
# checking again. This interval was chosen to result in about 3 loops.
_WRITE_OPEN_INTERVAL = 0.08

Expand Down Expand Up @@ -91,10 +91,14 @@ def __init__(
self._futures_queue = queue.Queue()
self._inital_request_template = initial_request_template
self._metadata = metadata

# Only one call to `send()` should attempt to open the RPC.
self._opening = threading.Lock()

self._rpc = None
self._stream_name = None

# The threads created in ``.open()``.
# The threads created in ``._open()``.
self._consumer = None

@property
Expand All @@ -113,7 +117,7 @@ def add_close_callback(self, callback: Callable):
"""
self._close_callbacks.append(callback)

def open(
def _open(
self,
initial_request: gapic_types.AppendRowsRequest,
timeout: float = _DEFAULT_TIMEOUT,
Expand Down Expand Up @@ -221,9 +225,14 @@ def send(self, request: gapic_types.AppendRowsRequest) -> "AppendRowsFuture":
"This manager has been closed and can not be used."
)

# If the manager hasn't been openned yet, automatically open it.
if not self.is_active:
return self.open(request)
# If the manager hasn't been openned yet, automatically open it. Only
# one call to `send()` should attempt to open the RPC. After `_open()`,
# the stream is active, unless something went wrong with the first call
# to open, in which case this send will fail anyway due to a closed
# RPC.
with self._opening:
if not self.is_active:
return self._open(request)

# For each request, we expect exactly one response (in order). Add a
# future to the queue so that when the response comes, the callback can
Expand Down
12 changes: 7 additions & 5 deletions tests/unit/test_writer_v1beta2.py
Expand Up @@ -55,7 +55,7 @@ def test_close_before_open(module_under_test):

@mock.patch("google.api_core.bidi.BidiRpc", autospec=True)
@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True)
def test_open(background_consumer, bidi_rpc, module_under_test):
def test_initial_send(background_consumer, bidi_rpc, module_under_test):
mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
request_template = gapic_types.AppendRowsRequest(
write_stream="stream-name-from-REQUEST_TEMPLATE",
Expand All @@ -78,7 +78,7 @@ def test_open(background_consumer, bidi_rpc, module_under_test):
proto_rows=gapic_types.AppendRowsRequest.ProtoData(rows=proto_rows),
)

future = manager.open(initial_request)
future = manager.send(initial_request)

assert isinstance(future, module_under_test.AppendRowsFuture)
background_consumer.assert_called_once_with(manager._rpc, manager._on_response)
Expand Down Expand Up @@ -118,7 +118,7 @@ def test_open(background_consumer, bidi_rpc, module_under_test):

@mock.patch("google.api_core.bidi.BidiRpc", autospec=True)
@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True)
def test_open_with_timeout(background_consumer, bidi_rpc, module_under_test):
def test_initial_send_with_timeout(background_consumer, bidi_rpc, module_under_test):
mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
type(bidi_rpc.return_value).is_active = mock.PropertyMock(return_value=False)
Expand All @@ -129,8 +129,10 @@ def test_open_with_timeout(background_consumer, bidi_rpc, module_under_test):
write_stream="this-is-a-stream-resource-path"
)

with pytest.raises(exceptions.Unknown), freezegun.freeze_time(auto_tick_seconds=1):
manager.open(initial_request, timeout=0.5)
with pytest.raises(exceptions.Unknown), freezegun.freeze_time(
auto_tick_seconds=module_under_test._DEFAULT_TIMEOUT + 1
):
manager.send(initial_request)


def test_future_done_false(module_under_test):
Expand Down

0 comments on commit 58ec844

Please sign in to comment.