Skip to content

Commit

Permalink
feat!: async connection API (#392)
Browse files Browse the repository at this point in the history
* feat: support setting timeout per RPC

The Spanner client allows a user to set custom timeouts while creating a
SpannerOptions instance, but these timeouts are static and are applied to
all invocations of the RPCs. This change introduces the possibility to set
custom timeouts and other call options on a per-RPC basis.

Fixes #378

* fix: change grpc deps from test to compile scope

* feat: add async api for connection

* fix: fix test failures

* fix: move state handling from callback to callable

* fix: fix integration tests with emulator

* fix: fix timeout integration test on emulator

* fix: prevent flakiness in DDL tests

* fix: fix clirr build failures

* fix: do not set transaction state for Aborted err

* fix: set transaction state after retry

* cleanup: remove sync methods and use async instead

* cleanup: remove unused code

* feat: make ddl async

* fix: reduce timeout and remove debug info

* feat: make runBatch async

* test: set forkCount to 1 to investigate test failure

* fix: linting + clirr

* fix: prevent deadlock in DmlBatch

* fix: fix DMLBatch state handling

* tests: add tests for aborted async transactions

* test: add aborted tests

* fix: add change to clirr + more tests

* fix: require a rollback after a tx has aborted

* docs: add javadoc for new methods

* tests: add integration tests

* fix: wait for commit before select

* fix: fix handling aborted commit

* docs: document behavior -Async methods

* fix: iterating without callback could cause exception

* fix: remove todos and commented code

* feat: keep track of caller to include in stacktrace

* docs: explain why Aborted is active

* fix: use ticker for better testability

* test: increase coverage and remove unused code

* test: add additional tests

* docs: add missing @OverRide

* docs: fix comment
  • Loading branch information
olavloite committed Oct 8, 2020
1 parent 691a23c commit 3dd0675
Show file tree
Hide file tree
Showing 46 changed files with 5,690 additions and 1,762 deletions.
52 changes: 52 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -319,4 +319,56 @@
<className>com/google/cloud/spanner/Value</className>
<method>java.util.List getNumericArray()</method>
</difference>

<!-- Async Connection API -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture beginTransactionAsync()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture commitAsync()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.connection.AsyncStatementResult executeAsync(com.google.cloud.spanner.Statement)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture executeBatchUpdateAsync(java.lang.Iterable)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture executeUpdateAsync(com.google.cloud.spanner.Statement)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture rollbackAsync()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture runBatchAsync()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture writeAsync(com.google.cloud.spanner.Mutation)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.api.core.ApiFuture writeAsync(java.lang.Iterable)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/ResultSets</className>
<method>com.google.cloud.spanner.AsyncResultSet toAsyncResultSet(com.google.cloud.spanner.ResultSet, com.google.api.gax.core.ExecutorProvider)</method>
</difference>
</differences>
Expand Up @@ -25,6 +25,8 @@
import com.google.cloud.spanner.AbstractReadContext.ListenableAsyncResultSet;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -88,8 +90,8 @@ private State(boolean shouldStop) {

private final BlockingDeque<Struct> buffer;
private Struct currentRow;
/** The underlying synchronous {@link ResultSet} that is producing the rows. */
private final ResultSet delegateResultSet;
/** Supplies the underlying synchronous {@link ResultSet} that will be producing the rows. */
private final Supplier<ResultSet> delegateResultSet;

/**
* Any exception that occurs while executing the query and iterating over the result set will be
Expand Down Expand Up @@ -144,6 +146,11 @@ private State(boolean shouldStop) {
private volatile CountDownLatch consumingLatch = new CountDownLatch(0);

AsyncResultSetImpl(ExecutorProvider executorProvider, ResultSet delegate, int bufferSize) {
this(executorProvider, Suppliers.ofInstance(Preconditions.checkNotNull(delegate)), bufferSize);
}

AsyncResultSetImpl(
ExecutorProvider executorProvider, Supplier<ResultSet> delegate, int bufferSize) {
super(delegate);
this.executorProvider = Preconditions.checkNotNull(executorProvider);
this.delegateResultSet = Preconditions.checkNotNull(delegate);
Expand All @@ -165,7 +172,7 @@ public void close() {
return;
}
if (state == State.INITIALIZED || state == State.SYNC) {
delegateResultSet.close();
delegateResultSet.get().close();
}
this.closed = true;
}
Expand Down Expand Up @@ -228,7 +235,7 @@ public CursorState tryNext() throws SpannerException {

private void closeDelegateResultSet() {
try {
delegateResultSet.close();
delegateResultSet.get().close();
} catch (Throwable t) {
log.log(Level.FINE, "Ignoring error from closing delegate result set", t);
}
Expand Down Expand Up @@ -261,7 +268,7 @@ public void run() {
// we'll keep the cancelled state.
return;
}
executionException = SpannerExceptionFactory.newSpannerException(e);
executionException = SpannerExceptionFactory.asSpannerException(e);
cursorReturnedDoneOrException = true;
}
return;
Expand Down Expand Up @@ -325,10 +332,10 @@ public Void call() throws Exception {
boolean stop = false;
boolean hasNext = false;
try {
hasNext = delegateResultSet.next();
hasNext = delegateResultSet.get().next();
} catch (Throwable e) {
synchronized (monitor) {
executionException = SpannerExceptionFactory.newSpannerException(e);
executionException = SpannerExceptionFactory.asSpannerException(e);
}
}
try {
Expand Down Expand Up @@ -357,13 +364,13 @@ public Void call() throws Exception {
}
}
if (!stop) {
buffer.put(delegateResultSet.getCurrentRowAsStruct());
buffer.put(delegateResultSet.get().getCurrentRowAsStruct());
startCallbackIfNecessary();
hasNext = delegateResultSet.next();
hasNext = delegateResultSet.get().next();
}
} catch (Throwable e) {
synchronized (monitor) {
executionException = SpannerExceptionFactory.newSpannerException(e);
executionException = SpannerExceptionFactory.asSpannerException(e);
stop = true;
}
}
Expand Down Expand Up @@ -544,9 +551,9 @@ public <T> List<T> toList(Function<StructReader, T> transformer) throws SpannerE
try {
return future.get();
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause());
throw SpannerExceptionFactory.asSpannerException(e.getCause());
} catch (Throwable e) {
throw SpannerExceptionFactory.newSpannerException(e);
throw SpannerExceptionFactory.asSpannerException(e);
}
}

Expand All @@ -558,14 +565,14 @@ public boolean next() throws SpannerException {
"Cannot call next() on a result set with a callback.");
this.state = State.SYNC;
}
boolean res = delegateResultSet.next();
currentRow = res ? delegateResultSet.getCurrentRowAsStruct() : null;
boolean res = delegateResultSet.get().next();
currentRow = res ? delegateResultSet.get().getCurrentRowAsStruct() : null;
return res;
}

@Override
public ResultSetStats getStats() {
return delegateResultSet.getStats();
return delegateResultSet.get().getStats();
}

@Override
Expand Down
Expand Up @@ -89,7 +89,7 @@ static ErrorCode valueOf(String name, ErrorCode defaultValue) {
/**
* Returns the error code corresponding to a gRPC status, or {@code UNKNOWN} if not recognized.
*/
static ErrorCode fromGrpcStatus(Status status) {
public static ErrorCode fromGrpcStatus(Status status) {
ErrorCode code = errorByRpcCode.get(status.getCode().value());
return code == null ? UNKNOWN : code;
}
Expand Down
Expand Up @@ -16,14 +16,17 @@

package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.ByteArray;
import com.google.cloud.Date;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Type.Code;
import com.google.cloud.spanner.Type.StructField;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.spanner.v1.ResultSetStats;
Expand Down Expand Up @@ -65,8 +68,41 @@ public static AsyncResultSet toAsyncResultSet(ResultSet delegate) {
* ExecutorProvider}.
*/
public static AsyncResultSet toAsyncResultSet(
ResultSet delegate, ExecutorProvider executorProvider) {
return new AsyncResultSetImpl(executorProvider, delegate, 100);
ResultSet delegate, ExecutorProvider executorProvider, QueryOption... options) {
Options readOptions = Options.fromQueryOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(executorProvider, delegate, bufferRows);
}

/**
* Converts the {@link ResultSet} that will be returned by the given {@link ApiFuture} to an
* {@link AsyncResultSet} using the given {@link ExecutorProvider}.
*/
public static AsyncResultSet toAsyncResultSet(
ApiFuture<ResultSet> delegate, ExecutorProvider executorProvider, QueryOption... options) {
Options readOptions = Options.fromQueryOptions(options);
final int bufferRows =
readOptions.hasBufferRows()
? readOptions.bufferRows()
: AsyncResultSetImpl.DEFAULT_BUFFER_SIZE;
return new AsyncResultSetImpl(
executorProvider, new FutureResultSetSupplier(delegate), bufferRows);
}

private static class FutureResultSetSupplier implements Supplier<ResultSet> {
final ApiFuture<ResultSet> delegate;

FutureResultSetSupplier(ApiFuture<ResultSet> delegate) {
this.delegate = Preconditions.checkNotNull(delegate);
}

@Override
public ResultSet get() {
return SpannerApiFutures.get(delegate);
}
}

private static class PrePopulatedResultSet implements ResultSet {
Expand Down
@@ -0,0 +1,43 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.common.base.Preconditions;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

public class SpannerApiFutures {
public static <T> T get(ApiFuture<T> future) throws SpannerException {
return getOrNull(Preconditions.checkNotNull(future));
}

public static <T> T getOrNull(ApiFuture<T> future) throws SpannerException {
try {
return future == null ? null : future.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof SpannerException) {
throw (SpannerException) e.getCause();
}
throw SpannerExceptionFactory.newSpannerException(e.getCause());
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
} catch (CancellationException e) {
throw SpannerExceptionFactory.newSpannerExceptionForCancellation(null, e);
}
}
}
Expand Up @@ -83,6 +83,18 @@ public static SpannerException propagateTimeout(TimeoutException e) {
ErrorCode.DEADLINE_EXCEEDED, "Operation did not complete in the given time", e);
}

/**
* Converts the given {@link Throwable} to a {@link SpannerException}. If <code>t</code> is
* already a (subclass of a) {@link SpannerException}, <code>t</code> is returned unaltered.
* Otherwise, a new {@link SpannerException} is created with <code>t</code> as its cause.
*/
public static SpannerException asSpannerException(Throwable t) {
if (t instanceof SpannerException) {
return (SpannerException) t;
}
return newSpannerException(t);
}

/**
* Creates a new exception based on {@code cause}.
*
Expand Down Expand Up @@ -126,6 +138,20 @@ public static SpannerBatchUpdateException newSpannerBatchUpdateException(
databaseError);
}

/**
* Constructs a new {@link AbortedDueToConcurrentModificationException} that can be re-thrown for
* a transaction that had already been aborted, but that the client application tried to use for
* additional statements.
*/
public static AbortedDueToConcurrentModificationException
newAbortedDueToConcurrentModificationException(
AbortedDueToConcurrentModificationException cause) {
return new AbortedDueToConcurrentModificationException(
DoNotConstructDirectly.ALLOWED,
"This transaction has already been aborted and could not be retried due to a concurrent modification. Rollback this transaction to start a new one.",
cause);
}

/**
* Creates a new exception based on {@code cause}. If {@code cause} indicates cancellation, {@code
* context} will be inspected to establish the type of cancellation.
Expand Down
Expand Up @@ -150,7 +150,7 @@ public void removeListener(Runnable listener) {
@GuardedBy("lock")
private long retryDelayInMillis = -1L;

private ByteString transactionId;
private volatile ByteString transactionId;
private Timestamp commitTimestamp;

private TransactionContextImpl(Builder builder) {
Expand Down Expand Up @@ -238,12 +238,17 @@ void commit() {
try {
commitTimestamp = commitAsync().get();
} catch (InterruptedException e) {
if (commitFuture != null) {
commitFuture.cancel(true);
}
throw SpannerExceptionFactory.propagateInterrupt(e);
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause());
}
}

volatile ApiFuture<CommitResponse> commitFuture;

ApiFuture<Timestamp> commitAsync() {
final SettableApiFuture<Timestamp> res = SettableApiFuture.create();
final SettableApiFuture<Void> latch;
Expand Down Expand Up @@ -273,8 +278,7 @@ public void run() {
span.addAnnotation("Starting Commit");
final Span opSpan =
tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span).startSpan();
final ApiFuture<CommitResponse> commitFuture =
rpc.commitAsync(commitRequest, session.getOptions());
commitFuture = rpc.commitAsync(commitRequest, session.getOptions());
commitFuture.addListener(
tracer.withSpan(
opSpan,
Expand Down

0 comments on commit 3dd0675

Please sign in to comment.