diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index c3f1a46cd..f0eab3ba6 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -68,6 +68,7 @@ class MessageDispatcher { private final Duration ackExpirationPadding; private final Duration maxAckExtensionPeriod; + private final int maxSecondsPerAckExtension; private final MessageReceiver receiver; private final AckProcessor ackProcessor; @@ -190,6 +191,7 @@ void sendAckOperations( AckProcessor ackProcessor, Duration ackExpirationPadding, Duration maxAckExtensionPeriod, + Duration maxDurationPerAckExtension, Distribution ackLatencyDistribution, FlowController flowController, Executor executor, @@ -199,6 +201,7 @@ void sendAckOperations( this.systemExecutor = systemExecutor; this.ackExpirationPadding = ackExpirationPadding; this.maxAckExtensionPeriod = maxAckExtensionPeriod; + this.maxSecondsPerAckExtension = Math.toIntExact(maxDurationPerAckExtension.getSeconds()); this.receiver = receiver; this.ackProcessor = ackProcessor; this.flowController = flowController; @@ -407,6 +410,10 @@ public void run() { int computeDeadlineSeconds() { int sec = ackLatencyDistribution.getPercentile(PERCENTILE_FOR_ACK_DEADLINE_UPDATES); + if ((maxSecondsPerAckExtension > 0) && (sec > maxSecondsPerAckExtension)) { + sec = maxSecondsPerAckExtension; + } + // Use Ints.constrainToRange when we get guava 21. if (sec < Subscriber.MIN_ACK_DEADLINE_SECONDS) { sec = Subscriber.MIN_ACK_DEADLINE_SECONDS; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 885554f74..19c2c79e3 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -93,6 +93,7 @@ public StreamingSubscriberConnection( MessageReceiver receiver, Duration ackExpirationPadding, Duration maxAckExtensionPeriod, + Duration maxDurationPerAckExtension, Distribution ackLatencyDistribution, SubscriberStub stub, int channelAffinity, @@ -111,6 +112,7 @@ public StreamingSubscriberConnection( this, ackExpirationPadding, maxAckExtensionPeriod, + maxDurationPerAckExtension, ackLatencyDistribution, flowController, executor, diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index bd30bb112..35c50fdb6 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -104,6 +104,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private final String subscriptionName; private final FlowControlSettings flowControlSettings; private final Duration maxAckExtensionPeriod; + private final Duration maxDurationPerAckExtension; // The ExecutorProvider used to generate executors for processing messages. private final ExecutorProvider executorProvider; // An instantiation of the SystemExecutorProvider used for processing acks @@ -128,6 +129,7 @@ private Subscriber(Builder builder) { subscriptionName = builder.subscriptionName; maxAckExtensionPeriod = builder.maxAckExtensionPeriod; + maxDurationPerAckExtension = builder.maxDurationPerAckExtension; clock = builder.clock.isPresent() ? builder.clock.get() : CurrentMillisClock.getDefaultClock(); flowController = @@ -329,6 +331,7 @@ private void startStreamingConnections() { receiver, ACK_EXPIRATION_PADDING, maxAckExtensionPeriod, + maxDurationPerAckExtension, ackLatencyDistribution, subStub, i, @@ -415,6 +418,7 @@ public static final class Builder { private MessageReceiver receiver; private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD; + private Duration maxDurationPerAckExtension = Duration.ofMillis(0); private FlowControlSettings flowControlSettings = FlowControlSettings.newBuilder() @@ -515,6 +519,22 @@ public Builder setMaxAckExtensionPeriod(Duration maxAckExtensionPeriod) { return this; } + /** + * Set the upper bound for a single mod ack extention period. + * + *
The ack deadline will continue to be extended by up to this duration until + * MaxAckExtensionPeriod is reached. Setting MaxDurationPerAckExtension bounds the maximum + * amount of time before a mesage re-delivery in the event the Subscriber fails to extend the + * deadline. + * + *
MaxDurationPerAckExtension configuration can be disabled by specifying a zero duration.
+ */
+ public Builder setMaxDurationPerAckExtension(Duration maxDurationPerAckExtension) {
+ Preconditions.checkArgument(maxDurationPerAckExtension.toMillis() >= 0);
+ this.maxDurationPerAckExtension = maxDurationPerAckExtension;
+ return this;
+ }
+
/**
* Gives the ability to set a custom executor. {@link ExecutorProvider#getExecutor()} will be
* called {@link Builder#parallelPullCount} times.
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java
index 1745b370b..88a015f6e 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java
@@ -54,6 +54,7 @@ public void run() {
// No-op; don't do anything.
}
};
+ private static final int MAX_SECONDS_PER_ACK_EXTENSION = 60;
private MessageDispatcher dispatcher;
private LinkedBlockingQueue