Skip to content

Commit

Permalink
fix: veneer adapter batching (#3053)
Browse files Browse the repository at this point in the history
* fix: veneer adapter batching

There was a race condition where the BulkReadWrapper could be cleaned up before the batch executor was done with it. This PR fixes the issue by making lifecycle management more explicit: batch executors are now scoped to a single method(List) invocation, and close is explicit. This will be further cleaned up once googleapis/gax-java#1423 lands.

* deflake test

* use updated veneer for jwt fix

* Revert "deflake test"

This reverts commit 386ab65.

* better error messaging
  • Loading branch information
igorbernstein2 committed Jul 2, 2021
1 parent 360e4c5 commit 6768357
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 40 deletions.
Expand Up @@ -112,7 +112,6 @@ private static void addBatchSizeAnnotation(Collection<?> c) {
protected final HBaseRequestAdapter hbaseAdapter;

protected final DataClientWrapper clientWrapper;
private BatchExecutor batchExecutor;
protected final AbstractBigtableConnection bigtableConnection;
private TableMetrics metrics = new TableMetrics();

Expand Down Expand Up @@ -183,7 +182,9 @@ public boolean[] existsAll(List<Get> gets) throws IOException {
for (Get get : gets) {
existGets.add(GetAdapter.setCheckExistenceOnly(get));
}
return getBatchExecutor().exists(existGets);
try (BatchExecutor executor = createBatchExecutor()) {
return executor.exists(existGets);
}
}
}

Expand All @@ -194,7 +195,9 @@ public void batch(List<? extends Row> actions, Object[] results)
LOG.trace("batch(List<>, Object[])");
try (Scope scope = TRACER.spanBuilder("BigtableTable.batch").startScopedSpan()) {
addBatchSizeAnnotation(actions);
getBatchExecutor().batch(actions, results);
try (BatchExecutor executor = createBatchExecutor()) {
executor.batch(actions, results);
}
}
}

Expand All @@ -205,7 +208,10 @@ public Object[] batch(List<? extends Row> actions) throws IOException, Interrupt
LOG.trace("batch(List<>)");
try (Scope scope = TRACER.spanBuilder("BigtableTable.batch").startScopedSpan()) {
addBatchSizeAnnotation(actions);
return getBatchExecutor().batch(actions);

try (BatchExecutor executor = createBatchExecutor()) {
return executor.batch(actions);
}
}
}

Expand All @@ -217,7 +223,9 @@ public <R> void batchCallback(
LOG.trace("batchCallback(List<>, Object[], Batch.Callback)");
try (Scope scope = TRACER.spanBuilder("BigtableTable.batchCallback").startScopedSpan()) {
addBatchSizeAnnotation(actions);
getBatchExecutor().batchCallback(actions, results, callback);
try (BatchExecutor executor = createBatchExecutor()) {
executor.batchCallback(actions, results, callback);
}
}
}

Expand All @@ -230,7 +238,9 @@ public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R>
try (Scope scope = TRACER.spanBuilder("BigtableTable.batchCallback").startScopedSpan()) {
addBatchSizeAnnotation(actions);
Object[] results = new Object[actions.size()];
getBatchExecutor().batchCallback(actions, results, callback);
try (BatchExecutor executor = createBatchExecutor()) {
executor.batchCallback(actions, results, callback);
}
return results;
}
}
Expand All @@ -251,7 +261,9 @@ public Result[] get(List<Get> gets) throws IOException {
} else {
try (Scope scope = TRACER.spanBuilder("BigtableTable.get").startScopedSpan()) {
addBatchSizeAnnotation(gets);
return getBatchExecutor().batch(gets);
try (BatchExecutor executor = createBatchExecutor()) {
return executor.batch(gets);
}
}
}
}
Expand Down Expand Up @@ -359,7 +371,9 @@ public void put(List<Put> puts) throws IOException {
throw createRetriesExhaustedWithDetailsException(e, puts.get(0));
}
} else {
getBatchExecutor().batch(puts);
try (BatchExecutor executor = createBatchExecutor()) {
executor.batch(puts);
}
}
}

