From b991726225985daf72b787c039dacbf924865ff8 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Fri, 10 Apr 2020 09:54:32 -0700 Subject: [PATCH 1/4] docs: update libraries-bom --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ca3d245d7..77d0d1786 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ If you are using Maven with a BOM, add this to your pom.xml file: com.google.cloud libraries-bom - 2.9.0 + 4.4.1 pom import From f306d0f8aa471bab06c810dc3dae8b22370c9f28 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 18 May 2020 14:38:05 -0700 Subject: [PATCH 2/4] basic setup --- samples/install-without-bom/pom.xml | 15 ++++ samples/pom.xml | 45 +++++++++++- samples/snapshot/pom.xml | 15 ++++ samples/snippets/pom.xml | 15 ++++ ...bscriptionWithDeadLetterPolicyExample.java | 42 +++++++++++ .../test/java/pubsub/DeadLetterQueueIT.java | 70 +++++++++++++++++++ 6 files changed, 201 insertions(+), 1 deletion(-) create mode 100644 samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java create mode 100644 samples/snippets/src/test/java/pubsub/DeadLetterQueueIT.java diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml index e262b999f..2fdd67dd8 100644 --- a/samples/install-without-bom/pom.xml +++ b/samples/install-without-bom/pom.xml @@ -1,4 +1,19 @@ + 4.0.0 com.google.cloud diff --git a/samples/pom.xml b/samples/pom.xml index 8219bd3f1..4b0f4526b 100644 --- a/samples/pom.xml +++ b/samples/pom.xml @@ -1,9 +1,52 @@ + 4.0.0 com.google.cloud google-cloud-pubsub-samples - 0.0.1-SNAPSHOT + 0.0.1-SNAPSHOT + + + junit + junit + + + junit + junit + + + junit + junit + + + junit + junit + + + junit + junit + + + junit + junit + 4.13 + test + + pom Google Cloud Pub/Sub Samples Parent https://github.com/googleapis/java-pubsub diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml index f88f9e1a1..87b9c2a8a 100644 --- a/samples/snapshot/pom.xml +++ b/samples/snapshot/pom.xml @@ -1,4 +1,19 @@ + 4.0.0 com.google.cloud diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index 7cb6f69e2..5afd00edc 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -1,4 +1,19 @@ + 4.0.0 com.google.cloud diff --git a/samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java b/samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java new file mode 100644 index 000000000..3391b6526 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java @@ -0,0 +1,42 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_dead_letter_create_subscription] + +public class CreateSubscriptionWithDeadLetterPolicyExample { + + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String PROJECT_ID = "Your Project ID"; + String TOPIC_NAME = "Your Topic Name"; + String SUBSCRIPTION_NAME = "Your Subscription Name"; + String DEAD_LETTER_TOPIC = "Your Dead Letter Topic"; + + CreateSubscriptionWithDeadLetterPolicyExample + .createSubscriptionWithDeadLetterPolicyExample(PROJECT_ID, + SUBSCRIPTION_NAME, TOPIC_NAME, DEAD_LETTER_TOPIC); + } + + public static void createSubscriptionWithDeadLetterPolicyExample( + String PROJECT_ID, String SUBSCRIPTION_NAME, String TOPIC_NAME, + String DEAD_LETTER_TOPIC_NAME) { + + } +} + +// [END pubsub_dead_letter_create_subscription] diff --git a/samples/snippets/src/test/java/pubsub/DeadLetterQueueIT.java b/samples/snippets/src/test/java/pubsub/DeadLetterQueueIT.java new file mode 100644 index 000000000..dbc710e60 --- /dev/null +++ b/samples/snippets/src/test/java/pubsub/DeadLetterQueueIT.java @@ -0,0 +1,70 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +import static junit.framework.TestCase.assertNotNull; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.UUID; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +public class DeadLetterQueueIT { + + private ByteArrayOutputStream bout; + private PrintStream out; + + private static final String GOOGLE_CLOUD_PROJECT = + System.getenv("GOOGLE_CLOUD_PROJECT"); + private static final String SUFFIX = UUID.randomUUID().toString(); + private static final String TOPIC_NAME = "topic-" + SUFFIX; + private static final String SUBSCRIPTION_NAME = "subscription-" + SUFFIX; + + private static void requireEnvVar(String varName) { + assertNotNull( + "Environment variable " + varName + " is required to perform these tests.", + System.getenv(varName)); + } + + @Rule public Timeout globalTimeout = Timeout.seconds(300); // 5 minute timeout + + @BeforeClass + public static void checkRequirements() { + requireEnvVar("GOOGLE_CLOUD_PROJECT_NUMBER"); + } + + @Before + public void setUp() throws Exception { + bout = new ByteArrayOutputStream(); + out = new PrintStream(bout); + System.setOut(out); + } + + @After + public void tearDown() throws Exception { + System.setOut(null); + } + + @Test + public void testQuickstart() throws Exception { + } +} From 460a1908d095f4585ef70edec73572261bc33c62 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 18 May 2020 17:53:32 -0700 Subject: [PATCH 3/4] add four samples --- samples/pom.xml | 30 +------ ...bscriptionWithDeadLetterPolicyExample.java | 60 ++++++++++--- ...veMessagesWithDeliveryAttemptsExample.java | 74 ++++++++++++++++ .../pubsub/RemoveDeadLetterPolicyExample.java | 80 +++++++++++++++++ .../pubsub/UpdateDeadLetterPolicyExample.java | 88 +++++++++++++++++++ .../test/java/pubsub/DeadLetterQueueIT.java | 6 +- 6 files changed, 295 insertions(+), 43 deletions(-) create mode 100644 samples/snippets/src/main/java/pubsub/ReceiveMessagesWithDeliveryAttemptsExample.java create mode 100644 samples/snippets/src/main/java/pubsub/RemoveDeadLetterPolicyExample.java create mode 100644 samples/snippets/src/main/java/pubsub/UpdateDeadLetterPolicyExample.java diff --git a/samples/pom.xml b/samples/pom.xml index 4b0f4526b..5b4f2113a 100644 --- a/samples/pom.xml +++ b/samples/pom.xml @@ -18,35 +18,7 @@ 4.0.0 com.google.cloud google-cloud-pubsub-samples - 0.0.1-SNAPSHOT - - - junit - junit - - - junit - junit - - - junit - junit - - - junit - junit - - - junit - junit - - - junit - junit - 4.13 - test - - + 0.0.1-SNAPSHOT pom Google Cloud Pub/Sub Samples Parent https://github.com/googleapis/java-pubsub diff --git a/samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java b/samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java index 3391b6526..b252309ce 100644 --- a/samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java +++ b/samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java @@ -18,24 +18,64 @@ // [START pubsub_dead_letter_create_subscription] +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.pubsub.v1.DeadLetterPolicy; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Subscription; +import java.io.IOException; + public class CreateSubscriptionWithDeadLetterPolicyExample { public static void main(String... args) throws Exception { // TODO(developer): Replace these variables before running the sample. - String PROJECT_ID = "Your Project ID"; - String TOPIC_NAME = "Your Topic Name"; - String SUBSCRIPTION_NAME = "Your Subscription Name"; - String DEAD_LETTER_TOPIC = "Your Dead Letter Topic"; - - CreateSubscriptionWithDeadLetterPolicyExample - .createSubscriptionWithDeadLetterPolicyExample(PROJECT_ID, - SUBSCRIPTION_NAME, TOPIC_NAME, DEAD_LETTER_TOPIC); + String projectId = "Your Project ID"; + String topicId = "Your Topic ID"; + String subscriptionId = "Your Subscription ID"; + String deadLetterTopicId = "Your Dead Letter Topic ID"; + + projectId = "tz-playground-bigdata"; + topicId = "jan"; + subscriptionId = "uno"; + deadLetterTopicId = "jan-dlq"; + + CreateSubscriptionWithDeadLetterPolicyExample.createSubscriptionWithDeadLetterPolicyExample( + projectId, subscriptionId, topicId, deadLetterTopicId); } public static void createSubscriptionWithDeadLetterPolicyExample( - String PROJECT_ID, String SUBSCRIPTION_NAME, String TOPIC_NAME, - String DEAD_LETTER_TOPIC_NAME) { + String projectId, String subscriptionId, String topicId, String deadLetterTopicId) + throws IOException { + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + + ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId); + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + ProjectTopicName deadLetterTopicName = ProjectTopicName.of(projectId, deadLetterTopicId); + + DeadLetterPolicy deadLetterPolicy = + DeadLetterPolicy.newBuilder() + .setDeadLetterTopic(deadLetterTopicName.toString()) + .setMaxDeliveryAttempts(10) + .build(); + + Subscription request = + Subscription.newBuilder() + .setName(subscriptionName.toString()) + .setTopic(topicName.toString()) + .setPushConfig(PushConfig.getDefaultInstance()) + .setDeadLetterPolicy(deadLetterPolicy) + .setAckDeadlineSeconds(10) + .build(); + + Subscription subscription = subscriptionAdminClient.createSubscription(request); + System.out.println("Created subscription: " + subscription.getName()); + System.out.println("Of dead letter policy: " + subscription.getDeadLetterPolicy()); + System.out.println( + "It will forward dead letter messages to: " + deadLetterTopicName.toString()); + } } } diff --git a/samples/snippets/src/main/java/pubsub/ReceiveMessagesWithDeliveryAttemptsExample.java b/samples/snippets/src/main/java/pubsub/ReceiveMessagesWithDeliveryAttemptsExample.java new file mode 100644 index 000000000..db00e004d --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/ReceiveMessagesWithDeliveryAttemptsExample.java @@ -0,0 +1,74 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_dead_letter_delivery_attempt] + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; + +public class ReceiveMessagesWithDeliveryAttemptsExample { + + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "Your Project ID"; + String subscriptionId = "Your Subscription ID"; + + projectId = "tz-playground-bigdata"; + subscriptionId = "uno"; + ReceiveMessagesWithDeliveryAttemptsExample.receiveMessagesWithDeliveryAttemptsExample( + projectId, subscriptionId); + } + + public static void receiveMessagesWithDeliveryAttemptsExample( + String projectId, String subscriptionId) { + + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + // Instantiate an asynchronous message receiver + MessageReceiver receiver = + new MessageReceiver() { + @Override + public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { + // handle incoming message, then ack/nack the received message + System.out.println("Id : " + message.getMessageId()); + System.out.println("Data : " + message.getData().toStringUtf8()); + System.out.println("Deliver Attempt : " + Subscriber.getDeliveryAttempt(message)); + consumer.ack(); + } + }; + + Subscriber subscriber = null; + try { + subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + subscriber.startAsync().awaitRunning(); + System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); + // Allow the subscriber to run indefinitely unless an unrecoverable error occurs + subscriber.awaitTerminated(); + } finally { + // Stop receiving messages + if (subscriber != null) { + subscriber.stopAsync(); + } + } + } +} +// [END pubsub_dead_letter_delivery_attempt] diff --git a/samples/snippets/src/main/java/pubsub/RemoveDeadLetterPolicyExample.java b/samples/snippets/src/main/java/pubsub/RemoveDeadLetterPolicyExample.java new file mode 100644 index 000000000..795400b56 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/RemoveDeadLetterPolicyExample.java @@ -0,0 +1,80 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_dead_letter_remove] + +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.protobuf.FieldMask; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.TopicName; +import com.google.pubsub.v1.UpdateSubscriptionRequest; +import java.io.IOException; + +public class RemoveDeadLetterPolicyExample { + + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "Your Project ID"; + String subscriptionId = "Your Subscription ID"; + String topicId = "Your Topic ID"; + + projectId = "tz-playground-bigdata"; + subscriptionId = "une"; + topicId = "jan"; + + RemoveDeadLetterPolicyExample.removeDeadLetterPolicyExample(projectId, subscriptionId, topicId); + } + + public static void removeDeadLetterPolicyExample( + String projectId, String subscriptionId, String topicId) throws IOException { + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + System.out.println( + "Before: " + subscriptionAdminClient.getSubscription(subscriptionName).getAllFields()); + + TopicName topicName = TopicName.of(projectId, topicId); + + Subscription subscription = + Subscription.newBuilder() + .setName(subscriptionName.toString()) + .setTopic(topicName.toString()) + .build(); + + FieldMask updateMask = + FieldMask.newBuilder() + .addPaths("dead_letter_policy.dead_letter_topic") + .addPaths("dead_letter_policy.max_delivery_attempts") + .build(); + + UpdateSubscriptionRequest request = + UpdateSubscriptionRequest.newBuilder() + .setSubscription(subscription) + .setUpdateMask(updateMask) + .build(); + + Subscription response = subscriptionAdminClient.updateSubscription(request); + + System.out.println("After: " + response.getAllFields()); + } + } +} +// [END pubsub_dead_letter_remove] diff --git a/samples/snippets/src/main/java/pubsub/UpdateDeadLetterPolicyExample.java b/samples/snippets/src/main/java/pubsub/UpdateDeadLetterPolicyExample.java new file mode 100644 index 000000000..070e0a14b --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/UpdateDeadLetterPolicyExample.java @@ -0,0 +1,88 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_dead_letter_update_subscription] + +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.protobuf.FieldMask; +import com.google.pubsub.v1.DeadLetterPolicy; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.TopicName; +import com.google.pubsub.v1.UpdateSubscriptionRequest; +import java.io.IOException; + +public class UpdateDeadLetterPolicyExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "Your Project ID"; + String subscriptionId = "Your Subscription ID"; + String topicId = "Your Topic ID"; + String deadLetterTopicId = "Your Dead Letter Topic ID"; + + projectId = "tz-playground-bigdata"; + subscriptionId = "une"; + topicId = "jan"; + deadLetterTopicId = "jan-dlq"; + + UpdateDeadLetterPolicyExample.updateDeadLetterPolicyExample( + projectId, subscriptionId, topicId, deadLetterTopicId); + } + + public static void updateDeadLetterPolicyExample( + String projectId, String subscriptionId, String topicId, String deadLetterTopicId) + throws IOException { + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + System.out.println( + "Before: " + subscriptionAdminClient.getSubscription(subscriptionName).getAllFields()); + + TopicName topicName = TopicName.of(projectId, topicId); + TopicName deadLetterTopicName = TopicName.of(projectId, deadLetterTopicId); + + DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.newBuilder() + .setDeadLetterTopic(deadLetterTopicName.toString()) + .setMaxDeliveryAttempts(20) + .build(); + + Subscription subscription = + Subscription.newBuilder() + .setName(subscriptionName.toString()) + .setTopic(topicName.toString()) + .setDeadLetterPolicy(deadLetterPolicy) + .build(); + + FieldMask updateMask = + FieldMask.newBuilder().addPaths("dead_letter_policy.max_delivery_attempts").build(); + + UpdateSubscriptionRequest request = + UpdateSubscriptionRequest.newBuilder() + .setSubscription(subscription) + .setUpdateMask(updateMask) + .build(); + + Subscription response = subscriptionAdminClient.updateSubscription(request); + + System.out.println("After: " + response.getAllFields()); + } + } +} +// [END pubsub_dead_letter_update_subscription] diff --git a/samples/snippets/src/test/java/pubsub/DeadLetterQueueIT.java b/samples/snippets/src/test/java/pubsub/DeadLetterQueueIT.java index dbc710e60..0ab6265e3 100644 --- a/samples/snippets/src/test/java/pubsub/DeadLetterQueueIT.java +++ b/samples/snippets/src/test/java/pubsub/DeadLetterQueueIT.java @@ -33,8 +33,7 @@ public class DeadLetterQueueIT { private ByteArrayOutputStream bout; private PrintStream out; - private static final String GOOGLE_CLOUD_PROJECT = - System.getenv("GOOGLE_CLOUD_PROJECT"); + private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); private static final String SUFFIX = UUID.randomUUID().toString(); private static final String TOPIC_NAME = "topic-" + SUFFIX; private static final String SUBSCRIPTION_NAME = "subscription-" + SUFFIX; @@ -65,6 +64,5 @@ public void tearDown() throws Exception { } @Test - public void testQuickstart() throws Exception { - } + public void testQuickstart() throws Exception {} } From 4f01e86ff8fda370983bd76ddf55e3f2245e7c55 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Tue, 19 May 2020 12:56:54 -0700 Subject: [PATCH 4/4] samples & tests completed --- ...bscriptionWithDeadLetterPolicyExample.java | 31 ++++--- ...veMessagesWithDeliveryAttemptsExample.java | 28 +++--- .../pubsub/RemoveDeadLetterPolicyExample.java | 22 ++--- .../pubsub/UpdateDeadLetterPolicyExample.java | 26 ++++-- .../test/java/pubsub/DeadLetterQueueIT.java | 88 +++++++++++++++++-- 5 files changed, 142 insertions(+), 53 deletions(-) diff --git a/samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java b/samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java index b252309ce..c9a07fb4a 100644 --- a/samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java +++ b/samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java @@ -22,31 +22,29 @@ import com.google.pubsub.v1.DeadLetterPolicy; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.ProjectTopicName; -import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.Subscription; -import java.io.IOException; public class CreateSubscriptionWithDeadLetterPolicyExample { public static void main(String... args) throws Exception { // TODO(developer): Replace these variables before running the sample. String projectId = "Your Project ID"; - String topicId = "Your Topic ID"; + // This is the subscription you want to create with a dead letter policy. String subscriptionId = "Your Subscription ID"; + // This is an existing topic that you want to attach the subscription with dead letter policy + // to. + String topicId = "Your Topic ID"; + // This is an existing topic that the subscription with dead letter policy forwards dead letter + // messages to. String deadLetterTopicId = "Your Dead Letter Topic ID"; - projectId = "tz-playground-bigdata"; - topicId = "jan"; - subscriptionId = "uno"; - deadLetterTopicId = "jan-dlq"; - CreateSubscriptionWithDeadLetterPolicyExample.createSubscriptionWithDeadLetterPolicyExample( projectId, subscriptionId, topicId, deadLetterTopicId); } public static void createSubscriptionWithDeadLetterPolicyExample( String projectId, String subscriptionId, String topicId, String deadLetterTopicId) - throws IOException { + throws Exception { try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId); @@ -57,6 +55,8 @@ public static void createSubscriptionWithDeadLetterPolicyExample( DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.newBuilder() .setDeadLetterTopic(deadLetterTopicName.toString()) + // The maximum number of times that the service attempts to deliver a + // message before forwarding it to the dead letter topic. Must be [5-100]. .setMaxDeliveryAttempts(10) .build(); @@ -64,19 +64,22 @@ public static void createSubscriptionWithDeadLetterPolicyExample( Subscription.newBuilder() .setName(subscriptionName.toString()) .setTopic(topicName.toString()) - .setPushConfig(PushConfig.getDefaultInstance()) .setDeadLetterPolicy(deadLetterPolicy) - .setAckDeadlineSeconds(10) .build(); Subscription subscription = subscriptionAdminClient.createSubscription(request); System.out.println("Created subscription: " + subscription.getName()); - System.out.println("Of dead letter policy: " + subscription.getDeadLetterPolicy()); System.out.println( - "It will forward dead letter messages to: " + deadLetterTopicName.toString()); + "It will forward dead letter messages to: " + + subscription.getDeadLetterPolicy().getDeadLetterTopic()); + System.out.println( + "After " + + subscription.getDeadLetterPolicy().getMaxDeliveryAttempts() + + " delivery attempts."); + // Remember to attach a subscription to the dead letter topic because + // messages published to a topic with no subscriptions are lost. } } } - // [END pubsub_dead_letter_create_subscription] diff --git a/samples/snippets/src/main/java/pubsub/ReceiveMessagesWithDeliveryAttemptsExample.java b/samples/snippets/src/main/java/pubsub/ReceiveMessagesWithDeliveryAttemptsExample.java index db00e004d..8612e45dc 100644 --- a/samples/snippets/src/main/java/pubsub/ReceiveMessagesWithDeliveryAttemptsExample.java +++ b/samples/snippets/src/main/java/pubsub/ReceiveMessagesWithDeliveryAttemptsExample.java @@ -23,16 +23,17 @@ import com.google.cloud.pubsub.v1.Subscriber; import com.google.pubsub.v1.ProjectSubscriptionName; import com.google.pubsub.v1.PubsubMessage; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class ReceiveMessagesWithDeliveryAttemptsExample { public static void main(String... args) throws Exception { // TODO(developer): Replace these variables before running the sample. String projectId = "Your Project ID"; + // This is an existing subscription with a dead letter policy. String subscriptionId = "Your Subscription ID"; - projectId = "tz-playground-bigdata"; - subscriptionId = "uno"; ReceiveMessagesWithDeliveryAttemptsExample.receiveMessagesWithDeliveryAttemptsExample( projectId, subscriptionId); } @@ -43,15 +44,15 @@ public static void receiveMessagesWithDeliveryAttemptsExample( ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); - // Instantiate an asynchronous message receiver + // Instantiate an asynchronous message receiver. MessageReceiver receiver = new MessageReceiver() { @Override public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { - // handle incoming message, then ack/nack the received message - System.out.println("Id : " + message.getMessageId()); - System.out.println("Data : " + message.getData().toStringUtf8()); - System.out.println("Deliver Attempt : " + Subscriber.getDeliveryAttempt(message)); + // Handle incoming message, then ack the received message. + System.out.println("Id: " + message.getMessageId()); + System.out.println("Data: " + message.getData().toStringUtf8()); + System.out.println("Delivery Attempt: " + Subscriber.getDeliveryAttempt(message)); consumer.ack(); } }; @@ -59,15 +60,14 @@ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { Subscriber subscriber = null; try { subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + // Start the subscriber. subscriber.startAsync().awaitRunning(); System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); - // Allow the subscriber to run indefinitely unless an unrecoverable error occurs - subscriber.awaitTerminated(); - } finally { - // Stop receiving messages - if (subscriber != null) { - subscriber.stopAsync(); - } + // Allow the subscriber to run for 30s unless an unrecoverable error occurs. + subscriber.awaitTerminated(30, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { + // Shut down the subscriber after 30s. Stop receiving messages. + subscriber.stopAsync(); } } } diff --git a/samples/snippets/src/main/java/pubsub/RemoveDeadLetterPolicyExample.java b/samples/snippets/src/main/java/pubsub/RemoveDeadLetterPolicyExample.java index 795400b56..51d23594a 100644 --- a/samples/snippets/src/main/java/pubsub/RemoveDeadLetterPolicyExample.java +++ b/samples/snippets/src/main/java/pubsub/RemoveDeadLetterPolicyExample.java @@ -24,55 +24,57 @@ import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.TopicName; import com.google.pubsub.v1.UpdateSubscriptionRequest; -import java.io.IOException; public class RemoveDeadLetterPolicyExample { public static void main(String... args) throws Exception { // TODO(developer): Replace these variables before running the sample. String projectId = "Your Project ID"; + // This is an existing subscription with dead letter policy. String subscriptionId = "Your Subscription ID"; + // This is an existing topic that the subscription with dead letter policy is attached to. String topicId = "Your Topic ID"; - projectId = "tz-playground-bigdata"; - subscriptionId = "une"; - topicId = "jan"; - RemoveDeadLetterPolicyExample.removeDeadLetterPolicyExample(projectId, subscriptionId, topicId); } public static void removeDeadLetterPolicyExample( - String projectId, String subscriptionId, String topicId) throws IOException { + String projectId, String subscriptionId, String topicId) throws Exception { try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId); + TopicName topicName = TopicName.of(projectId, topicId); System.out.println( "Before: " + subscriptionAdminClient.getSubscription(subscriptionName).getAllFields()); - TopicName topicName = TopicName.of(projectId, topicId); - - Subscription subscription = + // Construct the subscription you expect to have after the request. Here, + // values in the required fields (name, topic) help identify the subscription. + // No dead letter policy is supplied. + Subscription expectedSubscription = Subscription.newBuilder() .setName(subscriptionName.toString()) .setTopic(topicName.toString()) .build(); + // Construct a field mask to indicate which field to update in the subscription. FieldMask updateMask = FieldMask.newBuilder() .addPaths("dead_letter_policy.dead_letter_topic") + // A default of 5 is applied upon successful update. .addPaths("dead_letter_policy.max_delivery_attempts") .build(); UpdateSubscriptionRequest request = UpdateSubscriptionRequest.newBuilder() - .setSubscription(subscription) + .setSubscription(expectedSubscription) .setUpdateMask(updateMask) .build(); Subscription response = subscriptionAdminClient.updateSubscription(request); + // You should see an empty dead letter topic field inside the dead letter policy. System.out.println("After: " + response.getAllFields()); } } diff --git a/samples/snippets/src/main/java/pubsub/UpdateDeadLetterPolicyExample.java b/samples/snippets/src/main/java/pubsub/UpdateDeadLetterPolicyExample.java index 070e0a14b..ff4270ea6 100644 --- a/samples/snippets/src/main/java/pubsub/UpdateDeadLetterPolicyExample.java +++ b/samples/snippets/src/main/java/pubsub/UpdateDeadLetterPolicyExample.java @@ -31,15 +31,14 @@ public class UpdateDeadLetterPolicyExample { public static void main(String... args) throws Exception { // TODO(developer): Replace these variables before running the sample. String projectId = "Your Project ID"; + // This is an existing subscription with a dead letter policy. String subscriptionId = "Your Subscription ID"; + // This is an existing topic that the subscription with dead letter policy is attached to. String topicId = "Your Topic ID"; + // This is an existing dead letter topic that the subscription with dead letter policy forwards + // dead letter messages to. String deadLetterTopicId = "Your Dead Letter Topic ID"; - projectId = "tz-playground-bigdata"; - subscriptionId = "une"; - topicId = "jan"; - deadLetterTopicId = "jan-dlq"; - UpdateDeadLetterPolicyExample.updateDeadLetterPolicyExample( projectId, subscriptionId, topicId, deadLetterTopicId); } @@ -58,11 +57,16 @@ public static void updateDeadLetterPolicyExample( TopicName topicName = TopicName.of(projectId, topicId); TopicName deadLetterTopicName = TopicName.of(projectId, deadLetterTopicId); - DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.newBuilder() - .setDeadLetterTopic(deadLetterTopicName.toString()) - .setMaxDeliveryAttempts(20) - .build(); + // Construct the dead letter policy you expect to have after the update. + DeadLetterPolicy deadLetterPolicy = + DeadLetterPolicy.newBuilder() + .setDeadLetterTopic(deadLetterTopicName.toString()) + .setMaxDeliveryAttempts(20) + .build(); + // Construct the subscription with the dead letter policy you expect to have + // after the update. Here, values in the required fields (name, topic) help + // identify the subscription. Subscription subscription = Subscription.newBuilder() .setName(subscriptionName.toString()) @@ -70,6 +74,7 @@ public static void updateDeadLetterPolicyExample( .setDeadLetterPolicy(deadLetterPolicy) .build(); + // Construct a field mask to indicate which field to update in the subscription. FieldMask updateMask = FieldMask.newBuilder().addPaths("dead_letter_policy.max_delivery_attempts").build(); @@ -82,6 +87,9 @@ public static void updateDeadLetterPolicyExample( Subscription response = subscriptionAdminClient.updateSubscription(request); System.out.println("After: " + response.getAllFields()); + System.out.println( + "Max delivery attempts is now " + + response.getDeadLetterPolicy().getMaxDeliveryAttempts()); } } } diff --git a/samples/snippets/src/test/java/pubsub/DeadLetterQueueIT.java b/samples/snippets/src/test/java/pubsub/DeadLetterQueueIT.java index 0ab6265e3..73071a6eb 100644 --- a/samples/snippets/src/test/java/pubsub/DeadLetterQueueIT.java +++ b/samples/snippets/src/test/java/pubsub/DeadLetterQueueIT.java @@ -16,8 +16,17 @@ package pubsub; +import static com.google.common.truth.Truth.assertThat; import static junit.framework.TestCase.assertNotNull; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Topic; import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.util.UUID; @@ -33,10 +42,16 @@ public class DeadLetterQueueIT { private ByteArrayOutputStream bout; private PrintStream out; - private static final String GOOGLE_CLOUD_PROJECT = System.getenv("GOOGLE_CLOUD_PROJECT"); - private static final String SUFFIX = UUID.randomUUID().toString(); - private static final String TOPIC_NAME = "topic-" + SUFFIX; - private static final String SUBSCRIPTION_NAME = "subscription-" + SUFFIX; + private static final String projectId = System.getenv("GOOGLE_CLOUD_PROJECT"); + private static final String _suffix = UUID.randomUUID().toString(); + private static final String topicId = "topic-" + _suffix; + private static final String subscriptionId = "subscription-" + _suffix; + private static final String deadLetterTopicId = "topic-dlq-" + _suffix; + private static final ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId); + private static final ProjectTopicName deadLetterTopicName = + ProjectTopicName.of(projectId, deadLetterTopicId); + private static final ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); private static void requireEnvVar(String varName) { assertNotNull( @@ -44,11 +59,20 @@ private static void requireEnvVar(String varName) { System.getenv(varName)); } + // Helper function to publish a message. + private static void publishSomeMessages() throws Exception { + ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId); + Publisher publisher = Publisher.newBuilder(topicName).build(); + ByteString data = ByteString.copyFromUtf8("Hello"); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); + publisher.publish(pubsubMessage).get(); + } + @Rule public Timeout globalTimeout = Timeout.seconds(300); // 5 minute timeout @BeforeClass public static void checkRequirements() { - requireEnvVar("GOOGLE_CLOUD_PROJECT_NUMBER"); + requireEnvVar("GOOGLE_CLOUD_PROJECT"); } @Before @@ -56,13 +80,65 @@ public void setUp() throws Exception { bout = new ByteArrayOutputStream(); out = new PrintStream(bout); System.setOut(out); + + // Create a topic to attach a subscription with dead letter policy, and a + // dead letter topic for that subscription to forward dead letter messages to. + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + Topic topic = Topic.newBuilder().setName(topicName.toString()).build(); + Topic deadLetterTopic = Topic.newBuilder().setName(deadLetterTopicName.toString()).build(); + topicAdminClient.createTopic(topic); + topicAdminClient.createTopic(deadLetterTopic); + } } @After public void tearDown() throws Exception { + // Delete the subscription with dead letter policy. + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + subscriptionAdminClient.deleteSubscription(subscriptionName); + } + + // Delete the topic that the subscription with dead letter policy is attached + // to, and the dead letter topic that the subscription forwards dead letter + // messages to. + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + topicAdminClient.deleteTopic(topicName.toString()); + topicAdminClient.deleteTopic(deadLetterTopicName.toString()); + } + System.setOut(null); } @Test - public void testQuickstart() throws Exception {} + public void testQuickstart() throws Exception { + // Create a subscription with dead letter policy + CreateSubscriptionWithDeadLetterPolicyExample.createSubscriptionWithDeadLetterPolicyExample( + projectId, subscriptionId, topicId, deadLetterTopicId); + assertThat(bout.toString()).contains("Created subscription: " + subscriptionName.toString()); + assertThat(bout.toString()) + .contains("It will forward dead letter messages to: " + deadLetterTopicName.toString()); + assertThat(bout.toString()).contains("After 10 delivery attempts."); + + publishSomeMessages(); + + bout.reset(); + // Receive messages with delivery attempts. + ReceiveMessagesWithDeliveryAttemptsExample.receiveMessagesWithDeliveryAttemptsExample( + projectId, subscriptionId); + assertThat(bout.toString()).contains("Listening for messages on"); + assertThat(bout.toString()).contains("Data: Hello"); + assertThat(bout.toString()).contains("Delivery Attempt: 1"); + + bout.reset(); + // Update dead letter policy. + UpdateDeadLetterPolicyExample.updateDeadLetterPolicyExample( + projectId, subscriptionId, topicId, deadLetterTopicId); + assertThat(bout.toString()).contains("Max delivery attempts is now 20"); + + bout.reset(); + // Remove dead letter policy. + RemoveDeadLetterPolicyExample.removeDeadLetterPolicyExample(projectId, subscriptionId, topicId); + assertThat(bout.toString()) + .contains("google.pubsub.v1.Subscription.dead_letter_policy=max_delivery_attempts: 5"); + } }