Skip to content

Commit

Permalink
feat: expose default client configs (#541)
Browse files Browse the repository at this point in the history
* Modifying Publish example in README to match other examples given, and
fix issue #6784

* fix: Modifying Publish example in README to match other examples, and
fix Issue #11

* feat: Adding support for DLQs

Adding delivery attempt count to PubsubMessages as a message attribute,
and creating helper function to allow users to get the count without
knowing implementation details.

* Fix formatting

* fix: making changes requested in pull request

* fix: creating fix to not populate delivery attempt attribute when dead
lettering is not enabled

* Adding unit test for case in which a received message has no delivery attempt

* Making MessageWaiter class more generic to also be used for outstanding
ack operations

* Waiting for acks to complete before shutting down a streaming subscriber
connection

* Fixing formatting error

* fix: making sure all publishes complete before shutting down the
publisher

* adding default max outstanding request bytes

* fix: make push endpoint valid https

* fix: use default zero value if a flow control setting is not provided

* fix lint issues

* fix: better cleanup during publisher test

* fix: format issues

* fix: test timeouts should be a minute

* fix: make subscriberIt less flaky

* fix: add deprecated tag for deprecated methods

* fix: retrying sync pulls in subscriberIT test

* Revert "fix: retrying sync pulls in subscriberIT test"

This reverts commit fef9956.

* fix: do not send ModAck if auto ack extensions are disabled

* Adding test for zero ack extension

* feat: expose default client configs

* fix: revert unexpected changes
  • Loading branch information
hannahrogers-google committed Mar 11, 2021
1 parent 74e6a92 commit 01e6daf
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 5 deletions.
Expand Up @@ -820,6 +820,11 @@ public Builder setEndpoint(String endpoint) {
return this;
}

/** Returns the default BatchingSettings used by the client if settings are not provided. */
public static BatchingSettings getDefaultBatchingSettings() {
return DEFAULT_BATCHING_SETTINGS;
}

public Publisher build() throws IOException {
return new Publisher(this);
}
Expand Down
Expand Up @@ -414,6 +414,11 @@ private void stopConnections(List<? extends ApiService> connections) {
/** Builder of {@link Subscriber Subscribers}. */
public static final class Builder {
private static final Duration DEFAULT_MAX_ACK_EXTENSION_PERIOD = Duration.ofMinutes(60);
static final FlowControlSettings DEFAULT_FLOW_CONTROL_SETTINGS =
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1000L)
.setMaxOutstandingRequestBytes(100L * 1024L * 1024L) // 100MB
.build();

private static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
InstantiatingExecutorProvider.newBuilder()
Expand All @@ -428,11 +433,7 @@ public static final class Builder {
private Duration maxDurationPerAckExtension = DEFAULT_MAX_DURATION_PER_ACK_EXTENSION;

private boolean useLegacyFlowControl = false;
private FlowControlSettings flowControlSettings =
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1000L)
.setMaxOutstandingRequestBytes(100L * 1024L * 1024L) // 100MB
.build();
private FlowControlSettings flowControlSettings = DEFAULT_FLOW_CONTROL_SETTINGS;

private ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER;
private ExecutorProvider systemExecutorProvider = null;
Expand Down Expand Up @@ -597,6 +598,11 @@ Builder setClock(ApiClock clock) {
return this;
}

/** Returns the default FlowControlSettings used by the client if settings are not provided. */
public static FlowControlSettings getDefaultFlowControlSettings() {
return DEFAULT_FLOW_CONTROL_SETTINGS;
}

public Subscriber build() {
if (systemExecutorProvider == null) {
ThreadFactory threadFactory =
Expand Down
Expand Up @@ -955,6 +955,35 @@ public void testBuilderInvalidArguments() {
}
}

@Test
public void testPartialBatchingSettings() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setBatchingSettings(
Publisher.Builder.getDefaultBatchingSettings()
.toBuilder()
.setRequestByteThreshold(5000L)
.build())
.build();
assertEquals((long) publisher.getBatchingSettings().getRequestByteThreshold(), 5000);
assertEquals(
publisher.getBatchingSettings().getElementCountThreshold(),
Publisher.Builder.DEFAULT_BATCHING_SETTINGS.getElementCountThreshold());

publisher =
getTestPublisherBuilder()
.setBatchingSettings(
Publisher.Builder.getDefaultBatchingSettings()
.toBuilder()
.setElementCountThreshold(500L)
.build())
.build();
assertEquals((long) publisher.getBatchingSettings().getElementCountThreshold(), 500);
assertEquals(
publisher.getBatchingSettings().getRequestByteThreshold(),
Publisher.Builder.DEFAULT_BATCHING_SETTINGS.getRequestByteThreshold());
}

@Test
public void testAwaitTermination() throws Exception {
Publisher publisher =
Expand Down
Expand Up @@ -289,6 +289,36 @@ public void testStreamAckDeadlineIsSetCorrectly() throws Exception {
subscriber.stopAsync().awaitTerminated();
}

@Test
public void testPartialFlowControlSettings() throws Exception {
Subscriber subscriber =
getTestSubscriberBuilder(testReceiver)
.setFlowControlSettings(
Subscriber.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
.toBuilder()
.setMaxOutstandingElementCount(500L)
.build())
.build();
assertEquals((long) subscriber.getFlowControlSettings().getMaxOutstandingElementCount(), 500);
assertEquals(
subscriber.getFlowControlSettings().getMaxOutstandingRequestBytes(),
Subscriber.Builder.DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingRequestBytes());

subscriber =
getTestSubscriberBuilder(testReceiver)
.setFlowControlSettings(
Subscriber.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
.toBuilder()
.setMaxOutstandingRequestBytes(5_000_000_000L)
.build())
.build();
assertEquals(
(long) subscriber.getFlowControlSettings().getMaxOutstandingRequestBytes(), 5_000_000_000L);
assertEquals(
subscriber.getFlowControlSettings().getMaxOutstandingElementCount(),
Subscriber.Builder.DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingElementCount());
}

private Subscriber startSubscriber(Builder testSubscriberBuilder) {
Subscriber subscriber = testSubscriberBuilder.build();
subscriber.startAsync().awaitRunning();
Expand Down

0 comments on commit 01e6daf

Please sign in to comment.