Skip to content

Commit

Permalink
feat: add async api (#81)
Browse files Browse the repository at this point in the history
* feat: add async api

* feat: session pool is non-blocking

* tests: fix integration tests that assumed tx was blocking

Some integration tests started transactions without executing a query,
and expected these transactions to fail. However, as the client is
now non-blocking up until the first call to ResultSet#next(), no
exception would occur.

* feat: add read methods support

* tests: test async runner

* feat: create async runner

* tests: centralize some commonly used test objects

* feat: keep session checked out until async finishes

* fix: fix span test cases after rebase

* fix: fix async runner tests

* fix: make async runner wait for async operations

* examples: add example integration test

* examples: add more examples

* tests: fix flaky tests

* rebase: rebase on current master

* fix: run code formatter

* feat: add support for poller

* tests: support more param types

* fix: fix race conditions

* feat: return ApiFuture to monitor end of AsyncResultSet

* feat: add helper method for create test result sets

* feat: add batchUpdateAsync

* fix: add ignored interface differences

* refactor: use future as waiter in SessionPool

* format: run code formatter

* tests: fix test case + remove commented code

* fix: AsyncResultSet should throw Cancelled

* feat: expose DatabaseId.of(String name)

* deps: set version to 1.53 to match bom

* feat: steps to add async support for tx manager

* review: process review comments

* chore: remove unused code

* clirr: add ignored differences to clirr

* fix: call listeners after all rows have been consumed

* feat: towards AsyncTransactionManager

* fix: session leaks + code format

* fix: more session leak fixes

* feat: further work on AsyncTransactionManager

* fix: fix test failures

* fix: fix several race conditions

* tests: increase test timeout

* feat: further towards AsyncTransactionManager

* feat: require executor for transaction functions

* revert: remove async connection api from branch

* chore: run code formatter

* chore: fix flaky test case

* tests: fix ITs for emulator

* fix: SpannerOptions.toBuilder().host should override emulatorHost

* tests: fix potentially hanging test
  • Loading branch information
olavloite committed Jun 30, 2020
1 parent 965e95e commit 462839b
Show file tree
Hide file tree
Showing 69 changed files with 9,895 additions and 687 deletions.
83 changes: 82 additions & 1 deletion google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -93,7 +93,6 @@
<className>com/google/cloud/spanner/DatabaseAdminClient</className>
<method>com.google.cloud.spanner.Backup updateBackup(java.lang.String, java.lang.String, com.google.cloud.Timestamp)</method>
</difference>

<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
Expand Down Expand Up @@ -147,6 +146,88 @@
<method>com.google.api.gax.paging.Page listDatabases()</method>
</difference>

<!-- Add Async API -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.api.core.ApiFuture executeQueryAsync(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>* runAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>* transactionManagerAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/Spanner</className>
<method>* getAsyncExecutorProvider(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* executeQueryAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readRowAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readUsingIndexAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readRowUsingIndexAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>* batchUpdateAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>* executeUpdateAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>* beginTransactionAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>* commitAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>* rollbackAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>* executeBatchDmlAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>* executeQueryAsync(*)</method>
</difference>

<!-- Adding operation RPCs to InstanceAdminClient. -->
<difference>
<differenceType>7012</differenceType>
Expand Down
Expand Up @@ -21,16 +21,24 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
import com.google.cloud.spanner.AbstractResultSet.GrpcResultSet;
import com.google.cloud.spanner.AbstractResultSet.GrpcStreamIterator;
import com.google.cloud.spanner.AbstractResultSet.ResumableStreamIterator;
import com.google.cloud.spanner.AsyncResultSet.CallbackResponse;
import com.google.cloud.spanner.AsyncResultSet.ReadyCallback;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
Expand Down Expand Up @@ -62,6 +70,7 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadCon
private Span span = Tracing.getTracer().getCurrentSpan();
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
private ExecutorProvider executorProvider;

Builder() {}

Expand Down Expand Up @@ -95,9 +104,25 @@ B setDefaultQueryOptions(QueryOptions defaultQueryOptions) {
return self();
}

B setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
return self();
}

abstract T build();
}

/**
* {@link AsyncResultSet} that supports adding listeners that are called when all rows from the
* underlying result stream have been fetched.
*/
interface ListenableAsyncResultSet extends AsyncResultSet {
/** Adds a listener to this {@link AsyncResultSet}. */
void addListener(Runnable listener);

void removeListener(Runnable listener);
}

/**
* A {@code ReadContext} for standalone reads. This can only be used for a single operation, since
* each standalone read may see a different timestamp of Cloud Spanner data.
Expand Down Expand Up @@ -350,7 +375,8 @@ void initTransaction() {
final Object lock = new Object();
final SessionImpl session;
final SpannerRpc rpc;
final Span span;
final ExecutorProvider executorProvider;
Span span;
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;

Expand All @@ -374,6 +400,12 @@ void initTransaction() {
this.defaultPrefetchChunks = builder.defaultPrefetchChunks;
this.defaultQueryOptions = builder.defaultQueryOptions;
this.span = builder.span;
this.executorProvider = builder.executorProvider;
}

@Override
public void setSpan(Span span) {
this.span = span;
}

long getSeqNo() {
Expand All @@ -386,12 +418,38 @@ public final ResultSet read(
return readInternal(table, null, keys, columns, options);
}

@Override
public ListenableAsyncResultSet readAsync(
String table, KeySet keys, Iterable<String> columns, ReadOption... options) {
Options readOptions = Options.fromReadOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider, readInternal(table, null, keys, columns, options), bufferRows);
}

@Override
public final ResultSet readUsingIndex(
String table, String index, KeySet keys, Iterable<String> columns, ReadOption... options) {
return readInternal(table, checkNotNull(index), keys, columns, options);
}

@Override
public ListenableAsyncResultSet readUsingIndexAsync(
String table, String index, KeySet keys, Iterable<String> columns, ReadOption... options) {
Options readOptions = Options.fromReadOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider,
readInternal(table, checkNotNull(index), keys, columns, options),
bufferRows);
}

@Nullable
@Override
public final Struct readRow(String table, Key key, Iterable<String> columns) {
Expand All @@ -400,6 +458,13 @@ public final Struct readRow(String table, Key key, Iterable<String> columns) {
}
}

@Override
public final ApiFuture<Struct> readRowAsync(String table, Key key, Iterable<String> columns) {
try (AsyncResultSet resultSet = readAsync(table, KeySet.singleKey(key), columns)) {
return consumeSingleRowAsync(resultSet);
}
}

@Nullable
@Override
public final Struct readRowUsingIndex(
Expand All @@ -409,12 +474,35 @@ public final Struct readRowUsingIndex(
}
}

@Override
public final ApiFuture<Struct> readRowUsingIndexAsync(
String table, String index, Key key, Iterable<String> columns) {
try (AsyncResultSet resultSet =
readUsingIndexAsync(table, index, KeySet.singleKey(key), columns)) {
return consumeSingleRowAsync(resultSet);
}
}

@Override
public final ResultSet executeQuery(Statement statement, QueryOption... options) {
return executeQueryInternal(
statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options);
}

@Override
public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOption... options) {
Options readOptions = Options.fromQueryOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider,
executeQueryInternal(
statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options),
bufferRows);
}

@Override
public final ResultSet analyzeQuery(Statement statement, QueryAnalyzeMode readContextQueryMode) {
switch (readContextQueryMode) {
Expand Down Expand Up @@ -666,4 +754,71 @@ private Struct consumeSingleRow(ResultSet resultSet) {
}
return row;
}

static ApiFuture<Struct> consumeSingleRowAsync(AsyncResultSet resultSet) {
final SettableApiFuture<Struct> result = SettableApiFuture.create();
// We can safely use a directExecutor here, as we will only be consuming one row, and we will
// not be doing any blocking stuff in the handler.
final SettableApiFuture<Struct> row = SettableApiFuture.create();
ApiFutures.addCallback(
resultSet.setCallback(MoreExecutors.directExecutor(), ConsumeSingleRowCallback.create(row)),
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable t) {
result.setException(t);
}

@Override
public void onSuccess(Void input) {
try {
result.set(row.get());
} catch (Throwable t) {
result.setException(t);
}
}
},
MoreExecutors.directExecutor());
return result;
}

/**
* {@link ReadyCallback} for returning the first row in a result set as a future {@link Struct}.
*/
private static class ConsumeSingleRowCallback implements ReadyCallback {
private final SettableApiFuture<Struct> result;
private Struct row;

static ConsumeSingleRowCallback create(SettableApiFuture<Struct> result) {
return new ConsumeSingleRowCallback(result);
}

private ConsumeSingleRowCallback(SettableApiFuture<Struct> result) {
this.result = result;
}

@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
try {
switch (resultSet.tryNext()) {
case DONE:
result.set(row);
return CallbackResponse.DONE;
case NOT_READY:
return CallbackResponse.CONTINUE;
case OK:
if (row != null) {
throw newSpannerException(
ErrorCode.INTERNAL, "Multiple rows returned for single key");
}
row = resultSet.getCurrentRowAsStruct();
return CallbackResponse.CONTINUE;
default:
throw new IllegalStateException();
}
} catch (Throwable t) {
result.setException(t);
return CallbackResponse.DONE;
}
}
}
}
Expand Up @@ -495,7 +495,7 @@ private static Struct decodeStructValue(Type structType, ListValue structValue)
return new GrpcStruct(structType, fields);
}

