Skip to content

Commit

Permalink
feat: Add ReassignmentHandler which is notified on client reassignment (
Browse files Browse the repository at this point in the history
#242)

* feat: Add ReassignmentHandler which is notified on client reassignment

This enables clients to respond to a reassignment by cancelling outstanding actions and nacking all messages.

* fix: docs

* fix: construction
  • Loading branch information
dpcollins-google committed Sep 16, 2021
1 parent 10783e9 commit 80fd5f6
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 12 deletions.
Expand Up @@ -17,6 +17,7 @@

from google.cloud.pubsub_v1.subscriber.message import Message

from google.cloud.pubsublite.cloudpubsub.reassignment_handler import ReassignmentHandler
from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import (
AsyncSingleSubscriber,
)
Expand All @@ -39,6 +40,7 @@ class _RunningSubscriber(NamedTuple):
class AssigningSingleSubscriber(AsyncSingleSubscriber, PermanentFailable):
_assigner_factory: Callable[[], Assigner]
_subscriber_factory: PartitionSubscriberFactory
_reassignment_handler: ReassignmentHandler

_subscribers: Dict[Partition, _RunningSubscriber]

Expand All @@ -51,15 +53,17 @@ def __init__(
self,
assigner_factory: Callable[[], Assigner],
subscriber_factory: PartitionSubscriberFactory,
reassignment_handler: ReassignmentHandler,
):
"""
Accepts a factory for an Assigner instead of an Assigner because GRPC asyncio uses the current thread's event
loop.
"""
super().__init__()
self._assigner_factory = assigner_factory
self._assigner = None
self._subscriber_factory = subscriber_factory
self._reassignment_handler = reassignment_handler
self._assigner = None
self._subscribers = {}
self._batches = None

Expand All @@ -85,14 +89,20 @@ async def _stop_subscriber(self, running: _RunningSubscriber):

async def _assign_action(self):
    """Fetch the next assignment and reconcile the running subscribers with it.

    Steps, in order:
      1. Await the next assignment from the assigner.
      2. Start a subscriber for every partition that is newly assigned.
      3. Remove and stop the subscriber of every partition no longer assigned.
      4. Notify the reassignment handler with the previous and new assignments,
         awaiting its result when it returns something other than None.

    The handler is only notified after the added subscribers were started and
    the removed subscribers were stopped.
    """
    assignment: Set[Partition] = await self._assigner.get_assignment()
    # Snapshot the current keys before anything mutates self._subscribers so
    # the handler receives the assignment exactly as it was before this call.
    old_assignment: Set[Partition] = set(self._subscribers.keys())
    added_partitions = assignment - old_assignment
    removed_partitions = old_assignment - assignment
    # NOTE(review): _start_subscriber is presumably what registers the new
    # subscriber in self._subscribers; its body is not visible here.
    for partition in added_partitions:
        await self._start_subscriber(partition)
    for partition in removed_partitions:
        # Drop the entry before stopping it so a failure while stopping does
        # not leave a half-stopped subscriber registered.
        subscriber = self._subscribers.pop(partition)
        await self._stop_subscriber(subscriber)
    maybe_awaitable = self._reassignment_handler.handle_reassignment(
        old_assignment, assignment
    )
    # Handlers may be synchronous (returning None) or return an awaitable.
    if maybe_awaitable is not None:
        await maybe_awaitable

async def __aenter__(self):
self._batches = Queue()
Expand Down
11 changes: 10 additions & 1 deletion google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py
Expand Up @@ -18,6 +18,10 @@
from google.api_core.client_options import ClientOptions
from google.auth.credentials import Credentials

from google.cloud.pubsublite.cloudpubsub.reassignment_handler import (
ReassignmentHandler,
DefaultReassignmentHandler,
)
from google.cloud.pubsublite.cloudpubsub.message_transforms import (
to_cps_subscribe_message,
add_id_to_cps_subscribe_transformer,
Expand Down Expand Up @@ -179,6 +183,7 @@ def make_async_subscriber(
transport: str,
per_partition_flow_control_settings: FlowControlSettings,
nack_handler: Optional[NackHandler] = None,
reassignment_handler: Optional[ReassignmentHandler] = None,
message_transformer: Optional[MessageTransformer] = None,
fixed_partitions: Optional[Set[Partition]] = None,
credentials: Optional[Credentials] = None,
Expand Down Expand Up @@ -218,6 +223,8 @@ def make_async_subscriber(

if nack_handler is None:
nack_handler = DefaultNackHandler()
if reassignment_handler is None:
reassignment_handler = DefaultReassignmentHandler()
if message_transformer is None:
message_transformer = MessageTransformer.of_callable(to_cps_subscribe_message)
partition_subscriber_factory = _make_partition_subscriber_factory(
Expand All @@ -230,4 +237,6 @@ def make_async_subscriber(
nack_handler,
message_transformer,
)
return AssigningSingleSubscriber(assigner_factory, partition_subscriber_factory)
return AssigningSingleSubscriber(
assigner_factory, partition_subscriber_factory, reassignment_handler
)
2 changes: 2 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/nack_handler.py
Expand Up @@ -28,6 +28,8 @@ class NackHandler(ABC):
def on_nack(self, message: PubsubMessage, ack: Callable[[], None]):
"""Handle a negative acknowledgement. ack must eventually be called.
This method will be called on an event loop and should not block.
Args:
message: The nacked message.
ack: A callable to acknowledge the underlying message. This must eventually be called.
Expand Down
6 changes: 4 additions & 2 deletions google/cloud/pubsublite/cloudpubsub/publisher_client.py
Expand Up @@ -62,6 +62,7 @@ class PublisherClient(PublisherClientInterface, ConstructableFromServiceAccount)

def __init__(
self,
*,
per_partition_batching_settings: Optional[BatchSettings] = None,
credentials: Optional[Credentials] = None,
transport: str = "grpc_asyncio",
Expand Down Expand Up @@ -93,7 +94,7 @@ def publish(
topic: Union[TopicPath, str],
data: bytes,
ordering_key: str = "",
**attrs: Mapping[str, str]
**attrs: Mapping[str, str],
) -> "Future[str]":
self._require_stared.require_started()
return self._impl.publish(
Expand Down Expand Up @@ -132,6 +133,7 @@ class AsyncPublisherClient(

def __init__(
self,
*,
per_partition_batching_settings: Optional[BatchSettings] = None,
credentials: Optional[Credentials] = None,
transport: str = "grpc_asyncio",
Expand Down Expand Up @@ -163,7 +165,7 @@ async def publish(
topic: Union[TopicPath, str],
data: bytes,
ordering_key: str = "",
**attrs: Mapping[str, str]
**attrs: Mapping[str, str],
) -> str:
self._require_stared.require_started()
return await self._impl.publish(
Expand Down
70 changes: 70 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/reassignment_handler.py
@@ -0,0 +1,70 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import ABC, abstractmethod
from typing import Set, Optional, Awaitable

from google.cloud.pubsublite.types import Partition


class ReassignmentHandler(ABC):
    """
    A ReassignmentHandler is called any time a new partition assignment is received from the server.
    It will be called with both the previous and new assignments as decided by the backend.

    The client library will not acknowledge the assignment until handle_reassignment returns (and
    any Awaitable it returned has completed). The assigning backend will not assign any of the
    partitions in `before` to another server unless the assignment is acknowledged, or a client
    takes too long to acknowledge (currently 30 seconds from the time the assignment is sent from
    the server's point of view).

    Because of the above, as long as reassignment handling is processed quickly, it can be used to
    abort outstanding operations on partitions which are being assigned away from this client, or to
    pre-warm state which will be used by the MessageCallback.
    """

    @abstractmethod
    def handle_reassignment(
        self, before: Set[Partition], after: Set[Partition]
    ) -> Optional[Awaitable]:
        """
        Called with the previous and new assignment delivered to this client on an assignment change.

        The assignment will not be acknowledged until this method returns, so it should complete
        quickly, or the backend will assume it is non-responsive and assign all partitions away without
        waiting for acknowledgement.

        handle_reassignment will only be called after no new message deliveries will be started for the partition.
        There may still be messages in flight on executors or in async callbacks.

        Acks or nacks on messages from partitions being assigned away will have no effect.

        This method will be called on an event loop and should not block.

        Args:
          before: The previous assignment.
          after: The new assignment.

        Returns:
          Either None or an Awaitable to be waited on before acknowledging reassignment.

        Raises:
          GoogleAPICallError: To fail the client if raised.
        """


class DefaultReassignmentHandler(ReassignmentHandler):
    """ReassignmentHandler that takes no action when the assignment changes."""

    def handle_reassignment(
        self, before: Set[Partition], after: Set[Partition]
    ) -> Optional[Awaitable]:
        """Ignore the reassignment.

        Args:
          before: The previous assignment (unused).
          after: The new assignment (unused).

        Returns:
          None, so the caller has nothing to wait on.
        """
        return None
7 changes: 7 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/subscriber_client.py
Expand Up @@ -20,6 +20,7 @@
from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture
from google.cloud.pubsub_v1.subscriber.message import Message

from google.cloud.pubsublite.cloudpubsub.reassignment_handler import ReassignmentHandler
from google.cloud.pubsublite.cloudpubsub.internal.make_subscriber import (
make_async_subscriber,
)
Expand Down Expand Up @@ -61,8 +62,10 @@ class SubscriberClient(SubscriberClientInterface, ConstructableFromServiceAccoun

def __init__(
self,
*,
executor: Optional[ThreadPoolExecutor] = None,
nack_handler: Optional[NackHandler] = None,
reassignment_handler: Optional[ReassignmentHandler] = None,
message_transformer: Optional[MessageTransformer] = None,
credentials: Optional[Credentials] = None,
transport: str = "grpc_asyncio",
Expand All @@ -88,6 +91,7 @@ def __init__(
transport=transport,
per_partition_flow_control_settings=settings,
nack_handler=nack_handler,
reassignment_handler=reassignment_handler,
message_transformer=message_transformer,
fixed_partitions=partitions,
credentials=credentials,
Expand Down Expand Up @@ -140,7 +144,9 @@ class AsyncSubscriberClient(

def __init__(
self,
*,
nack_handler: Optional[NackHandler] = None,
reassignment_handler: Optional[ReassignmentHandler] = None,
message_transformer: Optional[MessageTransformer] = None,
credentials: Optional[Credentials] = None,
transport: str = "grpc_asyncio",
Expand All @@ -162,6 +168,7 @@ def __init__(
transport=transport,
per_partition_flow_control_settings=settings,
nack_handler=nack_handler,
reassignment_handler=reassignment_handler,
message_transformer=message_transformer,
fixed_partitions=partitions,
credentials=credentials,
Expand Down
6 changes: 4 additions & 2 deletions google/cloud/pubsublite/internal/wire/permanent_failable.py
Expand Up @@ -85,8 +85,10 @@ async def run_poller(self, poll_action: Callable[[], Awaitable[None]]):
try:
while True:
await self.await_unless_failed(poll_action())
except GoogleAPICallError as e:
self.fail(e)
except asyncio.CancelledError:
pass
except Exception as e:
self.fail(adapt_error(e))

def fail(self, err: GoogleAPICallError):
if not self._failure_task.done():
Expand Down
Expand Up @@ -14,13 +14,15 @@

from typing import Set

from asyncio import Future
from asynctest.mock import MagicMock, call
import threading
import pytest
from google.api_core.exceptions import FailedPrecondition
from google.cloud.pubsub_v1.subscriber.message import Message
from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.cloudpubsub.reassignment_handler import ReassignmentHandler
from google.cloud.pubsublite.cloudpubsub.internal.assigning_subscriber import (
AssigningSingleSubscriber,
PartitionSubscriberFactory,
Expand All @@ -46,17 +48,26 @@ def assigner():
return mock_async_context_manager(MagicMock(spec=Assigner))


@pytest.fixture()
def reassignment_handler():
    """Provide a mock restricted to the ReassignmentHandler spec."""
    mock_handler = MagicMock(spec=ReassignmentHandler)
    # Default to the synchronous form of the handler: nothing to await.
    mock_handler.handle_reassignment.return_value = None
    return mock_handler


@pytest.fixture()
def subscriber_factory():
    """Provide a mock restricted to the PartitionSubscriberFactory spec."""
    factory_mock = MagicMock(spec=PartitionSubscriberFactory)
    return factory_mock


@pytest.fixture()
def subscriber(assigner, subscriber_factory):
def subscriber(assigner, subscriber_factory, reassignment_handler):
box = Box()

def set_box():
box.val = AssigningSingleSubscriber(lambda: assigner, subscriber_factory)
box.val = AssigningSingleSubscriber(
lambda: assigner, subscriber_factory, reassignment_handler
)

# Initialize AssigningSubscriber on another thread with a different event loop.
thread = threading.Thread(target=set_box)
Expand Down Expand Up @@ -103,7 +114,9 @@ async def test_assigner_failure(subscriber, assigner, subscriber_factory):
await subscriber.read()


async def test_assignment_change(subscriber, assigner, subscriber_factory):
async def test_assignment_change(
subscriber, assigner, subscriber_factory, reassignment_handler
):
assign_queues = wire_queues(assigner.get_assignment)
async with subscriber:
await assign_queues.called.get()
Expand All @@ -124,13 +137,35 @@ async def test_assignment_change(subscriber, assigner, subscriber_factory):
)
sub1.__aenter__.assert_called_once()
sub2.__aenter__.assert_called_once()
reassignment_handler.handle_reassignment.assert_called_once_with(
set(), {Partition(1), Partition(2)}
)
reassignment_triggered = Future()
reassignment_future = Future()

def on_reassignment(before, after):
reassignment_triggered.set_result(None)
return reassignment_future

reassignment_handler.handle_reassignment.side_effect = on_reassignment
await assign_queues.results.put({Partition(1), Partition(3)})
await assign_queues.called.get()
await reassignment_triggered
subscriber_factory.assert_has_calls(
[call(Partition(1)), call(Partition(2)), call(Partition(3))], any_order=True
)
sub3.__aenter__.assert_called_once()
sub2.__aexit__.assert_called_once()
reassignment_handler.handle_reassignment.assert_has_calls(
[
call(set(), {Partition(1), Partition(2)}),
call({Partition(1), Partition(2)}, {Partition(1), Partition(3)}),
]
)
# Assigner is stalled waiting for reassignment future completion
assert assign_queues.called.empty()
reassignment_future.set_result(None)
await assign_queues.called.get()

sub1.__aexit__.assert_called_once()
sub2.__aexit__.assert_called_once()
sub3.__aexit__.assert_called_once()
Expand Down
Expand Up @@ -226,6 +226,27 @@ async def sleep_forever():
await subscriber.read()


async def test_nack_after_shutdown_noop(
    subscriber: SinglePartitionSingleSubscriber,
    underlying,
    transformer,
    ack_set_tracker,
    nack_handler,
):
    """Nacking a message once the subscriber has shut down must not reach the nack handler."""
    async with subscriber:
        message = SequencedMessage(cursor=Cursor(offset=1), size_bytes=5)._pb
        underlying.read.return_value = [message]
        read: List[Message] = await subscriber.read()
        assert len(read) == 1
        ack_set_tracker.track.assert_has_calls([call(1)])
    # The handler would raise if it were ever invoked.
    nack_handler.on_nack.side_effect = FailedPrecondition("Bad nack")
    read[0].nack()
    # Yield to the runtime, allowing the subscriber to process the queue if it
    # is still running (which it should not be)
    await asyncio.sleep(1)
    # assert_has_calls([]) passes for any call history, so it could never fail;
    # assert_not_called() actually verifies the nack was dropped.
    nack_handler.on_nack.assert_not_called()


async def test_nack_calls_ack(
subscriber: SinglePartitionSingleSubscriber,
underlying,
Expand Down

0 comments on commit 80fd5f6

Please sign in to comment.