Skip to content
This repository has been archived by the owner on Sep 26, 2023. It is now read-only.

feat: introduce closeAsync to Batcher #1423

Merged
merged 8 commits into from Jul 16, 2021

Conversation

igorbernstein2
Copy link
Contributor

This should allow callers to signal that they are done using a batcher without blocking their thread.
This will be used in bigtable-hbase to implement async batching cleanup

This should allow callers to signal that they are done using a batcher without blocking their thread.
@igorbernstein2 igorbernstein2 requested review from a team as code owners July 2, 2021 11:58
@google-cla google-cla bot added the cla: yes This human has signed the Contributor License Agreement. label Jul 2, 2021
igorbernstein2 added a commit to igorbernstein2/cloud-bigtable-client that referenced this pull request Jul 2, 2021
There was a race condition of the BulkReadWrapper being cleaned up before the batch executor was done with it. This PR fixes the issue by making lifecycle management more explicit  by scoping batch executors to a single  method(List) invocation and making close explicit. This will be further cleaned up once  googleapis/gax-java#1423 lands
elharo
elharo previously requested changes Jul 2, 2021
Copy link
Contributor

@elharo elharo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There don't seem to be any tests for the new behavior.

@igorbernstein2
Copy link
Contributor Author

igorbernstein2 commented Jul 2, 2021

added async specific tests, which in combination with the existing sync close tests to cover the functionality fully

igorbernstein2 added a commit to googleapis/java-bigtable-hbase that referenced this pull request Jul 2, 2021
* fix: veneer adapter batching

There was a race condition of the BulkReadWrapper being cleaned up before the batch executor was done with it. This PR fixes the issue by making lifecycle management more explicit  by scoping batch executors to a single  method(List) invocation and making close explicit. This will be further cleaned up once  googleapis/gax-java#1423 lands

* deflake test

* use updated veneer for jwt fix

* Revert "deflake test"

This reverts commit 386ab65.

* better error messaging
Copy link
Contributor

@vam-google vam-google left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm ok with the changes, but a few a few questions to confirm that the imlementation matches the original intentions/

@@ -89,7 +90,7 @@
private final Object flushLock = new Object();
private final Object elementLock = new Object();
private final Future<?> scheduledFuture;
private volatile boolean isClosed = false;
private SettableApiFuture<Void> closeFuture;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this remain volatile? I mean tthe referrence itself, because it seems it is checked for non-null value withotu anyu locks in the add() method below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont believe so: it can only be set by a single caller thread and is read by that same thread or in a synchronized block.

But I dont see any harm in adding it, so I did

BatchingException cause = (BatchingException) e.getCause();
throw new BatchingException(cause.getMessage());
} else {
throw new IllegalStateException("unexpected error closing the batcher", e.getCause());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will loose the ExecutionException itself (with maybe some useful message there). I.e. why e.getCause() instead of just e as the second argument? Get cause will still be there as a "sub-exception".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExecutionException doesn't provide any useful info here and leaks implementation details. The caller doesn't care that this was implemented with a future, but they do care that one of their elements failed.

return closeFuture;
}

private void finishClose() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure that this deserves its own private method, as it is used only once and very short.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I inlined the method

boolean shouldClose = false;

synchronized (flushLock) {
if (numOfOutstandingBatches.decrementAndGet() == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use <= 0 just in case as a safer option guaranteeing we will not jump over 0 and go into negative infinity?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would create weird behavior. I dont think I want waiters on the flushLock to be notified multiple times

if (isClosed) {
return;
try {
closeAsync().get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a change in behavior - looks like close used to not take flushLock at all, so it was non-blocking. Now it is blocking (taking flushLock via closeAsync). If it was intentional, then ok, but just decided to point it out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the old close impl called flush(), which called awaitAllOutstandingBatches(), which took the flush lock. So that behavior remains the same.


boolean closeImmediately;

synchronized (flushLock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would mean that closeAsync is still blocking potentially. The stuff under lock will execute immediatelly fast, but waiting for the lock may take arbitrarily long time. Is it Ok?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think there is any code here that would hold the lock for an arbitrary amount of time. All critical sections are explicitly coded to avoid blocking indefinitely

Copy link
Contributor Author

@igorbernstein2 igorbernstein2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vam-google thanks for looking, I think I address all of the comments, PTAL

@@ -89,7 +90,7 @@
private final Object flushLock = new Object();
private final Object elementLock = new Object();
private final Future<?> scheduledFuture;
private volatile boolean isClosed = false;
private SettableApiFuture<Void> closeFuture;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont believe so: it can only be set by a single caller thread and is read by that same thread or in a synchronized block.

But I dont see any harm in adding it, so I did

boolean shouldClose = false;

synchronized (flushLock) {
if (numOfOutstandingBatches.decrementAndGet() == 0) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would create weird behavior. I dont think I want waiters on the flushLock to be notified multiple times

if (isClosed) {
return;
try {
closeAsync().get();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the old close impl called flush(), which called awaitAllOutstandingBatches(), which took the flush lock. So that behavior remains the same.

BatchingException cause = (BatchingException) e.getCause();
throw new BatchingException(cause.getMessage());
} else {
throw new IllegalStateException("unexpected error closing the batcher", e.getCause());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExecutionException doesn't provide any useful info here and leaks implementation details. The caller doesn't care that this was implemented with a future, but they do care that one of their elements failed.


boolean closeImmediately;

synchronized (flushLock) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think there is any code here that would hold the lock for an arbitrary amount of time. All critical sections are explicitly coded to avoid blocking indefinitely

return closeFuture;
}

private void finishClose() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I inlined the method

Copy link
Contributor

@vam-google vam-google left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@igorbernstein2 igorbernstein2 merged commit aab5288 into googleapis:master Jul 16, 2021
@igorbernstein2 igorbernstein2 deleted the async-close branch July 16, 2021 19:19
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
cla: yes This human has signed the Contributor License Agreement.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants