From e174e2043f64563f4d2868537aeb90d948233166 Mon Sep 17 00:00:00 2001 From: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com> Date: Mon, 15 Mar 2021 21:53:40 -0400 Subject: [PATCH] fix: update MessageDispatcher to not extend deadlines of messages which arrive early to 60s (#570) * Update MessageDispatcher to not extend deadlines of messages which arrive early to 60s * fix: lint * fix: record minimum value in distribution by default --- .../java/com/google/cloud/pubsub/v1/MessageDispatcher.java | 7 ++++--- .../main/java/com/google/cloud/pubsub/v1/Subscriber.java | 4 ++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index f0eab3ba6..4177c6e01 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -82,9 +82,10 @@ class MessageDispatcher { private final LinkedBlockingQueue pendingNacks = new LinkedBlockingQueue<>(); private final LinkedBlockingQueue pendingReceipts = new LinkedBlockingQueue<>(); - // The deadline should be set before use. Here, set it to something unreasonable, - // so we fail loudly if we mess up. - private final AtomicInteger messageDeadlineSeconds = new AtomicInteger(60); + // Start the deadline at the minimum ack deadline so messages which arrive before this is + // updated will not have a long ack deadline. + private final AtomicInteger messageDeadlineSeconds = + new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS); private final AtomicBoolean extendDeadline = new AtomicBoolean(true); private final Lock jobLock; private ScheduledFuture backgroundJob; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 4c3791b37..9de0420ea 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -184,9 +184,9 @@ public Void apply(UnaryCallSettings.Builder settingsBuilder) { streamingSubscriberConnections = new ArrayList(numPullers); // We regularly look up the distribution for a good subscription deadline. - // So we seed the distribution with something reasonable to start with. + // So we seed the distribution with the minimum value to start with. // Distribution is percentile-based, so this value will eventually lose importance. - ackLatencyDistribution.record(60); + ackLatencyDistribution.record(MIN_ACK_DEADLINE_SECONDS); } /**