Skip to content

Commit

Permalink
feat: make futures compatible with as_completed()
Browse files Browse the repository at this point in the history
The futures implementation is adjusted to work well with the built-in
function with the same name in `concurrent.futures` package.
  • Loading branch information
plamut committed Apr 26, 2021
1 parent b7817f2 commit f468d97
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 180 deletions.
165 changes: 19 additions & 146 deletions google/cloud/pubsub_v1/futures.py
Expand Up @@ -14,14 +14,15 @@

from __future__ import absolute_import

import threading
import uuid
import concurrent.futures
import warnings

import google.api_core.future

from google.cloud.pubsub_v1.publisher import exceptions


class Future(google.api_core.future.Future):
class Future(concurrent.futures.Future, google.api_core.future.Future):
"""Encapsulation of the asynchronous execution of an action.
This object is returned from asychronous Pub/Sub calls, and is the
Expand All @@ -31,156 +32,28 @@ class Future(google.api_core.future.Future):
methods in this library.
Args:
completed (Optional[Any]): An event, with the same interface as
:class:`threading.Event`. This is provided so that callers
with different concurrency models (e.g. ``threading`` or
``multiprocessing``) can supply an event that is compatible
with that model. The ``wait()`` and ``set()`` methods will be
used. If this argument is not provided, then a new
:class:`threading.Event` will be created and used.
completed (Optional[Any]):
[deprecated] Not used anymore and will be removed in the future.
"""

# This could be a sentinel object or None, but the sentinel object's ID
# can change if the process is forked, and None has the possibility of
# actually being a result.
_SENTINEL = uuid.uuid4()

def __init__(self, completed=None):
self._result = self._SENTINEL
self._exception = self._SENTINEL
self._callbacks = []
if completed is None:
completed = threading.Event()
self._completed = completed

def cancel(self):
"""Actions in Pub/Sub generally may not be canceled.
This method always returns False.
"""
return False
if completed is not None:
msg = (
"Parameter `completed` is not used anymore and will be "
"removed in the future."
)
warnings.warn(msg, category=DeprecationWarning)

def cancelled(self):
"""Actions in Pub/Sub generally may not be canceled.
This method always returns False.
"""
return False
super().__init__()

def running(self):
"""Actions in Pub/Sub generally may not be canceled.
"""Return ``True`` if the associated Pub/Sub action has not yet completed.
Returns:
bool: ``True`` if this method has not yet completed, or
``False`` if it has completed.
Returns: bool:
"""
return not self.done()

def done(self):
"""Return True the future is done, False otherwise.
This still returns True in failure cases; checking :meth:`result` or
:meth:`exception` is the canonical way to assess success or failure.
"""
return self._exception != self._SENTINEL or self._result != self._SENTINEL

def result(self, timeout=None):
"""Resolve the future and return a value where appropriate.
Args:
timeout (Union[int, float]): The number of seconds before this call
times out and raises TimeoutError.
Raises:
concurrent.futures.TimeoutError: If the request times out.
Exception: For undefined exceptions in the underlying
call execution.
"""
# Attempt to get the exception if there is one.
# If there is not one, then we know everything worked, and we can
# return an appropriate value.
err = self.exception(timeout=timeout)
if err is None:
return self._result
raise err

def exception(self, timeout=None):
"""Return the exception raised by the call, if any.
Args:
timeout (Union[int, float]): The number of seconds before this call
times out and raises TimeoutError.
Raises:
concurrent.futures.TimeoutError: If the request times out.
Returns:
Exception: The exception raised by the call, if any.
"""
# Wait until the future is done.
if not self._completed.wait(timeout=timeout):
raise exceptions.TimeoutError("Timed out waiting for result.")

# If the batch completed successfully, this should return None.
if self._result != self._SENTINEL:
return None

# Okay, this batch had an error; this should return it.
return self._exception

def add_done_callback(self, callback):
"""Attach the provided callable to the future.
The provided function is called, with this future as its only argument,
when the future finishes running.
Args:
callback (Callable): The function to call.
Returns:
None
"""
if self.done():
return callback(self)
self._callbacks.append(callback)

def set_result(self, result):
"""Set the result of the future to the provided result.
Args:
result (Any): The result
"""
# Sanity check: A future can only complete once.
if self.done():
raise RuntimeError("set_result can only be called once.")

# Set the result and trigger the future.
self._result = result
self._trigger()

def set_exception(self, exception):
"""Set the result of the future to the given exception.
Args:
exception (:exc:`Exception`): The exception raised.
"""
# Sanity check: A future can only complete once.
if self.done():
raise RuntimeError("set_exception can only be called once.")

# Set the exception and trigger the future.
self._exception = exception
self._trigger()

def _trigger(self):
"""Trigger all callbacks registered to this Future.
This method is called internally by the batch once the batch
completes.
Args:
message_id (str): The message ID, as a string.
"""
self._completed.set()
for callback in self._callbacks:
callback(self)
def set_running_or_notify_cancel(self):
raise NotImplementedError(
"Only used by executors from `concurrent.futures` package."
)
22 changes: 15 additions & 7 deletions google/cloud/pubsub_v1/publisher/futures.py
Expand Up @@ -25,6 +25,20 @@ class Future(futures.Future):
ID, unless an error occurs.
"""

