Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add async api #81

Merged
merged 59 commits into from Jun 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
e5e8df8
feat: add async api
olavloite Feb 20, 2020
c7db649
feat: session pool is non-blocking
olavloite Feb 23, 2020
e485709
tests: fix integration tests that assumed tx was blocking
olavloite Feb 24, 2020
e3ebeb3
feat: add read methods support
olavloite Feb 25, 2020
54629ad
tests: test async runner
olavloite Feb 25, 2020
8a10b64
feat: create async runner
olavloite Feb 25, 2020
91253cf
tests: centralize some commonly used test objects
olavloite Feb 26, 2020
a2d28cd
feat: keep session checked out until async finishes
olavloite Feb 26, 2020
2a63e62
fix: fix span test cases after rebase
olavloite Feb 26, 2020
9d58bc3
fix: fix async runner tests
olavloite Feb 27, 2020
4f79632
fix: make async runner wait for async operations
olavloite Feb 28, 2020
cfd1802
examples: add example integration test
olavloite Feb 28, 2020
a2e28a5
examples: add more examples
olavloite Feb 28, 2020
fa61e7d
tests: fix flaky tests
olavloite Feb 28, 2020
fc53dbf
rebase: rebase on current master
olavloite Mar 16, 2020
0eee1f6
fix: run code formatter
olavloite Mar 20, 2020
d3d2ffc
feat: add support for poller
olavloite Mar 23, 2020
2e01ca7
tests: support more param types
olavloite Apr 2, 2020
5e63f2b
fix: fix race conditions
olavloite Apr 8, 2020
abe455e
feat: return ApiFuture to monitor end of AsyncResultSet
olavloite Apr 9, 2020
cc091e8
feat: add helper method for create test result sets
olavloite Apr 19, 2020
944c701
merge: merge latest changes from master
olavloite Apr 21, 2020
d75f979
feat: add batchUpdateAsync
olavloite Apr 21, 2020
7f06e84
fix: add ignored interface differences
olavloite Apr 26, 2020
c1b0615
merge: merge master into async branch
olavloite Apr 26, 2020
00b83d2
refactor: use future as waiter in SessionPool
olavloite Apr 26, 2020
2ea27d7
Merge branch 'master' into async-api
olavloite Apr 27, 2020
17fb940
merge: merge with latest from master
olavloite Apr 29, 2020
51b5113
format: run code formatter
olavloite Apr 29, 2020
a03380b
tests: fix test case + remove commented code
olavloite Apr 29, 2020
8bc3a13
fix: AsyncResultSet should throw Cancelled
olavloite Apr 29, 2020
0927957
Merge branch 'master' into async-api
olavloite May 6, 2020
e486c54
feat: expose DatabaseId.of(String name)
olavloite May 11, 2020
a7dd1dd
deps: set version to 1.53 to match bom
olavloite May 12, 2020
c662ed7
feat: steps to add async support for tx manager
olavloite Jun 9, 2020
226f91b
review: process review comments
olavloite Jun 11, 2020
a10b61c
fix: run formatter
olavloite Jun 11, 2020
64b8a34
chore: remove unused code
olavloite Jun 11, 2020
3354344
clirr: add ignored differences to clirr
olavloite Jun 11, 2020
2b2a6a4
Merge branch 'master' into async-api
olavloite Jun 12, 2020
1fada1c
fix: call listeners after all rows have been consumed
olavloite Jun 12, 2020
35743e6
feat: towards AsyncTransactionManager
olavloite Jun 12, 2020
48de42e
Merge branch 'master' into async-api
olavloite Jun 13, 2020
888edd8
fix: session leaks + code format
olavloite Jun 13, 2020
b2a7176
fix: more session leak fixes
olavloite Jun 13, 2020
fba270f
feat: further work on AsyncTransactionManager
olavloite Jun 14, 2020
8a0ad3f
fix: fix test failures
olavloite Jun 15, 2020
8022457
fix: fix several race conditions
olavloite Jun 15, 2020
5e84d34
tests: increase test timeout
olavloite Jun 15, 2020
707382b
Merge branch 'master' into async-api
olavloite Jun 21, 2020
fcf37fd
feat: further towards AsyncTransactionManager
olavloite Jun 21, 2020
910b6c7
feat: require executor for transaction functions
olavloite Jun 21, 2020
86f85c0
revert: remove async connection api from branch
olavloite Jun 21, 2020
f5af48f
chore: run code formatter
olavloite Jun 21, 2020
b38d164
chore: fix flaky test case
olavloite Jun 21, 2020
fda9f01
Merge branch 'master' into async-api
olavloite Jun 30, 2020
eee881a
tests: fix ITs for emulator
olavloite Jun 30, 2020
a1ef640
fix: SpannerOptions.toBuilder().host should override emulatorHost
olavloite Jun 30, 2020
ea45612
tests: fix potentially hanging test
olavloite Jun 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
83 changes: 82 additions & 1 deletion google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -93,7 +93,6 @@
<className>com/google/cloud/spanner/DatabaseAdminClient</className>
<method>com.google.cloud.spanner.Backup updateBackup(java.lang.String, java.lang.String, com.google.cloud.Timestamp)</method>
</difference>

