feat: Implement committer (#13)
* feat: Implement committer

Also a small fix to the retrying connection so it doesn't leak reads/writes from previous connections.

* fix: Patch retrying connection and add comments.

* Update committer.py

* Update committer_impl.py
dpcollins-google committed Sep 14, 2020
1 parent baeb0f6 commit aa9aca8
Showing 7 changed files with 462 additions and 38 deletions.
13 changes: 13 additions & 0 deletions google/cloud/pubsublite/internal/wire/committer.py
@@ -0,0 +1,13 @@
from abc import abstractmethod
from typing import AsyncContextManager

from google.cloud.pubsublite_v1 import Cursor


class Committer(AsyncContextManager):
"""
A Committer is able to commit subscribers' completed offsets.
"""
@abstractmethod
async def commit(self, cursor: Cursor) -> None:
pass
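
For a sense of how this interface is meant to be driven, here is a minimal sketch. FakeCommitter, the offset value, and the asyncio.run entry point are purely illustrative assumptions, not part of this change:

import asyncio

from google.cloud.pubsublite.internal.wire.committer import Committer
from google.cloud.pubsublite_v1 import Cursor


class FakeCommitter(Committer):
  """Trivial stand-in that only logs; a real implementation talks to the commit stream."""

  async def __aexit__(self, exc_type, exc_val, exc_tb):
    pass

  async def commit(self, cursor: Cursor) -> None:
    print(f"committed offset {cursor.offset}")


async def main():
  # AsyncContextManager supplies a default __aenter__ returning self, so async-with works directly.
  async with FakeCommitter() as committer:
    await committer.commit(Cursor(offset=42))


asyncio.run(main())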
138 changes: 138 additions & 0 deletions google/cloud/pubsublite/internal/wire/committer_impl.py
@@ -0,0 +1,138 @@
import asyncio
from typing import Optional, List, Iterable

from absl import logging

from google.cloud.pubsublite.internal.wire.committer import Committer
from google.cloud.pubsublite.internal.wire.retrying_connection import RetryingConnection, ConnectionFactory
from google.api_core.exceptions import FailedPrecondition, GoogleAPICallError
from google.cloud.pubsublite.internal.wire.connection_reinitializer import ConnectionReinitializer
from google.cloud.pubsublite.internal.wire.connection import Connection
from google.cloud.pubsublite.internal.wire.serial_batcher import SerialBatcher, BatchTester
from google.cloud.pubsublite_v1 import Cursor
from google.cloud.pubsublite_v1.types import StreamingCommitCursorRequest, StreamingCommitCursorResponse, InitialCommitCursorRequest
from google.cloud.pubsublite.internal.wire.work_item import WorkItem


class CommitterImpl(Committer, ConnectionReinitializer[StreamingCommitCursorRequest, StreamingCommitCursorResponse], BatchTester[Cursor]):
  _initial: InitialCommitCursorRequest
  _flush_seconds: float
  _connection: RetryingConnection[StreamingCommitCursorRequest, StreamingCommitCursorResponse]

  _batcher: SerialBatcher[Cursor, None]

  _outstanding_commits: List[List[WorkItem[Cursor, None]]]

  _receiver: Optional[asyncio.Future]
  _flusher: Optional[asyncio.Future]

  def __init__(self, initial: InitialCommitCursorRequest, flush_seconds: float,
               factory: ConnectionFactory[StreamingCommitCursorRequest, StreamingCommitCursorResponse]):
    self._initial = initial
    self._flush_seconds = flush_seconds
    self._connection = RetryingConnection(factory, self)
    self._batcher = SerialBatcher(self)
    self._outstanding_commits = []
    self._receiver = None
    self._flusher = None

  async def __aenter__(self):
    await self._connection.__aenter__()

  def _start_loopers(self):
    assert self._receiver is None
    assert self._flusher is None
    self._receiver = asyncio.ensure_future(self._receive_loop())
    self._flusher = asyncio.ensure_future(self._flush_loop())

  async def _stop_loopers(self):
    if self._receiver:
      self._receiver.cancel()
      await self._receiver
      self._receiver = None
    if self._flusher:
      self._flusher.cancel()
      await self._flusher
      self._flusher = None

  def _handle_response(self, response: StreamingCommitCursorResponse):
    if "commit" not in response:
      self._connection.fail(FailedPrecondition("Received an invalid subsequent response on the commit stream."))
    if response.commit.acknowledged_commits > len(self._outstanding_commits):
      self._connection.fail(
        FailedPrecondition("Received a commit response on the stream with no outstanding commits."))
    for _ in range(response.commit.acknowledged_commits):
      batch = self._outstanding_commits.pop(0)
      for item in batch:
        item.response_future.set_result(None)

  async def _receive_loop(self):
    try:
      while True:
        response = await self._connection.read()
        self._handle_response(response)
    except asyncio.CancelledError:
      return

  async def _flush_loop(self):
    try:
      while True:
        await asyncio.sleep(self._flush_seconds)
        await self._flush()
    except asyncio.CancelledError:
      return

  async def __aexit__(self, exc_type, exc_val, exc_tb):
    if self._connection.error():
      self._fail_if_retrying_failed()
    else:
      await self._flush()
    await self._connection.__aexit__(exc_type, exc_val, exc_tb)

  def _fail_if_retrying_failed(self):
    if self._connection.error():
      for batch in self._outstanding_commits:
        for item in batch:
          item.response_future.set_exception(self._connection.error())

  async def _flush(self):
    batch = self._batcher.flush()
    if not batch:
      return
    self._outstanding_commits.append(batch)
    req = StreamingCommitCursorRequest()
    req.commit.cursor = batch[-1].request
    try:
      await self._connection.write(req)
    except GoogleAPICallError as e:
      logging.debug(f"Failed commit on stream: {e}")
      self._fail_if_retrying_failed()

  async def commit(self, cursor: Cursor) -> None:
    future = self._batcher.add(cursor)
    if self._batcher.should_flush():
      # always returns false currently, here in case this changes in the future.
      await self._flush()
    await future

  async def reinitialize(self, connection: Connection[StreamingCommitCursorRequest, StreamingCommitCursorResponse]):
    await self._stop_loopers()
    await connection.write(StreamingCommitCursorRequest(initial=self._initial))
    response = await connection.read()
    if "initial" not in response:
      self._connection.fail(FailedPrecondition("Received an invalid initial response on the commit stream."))
    if self._outstanding_commits:
      # Roll up outstanding commits
      rollup: List[WorkItem[Cursor, None]] = []
      for batch in self._outstanding_commits:
        for item in batch:
          rollup.append(item)
      self._outstanding_commits = [rollup]
      req = StreamingCommitCursorRequest()
      req.commit.cursor = rollup[-1].request
      await connection.write(req)
    self._start_loopers()

  def test(self, requests: Iterable[Cursor]) -> bool:
    # There is no bound on the number of outstanding cursors.
    return False
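
To see how the pieces above fit together, a rough usage sketch follows. The connection_factory argument (a ConnectionFactory for the streaming commit cursor RPC), the subscription/partition/offset values, and the 0.1-second flush interval are assumptions supplied by the caller, not something defined in this diff:

from google.cloud.pubsublite.internal.wire.committer_impl import CommitterImpl
from google.cloud.pubsublite_v1 import Cursor
from google.cloud.pubsublite_v1.types import InitialCommitCursorRequest


async def commit_offset(connection_factory, subscription: str, partition: int, offset: int):
  # The initial request identifies the stream; reinitialize() resends it after every retry.
  initial = InitialCommitCursorRequest(subscription=subscription, partition=partition)
  committer = CommitterImpl(initial, flush_seconds=0.1, factory=connection_factory)
  async with committer:
    # commit() enqueues the cursor; the background flush loop writes it to the stream,
    # and the awaited future resolves once the server acknowledges the commit.
    await committer.commit(Cursor(offset=offset))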
@@ -7,7 +7,8 @@ class ConnectionReinitializer(Generic[Request, Response], metaclass=ABCMeta):
   """A class capable of reinitializing a connection after a new one has been created."""
   @abstractmethod
   def reinitialize(self, connection: Connection[Request, Response]):
-    """Reinitialize a connection.
+    """Reinitialize a connection. Must ensure no calls to the associated RetryingConnection
+    occur until this completes.
 
     Args:
       connection: The connection to reinitialize
13 changes: 3 additions & 10 deletions google/cloud/pubsublite/internal/wire/publisher.py
@@ -1,20 +1,13 @@
-from abc import ABC, abstractmethod
+from abc import abstractmethod
+from typing import AsyncContextManager
 from google.cloud.pubsublite_v1.types import PubSubMessage
 from google.cloud.pubsublite.publish_metadata import PublishMetadata
 
 
-class Publisher(ABC):
+class Publisher(AsyncContextManager):
   """
   A Pub/Sub Lite asynchronous wire protocol publisher.
   """
-  @abstractmethod
-  async def __aenter__(self):
-    raise NotImplementedError()
-
-  @abstractmethod
-  async def __aexit__(self, exc_type, exc_val, exc_tb):
-    raise NotImplementedError()
-
   @abstractmethod
   async def publish(self, message: PubSubMessage) -> PublishMetadata:
     """
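
Since Publisher now inherits from AsyncContextManager instead of hand-declaring abstract __aenter__/__aexit__ methods, call sites are unchanged. A small sketch, where the publisher instance and the message payload are placeholders assumed to come from elsewhere:

from google.cloud.pubsublite.internal.wire.publisher import Publisher
from google.cloud.pubsublite.publish_metadata import PublishMetadata
from google.cloud.pubsublite_v1.types import PubSubMessage


async def publish_one(publisher: Publisher) -> PublishMetadata:
  # typing.AsyncContextManager provides a default __aenter__, so the ABC only
  # needs to declare publish(); callers keep the same async-with pattern.
  async with publisher:
    return await publisher.publish(PubSubMessage(data=b"hello"))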
7 changes: 7 additions & 0 deletions google/cloud/pubsublite/internal/wire/retrying_connection.py
@@ -48,15 +48,22 @@ async def _run_loop(self):
     """
     Processes actions on this connection and handles retries until cancelled.
     """
+    last_failure: GoogleAPICallError
     try:
       bad_retries = 0
       while True:
         try:
           async with self._connection_factory.new() as connection:
+            # Needs to happen prior to reinitialization to clear outstanding waiters.
+            while not self._write_queue.empty():
+              self._write_queue.get_nowait().response_future.set_exception(last_failure)
+            self._read_queue = asyncio.Queue(maxsize=1)
+            self._write_queue = asyncio.Queue(maxsize=1)
             await self._reinitializer.reinitialize(connection)
             bad_retries = 0
             await self._loop_connection(connection)
         except GoogleAPICallError as e:
+          last_failure = e
           if not is_retryable(e):
             self.fail(e)
             return
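
The intent of the added lines is that writers still parked on the dead connection are failed with that connection's error rather than being silently carried into the new stream (the "leak" mentioned in the commit message). A stripped-down illustration of the pattern, assuming each queued item carries a response_future in the style of the library's WorkItem:

import asyncio


def drain_stale_writes(write_queue: asyncio.Queue, last_failure: Exception) -> asyncio.Queue:
  # Wake every waiter from the previous connection with the real error...
  while not write_queue.empty():
    write_queue.get_nowait().response_future.set_exception(last_failure)
  # ...then hand back a fresh bounded queue so the new connection starts clean.
  return asyncio.Queue(maxsize=1)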
60 changes: 33 additions & 27 deletions setup.py
@@ -25,40 +25,46 @@
 
 readme_filename = os.path.join(package_root, "README.rst")
 with io.open(readme_filename, encoding="utf-8") as readme_file:
-  readme = readme_file.read()
+    readme = readme_file.read()
 
 dependencies = [
     "google-api-core >= 1.22.0",
     "absl-py >= 0.9.0",
     "proto-plus >= 0.4.0",
     "grpcio",
     "setuptools"
 ]
 
+test_dependencies = [
+    "asynctest",
+    "pytest",
+    "pytest-asyncio"
+]
+
 setuptools.setup(
-  name="google-cloud-pubsublite",
-  version=version,
-  long_description=readme,
-  author="Google LLC",
-  author_email="googleapis-packages@google.com",
-  license="Apache 2.0",
-  url="https://github.com/googleapis/python-pubsublite",
-  packages=setuptools.PEP420PackageFinder.find(),
-  namespace_packages=("google", "google.cloud"),
-  platforms="Posix; MacOS X; Windows",
-  include_package_data=True,
-  install_requires=dependencies,
-  setup_requires=('pytest-runner',),
-  tests_require=['asynctest', 'pytest', 'pytest-asyncio'],
-  python_requires=">=3.6",
-  classifiers=[
-    "Development Status :: 4 - Beta",
-    "Intended Audience :: Developers",
-    "Operating System :: OS Independent",
-    "Programming Language :: Python :: 3.6",
-    "Programming Language :: Python :: 3.7",
-    "Programming Language :: Python :: 3.8",
-    "Topic :: Internet",
-    "Topic :: Software Development :: Libraries :: Python Modules",
-  ],
-  zip_safe=False,
+    name="google-cloud-pubsublite",
+    version=version,
+    long_description=readme,
+    author="Google LLC",
+    author_email="googleapis-packages@google.com",
+    license="Apache 2.0",
+    url="https://github.com/googleapis/python-pubsublite",
+    packages=setuptools.PEP420PackageFinder.find(),
+    namespace_packages=("google", "google.cloud"),
+    platforms="Posix; MacOS X; Windows",
+    include_package_data=True,
+    install_requires=dependencies,
+    extras_require={"tests": test_dependencies},
+    python_requires=">=3.6",
+    classifiers=[
+        "Development Status :: 4 - Beta",
+        "Intended Audience :: Developers",
+        "Operating System :: OS Independent",
+        "Programming Language :: Python :: 3.6",
+        "Programming Language :: Python :: 3.7",
+        "Programming Language :: Python :: 3.8",
+        "Topic :: Internet",
+        "Topic :: Software Development :: Libraries :: Python Modules",
+    ],
+    zip_safe=False,
 )
