Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add dead lettering max delivery attempts argument #236

Merged
merged 7 commits into from Nov 16, 2020
41 changes: 30 additions & 11 deletions samples/snippets/subscriber.py
Expand Up @@ -90,7 +90,8 @@ def create_subscription(project_id, topic_id, subscription_id):


def create_subscription_with_dead_letter_topic(
project_id, topic_id, subscription_id, dead_letter_topic_id
project_id, topic_id, subscription_id, dead_letter_topic_id,
max_delivery_attempts=5
):
"""Create a subscription with dead letter policy."""
# [START pubsub_dead_letter_create_subscription]
Expand All @@ -108,6 +109,9 @@ def create_subscription_with_dead_letter_topic(
# TODO(developer): This is an existing dead letter topic that the subscription
# with dead letter policy will forward dead letter messages to.
# dead_letter_topic_id = "your-dead-letter-topic-id"
# TODO(developer): This is the maximum number of delivery attempts allowed
# for a message before it gets delivered to a dead letter topic.
# max_delivery_attempts = 5

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
Expand All @@ -117,7 +121,8 @@ def create_subscription_with_dead_letter_topic(
dead_letter_topic_path = publisher.topic_path(project_id, dead_letter_topic_id)

dead_letter_policy = DeadLetterPolicy(
dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=10
dead_letter_topic=dead_letter_topic_path,
max_delivery_attempts=max_delivery_attempts
)

with subscriber:
Expand Down Expand Up @@ -259,7 +264,8 @@ def update_push_subscription(project_id, topic_id, subscription_id, endpoint):


def update_subscription_with_dead_letter_policy(
project_id, topic_id, subscription_id, dead_letter_topic_id
project_id, topic_id, subscription_id, dead_letter_topic_id,
max_delivery_attempts=5
):
"""Update a subscription's dead letter policy."""
# [START pubsub_dead_letter_update_subscription]
Expand All @@ -276,6 +282,9 @@ def update_subscription_with_dead_letter_policy(
# TODO(developer): This is an existing dead letter topic that the subscription
# with dead letter policy will forward dead letter messages to.
# dead_letter_topic_id = "your-dead-letter-topic-id"
# TODO(developer): This is the maximum number of delivery attempts allowed
# for a message before it gets delivered to a dead letter topic.
# max_delivery_attempts = 5

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
Expand All @@ -290,11 +299,12 @@ def update_subscription_with_dead_letter_policy(
print(f"Before the update: {subscription_before_update}.")

# Indicates which fields in the provided subscription to update.
update_mask = FieldMask(paths=["dead_letter_policy.max_delivery_attempts"])
update_mask = FieldMask(paths=["dead_letter_policy"])

# Construct a dead letter policy you expect to have after the update.
dead_letter_policy = DeadLetterPolicy(
dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=20
dead_letter_topic=dead_letter_topic_path,
max_delivery_attempts=max_delivery_attempts
)

# Construct the subscription with the dead letter policy you expect to have
Expand Down Expand Up @@ -339,12 +349,7 @@ def remove_dead_letter_policy(project_id, topic_id, subscription_id):
print(f"Before removing the policy: {subscription_before_update}.")

# Indicates which fields in the provided subscription to update.
update_mask = FieldMask(
paths=[
"dead_letter_policy.dead_letter_topic",
"dead_letter_policy.max_delivery_attempts",
]
)
update_mask = FieldMask(paths=["dead_letter_policy"])

# Construct the subscription (without any dead letter policy) that you
# expect to have after the update.
Expand Down Expand Up @@ -676,6 +681,12 @@ def callback(message):
create_with_dead_letter_policy_parser.add_argument("topic_id")
create_with_dead_letter_policy_parser.add_argument("subscription_id")
create_with_dead_letter_policy_parser.add_argument("dead_letter_topic_id")
create_with_dead_letter_policy_parser.add_argument(
"max_delivery_attempts",
type=int,
nargs="?",
default=5
)

create_push_parser = subparsers.add_parser(
"create-push", help=create_push_subscription.__doc__
Expand Down Expand Up @@ -707,6 +718,12 @@ def callback(message):
update_dead_letter_policy_parser.add_argument("topic_id")
update_dead_letter_policy_parser.add_argument("subscription_id")
update_dead_letter_policy_parser.add_argument("dead_letter_topic_id")
update_dead_letter_policy_parser.add_argument(
"max_delivery_attempts",
type=int,
nargs="?",
default=5
)

remove_dead_letter_policy_parser = subparsers.add_parser(
"remove-dead-letter-policy", help=remove_dead_letter_policy.__doc__
Expand Down Expand Up @@ -777,6 +794,7 @@ def callback(message):
args.topic_id,
args.subscription_id,
args.dead_letter_topic_id,
args.max_delivery_attempts,
)
elif args.command == "create-push":
create_push_subscription(
Expand All @@ -798,6 +816,7 @@ def callback(message):
args.topic_id,
args.subscription_id,
args.dead_letter_topic_id,
args.max_delivery_attempts,
)
elif args.command == "remove-dead-letter-policy":
remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id)
Expand Down
9 changes: 6 additions & 3 deletions samples/snippets/subscriber_test.py
Expand Up @@ -32,6 +32,8 @@
SUBSCRIPTION_DLQ = "subscription-test-subscription-dlq-" + UUID
ENDPOINT = "https://{}.appspot.com/push".format(PROJECT_ID)
NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT_ID)
DEFAULT_MAX_DELIVERY_ATTEMPTS = 5
UPDATED_MAX_DELIVERY_ATTEMPTS = 20


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -214,18 +216,19 @@ def test_create_subscription_with_dead_letter_policy(
out, _ = capsys.readouterr()
assert f"Subscription created: {subscription_dlq}" in out
assert f"It will forward dead letter messages to: {dead_letter_topic}" in out
assert "After 10 delivery attempts." in out
assert f"After {DEFAULT_MAX_DELIVERY_ATTEMPTS} delivery attempts." in out


def test_update_dead_letter_policy(subscription_dlq, dead_letter_topic, capsys):
_ = subscriber.update_subscription_with_dead_letter_policy(
PROJECT_ID, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC
PROJECT_ID, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC,
UPDATED_MAX_DELIVERY_ATTEMPTS
)

out, _ = capsys.readouterr()
assert dead_letter_topic in out
assert subscription_dlq in out
assert "max_delivery_attempts: 20" in out
assert f"max_delivery_attempts: {UPDATED_MAX_DELIVERY_ATTEMPTS}" in out


def test_create_subscription_with_ordering(
Expand Down