<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
Expand Down Expand Up @@ -147,6 +146,88 @@
<method>com.google.api.gax.paging.Page listDatabases()</method>
</difference>

<!-- Add Async API -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>com.google.api.core.ApiFuture executeQueryAsync(com.google.spanner.v1.ExecuteSqlRequest, java.util.Map)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>* runAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>* transactionManagerAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/Spanner</className>
<method>* getAsyncExecutorProvider(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* executeQueryAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readRowAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readUsingIndexAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/ReadContext</className>
<method>* readRowUsingIndexAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>* batchUpdateAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>* executeUpdateAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>* beginTransactionAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>* commitAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>* rollbackAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/spi/v1/SpannerRpc</className>
<method>* executeBatchDmlAsync(*)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>* executeQueryAsync(*)</method>
</difference>

<!-- Adding operation RPCs to InstanceAdminClient. -->
<difference>
<differenceType>7012</differenceType>
Expand Down
Expand Up @@ -21,16 +21,24 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
import com.google.cloud.spanner.AbstractResultSet.GrpcResultSet;
import com.google.cloud.spanner.AbstractResultSet.GrpcStreamIterator;
import com.google.cloud.spanner.AbstractResultSet.ResumableStreamIterator;
import com.google.cloud.spanner.AsyncResultSet.CallbackResponse;
import com.google.cloud.spanner.AsyncResultSet.ReadyCallback;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
Expand Down Expand Up @@ -62,6 +70,7 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadCon
private Span span = Tracing.getTracer().getCurrentSpan();
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
private ExecutorProvider executorProvider;

Builder() {}

Expand Down Expand Up @@ -95,9 +104,25 @@ B setDefaultQueryOptions(QueryOptions defaultQueryOptions) {
return self();
}

B setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = executorProvider;
return self();
}

abstract T build();
}

/**
* {@link AsyncResultSet} that supports adding listeners that are called when all rows from the
* underlying result stream have been fetched.
*/
interface ListenableAsyncResultSet extends AsyncResultSet {
/** Adds a listener to this {@link AsyncResultSet}. */
void addListener(Runnable listener);

void removeListener(Runnable listener);
}

