From 1427b8ced4d7f75e4cb1b57dd898b18f0d232d19 Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Mon, 6 Jul 2020 16:34:06 -0400 Subject: [PATCH] feat: implement max duration per ack extension (#211) * feat: implement max duration per ack extension * fix: update coverage * fix: requested changes --- .../cloud/pubsub/v1/MessageDispatcher.java | 7 +++++++ .../v1/StreamingSubscriberConnection.java | 2 ++ .../google/cloud/pubsub/v1/Subscriber.java | 20 +++++++++++++++++++ .../pubsub/v1/MessageDispatcherTest.java | 13 ++++++++++++ .../cloud/pubsub/v1/SubscriberTest.java | 4 +++- 5 files changed, 45 insertions(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index c3f1a46cd..f0eab3ba6 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -68,6 +68,7 @@ class MessageDispatcher { private final Duration ackExpirationPadding; private final Duration maxAckExtensionPeriod; + private final int maxSecondsPerAckExtension; private final MessageReceiver receiver; private final AckProcessor ackProcessor; @@ -190,6 +191,7 @@ void sendAckOperations( AckProcessor ackProcessor, Duration ackExpirationPadding, Duration maxAckExtensionPeriod, + Duration maxDurationPerAckExtension, Distribution ackLatencyDistribution, FlowController flowController, Executor executor, @@ -199,6 +201,7 @@ void sendAckOperations( this.systemExecutor = systemExecutor; this.ackExpirationPadding = ackExpirationPadding; this.maxAckExtensionPeriod = maxAckExtensionPeriod; + this.maxSecondsPerAckExtension = Math.toIntExact(maxDurationPerAckExtension.getSeconds()); this.receiver = receiver; this.ackProcessor = ackProcessor; this.flowController = flowController; @@ -407,6 +410,10 @@ public void run() { int computeDeadlineSeconds() { int sec = ackLatencyDistribution.getPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES); + if ((maxSecondsPerAckExtension > 0) && (sec > maxSecondsPerAckExtension)) { + sec = maxSecondsPerAckExtension; + } + // Use Ints.constrainToRange when we get guava 21. if (sec < Subscriber.MIN_ACK_DEADLINE_SECONDS) { sec = Subscriber.MIN_ACK_DEADLINE_SECONDS; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 885554f74..19c2c79e3 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -93,6 +93,7 @@ public StreamingSubscriberConnection( MessageReceiver receiver, Duration ackExpirationPadding, Duration maxAckExtensionPeriod, + Duration maxDurationPerAckExtension, Distribution ackLatencyDistribution, SubscriberStub stub, int channelAffinity, @@ -111,6 +112,7 @@ public StreamingSubscriberConnection( this, ackExpirationPadding, maxAckExtensionPeriod, + maxDurationPerAckExtension, ackLatencyDistribution, flowController, executor, diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index bd30bb112..35c50fdb6 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -104,6 +104,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private final String subscriptionName; private final FlowControlSettings flowControlSettings; private final Duration maxAckExtensionPeriod; + private final Duration maxDurationPerAckExtension; // The ExecutorProvider used to generate executors for processing messages. private final ExecutorProvider executorProvider; // An instantiation of the SystemExecutorProvider used for processing acks @@ -128,6 +129,7 @@ private Subscriber(Builder builder) { subscriptionName = builder.subscriptionName; maxAckExtensionPeriod = builder.maxAckExtensionPeriod; + maxDurationPerAckExtension = builder.maxDurationPerAckExtension; clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock(); flowController = @@ -329,6 +331,7 @@ private void startStreamingConnections() { receiver, ACK_EXPIRATION_PADDING, maxAckExtensionPeriod, + maxDurationPerAckExtension, ackLatencyDistribution, subStub, i, @@ -415,6 +418,7 @@ public static final class Builder { private MessageReceiver receiver; private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD; + private Duration maxDurationPerAckExtension = Duration.ofMillis(0); private FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder() @@ -515,6 +519,22 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) { return this; } + /** + * Set the upper bound for a single mod ack extention period. + * + *

The ack deadline will continue to be extended by up to this duration until + * MaxAckExtensionPeriod is reached. Setting MaxDurationPerAckExtension bounds the maximum + * amount of time before a mesage re-delivery in the event the Subscriber fails to extend the + * deadline. + * + *

MaxDurationPerAckExtension configuration can be disabled by specifying a zero duration. + */ + public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) { + Preconditions.checkArgument(maxDurationPerAckExtension.toMillis() >= 0); + this.maxDurationPerAckExtension = maxDurationPerAckExtension; + return this; + } + /** * Gives the ability to set a custom executor. {@link ExecutorProvider#getExecutor()} will be * called {@link Builder#parallelPullCount} times. diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 1745b370b..88a015f6e 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -54,6 +54,7 @@ public void run() { // No-op; don't do anything. } }; + private static final int MAX_SECONDS_PER_ACK_EXTENSION = 60; private MessageDispatcher dispatcher; private LinkedBlockingQueue consumers; @@ -128,6 +129,7 @@ public void sendAckOperations( processor, Duration.ofSeconds(5), Duration.ofMinutes(60), + Duration.ofSeconds(MAX_SECONDS_PER_ACK_EXTENSION), new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1), flowController, MoreExecutors.directExecutor(), @@ -235,4 +237,15 @@ public void testDeadlineAdjustment() throws Exception { assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(42); } + + @Test + public void testMaxDurationPerAckExtension() throws Exception { + assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(10); + + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + clock.advance(MAX_SECONDS_PER_ACK_EXTENSION + 5, TimeUnit.SECONDS); + consumers.take().ack(); + + assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(MAX_SECONDS_PER_ACK_EXTENSION); + } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 95ad58d80..4659297c0 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -45,6 +45,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.threeten.bp.Duration; /** Tests for {@link Subscriber}. */ public class SubscriberTest { @@ -236,6 +237,7 @@ private Builder getTestSubscriberBuilder(MessageReceiver receiver) { FixedTransportChannelProvider.create(GrpcTransportChannel.create(testChannel))) .setCredentialsProvider(NoCredentialsProvider.create()) .setClock(fakeExecutor.getClock()) - .setParallelPullCount(1); + .setParallelPullCount(1) + .setMaxDurationPerAckExtension(Duration.ofSeconds(5)); } }