Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement committer #13

Merged
merged 4 commits into from Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions google/cloud/pubsublite/internal/wire/committer.py
@@ -0,0 +1,13 @@
from abc import abstractmethod
from typing import AsyncContextManager

from google.cloud.pubsublite_v1 import Cursor


class Committer(AsyncContextManager):
    """Interface for objects that commit subscribers' completed offsets.

    A Committer is an asynchronous context manager; concrete subclasses
    must provide `commit` (and whatever context-manager hooks the base
    class leaves abstract).
    """

    @abstractmethod
    async def commit(self, cursor: Cursor) -> None:
        """Commit the given cursor.

        Args:
          cursor: The cursor marking the completed offset to commit.
        """
138 changes: 138 additions & 0 deletions google/cloud/pubsublite/internal/wire/committer_impl.py
@@ -0,0 +1,138 @@
import asyncio
from typing import Optional, List, Iterable

from absl import logging

from google.cloud.pubsublite.internal.wire.committer import Committer
from google.cloud.pubsublite.internal.wire.retrying_connection import RetryingConnection, ConnectionFactory
from google.api_core.exceptions import FailedPrecondition, GoogleAPICallError
from google.cloud.pubsublite.internal.wire.connection_reinitializer import ConnectionReinitializer
from google.cloud.pubsublite.internal.wire.connection import Connection
from google.cloud.pubsublite.internal.wire.serial_batcher import SerialBatcher, BatchTester
from google.cloud.pubsublite_v1 import Cursor
from google.cloud.pubsublite_v1.types import StreamingCommitCursorRequest, StreamingCommitCursorResponse, InitialCommitCursorRequest
from google.cloud.pubsublite.internal.wire.work_item import WorkItem


