Skip to content

Commit

Permalink
feat: Implement SinglePartitionSubscriber. (#22)
Browse files Browse the repository at this point in the history
* feat: Implement SinglePartitionSubscriber.

This handles mapping a single partition to a Cloud Pub/Sub Like asynchronous subscriber.

* feat: Add DefaultNackHandler.

* docs: add comments explaining asyncio weirdness.
  • Loading branch information
dpcollins-google committed Sep 24, 2020
1 parent 58fda6f commit bb76d90
Show file tree
Hide file tree
Showing 10 changed files with 401 additions and 1 deletion.
11 changes: 11 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/flow_control_settings.py
@@ -0,0 +1,11 @@
from typing import NamedTuple


class FlowControlSettings(NamedTuple):
messages_outstanding: int
bytes_outstanding: int


_MAX_INT64 = 0x7FFFFFFFFFFFFFFF

DISABLED_FLOW_CONTROL = FlowControlSettings(_MAX_INT64, _MAX_INT64)
Expand Up @@ -15,6 +15,7 @@ class AckSetTrackerImpl(AckSetTracker):
_acks: "queue.PriorityQueue[int]"

def __init__(self, committer: Committer):
super().__init__()
self._committer = committer
self._receipts = deque()
self._acks = queue.PriorityQueue()
Expand Down
Expand Up @@ -11,6 +11,7 @@ class AsyncPublisherImpl(AsyncPublisher):
_publisher: Publisher

def __init__(self, publisher: Publisher):
super().__init__()
self._publisher = publisher

async def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> str:
Expand Down
Expand Up @@ -10,6 +10,7 @@ class PublisherImpl(Publisher):
_underlying: AsyncPublisher

def __init__(self, underlying: AsyncPublisher):
super().__init__()
self._managed_loop = ManagedEventLoop()
self._underlying = underlying

Expand Down
@@ -0,0 +1,123 @@
import asyncio
from typing import Union, Dict, NamedTuple
import queue

from google.api_core.exceptions import FailedPrecondition, GoogleAPICallError
from google.cloud.pubsub_v1.subscriber.message import Message
from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.cloudpubsub.flow_control_settings import FlowControlSettings
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker
from google.cloud.pubsublite.cloudpubsub.message_transformer import MessageTransformer
from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler
from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber
from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable
from google.cloud.pubsublite.internal.wire.subscriber import Subscriber
from google.cloud.pubsublite_v1 import FlowControlRequest, SequencedMessage
from google.cloud.pubsub_v1.subscriber._protocol import requests


class _SizedMessage(NamedTuple):
message: PubsubMessage
size_bytes: int


class SinglePartitionSubscriber(PermanentFailable, AsyncSubscriber):
_underlying: Subscriber
_flow_control_settings: FlowControlSettings
_ack_set_tracker: AckSetTracker
_nack_handler: NackHandler
_transformer: MessageTransformer

_queue: queue.Queue
_messages_by_offset: Dict[int, _SizedMessage]
_looper_future: asyncio.Future

def __init__(self, underlying: Subscriber, flow_control_settings: FlowControlSettings, ack_set_tracker: AckSetTracker,
nack_handler: NackHandler, transformer: MessageTransformer):
super().__init__()
self._underlying = underlying
self._flow_control_settings = flow_control_settings
self._ack_set_tracker = ack_set_tracker
self._nack_handler = nack_handler
self._transformer = transformer

self._queue = queue.Queue()
self._messages_by_offset = {}

async def read(self) -> Message:
message: SequencedMessage = await self.await_unless_failed(self._underlying.read())
try:
cps_message = self._transformer.transform(message)
offset = message.cursor.offset
self._ack_set_tracker.track(offset)
self._messages_by_offset[offset] = _SizedMessage(cps_message, message.size_bytes)
wrapped_message = Message(cps_message._pb, ack_id=str(offset), delivery_attempt=0, request_queue=self._queue)
return wrapped_message
except GoogleAPICallError as e:
self.fail(e)
raise e

async def _handle_ack(self, message: requests.AckRequest):
offset = int(message.ack_id)
await self._underlying.allow_flow(
FlowControlRequest(allowed_messages=1, allowed_bytes=self._messages_by_offset[offset].size_bytes))
del self._messages_by_offset[offset]
try:
await self._ack_set_tracker.ack(offset)
except GoogleAPICallError as e:
self.fail(e)

def _handle_nack(self, message: requests.NackRequest):
offset = int(message.ack_id)
sized_message = self._messages_by_offset[offset]
try:
# Put the ack request back into the queue since the callback may be called from another thread.
self._nack_handler.on_nack(sized_message.message,
lambda: self._queue.put(requests.AckRequest(
ack_id=message.ack_id,
byte_size=0, # Ignored
time_to_ack=0, # Ignored
ordering_key="" # Ignored
)))
except GoogleAPICallError as e:
self.fail(e)

async def _handle_queue_message(self, message: Union[
requests.AckRequest, requests.DropRequest, requests.ModAckRequest, requests.NackRequest]):
if isinstance(message, requests.DropRequest) or isinstance(message, requests.ModAckRequest):
self.fail(FailedPrecondition("Called internal method of google.cloud.pubsub_v1.subscriber.message.Message "
f"Pub/Sub Lite does not support: {message}"))
elif isinstance(message, requests.AckRequest):
await self._handle_ack(message)
else:
self._handle_nack(message)

async def _looper(self):
while True:
try:
# This is not an asyncio.Queue, and therefore we cannot do `await self._queue.get()`.
# A blocking wait would block the event loop, this needs to be a queue.Queue for
# compatibility with the Cloud Pub/Sub Message's requirements.
queue_message = self._queue.get_nowait()
await self._handle_queue_message(queue_message)
except queue.Empty:
await asyncio.sleep(.1)

async def __aenter__(self):
await self._ack_set_tracker.__aenter__()
await self._underlying.__aenter__()
self._looper_future = asyncio.ensure_future(self._looper())
await self._underlying.allow_flow(FlowControlRequest(
allowed_messages=self._flow_control_settings.messages_outstanding,
allowed_bytes=self._flow_control_settings.bytes_outstanding))
return self

async def __aexit__(self, exc_type, exc_value, traceback):
self._looper_future.cancel()
try:
await self._looper_future
except asyncio.CancelledError:
pass
await self._underlying.__aexit__(exc_type, exc_value, traceback)
await self._ack_set_tracker.__aexit__(exc_type, exc_value, traceback)
30 changes: 30 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/message_transformer.py
@@ -0,0 +1,30 @@
from abc import ABC, abstractmethod
from collections import Callable

from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.cloudpubsub.message_transforms import to_cps_subscribe_message
from google.cloud.pubsublite_v1 import SequencedMessage


class MessageTransformer(ABC):
"""
A MessageTransformer turns Pub/Sub Lite message protos into Pub/Sub message protos.
"""

@abstractmethod
def transform(self, source: SequencedMessage) -> PubsubMessage:
"""Transform a SequencedMessage to a PubsubMessage.
Args:
source: The message to transform.
Raises:
GoogleAPICallError: To fail the client if raised inline.
"""
pass


class DefaultMessageTransformer(MessageTransformer):
def transform(self, source: SequencedMessage) -> PubsubMessage:
return to_cps_subscribe_message(source)
31 changes: 31 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/nack_handler.py
@@ -0,0 +1,31 @@
from abc import ABC, abstractmethod
from typing import Callable

from google.api_core.exceptions import FailedPrecondition
from google.pubsub_v1 import PubsubMessage


class NackHandler(ABC):
"""
A NackHandler handles calls to the nack() method which is not expressible in Pub/Sub Lite.
"""

@abstractmethod
def on_nack(self, message: PubsubMessage, ack: Callable[[], None]):
"""Handle a negative acknowledgement. ack must eventually be called.
Args:
message: The nacked message.
ack: A callable to acknowledge the underlying message. This must eventually be called.
Raises:
GoogleAPICallError: To fail the client if raised inline.
"""
pass


class DefaultNackHandler(NackHandler):
def on_nack(self, message: PubsubMessage, ack: Callable[[], None]):
raise FailedPrecondition(
"You may not nack messages by default when using a PubSub Lite client. See NackHandler for how to customize"
" this.")
25 changes: 25 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/subscriber.py
@@ -0,0 +1,25 @@
from abc import abstractmethod
from typing import AsyncContextManager

from google.cloud.pubsub_v1.subscriber.message import Message


class AsyncSubscriber(AsyncContextManager):
"""
A Cloud Pub/Sub asynchronous subscriber.
"""
@abstractmethod
async def read(self) -> Message:
"""
Read the next message off of the stream.
Returns:
The next message. ack() or nack() must eventually be called exactly once.
Pub/Sub Lite does not support nack() by default- if you do call nack(), it will immediately fail the client
unless you have a NackHandler installed.
Raises:
GoogleAPICallError: On a permanent error.
"""
raise NotImplementedError()
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -31,7 +31,7 @@
"google-api-core >= 1.22.0",
"absl-py >= 0.9.0",
"proto-plus >= 0.4.0",
"google-cloud-pubsub >= 1.7.0",
"google-cloud-pubsub >= 2.1.0",
"grpcio",
"setuptools"
]
Expand Down

0 comments on commit bb76d90

Please sign in to comment.