Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add dead lettering max delivery attempts argument #236

Merged
merged 7 commits into from Nov 16, 2020
41 changes: 30 additions & 11 deletions samples/snippets/subscriber.py
Expand Up @@ -90,7 +90,8 @@ def create_subscription(project_id, topic_id, subscription_id):


def create_subscription_with_dead_letter_topic(
project_id, topic_id, subscription_id, dead_letter_topic_id
project_id, topic_id, subscription_id, dead_letter_topic_id,
max_delivery_attempts=5
):
"""Create a subscription with dead letter policy."""
# [START pubsub_dead_letter_create_subscription]
Expand All @@ -108,6 +109,9 @@ def create_subscription_with_dead_letter_topic(
# TODO(developer): This is an existing dead letter topic that the subscription
# with dead letter policy will forward dead letter messages to.
# dead_letter_topic_id = "your-dead-letter-topic-id"
# TODO(developer): This is an the maximum number of delivery attempts allowed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"This is the maximum number of delivery attempts allowed for a message before it gets delivered to a dead letter topic."

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think you still forgot the cross out "an" in "This is an the"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for pointing that out!

# before a message gets dead lettered.
# max_delivery_attempts = "your-max-delivery-attempts"
danavaziri-ga marked this conversation as resolved.
Show resolved Hide resolved

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
Expand All @@ -117,7 +121,8 @@ def create_subscription_with_dead_letter_topic(
dead_letter_topic_path = publisher.topic_path(project_id, dead_letter_topic_id)

dead_letter_policy = DeadLetterPolicy(
dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=10
dead_letter_topic=dead_letter_topic_path,
max_delivery_attempts=max_delivery_attempts
)

with subscriber:
Expand Down Expand Up @@ -259,7 +264,8 @@ def update_push_subscription(project_id, topic_id, subscription_id, endpoint):


def update_subscription_with_dead_letter_policy(
project_id, topic_id, subscription_id, dead_letter_topic_id
project_id, topic_id, subscription_id, dead_letter_topic_id,
max_delivery_attempts=5
):
"""Update a subscription's dead letter policy."""
# [START pubsub_dead_letter_update_subscription]
Expand All @@ -276,6 +282,9 @@ def update_subscription_with_dead_letter_policy(
# TODO(developer): This is an existing dead letter topic that the subscription
# with dead letter policy will forward dead letter messages to.
# dead_letter_topic_id = "your-dead-letter-topic-id"
# TODO(developer): This is an the maximum number of delivery attempts allowed
# before a message gets dead lettered.
# max_delivery_attempts = "your-max-delivery-attempts"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.


publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
Expand All @@ -290,11 +299,12 @@ def update_subscription_with_dead_letter_policy(
print(f"Before the update: {subscription_before_update}.")

# Indicates which fields in the provided subscription to update.
update_mask = FieldMask(paths=["dead_letter_policy.max_delivery_attempts"])
update_mask = FieldMask(paths=["dead_letter_policy"])

# Construct a dead letter policy you expect to have after the update.
dead_letter_policy = DeadLetterPolicy(
dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=20
dead_letter_topic=dead_letter_topic_path,
max_delivery_attempts=max_delivery_attempts
)

# Construct the subscription with the dead letter policy you expect to have
Expand Down Expand Up @@ -339,12 +349,7 @@ def remove_dead_letter_policy(project_id, topic_id, subscription_id):
print(f"Before removing the policy: {subscription_before_update}.")

# Indicates which fields in the provided subscription to update.
update_mask = FieldMask(
paths=[
"dead_letter_policy.dead_letter_topic",
"dead_letter_policy.max_delivery_attempts",
]
)
update_mask = FieldMask(paths=["dead_letter_policy"])

# Construct the subscription (without any dead letter policy) that you
# expect to have after the update.
Expand Down Expand Up @@ -676,6 +681,12 @@ def callback(message):
create_with_dead_letter_policy_parser.add_argument("topic_id")
create_with_dead_letter_policy_parser.add_argument("subscription_id")
create_with_dead_letter_policy_parser.add_argument("dead_letter_topic_id")
create_with_dead_letter_policy_parser.add_argument(
"max_delivery_attempts",
type=int,
nargs="?",
default=5
)

create_push_parser = subparsers.add_parser(
"create-push", help=create_push_subscription.__doc__
Expand Down Expand Up @@ -707,6 +718,12 @@ def callback(message):
update_dead_letter_policy_parser.add_argument("topic_id")
update_dead_letter_policy_parser.add_argument("subscription_id")
update_dead_letter_policy_parser.add_argument("dead_letter_topic_id")
update_dead_letter_policy_parser.add_argument(
"max_delivery_attempts",
type=int,
nargs="?",
default=5
)

remove_dead_letter_policy_parser = subparsers.add_parser(
"remove-dead-letter-policy", help=remove_dead_letter_policy.__doc__
Expand Down Expand Up @@ -777,6 +794,7 @@ def callback(message):
args.topic_id,
args.subscription_id,
args.dead_letter_topic_id,
args.max_delivery_attempts,
)
elif args.command == "create-push":
create_push_subscription(
Expand All @@ -798,6 +816,7 @@ def callback(message):
args.topic_id,
args.subscription_id,
args.dead_letter_topic_id,
args.max_delivery_attempts,
)
elif args.command == "remove-dead-letter-policy":
remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id)
Expand Down
9 changes: 6 additions & 3 deletions samples/snippets/subscriber_test.py
Expand Up @@ -32,6 +32,8 @@
SUBSCRIPTION_DLQ = "subscription-test-subscription-dlq-" + UUID
ENDPOINT = "https://{}.appspot.com/push".format(PROJECT_ID)
NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT_ID)
DEFAULT_MAX_DELIVERY_ATTEMPTS = 5
UPDATED_MAX_DELIVERY_ATTEMPTS = 20


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -214,18 +216,19 @@ def test_create_subscription_with_dead_letter_policy(
out, _ = capsys.readouterr()
assert f"Subscription created: {subscription_dlq}" in out
assert f"It will forward dead letter messages to: {dead_letter_topic}" in out
assert "After 10 delivery attempts." in out
assert f"After {DEFAULT_MAX_DELIVERY_ATTEMPTS} delivery attempts." in out


def test_update_dead_letter_policy(subscription_dlq, dead_letter_topic, capsys):
_ = subscriber.update_subscription_with_dead_letter_policy(
PROJECT_ID, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC
PROJECT_ID, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC,
UPDATED_MAX_DELIVERY_ATTEMPTS
)

out, _ = capsys.readouterr()
assert dead_letter_topic in out
assert subscription_dlq in out
assert "max_delivery_attempts: 20" in out
assert f"max_delivery_attempts: {UPDATED_MAX_DELIVERY_ATTEMPTS}" in out


def test_create_subscription_with_ordering(
Expand Down