class CommitterImpl(
    Committer,
    ConnectionReinitializer[StreamingCommitCursorRequest, StreamingCommitCursorResponse],
    BatchTester[Cursor],
):
    """Committer that batches cursors and sends them over a retrying commit stream.

    Cursors handed to `commit` are collected in a SerialBatcher. A background
    flusher periodically sends the most recent cursor of each batch, and a
    background receiver resolves the waiting callers as the server
    acknowledges commits. Both background tasks are (re)started every time
    the underlying connection is reinitialized.
    """

    _initial: InitialCommitCursorRequest
    _flush_seconds: float
    _connection: RetryingConnection[StreamingCommitCursorRequest, StreamingCommitCursorResponse]

    _batcher: SerialBatcher[Cursor, None]

    # Batches already written to the stream and awaiting acknowledgement,
    # oldest first.
    _outstanding_commits: List[List[WorkItem[Cursor, None]]]

    # Background tasks; None whenever they are not running.
    _receiver: Optional[asyncio.Future]
    _flusher: Optional[asyncio.Future]

    def __init__(self, initial: InitialCommitCursorRequest, flush_seconds: float,
                 factory: ConnectionFactory[StreamingCommitCursorRequest, StreamingCommitCursorResponse]):
        """Create a committer.

        Args:
          initial: The initial request sent on every new stream.
          flush_seconds: Seconds to sleep between periodic flushes.
          factory: Factory used by the RetryingConnection to open streams.
        """
        self._initial = initial
        self._flush_seconds = flush_seconds
        self._connection = RetryingConnection(factory, self)
        self._batcher = SerialBatcher(self)
        self._outstanding_commits = []
        self._receiver = None
        self._flusher = None

    async def __aenter__(self):
        """Open the underlying connection and return this committer."""
        await self._connection.__aenter__()
        # Return self so that `async with CommitterImpl(...) as committer`
        # binds the committer instead of None.
        return self

    def _start_loopers(self):
        """Start the receive and flush background tasks; they must not be running."""
        assert self._receiver is None
        assert self._flusher is None
        self._receiver = asyncio.ensure_future(self._receive_loop())
        self._flusher = asyncio.ensure_future(self._flush_loop())

    async def _stop_loopers(self):
        """Cancel the background tasks, if running, and wait for them to finish."""
        if self._receiver:
            self._receiver.cancel()
            await self._receiver
            self._receiver = None
        if self._flusher:
            self._flusher.cancel()
            await self._flusher
            self._flusher = None

    def _handle_response(self, response: StreamingCommitCursorResponse):
        """Resolve the oldest outstanding batches acknowledged by `response`.

        An invalid response fails the connection and is otherwise ignored.
        """
        # `in` tests oneof field presence on the proto message; it is not a
        # substring check.
        if "commit" not in response:
            self._connection.fail(FailedPrecondition("Received an invalid subsequent response on the commit stream."))
            return
        acknowledged = response.commit.acknowledged_commits
        if acknowledged > len(self._outstanding_commits):
            self._connection.fail(
                FailedPrecondition("Received a commit response on the stream with no outstanding commits."))
            # Stop here: popping more batches than exist would raise IndexError.
            return
        for _ in range(acknowledged):
            batch = self._outstanding_commits.pop(0)
            for item in batch:
                # A future may already be done, e.g. cancelled by its awaiter.
                if not item.response_future.done():
                    item.response_future.set_result(None)

    async def _receive_loop(self):
        """Read responses from the connection until cancelled."""
        try:
            while True:
                response = await self._connection.read()
                self._handle_response(response)
        except asyncio.CancelledError:
            return

    async def _flush_loop(self):
        """Flush batched cursors every `_flush_seconds` until cancelled."""
        try:
            while True:
                await asyncio.sleep(self._flush_seconds)
                await self._flush()
        except asyncio.CancelledError:
            return

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Fail or flush pending work, then close the underlying connection."""
        # NOTE(review): the background tasks are not stopped here; confirm
        # whether they should be cancelled on exit.
        if self._connection.error():
            self._fail_if_retrying_failed()
        else:
            await self._flush()
        await self._connection.__aexit__(exc_type, exc_val, exc_tb)

    def _fail_if_retrying_failed(self):
        """If the connection reports an error, fail every outstanding commit with it."""
        error = self._connection.error()
        if not error:
            return
        for batch in self._outstanding_commits:
            for item in batch:
                # This runs on every failed flush and again on exit, so skip
                # futures that were already completed by an earlier call.
                if not item.response_future.done():
                    item.response_future.set_exception(error)
        # Everything outstanding has been failed; drop it so it is not retried.
        self._outstanding_commits = []

    async def _flush(self):
        """Send the latest cursor of the current batch, if any, on the stream."""
        batch = self._batcher.flush()
        if not batch:
            return
        self._outstanding_commits.append(batch)
        req = StreamingCommitCursorRequest()
        # Only the last cursor of the batch is written to the stream.
        req.commit.cursor = batch[-1].request
        try:
            await self._connection.write(req)
        except GoogleAPICallError as e:
            logging.debug(f"Failed commit on stream: {e}")
            self._fail_if_retrying_failed()

    async def commit(self, cursor: Cursor) -> None:
        """Queue `cursor` for commit and wait until its future completes.

        Args:
          cursor: The cursor to commit.
        """
        future = self._batcher.add(cursor)
        if self._batcher.should_flush():
            # always returns false currently, here in case this changes in the future.
            await self._flush()
        await future

    async def reinitialize(self, connection: Connection[StreamingCommitCursorRequest, StreamingCommitCursorResponse]):
        """Handshake on a fresh stream and resend any unacknowledged commit.

        Args:
          connection: The newly created connection to initialize.
        """
        await self._stop_loopers()
        await connection.write(StreamingCommitCursorRequest(initial=self._initial))
        response = await connection.read()
        # `in` checks presence of the oneof subfield in the response (the
        # proto library's way to test oneof membership), not a substring.
        if "initial" not in response:
            self._connection.fail(FailedPrecondition("Received an invalid initial response on the commit stream."))
        if self._outstanding_commits:
            # Roll up outstanding commits
            rollup: List[WorkItem[Cursor, None]] = [
                item for batch in self._outstanding_commits for item in batch
            ]
            self._outstanding_commits = [rollup]
            req = StreamingCommitCursorRequest()
            req.commit.cursor = rollup[-1].request
            await connection.write(req)
        self._start_loopers()

    def test(self, requests: Iterable[Cursor]) -> bool:
        """Report whether `requests` should trigger an immediate flush (never)."""
        # There is no bound on the number of outstanding cursors.
        return False
Expand Up @@ -7,7 +7,8 @@ class ConnectionReinitializer(Generic[Request, Response], metaclass=ABCMeta):
"""A class capable of reinitializing a connection after a new one has been created."""
@abstractmethod
def reinitialize(self, connection: Connection[Request, Response]):
"""Reinitialize a connection.
"""Reinitialize a connection. Must ensure no calls to the associated RetryingConnection
pradn marked this conversation as resolved.
Show resolved Hide resolved
occur until this completes.

Args:
connection: The connection to reinitialize
Expand Down
13 changes: 3 additions & 10 deletions google/cloud/pubsublite/internal/wire/publisher.py
@@ -1,20 +1,13 @@
from abc import ABC, abstractmethod
from abc import abstractmethod
from typing import AsyncContextManager
from google.cloud.pubsublite_v1.types import PubSubMessage
from google.cloud.pubsublite.publish_metadata import PublishMetadata


class Publisher(ABC):
class Publisher(AsyncContextManager):
"""
A Pub/Sub Lite asynchronous wire protocol publisher.
"""
@abstractmethod
async def __aenter__(self):
raise NotImplementedError()

@abstractmethod
async def __aexit__(self, exc_type, exc_val, exc_tb):
raise NotImplementedError()

@abstractmethod
async def publish(self, message: PubSubMessage) -> PublishMetadata:
"""
Expand Down
7 changes: 7 additions & 0 deletions google/cloud/pubsublite/internal/wire/retrying_connection.py
Expand Up @@ -48,15 +48,22 @@ async def _run_loop(self):
"""
Processes actions on this connection and handles retries until cancelled.
"""
last_failure: GoogleAPICallError
try:
bad_retries = 0
while True:
try:
async with self._connection_factory.new() as connection:
# Needs to happen prior to reinitialization to clear outstanding waiters.
while not self._write_queue.empty():
self._write_queue.get_nowait().response_future.set_exception(last_failure)
self._read_queue = asyncio.Queue(maxsize=1)
self._write_queue = asyncio.Queue(maxsize=1)
await self._reinitializer.reinitialize(connection)
bad_retries = 0
await self._loop_connection(connection)
except GoogleAPICallError as e:
last_failure = e
if not is_retryable(e):
self.fail(e)
return
Expand Down
60 changes: 33 additions & 27 deletions setup.py
Expand Up @@ -25,40 +25,46 @@

