Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: update MessageDispatcher to not extend deadlines of messages whi…
…ch arrive early to 60s (#570)

* Update MessageDispatcher to not extend deadlines of messages which arrive early to 60s

* fix: lint

* fix: record minimum value in distribution by default
  • Loading branch information
dpcollins-google committed Mar 16, 2021
1 parent 4b98556 commit e174e20
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
Expand Up @@ -82,9 +82,10 @@ class MessageDispatcher {
private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<String> pendingReceipts = new LinkedBlockingQueue<>();

// The deadline should be set before use. Here, set it to something unreasonable,
// so we fail loudly if we mess up.
private final AtomicInteger messageDeadlineSeconds = new AtomicInteger(60);
// Start the deadline at the minimum ack deadline so messages which arrive before this is
// updated will not have a long ack deadline.
private final AtomicInteger messageDeadlineSeconds =
new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS);
private final AtomicBoolean extendDeadline = new AtomicBoolean(true);
private final Lock jobLock;
private ScheduledFuture<?> backgroundJob;
Expand Down
Expand Up @@ -184,9 +184,9 @@ public Void apply(UnaryCallSettings.Builder<?, ?> settingsBuilder) {
streamingSubscriberConnections = new ArrayList<StreamingSubscriberConnection>(numPullers);

// We regularly look up the distribution for a good subscription deadline.
// So we seed the distribution with something reasonable to start with.
// So we seed the distribution with the minimum value to start with.
// Distribution is percentile-based, so this value will eventually lose importance.
ackLatencyDistribution.record(60);
ackLatencyDistribution.record(MIN_ACK_DEADLINE_SECONDS);
}

/**
Expand Down

0 comments on commit e174e20

Please sign in to comment.