diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 047e1ba75..885554f74 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -24,6 +24,7 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.Distribution; import com.google.api.gax.grpc.GrpcCallContext; @@ -71,6 +72,8 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final ScheduledExecutorService systemExecutor; private final MessageDispatcher messageDispatcher; + private final FlowControlSettings flowControlSettings; + private final AtomicLong channelReconnectBackoffMillis = new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); private final Waiter ackOperationsWaiter = new Waiter(); @@ -93,6 +96,7 @@ public StreamingSubscriberConnection( Distribution ackLatencyDistribution, SubscriberStub stub, int channelAffinity, + FlowControlSettings flowControlSettings, FlowController flowController, ScheduledExecutorService executor, ScheduledExecutorService systemExecutor, @@ -112,6 +116,7 @@ public StreamingSubscriberConnection( executor, systemExecutor, clock); + this.flowControlSettings = flowControlSettings; } @Override @@ -209,6 +214,8 @@ private void initialize() { .setSubscription(subscription) .setStreamAckDeadlineSeconds(60) .setClientId(clientId) + .setMaxOutstandingMessages(flowControlSettings.getMaxOutstandingElementCount()) + .setMaxOutstandingBytes(flowControlSettings.getMaxOutstandingRequestBytes()) .build()); /** diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 0054408ee..bd30bb112 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -332,6 +332,7 @@ private void startStreamingConnections() { ackLatencyDistribution, subStub, i, + flowControlSettings, flowController, executor, alarmsExecutor,