Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: async connection API #392

Merged
merged 41 commits into from Oct 8, 2020
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
ec4cc30
feat: support setting timeout per RPC
olavloite Aug 10, 2020
dd27157
fix: change grpc deps from test to compile scope
olavloite Aug 10, 2020
2ae9a11
feat: add async api for connection
olavloite Aug 12, 2020
5d28817
fix: fix test failures
olavloite Aug 23, 2020
b565560
fix: move state handling from callback to callable
olavloite Aug 24, 2020
2446111
fix: fix integration tests with emulator
olavloite Aug 24, 2020
321d23c
fix: fix timeout integration test on emulator
olavloite Aug 24, 2020
c753e7c
fix: prevent flakiness in DDL tests
olavloite Aug 24, 2020
0a80a85
fix: fix clirr build failures
olavloite Aug 25, 2020
79308b6
fix: do not set transaction state for Aborted err
olavloite Aug 25, 2020
8d5a618
Merge branch 'master' into async-connection
olavloite Aug 25, 2020
2d531c5
fix: set transaction state after retry
olavloite Aug 25, 2020
1ce6795
cleanup: remove sync methods and use async instead
olavloite Aug 25, 2020
a33a31f
cleanup: remove unused code
olavloite Aug 25, 2020
9111798
feat: make ddl async
olavloite Aug 25, 2020
2711d47
fix: reduce timeout and remove debug info
olavloite Aug 25, 2020
895b024
feat: make runBatch async
olavloite Aug 25, 2020
3b8d29e
test: set forkCount to 1 to investigate test failure
olavloite Aug 25, 2020
fdc64a3
fix: linting + clirr
olavloite Aug 25, 2020
c6ff32d
fix: prevent deadlock in DmlBatch
olavloite Aug 25, 2020
3096ef6
fix: fix DMLBatch state handling
olavloite Aug 26, 2020
0395413
tests: add tests for aborted async transactions
olavloite Aug 26, 2020
bb130b1
test: add aborted tests
olavloite Aug 26, 2020
35210df
fix: add change to clirr + more tests
olavloite Aug 29, 2020
2e84dc1
fix: require a rollback after a tx has aborted
olavloite Aug 30, 2020
2500e42
docs: add javadoc for new methods
olavloite Aug 30, 2020
24bc392
tests: add integration tests
olavloite Aug 30, 2020
d671278
fix: wait for commit before select
olavloite Aug 31, 2020
a258579
fix: fix handling aborted commit
olavloite Sep 4, 2020
2878257
docs: document behavior -Async methods
olavloite Sep 5, 2020
81b70c3
fix: iterating without callback could cause exception
olavloite Sep 8, 2020
5c75bee
fix: remove todos and commented code
olavloite Sep 8, 2020
f1a40ee
feat: keep track of caller to include in stacktrace
olavloite Sep 11, 2020
a541398
Merge branch 'master' into async-connection
olavloite Sep 11, 2020
51ef086
docs: explain why Aborted is active
olavloite Sep 11, 2020
ded2101
fix: use ticker for better testability
olavloite Sep 11, 2020
5989efb
test: increase coverage and remove unused code
olavloite Sep 11, 2020
c50f1ed
test: add additional tests
olavloite Sep 12, 2020
c5fba07
Merge branch 'master' into async-connection
olavloite Oct 5, 2020
3e775b1
docs: add missing @override
olavloite Oct 6, 2020
b6e175a
docs: fix comment
olavloite Oct 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 52 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -305,4 +305,56 @@
<className>com/google/cloud/spanner/Value</className>
<method>java.util.List getNumericArray()</method>
</difference>

