From d56155b640d13fa78d28031862aabe4814ce201d Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Fri, 24 Jul 2020 14:21:12 -0700 Subject: [PATCH] fix: if a flow control setting is not provided use zero (#292) * fix: use default zero value if a flow control setting is not provided * fix lint issues --- .../cloud/pubsub/v1/StreamingSubscriberConnection.java | 10 ++++++++-- .../com/google/cloud/pubsub/v1/SubscriberTest.java | 5 ++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 19c2c79e3..f4e330ef1 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -216,8 +216,10 @@ private void initialize() { .setSubscription(subscription) .setStreamAckDeadlineSeconds(60) .setClientId(clientId) - .setMaxOutstandingMessages(flowControlSettings.getMaxOutstandingElementCount()) - .setMaxOutstandingBytes(flowControlSettings.getMaxOutstandingRequestBytes()) + .setMaxOutstandingMessages( + valueOrZero(flowControlSettings.getMaxOutstandingElementCount())) + .setMaxOutstandingBytes( + valueOrZero(flowControlSettings.getMaxOutstandingRequestBytes())) .build()); /** @@ -281,6 +283,10 @@ public void run() { MoreExecutors.directExecutor()); } + private Long valueOrZero(Long value) { + return value != null ? value : 0; + } + private boolean isAlive() { State state = state(); // Read the state only once. return state == State.RUNNING || state == State.STARTING; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 4659297c0..98d3927be 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; @@ -238,6 +239,8 @@ private Builder getTestSubscriberBuilder(MessageReceiver receiver) { .setCredentialsProvider(NoCredentialsProvider.create()) .setClock(fakeExecutor.getClock()) .setParallelPullCount(1) - .setMaxDurationPerAckExtension(Duration.ofSeconds(5)); + .setMaxDurationPerAckExtension(Duration.ofSeconds(5)) + .setFlowControlSettings( + FlowControlSettings.newBuilder().setMaxOutstandingElementCount(1000L).build()); } }