New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Implement SinglePartitionSubscriber. #22
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
from typing import NamedTuple | ||
|
||
|
||
class FlowControlSettings(NamedTuple): | ||
messages_outstanding: int | ||
bytes_outstanding: int | ||
|
||
|
||
_MAX_INT64 = 0x7FFFFFFFFFFFFFFF | ||
|
||
DISABLED_FLOW_CONTROL = FlowControlSettings(_MAX_INT64, _MAX_INT64) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
import asyncio | ||
from typing import Union, Dict, NamedTuple | ||
import queue | ||
|
||
from google.api_core.exceptions import FailedPrecondition, GoogleAPICallError | ||
from google.cloud.pubsub_v1.subscriber.message import Message | ||
from google.pubsub_v1 import PubsubMessage | ||
|
||
from google.cloud.pubsublite.cloudpubsub.flow_control_settings import FlowControlSettings | ||
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker | ||
from google.cloud.pubsublite.cloudpubsub.message_transformer import MessageTransformer | ||
from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler | ||
from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber | ||
from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable | ||
from google.cloud.pubsublite.internal.wire.subscriber import Subscriber | ||
from google.cloud.pubsublite_v1 import FlowControlRequest, SequencedMessage | ||
from google.cloud.pubsub_v1.subscriber._protocol import requests | ||
|
||
|
||
class _SizedMessage(NamedTuple): | ||
message: PubsubMessage | ||
size_bytes: int | ||
|
||
|
||
class SinglePartitionSubscriber(PermanentFailable, AsyncSubscriber): | ||
_underlying: Subscriber | ||
_flow_control_settings: FlowControlSettings | ||
_ack_set_tracker: AckSetTracker | ||
_nack_handler: NackHandler | ||
_transformer: MessageTransformer | ||
|
||
_queue: queue.Queue | ||
_messages_by_offset: Dict[int, _SizedMessage] | ||
_looper_future: asyncio.Future | ||
|
||
def __init__(self, underlying: Subscriber, flow_control_settings: FlowControlSettings, ack_set_tracker: AckSetTracker, | ||
nack_handler: NackHandler, transformer: MessageTransformer): | ||
super().__init__() | ||
self._underlying = underlying | ||
self._flow_control_settings = flow_control_settings | ||
self._ack_set_tracker = ack_set_tracker | ||
self._nack_handler = nack_handler | ||
self._transformer = transformer | ||
|
||
self._queue = queue.Queue() | ||
self._messages_by_offset = {} | ||
|
||
async def read(self) -> Message: | ||
message: SequencedMessage = await self.await_unless_failed(self._underlying.read()) | ||
try: | ||
cps_message = self._transformer.transform(message) | ||
offset = message.cursor.offset | ||
self._ack_set_tracker.track(offset) | ||
self._messages_by_offset[offset] = _SizedMessage(cps_message, message.size_bytes) | ||
wrapped_message = Message(cps_message._pb, ack_id=str(offset), delivery_attempt=0, request_queue=self._queue) | ||
return wrapped_message | ||
except GoogleAPICallError as e: | ||
self.fail(e) | ||
raise e | ||
|
||
async def _handle_ack(self, message: requests.AckRequest): | ||
offset = int(message.ack_id) | ||
await self._underlying.allow_flow( | ||
FlowControlRequest(allowed_messages=1, allowed_bytes=self._messages_by_offset[offset].size_bytes)) | ||
del self._messages_by_offset[offset] | ||
try: | ||
await self._ack_set_tracker.ack(offset) | ||
except GoogleAPICallError as e: | ||
self.fail(e) | ||
|
||
def _handle_nack(self, message: requests.NackRequest): | ||
offset = int(message.ack_id) | ||
sized_message = self._messages_by_offset[offset] | ||
try: | ||
# Put the ack request back into the queue since the callback may be called from another thread. | ||
self._nack_handler.on_nack(sized_message.message, | ||
lambda: self._queue.put(requests.AckRequest( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we have to use AckRequests? It seems simpler to just call _handle_ack() here. What am I missing? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. _handle_ack is a coroutine. the nack handler function is a Callable[[PubsubMessage], None] to enable it being called from other threads that are not part of the event loop. It would be harder to call _handle_ack from here than it would be to just put an AckRequest on the queue. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SGTM. Maybe add a comment? This was a bit unintuitive to me, possibly because I've never used asyncio before. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
ack_id=message.ack_id, | ||
byte_size=0, # Ignored | ||
time_to_ack=0, # Ignored | ||
ordering_key="" # Ignored | ||
))) | ||
except GoogleAPICallError as e: | ||
self.fail(e) | ||
|
||
async def _handle_queue_message(self, message: Union[ | ||
requests.AckRequest, requests.DropRequest, requests.ModAckRequest, requests.NackRequest]): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can the queue really have all these kinds of messages? Do we ever want them? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It can. This is the protocol I backed out from here https://github.com/googleapis/python-pubsub/blob/master/google/cloud/pubsub_v1/subscriber/message.py Technically drop and modack requests can be sent from the message, but as you can see on the line below we immediately fail the client if those requests are sent. |
||
if isinstance(message, requests.DropRequest) or isinstance(message, requests.ModAckRequest): | ||
self.fail(FailedPrecondition("Called internal method of google.cloud.pubsub_v1.subscriber.message.Message " | ||
f"Pub/Sub Lite does not support: {message}")) | ||
elif isinstance(message, requests.AckRequest): | ||
await self._handle_ack(message) | ||
else: | ||
self._handle_nack(message) | ||
|
||
async def _looper(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not use asyncio's default event_loop for asynchronicity? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This does use asyncio's default event loop? This is just a coroutine running in it that polls for a queue.Queue to have a message. There is asyncio.Queue (used elsewhere) which you can await on, but Message.ack/nack (from CPS' client library) needs to be able to be called from other threads, so it uses the threadsafe queue instead of the asyncio-enabled one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for explaining this, both here and offline. Do you mind adding a comment explaining why we cannot use the asyncio queue, and why we cannot use the blocking get() here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
while True: | ||
try: | ||
# This is not an asyncio.Queue, and therefore we cannot do `await self._queue.get()`. | ||
# A blocking wait would block the event loop, this needs to be a queue.Queue for | ||
# compatibility with the Cloud Pub/Sub Message's requirements. | ||
queue_message = self._queue.get_nowait() | ||
await self._handle_queue_message(queue_message) | ||
except queue.Empty: | ||
await asyncio.sleep(.1) | ||
|
||
async def __aenter__(self): | ||
await self._ack_set_tracker.__aenter__() | ||
await self._underlying.__aenter__() | ||
self._looper_future = asyncio.ensure_future(self._looper()) | ||
await self._underlying.allow_flow(FlowControlRequest( | ||
allowed_messages=self._flow_control_settings.messages_outstanding, | ||
allowed_bytes=self._flow_control_settings.bytes_outstanding)) | ||
return self | ||
|
||
async def __aexit__(self, exc_type, exc_value, traceback): | ||
self._looper_future.cancel() | ||
try: | ||
await self._looper_future | ||
except asyncio.CancelledError: | ||
pass | ||
await self._underlying.__aexit__(exc_type, exc_value, traceback) | ||
await self._ack_set_tracker.__aexit__(exc_type, exc_value, traceback) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
from abc import ABC, abstractmethod | ||
from collections import Callable | ||
|
||
from google.pubsub_v1 import PubsubMessage | ||
|
||
from google.cloud.pubsublite.cloudpubsub.message_transforms import to_cps_subscribe_message | ||
from google.cloud.pubsublite_v1 import SequencedMessage | ||
|
||
|
||
class MessageTransformer(ABC): | ||
""" | ||
A MessageTransformer turns Pub/Sub Lite message protos into Pub/Sub message protos. | ||
""" | ||
|
||
@abstractmethod | ||
def transform(self, source: SequencedMessage) -> PubsubMessage: | ||
"""Transform a SequencedMessage to a PubsubMessage. | ||
|
||
Args: | ||
source: The message to transform. | ||
|
||
Raises: | ||
GoogleAPICallError: To fail the client if raised inline. | ||
""" | ||
pass | ||
|
||
|
||
class DefaultMessageTransformer(MessageTransformer): | ||
def transform(self, source: SequencedMessage) -> PubsubMessage: | ||
return to_cps_subscribe_message(source) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
from abc import ABC, abstractmethod | ||
from typing import Callable | ||
|
||
from google.api_core.exceptions import FailedPrecondition | ||
from google.pubsub_v1 import PubsubMessage | ||
|
||
|
||
class NackHandler(ABC): | ||
""" | ||
A NackHandler handles calls to the nack() method which is not expressible in Pub/Sub Lite. | ||
""" | ||
|
||
@abstractmethod | ||
def on_nack(self, message: PubsubMessage, ack: Callable[[], None]): | ||
"""Handle a negative acknowledgement. ack must eventually be called. | ||
|
||
Args: | ||
message: The nacked message. | ||
ack: A callable to acknowledge the underlying message. This must eventually be called. | ||
|
||
Raises: | ||
GoogleAPICallError: To fail the client if raised inline. | ||
""" | ||
pass | ||
|
||
|
||
class DefaultNackHandler(NackHandler): | ||
def on_nack(self, message: PubsubMessage, ack: Callable[[], None]): | ||
raise FailedPrecondition( | ||
"You may not nack messages by default when using a PubSub Lite client. See NackHandler for how to customize" | ||
" this.") |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
from abc import abstractmethod | ||
from typing import AsyncContextManager | ||
|
||
from google.cloud.pubsub_v1.subscriber.message import Message | ||
|
||
|
||
class AsyncSubscriber(AsyncContextManager): | ||
""" | ||
A Cloud Pub/Sub asynchronous subscriber. | ||
""" | ||
@abstractmethod | ||
async def read(self) -> Message: | ||
""" | ||
Read the next message off of the stream. | ||
|
||
Returns: | ||
The next message. ack() or nack() must eventually be called exactly once. | ||
|
||
Pub/Sub Lite does not support nack() by default- if you do call nack(), it will immediately fail the client | ||
unless you have a NackHandler installed. | ||
|
||
Raises: | ||
GoogleAPICallError: On a permanent error. | ||
""" | ||
raise NotImplementedError() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Meta comment; do you mind setting your editor's line length limit to 100 lines? The line breaks in your PRs look quite unreadable; this is fine, but I would appreciate it for the next ones.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After talking with tianzi, there's an auto-formatter that exists. I'll run that after this chain of prs is submitted since it would be quite difficult to fix the commit chain history :/ I hope thats acceptable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, totally fine. Thanks.