<!-- Async Connection API -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture beginTransactionAsync()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture commitAsync()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.connection.AsyncStatementResult executeAsync(com.google.cloud.spanner.Statement)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture executeBatchUpdateAsync(java.lang.Iterable)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture executeUpdateAsync(com.google.cloud.spanner.Statement)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture rollbackAsync()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture runBatchAsync()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture writeAsync(com.google.cloud.spanner.Mutation)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture writeAsync(java.lang.Iterable)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/ResultSets</className>
<method>com.google.cloud.spanner.AsyncResultSet toAsyncResultSet(com.google.cloud.spanner.ResultSet, com.google.api.gax.core.ExecutorProvider)</method>
</difference>
</differences>
Expand Up @@ -25,6 +25,8 @@
import com.google.cloud.spanner.AbstractReadContext.ListenableAsyncResultSet;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -88,7 +90,7 @@ private State(boolean shouldStop) {
private final BlockingDeque<Struct> buffer;
private Struct currentRow;
/** The underlying synchronous {@link ResultSet} that is producing the rows. */
olavloite marked this conversation as resolved.
Show resolved Hide resolved
private final ResultSet delegateResultSet;
private final Supplier<ResultSet> delegateResultSet;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The executeQueryAsync method will produce a Future<ResultSet> that should be used as the underlying delegate of an AsyncResultSet.


/**
* Any exception that occurs while executing the query and iterating over the result set will be
Expand Down Expand Up @@ -143,6 +145,11 @@ private State(boolean shouldStop) {
private volatile CountDownLatch consumingLatch = new CountDownLatch(0);

AsyncResultSetImpl(ExecutorProvider executorProvider, ResultSet delegate, int bufferSize) {
this(executorProvider, Suppliers.ofInstance(Preconditions.checkNotNull(delegate)), bufferSize);
}

AsyncResultSetImpl(
ExecutorProvider executorProvider, Supplier<ResultSet> delegate, int bufferSize) {
super(delegate);
this.executorProvider = Preconditions.checkNotNull(executorProvider);
this.delegateResultSet = Preconditions.checkNotNull(delegate);
Expand All @@ -164,7 +171,7 @@ public void close() {
return;
}
if (state == State.INITIALIZED || state == State.SYNC) {
delegateResultSet.close();
delegateResultSet.get().close();
}
this.closed = true;
}
Expand Down Expand Up @@ -227,7 +234,7 @@ public CursorState tryNext() throws SpannerException {

private void closeDelegateResultSet() {
try {
delegateResultSet.close();
delegateResultSet.get().close();
} catch (Throwable t) {
log.log(Level.FINE, "Ignoring error from closing delegate result set", t);
}
Expand Down Expand Up @@ -260,7 +267,7 @@ public void run() {
// we'll keep the cancelled state.
return;
}
executionException = SpannerExceptionFactory.newSpannerException(e);
executionException = SpannerExceptionFactory.asSpannerException(e);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returns the same exception instance if e is already a SpannerException, instead of wrapping a SpannerException in another SpannerException.

cursorReturnedDoneOrException = true;
}
return;
Expand All @@ -274,7 +281,8 @@ public void run() {
switch (response) {
case DONE:
state = State.DONE;
closeDelegateResultSet();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The delegate ResultSet should not be closed by the callback executor, but only by the executor that fetches and buffers the rows (in the ProduceRowsCallable at line 380)

cursorReturnedDoneOrException = true;
// closeDelegateResultSet();
return;
case PAUSE:
state = State.PAUSED;
Expand Down Expand Up @@ -324,10 +332,10 @@ public Void call() throws Exception {
boolean stop = false;
boolean hasNext = false;
try {
hasNext = delegateResultSet.next();
hasNext = delegateResultSet.get().next();
} catch (Throwable e) {
synchronized (monitor) {
executionException = SpannerExceptionFactory.newSpannerException(e);
executionException = SpannerExceptionFactory.asSpannerException(e);
}
}
try {
Expand Down Expand Up @@ -356,13 +364,13 @@ public Void call() throws Exception {
}
}
if (!stop) {
buffer.put(delegateResultSet.getCurrentRowAsStruct());
buffer.put(delegateResultSet.get().getCurrentRowAsStruct());
startCallbackIfNecessary();
hasNext = delegateResultSet.next();
hasNext = delegateResultSet.get().next();
}
} catch (Throwable e) {
synchronized (monitor) {
executionException = SpannerExceptionFactory.newSpannerException(e);
executionException = SpannerExceptionFactory.asSpannerException(e);
stop = true;
}
}
Expand Down Expand Up @@ -544,9 +552,9 @@ public <T> ImmutableList<T> toList(Function<StructReader, T> transformer)
try {
return future.get();
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause());
throw SpannerExceptionFactory.asSpannerException(e.getCause());
} catch (Throwable e) {
throw SpannerExceptionFactory.newSpannerException(e);
throw SpannerExceptionFactory.asSpannerException(e);
}
}

Expand All @@ -558,14 +566,14 @@ public boolean next() throws SpannerException {
"Cannot call next() on a result set with a callback.");
this.state = State.SYNC;
}
boolean res = delegateResultSet.next();
currentRow = delegateResultSet.getCurrentRowAsStruct();
boolean res = delegateResultSet.get().next();
currentRow = res ? delegateResultSet.get().getCurrentRowAsStruct() : null;
return res;
}

@Override
public ResultSetStats getStats() {
return delegateResultSet.getStats();
return delegateResultSet.get().getStats();
}

@Override
Expand Down
Expand Up @@ -89,7 +89,7 @@ static ErrorCode valueOf(String name, ErrorCode defaultValue) {
/**
* Returns the error code corresponding to a gRPC status, or {@code UNKNOWN} if not recognized.
*/
static ErrorCode fromGrpcStatus(Status status) {
public static ErrorCode fromGrpcStatus(Status status) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to public to be accessible from the com.google.cloud.spanner.connection package.

ErrorCode code = errorByRpcCode.get(status.getCode().value());
return code == null ? UNKNOWN : code;
}
Expand Down
Expand Up @@ -16,14 +16,17 @@

package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Type.Code;
import com.google.cloud.spanner.Type.StructField;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.spanner.v1.ResultSetStats;
Expand Down Expand Up @@ -65,8 +68,41 @@ public static AsyncResultSet toAsyncResultSet(ResultSet delegate) {
* ExecutorProvider}.
*/
public static AsyncResultSet toAsyncResultSet(
ResultSet delegate, ExecutorProvider executorProvider) {
return new AsyncResultSetImpl(executorProvider, delegate, 100);
ResultSet delegate, ExecutorProvider executorProvider, QueryOption... options) {
Options readOptions = Options.fromQueryOptions(options);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Allow specifying a buffer size instead of always using a default size.

final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(executorProvider, delegate, bufferRows);
}

/**
* Converts the {@link ResultSet} that will be returned by the given {@link ApiFuture} to an
* {@link AsyncResultSet} using the given {@link ExecutorProvider}.
*/
public static AsyncResultSet toAsyncResultSet(
ApiFuture<ResultSet> delegate, ExecutorProvider executorProvider, QueryOption... options) {
Options readOptions = Options.fromQueryOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider, new FutureResultSetSupplier(delegate), bufferRows);
}

