Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement SinglePartitionSubscriber. #22

Merged
merged 3 commits into from Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/flow_control_settings.py
@@ -0,0 +1,11 @@
from typing import NamedTuple

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meta comment; do you mind setting your editor's line length limit to 100 lines? The line breaks in your PRs look quite unreadable; this is fine, but I would appreciate it for the next ones.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After talking with tianzi, there's an auto-formatter that exists. I'll run that after this chain of prs is submitted since it would be quite difficult to fix the commit chain history :/ I hope thats acceptable.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, totally fine. Thanks.



class FlowControlSettings(NamedTuple):
messages_outstanding: int
bytes_outstanding: int


_MAX_INT64 = 0x7FFFFFFFFFFFFFFF

DISABLED_FLOW_CONTROL = FlowControlSettings(_MAX_INT64, _MAX_INT64)
Expand Up @@ -15,6 +15,7 @@ class AckSetTrackerImpl(AckSetTracker):
_acks: "queue.PriorityQueue[int]"

def __init__(self, committer: Committer):
super().__init__()
self._committer = committer
self._receipts = deque()
self._acks = queue.PriorityQueue()
Expand Down
Expand Up @@ -11,6 +11,7 @@ class AsyncPublisherImpl(AsyncPublisher):
_publisher: Publisher

def __init__(self, publisher: Publisher):
super().__init__()
self._publisher = publisher

async def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> str:
Expand Down
Expand Up @@ -10,6 +10,7 @@ class PublisherImpl(Publisher):
_underlying: AsyncPublisher

def __init__(self, underlying: AsyncPublisher):
super().__init__()
self._managed_loop = ManagedEventLoop()
self._underlying = underlying

Expand Down
@@ -0,0 +1,123 @@
import asyncio
from typing import Union, Dict, NamedTuple
import queue

from google.api_core.exceptions import FailedPrecondition, GoogleAPICallError
from google.cloud.pubsub_v1.subscriber.message import Message
from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.cloudpubsub.flow_control_settings import FlowControlSettings
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker
from google.cloud.pubsublite.cloudpubsub.message_transformer import MessageTransformer
from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler
from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber
from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable
from google.cloud.pubsublite.internal.wire.subscriber import Subscriber
from google.cloud.pubsublite_v1 import FlowControlRequest, SequencedMessage
from google.cloud.pubsub_v1.subscriber._protocol import requests


class _SizedMessage(NamedTuple):
message: PubsubMessage
size_bytes: int


class SinglePartitionSubscriber(PermanentFailable, AsyncSubscriber):
_underlying: Subscriber
_flow_control_settings: FlowControlSettings
_ack_set_tracker: AckSetTracker
_nack_handler: NackHandler
_transformer: MessageTransformer

_queue: queue.Queue
_messages_by_offset: Dict[int, _SizedMessage]
_looper_future: asyncio.Future

def __init__(self, underlying: Subscriber, flow_control_settings: FlowControlSettings, ack_set_tracker: AckSetTracker,
nack_handler: NackHandler, transformer: MessageTransformer):
super().__init__()
self._underlying = underlying
self._flow_control_settings = flow_control_settings
self._ack_set_tracker = ack_set_tracker
self._nack_handler = nack_handler
self._transformer = transformer

self._queue = queue.Queue()
self._messages_by_offset = {}

async def read(self) -> Message:
message: SequencedMessage = await self.await_unless_failed(self._underlying.read())
try:
cps_message = self._transformer.transform(message)
offset = message.cursor.offset
self._ack_set_tracker.track(offset)
self._messages_by_offset[offset] = _SizedMessage(cps_message, message.size_bytes)
wrapped_message = Message(cps_message._pb, ack_id=str(offset), delivery_attempt=0, request_queue=self._queue)
return wrapped_message
except GoogleAPICallError as e:
self.fail(e)
raise e

async def _handle_ack(self, message: requests.AckRequest):
offset = int(message.ack_id)
await self._underlying.allow_flow(
FlowControlRequest(allowed_messages=1, allowed_bytes=self._messages_by_offset[offset].size_bytes))
del self._messages_by_offset[offset]
try:
await self._ack_set_tracker.ack(offset)
except GoogleAPICallError as e:
self.fail(e)

def _handle_nack(self, message: requests.NackRequest):
offset = int(message.ack_id)
sized_message = self._messages_by_offset[offset]
try:
# Put the ack request back into the queue since the callback may be called from another thread.
self._nack_handler.on_nack(sized_message.message,
lambda: self._queue.put(requests.AckRequest(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to use AckRequests? It seems simpler to just call _handle_ack() here. What am I missing?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_handle_ack is a coroutine. the nack handler function is a Callable[[PubsubMessage], None] to enable it being called from other threads that are not part of the event loop. It would be harder to call _handle_ack from here than it would be to just put an AckRequest on the queue.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM. Maybe add a comment? This was a bit unintuitive to me, possibly because I've never used asyncio before.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

ack_id=message.ack_id,
byte_size=0, # Ignored
time_to_ack=0, # Ignored
ordering_key="" # Ignored
)))
except GoogleAPICallError as e:
self.fail(e)

