From aa9aca83f7a02fc92a87ec49c4d050e6e3137d15 Mon Sep 17 00:00:00 2001
From: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com>
Date: Mon, 14 Sep 2020 09:41:17 -0400
Subject: [PATCH] feat: Implement committer (#13)

* feat: Implement committer

Also a small fix to the retrying connection so it doesn't leak reads/writes
from previous connections.

* fix: Patch retrying connection and add comments.

* Update committer.py

* Update committer_impl.py
---
 .../pubsublite/internal/wire/committer.py     |  13 +
 .../internal/wire/committer_impl.py           | 138 +++++++
 .../internal/wire/connection_reinitializer.py |   3 +-
 .../pubsublite/internal/wire/publisher.py     |  13 +-
 .../internal/wire/retrying_connection.py      |   7 +
 setup.py                                      |  60 ++--
 .../internal/wire/committer_impl_test.py      | 266 ++++++++++++++++++
 7 files changed, 462 insertions(+), 38 deletions(-)
 create mode 100644 google/cloud/pubsublite/internal/wire/committer.py
 create mode 100644 google/cloud/pubsublite/internal/wire/committer_impl.py
 create mode 100644 tests/unit/pubsublite/internal/wire/committer_impl_test.py

diff --git a/google/cloud/pubsublite/internal/wire/committer.py b/google/cloud/pubsublite/internal/wire/committer.py
new file mode 100644
index 00000000..25fff3f9
--- /dev/null
+++ b/google/cloud/pubsublite/internal/wire/committer.py
@@ -0,0 +1,13 @@
+from abc import abstractmethod
+from typing import AsyncContextManager
+
+from google.cloud.pubsublite_v1 import Cursor
+
+
+class Committer(AsyncContextManager):
+  """
+  A Committer is able to commit subscribers' completed offsets.
+  """
+  @abstractmethod
+  async def commit(self, cursor: Cursor) -> None:
+    pass
diff --git a/google/cloud/pubsublite/internal/wire/committer_impl.py b/google/cloud/pubsublite/internal/wire/committer_impl.py
new file mode 100644
index 00000000..36a84a1c
--- /dev/null
+++ b/google/cloud/pubsublite/internal/wire/committer_impl.py
@@ -0,0 +1,138 @@
+import asyncio
+from typing import Optional, List, Iterable
+
+from absl import logging
+
+from google.cloud.pubsublite.internal.wire.committer import Committer
+from google.cloud.pubsublite.internal.wire.retrying_connection import RetryingConnection, ConnectionFactory
+from google.api_core.exceptions import FailedPrecondition, GoogleAPICallError
+from google.cloud.pubsublite.internal.wire.connection_reinitializer import ConnectionReinitializer
+from google.cloud.pubsublite.internal.wire.connection import Connection
+from google.cloud.pubsublite.internal.wire.serial_batcher import SerialBatcher, BatchTester
+from google.cloud.pubsublite_v1 import Cursor
+from google.cloud.pubsublite_v1.types import StreamingCommitCursorRequest, StreamingCommitCursorResponse, InitialCommitCursorRequest
+from google.cloud.pubsublite.internal.wire.work_item import WorkItem
+
+
+class CommitterImpl(Committer, ConnectionReinitializer[StreamingCommitCursorRequest, StreamingCommitCursorResponse], BatchTester[Cursor]):
+  _initial: InitialCommitCursorRequest
+  _flush_seconds: float
+  _connection: RetryingConnection[StreamingCommitCursorRequest, StreamingCommitCursorResponse]
+
+  _batcher: SerialBatcher[Cursor, None]
+
+  _outstanding_commits: List[List[WorkItem[Cursor, None]]]
+
+  _receiver: Optional[asyncio.Future]
+  _flusher: Optional[asyncio.Future]
+
+  def __init__(self, initial: InitialCommitCursorRequest, flush_seconds: float,
+               factory: ConnectionFactory[StreamingCommitCursorRequest, StreamingCommitCursorResponse]):
+    self._initial = initial
+    self._flush_seconds = flush_seconds
+    self._connection = RetryingConnection(factory, self)
+    self._batcher = SerialBatcher(self)
+    self._outstanding_commits = []
+    self._receiver = None
+    self._flusher = None
+
+  async def __aenter__(self):
+    await self._connection.__aenter__()
+
+  def _start_loopers(self):
+    assert self._receiver is None
+    assert self._flusher is None
+    self._receiver = asyncio.ensure_future(self._receive_loop())
+    self._flusher = asyncio.ensure_future(self._flush_loop())
+
+  async def _stop_loopers(self):
+    if self._receiver:
+      self._receiver.cancel()
+      await self._receiver
+      self._receiver = None
+    if self._flusher:
+      self._flusher.cancel()
+      await self._flusher
+      self._flusher = None
+
+  def _handle_response(self, response: StreamingCommitCursorResponse):
+    if "commit" not in response:
+      self._connection.fail(FailedPrecondition("Received an invalid subsequent response on the commit stream."))
+    if response.commit.acknowledged_commits > len(self._outstanding_commits):
+      self._connection.fail(
+        FailedPrecondition("Received an acknowledgement for more commits than are outstanding on the stream."))
+    for _ in range(response.commit.acknowledged_commits):
+      batch = self._outstanding_commits.pop(0)
+      for item in batch:
+        item.response_future.set_result(None)
+
+  async def _receive_loop(self):
+    try:
+      while True:
+        response = await self._connection.read()
+        self._handle_response(response)
+    except asyncio.CancelledError:
+      return
+
+  async def _flush_loop(self):
+    try:
+      while True:
+        await asyncio.sleep(self._flush_seconds)
+        await self._flush()
+    except asyncio.CancelledError:
+      return
+
+  async def __aexit__(self, exc_type, exc_val, exc_tb):
+    if self._connection.error():
+      self._fail_if_retrying_failed()
+    else:
+      await self._flush()
+    await self._connection.__aexit__(exc_type, exc_val, exc_tb)
+
+  def _fail_if_retrying_failed(self):
+    if self._connection.error():
+      for batch in self._outstanding_commits:
+        for item in batch:
+          item.response_future.set_exception(self._connection.error())
+
+  async def _flush(self):
+    batch = self._batcher.flush()
+    if not batch:
+      return
+    self._outstanding_commits.append(batch)
+    req = StreamingCommitCursorRequest()
+    req.commit.cursor = batch[-1].request
+    try:
+      await self._connection.write(req)
+    except GoogleAPICallError as e:
+      logging.debug(f"Failed commit on stream: {e}")
+      self._fail_if_retrying_failed()
+
+  async def commit(self, cursor: Cursor) -> None:
+    future = self._batcher.add(cursor)
+    if self._batcher.should_flush():
+      # should_flush() currently always returns False; kept in case this changes in the future.
+      await self._flush()
+    await future
+
+  async def reinitialize(self, connection: Connection[StreamingCommitCursorRequest, StreamingCommitCursorResponse]):
+    await self._stop_loopers()
+    await connection.write(StreamingCommitCursorRequest(initial=self._initial))
+    response = await connection.read()
+    if "initial" not in response:
+      self._connection.fail(FailedPrecondition("Received an invalid initial response on the commit stream."))
+    if self._outstanding_commits:
+      # Roll up all outstanding commits into a single request for the latest cursor.
+      rollup: List[WorkItem[Cursor, None]] = []
+      for batch in self._outstanding_commits:
+        for item in batch:
+          rollup.append(item)
+      self._outstanding_commits = [rollup]
+      req = StreamingCommitCursorRequest()
+      req.commit.cursor = rollup[-1].request
+      await connection.write(req)
+    self._start_loopers()
+
+  def test(self, requests: Iterable[Cursor]) -> bool:
+    # There is no bound on the number of outstanding cursors.
+    return False
diff --git a/google/cloud/pubsublite/internal/wire/connection_reinitializer.py b/google/cloud/pubsublite/internal/wire/connection_reinitializer.py
index 9b415cdc..e29c06fb 100644
--- a/google/cloud/pubsublite/internal/wire/connection_reinitializer.py
+++ b/google/cloud/pubsublite/internal/wire/connection_reinitializer.py
@@ -7,7 +7,8 @@ class ConnectionReinitializer(Generic[Request, Response], metaclass=ABCMeta):
   """A class capable of reinitializing a connection after a new one has been created."""
   @abstractmethod
   def reinitialize(self, connection: Connection[Request, Response]):
-    """Reinitialize a connection.
+    """Reinitialize a connection. Must ensure no calls to the associated RetryingConnection
+    occur until this completes.
 
     Args:
       connection: The connection to reinitialize
diff --git a/google/cloud/pubsublite/internal/wire/publisher.py b/google/cloud/pubsublite/internal/wire/publisher.py
index 3b987f85..70f5262b 100644
--- a/google/cloud/pubsublite/internal/wire/publisher.py
+++ b/google/cloud/pubsublite/internal/wire/publisher.py
@@ -1,20 +1,13 @@
-from abc import ABC, abstractmethod
+from abc import abstractmethod
+from typing import AsyncContextManager
 
 from google.cloud.pubsublite_v1.types import PubSubMessage
 from google.cloud.pubsublite.publish_metadata import PublishMetadata
 
 
-class Publisher(ABC):
+class Publisher(AsyncContextManager):
   """
   A Pub/Sub Lite asynchronous wire protocol publisher.
   """
-  @abstractmethod
-  async def __aenter__(self):
-    raise NotImplementedError()
-
-  @abstractmethod
-  async def __aexit__(self, exc_type, exc_val, exc_tb):
-    raise NotImplementedError()
-
   @abstractmethod
   async def publish(self, message: PubSubMessage) -> PublishMetadata:
     """
diff --git a/google/cloud/pubsublite/internal/wire/retrying_connection.py b/google/cloud/pubsublite/internal/wire/retrying_connection.py
index a787deda..e97ebd9b 100644
--- a/google/cloud/pubsublite/internal/wire/retrying_connection.py
+++ b/google/cloud/pubsublite/internal/wire/retrying_connection.py
@@ -48,15 +48,22 @@
   async def _run_loop(self):
     """
     Processes actions on this connection and handles retries until cancelled.
     """
+    last_failure: GoogleAPICallError
     try:
       bad_retries = 0
       while True:
         try:
           async with self._connection_factory.new() as connection:
+            # Needs to happen prior to reinitialization to clear outstanding waiters.
+            while not self._write_queue.empty():
+              self._write_queue.get_nowait().response_future.set_exception(last_failure)
+            self._read_queue = asyncio.Queue(maxsize=1)
+            self._write_queue = asyncio.Queue(maxsize=1)
             await self._reinitializer.reinitialize(connection)
             bad_retries = 0
             await self._loop_connection(connection)
         except GoogleAPICallError as e:
+          last_failure = e
           if not is_retryable(e):
             self.fail(e)
             return
diff --git a/setup.py b/setup.py
index 0adede5a..51667806 100644
--- a/setup.py
+++ b/setup.py
@@ -25,40 +25,46 @@
 readme_filename = os.path.join(package_root, "README.rst")
 with io.open(readme_filename, encoding="utf-8") as readme_file:
-    readme = readme_file.read()
+  readme = readme_file.read()
 
 dependencies = [
   "google-api-core >= 1.22.0",
   "absl-py >= 0.9.0",
   "proto-plus >= 0.4.0",
+  "grpcio",
   "setuptools"
 ]
 
+test_dependencies = [
+  "asynctest",
+  "pytest",
+  "pytest-asyncio"
+]
+
 setuptools.setup(
-    name="google-cloud-pubsublite",
-    version=version,
-    long_description=readme,
-    author="Google LLC",
-    author_email="googleapis-packages@google.com",
-    license="Apache 2.0",
-    url="https://github.com/googleapis/python-pubsublite",
-    packages=setuptools.PEP420PackageFinder.find(),
-    namespace_packages=("google", "google.cloud"),
-    platforms="Posix; MacOS X; Windows",
-    include_package_data=True,
-    install_requires=dependencies,
-    setup_requires=('pytest-runner',),
-    tests_require=['asynctest', 'pytest', 'pytest-asyncio'],
-    python_requires=">=3.6",
-    classifiers=[
-        "Development Status :: 4 - Beta",
-        "Intended Audience :: Developers",
-        "Operating System :: OS Independent",
-        "Programming Language :: Python :: 3.6",
-        "Programming Language :: Python :: 3.7",
-        "Programming Language :: Python :: 3.8",
-        "Topic :: Internet",
-        "Topic :: Software Development :: Libraries :: Python Modules",
-    ],
-    zip_safe=False,
+  name="google-cloud-pubsublite",
+  version=version,
+  long_description=readme,
+  author="Google LLC",
+  author_email="googleapis-packages@google.com",
+  license="Apache 2.0",
+  url="https://github.com/googleapis/python-pubsublite",
+  packages=setuptools.PEP420PackageFinder.find(),
+  namespace_packages=("google", "google.cloud"),
+  platforms="Posix; MacOS X; Windows",
+  include_package_data=True,
+  install_requires=dependencies,
+  extras_require={"tests": test_dependencies},
+  python_requires=">=3.6",
+  classifiers=[
+    "Development Status :: 4 - Beta",
+    "Intended Audience :: Developers",
+    "Operating System :: OS Independent",
+    "Programming Language :: Python :: 3.6",
+    "Programming Language :: Python :: 3.7",
+    "Programming Language :: Python :: 3.8",
+    "Topic :: Internet",
+    "Topic :: Software Development :: Libraries :: Python Modules",
+  ],
+  zip_safe=False,
 )
diff --git a/tests/unit/pubsublite/internal/wire/committer_impl_test.py b/tests/unit/pubsublite/internal/wire/committer_impl_test.py
new file mode 100644
index 00000000..a9bebc76
--- /dev/null
+++ b/tests/unit/pubsublite/internal/wire/committer_impl_test.py
@@ -0,0 +1,266 @@
+import asyncio
+from unittest.mock import call
+from collections import defaultdict
+from typing import Dict, List
+
+from asynctest.mock import MagicMock, CoroutineMock
+import pytest
+
+from google.cloud.pubsublite.internal.wire.committer import Committer
+from google.cloud.pubsublite.internal.wire.committer_impl import CommitterImpl
+from google.cloud.pubsublite.internal.wire.connection import Connection, ConnectionFactory
+from google.api_core.exceptions import InternalServerError
+from google.cloud.pubsublite_v1.types.cursor import StreamingCommitCursorRequest, StreamingCommitCursorResponse, \
+  InitialCommitCursorRequest
+from google.cloud.pubsublite_v1.types.common import Cursor
+from google.cloud.pubsublite.testing.test_utils import make_queue_waiter
+from google.cloud.pubsublite.internal.wire.retrying_connection import _MIN_BACKOFF_SECS
+
+FLUSH_SECONDS = 100000
+
+# All test coroutines will be treated as marked.
+pytestmark = pytest.mark.asyncio
+
+
+@pytest.fixture()
+def default_connection():
+  conn = MagicMock(spec=Connection[StreamingCommitCursorRequest, StreamingCommitCursorResponse])
+  conn.__aenter__.return_value = conn
+  return conn
+
+
+@pytest.fixture()
+def connection_factory(default_connection):
+  factory = MagicMock(spec=ConnectionFactory[StreamingCommitCursorRequest, StreamingCommitCursorResponse])
+  factory.new.return_value = default_connection
+  return factory
+
+
+@pytest.fixture()
+def initial_request():
+  return StreamingCommitCursorRequest(initial=InitialCommitCursorRequest(subscription="mysub"))
+
+
+class QueuePair:
+  called: asyncio.Queue
+  results: asyncio.Queue
+
+  def __init__(self):
+    self.called = asyncio.Queue()
+    self.results = asyncio.Queue()
+
+
+@pytest.fixture
+def sleep_queues() -> Dict[float, QueuePair]:
+  return defaultdict(QueuePair)
+
+
+@pytest.fixture
+def asyncio_sleep(monkeypatch, sleep_queues):
+  """asyncio.sleep mocked to block on the per-delay called/results queues."""
+  mock = CoroutineMock()
+  monkeypatch.setattr(asyncio, "sleep", mock)
+
+  async def sleeper(delay: float):
+    await make_queue_waiter(sleep_queues[delay].called, sleep_queues[delay].results)(delay)
+
+  mock.side_effect = sleeper
+  return mock
+
+
+@pytest.fixture()
+def committer(connection_factory, initial_request):
+  return CommitterImpl(initial_request.initial, FLUSH_SECONDS, connection_factory)
+
+
+def as_request(cursor: Cursor):
+  req = StreamingCommitCursorRequest()
+  req.commit.cursor = cursor
+  return req
+
+
+def as_response(count: int):
+  res = StreamingCommitCursorResponse()
+  res.commit.acknowledged_commits = count
+  return res
+
+
+async def test_basic_commit_after_timeout(
+    committer: Committer, default_connection, initial_request, asyncio_sleep, sleep_queues):
+  sleep_called = sleep_queues[FLUSH_SECONDS].called
+  sleep_results = sleep_queues[FLUSH_SECONDS].results
+  cursor1 = Cursor(offset=321)
+  cursor2 = Cursor(offset=1)
+  write_called_queue = asyncio.Queue()
+  write_result_queue = asyncio.Queue()
+  default_connection.write.side_effect = make_queue_waiter(write_called_queue, write_result_queue)
+  read_called_queue = asyncio.Queue()
+  read_result_queue = asyncio.Queue()
+  default_connection.read.side_effect = make_queue_waiter(read_called_queue, read_result_queue)
+  read_result_queue.put_nowait(StreamingCommitCursorResponse(initial={}))
+  write_result_queue.put_nowait(None)
+  async with committer:
+    # Set up connection
+    await write_called_queue.get()
+    await read_called_queue.get()
+    default_connection.write.assert_has_calls([call(initial_request)])
+
+    # Commit cursors
+    commit_fut1 = asyncio.ensure_future(committer.commit(cursor1))
+    commit_fut2 = asyncio.ensure_future(committer.commit(cursor2))
+    assert not commit_fut1.done()
+    assert not commit_fut2.done()
+
+    # Wait for writes to be waiting
+    await sleep_called.get()
+    asyncio_sleep.assert_called_with(FLUSH_SECONDS)
+
+    # Handle the connection write
+    await sleep_results.put(None)
+    await write_called_queue.get()
+    await write_result_queue.put(None)
+    # Called with the second (latest) cursor, since the batch is collapsed into one request
+    default_connection.write.assert_has_calls([call(initial_request), call(as_request(cursor2))])
+    assert not commit_fut1.done()
+    assert not commit_fut2.done()
+
+    # Send the connection response with 1 ack since only one request was sent.
+    await read_result_queue.put(as_response(count=1))
+    await commit_fut1
+    await commit_fut2
+
+
+async def test_commits_multi_cycle(committer: Committer, default_connection, initial_request, asyncio_sleep,
+                                   sleep_queues):
+  sleep_called = sleep_queues[FLUSH_SECONDS].called
+  sleep_results = sleep_queues[FLUSH_SECONDS].results
+  cursor1 = Cursor(offset=321)
+  cursor2 = Cursor(offset=1)
+  write_called_queue = asyncio.Queue()
+  write_result_queue = asyncio.Queue()
+  default_connection.write.side_effect = make_queue_waiter(write_called_queue, write_result_queue)
+  read_called_queue = asyncio.Queue()
+  read_result_queue = asyncio.Queue()
+  default_connection.read.side_effect = make_queue_waiter(read_called_queue, read_result_queue)
+  read_result_queue.put_nowait(StreamingCommitCursorResponse(initial={}))
+  write_result_queue.put_nowait(None)
+  async with committer:
+    # Set up connection
+    await write_called_queue.get()
+    await read_called_queue.get()
+    default_connection.write.assert_has_calls([call(initial_request)])
+
+    # Commit cursor 1
+    commit_fut1 = asyncio.ensure_future(committer.commit(cursor1))
+    assert not commit_fut1.done()
+
+    # Wait for writes to be waiting
+    await sleep_called.get()
+    asyncio_sleep.assert_called_with(FLUSH_SECONDS)
+
+    # Handle the connection write
+    await sleep_results.put(None)
+    await write_called_queue.get()
+    await write_result_queue.put(None)
+    default_connection.write.assert_has_calls([call(initial_request), call(as_request(cursor1))])
+    assert not commit_fut1.done()
+
+    # Wait for writes to be waiting
+    await sleep_called.get()
+    asyncio_sleep.assert_has_calls([call(FLUSH_SECONDS), call(FLUSH_SECONDS)])
+
+    # Commit cursor 2
+    commit_fut2 = asyncio.ensure_future(committer.commit(cursor2))
+    assert not commit_fut2.done()
+
+    # Handle the connection write
+    await sleep_results.put(None)
+    await write_called_queue.get()
+    await write_result_queue.put(None)
+    default_connection.write.assert_has_calls(
+      [call(initial_request), call(as_request(cursor1)), call(as_request(cursor2))])
+    assert not commit_fut1.done()
+    assert not commit_fut2.done()
+
+    # Send the connection responses
+    await read_result_queue.put(as_response(count=2))
+    await commit_fut1
+    await commit_fut2
+
+
+async def test_commits_retried_on_restart(committer: Committer, default_connection, initial_request, asyncio_sleep,
+                                          sleep_queues):
+  sleep_called = sleep_queues[FLUSH_SECONDS].called
+  sleep_results = sleep_queues[FLUSH_SECONDS].results
+  cursor1 = Cursor(offset=321)
+  cursor2 = Cursor(offset=1)
+  write_called_queue = asyncio.Queue()
+  write_result_queue = asyncio.Queue()
+  default_connection.write.side_effect = make_queue_waiter(write_called_queue, write_result_queue)
+  read_called_queue = asyncio.Queue()
+  read_result_queue = asyncio.Queue()
+  default_connection.read.side_effect = make_queue_waiter(read_called_queue, read_result_queue)
+  read_result_queue.put_nowait(StreamingCommitCursorResponse(initial={}))
+  write_result_queue.put_nowait(None)
+  async with committer:
+    # Set up connection
+    await write_called_queue.get()
+    await read_called_queue.get()
+    default_connection.write.assert_has_calls([call(initial_request)])
+
+    # Commit cursor 1
+    commit_fut1 = asyncio.ensure_future(committer.commit(cursor1))
+    assert not commit_fut1.done()
+
+    # Wait for writes to be waiting
+    await sleep_called.get()
+    asyncio_sleep.assert_called_with(FLUSH_SECONDS)
+
+    # Handle the connection write
+    await sleep_results.put(None)
+    await write_called_queue.get()
+    await write_result_queue.put(None)
+    default_connection.write.assert_has_calls([call(initial_request), call(as_request(cursor1))])
+    assert not commit_fut1.done()
+
+    # Wait for writes to be waiting
+    await sleep_called.get()
+    asyncio_sleep.assert_has_calls([call(FLUSH_SECONDS), call(FLUSH_SECONDS)])
+
+    # Commit cursor 2
+    commit_fut2 = asyncio.ensure_future(committer.commit(cursor2))
+    assert not commit_fut2.done()
+
+    # Handle the connection write
+    await sleep_results.put(None)
+    await write_called_queue.get()
+    await write_result_queue.put(None)
+    default_connection.write.assert_has_calls(
+      [call(initial_request), call(as_request(cursor1)), call(as_request(cursor2))])
+    assert not commit_fut1.done()
+    assert not commit_fut2.done()
+
+    # Fail the connection with a retryable error
+    await read_called_queue.get()
+    await read_result_queue.put(InternalServerError("retryable"))
+    await sleep_queues[_MIN_BACKOFF_SECS].called.get()
+    await sleep_queues[_MIN_BACKOFF_SECS].results.put(None)
+    # Reinitialization
+    await write_called_queue.get()
+    await write_result_queue.put(None)
+    await read_called_queue.get()
+    await read_result_queue.put(StreamingCommitCursorResponse(initial={}))
+    # Re-sending the aggregated commit on the new stream
+    await write_called_queue.get()
+    await write_result_queue.put(None)
+    asyncio_sleep.assert_has_calls(
+      [call(FLUSH_SECONDS), call(FLUSH_SECONDS), call(FLUSH_SECONDS), call(_MIN_BACKOFF_SECS)])
+    default_connection.write.assert_has_calls([
+      # Outstanding commits are aggregated into a single request on the second pass
+      call(initial_request), call(as_request(cursor2))])
+
+    # Sending the response for the one aggregated commit finishes both futures
+    await read_called_queue.get()
+    await read_result_queue.put(as_response(count=1))
+    await commit_fut1
+    await commit_fut2
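
For reviewers, the new surface introduced by this patch boils down to Committer.commit(): CommitterImpl is constructed from an InitialCommitCursorRequest, a flush interval, and a ConnectionFactory, is entered as an async context manager, and each commit() resolves once the server acknowledges the offset on the streaming commit cursor stream. A minimal usage sketch follows; the make_commit_connection_factory helper and the flush_seconds value are hypothetical stand-ins, since building the underlying connection factory is outside this patch.

import asyncio

from google.cloud.pubsublite.internal.wire.committer_impl import CommitterImpl
from google.cloud.pubsublite_v1 import Cursor
from google.cloud.pubsublite_v1.types import InitialCommitCursorRequest


async def commit_one_offset(offset: int) -> None:
  # Hypothetical helper: returns a ConnectionFactory[StreamingCommitCursorRequest,
  # StreamingCommitCursorResponse]; its construction is not part of this patch.
  factory = make_commit_connection_factory("mysub")
  committer = CommitterImpl(
    initial=InitialCommitCursorRequest(subscription="mysub"),
    flush_seconds=0.1,
    factory=factory)
  async with committer:
    # The cursor is batched by the SerialBatcher and flushed on the next flush
    # cycle; the await completes when the server acknowledges the commit.
    await committer.commit(Cursor(offset=offset))

The tests above pin FLUSH_SECONDS to a very large value so that a flush only happens when the test releases the mocked asyncio.sleep; a real caller would pick a small flush interval like the illustrative 0.1 seconds here.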
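
The retrying-connection change in this patch fails any writes still queued against the previous (dead) connection and swaps in fresh queues before reinitializing, so callers see the last failure instead of their requests silently leaking onto the new stream. Below is a simplified, standalone sketch of that pattern using only asyncio; the PendingWrite class and drain_stale_writes function are illustrative stand-ins, not the library's API.

import asyncio


class PendingWrite:
  """Illustrative stand-in for a queued WorkItem: a request plus its result future."""

  def __init__(self, request):
    self.request = request
    self.response_future = asyncio.get_running_loop().create_future()


def drain_stale_writes(write_queue: asyncio.Queue, last_failure: Exception) -> asyncio.Queue:
  # Fail every write that was queued against the old connection...
  while not write_queue.empty():
    write_queue.get_nowait().response_future.set_exception(last_failure)
  # ...and hand back a fresh queue for the new connection, mirroring the patched _run_loop.
  return asyncio.Queue(maxsize=1)


async def main():
  write_queue = asyncio.Queue(maxsize=1)
  pending = PendingWrite("commit request")
  write_queue.put_nowait(pending)

  # The old connection failed; drain its queue before reinitializing the new one.
  write_queue = drain_stale_writes(write_queue, RuntimeError("stream broke"))
  try:
    await pending.response_future
  except RuntimeError as e:
    print(f"caller observed the failure: {e}")


asyncio.run(main())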