From e29a2c0ac6c5d2ebf2311646e552a02f184cfedc Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Sat, 15 May 2021 16:32:38 +0200 Subject: [PATCH] feat: make publish futures compatible with concurrent.futures.as_completed() (#397) * feat: make futures compatible with as_completed() The futures implementation is adjusted to work well with the built-in function with the same name in `concurrent.futures` package. * Fix two unit tests in pre-Python 3.8 If setting a result/exception on a concurrent.futures.Future object, an exception is raised only in Python3.8+, thus we conditionally disable two unit tests. This behavior change is fine, though, because users should never use the set_result() and set_exception() methods directly. * Cover missing code line with a test * Use double underscore for internal cancelled flag * Prefix manager reference with double underscore * Remove Future's completed parameter altogether This parameter is unlikely to be used by any 3rd party code, but even if it is, it's better to cause a loud error rather than silently changing its effect to a no-op. --- google/cloud/pubsub_v1/futures.py | 160 ++---------------- .../pubsub_v1/publisher/_batch/thread.py | 2 +- google/cloud/pubsub_v1/publisher/futures.py | 22 ++- google/cloud/pubsub_v1/subscriber/futures.py | 14 +- .../publisher/test_futures_publisher.py | 8 + .../subscriber/test_futures_subscriber.py | 8 +- .../subscriber/test_subscriber_client.py | 12 +- tests/unit/pubsub_v1/test_futures.py | 102 +++++++---- 8 files changed, 127 insertions(+), 201 deletions(-) diff --git a/google/cloud/pubsub_v1/futures.py b/google/cloud/pubsub_v1/futures.py index ba861e40c..4dc72fdaa 100644 --- a/google/cloud/pubsub_v1/futures.py +++ b/google/cloud/pubsub_v1/futures.py @@ -14,14 +14,12 @@ from __future__ import absolute_import -import threading -import uuid +import concurrent.futures import google.api_core.future -from google.cloud.pubsub_v1.publisher import exceptions -class Future(google.api_core.future.Future): +class Future(concurrent.futures.Future, google.api_core.future.Future): """Encapsulation of the asynchronous execution of an action. This object is returned from asychronous Pub/Sub calls, and is the @@ -29,158 +27,32 @@ class Future(google.api_core.future.Future): This object should not be created directly, but is returned by other methods in this library. - - Args: - completed (Optional[Any]): An event, with the same interface as - :class:`threading.Event`. This is provided so that callers - with different concurrency models (e.g. ``threading`` or - ``multiprocessing``) can supply an event that is compatible - with that model. The ``wait()`` and ``set()`` methods will be - used. If this argument is not provided, then a new - :class:`threading.Event` will be created and used. """ - # This could be a sentinel object or None, but the sentinel object's ID - # can change if the process is forked, and None has the possibility of - # actually being a result. - _SENTINEL = uuid.uuid4() - - def __init__(self, completed=None): - self._result = self._SENTINEL - self._exception = self._SENTINEL - self._callbacks = [] - if completed is None: - completed = threading.Event() - self._completed = completed - - def cancel(self): - """Actions in Pub/Sub generally may not be canceled. - - This method always returns False. - """ - return False - - def cancelled(self): - """Actions in Pub/Sub generally may not be canceled. - - This method always returns False. - """ - return False - def running(self): - """Actions in Pub/Sub generally may not be canceled. + """Return ``True`` if the associated Pub/Sub action has not yet completed. - Returns: - bool: ``True`` if this method has not yet completed, or - ``False`` if it has completed. + Returns: bool: """ return not self.done() - def done(self): - """Return True the future is done, False otherwise. - - This still returns True in failure cases; checking :meth:`result` or - :meth:`exception` is the canonical way to assess success or failure. - """ - return self._exception != self._SENTINEL or self._result != self._SENTINEL - - def result(self, timeout=None): - """Resolve the future and return a value where appropriate. - - Args: - timeout (Union[int, float]): The number of seconds before this call - times out and raises TimeoutError. - - Raises: - concurrent.futures.TimeoutError: If the request times out. - Exception: For undefined exceptions in the underlying - call execution. - """ - # Attempt to get the exception if there is one. - # If there is not one, then we know everything worked, and we can - # return an appropriate value. - err = self.exception(timeout=timeout) - if err is None: - return self._result - raise err - - def exception(self, timeout=None): - """Return the exception raised by the call, if any. - - Args: - timeout (Union[int, float]): The number of seconds before this call - times out and raises TimeoutError. - - Raises: - concurrent.futures.TimeoutError: If the request times out. - - Returns: - Exception: The exception raised by the call, if any. - """ - # Wait until the future is done. - if not self._completed.wait(timeout=timeout): - raise exceptions.TimeoutError("Timed out waiting for result.") - - # If the batch completed successfully, this should return None. - if self._result != self._SENTINEL: - return None - - # Okay, this batch had an error; this should return it. - return self._exception - - def add_done_callback(self, callback): - """Attach the provided callable to the future. - - The provided function is called, with this future as its only argument, - when the future finishes running. - - Args: - callback (Callable): The function to call. - - Returns: - None - """ - if self.done(): - return callback(self) - self._callbacks.append(callback) + def set_running_or_notify_cancel(self): + raise NotImplementedError( + "Only used by executors from `concurrent.futures` package." + ) def set_result(self, result): - """Set the result of the future to the provided result. + """Set the return value of work associated with the future. - Args: - result (Any): The result + Do not use this method, it should only be used internally by the library and its + unit tests. """ - # Sanity check: A future can only complete once. - if self.done(): - raise RuntimeError("set_result can only be called once.") - - # Set the result and trigger the future. - self._result = result - self._trigger() + return super().set_result(result=result) def set_exception(self, exception): - """Set the result of the future to the given exception. - - Args: - exception (:exc:`Exception`): The exception raised. - """ - # Sanity check: A future can only complete once. - if self.done(): - raise RuntimeError("set_exception can only be called once.") - - # Set the exception and trigger the future. - self._exception = exception - self._trigger() - - def _trigger(self): - """Trigger all callbacks registered to this Future. - - This method is called internally by the batch once the batch - completes. + """Set the result of the future as being the given exception. - Args: - message_id (str): The message ID, as a string. + Do not use this method, it should only be used internally by the library and its + unit tests. """ - self._completed.set() - for callback in self._callbacks: - callback(self) + return super().set_exception(exception=exception) diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index 3f9a17f74..b3936c215 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -380,7 +380,7 @@ def publish(self, message): # Track the future on this batch (so that the result of the # future can be set). - future = futures.Future(completed=threading.Event()) + future = futures.Future() self._futures.append(future) # Try to commit, but it must be **without** the lock held, since diff --git a/google/cloud/pubsub_v1/publisher/futures.py b/google/cloud/pubsub_v1/publisher/futures.py index fa8a79998..04748e854 100644 --- a/google/cloud/pubsub_v1/publisher/futures.py +++ b/google/cloud/pubsub_v1/publisher/futures.py @@ -25,6 +25,20 @@ class Future(futures.Future): ID, unless an error occurs. """ + def cancel(self): + """Actions in Pub/Sub generally may not be canceled. + + This method always returns ``False``. + """ + return False + + def cancelled(self): + """Actions in Pub/Sub generally may not be canceled. + + This method always returns ``False``. + """ + return False + def result(self, timeout=None): """Return the message ID or raise an exception. @@ -43,10 +57,4 @@ def result(self, timeout=None): Exception: For undefined exceptions in the underlying call execution. """ - # Attempt to get the exception if there is one. - # If there is not one, then we know everything worked, and we can - # return an appropriate value. - err = self.exception(timeout=timeout) - if err is None: - return self._result - raise err + return super().result(timeout=timeout) diff --git a/google/cloud/pubsub_v1/subscriber/futures.py b/google/cloud/pubsub_v1/subscriber/futures.py index f9fdd76ab..97a911076 100644 --- a/google/cloud/pubsub_v1/subscriber/futures.py +++ b/google/cloud/pubsub_v1/subscriber/futures.py @@ -28,9 +28,9 @@ class StreamingPullFuture(futures.Future): def __init__(self, manager): super(StreamingPullFuture, self).__init__() - self._manager = manager - self._manager.add_close_callback(self._on_close_callback) - self._cancelled = False + self.__manager = manager + self.__manager.add_close_callback(self._on_close_callback) + self.__cancelled = False def _on_close_callback(self, manager, result): if self.done(): @@ -47,12 +47,14 @@ def cancel(self): """Stops pulling messages and shutdowns the background thread consuming messages. """ - self._cancelled = True - return self._manager.close() + # NOTE: We circumvent the base future's self._state to track the cancellation + # state, as this state has different meaning with streaming pull futures. + self.__cancelled = True + return self.__manager.close() def cancelled(self): """ returns: bool: ``True`` if the subscription has been cancelled. """ - return self._cancelled + return self.__cancelled diff --git a/tests/unit/pubsub_v1/publisher/test_futures_publisher.py b/tests/unit/pubsub_v1/publisher/test_futures_publisher.py index eb32d0518..45bc48542 100644 --- a/tests/unit/pubsub_v1/publisher/test_futures_publisher.py +++ b/tests/unit/pubsub_v1/publisher/test_futures_publisher.py @@ -20,6 +20,14 @@ class TestFuture(object): + def test_cancel(self): + future = futures.Future() + assert future.cancel() is False + + def test_cancelled(self): + future = futures.Future() + assert future.cancelled() is False + def test_result_on_success(self): future = futures.Future() future.set_result("570307942214048") diff --git a/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py b/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py index 909337cc8..5411674c0 100644 --- a/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py +++ b/tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py @@ -31,13 +31,12 @@ def make_future(self): def test_default_state(self): future = self.make_future() + manager = future._StreamingPullFuture__manager assert future.running() assert not future.done() assert not future.cancelled() - future._manager.add_close_callback.assert_called_once_with( - future._on_close_callback - ) + manager.add_close_callback.assert_called_once_with(future._on_close_callback) def test__on_close_callback_success(self): future = self.make_future() @@ -71,8 +70,9 @@ def test__on_close_callback_future_already_done(self): def test_cancel(self): future = self.make_future() + manager = future._StreamingPullFuture__manager future.cancel() - future._manager.close.assert_called_once() + manager.close.assert_called_once() assert future.cancelled() diff --git a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py index dbeb7b343..7624c9212 100644 --- a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py +++ b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py @@ -137,7 +137,8 @@ def test_subscribe(manager_open, creds): future = client.subscribe("sub_name_a", callback=mock.sentinel.callback) assert isinstance(future, futures.StreamingPullFuture) - assert future._manager._subscription == "sub_name_a" + manager = future._StreamingPullFuture__manager + assert manager._subscription == "sub_name_a" manager_open.assert_called_once_with( mock.ANY, callback=mock.sentinel.callback, @@ -164,10 +165,11 @@ def test_subscribe_options(manager_open, creds): ) assert isinstance(future, futures.StreamingPullFuture) - assert future._manager._subscription == "sub_name_a" - assert future._manager.flow_control == flow_control - assert future._manager._scheduler == scheduler - assert future._manager._await_callbacks_on_shutdown is mock.sentinel.await_callbacks + manager = future._StreamingPullFuture__manager + assert manager._subscription == "sub_name_a" + assert manager.flow_control == flow_control + assert manager._scheduler == scheduler + assert manager._await_callbacks_on_shutdown is mock.sentinel.await_callbacks manager_open.assert_called_once_with( mock.ANY, callback=mock.sentinel.callback, diff --git a/tests/unit/pubsub_v1/test_futures.py b/tests/unit/pubsub_v1/test_futures.py index 11349d5d4..2b26289c4 100644 --- a/tests/unit/pubsub_v1/test_futures.py +++ b/tests/unit/pubsub_v1/test_futures.py @@ -12,7 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import concurrent.futures +import sys import threading +import time import mock import pytest @@ -25,36 +28,6 @@ def _future(*args, **kwargs): return futures.Future(*args, **kwargs) -def test_constructor_defaults(): - with mock.patch.object(threading, "Event", autospec=True) as Event: - future = _future() - - assert future._result == futures.Future._SENTINEL - assert future._exception == futures.Future._SENTINEL - assert future._callbacks == [] - assert future._completed is Event.return_value - - Event.assert_called_once_with() - - -def test_constructor_explicit_completed(): - completed = mock.sentinel.completed - future = _future(completed=completed) - - assert future._result == futures.Future._SENTINEL - assert future._exception == futures.Future._SENTINEL - assert future._callbacks == [] - assert future._completed is completed - - -def test_cancel(): - assert _future().cancel() is False - - -def test_cancelled(): - assert _future().cancelled() is False - - def test_running(): future = _future() assert future.running() is True @@ -112,8 +85,8 @@ def test_add_done_callback_pending_batch(): future = _future() callback = mock.Mock() future.add_done_callback(callback) - assert len(future._callbacks) == 1 - assert callback in future._callbacks + assert len(future._done_callbacks) == 1 + assert callback in future._done_callbacks assert callback.call_count == 0 @@ -134,15 +107,76 @@ def test_trigger(): callback.assert_called_once_with(future) +def test_set_running_or_notify_cancel_not_implemented_error(): + future = _future() + with pytest.raises(NotImplementedError) as exc_info: + future.set_running_or_notify_cancel() + + assert exc_info.value.args + error_msg = exc_info.value.args[0] + assert "used by executors" in error_msg + assert "concurrent.futures" in error_msg + + +@pytest.mark.skipif( + sys.version_info < (3, 8), + reason="InvalidStateError is only available in Python 3.8+", +) def test_set_result_once_only(): future = _future() future.set_result("12345") - with pytest.raises(RuntimeError): + with pytest.raises(concurrent.futures.InvalidStateError): future.set_result("67890") +@pytest.mark.skipif( + sys.version_info < (3, 8), + reason="InvalidStateError is only available in Python 3.8+", +) def test_set_exception_once_only(): future = _future() future.set_exception(ValueError("wah wah")) - with pytest.raises(RuntimeError): + with pytest.raises(concurrent.futures.InvalidStateError): future.set_exception(TypeError("other wah wah")) + + +def test_as_completed_compatibility(): + all_futures = {i: _future() for i in range(6)} + done_futures = [] + + def resolve_future(future_idx, delay=0): + time.sleep(delay) + future = all_futures[future_idx] + if future_idx % 2 == 0: + future.set_result(f"{future_idx}: I'm done!") + else: + future.set_exception(Exception(f"Future {future_idx} errored")) + + all_futures[2].set_result("2: I'm done!") + + # Start marking the futures as completed (either with success or error) at + # different times and check that ther "as completed" order is correct. + for future_idx, delay in ((0, 0.8), (3, 0.6), (1, 0.4), (5, 0.2)): + threading.Thread( + target=resolve_future, args=(future_idx, delay), daemon=True + ).start() + + try: + # Use a loop instead of a list comprehension to gather futures completed + # before the timeout error occurs. + for future in concurrent.futures.as_completed(all_futures.values(), timeout=1): + done_futures.append(future) + except concurrent.futures.TimeoutError: + pass + else: # pragma: NO COVER + pytest.fail("Not all Futures should have been recognized as completed.") + + # NOTE: Future 4 was never resolved. + expected = [ + all_futures[2], + all_futures[5], + all_futures[1], + all_futures[3], + all_futures[0], + ] + assert done_futures == expected