From d812ab21240d64d475d12528a746dac0c851ab88 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 5 Aug 2020 10:41:21 -0400 Subject: [PATCH 1/3] feat: Implement python retrying connection, which generically retries stream errors. --- .gitignore | 3 + google/__init__.py | 3 + google/cloud/__init__.py | 3 + google/cloud/pubsublite/internal/__init__.py | 0 .../pubsublite/internal/wire/__init__.py | 0 .../pubsublite/internal/wire/connection.py | 37 ++++++ .../internal/wire/connection_reinitializer.py | 19 +++ .../internal/wire/gapic_connection.py | 52 ++++++++ .../internal/wire/permanent_failable.py | 30 +++++ .../internal/wire/retrying_connection.py | 87 +++++++++++++ .../pubsublite/internal/wire/work_item.py | 13 ++ google/cloud/pubsublite/status_codes.py | 10 ++ google/cloud/pubsublite/testing/__init__.py | 0 google/cloud/pubsublite/testing/test_utils.py | 12 ++ setup.py | 12 +- tests/unit/pubsublite/__init__.py | 0 tests/unit/pubsublite/internal/__init__.py | 0 .../unit/pubsublite/internal/wire/__init__.py | 0 .../internal/wire/gapic_connection_test.py | 40 ++++++ .../internal/wire/retrying_connection_test.py | 119 ++++++++++++++++++ 20 files changed, 436 insertions(+), 4 deletions(-) create mode 100644 google/__init__.py create mode 100644 google/cloud/__init__.py create mode 100644 google/cloud/pubsublite/internal/__init__.py create mode 100644 google/cloud/pubsublite/internal/wire/__init__.py create mode 100644 google/cloud/pubsublite/internal/wire/connection.py create mode 100644 google/cloud/pubsublite/internal/wire/connection_reinitializer.py create mode 100644 google/cloud/pubsublite/internal/wire/gapic_connection.py create mode 100644 google/cloud/pubsublite/internal/wire/permanent_failable.py create mode 100644 google/cloud/pubsublite/internal/wire/retrying_connection.py create mode 100644 google/cloud/pubsublite/internal/wire/work_item.py create mode 100644 google/cloud/pubsublite/status_codes.py create mode 100644 google/cloud/pubsublite/testing/__init__.py create mode 100644 google/cloud/pubsublite/testing/test_utils.py create mode 100644 tests/unit/pubsublite/__init__.py create mode 100644 tests/unit/pubsublite/internal/__init__.py create mode 100644 tests/unit/pubsublite/internal/wire/__init__.py create mode 100644 tests/unit/pubsublite/internal/wire/gapic_connection_test.py create mode 100644 tests/unit/pubsublite/internal/wire/retrying_connection_test.py diff --git a/.gitignore b/.gitignore index b87e1ed5..ea1d8c4a 100644 --- a/.gitignore +++ b/.gitignore @@ -52,6 +52,9 @@ env/ coverage.xml sponge_log.xml +# Pycharm virtual environment +venv/ + # System test environment variables. system_tests/local_test_setup diff --git a/google/__init__.py b/google/__init__.py new file mode 100644 index 00000000..2e435f54 --- /dev/null +++ b/google/__init__.py @@ -0,0 +1,3 @@ +import pkg_resources + +pkg_resources.declare_namespace(__name__) diff --git a/google/cloud/__init__.py b/google/cloud/__init__.py new file mode 100644 index 00000000..2e435f54 --- /dev/null +++ b/google/cloud/__init__.py @@ -0,0 +1,3 @@ +import pkg_resources + +pkg_resources.declare_namespace(__name__) diff --git a/google/cloud/pubsublite/internal/__init__.py b/google/cloud/pubsublite/internal/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google/cloud/pubsublite/internal/wire/__init__.py b/google/cloud/pubsublite/internal/wire/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google/cloud/pubsublite/internal/wire/connection.py b/google/cloud/pubsublite/internal/wire/connection.py new file mode 100644 index 00000000..990e75cc --- /dev/null +++ b/google/cloud/pubsublite/internal/wire/connection.py @@ -0,0 +1,37 @@ +from typing import Generic, TypeVar, Coroutine, Any, AsyncContextManager +from abc import ABCMeta, abstractmethod +from google.api_core.exceptions import GoogleAPICallError + +Request = TypeVar('Request') +Response = TypeVar('Response') + + +class Connection(Generic[Request, Response], AsyncContextManager): + """ + A connection to an underlying stream. Only one call to 'read' may be outstanding at a time. + """ + + @abstractmethod + async def write(self, request: Request) -> None: + """ + Write a message to the stream. + + Raises: + GoogleAPICallError: When the connection fails. + """ + raise NotImplementedError() + + @abstractmethod + async def read(self) -> Response: + """ + Read a message off of the stream. + + Raises: + GoogleAPICallError: When the connection fails. + """ + raise NotImplementedError() + + +class ConnectionFactory(Generic[Request, Response]): + def new(self) -> Connection[Request, Response]: + raise NotImplementedError() diff --git a/google/cloud/pubsublite/internal/wire/connection_reinitializer.py b/google/cloud/pubsublite/internal/wire/connection_reinitializer.py new file mode 100644 index 00000000..5a4f3a5f --- /dev/null +++ b/google/cloud/pubsublite/internal/wire/connection_reinitializer.py @@ -0,0 +1,19 @@ +from typing import Generic +from abc import ABCMeta, abstractmethod +from google.cloud.pubsublite.internal.wire.connection import Connection, Request, Response + + +class ConnectionReinitializer(Generic[Request, Response], metaclass=ABCMeta): + @abstractmethod + def reinitialize(self, connection: Connection[Request, Response]): + """ + + Args: + connection: The connection to reinitialize + + Raises: + GoogleAPICallError: If it fails to reinitialize. + """ + raise NotImplementedError() + + diff --git a/google/cloud/pubsublite/internal/wire/gapic_connection.py b/google/cloud/pubsublite/internal/wire/gapic_connection.py new file mode 100644 index 00000000..810f0e8a --- /dev/null +++ b/google/cloud/pubsublite/internal/wire/gapic_connection.py @@ -0,0 +1,52 @@ +from typing import AsyncIterator, TypeVar, Optional, Callable, AsyncIterable +import asyncio + +from google.cloud.pubsublite.internal.wire.connection import Connection, Request, Response, ConnectionFactory +from google.cloud.pubsublite.internal.wire.work_item import WorkItem +from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable + +T = TypeVar('T') + + +class GapicConnection(Connection[Request, Response], AsyncIterator[Request], PermanentFailable): + _write_queue: 'asyncio.Queue[WorkItem[Request]]' + _response_it: Optional[AsyncIterator[Response]] + + def __init__(self): + super().__init__() + self._write_queue = asyncio.Queue(maxsize=1) + + def set_response_it(self, response_it: AsyncIterator[Response]): + self._response_it = response_it + + async def write(self, request: Request) -> None: + item = WorkItem(request) + await self.await_or_fail(self._write_queue.put(item)) + await self.await_or_fail(item.response_future) + + async def read(self) -> Response: + return await self.await_or_fail(self._response_it.__anext__()) + + def __aenter__(self): + return self + + def __aexit__(self, exc_type, exc_value, traceback) -> None: + pass + + async def __anext__(self) -> Request: + item: WorkItem[Request] = await self.await_or_fail(self._write_queue.get()) + item.response_future.set_result(None) + return item.request + + def __aiter__(self) -> AsyncIterator[Response]: + return self + + +class GapicConnectionFactory(ConnectionFactory[Request, Response]): + _producer = Callable[[AsyncIterator[Request]], AsyncIterable[Response]] + + def New(self) -> Connection[Request, Response]: + conn = GapicConnection[Request, Response]() + response_iterable = self._producer(conn) + conn.set_response_it(response_iterable.__aiter__()) + return conn diff --git a/google/cloud/pubsublite/internal/wire/permanent_failable.py b/google/cloud/pubsublite/internal/wire/permanent_failable.py new file mode 100644 index 00000000..78fc75e8 --- /dev/null +++ b/google/cloud/pubsublite/internal/wire/permanent_failable.py @@ -0,0 +1,30 @@ +import asyncio +from typing import Awaitable, TypeVar + +from google.api_core.exceptions import GoogleAPICallError + +T = TypeVar('T') + + +class PermanentFailable: + _failure_task: asyncio.Future + + def __init__(self): + self._failure_task = asyncio.Future() + + async def await_or_fail(self, awaitable: Awaitable[T]) -> T: + if self._failure_task.done(): + raise self._failure_task.exception() + task = asyncio.ensure_future(awaitable) + done, _ = await asyncio.wait([task, self._failure_task], return_when=asyncio.FIRST_COMPLETED) + if task in done: + try: + return await task + except GoogleAPICallError as e: + self.fail(e) + task.cancel() + raise self._failure_task.exception() + + def fail(self, err: GoogleAPICallError): + if not self._failure_task.done(): + self._failure_task.set_exception(err) diff --git a/google/cloud/pubsublite/internal/wire/retrying_connection.py b/google/cloud/pubsublite/internal/wire/retrying_connection.py new file mode 100644 index 00000000..de429830 --- /dev/null +++ b/google/cloud/pubsublite/internal/wire/retrying_connection.py @@ -0,0 +1,87 @@ +import asyncio + +from typing import Awaitable +from google.api_core.exceptions import GoogleAPICallError, Cancelled +from google.cloud.pubsublite.status_codes import is_retryable +from google.cloud.pubsublite.internal.wire.connection_reinitializer import ConnectionReinitializer +from google.cloud.pubsublite.internal.wire.connection import Connection, Request, Response, ConnectionFactory +from google.cloud.pubsublite.internal.wire.work_item import WorkItem +from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable + +_MIN_BACKOFF_SECS = .01 +_MAX_BACKOFF_SECS = 10 + + +class RetryingConnection(Connection[Request, Response], PermanentFailable): + _connection_factory: ConnectionFactory[Request, Response] + _reinitializer: ConnectionReinitializer[Request, Response] + + _loop_task: asyncio.Future + + _write_queue: 'asyncio.Queue[WorkItem[Request]]' + _read_queue: 'asyncio.Queue[Response]' + + def __init__(self, connection_factory: ConnectionFactory[Request, Response], reinitializer: ConnectionReinitializer[Request, Response]): + super().__init__() + self._connection_factory = connection_factory + self._reinitializer = reinitializer + self._write_queue = asyncio.Queue(maxsize=1) + self._read_queue = asyncio.Queue(maxsize=1) + + async def __aenter__(self): + self._loop_task = asyncio.ensure_future(self._run_loop()) + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + self.fail(Cancelled("Connection shutting down.")) + + async def write(self, request: Request) -> None: + item = WorkItem(request) + await self.await_or_fail(self._write_queue.put(item)) + return await self.await_or_fail(item.response_future) + + async def read(self) -> Response: + return await self.await_or_fail(self._read_queue.get()) + + async def _run_loop(self): + """ + Processes actions on this connection and handles retries until cancelled. + """ + try: + bad_retries = 0 + while True: + try: + async with self._connection_factory.new() as connection: + await self._reinitializer.reinitialize(connection) + bad_retries = 0 + await self._loop_connection(connection) + except (Exception, GoogleAPICallError) as e: + if not is_retryable(e): + self.fail(e) + return + await asyncio.sleep(min(_MAX_BACKOFF_SECS, _MIN_BACKOFF_SECS * (2**bad_retries))) + bad_retries += 1 + + except asyncio.CancelledError: + return + + async def _loop_connection(self, connection: Connection[Request, Response]): + read_task: Awaitable[Response] = asyncio.ensure_future(connection.read()) + write_task: Awaitable[WorkItem[Request]] = asyncio.ensure_future(self._write_queue.get()) + while True: + done, _ = await asyncio.wait([write_task, read_task], return_when=asyncio.FIRST_COMPLETED) + if write_task in done: + await self._handle_write(connection, await write_task) + write_task = asyncio.ensure_future(self._write_queue.get()) + if read_task in done: + await self._read_queue.put(await read_task) + read_task = asyncio.ensure_future(connection.read()) + + @staticmethod + async def _handle_write(connection: Connection[Request, Response], to_write: WorkItem[Request]): + try: + await connection.write(to_write.request) + to_write.response_future.set_result(None) + except GoogleAPICallError as e: + to_write.response_future.set_exception(e) + raise e diff --git a/google/cloud/pubsublite/internal/wire/work_item.py b/google/cloud/pubsublite/internal/wire/work_item.py new file mode 100644 index 00000000..40214f7f --- /dev/null +++ b/google/cloud/pubsublite/internal/wire/work_item.py @@ -0,0 +1,13 @@ +import asyncio +from typing import Generic, TypeVar + +T = TypeVar('T') + + +class WorkItem(Generic[T]): + request: T + response_future: "asyncio.Future[None]" + + def __init__(self, request: T): + self.request = request + self.response_future = asyncio.Future() diff --git a/google/cloud/pubsublite/status_codes.py b/google/cloud/pubsublite/status_codes.py new file mode 100644 index 00000000..8ca1acd4 --- /dev/null +++ b/google/cloud/pubsublite/status_codes.py @@ -0,0 +1,10 @@ +from grpc import StatusCode +from google.api_core.exceptions import GoogleAPICallError + +retryable_codes = { + StatusCode.DEADLINE_EXCEEDED, StatusCode.ABORTED, StatusCode.INTERNAL, StatusCode.UNAVAILABLE, StatusCode.UNKNOWN +} + + +def is_retryable(error: GoogleAPICallError) -> bool: + return error.grpc_status_code in retryable_codes diff --git a/google/cloud/pubsublite/testing/__init__.py b/google/cloud/pubsublite/testing/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/google/cloud/pubsublite/testing/test_utils.py b/google/cloud/pubsublite/testing/test_utils.py new file mode 100644 index 00000000..a35a006c --- /dev/null +++ b/google/cloud/pubsublite/testing/test_utils.py @@ -0,0 +1,12 @@ +from abc import abstractmethod +from typing import List, Union, Any, Generic +from unittest.mock import Mock + +from google.cloud.pubsublite.internal.wire.connection import Connection, Request, Response + + +async def async_iterable(elts: List[Union[Any, Exception]]): + for elt in elts: + if isinstance(elt, Exception): + raise elt + yield elt diff --git a/setup.py b/setup.py index 0881244f..4a25ebeb 100644 --- a/setup.py +++ b/setup.py @@ -27,6 +27,11 @@ with io.open(readme_filename, encoding="utf-8") as readme_file: readme = readme_file.read() +dependencies = [ + "google-api-core >= 1.22.0", + "absl-py >= 0.9.0", + "proto-plus >= 0.4.0", +] setuptools.setup( name="google-cloud-pubsublite", @@ -40,10 +45,9 @@ namespace_packages=("google", "google.cloud"), platforms="Posix; MacOS X; Windows", include_package_data=True, - install_requires=( - "google-api-core[grpc] >= 1.22.0, < 2.0.0dev", - "proto-plus >= 0.4.0", - ), + install_requires=dependencies, + setup_requires=('pytest-runner',), + tests_require=['pytest', 'pytest-asyncio'], python_requires=">=3.6", classifiers=[ "Development Status :: 4 - Beta", diff --git a/tests/unit/pubsublite/__init__.py b/tests/unit/pubsublite/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/pubsublite/internal/__init__.py b/tests/unit/pubsublite/internal/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/pubsublite/internal/wire/__init__.py b/tests/unit/pubsublite/internal/wire/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/pubsublite/internal/wire/gapic_connection_test.py b/tests/unit/pubsublite/internal/wire/gapic_connection_test.py new file mode 100644 index 00000000..2bc65acc --- /dev/null +++ b/tests/unit/pubsublite/internal/wire/gapic_connection_test.py @@ -0,0 +1,40 @@ +import asyncio + +import pytest +from google.api_core.exceptions import InternalServerError +from google.cloud.pubsublite.internal.wire.gapic_connection import GapicConnection +from google.cloud.pubsublite.testing.test_utils import async_iterable + +# All test coroutines will be treated as marked. +pytestmark = pytest.mark.asyncio + + +async def test_read_error_fails(): + conn = GapicConnection[int, int]() + conn.set_response_it(async_iterable([InternalServerError("abc")])) + with pytest.raises(InternalServerError): + await conn.read() + with pytest.raises(InternalServerError): + await conn.read() + with pytest.raises(InternalServerError): + await conn.write(3) + + +async def test_read_success(): + conn = GapicConnection[int, int]() + conn.set_response_it(async_iterable([3, 4, 5])) + assert [await conn.read() for _ in range(3)] == [3, 4, 5] + + +async def test_writes(): + conn = GapicConnection[int, int]() + conn.set_response_it(async_iterable([])) + task1 = asyncio.ensure_future(conn.write(1)) + task2 = asyncio.ensure_future(conn.write(2)) + assert not task1.done() + assert not task2.done() + assert await conn.__anext__() == 1 + await task1 + assert not task2.done() + assert await conn.__anext__() == 2 + await task2 diff --git a/tests/unit/pubsublite/internal/wire/retrying_connection_test.py b/tests/unit/pubsublite/internal/wire/retrying_connection_test.py new file mode 100644 index 00000000..ed01cbb0 --- /dev/null +++ b/tests/unit/pubsublite/internal/wire/retrying_connection_test.py @@ -0,0 +1,119 @@ +import asyncio +from typing import Union + +from asynctest.mock import MagicMock, CoroutineMock +import pytest +from google.api_core.exceptions import InternalServerError, InvalidArgument +from google.cloud.pubsublite.internal.wire.connection import Connection, ConnectionFactory +from google.cloud.pubsublite.internal.wire.connection_reinitializer import ConnectionReinitializer +from google.cloud.pubsublite.internal.wire.retrying_connection import RetryingConnection, _MIN_BACKOFF_SECS + +# All test coroutines will be treated as marked. +pytestmark = pytest.mark.asyncio + + +@pytest.fixture() +def reinitializer(): + return MagicMock(spec=ConnectionReinitializer[int, int]) + + +@pytest.fixture() +def default_connection(): + conn = MagicMock(spec=Connection[int, int]) + conn.__aenter__.return_value = conn + return conn + + +@pytest.fixture() +def connection_factory(default_connection): + factory = MagicMock(spec=ConnectionFactory[int, int]) + factory.new.return_value = default_connection + return factory + + +@pytest.fixture() +def retrying_connection(connection_factory, reinitializer): + return RetryingConnection[int, int](connection_factory, reinitializer) + + +@pytest.fixture +def asyncio_sleep(monkeypatch): + """Requests.get() mocked to return {'mock_key':'mock_response'}.""" + mock = CoroutineMock() + monkeypatch.setattr(asyncio, "sleep", mock) + return mock + + +async def test_permanent_error_on_reinitializer(retrying_connection: Connection[int, int], reinitializer, + default_connection): + fut = asyncio.Future() + reinitialize_called = asyncio.Future() + + async def reinit_action(conn): + assert conn == default_connection + reinitialize_called.set_result(None) + return await fut + + reinitializer.reinitialize.side_effect = reinit_action + async with retrying_connection as _: + await reinitialize_called + reinitializer.reinitialize.assert_called_once() + fut.set_exception(InvalidArgument("abc")) + with pytest.raises(InvalidArgument): + await retrying_connection.read() + + +async def test_successful_reinitialize(retrying_connection: Connection[int, int], reinitializer, + default_connection): + fut = asyncio.Future() + reinitialize_called = asyncio.Future() + + async def reinit_action(conn): + assert conn == default_connection + reinitialize_called.set_result(None) + return await fut + + reinitializer.reinitialize.side_effect = reinit_action + async with retrying_connection as _: + await reinitialize_called + reinitializer.reinitialize.assert_called_once() + fut.set_result(None) + default_connection.read.return_value = 1 + assert await retrying_connection.read() == 1 + assert default_connection.read.call_count == 2 # re-call to read once first completes + write_fut = asyncio.Future() + + async def write_action(val: int): + assert val == 2 + write_fut.set_result(None) + + default_connection.write.side_effect = write_action + await retrying_connection.write(2) + await write_fut + default_connection.write.assert_called_once() + + +async def test_reinitialize_after_retryable(retrying_connection: Connection[int, int], reinitializer, + default_connection, asyncio_sleep): + reinit_called = asyncio.Queue() + reinit_results: "asyncio.Queue[Union[None, Exception]]" = asyncio.Queue() + + async def reinit_action(conn): + assert conn == default_connection + await reinit_called.put(None) + result = await reinit_results.get() + if isinstance(result, Exception): + raise result + + reinitializer.reinitialize.side_effect = reinit_action + async with retrying_connection as _: + await reinit_called.get() + reinitializer.reinitialize.assert_called_once() + await reinit_results.put(InternalServerError("abc")) + await reinit_called.get() + asyncio_sleep.assert_called_once_with(_MIN_BACKOFF_SECS) + assert reinitializer.reinitialize.call_count == 2 + await reinit_results.put(None) + default_connection.read.return_value = 1 + assert await retrying_connection.read() == 1 + assert default_connection.read.call_count == 2 # re-call to read once first completes From 0b1f5e5462c7e5e9be33be7b53e100dffd770c11 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 5 Aug 2020 10:48:32 -0400 Subject: [PATCH 2/3] fix: Add asynctest to tests_require. --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 4a25ebeb..c2787534 100644 --- a/setup.py +++ b/setup.py @@ -47,7 +47,7 @@ include_package_data=True, install_requires=dependencies, setup_requires=('pytest-runner',), - tests_require=['pytest', 'pytest-asyncio'], + tests_require=['asynctest', 'pytest', 'pytest-asyncio'], python_requires=">=3.6", classifiers=[ "Development Status :: 4 - Beta", From 5aad689ba498302c02b853f80b48efba66487758 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 5 Aug 2020 12:57:53 -0400 Subject: [PATCH 3/3] fix: Add class comments. --- google/cloud/pubsublite/internal/wire/connection.py | 5 +++-- .../pubsublite/internal/wire/connection_reinitializer.py | 3 ++- google/cloud/pubsublite/internal/wire/gapic_connection.py | 2 ++ google/cloud/pubsublite/internal/wire/permanent_failable.py | 1 + .../cloud/pubsublite/internal/wire/retrying_connection.py | 1 + google/cloud/pubsublite/internal/wire/work_item.py | 1 + google/cloud/pubsublite/testing/test_utils.py | 6 +----- 7 files changed, 11 insertions(+), 8 deletions(-) diff --git a/google/cloud/pubsublite/internal/wire/connection.py b/google/cloud/pubsublite/internal/wire/connection.py index 990e75cc..aecc9fac 100644 --- a/google/cloud/pubsublite/internal/wire/connection.py +++ b/google/cloud/pubsublite/internal/wire/connection.py @@ -17,7 +17,7 @@ async def write(self, request: Request) -> None: Write a message to the stream. Raises: - GoogleAPICallError: When the connection fails. + GoogleAPICallError: When the connection terminates in failure. """ raise NotImplementedError() @@ -27,11 +27,12 @@ async def read(self) -> Response: Read a message off of the stream. Raises: - GoogleAPICallError: When the connection fails. + GoogleAPICallError: When the connection terminates in failure. """ raise NotImplementedError() class ConnectionFactory(Generic[Request, Response]): + """A factory for producing Connections.""" def new(self) -> Connection[Request, Response]: raise NotImplementedError() diff --git a/google/cloud/pubsublite/internal/wire/connection_reinitializer.py b/google/cloud/pubsublite/internal/wire/connection_reinitializer.py index 5a4f3a5f..9b415cdc 100644 --- a/google/cloud/pubsublite/internal/wire/connection_reinitializer.py +++ b/google/cloud/pubsublite/internal/wire/connection_reinitializer.py @@ -4,9 +4,10 @@ class ConnectionReinitializer(Generic[Request, Response], metaclass=ABCMeta): + """A class capable of reinitializing a connection after a new one has been created.""" @abstractmethod def reinitialize(self, connection: Connection[Request, Response]): - """ + """Reinitialize a connection. Args: connection: The connection to reinitialize diff --git a/google/cloud/pubsublite/internal/wire/gapic_connection.py b/google/cloud/pubsublite/internal/wire/gapic_connection.py index 810f0e8a..c6938622 100644 --- a/google/cloud/pubsublite/internal/wire/gapic_connection.py +++ b/google/cloud/pubsublite/internal/wire/gapic_connection.py @@ -9,6 +9,7 @@ class GapicConnection(Connection[Request, Response], AsyncIterator[Request], PermanentFailable): + """A Connection wrapping a gapic AsyncIterator[Request/Response] pair.""" _write_queue: 'asyncio.Queue[WorkItem[Request]]' _response_it: Optional[AsyncIterator[Response]] @@ -43,6 +44,7 @@ def __aiter__(self) -> AsyncIterator[Response]: class GapicConnectionFactory(ConnectionFactory[Request, Response]): + """A ConnectionFactory that produces GapicConnections.""" _producer = Callable[[AsyncIterator[Request]], AsyncIterable[Response]] def New(self) -> Connection[Request, Response]: diff --git a/google/cloud/pubsublite/internal/wire/permanent_failable.py b/google/cloud/pubsublite/internal/wire/permanent_failable.py index 78fc75e8..1151de78 100644 --- a/google/cloud/pubsublite/internal/wire/permanent_failable.py +++ b/google/cloud/pubsublite/internal/wire/permanent_failable.py @@ -7,6 +7,7 @@ class PermanentFailable: + """A class that can experience permanent failures, with helpers for forwarding these to client actions.""" _failure_task: asyncio.Future def __init__(self): diff --git a/google/cloud/pubsublite/internal/wire/retrying_connection.py b/google/cloud/pubsublite/internal/wire/retrying_connection.py index de429830..894022d4 100644 --- a/google/cloud/pubsublite/internal/wire/retrying_connection.py +++ b/google/cloud/pubsublite/internal/wire/retrying_connection.py @@ -13,6 +13,7 @@ class RetryingConnection(Connection[Request, Response], PermanentFailable): + """A connection which performs retries on an underlying stream when experiencing retryable errors.""" _connection_factory: ConnectionFactory[Request, Response] _reinitializer: ConnectionReinitializer[Request, Response] diff --git a/google/cloud/pubsublite/internal/wire/work_item.py b/google/cloud/pubsublite/internal/wire/work_item.py index 40214f7f..3685fb84 100644 --- a/google/cloud/pubsublite/internal/wire/work_item.py +++ b/google/cloud/pubsublite/internal/wire/work_item.py @@ -5,6 +5,7 @@ class WorkItem(Generic[T]): + """An item of work and a future to complete when it is finished.""" request: T response_future: "asyncio.Future[None]" diff --git a/google/cloud/pubsublite/testing/test_utils.py b/google/cloud/pubsublite/testing/test_utils.py index a35a006c..b9531acd 100644 --- a/google/cloud/pubsublite/testing/test_utils.py +++ b/google/cloud/pubsublite/testing/test_utils.py @@ -1,8 +1,4 @@ -from abc import abstractmethod -from typing import List, Union, Any, Generic -from unittest.mock import Mock - -from google.cloud.pubsublite.internal.wire.connection import Connection, Request, Response +from typing import List, Union, Any async def async_iterable(elts: List[Union[Any, Exception]]):