readme_filename = os.path.join(package_root, "README.rst")
with io.open(readme_filename, encoding="utf-8") as readme_file:
readme = readme_file.read()
readme = readme_file.read()

# Packages required at runtime (passed to install_requires).
dependencies = [
    "google-api-core >= 1.22.0",
    "absl-py >= 0.9.0",
    "proto-plus >= 0.4.0",
    "grpcio",
    "setuptools",
]

# Packages needed only to run the test suite (exposed as the "tests" extra).
test_dependencies = [
    "asynctest",
    "pytest",
    "pytest-asyncio",
]

setuptools.setup(
name="google-cloud-pubsublite",
version=version,
long_description=readme,
author="Google LLC",
author_email="googleapis-packages@google.com",
license="Apache 2.0",
url="https://github.com/googleapis/python-pubsublite",
packages=setuptools.PEP420PackageFinder.find(),
namespace_packages=("google", "google.cloud"),
platforms="Posix; MacOS X; Windows",
include_package_data=True,
install_requires=dependencies,
setup_requires=('pytest-runner',),
tests_require=['asynctest', 'pytest', 'pytest-asyncio'],
python_requires=">=3.6",
classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Topic :: Internet",
"Topic :: Software Development :: Libraries :: Python Modules",
],
zip_safe=False,
name="google-cloud-pubsublite",
pradn marked this conversation as resolved.
Show resolved Hide resolved
version=version,
long_description=readme,
author="Google LLC",
author_email="googleapis-packages@google.com",
license="Apache 2.0",
url="https://github.com/googleapis/python-pubsublite",
packages=setuptools.PEP420PackageFinder.find(),
namespace_packages=("google", "google.cloud"),
platforms="Posix; MacOS X; Windows",
include_package_data=True,
install_requires=dependencies,
extras_require={"tests": test_dependencies},
python_requires=">=3.6",
classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Topic :: Internet",
"Topic :: Software Development :: Libraries :: Python Modules",
],
zip_safe=False,
)