diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py index 07da80d93..2235c0060 100644 --- a/samples/snippets/subscriber.py +++ b/samples/snippets/subscriber.py @@ -90,7 +90,8 @@ def create_subscription(project_id, topic_id, subscription_id): def create_subscription_with_dead_letter_topic( - project_id, topic_id, subscription_id, dead_letter_topic_id + project_id, topic_id, subscription_id, dead_letter_topic_id, + max_delivery_attempts=5 ): """Create a subscription with dead letter policy.""" # [START pubsub_dead_letter_create_subscription] @@ -108,6 +109,9 @@ def create_subscription_with_dead_letter_topic( # TODO(developer): This is an existing dead letter topic that the subscription # with dead letter policy will forward dead letter messages to. # dead_letter_topic_id = "your-dead-letter-topic-id" + # TODO(developer): This is the maximum number of delivery attempts allowed + # for a message before it gets delivered to a dead letter topic. + # max_delivery_attempts = 5 publisher = pubsub_v1.PublisherClient() subscriber = pubsub_v1.SubscriberClient() @@ -117,7 +121,8 @@ def create_subscription_with_dead_letter_topic( dead_letter_topic_path = publisher.topic_path(project_id, dead_letter_topic_id) dead_letter_policy = DeadLetterPolicy( - dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=10 + dead_letter_topic=dead_letter_topic_path, + max_delivery_attempts=max_delivery_attempts ) with subscriber: @@ -259,7 +264,8 @@ def update_push_subscription(project_id, topic_id, subscription_id, endpoint): def update_subscription_with_dead_letter_policy( - project_id, topic_id, subscription_id, dead_letter_topic_id + project_id, topic_id, subscription_id, dead_letter_topic_id, + max_delivery_attempts=5 ): """Update a subscription's dead letter policy.""" # [START pubsub_dead_letter_update_subscription] @@ -276,6 +282,9 @@ def update_subscription_with_dead_letter_policy( # TODO(developer): This is an existing dead letter topic that the subscription # with dead letter policy will forward dead letter messages to. # dead_letter_topic_id = "your-dead-letter-topic-id" + # TODO(developer): This is the maximum number of delivery attempts allowed + # for a message before it gets delivered to a dead letter topic. + # max_delivery_attempts = 5 publisher = pubsub_v1.PublisherClient() subscriber = pubsub_v1.SubscriberClient() @@ -290,11 +299,12 @@ def update_subscription_with_dead_letter_policy( print(f"Before the update: {subscription_before_update}.") # Indicates which fields in the provided subscription to update. - update_mask = FieldMask(paths=["dead_letter_policy.max_delivery_attempts"]) + update_mask = FieldMask(paths=["dead_letter_policy"]) # Construct a dead letter policy you expect to have after the update. dead_letter_policy = DeadLetterPolicy( - dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=20 + dead_letter_topic=dead_letter_topic_path, + max_delivery_attempts=max_delivery_attempts ) # Construct the subscription with the dead letter policy you expect to have @@ -339,12 +349,7 @@ def remove_dead_letter_policy(project_id, topic_id, subscription_id): print(f"Before removing the policy: {subscription_before_update}.") # Indicates which fields in the provided subscription to update. - update_mask = FieldMask( - paths=[ - "dead_letter_policy.dead_letter_topic", - "dead_letter_policy.max_delivery_attempts", - ] - ) + update_mask = FieldMask(paths=["dead_letter_policy"]) # Construct the subscription (without any dead letter policy) that you # expect to have after the update. @@ -676,6 +681,12 @@ def callback(message): create_with_dead_letter_policy_parser.add_argument("topic_id") create_with_dead_letter_policy_parser.add_argument("subscription_id") create_with_dead_letter_policy_parser.add_argument("dead_letter_topic_id") + create_with_dead_letter_policy_parser.add_argument( + "max_delivery_attempts", + type=int, + nargs="?", + default=5 + ) create_push_parser = subparsers.add_parser( "create-push", help=create_push_subscription.__doc__ @@ -707,6 +718,12 @@ def callback(message): update_dead_letter_policy_parser.add_argument("topic_id") update_dead_letter_policy_parser.add_argument("subscription_id") update_dead_letter_policy_parser.add_argument("dead_letter_topic_id") + update_dead_letter_policy_parser.add_argument( + "max_delivery_attempts", + type=int, + nargs="?", + default=5 + ) remove_dead_letter_policy_parser = subparsers.add_parser( "remove-dead-letter-policy", help=remove_dead_letter_policy.__doc__ @@ -777,6 +794,7 @@ def callback(message): args.topic_id, args.subscription_id, args.dead_letter_topic_id, + args.max_delivery_attempts, ) elif args.command == "create-push": create_push_subscription( @@ -798,6 +816,7 @@ def callback(message): args.topic_id, args.subscription_id, args.dead_letter_topic_id, + args.max_delivery_attempts, ) elif args.command == "remove-dead-letter-policy": remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id) diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index d722ebdec..e69212f8c 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -32,6 +32,8 @@ SUBSCRIPTION_DLQ = "subscription-test-subscription-dlq-" + UUID ENDPOINT = "https://{}.appspot.com/push".format(PROJECT_ID) NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT_ID) +DEFAULT_MAX_DELIVERY_ATTEMPTS = 5 +UPDATED_MAX_DELIVERY_ATTEMPTS = 20 @pytest.fixture(scope="module") @@ -214,18 +216,19 @@ def test_create_subscription_with_dead_letter_policy( out, _ = capsys.readouterr() assert f"Subscription created: {subscription_dlq}" in out assert f"It will forward dead letter messages to: {dead_letter_topic}" in out - assert "After 10 delivery attempts." in out + assert f"After {DEFAULT_MAX_DELIVERY_ATTEMPTS} delivery attempts." in out def test_update_dead_letter_policy(subscription_dlq, dead_letter_topic, capsys): _ = subscriber.update_subscription_with_dead_letter_policy( - PROJECT_ID, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC + PROJECT_ID, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC, + UPDATED_MAX_DELIVERY_ATTEMPTS ) out, _ = capsys.readouterr() assert dead_letter_topic in out assert subscription_dlq in out - assert "max_delivery_attempts: 20" in out + assert f"max_delivery_attempts: {UPDATED_MAX_DELIVERY_ATTEMPTS}" in out def test_create_subscription_with_ordering(