private static class FutureResultSetSupplier implements Supplier<ResultSet> {
final ApiFuture<ResultSet> delegate;

FutureResultSetSupplier(ApiFuture<ResultSet> delegate) {
this.delegate = Preconditions.checkNotNull(delegate);
}

@Override
public ResultSet get() {
return SpannerApiFutures.get(delegate);
}
}

private static class PrePopulatedResultSet implements ResultSet {
Expand Down
@@ -0,0 +1,43 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.common.base.Preconditions;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

public class SpannerApiFutures {
public static <T> T get(ApiFuture<T> future) throws SpannerException {
return getOrNull(Preconditions.checkNotNull(future));
}

public static <T> T getOrNull(ApiFuture<T> future) throws SpannerException {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Convenience method that will catch and wrap common exceptions.

try {
return future == null ? null : future.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof SpannerException) {
throw (SpannerException) e.getCause();
}
throw SpannerExceptionFactory.newSpannerException(e.getCause());
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
} catch (CancellationException e) {
throw SpannerExceptionFactory.newSpannerExceptionForCancellation(null, e);
}
}
}
Expand Up @@ -83,6 +83,18 @@ public static SpannerException propagateTimeout(TimeoutException e) {
ErrorCode.DEADLINE_EXCEEDED, "Operation did not complete in the given time", e);
}

/**
* Converts the given {@link Throwable} to a {@link SpannerException}. If <code>t</code> is
* already a (subclass of a) {@link SpannerException}, <code>t</code> is returned unaltered.
* Otherwise, a new {@link SpannerException} is created with <code>t</code> as its cause.
*/
public static SpannerException asSpannerException(Throwable t) {
if (t instanceof SpannerException) {
return (SpannerException) t;
}
return newSpannerException(t);
}

/**
* Creates a new exception based on {@code cause}.
*
Expand Down Expand Up @@ -126,6 +138,20 @@ public static SpannerBatchUpdateException newSpannerBatchUpdateException(
databaseError);
}

/**
* Constructs a new {@link AbortedDueToConcurrentModificationException} that can be re-thrown for
* a transaction that had already been aborted, but that the client application tried to use for
* additional statements.
*/
public static AbortedDueToConcurrentModificationException
newAbortedDueToConcurrentModificationException(
AbortedDueToConcurrentModificationException cause) {
return new AbortedDueToConcurrentModificationException(
DoNotConstructDirectly.ALLOWED,
"This transaction has already been aborted and could not be retried due to a concurrent modification. Rollback this transaction to start a new one.",
cause);
}

/**
* Creates a new exception based on {@code cause}. If {@code cause} indicates cancellation, {@code
* context} will be inspected to establish the type of cancellation.
Expand Down
Expand Up @@ -150,7 +150,7 @@ public void removeListener(Runnable listener) {
@GuardedBy("lock")
private long retryDelayInMillis = -1L;

private ByteString transactionId;
private volatile ByteString transactionId;
private Timestamp commitTimestamp;

private TransactionContextImpl(Builder builder) {
Expand Down Expand Up @@ -238,12 +238,17 @@ void commit() {
try {
commitTimestamp = commitAsync().get();
} catch (InterruptedException e) {
if (commitFuture != null) {
commitFuture.cancel(true);
}
throw SpannerExceptionFactory.propagateInterrupt(e);
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause());
}
}

volatile ApiFuture<CommitResponse> commitFuture;

ApiFuture<Timestamp> commitAsync() {
final SettableApiFuture<Timestamp> res = SettableApiFuture.create();
final SettableApiFuture<Void> latch;
Expand Down Expand Up @@ -273,8 +278,7 @@ public void run() {
span.addAnnotation("Starting Commit");
final Span opSpan =
tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span).startSpan();
final ApiFuture<CommitResponse> commitFuture =
rpc.commitAsync(commitRequest, session.getOptions());
commitFuture = rpc.commitAsync(commitRequest, session.getOptions());
commitFuture.addListener(
tracer.withSpan(
opSpan,
Expand Down