From aab528803405c2b5f9fc89641f47abff948a876d Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Fri, 16 Jul 2021 15:19:22 -0400 Subject: [PATCH] feat: introduce closeAsync to Batcher (#1423) * feat: introduce closeAsync to Batcher This should allow callers to signal that they are done using a batcher without blocking their thread. * improve comments * add tests * format --- .../com/google/api/gax/batching/Batcher.java | 10 +- .../google/api/gax/batching/BatcherImpl.java | 77 +++++++++-- .../api/gax/batching/BatcherImplTest.java | 126 +++++++++++++++++- 3 files changed, 200 insertions(+), 13 deletions(-) diff --git a/gax/src/main/java/com/google/api/gax/batching/Batcher.java b/gax/src/main/java/com/google/api/gax/batching/Batcher.java index f8d136650..d01f79f4a 100644 --- a/gax/src/main/java/com/google/api/gax/batching/Batcher.java +++ b/gax/src/main/java/com/google/api/gax/batching/Batcher.java @@ -31,6 +31,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.BetaApi; +import com.google.api.core.InternalExtensionOnly; /** * Represents a batching context where individual elements will be accumulated and flushed in a @@ -45,6 +46,7 @@ * @param The type of the result for each individual element. */ @BetaApi("The surface for batching is not stable yet and may change in the future.") +@InternalExtensionOnly public interface Batcher extends AutoCloseable { /** @@ -74,9 +76,15 @@ public interface Batcher extends AutoCloseable { void sendOutstanding(); /** - * Closes this Batcher by preventing new elements from being added and flushing the existing + * Closes this Batcher by preventing new elements from being added, and then flushing the existing * elements. */ @Override void close() throws InterruptedException; + + /** + * Closes this Batcher by preventing new elements from being added, and then sending outstanding + * elements. The returned future will be resolved when the last element completes + */ + ApiFuture closeAsync(); } diff --git a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java index 8d3bbac26..4eb47e756 100644 --- a/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java +++ b/gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -89,7 +90,7 @@ public class BatcherImpl private final Object flushLock = new Object(); private final Object elementLock = new Object(); private final Future scheduledFuture; - private volatile boolean isClosed = false; + private SettableApiFuture closeFuture; private final BatcherStats batcherStats = new BatcherStats(); private final FlowController flowController; @@ -172,7 +173,10 @@ public BatcherImpl( /** {@inheritDoc} */ @Override public ApiFuture add(ElementT element) { - Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher"); + // Note: there is no need to synchronize over closeFuture. The write & read of the variable + // will only be done from a single calling thread. + Preconditions.checkState(closeFuture == null, "Cannot add elements on a closed batcher"); + // This is not the optimal way of throttling. It does not send out partial batches, which // means that the Batcher might not use up all the resources allowed by FlowController. // The more efficient implementation should look like: @@ -257,9 +261,20 @@ public void onFailure(Throwable throwable) { } private void onBatchCompletion() { - if (numOfOutstandingBatches.decrementAndGet() == 0) { - synchronized (flushLock) { + boolean shouldClose = false; + + synchronized (flushLock) { + if (numOfOutstandingBatches.decrementAndGet() == 0) { flushLock.notifyAll(); + shouldClose = closeFuture != null; + } + } + if (shouldClose) { + BatchingException overallError = batcherStats.asException(); + if (overallError != null) { + closeFuture.setException(overallError); + } else { + closeFuture.set(null); } } } @@ -279,17 +294,57 @@ private void awaitAllOutstandingBatches() throws InterruptedException { /** {@inheritDoc} */ @Override public void close() throws InterruptedException { - if (isClosed) { - return; + try { + closeAsync().get(); + } catch (ExecutionException e) { + // Original stacktrace of a batching exception is not useful, so rethrow the error with + // the caller stacktrace + if (e.getCause() instanceof BatchingException) { + BatchingException cause = (BatchingException) e.getCause(); + throw new BatchingException(cause.getMessage()); + } else { + throw new IllegalStateException("unexpected error closing the batcher", e.getCause()); + } + } + } + + @Override + public ApiFuture closeAsync() { + if (closeFuture != null) { + return closeFuture; + } + + // Send any buffered elements + // Must come before numOfOutstandingBatches check below + sendOutstanding(); + + boolean closeImmediately; + + synchronized (flushLock) { + // prevent admission of new elements + closeFuture = SettableApiFuture.create(); + // check if we can close immediately + closeImmediately = numOfOutstandingBatches.get() == 0; } - flush(); + + // Clean up accounting scheduledFuture.cancel(true); - isClosed = true; currentBatcherReference.closed = true; currentBatcherReference.clear(); - BatchingException exception = batcherStats.asException(); - if (exception != null) { - throw exception; + + // notify futures + if (closeImmediately) { + finishClose(); + } + return closeFuture; + } + + private void finishClose() { + BatchingException batchingException = batcherStats.asException(); + if (batchingException != null) { + closeFuture.setException(batchingException); + } else { + closeFuture.set(null); } } diff --git a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java index 96235d5cb..10ccb1453 100644 --- a/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java +++ b/gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java @@ -45,6 +45,7 @@ import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList; import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable; import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.collect.Queues; import java.util.ArrayList; @@ -69,7 +70,9 @@ import java.util.logging.Logger; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Test; +import org.junit.function.ThrowingRunnable; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.threeten.bp.Duration; @@ -92,7 +95,12 @@ public class BatcherImplTest { @After public void tearDown() throws InterruptedException { if (underTest != null) { - underTest.close(); + try { + // Close the batcher to avoid warnings of orphaned batchers + underTest.close(); + } catch (BatchingException ignored) { + // Some tests intentionally inject failures into mutations + } } } @@ -172,6 +180,55 @@ public void testNoElementAdditionAfterClose() throws Exception { .matches("Cannot add elements on a closed batcher"); } + /** Validates exception when batch is called after {@link Batcher#close()}. */ + @Test + public void testNoElementAdditionAfterCloseAsync() throws Exception { + underTest = createDefaultBatcherImpl(batchingSettings, null); + underTest.add(1); + underTest.closeAsync(); + + IllegalStateException e = + Assert.assertThrows( + IllegalStateException.class, + new ThrowingRunnable() { + @Override + public void run() throws Throwable { + underTest.add(1); + } + }); + + assertThat(e).hasMessageThat().matches("Cannot add elements on a closed batcher"); + } + + @Test + public void testCloseAsyncNonblocking() throws ExecutionException, InterruptedException { + final SettableApiFuture> innerFuture = SettableApiFuture.create(); + + UnaryCallable> unaryCallable = + new UnaryCallable>() { + @Override + public ApiFuture> futureCall( + LabeledIntList request, ApiCallContext context) { + return innerFuture; + } + }; + underTest = + new BatcherImpl<>( + SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR); + + ApiFuture elementFuture = underTest.add(1); + + Stopwatch stopwatch = Stopwatch.createStarted(); + ApiFuture closeFuture = underTest.closeAsync(); + assertThat(stopwatch.elapsed(TimeUnit.MILLISECONDS)).isAtMost(100); + + assertThat(closeFuture.isDone()).isFalse(); + assertThat(elementFuture.isDone()).isFalse(); + + innerFuture.set(ImmutableList.of(1)); + closeFuture.get(); + } + /** Verifies exception occurred at RPC is propagated to element results */ @Test public void testResultFailureAfterRPCFailure() throws Exception { @@ -614,6 +671,73 @@ public boolean isLoggable(LogRecord record) { } } + /** + * Validates the absence of warning in case {@link BatcherImpl} is garbage collected after being + * closed. + * + *

Note:This test cannot run concurrently with other tests that use Batchers. + */ + @Test + public void testClosedBatchersAreNotLogged() throws Exception { + // Clean out the existing instances + final long DELAY_TIME = 30L; + int actualRemaining = 0; + for (int retry = 0; retry < 3; retry++) { + System.gc(); + System.runFinalization(); + actualRemaining = BatcherReference.cleanQueue(); + if (actualRemaining == 0) { + break; + } + Thread.sleep(DELAY_TIME * (1L << retry)); + } + assertThat(actualRemaining).isAtMost(0); + + // Capture logs + final List records = new ArrayList<>(1); + Logger batcherLogger = Logger.getLogger(BatcherImpl.class.getName()); + Filter oldFilter = batcherLogger.getFilter(); + batcherLogger.setFilter( + new Filter() { + @Override + public boolean isLoggable(LogRecord record) { + synchronized (records) { + records.add(record); + } + return false; + } + }); + + try { + // Create a bunch of batchers that will garbage collected after being closed + for (int i = 0; i < 1_000; i++) { + BatcherImpl> batcher = + createDefaultBatcherImpl(batchingSettings, null); + batcher.add(1); + + if (i % 2 == 0) { + batcher.close(); + } else { + batcher.closeAsync(); + } + } + // Run GC a few times to give the batchers a chance to be collected + for (int retry = 0; retry < 100; retry++) { + System.gc(); + System.runFinalization(); + BatcherReference.cleanQueue(); + Thread.sleep(10); + } + + synchronized (records) { + assertThat(records).isEmpty(); + } + } finally { + // reset logging + batcherLogger.setFilter(oldFilter); + } + } + @Test public void testCloseRace() throws ExecutionException, InterruptedException, TimeoutException { int iterations = 1_000_000;