diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index e28427eea..659d84bb6 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -25,6 +25,8 @@ import com.google.api.core.BetaApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.core.BackgroundResourceAggregation; import com.google.api.gax.core.CredentialsProvider; @@ -55,6 +57,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -108,6 +111,8 @@ public class Publisher { private ScheduledFuture currentAlarmFuture; private final ApiFunction messageTransform; + private MessageFlowController flowController = null; + /** The maximum number of messages in one request. Defined by the API. */ public static long getApiMaxRequestElementCount() { return 1000L; @@ -122,6 +127,16 @@ private Publisher(Builder builder) throws IOException { topicName = builder.topicName; this.batchingSettings = builder.batchingSettings; + FlowControlSettings flowControl = this.batchingSettings.getFlowControlSettings(); + if (flowControl != null + && flowControl.getLimitExceededBehavior() != FlowController.LimitExceededBehavior.Ignore) { + this.flowController = + new MessageFlowController( + flowControl.getMaxOutstandingElementCount(), + flowControl.getMaxOutstandingRequestBytes(), + flowControl.getLimitExceededBehavior()); + } + this.enableMessageOrdering = builder.enableMessageOrdering; this.messageTransform = builder.messageTransform; @@ -221,6 +236,19 @@ public ApiFuture publish(PubsubMessage message) { final OutstandingPublish outstandingPublish = new OutstandingPublish(messageTransform.apply(message)); + + if (flowController != null) { + try { + flowController.acquire(outstandingPublish.messageSize); + } catch (FlowController.FlowControlException e) { + if (!orderingKey.isEmpty()) { + sequentialExecutor.stopPublish(orderingKey); + } + outstandingPublish.publishResult.setException(e); + return outstandingPublish.publishResult; + } + } + List batchesToSend; messagesBatchLock.lock(); try { @@ -454,7 +482,7 @@ public ApiFuture call() { ApiFutures.addCallback(future, futureCallback, directExecutor()); } - private static final class OutstandingBatch { + private final class OutstandingBatch { final List outstandingPublishes; final long creationTime; int attempt; @@ -484,6 +512,9 @@ private List getMessages() { private void onFailure(Throwable t) { for (OutstandingPublish outstandingPublish : outstandingPublishes) { + if (flowController != null) { + flowController.release(outstandingPublish.messageSize); + } outstandingPublish.publishResult.setException(t); } } @@ -491,7 +522,11 @@ private void onFailure(Throwable t) { private void onSuccess(Iterable results) { Iterator messagesResultsIt = outstandingPublishes.iterator(); for (String messageId : results) { - messagesResultsIt.next().publishResult.set(messageId); + OutstandingPublish nextPublish = messagesResultsIt.next(); + if (flowController != null) { + flowController.release(nextPublish.messageSize); + } + nextPublish.publishResult.set(messageId); } } } @@ -602,6 +637,10 @@ public static final class Builder { .setDelayThreshold(DEFAULT_DELAY_THRESHOLD) .setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD) .setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore) + .build()) .build(); static final RetrySettings DEFAULT_RETRY_SETTINGS = RetrySettings.newBuilder() @@ -759,7 +798,135 @@ public Publisher build() throws IOException { } } - private static class MessagesBatch { + private static class MessageFlowController { + private final Lock lock; + private final Long messageLimit; + private final Long byteLimit; + private final FlowController.LimitExceededBehavior limitBehavior; + + private Long outstandingMessages; + private Long outstandingBytes; + private LinkedList awaitingMessageAcquires; + private LinkedList awaitingBytesAcquires; + + MessageFlowController( + Long messageLimit, Long byteLimit, FlowController.LimitExceededBehavior limitBehavior) { + this.messageLimit = messageLimit; + this.byteLimit = byteLimit; + this.limitBehavior = limitBehavior; + this.lock = new ReentrantLock(); + + this.outstandingMessages = 0L; + this.outstandingBytes = 0L; + + this.awaitingMessageAcquires = new LinkedList(); + this.awaitingBytesAcquires = new LinkedList(); + } + + void acquire(long messageSize) throws FlowController.FlowControlException { + lock.lock(); + try { + if (outstandingMessages >= messageLimit + && limitBehavior == FlowController.LimitExceededBehavior.ThrowException) { + throw new FlowController.MaxOutstandingElementCountReachedException(messageLimit); + } + if (outstandingBytes + messageSize >= byteLimit + && limitBehavior == FlowController.LimitExceededBehavior.ThrowException) { + throw new FlowController.MaxOutstandingRequestBytesReachedException(byteLimit); + } + + // We can acquire or we should wait until we can acquire. + // Start by acquiring a slot for a message. + CountDownLatch messageWaiter = null; + while (outstandingMessages >= messageLimit) { + if (messageWaiter == null) { + // This message gets added to the back of the line. + messageWaiter = new CountDownLatch(1); + awaitingMessageAcquires.addLast(messageWaiter); + } else { + // This message already in line stays at the head of the line. + messageWaiter = new CountDownLatch(1); + awaitingMessageAcquires.set(0, messageWaiter); + } + lock.unlock(); + try { + messageWaiter.await(); + } catch (InterruptedException e) { + logger.log(Level.WARNING, "Interrupted while waiting to acquire flow control tokens"); + } + lock.lock(); + } + ++outstandingMessages; + if (messageWaiter != null) { + awaitingMessageAcquires.removeFirst(); + } + + // There may be some surplus messages left; let the next message waiting for a token have + // one. + if (!awaitingMessageAcquires.isEmpty() && outstandingMessages < messageLimit) { + awaitingMessageAcquires.getFirst().countDown(); + } + + // Now acquire space for bytes. + CountDownLatch bytesWaiter = null; + Long bytesRemaining = messageSize; + while (outstandingBytes + bytesRemaining >= byteLimit) { + // Take what is available. + Long available = byteLimit - outstandingBytes; + bytesRemaining -= available; + outstandingBytes = byteLimit; + if (bytesWaiter == null) { + // This message gets added to the back of the line. + bytesWaiter = new CountDownLatch(1); + awaitingBytesAcquires.addLast(bytesWaiter); + } else { + // This message already in line stays at the head of the line. + bytesWaiter = new CountDownLatch(1); + awaitingBytesAcquires.set(0, bytesWaiter); + } + lock.unlock(); + try { + bytesWaiter.await(); + } catch (InterruptedException e) { + logger.log(Level.WARNING, "Interrupted while waiting to acquire flow control tokens"); + } + lock.lock(); + } + + outstandingBytes += bytesRemaining; + if (bytesWaiter != null) { + awaitingBytesAcquires.removeFirst(); + } + // There may be some surplus bytes left; let the next message waiting for bytes have some. + if (!awaitingBytesAcquires.isEmpty() && outstandingBytes < byteLimit) { + awaitingBytesAcquires.getFirst().countDown(); + } + } finally { + lock.unlock(); + } + } + + private void notifyNextAcquires() { + if (!awaitingMessageAcquires.isEmpty()) { + CountDownLatch awaitingAcquire = awaitingMessageAcquires.getFirst(); + awaitingAcquire.countDown(); + } + if (!awaitingBytesAcquires.isEmpty()) { + CountDownLatch awaitingAcquire = awaitingBytesAcquires.getFirst(); + awaitingAcquire.countDown(); + } + } + + void release(long messageSize) { + lock.lock(); + --outstandingMessages; + outstandingBytes -= messageSize; + notifyNextAcquires(); + lock.unlock(); + } + } + + private class MessagesBatch { private List messages; private int batchedBytes; private String orderingKey; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index 712f51eb5..292921850 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -247,6 +247,10 @@ void resumePublish(String key) { keysWithErrors.remove(key); } + void stopPublish(String key) { + keysWithErrors.add(key); + } + /** Cancels every task in the queue associated with {@code key}. */ private void cancelQueuedTasks(final String key, Throwable e) { keysWithErrors.add(key); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index b1109c8fc..5ee5fdcfc 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -25,6 +25,8 @@ import com.google.api.core.ApiFuture; import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.FixedExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; @@ -43,7 +45,10 @@ import io.grpc.StatusException; import io.grpc.inprocess.InProcessServerBuilder; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.easymock.EasyMock; import org.junit.After; @@ -923,6 +928,191 @@ public void testShutDown() throws Exception { assertTrue(publisher.awaitTermination(1, TimeUnit.MINUTES)); } + @Test + public void testPublishFlowControl_throwException() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior( + FlowController.LimitExceededBehavior.ThrowException) + .setMaxOutstandingElementCount(1L) + .setMaxOutstandingRequestBytes(10L) + .build()) + .build()) + .build(); + + // Sending a message that is too large results in an exception. + ApiFuture publishFuture1 = sendTestMessage(publisher, "AAAAAAAAAAA"); + try { + publishFuture1.get(); + fail("Should have thrown an FlowController.MaxOutstandingRequestBytesReachedException"); + } catch (ExecutionException e) { + assertThat(e.getCause()) + .isInstanceOf(FlowController.MaxOutstandingRequestBytesReachedException.class); + } + + // Sending a second message succeeds. + ApiFuture publishFuture2 = sendTestMessage(publisher, "AAAA"); + + // Sending a third message fails because of the outstanding message. + ApiFuture publishFuture3 = sendTestMessage(publisher, "AA"); + try { + publishFuture3.get(); + fail("Should have thrown an FlowController.MaxOutstandingElementCountReachedException"); + } catch (ExecutionException e) { + assertThat(e.getCause()) + .isInstanceOf(FlowController.MaxOutstandingElementCountReachedException.class); + } + + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("1")); + assertEquals("1", publishFuture2.get()); + + // Sending another message succeeds. + ApiFuture publishFuture4 = sendTestMessage(publisher, "AAAA"); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("2")); + assertEquals("2", publishFuture4.get()); + } + + @Test + public void testPublishFlowControl_throwExceptionWithOrderingKey() throws Exception { + Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior( + FlowController.LimitExceededBehavior.ThrowException) + .setMaxOutstandingElementCount(1L) + .setMaxOutstandingRequestBytes(10L) + .build()) + .build()) + .setEnableMessageOrdering(true) + .build(); + + // Sending a message that is too large results in an exception. + ApiFuture publishFuture1 = + sendTestMessageWithOrderingKey(publisher, "AAAAAAAAAAA", "a"); + try { + publishFuture1.get(); + fail("Should have thrown an FlowController.MaxOutstandingRequestBytesReachedException"); + } catch (ExecutionException e) { + assertThat(e.getCause()) + .isInstanceOf(FlowController.MaxOutstandingRequestBytesReachedException.class); + } + + // Sending a second message for the same ordering key fails because the first one failed. + ApiFuture publishFuture2 = sendTestMessageWithOrderingKey(publisher, "AAAA", "a"); + try { + publishFuture2.get(); + Assert.fail("This should fail."); + } catch (ExecutionException e) { + assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause()); + } + } + + @Test + public void testPublishFlowControl_block() throws Exception { + final Publisher publisher = + getTestPublisherBuilder() + .setExecutorProvider(SINGLE_THREAD_EXECUTOR) + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(5)) + .setFlowControlSettings( + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) + .setMaxOutstandingElementCount(2L) + .setMaxOutstandingRequestBytes(10L) + .build()) + .build()) + .build(); + Executor responseExecutor = Executors.newScheduledThreadPool(10); + final CountDownLatch sendResponse1 = new CountDownLatch(1); + final CountDownLatch response1Sent = new CountDownLatch(1); + final CountDownLatch sendResponse2 = new CountDownLatch(1); + responseExecutor.execute( + new Runnable() { + @Override + public void run() { + try { + sendResponse1.await(); + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("1")); + response1Sent.countDown(); + sendResponse2.await(); + testPublisherServiceImpl.addPublishResponse( + PublishResponse.newBuilder().addMessageIds("2")); + } catch (Exception e) { + } + } + }); + + // Sending two messages succeeds. + ApiFuture publishFuture1 = sendTestMessage(publisher, "AA"); + ApiFuture publishFuture2 = sendTestMessage(publisher, "AA"); + + // Sending a third message blocks because messages are outstanding. + final CountDownLatch publish3Completed = new CountDownLatch(1); + final CountDownLatch response3Sent = new CountDownLatch(1); + responseExecutor.execute( + new Runnable() { + @Override + public void run() { + ApiFuture publishFuture3 = sendTestMessage(publisher, "AAAAAA"); + publish3Completed.countDown(); + } + }); + + responseExecutor.execute( + new Runnable() { + @Override + public void run() { + try { + sendResponse1.countDown(); + response1Sent.await(); + sendResponse2.countDown(); + } catch (Exception e) { + } + } + }); + + // Sending a fourth message blocks because although only one message has been sent, + // the third message claimed the tokens for outstanding bytes. + final CountDownLatch publish4Completed = new CountDownLatch(1); + responseExecutor.execute( + new Runnable() { + @Override + public void run() { + try { + publish3Completed.await(); + ApiFuture publishFuture4 = sendTestMessage(publisher, "A"); + publish4Completed.countDown(); + } catch (Exception e) { + } + } + }); + + publish3Completed.await(); + testPublisherServiceImpl.addPublishResponse(PublishResponse.newBuilder().addMessageIds("3")); + response3Sent.countDown(); + + publish4Completed.await(); + } + private Builder getTestPublisherBuilder() { return Publisher.newBuilder(TEST_TOPIC) .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))