From c63dc255dff55982fca2e4cf0e955c47614dc291 Mon Sep 17 00:00:00 2001 From: Jaume Marhuenda Date: Mon, 23 Nov 2020 16:03:51 -0500 Subject: [PATCH] feat: make the stream ack deadline congruent with maxDurationPerAckExtension (#447) * feat: allow to override the stream ack deadline * set streamAckDeadline to be congruent with maxDurationPerAckExtension * remove extra impot * remove unused DEFAULT_STREAM_ACK_DEADLINE * consider the case in which maxDurationPerAckExtension is not set * add test * test fix Co-authored-by: Jaume Marhuenda-Beltran --- .../v1/StreamingSubscriberConnection.java | 17 ++++- .../google/cloud/pubsub/v1/Subscriber.java | 3 +- .../pubsub/v1/FakeSubscriberServiceImpl.java | 14 ++++ .../cloud/pubsub/v1/SubscriberTest.java | 65 +++++++++++++++++++ 4 files changed, 97 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 1587afb91..98e055715 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub.v1; +import static com.google.cloud.pubsub.v1.Subscriber.DEFAULT_MAX_DURATION_PER_ACK_EXTENSION; import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import com.google.api.core.AbstractApiService; @@ -23,6 +24,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.api.core.InternalApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController; @@ -62,10 +64,14 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private static final Logger logger = Logger.getLogger(StreamingSubscriberConnection.class.getName()); + @InternalApi static final Duration DEFAULT_STREAM_ACK_DEADLINE = Duration.ofSeconds(60); + @InternalApi static final Duration MAX_STREAM_ACK_DEADLINE = Duration.ofSeconds(600); + @InternalApi static final Duration MIN_STREAM_ACK_DEADLINE = Duration.ofSeconds(10); private static final Duration INITIAL_CHANNEL_RECONNECT_BACKOFF = Duration.ofMillis(100); private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10); private static final int MAX_PER_REQUEST_CHANGES = 1000; + private final Duration streamAckDeadline; private final SubscriberStub stub; private final int channelAffinity; private final String subscription; @@ -106,6 +112,15 @@ public StreamingSubscriberConnection( ApiClock clock) { this.subscription = subscription; this.systemExecutor = systemExecutor; + if (maxDurationPerAckExtension.compareTo(DEFAULT_MAX_DURATION_PER_ACK_EXTENSION) == 0) { + this.streamAckDeadline = DEFAULT_STREAM_ACK_DEADLINE; + } else if (maxDurationPerAckExtension.compareTo(MIN_STREAM_ACK_DEADLINE) < 0) { + this.streamAckDeadline = MIN_STREAM_ACK_DEADLINE; + } else if (maxDurationPerAckExtension.compareTo(MAX_STREAM_ACK_DEADLINE) > 0) { + this.streamAckDeadline = MAX_STREAM_ACK_DEADLINE; + } else { + this.streamAckDeadline = maxDurationPerAckExtension; + } this.stub = stub; this.channelAffinity = channelAffinity; this.messageDispatcher = @@ -217,7 +232,7 @@ private void initialize() { initClientStream.send( StreamingPullRequest.newBuilder() .setSubscription(subscription) - .setStreamAckDeadlineSeconds(60) + .setStreamAckDeadlineSeconds((int) streamAckDeadline.getSeconds()) .setClientId(clientId) .setMaxOutstandingMessages( this.useLegacyFlowControl diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 6d5946276..948de0b7a 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -91,6 +91,7 @@ * details. */ public class Subscriber extends AbstractApiService implements SubscriberInterface { + @InternalApi static final Duration DEFAULT_MAX_DURATION_PER_ACK_EXTENSION = Duration.ofMillis(0); private static final int THREADS_PER_CHANNEL = 5; private static final int MAX_INBOUND_MESSAGE_SIZE = 20 * 1024 * 1024; // 20MB API maximum message size. @@ -421,7 +422,7 @@ public static final class Builder { private MessageReceiver receiver; private Duration maxAckExtensionPeriod = DEFAULT_MAX_ACK_EXTENSION_PERIOD; - private Duration maxDurationPerAckExtension = Duration.ofMillis(0); + private Duration maxDurationPerAckExtension = DEFAULT_MAX_DURATION_PER_ACK_EXTENSION; private boolean useLegacyFlowControl = false; private FlowControlSettings flowControlSettings = diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java index c6336fa1c..260071b9b 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java @@ -51,6 +51,7 @@ class FakeSubscriberServiceImpl extends SubscriberImplBase { private final AtomicInteger messageAckDeadline = new AtomicInteger(Subscriber.MIN_ACK_DEADLINE_SECONDS); private final AtomicInteger getSubscriptionCalled = new AtomicInteger(); + private StreamingPullRequest lastSeenRequest; private final List openedStreams = new ArrayList<>(); private final List closedStreams = new ArrayList<>(); private final List acks = new ArrayList<>(); @@ -139,6 +140,7 @@ public void onNext(StreamingPullRequest request) { subscriptionInitialized.notifyAll(); } } + setLastSeenRequest(request); addOpenedStream(stream); stream.notifyAll(); } @@ -292,6 +294,18 @@ private static void waitAtLeast(Collection collection, int target) } } + public StreamingPullRequest getLastSeenRequest() { + synchronized (lastSeenRequest) { + return lastSeenRequest; + } + } + + public void setLastSeenRequest(StreamingPullRequest lastSeenRequest) { + synchronized (lastSeenRequest) { + this.lastSeenRequest = lastSeenRequest; + } + } + private void addOpenedStream(Stream stream) { synchronized (openedStreams) { openedStreams.add(stream); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 98d3927be..5ebd8cb81 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -16,6 +16,10 @@ package com.google.cloud.pubsub.v1; +import static com.google.cloud.pubsub.v1.StreamingSubscriberConnection.DEFAULT_STREAM_ACK_DEADLINE; +import static com.google.cloud.pubsub.v1.StreamingSubscriberConnection.MAX_STREAM_ACK_DEADLINE; +import static com.google.cloud.pubsub.v1.StreamingSubscriberConnection.MIN_STREAM_ACK_DEADLINE; +import static com.google.cloud.pubsub.v1.Subscriber.DEFAULT_MAX_DURATION_PER_ACK_EXTENSION; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -224,6 +228,67 @@ public ScheduledExecutorService getExecutor() { } } + @Test + public void testStreamAckDeadlineIsSetCorrectly() throws Exception { + int expectedChannelCount = 1; + // Deadline is smaller than the allowed streamAckDeadline. + int maxDurationPerAckExtension = 5; + + Subscriber subscriber = + startSubscriber( + getTestSubscriberBuilder(testReceiver) + .setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension))); + assertEquals( + expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount)); + assertEquals( + MIN_STREAM_ACK_DEADLINE.getSeconds(), + fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds()); + + subscriber.stopAsync().awaitTerminated(); + + // Deadline is higher than the allowed streamAckDeadline. + maxDurationPerAckExtension = 700; + subscriber = + startSubscriber( + getTestSubscriberBuilder(testReceiver) + .setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension))); + assertEquals( + expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount)); + assertEquals( + MAX_STREAM_ACK_DEADLINE.getSeconds(), + fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds()); + + subscriber.stopAsync().awaitTerminated(); + + // Deadline is within the allowed limits for streamAckDeadline. + maxDurationPerAckExtension = 100; + subscriber = + startSubscriber( + getTestSubscriberBuilder(testReceiver) + .setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension))); + assertEquals( + expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount)); + assertEquals( + maxDurationPerAckExtension, + fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds()); + + subscriber.stopAsync().awaitTerminated(); + + // maxDurationPerAckExtension is unset. + maxDurationPerAckExtension = (int) DEFAULT_MAX_DURATION_PER_ACK_EXTENSION.getSeconds(); + subscriber = + startSubscriber( + getTestSubscriberBuilder(testReceiver) + .setMaxDurationPerAckExtension(Duration.ofSeconds(maxDurationPerAckExtension))); + assertEquals( + expectedChannelCount, fakeSubscriberServiceImpl.waitForOpenedStreams(expectedChannelCount)); + assertEquals( + DEFAULT_STREAM_ACK_DEADLINE.getSeconds(), + fakeSubscriberServiceImpl.getLastSeenRequest().getStreamAckDeadlineSeconds()); + + subscriber.stopAsync().awaitTerminated(); + } + private Subscriber startSubscriber(Builder testSubscriberBuilder) { Subscriber subscriber = testSubscriberBuilder.build(); subscriber.startAsync().awaitRunning();