/**
* A {@code ReadContext} for standalone reads. This can only be used for a single operation, since
* each standalone read may see a different timestamp of Cloud Spanner data.
Expand Down Expand Up @@ -350,7 +375,8 @@ void initTransaction() {
final Object lock = new Object();
final SessionImpl session;
final SpannerRpc rpc;
final Span span;
final ExecutorProvider executorProvider;
Span span;
private final int defaultPrefetchChunks;
private final QueryOptions defaultQueryOptions;

Expand All @@ -374,6 +400,12 @@ void initTransaction() {
this.defaultPrefetchChunks = builder.defaultPrefetchChunks;
this.defaultQueryOptions = builder.defaultQueryOptions;
this.span = builder.span;
this.executorProvider = builder.executorProvider;
}

@Override
public void setSpan(Span span) {
this.span = span;
}

long getSeqNo() {
Expand All @@ -386,12 +418,38 @@ public final ResultSet read(
return readInternal(table, null, keys, columns, options);
}

@Override
public ListenableAsyncResultSet readAsync(
String table, KeySet keys, Iterable<String> columns, ReadOption... options) {
Options readOptions = Options.fromReadOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider, readInternal(table, null, keys, columns, options), bufferRows);
}

@Override
public final ResultSet readUsingIndex(
String table, String index, KeySet keys, Iterable<String> columns, ReadOption... options) {
return readInternal(table, checkNotNull(index), keys, columns, options);
}

@Override
public ListenableAsyncResultSet readUsingIndexAsync(
String table, String index, KeySet keys, Iterable<String> columns, ReadOption... options) {
Options readOptions = Options.fromReadOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider,
readInternal(table, checkNotNull(index), keys, columns, options),
bufferRows);
}

@Nullable
@Override
public final Struct readRow(String table, Key key, Iterable<String> columns) {
Expand All @@ -400,6 +458,13 @@ public final Struct readRow(String table, Key key, Iterable<String> columns) {
}
}

@Override
public final ApiFuture<Struct> readRowAsync(String table, Key key, Iterable<String> columns) {
try (AsyncResultSet resultSet = readAsync(table, KeySet.singleKey(key), columns)) {
return consumeSingleRowAsync(resultSet);
}
}

@Nullable
@Override
public final Struct readRowUsingIndex(
Expand All @@ -409,12 +474,35 @@ public final Struct readRowUsingIndex(
}
}

@Override
public final ApiFuture<Struct> readRowUsingIndexAsync(
String table, String index, Key key, Iterable<String> columns) {
try (AsyncResultSet resultSet =
readUsingIndexAsync(table, index, KeySet.singleKey(key), columns)) {
return consumeSingleRowAsync(resultSet);
}
}

@Override
public final ResultSet executeQuery(Statement statement, QueryOption... options) {
return executeQueryInternal(
statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options);
}

@Override
public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOption... options) {
Options readOptions = Options.fromQueryOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider,
executeQueryInternal(
statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options),
bufferRows);
}

@Override
public final ResultSet analyzeQuery(Statement statement, QueryAnalyzeMode readContextQueryMode) {
switch (readContextQueryMode) {
Expand Down Expand Up @@ -666,4 +754,71 @@ private Struct consumeSingleRow(ResultSet resultSet) {
}
return row;
}

static ApiFuture<Struct> consumeSingleRowAsync(AsyncResultSet resultSet) {
final SettableApiFuture<Struct> result = SettableApiFuture.create();
// We can safely use a directExecutor here, as we will only be consuming one row, and we will
// not be doing any blocking stuff in the handler.
final SettableApiFuture<Struct> row = SettableApiFuture.create();
ApiFutures.addCallback(
resultSet.setCallback(MoreExecutors.directExecutor(), ConsumeSingleRowCallback.create(row)),
new ApiFutureCallback<Void>() {
@Override
public void onFailure(Throwable t) {
result.setException(t);
}

@Override
public void onSuccess(Void input) {
try {
result.set(row.get());
} catch (Throwable t) {
result.setException(t);
}
}
},
MoreExecutors.directExecutor());
return result;
}

/**
* {@link ReadyCallback} for returning the first row in a result set as a future {@link Struct}.
*/
private static class ConsumeSingleRowCallback implements ReadyCallback {
private final SettableApiFuture<Struct> result;
private Struct row;

static ConsumeSingleRowCallback create(SettableApiFuture<Struct> result) {
return new ConsumeSingleRowCallback(result);
}

private ConsumeSingleRowCallback(SettableApiFuture<Struct> result) {
this.result = result;
}

@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
try {
switch (resultSet.tryNext()) {
case DONE:
result.set(row);
return CallbackResponse.DONE;
case NOT_READY:
return CallbackResponse.CONTINUE;
case OK:
if (row != null) {
throw newSpannerException(
ErrorCode.INTERNAL, "Multiple rows returned for single key");
}
row = resultSet.getCurrentRowAsStruct();
return CallbackResponse.CONTINUE;
default:
throw new IllegalStateException();
}
} catch (Throwable t) {
result.setException(t);
return CallbackResponse.DONE;
}
}
}
}
Expand Up @@ -495,7 +495,7 @@ private static Struct decodeStructValue(Type structType, ListValue structValue)
return new GrpcStruct(structType, fields);
}

private static Object decodeArrayValue(Type elementType, ListValue listValue) {
static Object decodeArrayValue(Type elementType, ListValue listValue) {
switch (elementType.getCode()) {
case BOOL:
// Use a view: element conversion is virtually free.
Expand Down Expand Up @@ -1009,7 +1009,7 @@ protected PartialResultSet computeNext() {
}
}

private static double valueProtoToFloat64(com.google.protobuf.Value proto) {
static double valueProtoToFloat64(com.google.protobuf.Value proto) {
if (proto.getKindCase() == KindCase.STRING_VALUE) {
switch (proto.getStringValue()) {
case "-Infinity":
Expand Down Expand Up @@ -1037,7 +1037,7 @@ private static double valueProtoToFloat64(com.google.protobuf.Value proto) {
return proto.getNumberValue();
}

private static NullPointerException throwNotNull(int columnIndex) {
static NullPointerException throwNotNull(int columnIndex) {
throw new NullPointerException(
"Cannot call array getter for column " + columnIndex + " with null elements");
}
Expand All @@ -1048,7 +1048,7 @@ private static NullPointerException throwNotNull(int columnIndex) {
* {@code BigDecimal} respectively. Rather than construct new wrapper objects for each array
* element, we use primitive arrays and a {@code BitSet} to track nulls.
*/
private abstract static class PrimitiveArray<T, A> extends AbstractList<T> {
abstract static class PrimitiveArray<T, A> extends AbstractList<T> {
private final A data;
private final BitSet nulls;
private final int size;
Expand Down Expand Up @@ -1103,7 +1103,7 @@ A toPrimitiveArray(int columnIndex) {
}
}

private static class Int64Array extends PrimitiveArray<Long, long[]> {
static class Int64Array extends PrimitiveArray<Long, long[]> {
Int64Array(ListValue protoList) {
super(protoList);
}
Expand All @@ -1128,7 +1128,7 @@ Long get(long[] array, int i) {
}
}

private static class Float64Array extends PrimitiveArray<Double, double[]> {
static class Float64Array extends PrimitiveArray<Double, double[]> {
Float64Array(ListValue protoList) {
super(protoList);
}
Expand Down