diff --git a/google/cloud/pubsublite/cloudpubsub/flow_control_settings.py b/google/cloud/pubsublite/cloudpubsub/flow_control_settings.py
new file mode 100644
index 00000000..7837bb80
--- /dev/null
+++ b/google/cloud/pubsublite/cloudpubsub/flow_control_settings.py
@@ -0,0 +1,11 @@
+from typing import NamedTuple
+
+
+class FlowControlSettings(NamedTuple):
+    messages_outstanding: int
+    bytes_outstanding: int
+
+
+_MAX_INT64 = 0x7FFFFFFFFFFFFFFF
+
+DISABLED_FLOW_CONTROL = FlowControlSettings(_MAX_INT64, _MAX_INT64)
diff --git a/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker_impl.py
index 45f0cd56..99232d45 100644
--- a/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker_impl.py
+++ b/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker_impl.py
@@ -15,6 +15,7 @@ class AckSetTrackerImpl(AckSetTracker):
     _acks: "queue.PriorityQueue[int]"
 
     def __init__(self, committer: Committer):
+        super().__init__()
         self._committer = committer
         self._receipts = deque()
         self._acks = queue.PriorityQueue()
diff --git a/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py
index 7828ee8d..a05d90f2 100644
--- a/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py
+++ b/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py
@@ -11,6 +11,7 @@ class AsyncPublisherImpl(AsyncPublisher):
     _publisher: Publisher
 
     def __init__(self, publisher: Publisher):
+        super().__init__()
         self._publisher = publisher
 
     async def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> str:
diff --git a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py
index 25419580..9e760008 100644
--- a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py
+++ b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py
@@ -10,6 +10,7 @@ class PublisherImpl(Publisher):
     _underlying: AsyncPublisher
 
     def __init__(self, underlying: AsyncPublisher):
+        super().__init__()
        self._managed_loop = ManagedEventLoop()
        self._underlying = underlying
 
diff --git a/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py
new file mode 100644
index 00000000..a6ac4fd7
--- /dev/null
+++ b/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py
@@ -0,0 +1,123 @@
+import asyncio
+from typing import Union, Dict, NamedTuple
+import queue
+
+from google.api_core.exceptions import FailedPrecondition, GoogleAPICallError
+from google.cloud.pubsub_v1.subscriber.message import Message
+from google.pubsub_v1 import PubsubMessage
+
+from google.cloud.pubsublite.cloudpubsub.flow_control_settings import FlowControlSettings
+from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker
+from google.cloud.pubsublite.cloudpubsub.message_transformer import MessageTransformer
+from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler
+from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber
+from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable
+from google.cloud.pubsublite.internal.wire.subscriber import Subscriber
+from google.cloud.pubsublite_v1 import FlowControlRequest, SequencedMessage
+from google.cloud.pubsub_v1.subscriber._protocol import requests
+
+
+class _SizedMessage(NamedTuple):
+    message: PubsubMessage
+    size_bytes: int
+
+
+class SinglePartitionSubscriber(PermanentFailable, AsyncSubscriber):
+    _underlying: Subscriber
+    _flow_control_settings: FlowControlSettings
+    _ack_set_tracker: AckSetTracker
+    _nack_handler: NackHandler
+    _transformer: MessageTransformer
+
+    _queue: queue.Queue
+    _messages_by_offset: Dict[int, _SizedMessage]
+    _looper_future: asyncio.Future
+
+    def __init__(self, underlying: Subscriber, flow_control_settings: FlowControlSettings, ack_set_tracker: AckSetTracker,
+                 nack_handler: NackHandler, transformer: MessageTransformer):
+        super().__init__()
+        self._underlying = underlying
+        self._flow_control_settings = flow_control_settings
+        self._ack_set_tracker = ack_set_tracker
+        self._nack_handler = nack_handler
+        self._transformer = transformer
+
+        self._queue = queue.Queue()
+        self._messages_by_offset = {}
+
+    async def read(self) -> Message:
+        message: SequencedMessage = await self.await_unless_failed(self._underlying.read())
+        try:
+            cps_message = self._transformer.transform(message)
+            offset = message.cursor.offset
+            self._ack_set_tracker.track(offset)
+            self._messages_by_offset[offset] = _SizedMessage(cps_message, message.size_bytes)
+            wrapped_message = Message(cps_message._pb, ack_id=str(offset), delivery_attempt=0, request_queue=self._queue)
+            return wrapped_message
+        except GoogleAPICallError as e:
+            self.fail(e)
+            raise e
+
+    async def _handle_ack(self, message: requests.AckRequest):
+        offset = int(message.ack_id)
+        await self._underlying.allow_flow(
+            FlowControlRequest(allowed_messages=1, allowed_bytes=self._messages_by_offset[offset].size_bytes))
+        del self._messages_by_offset[offset]
+        try:
+            await self._ack_set_tracker.ack(offset)
+        except GoogleAPICallError as e:
+            self.fail(e)
+
+    def _handle_nack(self, message: requests.NackRequest):
+        offset = int(message.ack_id)
+        sized_message = self._messages_by_offset[offset]
+        try:
+            # Put the ack request back into the queue since the callback may be called from another thread.
+            self._nack_handler.on_nack(sized_message.message,
+                                       lambda: self._queue.put(requests.AckRequest(
+                                           ack_id=message.ack_id,
+                                           byte_size=0,  # Ignored
+                                           time_to_ack=0,  # Ignored
+                                           ordering_key=""  # Ignored
+                                       )))
+        except GoogleAPICallError as e:
+            self.fail(e)
+
+    async def _handle_queue_message(self, message: Union[
+            requests.AckRequest, requests.DropRequest, requests.ModAckRequest, requests.NackRequest]):
+        if isinstance(message, requests.DropRequest) or isinstance(message, requests.ModAckRequest):
+            self.fail(FailedPrecondition("Called an internal method of google.cloud.pubsub_v1.subscriber.message.Message that "
+                                         f"Pub/Sub Lite does not support: {message}"))
+        elif isinstance(message, requests.AckRequest):
+            await self._handle_ack(message)
+        else:
+            self._handle_nack(message)
+
+    async def _looper(self):
+        while True:
+            try:
+                # This is not an asyncio.Queue, and therefore we cannot do `await self._queue.get()`.
+                # A blocking wait would block the event loop; this needs to be a queue.Queue for
+                # compatibility with the Cloud Pub/Sub Message's requirements.
+                queue_message = self._queue.get_nowait()
+                await self._handle_queue_message(queue_message)
+            except queue.Empty:
+                await asyncio.sleep(.1)
+
+    async def __aenter__(self):
+        await self._ack_set_tracker.__aenter__()
+        await self._underlying.__aenter__()
+        self._looper_future = asyncio.ensure_future(self._looper())
+        await self._underlying.allow_flow(FlowControlRequest(
+            allowed_messages=self._flow_control_settings.messages_outstanding,
+            allowed_bytes=self._flow_control_settings.bytes_outstanding))
+        return self
+
+    async def __aexit__(self, exc_type, exc_value, traceback):
+        self._looper_future.cancel()
+        try:
+            await self._looper_future
+        except asyncio.CancelledError:
+            pass
+        await self._underlying.__aexit__(exc_type, exc_value, traceback)
+        await self._ack_set_tracker.__aexit__(exc_type, exc_value, traceback)
diff --git a/google/cloud/pubsublite/cloudpubsub/message_transformer.py b/google/cloud/pubsublite/cloudpubsub/message_transformer.py
new file mode 100644
index 00000000..a147c8d0
--- /dev/null
+++ b/google/cloud/pubsublite/cloudpubsub/message_transformer.py
@@ -0,0 +1,30 @@
+from abc import ABC, abstractmethod
+from typing import Callable
+
+from google.pubsub_v1 import PubsubMessage
+
+from google.cloud.pubsublite.cloudpubsub.message_transforms import to_cps_subscribe_message
+from google.cloud.pubsublite_v1 import SequencedMessage
+
+
+class MessageTransformer(ABC):
+    """
+    A MessageTransformer turns Pub/Sub Lite message protos into Pub/Sub message protos.
+    """
+
+    @abstractmethod
+    def transform(self, source: SequencedMessage) -> PubsubMessage:
+        """Transform a SequencedMessage to a PubsubMessage.
+
+        Args:
+          source: The message to transform.
+
+        Raises:
+          GoogleAPICallError: To fail the client if raised inline.
+        """
+        pass
+
+
+class DefaultMessageTransformer(MessageTransformer):
+    def transform(self, source: SequencedMessage) -> PubsubMessage:
+        return to_cps_subscribe_message(source)
diff --git a/google/cloud/pubsublite/cloudpubsub/nack_handler.py b/google/cloud/pubsublite/cloudpubsub/nack_handler.py
new file mode 100644
index 00000000..6455efa5
--- /dev/null
+++ b/google/cloud/pubsublite/cloudpubsub/nack_handler.py
@@ -0,0 +1,31 @@
+from abc import ABC, abstractmethod
+from typing import Callable
+
+from google.api_core.exceptions import FailedPrecondition
+from google.pubsub_v1 import PubsubMessage
+
+
+class NackHandler(ABC):
+    """
+    A NackHandler handles calls to the nack() method, which is not expressible in Pub/Sub Lite.
+    """
+
+    @abstractmethod
+    def on_nack(self, message: PubsubMessage, ack: Callable[[], None]):
+        """Handle a negative acknowledgement. ack must eventually be called.
+
+        Args:
+          message: The nacked message.
+          ack: A callable to acknowledge the underlying message. This must eventually be called.
+
+        Raises:
+          GoogleAPICallError: To fail the client if raised inline.
+        """
+        pass
+
+
+class DefaultNackHandler(NackHandler):
+    def on_nack(self, message: PubsubMessage, ack: Callable[[], None]):
+        raise FailedPrecondition(
+            "You may not nack messages by default when using a Pub/Sub Lite client. See NackHandler for how to customize"
+            " this.")
diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber.py b/google/cloud/pubsublite/cloudpubsub/subscriber.py
new file mode 100644
index 00000000..2c8922fd
--- /dev/null
+++ b/google/cloud/pubsublite/cloudpubsub/subscriber.py
@@ -0,0 +1,25 @@
+from abc import abstractmethod
+from typing import AsyncContextManager
+
+from google.cloud.pubsub_v1.subscriber.message import Message
+
+
+class AsyncSubscriber(AsyncContextManager):
+    """
+    A Cloud Pub/Sub asynchronous subscriber.
+    """
+    @abstractmethod
+    async def read(self) -> Message:
+        """
+        Read the next message off the stream.
+
+        Returns:
+          The next message. ack() or nack() must eventually be called exactly once.
+
+          Pub/Sub Lite does not support nack() by default; if you do call nack(), it will immediately fail the client
+          unless you have a NackHandler installed.
+
+        Raises:
+          GoogleAPICallError: On a permanent error.
+        """
+        raise NotImplementedError()
diff --git a/setup.py b/setup.py
index 2810f54d..fde78f97 100644
--- a/setup.py
+++ b/setup.py
@@ -31,7 +31,7 @@
     "google-api-core >= 1.22.0",
     "absl-py >= 0.9.0",
     "proto-plus >= 0.4.0",
-    "google-cloud-pubsub >= 1.7.0",
+    "google-cloud-pubsub >= 2.1.0",
     "grpcio",
     "setuptools"
 ]
diff --git a/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py b/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py
new file mode 100644
index 00000000..be39f4dc
--- /dev/null
+++ b/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py
@@ -0,0 +1,177 @@
+import asyncio
+import datetime
+from typing import Callable
+
+from asynctest.mock import MagicMock, call
+import pytest
+from google.api_core.exceptions import FailedPrecondition
+from google.cloud.pubsub_v1.subscriber.message import Message
+from google.protobuf.timestamp_pb2 import Timestamp
+from google.pubsub_v1 import PubsubMessage
+
+from google.cloud.pubsublite.cloudpubsub.flow_control_settings import FlowControlSettings
+from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker
+from google.cloud.pubsublite.cloudpubsub.internal.single_partition_subscriber import SinglePartitionSubscriber
+from google.cloud.pubsublite.cloudpubsub.message_transformer import MessageTransformer
+from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler
+from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber
+from google.cloud.pubsublite.internal.wire.subscriber import Subscriber
+from google.cloud.pubsublite.testing.test_utils import make_queue_waiter
+from google.cloud.pubsublite_v1 import Cursor, FlowControlRequest, SequencedMessage
+
+# All test coroutines will be treated as marked.
+pytestmark = pytest.mark.asyncio
+
+
+def mock_async_context_manager(cm):
+    cm.__aenter__.return_value = cm
+    return cm
+
+
+@pytest.fixture()
+def underlying():
+    return mock_async_context_manager(MagicMock(spec=Subscriber))
+
+
+@pytest.fixture()
+def flow_control_settings():
+    return FlowControlSettings(1000, 1000)
+
+
+@pytest.fixture()
+def initial_flow_request(flow_control_settings):
+    return FlowControlRequest(
+        allowed_messages=flow_control_settings.messages_outstanding,
+        allowed_bytes=flow_control_settings.bytes_outstanding)
+
+
+@pytest.fixture()
+def ack_set_tracker():
+    return mock_async_context_manager(MagicMock(spec=AckSetTracker))
+
+
+@pytest.fixture()
+def nack_handler():
+    return MagicMock(spec=NackHandler)
+
+
+@pytest.fixture()
+def transformer():
+    result = MagicMock(spec=MessageTransformer)
+    result.transform.side_effect = lambda source: PubsubMessage(message_id=str(source.cursor.offset))
+    return result
+
+
+@pytest.fixture()
+def subscriber(underlying, flow_control_settings, ack_set_tracker, nack_handler, transformer):
+    return SinglePartitionSubscriber(underlying, flow_control_settings, ack_set_tracker, nack_handler, transformer)
+
+
+async def test_init(subscriber, underlying, ack_set_tracker, initial_flow_request):
+    async with subscriber:
+        underlying.__aenter__.assert_called_once()
+        ack_set_tracker.__aenter__.assert_called_once()
+        underlying.allow_flow.assert_called_once_with(initial_flow_request)
+    underlying.__aexit__.assert_called_once()
+    ack_set_tracker.__aexit__.assert_called_once()
+
+
+async def test_failed_transform(subscriber, underlying, transformer):
+    async with subscriber:
+        transformer.transform.side_effect = FailedPrecondition("Bad message")
+        underlying.read.return_value = SequencedMessage()
+        with pytest.raises(FailedPrecondition):
+            await subscriber.read()
+
+
+async def test_ack(subscriber: AsyncSubscriber, underlying, transformer, ack_set_tracker):
+    ack_called_queue = asyncio.Queue()
+    ack_result_queue = asyncio.Queue()
+    ack_set_tracker.ack.side_effect = make_queue_waiter(ack_called_queue, ack_result_queue)
+    async with subscriber:
+        message_1 = SequencedMessage(cursor=Cursor(offset=1), size_bytes=5)
+        message_2 = SequencedMessage(cursor=Cursor(offset=2), size_bytes=10)
+        underlying.read.return_value = message_1
+        read_1: Message = await subscriber.read()
+        ack_set_tracker.track.assert_has_calls([call(1)])
+        assert read_1.message_id == "1"
+        underlying.read.return_value = message_2
+        read_2: Message = await subscriber.read()
+        ack_set_tracker.track.assert_has_calls([call(1), call(2)])
+        assert read_2.message_id == "2"
+        read_2.ack()
+        await ack_called_queue.get()
+        await ack_result_queue.put(None)
+        ack_set_tracker.ack.assert_has_calls([call(2)])
+        read_1.ack()
+        await ack_called_queue.get()
+        await ack_result_queue.put(None)
+        ack_set_tracker.ack.assert_has_calls([call(2), call(1)])
+
+
+async def test_track_failure(subscriber: SinglePartitionSubscriber, underlying, transformer, ack_set_tracker):
+    async with subscriber:
+        ack_set_tracker.track.side_effect = FailedPrecondition("Bad track")
+        message = SequencedMessage(cursor=Cursor(offset=1), size_bytes=5)
+        underlying.read.return_value = message
+        with pytest.raises(FailedPrecondition):
+            await subscriber.read()
+        ack_set_tracker.track.assert_has_calls([call(1)])
+
+
+async def test_ack_failure(subscriber: SinglePartitionSubscriber, underlying, transformer, ack_set_tracker):
+    ack_called_queue = asyncio.Queue()
+    ack_result_queue = asyncio.Queue()
+    ack_set_tracker.ack.side_effect = make_queue_waiter(ack_called_queue, ack_result_queue)
+    async with subscriber:
+        message = SequencedMessage(cursor=Cursor(offset=1), size_bytes=5)
+        underlying.read.return_value = message
+        read: Message = await subscriber.read()
+        ack_set_tracker.track.assert_has_calls([call(1)])
+        read.ack()
+        await ack_called_queue.get()
+        ack_set_tracker.ack.assert_has_calls([call(1)])
+        await ack_result_queue.put(FailedPrecondition("Bad ack"))
+
+        async def sleep_forever():
+            await asyncio.sleep(float("inf"))
+        underlying.read.side_effect = sleep_forever
+        with pytest.raises(FailedPrecondition):
+            await subscriber.read()
+
+
+async def test_nack_failure(subscriber: SinglePartitionSubscriber, underlying, transformer, ack_set_tracker, nack_handler):
+    async with subscriber:
+        message = SequencedMessage(cursor=Cursor(offset=1), size_bytes=5)
+        underlying.read.return_value = message
+        read: Message = await subscriber.read()
+        ack_set_tracker.track.assert_has_calls([call(1)])
+        nack_handler.on_nack.side_effect = FailedPrecondition("Bad nack")
+        read.nack()
+
+        async def sleep_forever():
+            await asyncio.sleep(float("inf"))
+        underlying.read.side_effect = sleep_forever
+        with pytest.raises(FailedPrecondition):
+            await subscriber.read()
+
+
+async def test_nack_calls_ack(subscriber: SinglePartitionSubscriber, underlying, transformer, ack_set_tracker, nack_handler):
+    ack_called_queue = asyncio.Queue()
+    ack_result_queue = asyncio.Queue()
+    ack_set_tracker.ack.side_effect = make_queue_waiter(ack_called_queue, ack_result_queue)
+    async with subscriber:
+        message = SequencedMessage(cursor=Cursor(offset=1), size_bytes=5)
+        underlying.read.return_value = message
+        read: Message = await subscriber.read()
+        ack_set_tracker.track.assert_has_calls([call(1)])
+
+        def on_nack(nacked: PubsubMessage, ack: Callable[[], None]):
+            assert nacked.message_id == "1"
+            ack()
+        nack_handler.on_nack.side_effect = on_nack
+        read.nack()
+        await ack_called_queue.get()
+        await ack_result_queue.put(None)
+        ack_set_tracker.ack.assert_has_calls([call(1)])
+
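Usage note (illustrative sketch, not part of the diff): the snippet below shows one way the NackHandler and MessageTransformer hooks added above could be wired into SinglePartitionSubscriber, mirroring the unit-test fixture. The names wire_subscriber and ack_tracker are hypothetical placeholders for objects built by the internal wire and ack-tracker layers; this change does not add a public factory for them.

    import logging
    from typing import Callable

    from google.pubsub_v1 import PubsubMessage

    from google.cloud.pubsublite.cloudpubsub.flow_control_settings import FlowControlSettings
    from google.cloud.pubsublite.cloudpubsub.internal.single_partition_subscriber import SinglePartitionSubscriber
    from google.cloud.pubsublite.cloudpubsub.message_transformer import DefaultMessageTransformer
    from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler


    class LoggingNackHandler(NackHandler):
        """Hypothetical handler: log the nacked message, then ack it so delivery can proceed."""

        def on_nack(self, message: PubsubMessage, ack: Callable[[], None]):
            logging.warning("Dropping nacked message %s", message.message_id)
            ack()  # The NackHandler contract requires ack to eventually be called.


    # wire_subscriber and ack_tracker are assumed to come from the existing internal
    # wire/ack-tracker layers; constructing them is outside the scope of this diff.
    subscriber = SinglePartitionSubscriber(
        wire_subscriber,
        FlowControlSettings(messages_outstanding=1000, bytes_outstanding=10 * 1024 * 1024),
        ack_tracker,
        LoggingNackHandler(),
        DefaultMessageTransformer())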