From 01e6dafb569a37c661463b79f5afbfba4f6d188d Mon Sep 17 00:00:00 2001 From: hannahrogers-google <52459909+hannahrogers-google@users.noreply.github.com> Date: Thu, 11 Mar 2021 10:22:20 -0800 Subject: [PATCH] feat: expose default client configs (#541) * Modifying Publish example in README to match other examples given, and fix issue #6784 * fix: Modifying Publish example in README to match other examples, and fix Issue #11 * feat: Adding support for DLQs Adding delivery attempt count to PubsubMessages as a message attribute, and creating helper function to allow users to get the count without knowing implementation details. * Fix formatting * fix: making changes requested in pull request * fix: creating fix to not populate delivery attempt attribute when dead lettering is not enabled * Adding unit test for case in which a received message has no delivery attempt * Making MessageWaiter class more generic to also be used for outstanding ack operations * Waiting for acks to complete before shutting down a streaming subscriber connection * Fixing formatting error * fix: making sure all publishes complete before shutting down the publisher * adding default max outstanding request bytes * fix: make push endpoint valid https * fix: use default zero value if a flow control setting is not provided * fix lint issues * fix: better cleanup during publisher test * fix: format issues * fix: test timeouts should be a minute * fix: make subscriberIt less flaky * fix: add deprecated tag for deprecated methods * fix: retrying sync pulls in subscriberIT test * Revert "fix: retrying sync pulls in subscriberIT test" This reverts commit fef99561335f6e60e70ca7d0d00ac4d74a9f431e. * fix: do not send ModAck if auto ack extensions are disabled * Adding test for zero ack extension * feat: expose default client configs * fix: revert unexpected changes --- .../com/google/cloud/pubsub/v1/Publisher.java | 5 ++++ .../google/cloud/pubsub/v1/Subscriber.java | 16 ++++++---- .../cloud/pubsub/v1/PublisherImplTest.java | 29 ++++++++++++++++++ .../cloud/pubsub/v1/SubscriberTest.java | 30 +++++++++++++++++++ 4 files changed, 75 insertions(+), 5 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index aa140d7b9..d29a619fd 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -820,6 +820,11 @@ public Builder setEndpoint(String endpoint) { return this; } + /** Returns the default BatchingSettings used by the client if settings are not provided. */ + public static BatchingSettings getDefaultBatchingSettings() { + return DEFAULT_BATCHING_SETTINGS; + } + public Publisher build() throws IOException { return new Publisher(this); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index cca9986a8..4c3791b37 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -414,6 +414,11 @@ private void stopConnections(List connections) { /** Builder of {@link Subscriber Subscribers}. */ public static final class Builder { private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60); + static final FlowControlSettings DEFAULT_FLOW_CONTROL_SETTINGS = + FlowControlSettings.newBuilder() + .setMaxOutstandingElementCount(1000L) + .setMaxOutstandingRequestBytes(100L * 1024L * 1024L) // 100MB + .build(); private static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = InstantiatingExecutorProvider.newBuilder() @@ -428,11 +433,7 @@ public static final class Builder { private Duration maxDurationPerAckExtension = DEFAULT_MAX_DURATION_PER_ACK_EXTENSION; private boolean useLegacyFlowControl = false; - private FlowControlSettings flowControlSettings = - FlowControlSettings.newBuilder() - .setMaxOutstandingElementCount(1000L) - .setMaxOutstandingRequestBytes(100L * 1024L * 1024L) // 100MB - .build(); + private FlowControlSettings flowControlSettings = DEFAULT_FLOW_CONTROL_SETTINGS; private ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; private ExecutorProvider systemExecutorProvider = null; @@ -597,6 +598,11 @@ Builder setClock(ApiClock clock) { return this; } + /** Returns the default FlowControlSettings used by the client if settings are not provided. */ + public static FlowControlSettings getDefaultFlowControlSettings() { + return DEFAULT_FLOW_CONTROL_SETTINGS; + } + public Subscriber build() { if (systemExecutorProvider == null) { ThreadFactory threadFactory = diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index e5a785aed..3da657061 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -955,6 +955,35 @@ public void testBuilderInvalidArguments() { } } + @Test + public void testPartialBatchingSettings() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.getDefaultBatchingSettings() + .toBuilder() + .setRequestByteThreshold(5000L) + .build()) + .build(); + assertEquals((long) publisher.getBatchingSettings().getRequestByteThreshold(), 5000); + assertEquals( + publisher.getBatchingSettings().getElementCountThreshold(), + Publisher.Builder.DEFAULT_BATCHING_SETTINGS.getElementCountThreshold()); + + publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.getDefaultBatchingSettings() + .toBuilder() + .setElementCountThreshold(500L) + .build()) + .build(); + assertEquals((long) publisher.getBatchingSettings().getElementCountThreshold(), 500); + assertEquals( + publisher.getBatchingSettings().getRequestByteThreshold(), + Publisher.Builder.DEFAULT_BATCHING_SETTINGS.getRequestByteThreshold()); + } + @Test public void testAwaitTermination() throws Exception { Publisher publisher = diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 5ebd8cb81..4491b6ef9 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -289,6 +289,36 @@ public void testStreamAckDeadlineIsSetCorrectly() throws Exception { subscriber.stopAsync().awaitTerminated(); } + @Test + public void testPartialFlowControlSettings() throws Exception { + Subscriber subscriber = + getTestSubscriberBuilder(testReceiver) + .setFlowControlSettings( + Subscriber.Builder.DEFAULT_FLOW_CONTROL_SETTINGS + .toBuilder() + .setMaxOutstandingElementCount(500L) + .build()) + .build(); + assertEquals((long) subscriber.getFlowControlSettings().getMaxOutstandingElementCount(), 500); + assertEquals( + subscriber.getFlowControlSettings().getMaxOutstandingRequestBytes(), + Subscriber.Builder.DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingRequestBytes()); + + subscriber = + getTestSubscriberBuilder(testReceiver) + .setFlowControlSettings( + Subscriber.Builder.DEFAULT_FLOW_CONTROL_SETTINGS + .toBuilder() + .setMaxOutstandingRequestBytes(5_000_000_000L) + .build()) + .build(); + assertEquals( + (long) subscriber.getFlowControlSettings().getMaxOutstandingRequestBytes(), 5_000_000_000L); + assertEquals( + subscriber.getFlowControlSettings().getMaxOutstandingElementCount(), + Subscriber.Builder.DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingElementCount()); + } + private Subscriber startSubscriber(Builder testSubscriberBuilder) { Subscriber subscriber = testSubscriberBuilder.build(); subscriber.startAsync().awaitRunning();