Expand Down Expand Up @@ -403,8 +417,9 @@ public void delete(Delete delete) throws IOException {
@Override
public void delete(List<Delete> deletes) throws IOException {
LOG.trace("delete(List<Delete>)");
try (Scope scope = TRACER.spanBuilder("BigtableTable.delete").startScopedSpan()) {
getBatchExecutor().batch(deletes, true);
try (Scope scope = TRACER.spanBuilder("BigtableTable.delete").startScopedSpan();
BatchExecutor executor = createBatchExecutor()) {
executor.batch(deletes, true);
}
}

Expand Down Expand Up @@ -583,9 +598,6 @@ public long incrementColumnValue(
/** {@inheritDoc} */
@Override
public void close() throws IOException {
if (batchExecutor != null) {
batchExecutor.close();
}
// TODO: shutdown the executor.
}

Expand Down Expand Up @@ -697,12 +709,8 @@ static String makeGenericExceptionMessage(
*
* @return a {@link com.google.cloud.bigtable.hbase.BatchExecutor} object.
*/
protected synchronized BatchExecutor getBatchExecutor() {
if (batchExecutor == null) {
batchExecutor =
new BatchExecutor(bigtableConnection.getBigtableApi(), settings, hbaseAdapter);
}
return batchExecutor;
/**
 * Creates a fresh {@link BatchExecutor} scoped to a single batch method invocation.
 *
 * <p>The caller owns the returned executor and must close it (typically via
 * try-with-resources) once the batch operation completes.
 */
protected BatchExecutor createBatchExecutor() {
return new BatchExecutor(bigtableConnection.getBigtableApi(), settings, hbaseAdapter);
}

@Override
Expand Down
Expand Up @@ -56,7 +56,7 @@
* <p>For internal use only - public for technical reasons.
*/
@InternalApi("For internal usage only")
public class BatchExecutor {
public class BatchExecutor implements AutoCloseable {

/** Constant <code>LOG</code> */
protected static final Logger LOG = new Logger(BatchExecutor.class);
Expand Down Expand Up @@ -145,6 +145,12 @@ public BatchExecutor(
this.bufferedMutatorHelper = new BigtableBufferedMutatorHelper(bigtableApi, settings, adapter);
}

/**
 * Releases the resources held by this executor.
 *
 * <p>Closes the buffered mutator helper and the bulk read wrapper. The two closes are chained
 * in a try/finally so that a failure while closing the mutator helper does not leak the bulk
 * reader and its outstanding batchers.
 *
 * @throws IOException if closing the buffered mutator helper fails
 */
@Override
public void close() throws IOException {
  try {
    bufferedMutatorHelper.close();
  } finally {
    // Always close the bulk reader, even if the mutator helper's close throws.
    bulkRead.close();
  }
}

/**
* Issue a single RPC recording the result into {@code results[index]} and if not-null, invoking
* the supplied callback.
Expand Down Expand Up @@ -321,8 +327,4 @@ public boolean[] exists(List<Get> gets) throws IOException {
}
return exists;
}

public void close() throws IOException {
bufferedMutatorHelper.close();
}
}
Expand Up @@ -28,7 +28,7 @@
* <p>For internal use only - public for technical reasons.
*/
@InternalApi("For internal usage only")
public interface BulkReadWrapper {
public interface BulkReadWrapper extends AutoCloseable {

/**
* Adds a {@code rowKey} to a batch read row request with an optional {@link Filters.Filter}. The
Expand All @@ -41,4 +41,11 @@ public interface BulkReadWrapper {
* complete.
*/
void sendOutstanding();

/**
 * Closes the wrapper: first prevents any new elements from being added, then sends all
 * outstanding elements. This method is non-blocking; it does not wait for the outstanding
 * requests to complete.
 */
@Override
void close();
}
Expand Up @@ -44,6 +44,12 @@ public class BulkReadClassicApi implements BulkReadWrapper {
this.tableId = tableId;
}

@Override
public void close() {
// Flip the closed flag first so that subsequent add() calls fail fast (add() checks
// isClosed), then flush anything already queued.
isClosed = true;
sendOutstanding();
}

@Override
public ApiFuture<Result> add(ByteString rowKey, Filters.Filter filter) {
Preconditions.checkState(!isClosed, "can't add request when the bulk read is closed.");
Expand Down
Expand Up @@ -27,13 +27,15 @@
import com.google.cloud.bigtable.data.v2.models.Row;
import com.google.cloud.bigtable.hbase.adapters.Adapters;
import com.google.cloud.bigtable.hbase.wrappers.BulkReadWrapper;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.hadoop.hbase.client.Result;
Expand All @@ -50,7 +52,6 @@
*/
@InternalApi("For internal usage only")
public class BulkReadVeneerApi implements BulkReadWrapper {

private static final Executor CLEANUP_EXECUTOR =
Executors.newSingleThreadExecutor(
new ThreadFactory() {
Expand All @@ -69,6 +70,7 @@ public Thread newThread(Runnable r) {
// TODO: remove this once gax-java's Batcher supports asyncClose(). This will eliminate the need
// to track individual entries
private final AtomicLong cleanupBarrier;
private AtomicBoolean isClosed = new AtomicBoolean(false);
private final GrpcCallContext callContext;

BulkReadVeneerApi(BigtableDataClient client, String tableId, GrpcCallContext callContext) {
Expand All @@ -77,13 +79,22 @@ public Thread newThread(Runnable r) {
this.callContext = callContext;

this.batchers = new HashMap<>();
this.cleanupBarrier = new AtomicLong();
this.cleanupBarrier
.incrementAndGet(); // wait for sendOutstanding to signal before cleaning up the batcher map
this.cleanupBarrier = new AtomicLong(1);
}

@Override
public void close() {
// TODO: use closeAsync after https://github.com/googleapis/gax-java/pull/1423 is available
// Idempotent close: only the first caller flips the flag and signals the barrier.
if (!isClosed.compareAndSet(false, true)) {
return;
}
// Release the initial barrier count taken at construction; once every in-flight entry has
// also arrived, the batchers get cleaned up.
notifyArrival();
}

@Override
public ApiFuture<Result> add(ByteString rowKey, @Nullable Filters.Filter filter) {
Preconditions.checkState(!isClosed.get(), "can't add request when the bulk read is closed.");

cleanupBarrier.incrementAndGet();

ApiFuture<Row> rowFuture = getOrCreateBatcher(filter).add(rowKey);
Expand All @@ -94,7 +105,7 @@ public void run() {
notifyArrival();
}
},
CLEANUP_EXECUTOR);
MoreExecutors.directExecutor());

return ApiFutures.transform(
rowFuture,
Expand All @@ -107,30 +118,38 @@ public Result apply(Row row) {
MoreExecutors.directExecutor());
}

/**
 * Called when a completion event occurs (a result arrived for an element, or the wrapper was
 * closed). When the last outstanding event arrives, the batchers are cleaned up.
 */
private void notifyArrival() {
if (cleanupBarrier.decrementAndGet() == 0) {
cleanUp();
}
}

/** close all outstanding Batchers */
/** close all outstanding Batchers to avoid orphaned batcher warnings */
private void cleanUp() {
for (Batcher<ByteString, Row> batcher : batchers.values()) {
try {
batcher.close();
} catch (Throwable ignored) {
// Ignored
}
}
batchers.clear();
// Close batchers out of band to avoid deadlock.
// See https://github.com/googleapis/java-bigtable-hbase/pull/2484#issuecomment-612972727 for
// more details
CLEANUP_EXECUTOR.execute(
new Runnable() {
@Override
public void run() {
for (Batcher<ByteString, Row> batcher : batchers.values()) {
try {
batcher.close();
} catch (Throwable ignored) {
// Ignored
}
}
}
});
}

/** Flushes every batcher, sending any buffered read requests without waiting for results. */
@Override
public void sendOutstanding() {
for (Batcher<ByteString, Row> batcher : batchers.values()) {
batcher.sendOutstanding();
}
// NOTE(review): this also signals the cleanup barrier; confirm the barrier's initial count
// of 1 accounts for both sendOutstanding() and close() invoking notifyArrival().
notifyArrival();
}

private Batcher<ByteString, Row> getOrCreateBatcher(@Nullable Filters.Filter filter) {
Expand Down

0 comments on commit 6768357

Please sign in to comment.