Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add delivery attempt property to message object received by user code #10205

Merged
merged 3 commits into from Jan 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -569,7 +569,10 @@ def _on_response(self, response):

for received_message in response.received_messages:
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message, received_message.ack_id, self._scheduler.queue
received_message.message,
received_message.ack_id,
received_message.delivery_attempt,
self._scheduler.queue,
)
# Making a decision based on the load, and modifying the data that
# affects the load -> needs a lock, as that state can be modified
Expand Down
30 changes: 29 additions & 1 deletion pubsub/google/cloud/pubsub_v1/subscriber/message.py
Expand Up @@ -70,7 +70,7 @@ class Message(object):
published.
"""

def __init__(self, message, ack_id, request_queue):
def __init__(self, message, ack_id, delivery_attempt, request_queue):
"""Construct the Message.

.. note::
Expand All @@ -82,12 +82,16 @@ def __init__(self, message, ack_id, request_queue):
message (~.pubsub_v1.types.PubsubMessage): The message received
from Pub/Sub.
ack_id (str): The ack_id received from Pub/Sub.
delivery_attempt (int): The delivery attempt counter received
from Pub/Sub if a DeadLetterPolicy is set on the subscription,
and zero otherwise.
request_queue (queue.Queue): A queue provided by the policy that
can accept requests; the policy is responsible for handling
those requests.
"""
self._message = message
self._ack_id = ack_id
self._delivery_attempt = delivery_attempt if delivery_attempt > 0 else None
self._request_queue = request_queue
self.message_id = message.message_id

Expand Down Expand Up @@ -162,6 +166,30 @@ def ack_id(self):
"""str: the ID used to ack the message."""
return self._ack_id

@property
def delivery_attempt(self):
"""The delivery attempt counter is 1 + (the sum of number of NACKs
and number of ack_deadline exceeds) for this message. It is set to None
if a DeadLetterPolicy is not set on the subscription.

A NACK is any call to ModifyAckDeadline with a 0 deadline. An ack_deadline
exceeds event is whenever a message is not acknowledged within
ack_deadline. Note that ack_deadline is initially
Subscription.ackDeadlineSeconds, but may get extended automatically by
the client library.

The first delivery of a given message will have this value as 1. The value
is calculated at best effort and is approximate.

EXPERIMENTAL: This feature is part of a closed alpha release. This
API might be changed in backward-incompatible ways and is not recommended
for production use. It is not subject to any SLA or deprecation policy.

Returns:
Optional[int]: The delivery attempt counter or None.
"""
return self._delivery_attempt

def ack(self):
"""Acknowledge the given message.

Expand Down
20 changes: 16 additions & 4 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_message.py
Expand Up @@ -33,20 +33,21 @@
PUBLISHED_SECONDS = datetime_helpers.to_milliseconds(PUBLISHED) // 1000


def create_message(data, ack_id="ACKID", **attrs):
def create_message(data, ack_id="ACKID", delivery_attempt=0, **attrs):
with mock.patch.object(time, "time") as time_:
time_.return_value = RECEIVED_SECONDS
msg = message.Message(
types.PubsubMessage(
message=types.PubsubMessage(
attributes=attrs,
data=data,
message_id="message_id",
publish_time=timestamp_pb2.Timestamp(
seconds=PUBLISHED_SECONDS, nanos=PUBLISHED_MICROS * 1000
),
),
ack_id,
queue.Queue(),
ack_id=ack_id,
delivery_attempt=delivery_attempt,
request_queue=queue.Queue(),
)
return msg

Expand All @@ -72,6 +73,17 @@ def test_ack_id():
assert msg.ack_id == ack_id


def test_delivery_attempt():
delivery_attempt = 10
msg = create_message(b"foo", delivery_attempt=delivery_attempt)
assert msg.delivery_attempt == delivery_attempt


def test_delivery_attempt_is_none():
msg = create_message(b"foo", delivery_attempt=0)
assert msg.delivery_attempt is None


def test_publish_time():
msg = create_message(b"foo")
assert msg.publish_time == PUBLISHED
Expand Down
Expand Up @@ -627,6 +627,37 @@ def test__get_initial_request_wo_leaser():
assert initial_request.modify_deadline_seconds == []


def test__on_response_delivery_attempt():
manager, _, dispatcher, leaser, _, scheduler = make_running_manager()
manager._callback = mock.sentinel.callback

# Set up the messages.
response = types.StreamingPullResponse(
received_messages=[
types.ReceivedMessage(
ack_id="fack", message=types.PubsubMessage(data=b"foo", message_id="1")
),
types.ReceivedMessage(
ack_id="back",
message=types.PubsubMessage(data=b"bar", message_id="2"),
delivery_attempt=6,
),
]
)

# adjust message bookkeeping in leaser
fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=42)

manager._on_response(response)

schedule_calls = scheduler.schedule.mock_calls
assert len(schedule_calls) == 2
msg1 = schedule_calls[0][1][1]
assert msg1.delivery_attempt is None
msg2 = schedule_calls[1][1][1]
assert msg2.delivery_attempt == 6


def test__on_response_no_leaser_overload():
manager, _, dispatcher, leaser, _, scheduler = make_running_manager()
manager._callback = mock.sentinel.callback
Expand Down