Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

feat: introduce closeAsync to Batcher #1423

Merged
merged 8 commits into from Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 9 additions & 1 deletion gax/src/main/java/com/google/api/gax/batching/Batcher.java
Expand Up @@ -31,6 +31,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;

/**
* Represents a batching context where individual elements will be accumulated and flushed in a
Expand All @@ -45,6 +46,7 @@
* @param <ElementResultT> The type of the result for each individual element.
*/
@BetaApi("The surface for batching is not stable yet and may change in the future.")
@InternalExtensionOnly
public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {

/**
Expand Down Expand Up @@ -74,9 +76,15 @@ public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {
void sendOutstanding();

/**
* Closes this Batcher by preventing new elements from being added and flushing the existing
* Closes this Batcher by preventing new elements from being added, and then flushing the existing
* elements.
*/
@Override
void close() throws InterruptedException;

/**
* Closes this Batcher by preventing new elements from being added, and then sending outstanding
* elements. The returned future will be resolved when the last element completes
*/
ApiFuture<Void> closeAsync();
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
}
77 changes: 66 additions & 11 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
Expand Up @@ -52,6 +52,7 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -89,7 +90,7 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final Object flushLock = new Object();
private final Object elementLock = new Object();
private final Future<?> scheduledFuture;
private volatile boolean isClosed = false;
private SettableApiFuture<Void> closeFuture;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this remain volatile? I mean tthe referrence itself, because it seems it is checked for non-null value withotu anyu locks in the add() method below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont believe so: it can only be set by a single caller thread and is read by that same thread or in a synchronized block.

But I dont see any harm in adding it, so I did

private final BatcherStats batcherStats = new BatcherStats();
private final FlowController flowController;

