Skip to content

Commit

Permalink
fix: Ensure that messages that are in pending batches for an ordering…
Browse files Browse the repository at this point in the history
… key are canceled when a previous publish for the ordering keys fails (#366)

* feat: Add flow control support to publisher

* make suggested fixes

* chore: Remove note that ordering keys requires enablements.

* feat: Add support for server-side flow control

* Revert "chore: Remove note that ordering keys requires enablements."

This reverts commit 9c113c3.

* fix: Fix import order

* fix: Make error message more clear about where ordering must be enabled when publishing.

* fix: Ensure that messages that are in pending batches for an ordering key are canceled when a previous publish for the ordering keys fails.
  • Loading branch information
kamalaboulhosn committed Sep 29, 2020
1 parent 68ad21d commit 7cdf8bc
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 0 deletions.
Expand Up @@ -256,6 +256,11 @@ public ApiFuture<String> publish(PubsubMessage message) {
List<OutstandingBatch> batchesToSend;
messagesBatchLock.lock();
try {
if (sequentialExecutor.keyHasError(orderingKey)) {
outstandingPublish.publishResult.setException(
SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION);
return outstandingPublish.publishResult;
}
MessagesBatch messagesBatch = messagesBatches.get(orderingKey);
if (messagesBatch == null) {
messagesBatch = new MessagesBatch(batchingSettings, orderingKey);
Expand Down Expand Up @@ -462,6 +467,21 @@ public void onSuccess(PublishResponse result) {
@Override
public void onFailure(Throwable t) {
try {
if (outstandingBatch.orderingKey != null && !outstandingBatch.orderingKey.isEmpty()) {
messagesBatchLock.lock();
try {
MessagesBatch messagesBatch = messagesBatches.get(outstandingBatch.orderingKey);
if (messagesBatch != null) {
for (OutstandingPublish outstanding : messagesBatch.messages) {
outstanding.publishResult.setException(
SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION);
}
messagesBatches.remove(outstandingBatch.orderingKey);
}
} finally {
messagesBatchLock.unlock();
}
}
outstandingBatch.onFailure(t);
} finally {
messagesWaiter.incrementPendingCount(-outstandingBatch.size());
Expand Down
Expand Up @@ -243,6 +243,10 @@ public void cancel(Throwable e) {
return future;
}

boolean keyHasError(String key) {
return keysWithErrors.contains(key);
}

void resumePublish(String key) {
keysWithErrors.remove(key);
}
Expand Down
Expand Up @@ -566,6 +566,57 @@ public void testResumePublish() throws Exception {
shutdownTestPublisher(publisher);
}

@Test
public void testPublishThrowExceptionForUnsubmittedOrderingKeyMessage() throws Exception {
Publisher publisher =
getTestPublisherBuilder()
.setExecutorProvider(SINGLE_THREAD_EXECUTOR)
.setBatchingSettings(
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(2L)
.setDelayThreshold(Duration.ofSeconds(500))
.build())
.setEnableMessageOrdering(true)
.build();

// Send two messages that will fulfill the first batch, which will return a failure.
testPublisherServiceImpl.addPublishError(new StatusException(Status.INVALID_ARGUMENT));
ApiFuture<String> publishFuture1 = sendTestMessageWithOrderingKey(publisher, "A", "a");
ApiFuture<String> publishFuture2 = sendTestMessageWithOrderingKey(publisher, "B", "a");

// A third message will fail because the first attempt to publish failed.
ApiFuture<String> publishFuture3 = sendTestMessageWithOrderingKey(publisher, "C", "a");

try {
publishFuture1.get();
fail("Should have failed.");
} catch (ExecutionException e) {
}

try {
publishFuture2.get();
fail("Should have failed.");
} catch (ExecutionException e) {
}

try {
publishFuture3.get();
fail("Should have failed.");
} catch (ExecutionException e) {
assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause());
}

// A subsequent attempt fails immediately.
ApiFuture<String> publishFuture4 = sendTestMessageWithOrderingKey(publisher, "D", "a");
try {
publishFuture4.get();
fail("Should have failed.");
} catch (ExecutionException e) {
assertEquals(SequentialExecutorService.CallbackExecutor.CANCELLATION_EXCEPTION, e.getCause());
}
}

private ApiFuture<String> sendTestMessageWithOrderingKey(
Publisher publisher, String data, String orderingKey) {
return publisher.publish(
Expand Down

0 comments on commit 7cdf8bc

Please sign in to comment.