From f3c93fa8bf0eb8ebda6eea6c6c6a60a36dc69af2 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Tue, 14 Jan 2020 09:56:08 -0500 Subject: [PATCH] feat: adding support for dead letter queues (#60) * feat: Adding support for DLQs Adding delivery attempt count to PubsubMessages as a message attribute, and creating helper function to allow users to get the count without knowing implementation details. * Fix formatting * fix: making changes requested in pull request --- .../google/cloud/pubsub/v1/MessageDispatcher.java | 9 ++++++++- .../java/com/google/cloud/pubsub/v1/Subscriber.java | 6 ++++++ .../cloud/pubsub/v1/MessageDispatcherTest.java | 8 +++++++- .../com/google/cloud/pubsub/v1/SubscriberTest.java | 13 +++++++++++++ 4 files changed, 34 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 53b979d5e..34b482cd8 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -341,10 +341,17 @@ private void processBatch(List batch) { // This should be a blocking flow controller and never throw an exception. throw new IllegalStateException("Flow control unexpected exception", unexpectedException); } - processOutstandingMessage(message.receivedMessage.getMessage(), message.ackHandler); + processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler); } } + private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) { + return PubsubMessage.newBuilder(receivedMessage.getMessage()) + .putAttributes( + "googclient_deliveryattempt", Integer.toString(receivedMessage.getDeliveryAttempt())) + .build(); + } + private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) { final SettableApiFuture response = SettableApiFuture.create(); final AckReplyConsumer consumer = diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 059ac0ce4..422a0577f 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -44,6 +44,7 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -205,6 +206,11 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver) return new Builder(subscription, receiver); } + /** Returns the delivery attempt count for a received {@link PubsubMessage} */ + public static int getDeliveryAttempt(PubsubMessage message) { + return Integer.parseInt(message.getAttributesOrDefault("googclient_deliveryattempt", "0")); + } + /** Subscription which the subscriber is subscribed to. */ public String getSubscriptionNameString() { return subscriptionName; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 163475407..b84e82c8c 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -37,10 +37,13 @@ import org.threeten.bp.Duration; public class MessageDispatcherTest { + private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data"); + private static final int DELIVERY_INFO_COUNT = 3; private static final ReceivedMessage TEST_MESSAGE = ReceivedMessage.newBuilder() .setAckId("ackid") - .setMessage(PubsubMessage.newBuilder().setData(ByteString.EMPTY).build()) + .setMessage(PubsubMessage.newBuilder().setData(MESSAGE_DATA).build()) + .setDeliveryAttempt(DELIVERY_INFO_COUNT) .build(); private static final Runnable NOOP_RUNNABLE = new Runnable() { @@ -78,6 +81,9 @@ public void setUp() { new MessageReceiver() { @Override public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { + assertThat(message.getData()).isEqualTo(MESSAGE_DATA); + assertThat(message.getAttributesOrThrow("googclient_deliveryattempt")) + .isEqualTo(Integer.toString(DELIVERY_INFO_COUNT)); consumers.add(consumer); } }; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 5512145e1..8f8489f21 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -84,6 +84,19 @@ public void tearDown() throws Exception { testChannel.shutdown(); } + @Test + public void testDeliveryAttemptHelper() { + int deliveryAttempt = 3; + PubsubMessage message = + PubsubMessage.newBuilder() + .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)) + .build(); + assertEquals(Subscriber.getDeliveryAttempt(message), deliveryAttempt); + + PubsubMessage emptyMessage = PubsubMessage.newBuilder().build(); + assertEquals(Subscriber.getDeliveryAttempt(emptyMessage), 0); + } + @Test public void testOpenedChannels() throws Exception { int expectedChannelCount = 1;