Skip to content

Commit

Permalink
fix: do not block forever if message size > flow control limits (#847)
Browse files Browse the repository at this point in the history
* Modifying Publish example in README to match other examples given, and
fix issue #6784

* fix: Modifying Publish example in README to match other examples, and
fix Issue #11

* feat: Adding support for DLQs

Adding delivery attempt count to PubsubMessages as a message attribute,
and creating helper function to allow users to get the count without
knowing implementation details.

* Fix formatting

* fix: making changes requested in pull request

* fix: creating fix to not populate delivery attempt attribute when dead
lettering is not enabled

* Adding unit test for case in which a received message has no delivery attempt

* Making MessageWaiter class more generic to also be used for outstanding
ack operations

* Waiting for acks to complete before shutting down a streaming subscriber
connection

* Fixing formatting error

* fix: making sure all publishes complete before shutting down the
publisher

* adding default max outstanding request bytes

* fix: make push endpoint valid https

* fix: use default zero value if a flow control setting is not provided

* fix lint issues

* fix: better cleanup during publisher test

* fix: format issues

* fix: test timeouts should be a minute

* fix: make subscriberIt less flaky

* fix: add deprecated tag for deprecated methods

* fix: retrying sync pulls in subscriberIT test

* Revert "fix: retrying sync pulls in subscriberIT test"

This reverts commit fef9956.

* fix: do not send ModAck if auto ack extensions are disabled

* Adding test for zero ack extension

* revert maxAckExtension changes

* fix: flow control blocks forever

* fix: throw flow control exception when message byte size is too big
  • Loading branch information
hannahrogers-google committed Sep 22, 2021
1 parent 374b8c3 commit f4ca4b2
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 24 deletions.
Expand Up @@ -27,6 +27,7 @@
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.BackgroundResourceAggregation;
import com.google.api.gax.core.CredentialsProvider;
Expand Down Expand Up @@ -779,6 +780,11 @@ public Builder setBatchingSettings(BatchingSettings batchingSettings) {
Preconditions.checkArgument(batchingSettings.getRequestByteThreshold() > 0);
Preconditions.checkNotNull(batchingSettings.getDelayThreshold());
Preconditions.checkArgument(batchingSettings.getDelayThreshold().toMillis() > 0);
FlowControlSettings flowControlSettings = batchingSettings.getFlowControlSettings();
if (flowControlSettings.getLimitExceededBehavior() != LimitExceededBehavior.Ignore) {
Preconditions.checkArgument(flowControlSettings.getMaxOutstandingElementCount() > 0);
Preconditions.checkArgument(flowControlSettings.getMaxOutstandingRequestBytes() > 0);
}
this.batchingSettings = batchingSettings;
return this;
}
Expand Down Expand Up @@ -859,6 +865,12 @@ private static class MessageFlowController {
}

void acquire(long messageSize) throws FlowController.FlowControlException {
if (messageSize > byteLimit) {
logger.log(
Level.WARNING,
"Attempted to publish message with byte size > request byte flow control limit.");
throw new FlowController.MaxOutstandingRequestBytesReachedException(byteLimit);
}
lock.lock();
try {
if (outstandingMessages >= messageLimit
Expand Down
Expand Up @@ -1019,7 +1019,57 @@ public void testShutDown() throws Exception {
}

@Test
public void testPublishFlowControl_throwException() throws Exception {
public void invalidFlowControlBytes_throwException() throws Exception {
try {
Publisher publisher =
getTestPublisherBuilder()
.setBatchingSettings(
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(1L)
.setDelayThreshold(Duration.ofSeconds(5))
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(
FlowController.LimitExceededBehavior.ThrowException)
.setMaxOutstandingElementCount(1L)
.setMaxOutstandingRequestBytes(0L)
.build())
.build())
.build();
fail("Expected an IllegalArgumentException");
} catch (Exception e) {
assertThat(e).isInstanceOf(IllegalArgumentException.class);
}
}

@Test
public void invalidFlowControlElementCount_throwException() throws Exception {
try {
Publisher publisher =
getTestPublisherBuilder()
.setBatchingSettings(
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(1L)
.setDelayThreshold(Duration.ofSeconds(5))
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(
FlowController.LimitExceededBehavior.ThrowException)
.setMaxOutstandingElementCount(0L)
.setMaxOutstandingRequestBytes(1000L)
.build())
.build())
.build();
fail("Expected an IllegalArgumentException");
} catch (Exception e) {
assertThat(e).isInstanceOf(IllegalArgumentException.class);
}
}

@Test
public void testMessageExceedsFlowControlLimits_throwException() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setExecutorProvider(SINGLE_THREAD_EXECUTOR)
Expand All @@ -1030,39 +1080,57 @@ public void testPublishFlowControl_throwException() throws Exception {
.setDelayThreshold(Duration.ofSeconds(5))
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(
FlowController.LimitExceededBehavior.ThrowException)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.setMaxOutstandingElementCount(1L)
.setMaxOutstandingRequestBytes(10L)
.setMaxOutstandingRequestBytes(1L)
.build())
.build())
.build();

// Sending a message that is too large results in an exception.
ApiFuture<String> publishFuture1 = sendTestMessage(publisher, "AAAAAAAAAAA");
try {
publishFuture1.get();
fail("Should have thrown an FlowController.MaxOutstandingRequestBytesReachedException");
sendTestMessage(publisher, "AAAAAAAAAAAAAAAAAAAAA").get();
fail("Should have thrown a FlowController.MaxOutstandingRequestBytesReachedException");
} catch (ExecutionException e) {
assertThat(e.getCause())
.isInstanceOf(FlowController.MaxOutstandingRequestBytesReachedException.class);
}
}

// Sending a second message succeeds.
ApiFuture<String> publishFuture2 = sendTestMessage(publisher, "AAAA");
@Test
public void testPublishFlowControl_throwException() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setExecutorProvider(SINGLE_THREAD_EXECUTOR)
.setBatchingSettings(
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(1L)
.setDelayThreshold(Duration.ofSeconds(5))
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(
FlowController.LimitExceededBehavior.ThrowException)
.setMaxOutstandingElementCount(1L)
.setMaxOutstandingRequestBytes(10L)
.build())
.build())
.build();