async def _handle_queue_message(self, message: Union[
requests.AckRequest, requests.DropRequest, requests.ModAckRequest, requests.NackRequest]):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the queue really have all these kinds of messages? Do we ever want them?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can. This is the protocol I backed out from here https://github.com/googleapis/python-pubsub/blob/master/google/cloud/pubsub_v1/subscriber/message.py

Technically drop and modack requests can be sent from the message, but as you can see on the line below we immediately fail the client if those requests are sent.

if isinstance(message, requests.DropRequest) or isinstance(message, requests.ModAckRequest):
self.fail(FailedPrecondition("Called internal method of google.cloud.pubsub_v1.subscriber.message.Message "
f"Pub/Sub Lite does not support: {message}"))
elif isinstance(message, requests.AckRequest):
await self._handle_ack(message)
else:
self._handle_nack(message)

async def _looper(self):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use asyncio's default event_loop for asynchronicity?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does use asyncio's default event loop? This is just a coroutine running in it that polls for a queue.Queue to have a message. There is asyncio.Queue (used elsewhere) which you can await on, but Message.ack/nack (from CPS' client library) needs to be able to be called from other threads, so it uses the threadsafe queue instead of the asyncio-enabled one.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for explaining this, both here and offline. Do you mind adding a comment explaining why we cannot use the asyncio queue, and why we cannot use the blocking get() here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

while True:
try:
# This is not an asyncio.Queue, and therefore we cannot do `await self._queue.get()`.
# A blocking wait would block the event loop, this needs to be a queue.Queue for
# compatibility with the Cloud Pub/Sub Message's requirements.
queue_message = self._queue.get_nowait()
await self._handle_queue_message(queue_message)
except queue.Empty:
await asyncio.sleep(.1)

async def __aenter__(self):
await self._ack_set_tracker.__aenter__()
await self._underlying.__aenter__()
self._looper_future = asyncio.ensure_future(self._looper())
await self._underlying.allow_flow(FlowControlRequest(
allowed_messages=self._flow_control_settings.messages_outstanding,
allowed_bytes=self._flow_control_settings.bytes_outstanding))
return self

async def __aexit__(self, exc_type, exc_value, traceback):
self._looper_future.cancel()
try:
await self._looper_future
except asyncio.CancelledError:
pass
await self._underlying.__aexit__(exc_type, exc_value, traceback)
await self._ack_set_tracker.__aexit__(exc_type, exc_value, traceback)
30 changes: 30 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/message_transformer.py
@@ -0,0 +1,30 @@
from abc import ABC, abstractmethod
from collections import Callable

from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.cloudpubsub.message_transforms import to_cps_subscribe_message
from google.cloud.pubsublite_v1 import SequencedMessage


class MessageTransformer(ABC):
"""
A MessageTransformer turns Pub/Sub Lite message protos into Pub/Sub message protos.
"""

@abstractmethod
def transform(self, source: SequencedMessage) -> PubsubMessage:
"""Transform a SequencedMessage to a PubsubMessage.

Args:
source: The message to transform.

Raises:
GoogleAPICallError: To fail the client if raised inline.
"""
pass


class DefaultMessageTransformer(MessageTransformer):
def transform(self, source: SequencedMessage) -> PubsubMessage:
return to_cps_subscribe_message(source)
31 changes: 31 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/nack_handler.py
@@ -0,0 +1,31 @@
from abc import ABC, abstractmethod
from typing import Callable

from google.api_core.exceptions import FailedPrecondition
from google.pubsub_v1 import PubsubMessage


class NackHandler(ABC):
"""
A NackHandler handles calls to the nack() method which is not expressible in Pub/Sub Lite.
"""

@abstractmethod
def on_nack(self, message: PubsubMessage, ack: Callable[[], None]):
"""Handle a negative acknowledgement. ack must eventually be called.

Args:
message: The nacked message.
ack: A callable to acknowledge the underlying message. This must eventually be called.

Raises:
GoogleAPICallError: To fail the client if raised inline.
"""
pass


class DefaultNackHandler(NackHandler):
def on_nack(self, message: PubsubMessage, ack: Callable[[], None]):
raise FailedPrecondition(
"You may not nack messages by default when using a PubSub Lite client. See NackHandler for how to customize"
" this.")
25 changes: 25 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/subscriber.py
@@ -0,0 +1,25 @@
from abc import abstractmethod
from typing import AsyncContextManager

from google.cloud.pubsub_v1.subscriber.message import Message


class AsyncSubscriber(AsyncContextManager):
"""
A Cloud Pub/Sub asynchronous subscriber.
"""
@abstractmethod
async def read(self) -> Message:
"""
Read the next message off of the stream.

Returns:
The next message. ack() or nack() must eventually be called exactly once.

Pub/Sub Lite does not support nack() by default- if you do call nack(), it will immediately fail the client
unless you have a NackHandler installed.

Raises:
GoogleAPICallError: On a permanent error.
"""
raise NotImplementedError()
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -31,7 +31,7 @@
"google-api-core >= 1.22.0",
"absl-py >= 0.9.0",
"proto-plus >= 0.4.0",
"google-cloud-pubsub >= 1.7.0",
"google-cloud-pubsub >= 2.1.0",
"grpcio",
"setuptools"
]
Expand Down