diff --git a/google/cloud/pubsublite/cloudpubsub/internal/assigning_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/assigning_subscriber.py index c2706113..0ecde27d 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/assigning_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/assigning_subscriber.py @@ -17,6 +17,7 @@ from google.cloud.pubsub_v1.subscriber.message import Message +from google.cloud.pubsublite.cloudpubsub.reassignment_handler import ReassignmentHandler from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( AsyncSingleSubscriber, ) @@ -39,6 +40,7 @@ class _RunningSubscriber(NamedTuple): class AssigningSingleSubscriber(AsyncSingleSubscriber, PermanentFailable): _assigner_factory: Callable[[], Assigner] _subscriber_factory: PartitionSubscriberFactory + _reassignment_handler: ReassignmentHandler _subscribers: Dict[Partition, _RunningSubscriber] @@ -51,6 +53,7 @@ def __init__( self, assigner_factory: Callable[[], Assigner], subscriber_factory: PartitionSubscriberFactory, + reassignment_handler: ReassignmentHandler, ): """ Accepts a factory for an Assigner instead of an Assigner because GRPC asyncio uses the current thread's event @@ -58,8 +61,9 @@ def __init__( """ super().__init__() self._assigner_factory = assigner_factory - self._assigner = None self._subscriber_factory = subscriber_factory + self._reassignment_handler = reassignment_handler + self._assigner = None self._subscribers = {} self._batches = None @@ -85,14 +89,20 @@ async def _stop_subscriber(self, running: _RunningSubscriber): async def _assign_action(self): assignment: Set[Partition] = await self._assigner.get_assignment() - added_partitions = assignment - self._subscribers.keys() - removed_partitions = self._subscribers.keys() - assignment + old_assignment: Set[Partition] = set(self._subscribers.keys()) + added_partitions = assignment - old_assignment + removed_partitions = old_assignment - assignment for partition in 
added_partitions: await self._start_subscriber(partition) for partition in removed_partitions: subscriber = self._subscribers[partition] del self._subscribers[partition] await self._stop_subscriber(subscriber) + maybe_awaitable = self._reassignment_handler.handle_reassignment( + old_assignment, assignment + ) + if maybe_awaitable is not None: + await maybe_awaitable async def __aenter__(self): self._batches = Queue() diff --git a/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py index e1ffff55..58e12353 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py @@ -18,6 +18,10 @@ from google.api_core.client_options import ClientOptions from google.auth.credentials import Credentials +from google.cloud.pubsublite.cloudpubsub.reassignment_handler import ( + ReassignmentHandler, + DefaultReassignmentHandler, +) from google.cloud.pubsublite.cloudpubsub.message_transforms import ( to_cps_subscribe_message, add_id_to_cps_subscribe_transformer, @@ -179,6 +183,7 @@ def make_async_subscriber( transport: str, per_partition_flow_control_settings: FlowControlSettings, nack_handler: Optional[NackHandler] = None, + reassignment_handler: Optional[ReassignmentHandler] = None, message_transformer: Optional[MessageTransformer] = None, fixed_partitions: Optional[Set[Partition]] = None, credentials: Optional[Credentials] = None, @@ -218,6 +223,8 @@ def make_async_subscriber( if nack_handler is None: nack_handler = DefaultNackHandler() + if reassignment_handler is None: + reassignment_handler = DefaultReassignmentHandler() if message_transformer is None: message_transformer = MessageTransformer.of_callable(to_cps_subscribe_message) partition_subscriber_factory = _make_partition_subscriber_factory( @@ -230,4 +237,6 @@ def make_async_subscriber( nack_handler, message_transformer, ) - return 
AssigningSingleSubscriber(assigner_factory, partition_subscriber_factory) + return AssigningSingleSubscriber( + assigner_factory, partition_subscriber_factory, reassignment_handler + ) diff --git a/google/cloud/pubsublite/cloudpubsub/nack_handler.py b/google/cloud/pubsublite/cloudpubsub/nack_handler.py index a79dca10..b55501d4 100644 --- a/google/cloud/pubsublite/cloudpubsub/nack_handler.py +++ b/google/cloud/pubsublite/cloudpubsub/nack_handler.py @@ -28,6 +28,8 @@ class NackHandler(ABC): def on_nack(self, message: PubsubMessage, ack: Callable[[], None]): """Handle a negative acknowledgement. ack must eventually be called. + This method will be called on an event loop and should not block. + Args: message: The nacked message. ack: A callable to acknowledge the underlying message. This must eventually be called. diff --git a/google/cloud/pubsublite/cloudpubsub/publisher_client.py b/google/cloud/pubsublite/cloudpubsub/publisher_client.py index 9ddd4da5..248465bd 100644 --- a/google/cloud/pubsublite/cloudpubsub/publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/publisher_client.py @@ -62,6 +62,7 @@ class PublisherClient(PublisherClientInterface, ConstructableFromServiceAccount) def __init__( self, + *, per_partition_batching_settings: Optional[BatchSettings] = None, credentials: Optional[Credentials] = None, transport: str = "grpc_asyncio", @@ -93,7 +94,7 @@ def publish( topic: Union[TopicPath, str], data: bytes, ordering_key: str = "", - **attrs: Mapping[str, str] + **attrs: Mapping[str, str], ) -> "Future[str]": self._require_stared.require_started() return self._impl.publish( @@ -132,6 +133,7 @@ class AsyncPublisherClient( def __init__( self, + *, per_partition_batching_settings: Optional[BatchSettings] = None, credentials: Optional[Credentials] = None, transport: str = "grpc_asyncio", @@ -163,7 +165,7 @@ async def publish( topic: Union[TopicPath, str], data: bytes, ordering_key: str = "", - **attrs: Mapping[str, str] + **attrs: Mapping[str, str], ) -> 
str: self._require_stared.require_started() return await self._impl.publish( diff --git a/google/cloud/pubsublite/cloudpubsub/reassignment_handler.py b/google/cloud/pubsublite/cloudpubsub/reassignment_handler.py new file mode 100644 index 00000000..5cc6a668 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/reassignment_handler.py @@ -0,0 +1,70 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod +from typing import Set, Optional, Awaitable + +from google.cloud.pubsublite.types import Partition + + +class ReassignmentHandler(ABC): + """ + A ReassignmentHandler is called any time a new partition assignment is received from the server. + It will be called with both the previous and new assignments as decided by the backend. + + The client library will not acknowledge the assignment until handle_reassignment returns. The + assigning backend will not assign any of the partitions in `before` to another server unless the + assignment is acknowledged, or a client takes too long to acknowledge it (currently 30 seconds from + the time the assignment is sent from the server's point of view). + + Because of the above, as long as reassignment handling is processed quickly, it can be used to + abort outstanding operations on partitions which are being assigned away from this client, or to + pre-warm state which will be used by the MessageCallback. 
+ """ + + @abstractmethod + def handle_reassignment( + self, before: Set[Partition], after: Set[Partition] + ) -> Optional[Awaitable]: + """ + Called with the previous and new assignment delivered to this client on an assignment change. + The assignment will not be acknowledged until this method returns, so it should complete + quickly, or the backend will assume it is non-responsive and assign all partitions away without + waiting for acknowledgement. + + handle_reassignment will only be called after no new message deliveries will be started for the partition. + There may still be messages in flight on executors or in async callbacks. + + Acks or nacks on messages from partitions being assigned away will have no effect. + + This method will be called on an event loop and should not block. + + Args: + before: The previous assignment. + after: The new assignment. + + Returns: + Either None or an Awaitable to be waited on before acknowledging reassignment. + + Raises: + GoogleAPICallError: To fail the client if raised. 
+ """ + pass + + +class DefaultReassignmentHandler(ReassignmentHandler): + def handle_reassignment( + self, before: Set[Partition], after: Set[Partition] + ) -> Optional[Awaitable]: + return None diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py index eda098fb..ab2b98cd 100644 --- a/google/cloud/pubsublite/cloudpubsub/subscriber_client.py +++ b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py @@ -20,6 +20,7 @@ from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture from google.cloud.pubsub_v1.subscriber.message import Message +from google.cloud.pubsublite.cloudpubsub.reassignment_handler import ReassignmentHandler from google.cloud.pubsublite.cloudpubsub.internal.make_subscriber import ( make_async_subscriber, ) @@ -61,8 +62,10 @@ class SubscriberClient(SubscriberClientInterface, ConstructableFromServiceAccoun def __init__( self, + *, executor: Optional[ThreadPoolExecutor] = None, nack_handler: Optional[NackHandler] = None, + reassignment_handler: Optional[ReassignmentHandler] = None, message_transformer: Optional[MessageTransformer] = None, credentials: Optional[Credentials] = None, transport: str = "grpc_asyncio", @@ -88,6 +91,7 @@ def __init__( transport=transport, per_partition_flow_control_settings=settings, nack_handler=nack_handler, + reassignment_handler=reassignment_handler, message_transformer=message_transformer, fixed_partitions=partitions, credentials=credentials, @@ -140,7 +144,9 @@ class AsyncSubscriberClient( def __init__( self, + *, nack_handler: Optional[NackHandler] = None, + reassignment_handler: Optional[ReassignmentHandler] = None, message_transformer: Optional[MessageTransformer] = None, credentials: Optional[Credentials] = None, transport: str = "grpc_asyncio", @@ -162,6 +168,7 @@ def __init__( transport=transport, per_partition_flow_control_settings=settings, nack_handler=nack_handler, + reassignment_handler=reassignment_handler, 
message_transformer=message_transformer, fixed_partitions=partitions, credentials=credentials, diff --git a/google/cloud/pubsublite/internal/wire/permanent_failable.py b/google/cloud/pubsublite/internal/wire/permanent_failable.py index d96c4c9d..ae78c861 100644 --- a/google/cloud/pubsublite/internal/wire/permanent_failable.py +++ b/google/cloud/pubsublite/internal/wire/permanent_failable.py @@ -85,8 +85,10 @@ async def run_poller(self, poll_action: Callable[[], Awaitable[None]]): try: while True: await self.await_unless_failed(poll_action()) - except GoogleAPICallError as e: - self.fail(e) + except asyncio.CancelledError: + pass + except Exception as e: + self.fail(adapt_error(e)) def fail(self, err: GoogleAPICallError): if not self._failure_task.done(): diff --git a/tests/unit/pubsublite/cloudpubsub/internal/assigning_subscriber_test.py b/tests/unit/pubsublite/cloudpubsub/internal/assigning_subscriber_test.py index 50a39ae0..2232d4c0 100644 --- a/tests/unit/pubsublite/cloudpubsub/internal/assigning_subscriber_test.py +++ b/tests/unit/pubsublite/cloudpubsub/internal/assigning_subscriber_test.py @@ -14,6 +14,7 @@ from typing import Set +from asyncio import Future from asynctest.mock import MagicMock, call import threading import pytest @@ -21,6 +22,7 @@ from google.cloud.pubsub_v1.subscriber.message import Message from google.pubsub_v1 import PubsubMessage +from google.cloud.pubsublite.cloudpubsub.reassignment_handler import ReassignmentHandler from google.cloud.pubsublite.cloudpubsub.internal.assigning_subscriber import ( AssigningSingleSubscriber, PartitionSubscriberFactory, @@ -46,17 +48,26 @@ def assigner(): return mock_async_context_manager(MagicMock(spec=Assigner)) +@pytest.fixture() +def reassignment_handler(): + handler = MagicMock(spec=ReassignmentHandler) + handler.handle_reassignment.return_value = None + return handler + + @pytest.fixture() def subscriber_factory(): return MagicMock(spec=PartitionSubscriberFactory) @pytest.fixture() -def 
subscriber(assigner, subscriber_factory): +def subscriber(assigner, subscriber_factory, reassignment_handler): box = Box() def set_box(): - box.val = AssigningSingleSubscriber(lambda: assigner, subscriber_factory) + box.val = AssigningSingleSubscriber( + lambda: assigner, subscriber_factory, reassignment_handler + ) # Initialize AssigningSubscriber on another thread with a different event loop. thread = threading.Thread(target=set_box) @@ -103,7 +114,9 @@ async def test_assigner_failure(subscriber, assigner, subscriber_factory): await subscriber.read() -async def test_assignment_change(subscriber, assigner, subscriber_factory): +async def test_assignment_change( + subscriber, assigner, subscriber_factory, reassignment_handler +): assign_queues = wire_queues(assigner.get_assignment) async with subscriber: await assign_queues.called.get() @@ -124,13 +137,35 @@ async def test_assignment_change(subscriber, assigner, subscriber_factory): ) sub1.__aenter__.assert_called_once() sub2.__aenter__.assert_called_once() + reassignment_handler.handle_reassignment.assert_called_once_with( + set(), {Partition(1), Partition(2)} + ) + reassignment_triggered = Future() + reassignment_future = Future() + + def on_reassignment(before, after): + reassignment_triggered.set_result(None) + return reassignment_future + + reassignment_handler.handle_reassignment.side_effect = on_reassignment await assign_queues.results.put({Partition(1), Partition(3)}) - await assign_queues.called.get() + await reassignment_triggered subscriber_factory.assert_has_calls( [call(Partition(1)), call(Partition(2)), call(Partition(3))], any_order=True ) sub3.__aenter__.assert_called_once() sub2.__aexit__.assert_called_once() + reassignment_handler.handle_reassignment.assert_has_calls( + [ + call(set(), {Partition(1), Partition(2)}), + call({Partition(1), Partition(2)}, {Partition(1), Partition(3)}), + ] + ) + # Assigner is stalled waiting for reassignment future completion + assert assign_queues.called.empty() + 
reassignment_future.set_result(None) + await assign_queues.called.get() + sub1.__aexit__.assert_called_once() sub2.__aexit__.assert_called_once() sub3.__aexit__.assert_called_once() diff --git a/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py b/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py index 3d355642..c007913a 100644 --- a/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py +++ b/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py @@ -226,6 +226,27 @@ async def sleep_forever(): await subscriber.read() +async def test_nack_after_shutdown_noop( + subscriber: SinglePartitionSingleSubscriber, + underlying, + transformer, + ack_set_tracker, + nack_handler, +): + async with subscriber: + message = SequencedMessage(cursor=Cursor(offset=1), size_bytes=5)._pb + underlying.read.return_value = [message] + read: List[Message] = await subscriber.read() + assert len(read) == 1 + ack_set_tracker.track.assert_has_calls([call(1)]) + nack_handler.on_nack.side_effect = FailedPrecondition("Bad nack") + read[0].nack() + # Yield to the runtime, allowing the subscriber to process the queue if it + # is still running (which it should not be) + await asyncio.sleep(1) + nack_handler.on_nack.assert_has_calls([]) + + async def test_nack_calls_ack( subscriber: SinglePartitionSingleSubscriber, underlying,