Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding support for dead letter queues #60

Merged
merged 8 commits into from Jan 14, 2020
Expand Up @@ -341,10 +341,17 @@ private void processBatch(List<OutstandingMessage> batch) {
// This should be a blocking flow controller and never throw an exception.
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
processOutstandingMessage(message.receivedMessage.getMessage(), message.ackHandler);
processOutstandingMessage(addDeliveryInfoCount(message.receivedMessage), message.ackHandler);
}
}

private PubsubMessage addDeliveryInfoCount(ReceivedMessage receivedMessage) {
return PubsubMessage.newBuilder(receivedMessage.getMessage())
.putAttributes(
"googclient_deliveryattempt", Integer.toString(receivedMessage.getDeliveryAttempt()))
hannahrogers-google marked this conversation as resolved.
Show resolved Hide resolved
.build();
}

private void processOutstandingMessage(final PubsubMessage message, final AckHandler ackHandler) {
final SettableApiFuture<AckReply> response = SettableApiFuture.create();
final AckReplyConsumer consumer =
Expand Down
Expand Up @@ -44,6 +44,7 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -205,6 +206,11 @@ public static Builder newBuilder(String subscription, MessageReceiver receiver)
return new Builder(subscription, receiver);
}

/** Returns the delivery attempt count for a received {@link PubsubMessage} */
public static int getDeliveryAttempt(PubsubMessage message) {
return Integer.parseInt(message.getAttributesOrDefault("googclient_deliveryattempt", "0"));
}

/** Subscription which the subscriber is subscribed to. */
public String getSubscriptionNameString() {
return subscriptionName;
Expand Down
Expand Up @@ -37,10 +37,13 @@
import org.threeten.bp.Duration;

public class MessageDispatcherTest {
private static final ByteString MESSAGE_DATA = ByteString.copyFromUtf8("message-data");
private static final int DELIVERY_INFO_COUNT = 3;
private static final ReceivedMessage TEST_MESSAGE =
ReceivedMessage.newBuilder()
.setAckId("ackid")
.setMessage(PubsubMessage.newBuilder().setData(ByteString.EMPTY).build())
.setMessage(PubsubMessage.newBuilder().setData(MESSAGE_DATA).build())
.setDeliveryAttempt(DELIVERY_INFO_COUNT)
.build();
private static final Runnable NOOP_RUNNABLE =
new Runnable() {
Expand Down Expand Up @@ -78,6 +81,9 @@ public void setUp() {
new MessageReceiver() {
@Override
public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
assertThat(message.getData()).isEqualTo(MESSAGE_DATA);
assertThat(message.getAttributesOrThrow("googclient_deliveryattempt"))
.isEqualTo(Integer.toString(DELIVERY_INFO_COUNT));
consumers.add(consumer);
}
};
Expand Down
Expand Up @@ -84,6 +84,19 @@ public void tearDown() throws Exception {
testChannel.shutdown();
}

@Test
public void testDeliveryAttemptHelper() {
int deliveryAttempt = 3;
PubsubMessage message =
PubsubMessage.newBuilder()
.putAttributes("googclient_deliveryattempt", Integer.toString(deliveryAttempt))
.build();
assertEquals(Subscriber.getDeliveryAttempt(message), deliveryAttempt);

PubsubMessage emptyMessage = PubsubMessage.newBuilder().build();
assertEquals(Subscriber.getDeliveryAttempt(emptyMessage), 0);
}

@Test
public void testOpenedChannels() throws Exception {
int expectedChannelCount = 1;
Expand Down