def cancel(self):
"""Actions in Pub/Sub generally may not be canceled.
This method always returns ``False``.
"""
return False

def cancelled(self):
"""Actions in Pub/Sub generally may not be canceled.
This method always returns ``False``.
"""
return False

def result(self, timeout=None):
"""Return the message ID or raise an exception.
Expand All @@ -43,10 +57,4 @@ def result(self, timeout=None):
Exception: For undefined exceptions in the underlying
call execution.
"""
# Attempt to get the exception if there is one.
# If there is not one, then we know everything worked, and we can
# return an appropriate value.
err = self.exception(timeout=timeout)
if err is None:
return self._result
raise err
return super().result(timeout=timeout)
2 changes: 2 additions & 0 deletions google/cloud/pubsub_v1/subscriber/futures.py
Expand Up @@ -47,6 +47,8 @@ def cancel(self):
"""Stops pulling messages and shutdowns the background thread consuming
messages.
"""
# NOTE: We circumvent the base future's self._state to track the cancellation
# state, as this state has different meaning with streaming pull futures.
self._cancelled = True
return self._manager.close()

Expand Down
8 changes: 8 additions & 0 deletions tests/unit/pubsub_v1/publisher/test_futures_publisher.py
Expand Up @@ -20,6 +20,14 @@


class TestFuture(object):
def test_cancel(self):
future = futures.Future()
assert future.cancel() is False

def test_cancelled(self):
future = futures.Future()
assert future.cancelled() is False

def test_result_on_success(self):
future = futures.Future()
future.set_result("570307942214048")
Expand Down
88 changes: 61 additions & 27 deletions tests/unit/pubsub_v1/test_futures.py
Expand Up @@ -12,10 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import concurrent.futures
import threading
import time

import mock
import pytest
import warnings

from google.cloud.pubsub_v1 import exceptions
from google.cloud.pubsub_v1 import futures
Expand All @@ -25,34 +28,23 @@ def _future(*args, **kwargs):
return futures.Future(*args, **kwargs)


def test_constructor_defaults():
with mock.patch.object(threading, "Event", autospec=True) as Event:
future = _future()
def test_constructor_default_no_warning():
with warnings.catch_warnings(record=True) as warned:
_future()
assert not warned

assert future._result == futures.Future._SENTINEL
assert future._exception == futures.Future._SENTINEL
assert future._callbacks == []
assert future._completed is Event.return_value

Event.assert_called_once_with()


def test_constructor_explicit_completed():
def test_constructor_custom_completed_arg():
completed = mock.sentinel.completed
future = _future(completed=completed)

assert future._result == futures.Future._SENTINEL
assert future._exception == futures.Future._SENTINEL
assert future._callbacks == []
assert future._completed is completed


def test_cancel():
assert _future().cancel() is False

with warnings.catch_warnings(record=True) as warned:
_future(completed=completed)

def test_cancelled():
assert _future().cancelled() is False
assert len(warned) == 1
assert issubclass(warned[0].category, DeprecationWarning)
warning_msg = str(warned[0].message)
assert "completed" in warning_msg
assert "not used" in warning_msg


def test_running():
Expand Down Expand Up @@ -112,8 +104,8 @@ def test_add_done_callback_pending_batch():
future = _future()
callback = mock.Mock()
future.add_done_callback(callback)
assert len(future._callbacks) == 1
assert callback in future._callbacks
assert len(future._done_callbacks) == 1
assert callback in future._done_callbacks
assert callback.call_count == 0


Expand All @@ -137,12 +129,54 @@ def test_trigger():
def test_set_result_once_only():
future = _future()
future.set_result("12345")
with pytest.raises(RuntimeError):
with pytest.raises(concurrent.futures.InvalidStateError):
future.set_result("67890")


def test_set_exception_once_only():
future = _future()
future.set_exception(ValueError("wah wah"))
with pytest.raises(RuntimeError):
with pytest.raises(concurrent.futures.InvalidStateError):
future.set_exception(TypeError("other wah wah"))


def test_as_completed_compatibility():
all_futures = {i: _future() for i in range(6)}
done_futures = []

def resolve_future(future_idx, delay=0):
time.sleep(delay)
future = all_futures[future_idx]
if future_idx % 2 == 0:
future.set_result(f"{future_idx}: I'm done!")
else:
future.set_exception(Exception(f"Future {future_idx} errored"))

all_futures[2].set_result("2: I'm done!")

# Start marking the futures as completed (either with success or error) at
# different times and check that ther "as completed" order is correct.
for future_idx, delay in ((0, 0.8), (3, 0.6), (1, 0.4), (5, 0.2)):
threading.Thread(
target=resolve_future, args=(future_idx, delay), daemon=True
).start()

try:
# Use a loop instead of a list comprehension to gather futures completed
# before the timeout error occurs.
for future in concurrent.futures.as_completed(all_futures.values(), timeout=1):
done_futures.append(future)
except concurrent.futures.TimeoutError:
pass
else: # pragma: NO COVER
pytest.fail("Not all Futures should have been recognized as completed.")

# NOTE: Future 4 was never resolved.
expected = [
all_futures[2],
all_futures[5],
all_futures[1],
all_futures[3],
all_futures[0],
]
assert done_futures == expected

0 comments on commit f468d97

Please sign in to comment.