Expand Down Expand Up @@ -172,7 +173,10 @@ public BatcherImpl(
/** {@inheritDoc} */
@Override
public ApiFuture<ElementResultT> add(ElementT element) {
Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher");
// Note: there is no need to synchronize over closeFuture. The write & read of the variable
// will only be done from a single calling thread.
Preconditions.checkState(closeFuture == null, "Cannot add elements on a closed batcher");

// This is not the optimal way of throttling. It does not send out partial batches, which
// means that the Batcher might not use up all the resources allowed by FlowController.
// The more efficient implementation should look like:
Expand Down Expand Up @@ -257,9 +261,20 @@ public void onFailure(Throwable throwable) {
}

private void onBatchCompletion() {
if (numOfOutstandingBatches.decrementAndGet() == 0) {
synchronized (flushLock) {
boolean shouldClose = false;

synchronized (flushLock) {
if (numOfOutstandingBatches.decrementAndGet() == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use <= 0 just in case as a safer option guaranteeing we will not jump over 0 and go into negative infinity?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would create weird behavior. I dont think I want waiters on the flushLock to be notified multiple times

flushLock.notifyAll();
shouldClose = closeFuture != null;
}
}
if (shouldClose) {
BatchingException overallError = batcherStats.asException();
if (overallError != null) {
closeFuture.setException(overallError);
} else {
closeFuture.set(null);
}
}
}
Expand All @@ -279,17 +294,57 @@ private void awaitAllOutstandingBatches() throws InterruptedException {
/** {@inheritDoc} */
@Override
public void close() throws InterruptedException {
if (isClosed) {
return;
try {
closeAsync().get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a change in behavior - looks like close used to not take flushLock at all, so it was non-blocking. Now it is blocking (taking flushLock via closeAsync). If it was intentional, then ok, but just decided to point it out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the old close impl called flush(), which called awaitAllOutstandingBatches(), which took the flush lock. So that behavior remains the same.

} catch (ExecutionException e) {
// Original stacktrace of a batching exception is not useful, so rethrow the error with
// the caller stacktrace
if (e.getCause() instanceof BatchingException) {
BatchingException cause = (BatchingException) e.getCause();
throw new BatchingException(cause.getMessage());
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
} else {
throw new IllegalStateException("unexpected error closing the batcher", e.getCause());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will loose the ExecutionException itself (with maybe some useful message there). I.e. why e.getCause() instead of just e as the second argument? Get cause will still be there as a "sub-exception".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExecutionException doesn't provide any useful info here and leaks implementation details. The caller doesn't care that this was implemented with a future, but they do care that one of their elements failed.

}
}
}

@Override
public ApiFuture<Void> closeAsync() {
if (closeFuture != null) {
return closeFuture;
}

// Send any buffered elements
// Must come before numOfOutstandingBatches check below
sendOutstanding();

boolean closeImmediately;

synchronized (flushLock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would mean that closeAsync is still blocking potentially. The stuff under lock will execute immediatelly fast, but waiting for the lock may take arbitrarily long time. Is it Ok?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think there is any code here that would hold the lock for an arbitrary amount of time. All critical sections are explicitly coded to avoid blocking indefinitely

// prevent admission of new elements
closeFuture = SettableApiFuture.create();
// check if we can close immediately
closeImmediately = numOfOutstandingBatches.get() == 0;
}
flush();

// Clean up accounting
scheduledFuture.cancel(true);
isClosed = true;
currentBatcherReference.closed = true;
currentBatcherReference.clear();
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
BatchingException exception = batcherStats.asException();
if (exception != null) {
throw exception;

// notify futures
if (closeImmediately) {
finishClose();
}
return closeFuture;
}

private void finishClose() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure that this deserves its own private method, as it is used only once and very short.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I inlined the method

BatchingException batchingException = batcherStats.asException();
if (batchingException != null) {
closeFuture.setException(batchingException);
} else {
closeFuture.set(null);
}
}

Expand Down
126 changes: 125 additions & 1 deletion gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java
Expand Up @@ -45,6 +45,7 @@
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntList;
import com.google.api.gax.rpc.testing.FakeBatchableApi.LabeledIntSquarerCallable;
import com.google.api.gax.rpc.testing.FakeBatchableApi.SquarerBatchingDescriptorV2;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Queues;
import java.util.ArrayList;
Expand All @@ -69,7 +70,9 @@
import java.util.logging.Logger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.function.ThrowingRunnable;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Duration;
Expand All @@ -92,7 +95,12 @@ public class BatcherImplTest {
@After
public void tearDown() throws InterruptedException {
if (underTest != null) {
underTest.close();
try {
// Close the batcher to avoid warnings of orphaned batchers
underTest.close();
} catch (BatchingException ignored) {
// Some tests intentionally inject failures into mutations
}
}
}

Expand Down Expand Up @@ -172,6 +180,55 @@ public void testNoElementAdditionAfterClose() throws Exception {
.matches("Cannot add elements on a closed batcher");
}

/** Validates exception when batch is called after {@link Batcher#close()}. */
@Test
public void testNoElementAdditionAfterCloseAsync() throws Exception {
underTest = createDefaultBatcherImpl(batchingSettings, null);
underTest.add(1);
underTest.closeAsync();

IllegalStateException e =
Assert.assertThrows(
IllegalStateException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
underTest.add(1);
}
});

assertThat(e).hasMessageThat().matches("Cannot add elements on a closed batcher");
}

@Test
public void testCloseAsyncNonblocking() throws ExecutionException, InterruptedException {
final SettableApiFuture<List<Integer>> innerFuture = SettableApiFuture.create();

UnaryCallable<LabeledIntList, List<Integer>> unaryCallable =
new UnaryCallable<LabeledIntList, List<Integer>>() {
@Override
public ApiFuture<List<Integer>> futureCall(
LabeledIntList request, ApiCallContext context) {
return innerFuture;
}
};
underTest =
new BatcherImpl<>(
SQUARER_BATCHING_DESC_V2, unaryCallable, labeledIntList, batchingSettings, EXECUTOR);

ApiFuture<Integer> elementFuture = underTest.add(1);

Stopwatch stopwatch = Stopwatch.createStarted();
ApiFuture<Void> closeFuture = underTest.closeAsync();
assertThat(stopwatch.elapsed(TimeUnit.MILLISECONDS)).isAtMost(100);

assertThat(closeFuture.isDone()).isFalse();
assertThat(elementFuture.isDone()).isFalse();

innerFuture.set(ImmutableList.of(1));
closeFuture.get();
}

/** Verifies exception occurred at RPC is propagated to element results */
@Test
public void testResultFailureAfterRPCFailure() throws Exception {
Expand Down Expand Up @@ -614,6 +671,73 @@ public boolean isLoggable(LogRecord record) {
}
}

/**
* Validates the absence of warning in case {@link BatcherImpl} is garbage collected after being
* closed.
*
* <p>Note:This test cannot run concurrently with other tests that use Batchers.
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
*/
@Test
public void testClosedBatchersAreNotLogged() throws Exception {
// Clean out the existing instances
final long DELAY_TIME = 30L;
int actualRemaining = 0;
for (int retry = 0; retry < 3; retry++) {
System.gc();
System.runFinalization();
actualRemaining = BatcherReference.cleanQueue();
if (actualRemaining == 0) {
break;
}
Thread.sleep(DELAY_TIME * (1L << retry));
}
assertThat(actualRemaining).isAtMost(0);

// Capture logs
final List<LogRecord> records = new ArrayList<>(1);
Logger batcherLogger = Logger.getLogger(BatcherImpl.class.getName());
Filter oldFilter = batcherLogger.getFilter();
batcherLogger.setFilter(
new Filter() {
@Override
public boolean isLoggable(LogRecord record) {
synchronized (records) {
records.add(record);
}
return false;
}
});

try {
// Create a bunch of batchers that will garbage collected after being closed
for (int i = 0; i < 1_000; i++) {
BatcherImpl<Integer, Integer, LabeledIntList, List<Integer>> batcher =
createDefaultBatcherImpl(batchingSettings, null);
batcher.add(1);

if (i % 2 == 0) {
batcher.close();
} else {
batcher.closeAsync();
}
}
// Run GC a few times to give the batchers a chance to be collected
for (int retry = 0; retry < 100; retry++) {
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
System.gc();
System.runFinalization();
BatcherReference.cleanQueue();
Thread.sleep(10);
}

synchronized (records) {
assertThat(records).isEmpty();
}
} finally {
// reset logging
batcherLogger.setFilter(oldFilter);
}
}

@Test
public void testCloseRace() throws ExecutionException, InterruptedException, TimeoutException {
int iterations = 1_000_000;
Expand Down