From a0937c13107b92271913de579b60f24b2aaac177 Mon Sep 17 00:00:00 2001 From: Prad Nelluru Date: Thu, 30 Jan 2020 17:03:47 -0500 Subject: [PATCH] feat(pubsub): add delivery attempt property to message object received by user code (#10205) - Return None when a DeadLetterPolicy hasn't been set on the subscription. --- .../_protocol/streaming_pull_manager.py | 5 ++- google/cloud/pubsub_v1/subscriber/message.py | 30 +++++++++++++++++- .../unit/pubsub_v1/subscriber/test_message.py | 20 +++++++++--- .../subscriber/test_streaming_pull_manager.py | 31 +++++++++++++++++++ 4 files changed, 80 insertions(+), 6 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 6cf5b6e8f..26764b1a9 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -569,7 +569,10 @@ def _on_response(self, response): for received_message in response.received_messages: message = google.cloud.pubsub_v1.subscriber.message.Message( - received_message.message, received_message.ack_id, self._scheduler.queue + received_message.message, + received_message.ack_id, + received_message.delivery_attempt, + self._scheduler.queue, ) # Making a decision based on the load, and modifying the data that # affects the load -> needs a lock, as that state can be modified diff --git a/google/cloud/pubsub_v1/subscriber/message.py b/google/cloud/pubsub_v1/subscriber/message.py index 41bc42755..6dc7bc443 100644 --- a/google/cloud/pubsub_v1/subscriber/message.py +++ b/google/cloud/pubsub_v1/subscriber/message.py @@ -70,7 +70,7 @@ class Message(object): published. """ - def __init__(self, message, ack_id, request_queue): + def __init__(self, message, ack_id, delivery_attempt, request_queue): """Construct the Message. .. note:: @@ -82,12 +82,16 @@ def __init__(self, message, ack_id, request_queue): message (~.pubsub_v1.types.PubsubMessage): The message received from Pub/Sub. ack_id (str): The ack_id received from Pub/Sub. + delivery_attempt (int): The delivery attempt counter received + from Pub/Sub if a DeadLetterPolicy is set on the subscription, + and zero otherwise. request_queue (queue.Queue): A queue provided by the policy that can accept requests; the policy is responsible for handling those requests. """ self._message = message self._ack_id = ack_id + self._delivery_attempt = delivery_attempt if delivery_attempt > 0 else None self._request_queue = request_queue self.message_id = message.message_id @@ -162,6 +166,30 @@ def ack_id(self): """str: the ID used to ack the message.""" return self._ack_id + @property + def delivery_attempt(self): + """The delivery attempt counter is 1 + (the sum of number of NACKs + and number of ack_deadline exceeds) for this message. It is set to None + if a DeadLetterPolicy is not set on the subscription. + + A NACK is any call to ModifyAckDeadline with a 0 deadline. An ack_deadline + exceeds event is whenever a message is not acknowledged within + ack_deadline. Note that ack_deadline is initially + Subscription.ackDeadlineSeconds, but may get extended automatically by + the client library. + + The first delivery of a given message will have this value as 1. The value + is calculated at best effort and is approximate. + + EXPERIMENTAL: This feature is part of a closed alpha release. This + API might be changed in backward-incompatible ways and is not recommended + for production use. It is not subject to any SLA or deprecation policy. + + Returns: + Optional[int]: The delivery attempt counter or None. + """ + return self._delivery_attempt + def ack(self): """Acknowledge the given message. diff --git a/tests/unit/pubsub_v1/subscriber/test_message.py b/tests/unit/pubsub_v1/subscriber/test_message.py index 4bb3329a2..fd23deef0 100644 --- a/tests/unit/pubsub_v1/subscriber/test_message.py +++ b/tests/unit/pubsub_v1/subscriber/test_message.py @@ -33,11 +33,11 @@ PUBLISHED_SECONDS = datetime_helpers.to_milliseconds(PUBLISHED) // 1000 -def create_message(data, ack_id="ACKID", **attrs): +def create_message(data, ack_id="ACKID", delivery_attempt=0, **attrs): with mock.patch.object(time, "time") as time_: time_.return_value = RECEIVED_SECONDS msg = message.Message( - types.PubsubMessage( + message=types.PubsubMessage( attributes=attrs, data=data, message_id="message_id", @@ -45,8 +45,9 @@ def create_message(data, ack_id="ACKID", **attrs): seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000 ), ), - ack_id, - queue.Queue(), + ack_id=ack_id, + delivery_attempt=delivery_attempt, + request_queue=queue.Queue(), ) return msg @@ -72,6 +73,17 @@ def test_ack_id(): assert msg.ack_id == ack_id +def test_delivery_attempt(): + delivery_attempt = 10 + msg = create_message(b"foo", delivery_attempt=delivery_attempt) + assert msg.delivery_attempt == delivery_attempt + + +def test_delivery_attempt_is_none(): + msg = create_message(b"foo", delivery_attempt=0) + assert msg.delivery_attempt is None + + def test_publish_time(): msg = create_message(b"foo") assert msg.publish_time == PUBLISHED diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 8bb53f150..6f8a04ac9 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -627,6 +627,37 @@ def test__get_initial_request_wo_leaser(): assert initial_request.modify_deadline_seconds == [] +def test__on_response_delivery_attempt(): + manager, _, dispatcher, leaser, _, scheduler = make_running_manager() + manager._callback = mock.sentinel.callback + + # Set up the messages. + response = types.StreamingPullResponse( + received_messages=[ + types.ReceivedMessage( + ack_id="fack", message=types.PubsubMessage(data=b"foo", message_id="1") + ), + types.ReceivedMessage( + ack_id="back", + message=types.PubsubMessage(data=b"bar", message_id="2"), + delivery_attempt=6, + ), + ] + ) + + # adjust message bookkeeping in leaser + fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=42) + + manager._on_response(response) + + schedule_calls = scheduler.schedule.mock_calls + assert len(schedule_calls) == 2 + msg1 = schedule_calls[0][1][1] + assert msg1.delivery_attempt is None + msg2 = schedule_calls[1][1][1] + assert msg2.delivery_attempt == 6 + + def test__on_response_no_leaser_overload(): manager, _, dispatcher, leaser, _, scheduler = make_running_manager() manager._callback = mock.sentinel.callback