From 9c750c8e4899ae994a788c5323fc456ac5d7aa38 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Wed, 24 Jun 2020 13:41:56 -0400 Subject: [PATCH] feat: Send streaming pull flow control settings to server (#267) * feat: Add flow control support to publisher * make suggested fixes * chore: Remove note that ordering keys requires enablements. * feat: Add support for server-side flow control * Revert "chore: Remove note that ordering keys requires enablements." This reverts commit 9c113c3e32c28cf0d1de8aad3409b5c509fb1ada. * fix: Fix import order --- .../cloud/pubsub/v1/StreamingSubscriberConnection.java | 7 +++++++ .../main/java/com/google/cloud/pubsub/v1/Subscriber.java | 1 + 2 files changed, 8 insertions(+) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 047e1ba75..885554f74 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -24,6 +24,7 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.Distribution; import com.google.api.gax.grpc.GrpcCallContext; @@ -71,6 +72,8 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final ScheduledExecutorService systemExecutor; private final MessageDispatcher messageDispatcher; + private final FlowControlSettings flowControlSettings; + private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); private final Waiter ackOperationsWaiter = new Waiter(); @@ -93,6 +96,7 @@ public StreamingSubscriberConnection( Distribution ackLatencyDistribution, SubscriberStub stub, int channelAffinity, + FlowControlSettings flowControlSettings, FlowController flowController, ScheduledExecutorService executor, ScheduledExecutorService systemExecutor, @@ -112,6 +116,7 @@ public StreamingSubscriberConnection( executor, systemExecutor, clock); + this.flowControlSettings = flowControlSettings; } @Override @@ -209,6 +214,8 @@ private void initialize() { .setSubscription(subscription) .setStreamAckDeadlineSeconds(60) .setClientId(clientId) + .setMaxOutstandingMessages(flowControlSettings.getMaxOutstandingElementCount()) + .setMaxOutstandingBytes(flowControlSettings.getMaxOutstandingRequestBytes()) .build()); /** diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 0054408ee..bd30bb112 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -332,6 +332,7 @@ private void startStreamingConnections() { ackLatencyDistribution, subStub, i, + flowControlSettings, flowController, executor, alarmsExecutor,