diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index af7472e96..1ed5e2254 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -27,6 +27,7 @@ import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController; +import com.google.api.gax.batching.FlowController.LimitExceededBehavior; import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.core.BackgroundResourceAggregation; import com.google.api.gax.core.CredentialsProvider; @@ -779,6 +780,11 @@ public Builder setBatchingSettings(BatchingSettings batchingSettings) { Preconditions.checkArgument(batchingSettings.getRequestByteThreshold() > 0); Preconditions.checkNotNull(batchingSettings.getDelayThreshold()); Preconditions.checkArgument(batchingSettings.getDelayThreshold().toMillis() > 0); + FlowControlSettings flowControlSettings = batchingSettings.getFlowControlSettings(); + if (flowControlSettings.getLimitExceededBehavior() != LimitExceededBehavior.Ignore) { + Preconditions.checkArgument(flowControlSettings.getMaxOutstandingElementCount() > 0); + Preconditions.checkArgument(flowControlSettings.getMaxOutstandingRequestBytes() > 0); + } this.batchingSettings = batchingSettings; return this; } @@ -859,6 +865,12 @@ private static class MessageFlowController { } void acquire(long messageSize) throws FlowController.FlowControlException { + if (messageSize > byteLimit) { + logger.log( + Level.WARNING, + "Attempted to publish message with byte size > request byte flow control limit."); + throw new FlowController.MaxOutstandingRequestBytesReachedException(byteLimit); + } lock.lock(); try { if (outstandingMessages >= messageLimit diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 3da657061..c41931de6 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -1019,7 +1019,57 @@ public void testShutDown() throws Exception { } @Test - public void testPublishFlowControl_throwException() throws Exception { + public void invalidFlowControlBytes_throwException() throws Exception { + try { + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior( + FlowController.LimitExceededBehavior.ThrowException) + .setMaxOutstandingElementCount(1L) + .setMaxOutstandingRequestBytes(0L) + .build()) + .build()) + .build(); + fail("Expected an IllegalArgumentException"); + } catch (Exception e) { + assertThat(e).isInstanceOf(IllegalArgumentException.class); + } + } + + @Test + public void invalidFlowControlElementCount_throwException() throws Exception { + try { + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior( + FlowController.LimitExceededBehavior.ThrowException) + .setMaxOutstandingElementCount(0L) + .setMaxOutstandingRequestBytes(1000L) + .build()) + .build()) + .build(); + fail("Expected an IllegalArgumentException"); + } catch (Exception e) { + assertThat(e).isInstanceOf(IllegalArgumentException.class); + } + } + + @Test + public void testMessageExceedsFlowControlLimits_throwException() throws Exception { Publisher publisher = getTestPublisherBuilder() .setExecutorProvider(SINGLE_THREAD_EXECUTOR) @@ -1030,31 +1080,49 @@ public void testPublishFlowControl_throwException() throws Exception { .setDelayThreshold(Duration.ofSeconds(5)) .setFlowControlSettings( FlowControlSettings.newBuilder() - .setLimitExceededBehavior( - FlowController.LimitExceededBehavior.ThrowException) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) .setMaxOutstandingElementCount(1L) - .setMaxOutstandingRequestBytes(10L) + .setMaxOutstandingRequestBytes(1L) .build()) .build()) .build(); - // Sending a message that is too large results in an exception. - ApiFuture publishFuture1 = sendTestMessage(publisher, "AAAAAAAAAAA"); try { - publishFuture1.get(); - fail("Should have thrown an FlowController.MaxOutstandingRequestBytesReachedException"); + sendTestMessage(publisher, "AAAAAAAAAAAAAAAAAAAAA").get(); + fail("Should have thrown a FlowController.MaxOutstandingRequestBytesReachedException"); } catch (ExecutionException e) { assertThat(e.getCause()) .isInstanceOf(FlowController.MaxOutstandingRequestBytesReachedException.class); } + } - // Sending a second message succeeds. - ApiFuture publishFuture2 = sendTestMessage(publisher, "AAAA"); + @Test + public void testPublishFlowControl_throwException() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior( + FlowController.LimitExceededBehavior.ThrowException) + .setMaxOutstandingElementCount(1L) + .setMaxOutstandingRequestBytes(10L) + .build()) + .build()) + .build(); + + // Sending a message less than the byte limit succeeds. + ApiFuture publishFuture1 = sendTestMessage(publisher, "AAAA"); - // Sending a third message fails because of the outstanding message. - ApiFuture publishFuture3 = sendTestMessage(publisher, "AA"); + // Sending another message fails because of the outstanding message. + ApiFuture publishFuture2 = sendTestMessage(publisher, "AA"); try { - publishFuture3.get(); + publishFuture2.get(); fail("Should have thrown an FlowController.MaxOutstandingElementCountReachedException"); } catch (ExecutionException e) { assertThat(e.getCause()) @@ -1062,7 +1130,7 @@ public void testPublishFlowControl_throwException() throws Exception { } testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); - assertEquals("1", publishFuture2.get()); + assertEquals("1", publishFuture1.get()); // Sending another message succeeds. ApiFuture publishFuture4 = sendTestMessage(publisher, "AAAA"); @@ -1091,22 +1159,27 @@ public void testPublishFlowControl_throwExceptionWithOrderingKey() throws Except .setEnableMessageOrdering(true) .build(); - // Sending a message that is too large results in an exception. - ApiFuture publishFuture1 = - sendTestMessageWithOrderingKey(publisher, "AAAAAAAAAAA", "a"); + // Sending a message less than the byte limit succeeds. + ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "AAAA", "a"); + + // Sending another message fails because of the outstanding message. + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "AA", "a"); try { - publishFuture1.get(); - fail("Should have thrown an FlowController.MaxOutstandingRequestBytesReachedException"); + publishFuture2.get(); + fail("Should have thrown an FlowController.MaxOutstandingElementCountReachedException"); } catch (ExecutionException e) { assertThat(e.getCause()) - .isInstanceOf(FlowController.MaxOutstandingRequestBytesReachedException.class); + .isInstanceOf(FlowController.MaxOutstandingElementCountReachedException.class); } - // Sending a second message for the same ordering key fails because the first one failed. - ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "AAAA", "a"); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); + assertEquals("1", publishFuture1.get()); + + // Sending another message for the same ordering key fails because the one before failed. + ApiFuture publishFuture3 = sendTestMessageWithOrderingKey(publisher, "AAAA", "a"); try { - publishFuture2.get(); - Assert.fail("This should fail."); + publishFuture3.get(); + fail("This should fail."); } catch (ExecutionException e) { assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); }