// Sending a message less than the byte limit succeeds.
ApiFuture<String> publishFuture1 = sendTestMessage(publisher, "AAAA");

// Sending a third message fails because of the outstanding message.
ApiFuture<String> publishFuture3 = sendTestMessage(publisher, "AA");
// Sending another message fails because of the outstanding message.
ApiFuture<String> publishFuture2 = sendTestMessage(publisher, "AA");
try {
publishFuture3.get();
publishFuture2.get();
fail("Should have thrown an FlowController.MaxOutstandingElementCountReachedException");
} catch (ExecutionException e) {
assertThat(e.getCause())
.isInstanceOf(FlowController.MaxOutstandingElementCountReachedException.class);
}

testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1"));
assertEquals("1", publishFuture2.get());
assertEquals("1", publishFuture1.get());

// Sending another message succeeds.
ApiFuture<String> publishFuture4 = sendTestMessage(publisher, "AAAA");
Expand Down Expand Up @@ -1091,22 +1159,27 @@ public void testPublishFlowControl_throwExceptionWithOrderingKey() throws Except
.setEnableMessageOrdering(true)
.build();

// Sending a message that is too large results in an exception.
ApiFuture<String> publishFuture1 =
sendTestMessageWithOrderingKey(publisher, "AAAAAAAAAAA", "a");
// Sending a message less than the byte limit succeeds.
ApiFuture<String> publishFuture1 = sendTestMessageWithOrderingKey(publisher, "AAAA", "a");

// Sending another message fails because of the outstanding message.
ApiFuture<String> publishFuture2 = sendTestMessageWithOrderingKey(publisher, "AA", "a");
try {
publishFuture1.get();
fail("Should have thrown an FlowController.MaxOutstandingRequestBytesReachedException");
publishFuture2.get();
fail("Should have thrown an FlowController.MaxOutstandingElementCountReachedException");
} catch (ExecutionException e) {
assertThat(e.getCause())
.isInstanceOf(FlowController.MaxOutstandingRequestBytesReachedException.class);
.isInstanceOf(FlowController.MaxOutstandingElementCountReachedException.class);
}

// Sending a second message for the same ordering key fails because the first one failed.
ApiFuture<String> publishFuture2 = sendTestMessageWithOrderingKey(publisher, "AAAA", "a");
testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1"));
assertEquals("1", publishFuture1.get());

// Sending another message for the same ordering key fails because the one before failed.
ApiFuture<String> publishFuture3 = sendTestMessageWithOrderingKey(publisher, "AAAA", "a");
try {
publishFuture2.get();
Assert.fail("This should fail.");
publishFuture3.get();
fail("This should fail.");
} catch (ExecutionException e) {
assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause());
}
Expand Down

0 comments on commit f4ca4b2

Please sign in to comment.