From 9fc3c08cdeebfbd69bc815f951a07b2d086b0a69 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Mon, 27 Sep 2021 12:10:21 -0500 Subject: [PATCH] feat: add `AppendRowsStream` to use write API from v1 endpoint (#309) This is just a duplicate of the class in the v1beta2 endpoint. I see for reads we tried to be clever by using the v1 version from the v1beta2 endpoint, but it would be harder to do with the write API. The `initial_request_template` parameter means that we need to make sure for certain that we are using the generated types for the correct endpoint. Since "beta" is clearly in the endpoint and import name, I think leaving the v1beta2 writer module as-is, with additional features and fixes only added to v1 makes some sense. Alternatively, we could add some tests to ensure these classes stay in sync? --- google/cloud/bigquery_storage_v1/__init__.py | 4 + google/cloud/bigquery_storage_v1/client.py | 5 + .../cloud/bigquery_storage_v1/exceptions.py | 17 + google/cloud/bigquery_storage_v1/writer.py | 412 ++++++++++++++++++ .../cloud/bigquery_storage_v1beta2/writer.py | 1 - samples/snippets/append_rows_proto2.py | 10 +- tests/system/test_writer.py | 6 +- tests/unit/test_writer_v1.py | 158 +++++++ 8 files changed, 604 insertions(+), 9 deletions(-) create mode 100644 google/cloud/bigquery_storage_v1/exceptions.py create mode 100644 google/cloud/bigquery_storage_v1/writer.py create mode 100644 tests/unit/test_writer_v1.py diff --git a/google/cloud/bigquery_storage_v1/__init__.py b/google/cloud/bigquery_storage_v1/__init__.py index 4cca4cfe..4ccfd350 100644 --- a/google/cloud/bigquery_storage_v1/__init__.py +++ b/google/cloud/bigquery_storage_v1/__init__.py @@ -30,6 +30,10 @@ class BigQueryReadClient(client.BigQueryReadClient): __doc__ = client.BigQueryReadClient.__doc__ +class BigQueryWriteClient(client.BigQueryWriteClient): + __doc__ = client.BigQueryWriteClient.__doc__ + + __all__ = ( # google.cloud.bigquery_storage_v1 "__version__", diff --git a/google/cloud/bigquery_storage_v1/client.py b/google/cloud/bigquery_storage_v1/client.py index d6dcea10..75ef3834 100644 --- a/google/cloud/bigquery_storage_v1/client.py +++ b/google/cloud/bigquery_storage_v1/client.py @@ -25,6 +25,7 @@ from google.cloud.bigquery_storage_v1 import reader from google.cloud.bigquery_storage_v1.services import big_query_read +from google.cloud.bigquery_storage_v1.services import big_query_write _SCOPES = ( @@ -135,3 +136,7 @@ def read_rows( offset, {"retry": retry, "timeout": timeout, "metadata": metadata}, ) + + +class BigQueryWriteClient(big_query_write.BigQueryWriteClient): + __doc__ = big_query_write.BigQueryWriteClient.__doc__ diff --git a/google/cloud/bigquery_storage_v1/exceptions.py b/google/cloud/bigquery_storage_v1/exceptions.py new file mode 100644 index 00000000..0887071d --- /dev/null +++ b/google/cloud/bigquery_storage_v1/exceptions.py @@ -0,0 +1,17 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class StreamClosedError(Exception): + """Operation not supported while stream is closed.""" diff --git a/google/cloud/bigquery_storage_v1/writer.py b/google/cloud/bigquery_storage_v1/writer.py new file mode 100644 index 00000000..1b0e9d47 --- /dev/null +++ b/google/cloud/bigquery_storage_v1/writer.py @@ -0,0 +1,412 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import division + +import itertools +import logging +import queue +import time +import threading +from typing import Callable, Optional, Sequence, Tuple + +from google.api_core import bidi +from google.api_core.future import polling as polling_future +from google.api_core import exceptions +import google.api_core.retry +import grpc + +from google.cloud.bigquery_storage_v1 import exceptions as bqstorage_exceptions +from google.cloud.bigquery_storage_v1 import types as gapic_types +from google.cloud.bigquery_storage_v1.services import big_query_write + +_LOGGER = logging.getLogger(__name__) +_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated" + +# _open() takes between 0.25 and 0.4 seconds to be ready. Wait each loop before +# checking again. This interval was chosen to result in about 3 loops. +_WRITE_OPEN_INTERVAL = 0.08 + +# Use a default timeout that is quite long to avoid potential infinite loops, +# but still work for all expected requests +_DEFAULT_TIMEOUT = 600 + + +def _wrap_as_exception(maybe_exception) -> Exception: + """Wrap an object as a Python exception, if needed. + Args: + maybe_exception (Any): The object to wrap, usually a gRPC exception class. + Returns: + The argument itself if an instance of ``BaseException``, otherwise + the argument represented as an instance of ``Exception`` (sub)class. + """ + if isinstance(maybe_exception, grpc.RpcError): + return exceptions.from_grpc_error(maybe_exception) + elif isinstance(maybe_exception, BaseException): + return maybe_exception + + return Exception(maybe_exception) + + +class AppendRowsStream(object): + """A manager object which can append rows to a stream.""" + + def __init__( + self, + client: big_query_write.BigQueryWriteClient, + initial_request_template: gapic_types.AppendRowsRequest, + metadata: Sequence[Tuple[str, str]] = (), + ): + """Construct a stream manager. + + Args: + client: + Client responsible for making requests. + initial_request_template: + Data to include in the first request sent to the stream. This + must contain + :attr:`google.cloud.bigquery_storage_v1.types.AppendRowsRequest.write_stream` + and + :attr:`google.cloud.bigquery_storage_v1.types.AppendRowsRequest.ProtoData.writer_schema`. + metadata: + Extra headers to include when sending the streaming request. + """ + self._client = client + self._closing = threading.Lock() + self._closed = False + self._close_callbacks = [] + self._futures_queue = queue.Queue() + self._inital_request_template = initial_request_template + self._metadata = metadata + + # Only one call to `send()` should attempt to open the RPC. + self._opening = threading.Lock() + + self._rpc = None + self._stream_name = None + + # The threads created in ``._open()``. + self._consumer = None + + @property + def is_active(self) -> bool: + """bool: True if this manager is actively streaming. + + Note that ``False`` does not indicate this is complete shut down, + just that it stopped getting new messages. + """ + return self._consumer is not None and self._consumer.is_active + + def add_close_callback(self, callback: Callable): + """Schedules a callable when the manager closes. + Args: + callback (Callable): The method to call. + """ + self._close_callbacks.append(callback) + + def _open( + self, + initial_request: gapic_types.AppendRowsRequest, + timeout: float = _DEFAULT_TIMEOUT, + ) -> "AppendRowsFuture": + """Open an append rows stream. + + This is automatically called by the first call to the + :attr:`google.cloud.bigquery_storage_v1.writer.AppendRowsStream.send` + method. + + Args: + initial_request: + The initial request to start the stream. Must have + :attr:`google.cloud.bigquery_storage_v1.types.AppendRowsRequest.write_stream` + and ``proto_rows.writer_schema.proto_descriptor`` and + properties populated. + timeout: + How long (in seconds) to wait for the stream to be ready. + + Returns: + A future, which can be used to process the response to the initial + request when it arrives. + """ + if self.is_active: + raise ValueError("This manager is already open.") + + if self._closed: + raise bqstorage_exceptions.StreamClosedError( + "This manager has been closed and can not be re-used." + ) + + start_time = time.monotonic() + request = gapic_types.AppendRowsRequest() + gapic_types.AppendRowsRequest.copy_from(request, self._inital_request_template) + request._pb.MergeFrom(initial_request._pb) + self._stream_name = request.write_stream + + inital_response_future = AppendRowsFuture(self) + self._futures_queue.put(inital_response_future) + + self._rpc = bidi.BidiRpc( + self._client.append_rows, + initial_request=request, + # TODO: pass in retry and timeout. Blocked by + # https://github.com/googleapis/python-api-core/issues/262 + metadata=tuple( + itertools.chain( + self._metadata, + # This header is required so that the BigQuery Storage API + # knows which region to route the request to. + (("x-goog-request-params", f"write_stream={self._stream_name}"),), + ) + ), + ) + self._rpc.add_done_callback(self._on_rpc_done) + + self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response) + self._consumer.start() + + # Make sure RPC has started before returning. + # Without this, consumers may get: + # + # ValueError: Can not send() on an RPC that has never been open()ed. + # + # when they try to send a request. + while not self._rpc.is_active and self._consumer.is_active: + # Avoid 100% CPU while waiting for RPC to be ready. + time.sleep(_WRITE_OPEN_INTERVAL) + + # TODO: Check retry.deadline instead of (per-request) timeout. + # Blocked by + # https://github.com/googleapis/python-api-core/issues/262 + if timeout is None: + continue + current_time = time.monotonic() + if current_time - start_time > timeout: + break + + # Something went wrong when opening the RPC. + if not self._consumer.is_active: + # TODO: Share the exception from _rpc.open(). Blocked by + # https://github.com/googleapis/python-api-core/issues/268 + request_exception = exceptions.Unknown( + "There was a problem opening the stream. " + "Try turning on DEBUG level logs to see the error." + ) + self.close(reason=request_exception) + raise request_exception + + return inital_response_future + + def send(self, request: gapic_types.AppendRowsRequest) -> "AppendRowsFuture": + """Send an append rows request to the open stream. + + Args: + request: + The request to add to the stream. + + Returns: + A future, which can be used to process the response when it + arrives. + """ + if self._closed: + raise bqstorage_exceptions.StreamClosedError( + "This manager has been closed and can not be used." + ) + + # If the manager hasn't been openned yet, automatically open it. Only + # one call to `send()` should attempt to open the RPC. After `_open()`, + # the stream is active, unless something went wrong with the first call + # to open, in which case this send will fail anyway due to a closed + # RPC. + with self._opening: + if not self.is_active: + return self._open(request) + + # For each request, we expect exactly one response (in order). Add a + # future to the queue so that when the response comes, the callback can + # pull it off and notify completion. + future = AppendRowsFuture(self) + self._futures_queue.put(future) + self._rpc.send(request) + return future + + def _on_response(self, response: gapic_types.AppendRowsResponse): + """Process a response from a consumer callback.""" + # If the stream has closed, but somehow we still got a response message + # back, discard it. The response futures queue has been drained, with + # an exception reported. + if self._closed: + raise bqstorage_exceptions.StreamClosedError( + f"Stream closed before receiving response: {response}" + ) + + # Since we have 1 response per request, if we get here from a response + # callback, the queue should never be empty. + future: AppendRowsFuture = self._futures_queue.get_nowait() + if response.error.code: + exc = exceptions.from_grpc_status( + response.error.code, response.error.message + ) + future.set_exception(exc) + else: + future.set_result(response) + + def close(self, reason: Optional[Exception] = None): + """Stop consuming messages and shutdown all helper threads. + + This method is idempotent. Additional calls will have no effect. + + Args: + reason: The reason to close this. If ``None``, this is considered + an "intentional" shutdown. This is passed to the callbacks + specified via :meth:`add_close_callback`. + """ + self._shutdown(reason=reason) + + def _shutdown(self, reason: Optional[Exception] = None): + """Run the actual shutdown sequence (stop the stream and all helper threads). + + Args: + reason: + The reason to close the stream. If ``None``, this is + considered an "intentional" shutdown. + """ + with self._closing: + if self._closed: + return + + # Stop consuming messages. + if self.is_active: + _LOGGER.debug("Stopping consumer.") + self._consumer.stop() + self._consumer = None + + if self._rpc is not None: + self._rpc.close() + self._rpc = None + self._closed = True + _LOGGER.debug("Finished stopping manager.") + + # We know that no new items will be added to the queue because + # we've marked the stream as closed. + while not self._futures_queue.empty(): + # Mark each future as failed. Since the consumer thread has + # stopped (or at least is attempting to stop), we won't get + # response callbacks to populate the remaining futures. + future = self._futures_queue.get_nowait() + if reason is None: + exc = bqstorage_exceptions.StreamClosedError( + "Stream closed before receiving a response." + ) + else: + exc = reason + future.set_exception(exc) + + for callback in self._close_callbacks: + callback(self, reason) + + def _on_rpc_done(self, future): + """Triggered whenever the underlying RPC terminates without recovery. + + This is typically triggered from one of two threads: the background + consumer thread (when calling ``recv()`` produces a non-recoverable + error) or the grpc management thread (when cancelling the RPC). + + This method is *non-blocking*. It will start another thread to deal + with shutting everything down. This is to prevent blocking in the + background consumer and preventing it from being ``joined()``. + """ + _LOGGER.info("RPC termination has signaled streaming pull manager shutdown.") + error = _wrap_as_exception(future) + thread = threading.Thread( + name=_RPC_ERROR_THREAD_NAME, target=self._shutdown, kwargs={"reason": error} + ) + thread.daemon = True + thread.start() + + +class AppendRowsFuture(polling_future.PollingFuture): + """Encapsulation of the asynchronous execution of an action. + + This object is returned from long-running BigQuery Storage API calls, and + is the interface to determine the status of those calls. + + This object should not be created directly, but is returned by other + methods in this library. + """ + + def __init__(self, manager: AppendRowsStream): + super().__init__() + self.__manager = manager + self.__cancelled = False + self._is_done = False + + def cancel(self): + """Stops pulling messages and shutdowns the background thread consuming + messages. + + The method does not block, it just triggers the shutdown and returns + immediately. To block until the background stream is terminated, call + :meth:`result()` after cancelling the future. + """ + # NOTE: We circumvent the base future's self._state to track the cancellation + # state, as this state has different meaning with streaming pull futures. + # See: https://github.com/googleapis/python-pubsub/pull/397 + self.__cancelled = True + return self.__manager.close() + + def cancelled(self): + """ + returns: + bool: ``True`` if the write stream has been cancelled. + """ + return self.__cancelled + + def done(self, retry: Optional[google.api_core.retry.Retry] = None) -> bool: + """Check the status of the future. + + Args: + retry: + Not used. Included for compatibility with base clase. Future + status is updated by a background thread. + + Returns: + ``True`` if the request has finished, otherwise ``False``. + """ + # Consumer should call set_result or set_exception method, where this + # gets set to True *after* first setting _result. + # + # Consumer runs in a background thread, but this access is thread-safe: + # https://docs.python.org/3/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe + return self._is_done + + def set_exception(self, exception): + """Set the result of the future as being the given exception. + + Do not use this method, it should only be used internally by the library and its + unit tests. + """ + return_value = super().set_exception(exception=exception) + self._is_done = True + return return_value + + def set_result(self, result): + """Set the return value of work associated with the future. + + Do not use this method, it should only be used internally by the library and its + unit tests. + """ + return_value = super().set_result(result=result) + self._is_done = True + return return_value diff --git a/google/cloud/bigquery_storage_v1beta2/writer.py b/google/cloud/bigquery_storage_v1beta2/writer.py index 9e82ab60..728107d2 100644 --- a/google/cloud/bigquery_storage_v1beta2/writer.py +++ b/google/cloud/bigquery_storage_v1beta2/writer.py @@ -33,7 +33,6 @@ from google.cloud.bigquery_storage_v1beta2.services import big_query_write _LOGGER = logging.getLogger(__name__) -_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown" _RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated" # _open() takes between 0.25 and 0.4 seconds to be ready. Wait each loop before diff --git a/samples/snippets/append_rows_proto2.py b/samples/snippets/append_rows_proto2.py index d0390993..a06c29d7 100644 --- a/samples/snippets/append_rows_proto2.py +++ b/samples/snippets/append_rows_proto2.py @@ -20,9 +20,9 @@ import datetime import decimal -from google.cloud import bigquery_storage_v1beta2 -from google.cloud.bigquery_storage_v1beta2 import types -from google.cloud.bigquery_storage_v1beta2 import writer +from google.cloud import bigquery_storage_v1 +from google.cloud.bigquery_storage_v1 import types +from google.cloud.bigquery_storage_v1 import writer from google.protobuf import descriptor_pb2 # If you make updates to the sample_data.proto protocol buffers definition, @@ -36,13 +36,13 @@ def append_rows_proto2(project_id: str, dataset_id: str, table_id: str): """Create a write stream, write some sample data, and commit the stream.""" - write_client = bigquery_storage_v1beta2.BigQueryWriteClient() + write_client = bigquery_storage_v1.BigQueryWriteClient() parent = write_client.table_path(project_id, dataset_id, table_id) write_stream = types.WriteStream() # When creating the stream, choose the type. Use the PENDING type to wait # until the stream is committed before it is visible. See: - # https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta2#google.cloud.bigquery.storage.v1beta2.WriteStream.Type + # https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type write_stream.type_ = types.WriteStream.Type.PENDING write_stream = write_client.create_write_stream( parent=parent, write_stream=write_stream diff --git a/tests/system/test_writer.py b/tests/system/test_writer.py index a8d119d0..91cb11e5 100644 --- a/tests/system/test_writer.py +++ b/tests/system/test_writer.py @@ -15,14 +15,14 @@ from google.api_core import exceptions import pytest -from google.cloud.bigquery_storage_v1beta2 import types as gapic_types +from google.cloud.bigquery_storage_v1 import types as gapic_types @pytest.fixture(scope="session") def bqstorage_write_client(credentials): - from google.cloud import bigquery_storage_v1beta2 + from google.cloud import bigquery_storage_v1 - return bigquery_storage_v1beta2.BigQueryWriteClient(credentials=credentials) + return bigquery_storage_v1.BigQueryWriteClient(credentials=credentials) def test_append_rows_with_invalid_stream_name_fails_fast(bqstorage_write_client): diff --git a/tests/unit/test_writer_v1.py b/tests/unit/test_writer_v1.py new file mode 100644 index 00000000..1ef51320 --- /dev/null +++ b/tests/unit/test_writer_v1.py @@ -0,0 +1,158 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest import mock + +import freezegun +import pytest + +from google.api_core import exceptions +from google.cloud.bigquery_storage_v1.services import big_query_write +from google.cloud.bigquery_storage_v1 import types as gapic_types +from google.cloud.bigquery_storage_v1 import exceptions as bqstorage_exceptions +from google.protobuf import descriptor_pb2 + + +REQUEST_TEMPLATE = gapic_types.AppendRowsRequest() + + +@pytest.fixture(scope="module") +def module_under_test(): + from google.cloud.bigquery_storage_v1 import writer + + return writer + + +def test_constructor_and_default_state(module_under_test): + mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient) + manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE) + + # Public state + assert manager.is_active is False + + # Private state + assert manager._client is mock_client + + +def test_close_before_open(module_under_test): + mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient) + manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE) + manager.close() + with pytest.raises(bqstorage_exceptions.StreamClosedError): + manager.send(object()) + + +@mock.patch("google.api_core.bidi.BidiRpc", autospec=True) +@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True) +def test_initial_send(background_consumer, bidi_rpc, module_under_test): + mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient) + request_template = gapic_types.AppendRowsRequest( + write_stream="stream-name-from-REQUEST_TEMPLATE", + offset=0, + proto_rows=gapic_types.AppendRowsRequest.ProtoData( + writer_schema=gapic_types.ProtoSchema( + proto_descriptor=descriptor_pb2.DescriptorProto() + ) + ), + ) + manager = module_under_test.AppendRowsStream(mock_client, request_template) + type(bidi_rpc.return_value).is_active = mock.PropertyMock( + return_value=(False, True) + ) + proto_rows = gapic_types.ProtoRows() + proto_rows.serialized_rows.append(b"hello, world") + initial_request = gapic_types.AppendRowsRequest( + write_stream="this-is-a-stream-resource-path", + offset=42, + proto_rows=gapic_types.AppendRowsRequest.ProtoData(rows=proto_rows), + ) + + future = manager.send(initial_request) + + assert isinstance(future, module_under_test.AppendRowsFuture) + background_consumer.assert_called_once_with(manager._rpc, manager._on_response) + background_consumer.return_value.start.assert_called_once() + assert manager._consumer == background_consumer.return_value + + # Make sure the request template and the first request are merged as + # expected. Needs to be especially careful that nested properties such as + # writer_schema and rows are merged as expected. + expected_request = gapic_types.AppendRowsRequest( + write_stream="this-is-a-stream-resource-path", + offset=42, + proto_rows=gapic_types.AppendRowsRequest.ProtoData( + writer_schema=gapic_types.ProtoSchema( + proto_descriptor=descriptor_pb2.DescriptorProto() + ), + rows=proto_rows, + ), + ) + bidi_rpc.assert_called_once_with( + start_rpc=mock_client.append_rows, + initial_request=expected_request, + # Extra header is required to route requests to the correct location. + metadata=( + ("x-goog-request-params", "write_stream=this-is-a-stream-resource-path"), + ), + ) + + bidi_rpc.return_value.add_done_callback.assert_called_once_with( + manager._on_rpc_done + ) + assert manager._rpc == bidi_rpc.return_value + + manager._consumer.is_active = True + assert manager.is_active is True + + +@mock.patch("google.api_core.bidi.BidiRpc", autospec=True) +@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True) +def test_initial_send_with_timeout(background_consumer, bidi_rpc, module_under_test): + mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient) + manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE) + type(bidi_rpc.return_value).is_active = mock.PropertyMock(return_value=False) + type(background_consumer.return_value).is_active = mock.PropertyMock( + return_value=False + ) + initial_request = gapic_types.AppendRowsRequest( + write_stream="this-is-a-stream-resource-path" + ) + + with pytest.raises(exceptions.Unknown), freezegun.freeze_time( + auto_tick_seconds=module_under_test._DEFAULT_TIMEOUT + 1 + ): + manager.send(initial_request) + + +def test_future_done_false(module_under_test): + mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient) + manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE) + future = module_under_test.AppendRowsFuture(manager) + assert not future.done() + + +def test_future_done_true_with_result(module_under_test): + mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient) + manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE) + future = module_under_test.AppendRowsFuture(manager) + future.set_result(object()) + assert future.done() + + +def test_future_done_true_with_exception(module_under_test): + mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient) + manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE) + future = module_under_test.AppendRowsFuture(manager) + future.set_exception(ValueError()) + assert future.done()