From 535854df64234fe491cae1e3db0c96d685ef0800 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Thu, 30 Jan 2020 11:37:15 -0500 Subject: [PATCH] fix: removing delivery attempt attribute when dead lettering is not enabled (#72) * fix: creating fix to not populate delivery attempt attribute when dead lettering is not enabled * Adding unit test for case in which a received message has no delivery attempt --- .../cloud/pubsub/v1/MessageDispatcher.java | 14 ++++++--- .../google/cloud/pubsub/v1/Subscriber.java | 7 +++-- .../pubsub/v1/MessageDispatcherTest.java | 30 +++++++++++++++++-- .../cloud/pubsub/v1/SubscriberTest.java | 5 ++-- 4 files changed, 46 insertions(+), 10 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index e22c6130f..56fa8a3a9 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -346,10 +346,16 @@ private void processBatch(List batch) { } private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) { - return PubsubMessage.newBuilder(receivedMessage.getMessage()) - .putAttributes( - "googclient_deliveryattempt", Integer.toString(receivedMessage.getDeliveryAttempt())) - .build(); + PubsubMessage originalMessage = receivedMessage.getMessage(); + int deliveryAttempt = receivedMessage.getDeliveryAttempt(); + // Delivery Attempt will be set to 0 if DeadLetterPolicy is not set on the subscription. In + // this case, do not populate the PubsubMessage with the delivery attempt attribute. + if (deliveryAttempt > 0) { + return PubsubMessage.newBuilder(originalMessage) + .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)) + .build(); + } + return originalMessage; } private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) { diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 422a0577f..2deafb2e7 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -207,8 +207,11 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver) } /** Returns the delivery attempt count for a received {@link PubsubMessage} */ - public static int getDeliveryAttempt(PubsubMessage message) { - return Integer.parseInt(message.getAttributesOrDefault("googclient_deliveryattempt", "0")); + public static Integer getDeliveryAttempt(PubsubMessage message) { + if (!message.containsAttributes("googclient_deliveryattempt")) { + return null; + } + return Integer.parseInt(message.getAttributesOrThrow("googclient_deliveryattempt")); } /** Subscription which the subscriber is subscribed to. */ diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index b84e82c8c..1745b370b 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -17,6 +17,8 @@ package com.google.cloud.pubsub.v1; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController; @@ -59,6 +61,7 @@ public void run() { private List sentModAcks; private FakeClock clock; private FlowController flowController; + private boolean messageContainsDeliveryAttempt; @AutoValue abstract static class ModAckItem { @@ -82,8 +85,13 @@ public void setUp() { @Override public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { assertThat(message.getData()).isEqualTo(MESSAGE_DATA); - assertThat(message.getAttributesOrThrow("googclient_deliveryattempt")) - .isEqualTo(Integer.toString(DELIVERY_INFO_COUNT)); + if (messageContainsDeliveryAttempt) { + assertTrue(message.containsAttributes("googclient_deliveryattempt")); + assertThat(message.getAttributesOrThrow("googclient_deliveryattempt")) + .isEqualTo(Integer.toString(DELIVERY_INFO_COUNT)); + } else { + assertFalse(message.containsAttributes("googclient_deliveryattempt")); + } consumers.add(consumer); } }; @@ -126,6 +134,8 @@ public void sendAckOperations( systemExecutor, clock); dispatcher.setMessageDeadlineSeconds(Subscriber.MIN_ACK_DEADLINE_SECONDS); + + messageContainsDeliveryAttempt = true; } @Test @@ -136,6 +146,22 @@ public void testReceipt() { .contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); } + @Test + public void testReceiptNoDeliveryAttempt() { + messageContainsDeliveryAttempt = false; + ReceivedMessage messageNoDeliveryAttempt = + ReceivedMessage.newBuilder() + .setAckId("ackid") + .setMessage(PubsubMessage.newBuilder().setData(MESSAGE_DATA).build()) + .build(); + dispatcher.processReceivedMessages(Collections.singletonList(messageNoDeliveryAttempt)); + dispatcher.processOutstandingAckOperations(); + assertThat(sentModAcks) + .contains( + ModAckItem.of( + messageNoDeliveryAttempt.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); + } + @Test public void testAck() throws Exception { dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 8f8489f21..95ad58d80 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -86,15 +86,16 @@ public void tearDown() throws Exception { @Test public void testDeliveryAttemptHelper() { - int deliveryAttempt = 3; + Integer deliveryAttempt = 3; PubsubMessage message = PubsubMessage.newBuilder() .putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt)) .build(); assertEquals(Subscriber.getDeliveryAttempt(message), deliveryAttempt); + // In the case where delivery attempt attribute is not populated, expect null PubsubMessage emptyMessage = PubsubMessage.newBuilder().build(); - assertEquals(Subscriber.getDeliveryAttempt(emptyMessage), 0); + assertEquals(Subscriber.getDeliveryAttempt(emptyMessage), null); } @Test