diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml index e262b999f..2fdd67dd8 100644 --- a/samples/install-without-bom/pom.xml +++ b/samples/install-without-bom/pom.xml @@ -1,4 +1,19 @@ + 4.0.0 com.google.cloud diff --git a/samples/pom.xml b/samples/pom.xml index 8219bd3f1..5b4f2113a 100644 --- a/samples/pom.xml +++ b/samples/pom.xml @@ -1,4 +1,19 @@ + 4.0.0 com.google.cloud diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml index f88f9e1a1..87b9c2a8a 100644 --- a/samples/snapshot/pom.xml +++ b/samples/snapshot/pom.xml @@ -1,4 +1,19 @@ + 4.0.0 com.google.cloud diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml index 7aaa3421f..d8a58f407 100644 --- a/samples/snippets/pom.xml +++ b/samples/snippets/pom.xml @@ -1,4 +1,19 @@ + 4.0.0 com.google.cloud diff --git a/samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java b/samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java new file mode 100644 index 000000000..c9a07fb4a --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/CreateSubscriptionWithDeadLetterPolicyExample.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_dead_letter_create_subscription] + +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.pubsub.v1.DeadLetterPolicy; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.Subscription; + +public class CreateSubscriptionWithDeadLetterPolicyExample { + + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "Your Project ID"; + // This is the subscription you want to create with a dead letter policy. + String subscriptionId = "Your Subscription ID"; + // This is an existing topic that you want to attach the subscription with dead letter policy + // to. + String topicId = "Your Topic ID"; + // This is an existing topic that the subscription with dead letter policy forwards dead letter + // messages to. + String deadLetterTopicId = "Your Dead Letter Topic ID"; + + CreateSubscriptionWithDeadLetterPolicyExample.createSubscriptionWithDeadLetterPolicyExample( + projectId, subscriptionId, topicId, deadLetterTopicId); + } + + public static void createSubscriptionWithDeadLetterPolicyExample( + String projectId, String subscriptionId, String topicId, String deadLetterTopicId) + throws Exception { + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + + ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId); + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + ProjectTopicName deadLetterTopicName = ProjectTopicName.of(projectId, deadLetterTopicId); + + DeadLetterPolicy deadLetterPolicy = + DeadLetterPolicy.newBuilder() + .setDeadLetterTopic(deadLetterTopicName.toString()) + // The maximum number of times that the service attempts to deliver a + // message before forwarding it to the dead letter topic. Must be [5-100]. + .setMaxDeliveryAttempts(10) + .build(); + + Subscription request = + Subscription.newBuilder() + .setName(subscriptionName.toString()) + .setTopic(topicName.toString()) + .setDeadLetterPolicy(deadLetterPolicy) + .build(); + + Subscription subscription = subscriptionAdminClient.createSubscription(request); + + System.out.println("Created subscription: " + subscription.getName()); + System.out.println( + "It will forward dead letter messages to: " + + subscription.getDeadLetterPolicy().getDeadLetterTopic()); + System.out.println( + "After " + + subscription.getDeadLetterPolicy().getMaxDeliveryAttempts() + + " delivery attempts."); + // Remember to attach a subscription to the dead letter topic because + // messages published to a topic with no subscriptions are lost. + } + } +} +// [END pubsub_dead_letter_create_subscription] diff --git a/samples/snippets/src/main/java/pubsub/ReceiveMessagesWithDeliveryAttemptsExample.java b/samples/snippets/src/main/java/pubsub/ReceiveMessagesWithDeliveryAttemptsExample.java new file mode 100644 index 000000000..8612e45dc --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/ReceiveMessagesWithDeliveryAttemptsExample.java @@ -0,0 +1,74 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_dead_letter_delivery_attempt] + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class ReceiveMessagesWithDeliveryAttemptsExample { + + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "Your Project ID"; + // This is an existing subscription with a dead letter policy. + String subscriptionId = "Your Subscription ID"; + + ReceiveMessagesWithDeliveryAttemptsExample.receiveMessagesWithDeliveryAttemptsExample( + projectId, subscriptionId); + } + + public static void receiveMessagesWithDeliveryAttemptsExample( + String projectId, String subscriptionId) { + + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + // Instantiate an asynchronous message receiver. + MessageReceiver receiver = + new MessageReceiver() { + @Override + public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { + // Handle incoming message, then ack the received message. + System.out.println("Id: " + message.getMessageId()); + System.out.println("Data: " + message.getData().toStringUtf8()); + System.out.println("Delivery Attempt: " + Subscriber.getDeliveryAttempt(message)); + consumer.ack(); + } + }; + + Subscriber subscriber = null; + try { + subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + // Start the subscriber. + subscriber.startAsync().awaitRunning(); + System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); + // Allow the subscriber to run for 30s unless an unrecoverable error occurs. + subscriber.awaitTerminated(30, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { + // Shut down the subscriber after 30s. Stop receiving messages. + subscriber.stopAsync(); + } + } +} +// [END pubsub_dead_letter_delivery_attempt] diff --git a/samples/snippets/src/main/java/pubsub/RemoveDeadLetterPolicyExample.java b/samples/snippets/src/main/java/pubsub/RemoveDeadLetterPolicyExample.java new file mode 100644 index 000000000..51d23594a --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/RemoveDeadLetterPolicyExample.java @@ -0,0 +1,82 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_dead_letter_remove] + +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.protobuf.FieldMask; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.TopicName; +import com.google.pubsub.v1.UpdateSubscriptionRequest; + +public class RemoveDeadLetterPolicyExample { + + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "Your Project ID"; + // This is an existing subscription with dead letter policy. + String subscriptionId = "Your Subscription ID"; + // This is an existing topic that the subscription with dead letter policy is attached to. + String topicId = "Your Topic ID"; + + RemoveDeadLetterPolicyExample.removeDeadLetterPolicyExample(projectId, subscriptionId, topicId); + } + + public static void removeDeadLetterPolicyExample( + String projectId, String subscriptionId, String topicId) throws Exception { + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + TopicName topicName = TopicName.of(projectId, topicId); + + System.out.println( + "Before: " + subscriptionAdminClient.getSubscription(subscriptionName).getAllFields()); + + // Construct the subscription you expect to have after the request. Here, + // values in the required fields (name, topic) help identify the subscription. + // No dead letter policy is supplied. + Subscription expectedSubscription = + Subscription.newBuilder() + .setName(subscriptionName.toString()) + .setTopic(topicName.toString()) + .build(); + + // Construct a field mask to indicate which field to update in the subscription. + FieldMask updateMask = + FieldMask.newBuilder() + .addPaths("dead_letter_policy.dead_letter_topic") + // A default of 5 is applied upon successful update. + .addPaths("dead_letter_policy.max_delivery_attempts") + .build(); + + UpdateSubscriptionRequest request = + UpdateSubscriptionRequest.newBuilder() + .setSubscription(expectedSubscription) + .setUpdateMask(updateMask) + .build(); + + Subscription response = subscriptionAdminClient.updateSubscription(request); + + // You should see an empty dead letter topic field inside the dead letter policy. + System.out.println("After: " + response.getAllFields()); + } + } +} +// [END pubsub_dead_letter_remove] diff --git a/samples/snippets/src/main/java/pubsub/UpdateDeadLetterPolicyExample.java b/samples/snippets/src/main/java/pubsub/UpdateDeadLetterPolicyExample.java new file mode 100644 index 000000000..ff4270ea6 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/UpdateDeadLetterPolicyExample.java @@ -0,0 +1,96 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +// [START pubsub_dead_letter_update_subscription] + +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.protobuf.FieldMask; +import com.google.pubsub.v1.DeadLetterPolicy; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.TopicName; +import com.google.pubsub.v1.UpdateSubscriptionRequest; +import java.io.IOException; + +public class UpdateDeadLetterPolicyExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "Your Project ID"; + // This is an existing subscription with a dead letter policy. + String subscriptionId = "Your Subscription ID"; + // This is an existing topic that the subscription with dead letter policy is attached to. + String topicId = "Your Topic ID"; + // This is an existing dead letter topic that the subscription with dead letter policy forwards + // dead letter messages to. + String deadLetterTopicId = "Your Dead Letter Topic ID"; + + UpdateDeadLetterPolicyExample.updateDeadLetterPolicyExample( + projectId, subscriptionId, topicId, deadLetterTopicId); + } + + public static void updateDeadLetterPolicyExample( + String projectId, String subscriptionId, String topicId, String deadLetterTopicId) + throws IOException { + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + System.out.println( + "Before: " + subscriptionAdminClient.getSubscription(subscriptionName).getAllFields()); + + TopicName topicName = TopicName.of(projectId, topicId); + TopicName deadLetterTopicName = TopicName.of(projectId, deadLetterTopicId); + + // Construct the dead letter policy you expect to have after the update. + DeadLetterPolicy deadLetterPolicy = + DeadLetterPolicy.newBuilder() + .setDeadLetterTopic(deadLetterTopicName.toString()) + .setMaxDeliveryAttempts(20) + .build(); + + // Construct the subscription with the dead letter policy you expect to have + // after the update. Here, values in the required fields (name, topic) help + // identify the subscription. + Subscription subscription = + Subscription.newBuilder() + .setName(subscriptionName.toString()) + .setTopic(topicName.toString()) + .setDeadLetterPolicy(deadLetterPolicy) + .build(); + + // Construct a field mask to indicate which field to update in the subscription. + FieldMask updateMask = + FieldMask.newBuilder().addPaths("dead_letter_policy.max_delivery_attempts").build(); + + UpdateSubscriptionRequest request = + UpdateSubscriptionRequest.newBuilder() + .setSubscription(subscription) + .setUpdateMask(updateMask) + .build(); + + Subscription response = subscriptionAdminClient.updateSubscription(request); + + System.out.println("After: " + response.getAllFields()); + System.out.println( + "Max delivery attempts is now " + + response.getDeadLetterPolicy().getMaxDeliveryAttempts()); + } + } +} +// [END pubsub_dead_letter_update_subscription] diff --git a/samples/snippets/src/test/java/pubsub/DeadLetterQueueIT.java b/samples/snippets/src/test/java/pubsub/DeadLetterQueueIT.java new file mode 100644 index 000000000..73071a6eb --- /dev/null +++ b/samples/snippets/src/test/java/pubsub/DeadLetterQueueIT.java @@ -0,0 +1,144 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +import static com.google.common.truth.Truth.assertThat; +import static junit.framework.TestCase.assertNotNull; + +import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.Topic; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.util.UUID; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; + +public class DeadLetterQueueIT { + + private ByteArrayOutputStream bout; + private PrintStream out; + + private static final String projectId = System.getenv("GOOGLE_CLOUD_PROJECT"); + private static final String _suffix = UUID.randomUUID().toString(); + private static final String topicId = "topic-" + _suffix; + private static final String subscriptionId = "subscription-" + _suffix; + private static final String deadLetterTopicId = "topic-dlq-" + _suffix; + private static final ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId); + private static final ProjectTopicName deadLetterTopicName = + ProjectTopicName.of(projectId, deadLetterTopicId); + private static final ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + private static void requireEnvVar(String varName) { + assertNotNull( + "Environment variable " + varName + " is required to perform these tests.", + System.getenv(varName)); + } + + // Helper function to publish a message. + private static void publishSomeMessages() throws Exception { + ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId); + Publisher publisher = Publisher.newBuilder(topicName).build(); + ByteString data = ByteString.copyFromUtf8("Hello"); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); + publisher.publish(pubsubMessage).get(); + } + + @Rule public Timeout globalTimeout = Timeout.seconds(300); // 5 minute timeout + + @BeforeClass + public static void checkRequirements() { + requireEnvVar("GOOGLE_CLOUD_PROJECT"); + } + + @Before + public void setUp() throws Exception { + bout = new ByteArrayOutputStream(); + out = new PrintStream(bout); + System.setOut(out); + + // Create a topic to attach a subscription with dead letter policy, and a + // dead letter topic for that subscription to forward dead letter messages to. + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + Topic topic = Topic.newBuilder().setName(topicName.toString()).build(); + Topic deadLetterTopic = Topic.newBuilder().setName(deadLetterTopicName.toString()).build(); + topicAdminClient.createTopic(topic); + topicAdminClient.createTopic(deadLetterTopic); + } + } + + @After + public void tearDown() throws Exception { + // Delete the subscription with dead letter policy. + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + subscriptionAdminClient.deleteSubscription(subscriptionName); + } + + // Delete the topic that the subscription with dead letter policy is attached + // to, and the dead letter topic that the subscription forwards dead letter + // messages to. + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + topicAdminClient.deleteTopic(topicName.toString()); + topicAdminClient.deleteTopic(deadLetterTopicName.toString()); + } + + System.setOut(null); + } + + @Test + public void testQuickstart() throws Exception { + // Create a subscription with dead letter policy + CreateSubscriptionWithDeadLetterPolicyExample.createSubscriptionWithDeadLetterPolicyExample( + projectId, subscriptionId, topicId, deadLetterTopicId); + assertThat(bout.toString()).contains("Created subscription: " + subscriptionName.toString()); + assertThat(bout.toString()) + .contains("It will forward dead letter messages to: " + deadLetterTopicName.toString()); + assertThat(bout.toString()).contains("After 10 delivery attempts."); + + publishSomeMessages(); + + bout.reset(); + // Receive messages with delivery attempts. + ReceiveMessagesWithDeliveryAttemptsExample.receiveMessagesWithDeliveryAttemptsExample( + projectId, subscriptionId); + assertThat(bout.toString()).contains("Listening for messages on"); + assertThat(bout.toString()).contains("Data: Hello"); + assertThat(bout.toString()).contains("Delivery Attempt: 1"); + + bout.reset(); + // Update dead letter policy. + UpdateDeadLetterPolicyExample.updateDeadLetterPolicyExample( + projectId, subscriptionId, topicId, deadLetterTopicId); + assertThat(bout.toString()).contains("Max delivery attempts is now 20"); + + bout.reset(); + // Remove dead letter policy. + RemoveDeadLetterPolicyExample.removeDeadLetterPolicyExample(projectId, subscriptionId, topicId); + assertThat(bout.toString()) + .contains("google.pubsub.v1.Subscription.dead_letter_policy=max_delivery_attempts: 5"); + } +}