diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 1ea18df62..093590bb9 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -371,6 +371,25 @@ private void publishAllWithoutInflight() { } } + /** + * Publish any outstanding batches if non-empty and there are no other batches in flight for + * orderingKey. This method sends buffered messages, but does not wait for the send operations to + * complete. To wait for messages to send, call {@code get} on the futures returned from {@code + * publish}. + */ + private void publishAllWithoutInflightForKey(final String orderingKey) { + messagesBatchLock.lock(); + try { + MessagesBatch batch = messagesBatches.get(orderingKey); + if (batch != null && !sequentialExecutor.hasTasksInflight(orderingKey)) { + publishOutstandingBatch(batch.popOutstandingBatch()); + messagesBatches.remove(orderingKey); + } + } finally { + messagesBatchLock.unlock(); + } + } + private ApiFuture publishCall(OutstandingBatch outstandingBatch) { return publisherStub .publishCallable() @@ -397,6 +416,11 @@ public void onSuccess(PublishResponse result) { result.getMessageIdsCount(), outstandingBatch.size()))); } else { outstandingBatch.onSuccess(result.getMessageIdsList()); + if (!activeAlarm.get() + && outstandingBatch.orderingKey != null + && !outstandingBatch.orderingKey.isEmpty()) { + publishAllWithoutInflightForKey(outstandingBatch.orderingKey); + } } } finally { messagesWaiter.incrementPendingMessages(-outstandingBatch.size()); diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java index a172c1ecb..712f51eb5 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SequentialExecutorService.java @@ -216,8 +216,8 @@ public void run() { // Step 5.1: on success @Override public void onSuccess(T msg) { - future.set(msg); callNextTaskAsync(key); + future.set(msg); } // Step 5.2: on failure diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java index 620a09ac9..23817f558 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakePublisherServiceImpl.java @@ -24,7 +24,10 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.threeten.bp.Duration; /** * A fake implementation of {@link PublisherImplBase}, that can be used to test clients of a Cloud @@ -36,6 +39,8 @@ class FakePublisherServiceImpl extends PublisherImplBase { private final LinkedBlockingQueue publishResponses = new LinkedBlockingQueue<>(); private final AtomicInteger nextMessageId = new AtomicInteger(1); private boolean autoPublishResponse; + private ScheduledExecutorService executor = null; + private Duration responseDelay = Duration.ZERO; /** Class used to save the state of a possible response. */ private static class Response { @@ -74,7 +79,8 @@ public String toString() { } @Override - public void publish(PublishRequest request, StreamObserver responseObserver) { + public void publish( + PublishRequest request, final StreamObserver responseObserver) { requests.add(request); Response response; try { @@ -90,6 +96,23 @@ public void publish(PublishRequest request, StreamObserver resp } catch (InterruptedException e) { throw new IllegalArgumentException(e); } + if (responseDelay == Duration.ZERO) { + sendResponse(response, responseObserver); + } else { + final Response responseToSend = response; + executor.schedule( + new Runnable() { + @Override + public void run() { + sendResponse(responseToSend, responseObserver); + } + }, + responseDelay.toMillis(), + TimeUnit.MILLISECONDS); + } + } + + private void sendResponse(Response response, StreamObserver responseObserver) { if (response.isError()) { responseObserver.onError(response.getError()); } else { @@ -107,6 +130,18 @@ public FakePublisherServiceImpl setAutoPublishResponse(boolean autoPublishRespon return this; } + /** Set an executor to use to delay publish responses. */ + public FakePublisherServiceImpl setExecutor(ScheduledExecutorService executor) { + this.executor = executor; + return this; + } + + /** Set an amount of time by which to delay publish responses. */ + public FakePublisherServiceImpl setPublishResponseDelay(Duration responseDelay) { + this.responseDelay = responseDelay; + return this; + } + public FakePublisherServiceImpl addPublishResponse(PublishResponse publishResponse) { publishResponses.add(new Response(publishResponse)); return this; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index f3c85220f..08fdee7b6 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -308,6 +308,8 @@ public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception { .setEnableMessageOrdering(true) .build(); testPublisherServiceImpl.setAutoPublishResponse(true); + testPublisherServiceImpl.setExecutor(fakeExecutor); + testPublisherServiceImpl.setPublishResponseDelay(Duration.ofSeconds(300)); // Publish two messages with ordering key, "OrderA", and other two messages with "OrderB". ApiFuture publishFuture1 = sendTestMessageWithOrderingKey(publisher, "m1", "OrderA"); @@ -325,10 +327,23 @@ public void testBatchedMessagesWithOrderingKeyByDuration() throws Exception { // The timeout expires. fakeExecutor.advanceTime(Duration.ofSeconds(100)); + // Publish one more message on "OrderA" while publishes are outstanding. + testPublisherServiceImpl.setPublishResponseDelay(Duration.ZERO); + ApiFuture publishFuture5 = sendTestMessageWithOrderingKey(publisher, "m5", "OrderA"); + + // The second timeout expires. + fakeExecutor.advanceTime(Duration.ofSeconds(100)); + + // Publishing completes on the first four messages. + fakeExecutor.advanceTime(Duration.ofSeconds(200)); + // Verify that they were delivered in order per ordering key. assertTrue(Integer.parseInt(publishFuture1.get()) < Integer.parseInt(publishFuture3.get())); assertTrue(Integer.parseInt(publishFuture2.get()) < Integer.parseInt(publishFuture4.get())); + // Verify that they were delivered in order per ordering key. + assertTrue(Integer.parseInt(publishFuture3.get()) < Integer.parseInt(publishFuture5.get())); + // Verify that every message within the same batch has the same ordering key. List requests = testPublisherServiceImpl.getCapturedRequests(); for (PublishRequest request : requests) {