This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

feat: introduce closeAsync to Batcher #1423

Merged
merged 8 commits into from Jul 16, 2021
Changes from 1 commit
8 changes: 8 additions & 0 deletions gax/src/main/java/com/google/api/gax/batching/Batcher.java
@@ -31,6 +31,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.BetaApi;
import com.google.api.core.InternalExtensionOnly;

/**
* Represents a batching context where individual elements will be accumulated and flushed in a
@@ -45,6 +46,7 @@
* @param <ElementResultT> The type of the result for each individual element.
*/
@BetaApi("The surface for batching is not stable yet and may change in the future.")
@InternalExtensionOnly
public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {

/**
@@ -79,4 +81,10 @@ public interface Batcher<ElementT, ElementResultT> extends AutoCloseable {
*/
@Override
void close() throws InterruptedException;

/**
* Closes this Batcher by preventing new elements from being added and send outstanding elements.
* The returned future will be resolved when the last element completes
*/
ApiFuture<Void> closeAsync();
}
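
The contract described in the Javadoc for closeAsync() can be sketched in isolation. The following is a minimal illustration under stated assumptions, not the gax implementation: the class `CloseAsyncSketch` and its `add()` and `complete()` methods are hypothetical, `java.util.concurrent.CompletableFuture` stands in for `ApiFuture`, and every access to the close future takes the lock, which is simpler than what the PR does.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for a batcher; CompletableFuture replaces ApiFuture here. */
final class CloseAsyncSketch {
  private final Object lock = new Object();
  private int outstanding = 0; // guarded by lock
  private CompletableFuture<Void> closeFuture; // guarded by lock; non-null once closing starts

  /** Registers one unit of outstanding work; rejected once closing has begun. */
  void add() {
    synchronized (lock) {
      if (closeFuture != null) {
        throw new IllegalStateException("Cannot add elements on a closed batcher");
      }
      outstanding++;
    }
  }

  /** Marks one unit of work as finished; the last one resolves a pending close. */
  void complete() {
    CompletableFuture<Void> toResolve = null;
    synchronized (lock) {
      outstanding--;
      if (outstanding == 0) {
        toResolve = closeFuture; // still null if close was never requested
      }
    }
    if (toResolve != null) {
      toResolve.complete(null); // resolve outside the lock
    }
  }

  /** Stops admission and returns a future that resolves when all work is done. */
  CompletableFuture<Void> closeAsync() {
    CompletableFuture<Void> future;
    boolean closeImmediately;
    synchronized (lock) {
      if (closeFuture != null) {
        return closeFuture; // idempotent: repeated calls share one future
      }
      closeFuture = new CompletableFuture<>();
      future = closeFuture;
      closeImmediately = outstanding == 0;
    }
    if (closeImmediately) {
      future.complete(null);
    }
    return future;
  }
}
```

In this sketch a caller that wants blocking behavior would simply call `closeAsync().get()`, which mirrors how the PR reimplements close() on top of closeAsync().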
77 changes: 66 additions & 11 deletions gax/src/main/java/com/google/api/gax/batching/BatcherImpl.java
@@ -52,6 +52,7 @@
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -89,7 +90,7 @@ public class BatcherImpl<ElementT, ElementResultT, RequestT, ResponseT>
private final Object flushLock = new Object();
private final Object elementLock = new Object();
private final Future<?> scheduledFuture;
private volatile boolean isClosed = false;
private SettableApiFuture<Void> closeFuture;
Contributor

Should this remain volatile? I mean the reference itself, because it seems to be checked for a non-null value without any locks in the add() method below.

Contributor Author

I don't believe so: it can only be set by a single caller thread and is read by that same thread or in a synchronized block.

But I don't see any harm in adding it, so I did.

private final BatcherStats batcherStats = new BatcherStats();
private final FlowController flowController;

@@ -172,7 +173,10 @@ public BatcherImpl(
/** {@inheritDoc} */
@Override
public ApiFuture<ElementResultT> add(ElementT element) {
Preconditions.checkState(!isClosed, "Cannot add elements on a closed batcher");
// Note: there is no need to synchronize over closeFuture. The write & read of the variable
// will only be done from a single calling thread.
Preconditions.checkState(closeFuture == null, "Cannot add elements on a closed batcher");

// This is not the optimal way of throttling. It does not send out partial batches, which
// means that the Batcher might not use up all the resources allowed by FlowController.
// The more efficient implementation should look like:
@@ -257,9 +261,20 @@ public void onFailure(Throwable throwable) {
}

private void onBatchCompletion() {
if (numOfOutstandingBatches.decrementAndGet() == 0) {
synchronized (flushLock) {
boolean shouldClose = false;

synchronized (flushLock) {
if (numOfOutstandingBatches.decrementAndGet() == 0) {
Contributor

Maybe use <= 0 as a safer option, guaranteeing that we never skip past 0 and keep decrementing into negative values?

Contributor Author

I think that would create weird behavior. I don't think I want waiters on the flushLock to be notified multiple times.
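
The trade-off discussed here can be sketched with a small counter guarded by a monitor. This is an illustration only, assuming a hypothetical class `OutstandingCounterSketch` with hypothetical `start()`, `finish()`, and `awaitAll()` methods; the `notifications` field exists purely to make the number of wake-ups observable and has no counterpart in the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of "notify waiters exactly once when the count reaches zero". */
final class OutstandingCounterSketch {
  private final Object flushLock = new Object();
  private final AtomicInteger outstanding = new AtomicInteger();
  private int notifications = 0; // guarded by flushLock; for observation only

  /** Records that one batch has been sent. */
  void start() {
    outstanding.incrementAndGet();
  }

  /** Records that one batch has completed and wakes waiters on the transition to zero. */
  void finish() {
    synchronized (flushLock) {
      // "== 0" fires once per transition to zero; "<= 0" would fire again
      // for every extra decrement below zero.
      if (outstanding.decrementAndGet() == 0) {
        notifications++;
        flushLock.notifyAll();
      }
    }
  }

  /** Blocks until no batches are outstanding. */
  void awaitAll() throws InterruptedException {
    synchronized (flushLock) {
      while (outstanding.get() > 0) {
        flushLock.wait();
      }
    }
  }

  int notifications() {
    synchronized (flushLock) {
      return notifications;
    }
  }
}
```

Because the decrement and the zero check happen under the same monitor that waiters use, a waiter in this sketch cannot observe a positive count and then miss the wake-up.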

flushLock.notifyAll();
shouldClose = closeFuture != null;
}
}
if (shouldClose) {
BatchingException overallError = batcherStats.asException();
if (overallError != null) {
closeFuture.setException(overallError);
} else {
closeFuture.set(null);
}
}
}
@@ -279,17 +294,57 @@ private void awaitAllOutstandingBatches() throws InterruptedException {
/** {@inheritDoc} */
@Override
public void close() throws InterruptedException {
if (isClosed) {
return;
try {
closeAsync().get();
Contributor

This is a change in behavior: it looks like close() used to not take flushLock at all, so it was non-blocking. Now it is blocking (it takes flushLock via closeAsync()). If that was intentional, then OK; I just wanted to point it out.

Contributor Author

The old close() implementation called flush(), which called awaitAllOutstandingBatches(), which took the flush lock. So that behavior remains the same.

} catch (ExecutionException e) {
// Original stacktrace of a batching exception is not useful, so rethrow the error with
// the caller stacktrace
if (e.getCause() instanceof BatchingException) {
BatchingException cause = (BatchingException) e.getCause();
throw new BatchingException(cause.getMessage());
} else {
throw new IllegalStateException("unexpected error closing the batcher", e.getCause());
Contributor

This will lose the ExecutionException itself (which may carry a useful message). Why pass e.getCause() instead of just e as the second argument? The cause would still be available as a nested exception.

Contributor Author

ExecutionException doesn't provide any useful info here and leaks implementation details. The caller doesn't care that this was implemented with a future, but they do care that one of their elements failed.
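
The unwrapping approach defended here can be sketched without gax. This is a hedged illustration, not the PR's code: `CloseUnwrapSketch`, `blockingClose`, and `ElementFailureException` are hypothetical names (the last one stands in for `BatchingException`), and `CompletableFuture` stands in for the future returned by closeAsync().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class CloseUnwrapSketch {
  /** Hypothetical stand-in for BatchingException. */
  static final class ElementFailureException extends RuntimeException {
    ElementFailureException(String message) {
      super(message);
    }
  }

  /** Blocks on the close future and hides the fact that a future was involved. */
  static void blockingClose(CompletableFuture<Void> closeFuture) throws InterruptedException {
    try {
      closeFuture.get();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof ElementFailureException) {
        // Re-create the exception so its stack trace points at the caller of
        // blockingClose rather than at the thread that completed the future.
        throw new ElementFailureException(cause.getMessage());
      }
      // Keep the real cause but drop the ExecutionException wrapper.
      throw new IllegalStateException("unexpected error closing the batcher", cause);
    }
  }
}
```

The design choice in this sketch is that callers see either the element-level failure or an IllegalStateException carrying the original cause, and never the ExecutionException wrapper.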

}
}
}

@Override
public ApiFuture<Void> closeAsync() {
if (closeFuture != null) {
return closeFuture;
}

// Send any buffered elements
// Must come before numOfOutstandingBatches check below
sendOutstanding();

boolean closeImmediately;

synchronized (flushLock) {
Contributor

This would mean that closeAsync() can still potentially block. The code under the lock will execute quickly, but waiting for the lock may take an arbitrarily long time. Is that OK?

Contributor Author

I don't think there is any code here that would hold the lock for an arbitrary amount of time. All critical sections are explicitly coded to avoid blocking indefinitely.

// prevent admission of new elements
closeFuture = SettableApiFuture.create();
// check if we can close immediately
closeImmediately = numOfOutstandingBatches.get() == 0;
}
flush();

// Clean up accounting
scheduledFuture.cancel(true);
isClosed = true;
currentBatcherReference.closed = true;
currentBatcherReference.clear();
BatchingException exception = batcherStats.asException();
if (exception != null) {
throw exception;

// notify futures
if (closeImmediately) {
finishClose();
}
return closeFuture;
}

private void finishClose() {
Contributor

I'm not sure this deserves its own private method, as it is used only once and is very short.

Contributor Author

I inlined the method

BatchingException batchingException = batcherStats.asException();
if (batchingException != null) {
closeFuture.setException(batchingException);
} else {
closeFuture.set(null);
}
}

@@ -92,7 +92,11 @@ public class BatcherImplTest {
@After
public void tearDown() throws InterruptedException {
if (underTest != null) {
underTest.close();
try {
underTest.close();
} catch (BatchingException ignored) {

}
}
}
