From 11c9a690abbc648a57b801458f6193d02d5262d2 Mon Sep 17 00:00:00 2001 From: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com> Date: Mon, 10 Aug 2020 10:45:01 -0400 Subject: [PATCH] feat: Implement python retrying connection, which generically retries stream errors (#4) * feat: Implement python retrying connection, which generically retries stream errors. * fix: Add asynctest to tests_require. * fix: Add class comments. --- .gitignore | 3 + google/__init__.py | 3 + google/cloud/__init__.py | 3 + google/cloud/pubsublite/internal/__init__.py | 0 .../pubsublite/internal/wire/__init__.py | 0 .../pubsublite/internal/wire/connection.py | 38 ++++++ .../internal/wire/connection_reinitializer.py | 20 +++ .../internal/wire/gapic_connection.py | 54 ++++++++ .../internal/wire/permanent_failable.py | 31 +++++ .../internal/wire/retrying_connection.py | 88 +++++++++++++ .../pubsublite/internal/wire/work_item.py | 14 +++ google/cloud/pubsublite/status_codes.py | 10 ++ google/cloud/pubsublite/testing/__init__.py | 0 google/cloud/pubsublite/testing/test_utils.py | 8 ++ setup.py | 12 +- tests/unit/pubsublite/__init__.py | 0 tests/unit/pubsublite/internal/__init__.py | 0 .../unit/pubsublite/internal/wire/__init__.py | 0 .../internal/wire/gapic_connection_test.py | 40 ++++++ .../internal/wire/retrying_connection_test.py | 119 ++++++++++++++++++ 20 files changed, 439 insertions(+), 4 deletions(-) create mode 100644 google/__init__.py create mode 100644 google/cloud/__init__.py create mode 100644 google/cloud/pubsublite/internal/__init__.py create mode 100644 google/cloud/pubsublite/internal/wire/__init__.py create mode 100644 google/cloud/pubsublite/internal/wire/connection.py create mode 100644 google/cloud/pubsublite/internal/wire/connection_reinitializer.py create mode 100644 google/cloud/pubsublite/internal/wire/gapic_connection.py create mode 100644 google/cloud/pubsublite/internal/wire/permanent_failable.py create mode 100644 
google/cloud/pubsublite/internal/wire/retrying_connection.py create mode 100644 google/cloud/pubsublite/internal/wire/work_item.py create mode 100644 google/cloud/pubsublite/status_codes.py create mode 100644 google/cloud/pubsublite/testing/__init__.py create mode 100644 google/cloud/pubsublite/testing/test_utils.py create mode 100644 tests/unit/pubsublite/__init__.py create mode 100644 tests/unit/pubsublite/internal/__init__.py create mode 100644 tests/unit/pubsublite/internal/wire/__init__.py create mode 100644 tests/unit/pubsublite/internal/wire/gapic_connection_test.py create mode 100644 tests/unit/pubsublite/internal/wire/retrying_connection_test.py diff --git a/.gitignore b/.gitignore index b87e1ed5..ea1d8c4a 100644 --- a/.gitignore +++ b/.gitignore @@ -52,6 +52,9 @@ env/ coverage.xml sponge_log.xml +# Pycharm virtual environment +venv/ + # System test environment variables. system_tests/local_test_setup diff --git a/google/__init__.py b/google/__init__.py new file mode 100644 index 00000000..2e435f54 --- /dev/null +++ b/google/__init__.py @@ -0,0 +1,3 @@ +import pkg_resources + +pkg_resources.declare_namespace(__name__) diff --git a/google/cloud/__init__.py b/google/cloud/__init__.py new file mode 100644 index 00000000..2e435f54 --- /dev/null +++ b/google/cloud/__init__.py @@ -0,0 +1,3 @@ +import pkg_resources + +pkg_resources.declare_namespace(__name__) diff --git a/google/cloud/pubsublite/internal/__init__.py b/google/cloud/pubsublite/internal/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google/cloud/pubsublite/internal/wire/__init__.py b/google/cloud/pubsublite/internal/wire/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google/cloud/pubsublite/internal/wire/connection.py b/google/cloud/pubsublite/internal/wire/connection.py new file mode 100644 index 00000000..aecc9fac --- /dev/null +++ b/google/cloud/pubsublite/internal/wire/connection.py @@ -0,0 +1,38 @@ +from typing import Generic, TypeVar, Coroutine, 
Request = TypeVar('Request')
Response = TypeVar('Response')


class Connection(Generic[Request, Response], AsyncContextManager):
    """
    A handle on an underlying bidirectional stream.

    At most one call to 'read' may be outstanding at any given time.
    """

    @abstractmethod
    async def read(self) -> Response:
        """
        Take the next message off of the stream.

        Raises:
          GoogleAPICallError: When the connection terminates in failure.
        """
        raise NotImplementedError()

    @abstractmethod
    async def write(self, request: Request) -> None:
        """
        Send a message on the stream.

        Raises:
          GoogleAPICallError: When the connection terminates in failure.
        """
        raise NotImplementedError()


class ConnectionFactory(Generic[Request, Response]):
    """Creates Connections on demand."""

    def new(self) -> Connection[Request, Response]:
        # Concrete factories override this; the base implementation only signals that it is missing.
        raise NotImplementedError()
T = TypeVar('T')


class GapicConnection(Connection[Request, Response], AsyncIterator[Request], PermanentFailable):
    """A Connection wrapping a gapic AsyncIterator[Request/Response] pair.

    The object is itself the AsyncIterator[Request] that is handed to the gapic call:
    `write` enqueues a request and `__anext__` hands it out. Responses are pulled from
    the iterator registered with `set_response_it`.
    """
    _write_queue: 'asyncio.Queue[WorkItem[Request]]'
    _response_it: Optional[AsyncIterator[Response]]

    def __init__(self):
        super().__init__()
        # maxsize=1: at most one request is buffered ahead of the consumer.
        self._write_queue = asyncio.Queue(maxsize=1)
        # Always defined, so the attribute matches its Optional annotation before set_response_it().
        self._response_it = None

    def set_response_it(self, response_it: AsyncIterator[Response]):
        """Register the iterator that `read` pulls responses from."""
        self._response_it = response_it

    async def write(self, request: Request) -> None:
        """Queue a request and wait until `__anext__` has handed it out.

        Raises:
          GoogleAPICallError: When the connection has permanently failed.
        """
        item = WorkItem(request)
        await self.await_or_fail(self._write_queue.put(item))
        await self.await_or_fail(item.response_future)

    async def read(self) -> Response:
        """Return the next response from the registered response iterator.

        Raises:
          GoogleAPICallError: When the iterator raises one, or the connection has already failed.
        """
        return await self.await_or_fail(self._response_it.__anext__())

    # The async context manager protocol awaits the results of __aenter__/__aexit__,
    # so both must be coroutines; plain functions break `async with`.
    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
        pass

    async def __anext__(self) -> Request:
        """Hand the next queued request out and release the writer waiting on it."""
        item: WorkItem[Request] = await self.await_or_fail(self._write_queue.get())
        item.response_future.set_result(None)
        return item.request

    def __aiter__(self) -> AsyncIterator[Request]:
        return self


class GapicConnectionFactory(ConnectionFactory[Request, Response]):
    """A ConnectionFactory that produces GapicConnections.

    Args:
      producer: Callable that takes the request iterator and returns the async
        iterable of responses. Optional only for backward compatibility with code
        that assigns `_producer` after construction.
    """
    _producer: Optional[Callable[[AsyncIterator[Request]], AsyncIterable[Response]]] = None

    def __init__(self, producer: Optional[Callable[[AsyncIterator[Request]], AsyncIterable[Response]]] = None):
        if producer is not None:
            self._producer = producer

    def new(self) -> Connection[Request, Response]:
        """Create a GapicConnection wired to a fresh response stream from the producer.

        Raises:
          TypeError: If no producer has been configured.
        """
        if self._producer is None:
            raise TypeError("GapicConnectionFactory requires a producer callable.")
        conn = GapicConnection[Request, Response]()
        response_iterable = self._producer(conn)
        conn.set_response_it(response_iterable.__aiter__())
        return conn

    # Backward-compatible alias: the method was originally spelled `New`, which did not
    # override ConnectionFactory.new.
    New = new
T = TypeVar('T')


class PermanentFailable:
    """A class that can experience permanent failures, with helpers for forwarding these to client actions.

    The first error passed to `fail` is stored in an internal future. Every later
    `await_or_fail` call raises that stored error.
    """
    _failure_task: asyncio.Future

    def __init__(self):
        self._failure_task = asyncio.Future()

    async def await_or_fail(self, awaitable: Awaitable[T]) -> T:
        """Await `awaitable`, unless this object has failed or fails first.

        Args:
          awaitable: The operation to wait for.

        Returns:
          The result of `awaitable`.

        Raises:
          The stored failure if this object has failed, or fails before `awaitable`
          completes, or if `awaitable` itself raises a GoogleAPICallError (which is
          recorded as the permanent failure). Any other exception raised by
          `awaitable` propagates unchanged.
        """
        if self._failure_task.done():
            # Close a never-started coroutine so it does not trigger a
            # "coroutine was never awaited" warning. Futures are left untouched.
            if asyncio.iscoroutine(awaitable):
                awaitable.close()
            raise self._failure_task.exception()
        task = asyncio.ensure_future(awaitable)
        try:
            done, _ = await asyncio.wait([task, self._failure_task], return_when=asyncio.FIRST_COMPLETED)
            if task in done:
                try:
                    return await task
                except GoogleAPICallError as e:
                    self.fail(e)
            raise self._failure_task.exception()
        finally:
            # Runs when the failure won the race and also when the caller was cancelled
            # while waiting; without it the inner task would keep running unobserved.
            if not task.done():
                task.cancel()

    def fail(self, err: "GoogleAPICallError"):
        """Record `err` as the permanent failure; only the first call has any effect."""
        if not self._failure_task.done():
            self._failure_task.set_exception(err)
_MIN_BACKOFF_SECS = .01
_MAX_BACKOFF_SECS = 10


class RetryingConnection(Connection[Request, Response], PermanentFailable):
    """A connection which performs retries on an underlying stream when experiencing retryable errors.

    A background task, started in `__aenter__`, owns the underlying connection. It
    forwards queued writes to it and queues responses read from it. On a retryable
    error it sleeps with exponential backoff, creates a new connection from the
    factory and reinitializes it.
    """
    _connection_factory: ConnectionFactory[Request, Response]
    _reinitializer: ConnectionReinitializer[Request, Response]

    _loop_task: asyncio.Future

    _write_queue: 'asyncio.Queue[WorkItem[Request]]'
    _read_queue: 'asyncio.Queue[Response]'

    def __init__(self, connection_factory: ConnectionFactory[Request, Response], reinitializer: ConnectionReinitializer[Request, Response]):
        super().__init__()
        self._connection_factory = connection_factory
        self._reinitializer = reinitializer
        self._write_queue = asyncio.Queue(maxsize=1)
        self._read_queue = asyncio.Queue(maxsize=1)

    async def __aenter__(self):
        self._loop_task = asyncio.ensure_future(self._run_loop())
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.fail(Cancelled("Connection shutting down."))
        # fail() only unblocks callers. The background loop must be cancelled explicitly
        # or it keeps reading and reconnecting after the context exits.
        self._loop_task.cancel()
        # asyncio.wait does not re-raise the task's outcome, so this only waits for shutdown.
        await asyncio.wait([self._loop_task])

    async def write(self, request: Request) -> None:
        """Queue a request and wait until it has been written to the underlying connection.

        Raises:
          GoogleAPICallError: When the connection terminates in failure.
        """
        item = WorkItem(request)
        await self.await_or_fail(self._write_queue.put(item))
        return await self.await_or_fail(item.response_future)

    async def read(self) -> Response:
        """Return the next response forwarded from the underlying connection.

        Raises:
          GoogleAPICallError: When the connection terminates in failure.
        """
        return await self.await_or_fail(self._read_queue.get())

    async def _run_loop(self):
        """
        Processes actions on this connection and handles retries until cancelled.
        """
        # Doubling a running value instead of computing 2**retries keeps the same delay
        # sequence and cannot overflow float after a very long outage.
        backoff_secs = _MIN_BACKOFF_SECS
        try:
            while True:
                try:
                    async with self._connection_factory.new() as connection:
                        await self._reinitializer.reinitialize(connection)
                        backoff_secs = _MIN_BACKOFF_SECS
                        await self._loop_connection(connection)
                except asyncio.CancelledError:
                    # CancelledError derives from Exception before Python 3.8, so it must be
                    # let through before the generic handler below.
                    raise
                except GoogleAPICallError as e:
                    if not is_retryable(e):
                        self.fail(e)
                        return
                    await asyncio.sleep(backoff_secs)
                    backoff_secs = min(_MAX_BACKOFF_SECS, backoff_secs * 2)
                except Exception as e:
                    # Non-API errors carry no status code to classify. Surface them as a
                    # permanent failure so callers do not wait forever on a dead loop.
                    self.fail(e)
                    return

        except asyncio.CancelledError:
            return

    async def _loop_connection(self, connection: Connection[Request, Response]):
        """Forward writes to and reads from `connection` until an error or cancellation occurs."""
        read_task: Awaitable[Response] = asyncio.ensure_future(connection.read())
        write_task: Awaitable[WorkItem[Request]] = asyncio.ensure_future(self._write_queue.get())
        try:
            while True:
                done, _ = await asyncio.wait([write_task, read_task], return_when=asyncio.FIRST_COMPLETED)
                if write_task in done:
                    await self._handle_write(connection, await write_task)
                    write_task = asyncio.ensure_future(self._write_queue.get())
                if read_task in done:
                    await self._read_queue.put(await read_task)
                    read_task = asyncio.ensure_future(connection.read())
        finally:
            # A pending queue getter left behind would receive the next queued write after
            # a reconnect and drop it. The stale read belongs to the dead connection.
            read_task.cancel()
            write_task.cancel()

    @staticmethod
    async def _handle_write(connection: Connection[Request, Response], to_write: WorkItem[Request]):
        """Write one item and complete its future with the outcome; API errors are re-raised."""
        try:
            await connection.write(to_write.request)
            to_write.response_future.set_result(None)
        except GoogleAPICallError as e:
            to_write.response_future.set_exception(e)
            raise
retryable_codes = {
    StatusCode.DEADLINE_EXCEEDED, StatusCode.ABORTED, StatusCode.INTERNAL, StatusCode.UNAVAILABLE, StatusCode.UNKNOWN
}


def is_retryable(error: GoogleAPICallError) -> bool:
    """Return whether `error` carries a gRPC status code that is safe to retry.

    Args:
      error: The error to classify.

    Returns:
      True when the error's `grpc_status_code` is in `retryable_codes`. Errors that
      have no such attribute are reported as not retryable instead of raising
      AttributeError.
    """
    return getattr(error, "grpc_status_code", None) in retryable_codes
# All test coroutines will be treated as marked.
pytestmark = pytest.mark.asyncio


async def test_read_error_fails():
    """An error from the response iterator fails the connection for all later calls."""
    conn = GapicConnection[int, int]()
    conn.set_response_it(async_iterable([InternalServerError("abc")]))
    # The first read surfaces the error raised by the response iterator.
    with pytest.raises(InternalServerError):
        await conn.read()
    # Later reads and writes raise the same error type again.
    with pytest.raises(InternalServerError):
        await conn.read()
    with pytest.raises(InternalServerError):
        await conn.write(3)


async def test_read_success():
    """Responses are returned in the order the iterator yields them."""
    conn = GapicConnection[int, int]()
    conn.set_response_it(async_iterable([3, 4, 5]))
    assert [await conn.read() for _ in range(3)] == [3, 4, 5]


async def test_writes():
    """Each write completes only after __anext__ has handed its request out."""
    conn = GapicConnection[int, int]()
    conn.set_response_it(async_iterable([]))
    task1 = asyncio.ensure_future(conn.write(1))
    task2 = asyncio.ensure_future(conn.write(2))
    # Neither task has had a chance to run yet.
    assert not task1.done()
    assert not task2.done()
    assert await conn.__anext__() == 1
    await task1
    # The second write stays pending until its request is pulled as well.
    assert not task2.done()
    assert await conn.__anext__() == 2
    await task2
# All test coroutines will be treated as marked.
pytestmark = pytest.mark.asyncio


@pytest.fixture()
def reinitializer():
    """A mock ConnectionReinitializer."""
    return MagicMock(spec=ConnectionReinitializer[int, int])


@pytest.fixture()
def default_connection():
    """A mock Connection whose `async with` entry yields the mock itself."""
    conn = MagicMock(spec=Connection[int, int])
    conn.__aenter__.return_value = conn
    return conn


@pytest.fixture()
def connection_factory(default_connection):
    """A mock ConnectionFactory whose `new` always returns `default_connection`."""
    factory = MagicMock(spec=ConnectionFactory[int, int])
    factory.new.return_value = default_connection
    return factory


@pytest.fixture()
def retrying_connection(connection_factory, reinitializer):
    """The RetryingConnection under test, built from the mock factory and reinitializer."""
    return RetryingConnection[int, int](connection_factory, reinitializer)


@pytest.fixture
def asyncio_sleep(monkeypatch):
    """Replace asyncio.sleep with a CoroutineMock so tests can assert on the requested delays."""
    mock = CoroutineMock()
    monkeypatch.setattr(asyncio, "sleep", mock)
    return mock
async def test_permanent_error_on_reinitializer(retrying_connection: Connection[int, int], reinitializer,
                                                default_connection):
    """A non-retryable error from reinitialize is surfaced to readers."""
    # `fut` controls when and how reinitialize finishes; `reinitialize_called` signals that it started.
    fut = asyncio.Future()
    reinitialize_called = asyncio.Future()

    async def reinit_action(conn):
        assert conn == default_connection
        reinitialize_called.set_result(None)
        return await fut

    reinitializer.reinitialize.side_effect = reinit_action
    async with retrying_connection as _:
        await reinitialize_called
        reinitializer.reinitialize.assert_called_once()
        fut.set_exception(InvalidArgument("abc"))
        with pytest.raises(InvalidArgument):
            await retrying_connection.read()


async def test_successful_reinitialize(retrying_connection: Connection[int, int], reinitializer,
                                       default_connection):
    """After a successful reinitialize, reads and writes reach the underlying connection."""
    fut = asyncio.Future()
    reinitialize_called = asyncio.Future()

    async def reinit_action(conn):
        assert conn == default_connection
        reinitialize_called.set_result(None)
        return await fut

    reinitializer.reinitialize.side_effect = reinit_action
    async with retrying_connection as _:
        await reinitialize_called
        reinitializer.reinitialize.assert_called_once()
        fut.set_result(None)
        default_connection.read.return_value = 1
        assert await retrying_connection.read() == 1
        assert default_connection.read.call_count == 2  # re-call to read once first completes
        write_fut = asyncio.Future()

        async def write_action(val: int):
            assert val == 2
            write_fut.set_result(None)

        default_connection.write.side_effect = write_action
        await retrying_connection.write(2)
        await write_fut
        default_connection.write.assert_called_once()


async def test_reinitialize_after_retryable(retrying_connection: Connection[int, int], reinitializer,
                                            default_connection, asyncio_sleep):
    """A retryable error from reinitialize triggers one backoff sleep and a second reinitialize."""
    # Queues let the test step through successive reinitialize calls and choose each outcome.
    reinit_called = asyncio.Queue()
    reinit_results: "asyncio.Queue[Union[None, Exception]]" = asyncio.Queue()

    async def reinit_action(conn):
        assert conn == default_connection
        await reinit_called.put(None)
        result = await reinit_results.get()
        if isinstance(result, Exception):
            raise result

    reinitializer.reinitialize.side_effect = reinit_action
    async with retrying_connection as _:
        await reinit_called.get()
        reinitializer.reinitialize.assert_called_once()
        # First attempt fails with a retryable error.
        await reinit_results.put(InternalServerError("abc"))
        await reinit_called.get()
        # The first retry sleeps for the minimum backoff.
        asyncio_sleep.assert_called_once_with(_MIN_BACKOFF_SECS)
        assert reinitializer.reinitialize.call_count == 2
        # Second attempt succeeds.
        await reinit_results.put(None)
        default_connection.read.return_value = 1
        assert await retrying_connection.read() == 1
        assert default_connection.read.call_count == 2  # re-call to read once first completes