private static Object decodeArrayValue(Type elementType, ListValue listValue) {
static Object decodeArrayValue(Type elementType, ListValue listValue) {
switch (elementType.getCode()) {
case BOOL:
// Use a view: element conversion is virtually free.
Expand Down Expand Up @@ -1009,7 +1009,7 @@ protected PartialResultSet computeNext() {
}
}

private static double valueProtoToFloat64(com.google.protobuf.Value proto) {
static double valueProtoToFloat64(com.google.protobuf.Value proto) {
if (proto.getKindCase() == KindCase.STRING_VALUE) {
switch (proto.getStringValue()) {
case "-Infinity":
Expand Down Expand Up @@ -1037,7 +1037,7 @@ private static double valueProtoToFloat64(com.google.protobuf.Value proto) {
return proto.getNumberValue();
}

private static NullPointerException throwNotNull(int columnIndex) {
static NullPointerException throwNotNull(int columnIndex) {
throw new NullPointerException(
"Cannot call array getter for column " + columnIndex + " with null elements");
}
Expand All @@ -1048,7 +1048,7 @@ private static NullPointerException throwNotNull(int columnIndex) {
* {@code BigDecimal} respectively. Rather than construct new wrapper objects for each array
* element, we use primitive arrays and a {@code BitSet} to track nulls.
*/
private abstract static class PrimitiveArray<T, A> extends AbstractList<T> {
abstract static class PrimitiveArray<T, A> extends AbstractList<T> {
private final A data;
private final BitSet nulls;
private final int size;
Expand Down Expand Up @@ -1103,7 +1103,7 @@ A toPrimitiveArray(int columnIndex) {
}
}

private static class Int64Array extends PrimitiveArray<Long, long[]> {
static class Int64Array extends PrimitiveArray<Long, long[]> {
Int64Array(ListValue protoList) {
super(protoList);
}
Expand All @@ -1128,7 +1128,7 @@ Long get(long[] array, int i) {
}
}

private static class Float64Array extends PrimitiveArray<Double, double[]> {
static class Float64Array extends PrimitiveArray<Double, double[]> {
Float64Array(ListValue protoList) {
super(protoList);
}
Expand Down

0 comments on commit 462839b

Please sign in to comment.