feat: introduce closeAsync to Batcher #1423
Conversation
This should allow callers to signal that they are done using a batcher without blocking their thread.
gax/src/test/java/com/google/api/gax/batching/BatcherImplTest.java
Outdated
Show resolved
Hide resolved
There was a race condition of the BulkReadWrapper being cleaned up before the batch executor was done with it. This PR fixes the issue by making lifecycle management more explicit by scoping batch executors to a single method(List) invocation and making close explicit. This will be further cleaned up once googleapis/gax-java#1423 lands
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There don't seem to be any tests for the new behavior.
added async specific tests, which in combination with the existing sync close tests to cover the functionality fully |
* fix: veneer adapter batching There was a race condition of the BulkReadWrapper being cleaned up before the batch executor was done with it. This PR fixes the issue by making lifecycle management more explicit by scoping batch executors to a single method(List) invocation and making close explicit. This will be further cleaned up once googleapis/gax-java#1423 lands * deflake test * use updated veneer for jwt fix * Revert "deflake test" This reverts commit 386ab65. * better error messaging
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm ok with the changes, but a few a few questions to confirm that the imlementation matches the original intentions/
@@ -89,7 +90,7 @@ | |||
private final Object flushLock = new Object(); | |||
private final Object elementLock = new Object(); | |||
private final Future<?> scheduledFuture; | |||
private volatile boolean isClosed = false; | |||
private SettableApiFuture<Void> closeFuture; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this remain volatile? I mean tthe referrence itself, because it seems it is checked for non-null value withotu anyu locks in the add() method below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont believe so: it can only be set by a single caller thread and is read by that same thread or in a synchronized block.
But I dont see any harm in adding it, so I did
BatchingException cause = (BatchingException) e.getCause(); | ||
throw new BatchingException(cause.getMessage()); | ||
} else { | ||
throw new IllegalStateException("unexpected error closing the batcher", e.getCause()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will loose the ExecutionException itself (with maybe some useful message there). I.e. why e.getCause()
instead of just e
as the second argument? Get cause will still be there as a "sub-exception".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExecutionException doesn't provide any useful info here and leaks implementation details. The caller doesn't care that this was implemented with a future, but they do care that one of their elements failed.
return closeFuture; | ||
} | ||
|
||
private void finishClose() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure that this deserves its own private method, as it is used only once and very short.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I inlined the method
boolean shouldClose = false; | ||
|
||
synchronized (flushLock) { | ||
if (numOfOutstandingBatches.decrementAndGet() == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe use <= 0
just in case as a safer option guaranteeing we will not jump over 0 and go into negative infinity?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that would create weird behavior. I dont think I want waiters on the flushLock to be notified multiple times
if (isClosed) { | ||
return; | ||
try { | ||
closeAsync().get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a change in behavior - looks like close used to not take flushLock
at all, so it was non-blocking. Now it is blocking (taking flushLock via closeAsync). If it was intentional, then ok, but just decided to point it out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the old close impl called flush(), which called awaitAllOutstandingBatches(), which took the flush lock. So that behavior remains the same.
|
||
boolean closeImmediately; | ||
|
||
synchronized (flushLock) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would mean that closeAsync
is still blocking potentially. The stuff under lock will execute immediatelly fast, but waiting for the lock may take arbitrarily long time. Is it Ok?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont think there is any code here that would hold the lock for an arbitrary amount of time. All critical sections are explicitly coded to avoid blocking indefinitely
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vam-google thanks for looking, I think I address all of the comments, PTAL
@@ -89,7 +90,7 @@ | |||
private final Object flushLock = new Object(); | |||
private final Object elementLock = new Object(); | |||
private final Future<?> scheduledFuture; | |||
private volatile boolean isClosed = false; | |||
private SettableApiFuture<Void> closeFuture; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont believe so: it can only be set by a single caller thread and is read by that same thread or in a synchronized block.
But I dont see any harm in adding it, so I did
boolean shouldClose = false; | ||
|
||
synchronized (flushLock) { | ||
if (numOfOutstandingBatches.decrementAndGet() == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that would create weird behavior. I dont think I want waiters on the flushLock to be notified multiple times
if (isClosed) { | ||
return; | ||
try { | ||
closeAsync().get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the old close impl called flush(), which called awaitAllOutstandingBatches(), which took the flush lock. So that behavior remains the same.
BatchingException cause = (BatchingException) e.getCause(); | ||
throw new BatchingException(cause.getMessage()); | ||
} else { | ||
throw new IllegalStateException("unexpected error closing the batcher", e.getCause()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExecutionException doesn't provide any useful info here and leaks implementation details. The caller doesn't care that this was implemented with a future, but they do care that one of their elements failed.
|
||
boolean closeImmediately; | ||
|
||
synchronized (flushLock) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I dont think there is any code here that would hold the lock for an arbitrary amount of time. All critical sections are explicitly coded to avoid blocking indefinitely
return closeFuture; | ||
} | ||
|
||
private void finishClose() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I inlined the method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
🤖 I have created a release \*beep\* \*boop\* --- ## [1.67.0](https://www.github.com/googleapis/gax-java/compare/v1.66.0...v1.67.0) (2021-07-19) ### Features * introduce closeAsync to Batcher ([#1423](https://www.github.com/googleapis/gax-java/issues/1423)) ([aab5288](https://www.github.com/googleapis/gax-java/commit/aab528803405c2b5f9fc89641f47abff948a876d)) * optimize unary callables to not wait for trailers ([#1356](https://www.github.com/googleapis/gax-java/issues/1356)) ([dd5f955](https://www.github.com/googleapis/gax-java/commit/dd5f955a3ab740c677fbc6f1247094798eb814a3)) * update DirectPath environment variables ([#1412](https://www.github.com/googleapis/gax-java/issues/1412)) ([4f63b61](https://www.github.com/googleapis/gax-java/commit/4f63b61f1259936aa4a1eaf9162218c787b92f2a)) ### Bug Fixes * remove `extends ApiMessage` from `HttpJsonStubCallableFactory` definition ([#1426](https://www.github.com/googleapis/gax-java/issues/1426)) ([87636a5](https://www.github.com/googleapis/gax-java/commit/87636a5812874a77e9004aab07607121efa43736)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
This should allow callers to signal that they are done using a batcher without blocking their thread.
This will be used in bigtable-hbase to implement async batching cleanup