This method is guaranteed to be non-blocking. The returned {@link ApiFuture} will be done
+ * when the transaction has been rolled back.
+ *
+ *
If the connection is in autocommit mode, and there is a temporary transaction active on this
 + * connection, calling this method will cause the connection to return to autocommit mode once
 + * the rollback has completed.
+ *
+ *
rollbackAsync();
+
/**
* @return true
if this connection has a transaction (that has not necessarily
* started). This method will only return false when the {@link Connection} is in autocommit
@@ -572,11 +679,30 @@ public interface Connection extends AutoCloseable {
* This method may only be called when a (possibly empty) batch is active.
*
* @return the update counts in case of a DML batch. Returns an array containing 1 for each
- * successful statement and 0 for each failed statement or statement that was not executed DDL
- * in case of a DDL batch.
+ * successful statement and 0 for each failed statement or statement that was not executed in
+ * case of a DDL batch.
*/
long[] runBatch();
+ /**
+ * Sends all buffered DML or DDL statements of the current batch to the database, waits for these
+ * to be executed and ends the current batch. The method will throw an exception for the first
+ * statement that cannot be executed, or return successfully if all statements could be executed.
+ * If an exception is thrown for a statement in the batch, the preceding statements in the same
+ * batch may still have been applied to the database.
+ *
+ *
This method is guaranteed to be non-blocking. The returned {@link ApiFuture} will be done
+ * when the batch has been successfully applied, or when one or more of the statements in the
+ * batch has failed and the further execution of the batch has been halted.
+ *
+ *
This method may only be called when a (possibly empty) batch is active.
+ *
+ * @return an {@link ApiFuture} containing the update counts in case of a DML batch. The {@link
+ * ApiFuture} contains an array containing 1 for each successful statement and 0 for each
+ * failed statement or statement that was not executed in case of a DDL batch.
+ */
+ ApiFuture runBatchAsync();
+
/**
* Clears all buffered statements in the current batch and ends the batch.
*
@@ -608,6 +734,30 @@ public interface Connection extends AutoCloseable {
*/
StatementResult execute(Statement statement);
+ /**
+ * Executes the given statement if allowed in the current {@link TransactionMode} and connection
+ * state asynchronously. The returned value depends on the type of statement:
+ *
+ *
+ * - Queries will return an {@link AsyncResultSet}
+ *
- DML statements will return an {@link ApiFuture} with an update count that is done when
+ * the DML statement has been applied successfully, or that throws an {@link
+ * ExecutionException} if the DML statement failed.
+ *
- DDL statements will return an {@link ApiFuture} containing a {@link Void} that is done
+ * when the DDL statement has been applied successfully, or that throws an {@link
+ * ExecutionException} if the DDL statement failed.
+ *
- Connection and transaction statements (SET AUTOCOMMIT=TRUE|FALSE, SHOW AUTOCOMMIT, SET
+ * TRANSACTION READ ONLY, etc) will return either a {@link ResultSet} or {@link
+ * ResultType#NO_RESULT}, depending on the type of statement (SHOW or SET)
+ *
+ *
+ * This method is guaranteed to be non-blocking.
+ *
+ * @param statement The statement to execute
+ * @return the result of the statement
+ */
+ AsyncStatementResult executeAsync(Statement statement);
+
/**
* Executes the given statement as a query and returns the result as a {@link ResultSet}. This
* method blocks and waits for a response from Spanner. If the statement does not contain a valid
@@ -619,6 +769,31 @@ public interface Connection extends AutoCloseable {
*/
ResultSet executeQuery(Statement query, QueryOption... options);
 + /**
 + * Executes the given statement asynchronously as a query and returns the result as an {@link
 + * AsyncResultSet}. This method is guaranteed to be non-blocking. If the statement does not
 + * contain a valid query, the method will throw a {@link SpannerException}.
 + *
 + * See {@link AsyncResultSet#setCallback(java.util.concurrent.Executor,
 + * com.google.cloud.spanner.AsyncResultSet.ReadyCallback)} for more information on how to consume
 + * the results of the query asynchronously.
 + *
 + *
 It is also possible to consume the returned {@link AsyncResultSet} in the same way as a
 + * normal {@link ResultSet}, i.e. in a while-loop calling {@link AsyncResultSet#next()}.
 + *
 + * @param query The query statement to execute
 + * @param options the options to configure the query
 + * @return an {@link AsyncResultSet} with the results of the query
 + */
+ AsyncResultSet executeQueryAsync(Statement query, QueryOption... options);
+
/**
* Analyzes a query and returns query plan and/or query execution statistics information.
*
@@ -655,6 +830,18 @@ public interface Connection extends AutoCloseable {
*/
long executeUpdate(Statement update);
+ /**
+ * Executes the given statement asynchronously as a DML statement. If the statement does not
+ * contain a valid DML statement, the method will throw a {@link SpannerException}.
+ *
+ *
This method is guaranteed to be non-blocking.
+ *
+ * @param update The update statement to execute
+ * @return an {@link ApiFuture} containing the number of records that were
+ * inserted/updated/deleted by this statement
+ */
+ ApiFuture executeUpdateAsync(Statement update);
+
/**
* Executes a list of DML statements in a single request. The statements will be executed in order
* and the semantics is the same as if each statement is executed by {@link
@@ -677,6 +864,31 @@ public interface Connection extends AutoCloseable {
*/
long[] executeBatchUpdate(Iterable updates);
+ /**
+ * Executes a list of DML statements in a single request. The statements will be executed in order
+ * and the semantics is the same as if each statement is executed by {@link
+ * Connection#executeUpdate(Statement)} in a loop. This method returns an {@link ApiFuture} that
+ * contains an array of long integers, each representing the number of rows modified by each
+ * statement.
+ *
+ * This method is guaranteed to be non-blocking.
+ *
+ *
If an individual statement fails, execution stops and a {@code SpannerBatchUpdateException}
+ * is returned, which includes the error and the number of rows affected by the statements that
+ * are run prior to the error.
+ *
+ *
For example, if statements contains 3 statements, and the 2nd one is not a valid DML
 + * statement, this method throws a {@code SpannerBatchUpdateException} that contains the error
 + * message from the 2nd statement, and an array of length 1 that contains the number of rows
 + * modified by the 1st statement. The 3rd statement will not run. If one of the statements does
 + * not contain a valid DML statement, the method will throw a {@link SpannerException}.
+ *
+ * @param updates The update statements that will be executed as one batch.
+ * @return an {@link ApiFuture} containing an array with the update counts per statement.
+ */
+ ApiFuture executeBatchUpdateAsync(Iterable updates);
+
/**
* Writes the specified mutation directly to the database and commits the change. The value is
* readable after the successful completion of this method. Writing multiple mutations to a
@@ -692,6 +904,23 @@ public interface Connection extends AutoCloseable {
*/
void write(Mutation mutation);
+ /**
+ * Writes the specified mutation directly to the database and commits the change. The value is
+ * readable after the successful completion of the returned {@link ApiFuture}. Writing multiple
 + * mutations to a database by calling this method multiple times is inefficient, as each call
+ * will need a round trip to the database. Instead, you should consider writing the mutations
+ * together by calling {@link Connection#writeAsync(Iterable)}.
+ *
+ * This method is guaranteed to be non-blocking.
+ *
+ *
Calling this method is only allowed in autocommit mode. See {@link
+ * Connection#bufferedWrite(Iterable)} for writing mutations in transactions.
+ *
 + * @param mutation The {@link Mutation} to write to the database
 + * @return an {@link ApiFuture} that is done when the mutation has been written
 + * @throws SpannerException if the {@link Connection} is not in autocommit mode
+ */
+ ApiFuture writeAsync(Mutation mutation);
+
/**
* Writes the specified mutations directly to the database and commits the changes. The values are
* readable after the successful completion of this method.
@@ -704,6 +933,20 @@ public interface Connection extends AutoCloseable {
*/
void write(Iterable mutations);
+ /**
+ * Writes the specified mutations directly to the database and commits the changes. The values are
+ * readable after the successful completion of the returned {@link ApiFuture}.
+ *
+ * This method is guaranteed to be non-blocking.
+ *
+ *
Calling this method is only allowed in autocommit mode. See {@link
+ * Connection#bufferedWrite(Iterable)} for writing mutations in transactions.
+ *
 + * @param mutations The {@link Mutation}s to write to the database
 + * @return an {@link ApiFuture} that is done when the mutations have been written
 + * @throws SpannerException if the {@link Connection} is not in autocommit mode
+ */
+ ApiFuture writeAsync(Iterable mutations);
+
/**
* Buffers the given mutation locally on the current transaction of this {@link Connection}. The
* mutation will be written to the database at the next call to {@link Connection#commit()}. The
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java
index ce24791859..b49adbf124 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java
@@ -16,13 +16,19 @@
package com.google.cloud.spanner.connection;
+import static com.google.cloud.spanner.SpannerApiFutures.get;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
+import com.google.cloud.spanner.AsyncResultSet;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.ResultSets;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
@@ -583,6 +589,11 @@ private void setDefaultTransactionOptions() {
@Override
public void beginTransaction() {
+ get(beginTransactionAsync());
+ }
+
+ @Override
+ public ApiFuture beginTransactionAsync() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(
!isBatchActive(), "This connection has an active batch and cannot begin a transaction");
@@ -596,17 +607,18 @@ public void beginTransaction() {
if (isAutocommit()) {
inTransaction = true;
}
+ return ApiFutures.immediateFuture(null);
}
/** Internal interface for ending a transaction (commit/rollback). */
private static interface EndTransactionMethod {
- public void end(UnitOfWork t);
+ public ApiFuture endAsync(UnitOfWork t);
}
private static final class Commit implements EndTransactionMethod {
@Override
- public void end(UnitOfWork t) {
- t.commit();
+ public ApiFuture endAsync(UnitOfWork t) {
+ return t.commitAsync();
}
}
@@ -614,14 +626,18 @@ public void end(UnitOfWork t) {
@Override
public void commit() {
+ get(commitAsync());
+ }
+
+ public ApiFuture commitAsync() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
- endCurrentTransaction(commit);
+ return endCurrentTransactionAsync(commit);
}
private static final class Rollback implements EndTransactionMethod {
@Override
- public void end(UnitOfWork t) {
- t.rollback();
+ public ApiFuture endAsync(UnitOfWork t) {
+ return t.rollbackAsync();
}
}
@@ -629,18 +645,24 @@ public void end(UnitOfWork t) {
@Override
public void rollback() {
+ get(rollbackAsync());
+ }
+
+ public ApiFuture rollbackAsync() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
- endCurrentTransaction(rollback);
+ return endCurrentTransactionAsync(rollback);
}
- private void endCurrentTransaction(EndTransactionMethod endTransactionMethod) {
+ private ApiFuture endCurrentTransactionAsync(EndTransactionMethod endTransactionMethod) {
ConnectionPreconditions.checkState(!isBatchActive(), "This connection has an active batch");
ConnectionPreconditions.checkState(isInTransaction(), "This connection has no transaction");
+ ApiFuture res;
try {
if (isTransactionStarted()) {
- endTransactionMethod.end(getCurrentUnitOfWorkOrStartNewUnitOfWork());
+ res = endTransactionMethod.endAsync(getCurrentUnitOfWorkOrStartNewUnitOfWork());
} else {
this.currentUnitOfWork = null;
+ res = ApiFutures.immediateFuture(null);
}
} finally {
transactionBeginMarked = false;
@@ -649,6 +671,7 @@ private void endCurrentTransaction(EndTransactionMethod endTransactionMethod) {
}
setDefaultTransactionOptions();
}
+ return res;
}
@Override
@@ -664,9 +687,9 @@ public StatementResult execute(Statement statement) {
case QUERY:
return StatementResultImpl.of(internalExecuteQuery(parsedStatement, AnalyzeMode.NONE));
case UPDATE:
- return StatementResultImpl.of(internalExecuteUpdate(parsedStatement));
+ return StatementResultImpl.of(get(internalExecuteUpdateAsync(parsedStatement)));
case DDL:
- executeDdl(parsedStatement);
+ get(executeDdlAsync(parsedStatement));
return StatementResultImpl.noResult();
case UNKNOWN:
default:
@@ -676,11 +699,43 @@ public StatementResult execute(Statement statement) {
"Unknown statement: " + parsedStatement.getSqlWithoutComments());
}
+ @Override
+ public AsyncStatementResult executeAsync(Statement statement) {
+ Preconditions.checkNotNull(statement);
+ ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
+ ParsedStatement parsedStatement = parser.parse(statement, this.queryOptions);
+ switch (parsedStatement.getType()) {
+ case CLIENT_SIDE:
+ return AsyncStatementResultImpl.of(
+ parsedStatement
+ .getClientSideStatement()
+ .execute(connectionStatementExecutor, parsedStatement.getSqlWithoutComments()),
+ spanner.getAsyncExecutorProvider());
+ case QUERY:
+ return AsyncStatementResultImpl.of(
+ internalExecuteQueryAsync(parsedStatement, AnalyzeMode.NONE));
+ case UPDATE:
+ return AsyncStatementResultImpl.of(internalExecuteUpdateAsync(parsedStatement));
+ case DDL:
+ return AsyncStatementResultImpl.noResult(executeDdlAsync(parsedStatement));
+ case UNKNOWN:
+ default:
+ }
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.INVALID_ARGUMENT,
+ "Unknown statement: " + parsedStatement.getSqlWithoutComments());
+ }
+
@Override
public ResultSet executeQuery(Statement query, QueryOption... options) {
return parseAndExecuteQuery(query, AnalyzeMode.NONE, options);
}
+ @Override
+ public AsyncResultSet executeQueryAsync(Statement query, QueryOption... options) {
+ return parseAndExecuteQueryAsync(query, AnalyzeMode.NONE, options);
+ }
+
@Override
public ResultSet analyzeQuery(Statement query, QueryAnalyzeMode queryMode) {
Preconditions.checkNotNull(queryMode);
@@ -717,6 +772,34 @@ private ResultSet parseAndExecuteQuery(
"Statement is not a query: " + parsedStatement.getSqlWithoutComments());
}
+ private AsyncResultSet parseAndExecuteQueryAsync(
+ Statement query, AnalyzeMode analyzeMode, QueryOption... options) {
+ Preconditions.checkNotNull(query);
+ ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
+ ParsedStatement parsedStatement = parser.parse(query, this.queryOptions);
+ if (parsedStatement.isQuery()) {
+ switch (parsedStatement.getType()) {
+ case CLIENT_SIDE:
+ return ResultSets.toAsyncResultSet(
+ parsedStatement
+ .getClientSideStatement()
+ .execute(connectionStatementExecutor, parsedStatement.getSqlWithoutComments())
+ .getResultSet(),
+ spanner.getAsyncExecutorProvider(),
+ options);
+ case QUERY:
+ return internalExecuteQueryAsync(parsedStatement, analyzeMode, options);
+ case UPDATE:
+ case DDL:
+ case UNKNOWN:
+ default:
+ }
+ }
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.INVALID_ARGUMENT,
+ "Statement is not a query: " + parsedStatement.getSqlWithoutComments());
+ }
+
@Override
public long executeUpdate(Statement update) {
Preconditions.checkNotNull(update);
@@ -725,7 +808,27 @@ public long executeUpdate(Statement update) {
if (parsedStatement.isUpdate()) {
switch (parsedStatement.getType()) {
case UPDATE:
- return internalExecuteUpdate(parsedStatement);
+ return get(internalExecuteUpdateAsync(parsedStatement));
+ case CLIENT_SIDE:
+ case QUERY:
+ case DDL:
+ case UNKNOWN:
+ default:
+ }
+ }
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.INVALID_ARGUMENT,
+ "Statement is not an update statement: " + parsedStatement.getSqlWithoutComments());
+ }
+
+ public ApiFuture executeUpdateAsync(Statement update) {
+ Preconditions.checkNotNull(update);
+ ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
+ ParsedStatement parsedStatement = parser.parse(update);
+ if (parsedStatement.isUpdate()) {
+ switch (parsedStatement.getType()) {
+ case UPDATE:
+ return internalExecuteUpdateAsync(parsedStatement);
case CLIENT_SIDE:
case QUERY:
case DDL:
@@ -746,24 +849,48 @@ public long[] executeBatchUpdate(Iterable updates) {
List parsedStatements = new LinkedList<>();
for (Statement update : updates) {
ParsedStatement parsedStatement = parser.parse(update);
- if (parsedStatement.isUpdate()) {
- switch (parsedStatement.getType()) {
- case UPDATE:
- parsedStatements.add(parsedStatement);
- break;
- case CLIENT_SIDE:
- case QUERY:
- case DDL:
- case UNKNOWN:
- default:
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.INVALID_ARGUMENT,
- "The batch update list contains a statement that is not an update statement: "
- + parsedStatement.getSqlWithoutComments());
- }
+ switch (parsedStatement.getType()) {
+ case UPDATE:
+ parsedStatements.add(parsedStatement);
+ break;
+ case CLIENT_SIDE:
+ case QUERY:
+ case DDL:
+ case UNKNOWN:
+ default:
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.INVALID_ARGUMENT,
+ "The batch update list contains a statement that is not an update statement: "
+ + parsedStatement.getSqlWithoutComments());
+ }
+ }
+ return get(internalExecuteBatchUpdateAsync(parsedStatements));
+ }
+
+ @Override
+ public ApiFuture executeBatchUpdateAsync(Iterable updates) {
+ Preconditions.checkNotNull(updates);
+ ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
+ // Check that there are only DML statements in the input.
+ List parsedStatements = new LinkedList<>();
+ for (Statement update : updates) {
+ ParsedStatement parsedStatement = parser.parse(update);
+ switch (parsedStatement.getType()) {
+ case UPDATE:
+ parsedStatements.add(parsedStatement);
+ break;
+ case CLIENT_SIDE:
+ case QUERY:
+ case DDL:
+ case UNKNOWN:
+ default:
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.INVALID_ARGUMENT,
+ "The batch update list contains a statement that is not an update statement: "
+ + parsedStatement.getSqlWithoutComments());
}
}
- return internalExecuteBatchUpdate(parsedStatements);
+ return internalExecuteBatchUpdateAsync(parsedStatements);
}
private ResultSet internalExecuteQuery(
@@ -773,52 +900,32 @@ private ResultSet internalExecuteQuery(
Preconditions.checkArgument(
statement.getType() == StatementType.QUERY, "Statement must be a query");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
- try {
- return transaction.executeQuery(statement, analyzeMode, options);
- } catch (SpannerException e) {
- // In case of a timed out or cancelled query we need to replace the executor to ensure that we
- // have an executor that is not busy executing a statement. Although we try to cancel the
- // current statement, it is not guaranteed to actually stop the execution directly.
- if (e.getErrorCode() == ErrorCode.DEADLINE_EXCEEDED
- || e.getErrorCode() == ErrorCode.CANCELLED) {
- this.statementExecutor.recreate();
- }
- throw e;
- }
+ return get(transaction.executeQueryAsync(statement, analyzeMode, options));
}
- private long internalExecuteUpdate(final ParsedStatement update) {
+ private AsyncResultSet internalExecuteQueryAsync(
+ final ParsedStatement statement,
+ final AnalyzeMode analyzeMode,
+ final QueryOption... options) {
+ Preconditions.checkArgument(
+ statement.getType() == StatementType.QUERY, "Statement must be a query");
+ UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
+ return ResultSets.toAsyncResultSet(
+ transaction.executeQueryAsync(statement, analyzeMode, options),
+ spanner.getAsyncExecutorProvider(),
+ options);
+ }
+
+ private ApiFuture internalExecuteUpdateAsync(final ParsedStatement update) {
Preconditions.checkArgument(
update.getType() == StatementType.UPDATE, "Statement must be an update");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
- try {
- return transaction.executeUpdate(update);
- } catch (SpannerException e) {
- // In case of a timed out or cancelled query we need to replace the executor to ensure that we
- // have an executor that is not busy executing a statement. Although we try to cancel the
- // current statement, it is not guaranteed to actually stop the execution directly.
- if (e.getErrorCode() == ErrorCode.DEADLINE_EXCEEDED
- || e.getErrorCode() == ErrorCode.CANCELLED) {
- this.statementExecutor.recreate();
- }
- throw e;
- }
+ return transaction.executeUpdateAsync(update);
}
- private long[] internalExecuteBatchUpdate(final List updates) {
+ private ApiFuture internalExecuteBatchUpdateAsync(List updates) {
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
- try {
- return transaction.executeBatchUpdate(updates);
- } catch (SpannerException e) {
- // In case of a timed out or cancelled query we need to replace the executor to ensure that we
- // have an executor that is not busy executing a statement. Although we try to cancel the
- // current statement, it is not guaranteed to actually stop the execution directly.
- if (e.getErrorCode() == ErrorCode.DEADLINE_EXCEEDED
- || e.getErrorCode() == ErrorCode.CANCELLED) {
- this.statementExecutor.recreate();
- }
- throw e;
- }
+ return transaction.executeBatchUpdateAsync(updates);
}
/**
@@ -898,32 +1005,36 @@ private void popUnitOfWorkFromTransactionStack() {
this.currentUnitOfWork = transactionStack.pop();
}
- private void executeDdl(ParsedStatement ddl) {
- getCurrentUnitOfWorkOrStartNewUnitOfWork().executeDdl(ddl);
+ private ApiFuture executeDdlAsync(ParsedStatement ddl) {
+ return getCurrentUnitOfWorkOrStartNewUnitOfWork().executeDdlAsync(ddl);
}
@Override
public void write(Mutation mutation) {
- Preconditions.checkNotNull(mutation);
- ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
- ConnectionPreconditions.checkState(isAutocommit(), ONLY_ALLOWED_IN_AUTOCOMMIT);
- getCurrentUnitOfWorkOrStartNewUnitOfWork().write(mutation);
+ get(writeAsync(Collections.singleton(Preconditions.checkNotNull(mutation))));
+ }
+
+ @Override
+ public ApiFuture writeAsync(Mutation mutation) {
+ return writeAsync(Collections.singleton(Preconditions.checkNotNull(mutation)));
}
@Override
public void write(Iterable mutations) {
+ get(writeAsync(Preconditions.checkNotNull(mutations)));
+ }
+
+ @Override
+ public ApiFuture writeAsync(Iterable mutations) {
Preconditions.checkNotNull(mutations);
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(isAutocommit(), ONLY_ALLOWED_IN_AUTOCOMMIT);
- getCurrentUnitOfWorkOrStartNewUnitOfWork().write(mutations);
+ return getCurrentUnitOfWorkOrStartNewUnitOfWork().writeAsync(mutations);
}
@Override
public void bufferedWrite(Mutation mutation) {
- Preconditions.checkNotNull(mutation);
- ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
- ConnectionPreconditions.checkState(!isAutocommit(), NOT_ALLOWED_IN_AUTOCOMMIT);
- getCurrentUnitOfWorkOrStartNewUnitOfWork().write(mutation);
+ bufferedWrite(Preconditions.checkNotNull(Collections.singleton(mutation)));
}
@Override
@@ -931,7 +1042,7 @@ public void bufferedWrite(Iterable mutations) {
Preconditions.checkNotNull(mutations);
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(!isAutocommit(), NOT_ALLOWED_IN_AUTOCOMMIT);
- getCurrentUnitOfWorkOrStartNewUnitOfWork().write(mutations);
+ get(getCurrentUnitOfWorkOrStartNewUnitOfWork().writeAsync(mutations));
}
@Override
@@ -973,13 +1084,18 @@ public void startBatchDml() {
@Override
public long[] runBatch() {
+ return get(runBatchAsync());
+ }
+
+ @Override
+ public ApiFuture runBatchAsync() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(isBatchActive(), "This connection has no active batch");
try {
if (this.currentUnitOfWork != null) {
- return this.currentUnitOfWork.runBatch();
+ return this.currentUnitOfWork.runBatchAsync();
}
- return new long[0];
+ return ApiFutures.immediateFuture(new long[0]);
} finally {
this.batchMode = BatchMode.NONE;
setDefaultTransactionOptions();
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java
index 379459884c..d2a341430e 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionOptions.java
@@ -237,6 +237,15 @@ public static void closeSpanner() {
SpannerPool.INSTANCE.checkAndCloseSpanners();
}
+ /**
+ * {@link SpannerOptionsConfigurator} can be used to add additional configuration for a {@link
+ * Spanner} instance. Intended for tests.
+ */
+ @VisibleForTesting
+ interface SpannerOptionsConfigurator {
+ void configure(SpannerOptions.Builder options);
+ }
+
/** Builder for {@link ConnectionOptions} instances. */
public static class Builder {
private String uri;
@@ -246,6 +255,7 @@ public static class Builder {
private SessionPoolOptions sessionPoolOptions;
private List statementExecutionInterceptors =
Collections.emptyList();
+ private SpannerOptionsConfigurator configurator;
private Builder() {}
@@ -358,6 +368,12 @@ Builder setStatementExecutionInterceptors(List in
return this;
}
+ @VisibleForTesting
+ Builder setConfigurator(SpannerOptionsConfigurator configurator) {
+ this.configurator = Preconditions.checkNotNull(configurator);
+ return this;
+ }
+
@VisibleForTesting
Builder setCredentials(Credentials credentials) {
this.credentials = credentials;
@@ -401,6 +417,7 @@ public static Builder newBuilder() {
private final boolean readOnly;
private final boolean retryAbortsInternally;
private final List statementExecutionInterceptors;
+ private final SpannerOptionsConfigurator configurator;
private ConnectionOptions(Builder builder) {
Matcher matcher = Builder.SPANNER_URI_PATTERN.matcher(builder.uri);
@@ -473,6 +490,11 @@ private ConnectionOptions(Builder builder) {
this.retryAbortsInternally = parseRetryAbortsInternally(this.uri);
this.statementExecutionInterceptors =
Collections.unmodifiableList(builder.statementExecutionInterceptors);
+ this.configurator = builder.configurator;
+ }
+
+ SpannerOptionsConfigurator getConfigurator() {
+ return configurator;
}
@VisibleForTesting
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/DdlBatch.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/DdlBatch.java
index a80e93dfc0..7d4f18c4db 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/DdlBatch.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/DdlBatch.java
@@ -16,6 +16,8 @@
package com.google.cloud.spanner.connection;
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
@@ -31,15 +33,14 @@
import com.google.cloud.spanner.connection.StatementParser.StatementType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.spanner.admin.database.v1.DatabaseAdminGrpc;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
+import com.google.spanner.v1.SpannerGrpc;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
/**
* {@link UnitOfWork} that is used when a DDL batch is started. These batches only accept DDL
@@ -111,8 +112,7 @@ public boolean isReadOnly() {
return false;
}
- @Override
- public ResultSet executeQuery(
+ public ApiFuture executeQueryAsync(
final ParsedStatement statement, AnalyzeMode analyzeMode, QueryOption... options) {
if (options != null) {
for (int i = 0; i < options.length; i++) {
@@ -136,7 +136,8 @@ public ResultSet call() throws Exception {
dbClient.singleUse().executeQuery(statement.getStatement(), internalOptions));
}
};
- return asyncExecuteStatement(statement, callable);
+ return executeStatementAsync(
+ statement, callable, SpannerGrpc.getExecuteStreamingSqlMethod());
}
}
}
@@ -168,7 +169,7 @@ public Timestamp getCommitTimestampOrNull() {
}
@Override
- public void executeDdl(ParsedStatement ddl) {
+ public ApiFuture executeDdlAsync(ParsedStatement ddl) {
ConnectionPreconditions.checkState(
state == UnitOfWorkState.STARTED,
"The batch is no longer active and cannot be used for further statements");
@@ -178,28 +179,23 @@ public void executeDdl(ParsedStatement ddl) {
+ ddl.getSqlWithoutComments()
+ "\" is not a DDL-statement.");
statements.add(ddl.getSqlWithoutComments());
+ return ApiFutures.immediateFuture(null);
}
@Override
- public long executeUpdate(ParsedStatement update) {
+ public ApiFuture executeUpdateAsync(ParsedStatement update) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Executing updates is not allowed for DDL batches.");
}
@Override
- public long[] executeBatchUpdate(Iterable updates) {
+ public ApiFuture executeBatchUpdateAsync(Iterable updates) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Executing batch updates is not allowed for DDL batches.");
}
@Override
- public void write(Mutation mutation) {
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.FAILED_PRECONDITION, "Writing mutations is not allowed for DDL batches.");
- }
-
- @Override
- public void write(Iterable mutations) {
+ public ApiFuture writeAsync(Iterable mutations) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Writing mutations is not allowed for DDL batches.");
}
@@ -214,62 +210,50 @@ public void write(Iterable mutations) {
StatementParser.INSTANCE.parse(Statement.of("RUN BATCH"));
@Override
- public long[] runBatch() {
+ public ApiFuture runBatchAsync() {
ConnectionPreconditions.checkState(
state == UnitOfWorkState.STARTED, "The batch is no longer active and cannot be ran");
- try {
- if (!statements.isEmpty()) {
- // create a statement that can be passed in to the execute method
- Callable callable =
- new Callable() {
- @Override
- public UpdateDatabaseDdlMetadata call() throws Exception {
- OperationFuture operation =
- ddlClient.executeDdl(statements);
- try {
- // Wait until the operation has finished.
- operation.get();
- // Return metadata.
- return operation.getMetadata().get();
- } catch (ExecutionException e) {
- SpannerException spannerException = extractSpannerCause(e);
- long[] updateCounts = extractUpdateCounts(operation.getMetadata().get());
- throw SpannerExceptionFactory.newSpannerBatchUpdateException(
- spannerException == null
- ? ErrorCode.UNKNOWN
- : spannerException.getErrorCode(),
- e.getMessage(),
- updateCounts);
- } catch (InterruptedException e) {
- long[] updateCounts = extractUpdateCounts(operation.getMetadata().get());
- throw SpannerExceptionFactory.newSpannerBatchUpdateException(
- ErrorCode.CANCELLED, e.getMessage(), updateCounts);
- }
- }
- };
- asyncExecuteStatement(RUN_BATCH, callable);
- }
+ if (statements.isEmpty()) {
this.state = UnitOfWorkState.RAN;
- long[] updateCounts = new long[statements.size()];
- Arrays.fill(updateCounts, 1L);
- return updateCounts;
- } catch (SpannerException e) {
- this.state = UnitOfWorkState.RUN_FAILED;
- throw e;
+ return ApiFutures.immediateFuture(new long[0]);
}
+ // create a statement that can be passed in to the execute method
+ Callable callable =
+ new Callable() {
+ @Override
+ public long[] call() throws Exception {
+ try {
+ OperationFuture operation =
+ ddlClient.executeDdl(statements);
+ try {
+ // Wait until the operation has finished.
+ getWithStatementTimeout(operation, RUN_BATCH);
+ long[] updateCounts = new long[statements.size()];
+ Arrays.fill(updateCounts, 1L);
+ state = UnitOfWorkState.RAN;
+ return updateCounts;
+ } catch (SpannerException e) {
+ long[] updateCounts = extractUpdateCounts(operation);
+ throw SpannerExceptionFactory.newSpannerBatchUpdateException(
+ e.getErrorCode(), e.getMessage(), updateCounts);
+ }
+ } catch (Throwable t) {
+ state = UnitOfWorkState.RUN_FAILED;
+ throw t;
+ }
+ }
+ };
+ this.state = UnitOfWorkState.RUNNING;
+ return executeStatementAsync(
+ RUN_BATCH, callable, DatabaseAdminGrpc.getUpdateDatabaseDdlMethod());
}
- private SpannerException extractSpannerCause(ExecutionException e) {
- Throwable cause = e.getCause();
- Set causes = new HashSet<>();
- while (cause != null && !causes.contains(cause)) {
- if (cause instanceof SpannerException) {
- return (SpannerException) cause;
- }
- causes.add(cause);
- cause = cause.getCause();
+ long[] extractUpdateCounts(OperationFuture operation) {
+ try {
+ return extractUpdateCounts(operation.getMetadata().get());
+ } catch (Throwable t) {
+ return new long[0];
}
- return null;
}
@VisibleForTesting
@@ -293,13 +277,13 @@ public void abortBatch() {
}
@Override
- public void commit() {
+ public ApiFuture commitAsync() {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Commit is not allowed for DDL batches.");
}
@Override
- public void rollback() {
+ public ApiFuture rollbackAsync() {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Rollback is not allowed for DDL batches.");
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/DmlBatch.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/DmlBatch.java
index ff38338d62..b5b80e46cf 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/DmlBatch.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/DmlBatch.java
@@ -16,16 +16,20 @@
package com.google.cloud.spanner.connection;
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.ResultSet;
-import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.connection.StatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.StatementParser.StatementType;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.List;
@@ -87,7 +91,7 @@ public boolean isReadOnly() {
}
@Override
- public ResultSet executeQuery(
+ public ApiFuture executeQueryAsync(
ParsedStatement statement, AnalyzeMode analyzeMode, QueryOption... options) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Executing queries is not allowed for DML batches.");
@@ -116,13 +120,13 @@ public Timestamp getCommitTimestampOrNull() {
}
@Override
- public void executeDdl(ParsedStatement ddl) {
+ public ApiFuture executeDdlAsync(ParsedStatement ddl) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Executing DDL statements is not allowed for DML batches.");
}
@Override
- public long executeUpdate(ParsedStatement update) {
+ public ApiFuture executeUpdateAsync(ParsedStatement update) {
ConnectionPreconditions.checkState(
state == UnitOfWorkState.STARTED,
"The batch is no longer active and cannot be used for further statements");
@@ -132,44 +136,54 @@ public long executeUpdate(ParsedStatement update) {
+ update.getSqlWithoutComments()
+ "\" is not a DML-statement.");
statements.add(update);
- return -1L;
+ return ApiFutures.immediateFuture(-1L);
}
@Override
- public long[] executeBatchUpdate(Iterable updates) {
+ public ApiFuture executeBatchUpdateAsync(Iterable updates) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Executing batch updates is not allowed for DML batches.");
}
@Override
- public void write(Mutation mutation) {
+ public ApiFuture writeAsync(Iterable mutations) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Writing mutations is not allowed for DML batches.");
}
@Override
- public void write(Iterable mutations) {
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.FAILED_PRECONDITION, "Writing mutations is not allowed for DML batches.");
- }
-
- @Override
- public long[] runBatch() {
+ public ApiFuture runBatchAsync() {
ConnectionPreconditions.checkState(
state == UnitOfWorkState.STARTED, "The batch is no longer active and cannot be ran");
- try {
- long[] res;
- if (statements.isEmpty()) {
- res = new long[0];
- } else {
- res = transaction.executeBatchUpdate(statements);
- }
+ if (statements.isEmpty()) {
this.state = UnitOfWorkState.RAN;
- return res;
- } catch (SpannerException e) {
- this.state = UnitOfWorkState.RUN_FAILED;
- throw e;
+ return ApiFutures.immediateFuture(new long[0]);
}
+ this.state = UnitOfWorkState.RUNNING;
+ // Use a SettableApiFuture to return the result, instead of directly returning the future that
+ // is returned by the executeBatchUpdateAsync method. This is needed because the state of the
+ // batch is set after the update has finished, and this happens in a listener. A listener is
+ // executed AFTER a Future is done, which means that a user could read the state of the Batch
+ // before it has been changed.
+ final SettableApiFuture res = SettableApiFuture.create();
+ ApiFuture updateCounts = transaction.executeBatchUpdateAsync(statements);
+ ApiFutures.addCallback(
+ updateCounts,
+ new ApiFutureCallback() {
+ @Override
+ public void onFailure(Throwable t) {
+ state = UnitOfWorkState.RUN_FAILED;
+ res.setException(t);
+ }
+
+ @Override
+ public void onSuccess(long[] result) {
+ state = UnitOfWorkState.RAN;
+ res.set(result);
+ }
+ },
+ MoreExecutors.directExecutor());
+ return res;
}
@Override
@@ -180,13 +194,13 @@ public void abortBatch() {
}
@Override
- public void commit() {
+ public ApiFuture commitAsync() {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Commit is not allowed for DML batches.");
}
@Override
- public void rollback() {
+ public ApiFuture rollbackAsync() {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Rollback is not allowed for DML batches.");
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadOnlyTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadOnlyTransaction.java
index c9435886c0..09f3efc6d5 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadOnlyTransaction.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadOnlyTransaction.java
@@ -16,6 +16,8 @@
package com.google.cloud.spanner.connection;
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
@@ -83,6 +85,11 @@ public boolean isReadOnly() {
return true;
}
+ @Override
+ void checkAborted() {
+ // No-op for read-only transactions as they cannot abort.
+ }
+
@Override
void checkValidTransaction() {
if (transaction == null) {
@@ -130,49 +137,45 @@ public Timestamp getCommitTimestampOrNull() {
}
@Override
- public void executeDdl(ParsedStatement ddl) {
+ public ApiFuture executeDdlAsync(ParsedStatement ddl) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "DDL statements are not allowed for read-only transactions");
}
@Override
- public long executeUpdate(ParsedStatement update) {
+ public ApiFuture executeUpdateAsync(ParsedStatement update) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"Update statements are not allowed for read-only transactions");
}
@Override
- public long[] executeBatchUpdate(Iterable updates) {
+ public ApiFuture executeBatchUpdateAsync(Iterable updates) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Batch updates are not allowed for read-only transactions.");
}
@Override
- public void write(Mutation mutation) {
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.FAILED_PRECONDITION, "Mutations are not allowed for read-only transactions");
- }
-
- @Override
- public void write(Iterable mutations) {
+ public ApiFuture writeAsync(Iterable mutations) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "Mutations are not allowed for read-only transactions");
}
@Override
- public void commit() {
+ public ApiFuture commitAsync() {
if (this.transaction != null) {
this.transaction.close();
}
this.state = UnitOfWorkState.COMMITTED;
+ return ApiFutures.immediateFuture(null);
}
@Override
- public void rollback() {
+ public ApiFuture rollbackAsync() {
if (this.transaction != null) {
this.transaction.close();
}
this.state = UnitOfWorkState.ROLLED_BACK;
+ return ApiFutures.immediateFuture(null);
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java
index 7a0155cbfb..0a8e322e79 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java
@@ -16,8 +16,13 @@
package com.google.cloud.spanner.connection;
+import static com.google.cloud.spanner.SpannerApiFutures.get;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AbortedDueToConcurrentModificationException;
import com.google.cloud.spanner.AbortedException;
@@ -35,6 +40,10 @@
import com.google.cloud.spanner.connection.TransactionRetryListener.RetryResult;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.spanner.v1.SpannerGrpc;
+import io.grpc.MethodDescriptor;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -65,12 +74,15 @@ class ReadWriteTransaction extends AbstractMultiUseTransaction {
private int transactionRetryAttempts;
private int successfulRetries;
private final List transactionRetryListeners;
- private volatile TransactionContext txContext;
+ private volatile ApiFuture txContextFuture;
+ private volatile SettableApiFuture commitTimestampFuture;
private volatile UnitOfWorkState state = UnitOfWorkState.STARTED;
+ private volatile AbortedException abortedException;
private boolean timedOutOrCancelled = false;
private final List statements = new ArrayList<>();
private final List mutations = new ArrayList<>();
private Timestamp transactionStarted;
+ final Object abortedLock = new Object();
static class Builder extends AbstractMultiUseTransaction.Builder {
private DatabaseClient dbClient;
@@ -154,36 +166,80 @@ public boolean isReadOnly() {
return false;
}
+ private static final ParsedStatement BEGIN_STATEMENT =
+ StatementParser.INSTANCE.parse(Statement.of("BEGIN"));
+
@Override
void checkValidTransaction() {
+ checkValidState();
+ if (txContextFuture == null) {
+ transactionStarted = Timestamp.now();
+ txContextFuture =
+ executeStatementAsync(
+ BEGIN_STATEMENT,
+ new Callable() {
+ @Override
+ public TransactionContext call() throws Exception {
+ return txManager.begin();
+ }
+ },
+ SpannerGrpc.getBeginTransactionMethod());
+ }
+ }
+
+ private void checkValidState() {
ConnectionPreconditions.checkState(
- state == UnitOfWorkState.STARTED,
+ this.state == UnitOfWorkState.STARTED || this.state == UnitOfWorkState.ABORTED,
"This transaction has status "
- + state.name()
+ + this.state.name()
+ ", only "
+ UnitOfWorkState.STARTED
+ + " or "
+ + UnitOfWorkState.ABORTED
+ " is allowed.");
ConnectionPreconditions.checkState(
!timedOutOrCancelled,
"The last statement of this transaction timed out or was cancelled. "
+ "The transaction is no longer usable. "
+ "Rollback the transaction and start a new one.");
- if (txManager.getState() == null) {
- transactionStarted = Timestamp.now();
- txContext = txManager.begin();
- }
- if (txManager.getState()
- != com.google.cloud.spanner.TransactionManager.TransactionState.STARTED) {
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.FAILED_PRECONDITION,
- String.format("Invalid transaction state: %s", txManager.getState()));
+ }
+
+ @Override
+ public boolean isActive() {
+ // Consider ABORTED an active state, as it is something that is automatically set if the
+ // transaction is aborted by the backend. That means that we should not automatically create a
+ // new transaction for the following statement after a transaction has aborted, and instead we
+ // should wait until the application has rolled back the current transaction.
+ //
+ // Otherwise the following list of statements could show unexpected behavior:
+
+ // connection.executeUpdateAsync("UPDATE FOO SET BAR=1 ...");
+ // connection.executeUpdateAsync("UPDATE BAR SET FOO=2 ...");
+ // connection.commitAsync();
+ //
+ // If the first update statement fails with an aborted exception, the second update statement
+ // should not be executed in a new transaction, but should also abort.
+ return getState().isActive() || state == UnitOfWorkState.ABORTED;
+ }
+
+ void checkAborted() {
+ if (this.state == UnitOfWorkState.ABORTED && this.abortedException != null) {
+ if (this.abortedException instanceof AbortedDueToConcurrentModificationException) {
+ throw SpannerExceptionFactory.newAbortedDueToConcurrentModificationException(
+ (AbortedDueToConcurrentModificationException) this.abortedException);
+ } else {
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.ABORTED,
+ "This transaction has already been aborted. Rollback this transaction to start a new one.",
+ this.abortedException);
+ }
}
}
@Override
TransactionContext getReadContext() {
- ConnectionPreconditions.checkState(txContext != null, "Missing transaction context");
- return txContext;
+ ConnectionPreconditions.checkState(txContextFuture != null, "Missing transaction context");
+ return get(txContextFuture);
}
@Override
@@ -199,23 +255,22 @@ public Timestamp getReadTimestampOrNull() {
}
private boolean hasCommitTimestamp() {
- return txManager.getState()
- == com.google.cloud.spanner.TransactionManager.TransactionState.COMMITTED;
+ return commitTimestampFuture != null;
}
@Override
public Timestamp getCommitTimestamp() {
ConnectionPreconditions.checkState(hasCommitTimestamp(), "This transaction has not committed.");
- return txManager.getCommitTimestamp();
+ return get(commitTimestampFuture);
}
@Override
public Timestamp getCommitTimestampOrNull() {
- return hasCommitTimestamp() ? txManager.getCommitTimestamp() : null;
+ return hasCommitTimestamp() ? get(commitTimestampFuture) : null;
}
@Override
- public void executeDdl(ParsedStatement ddl) {
+ public ApiFuture executeDdlAsync(ParsedStatement ddl) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION,
"DDL-statements are not allowed inside a read/write transaction.");
@@ -229,108 +284,138 @@ private void handlePossibleInvalidatingException(SpannerException e) {
}
@Override
- public ResultSet executeQuery(
+ public ApiFuture executeQueryAsync(
final ParsedStatement statement,
final AnalyzeMode analyzeMode,
final QueryOption... options) {
Preconditions.checkArgument(statement.isQuery(), "Statement is not a query");
checkValidTransaction();
- try {
- if (retryAbortsInternally) {
- return asyncExecuteStatement(
- statement,
- new Callable() {
- @Override
- public ResultSet call() throws Exception {
- return runWithRetry(
- new Callable() {
- @Override
- public ResultSet call() throws Exception {
- try {
- getStatementExecutor()
- .invokeInterceptors(
- statement,
- StatementExecutionStep.EXECUTE_STATEMENT,
- ReadWriteTransaction.this);
- ResultSet delegate =
- DirectExecuteResultSet.ofResultSet(
- internalExecuteQuery(statement, analyzeMode, options));
- return createAndAddRetryResultSet(
- delegate, statement, analyzeMode, options);
- } catch (AbortedException e) {
- throw e;
- } catch (SpannerException e) {
- createAndAddFailedQuery(e, statement, analyzeMode, options);
- throw e;
+
+ ApiFuture res;
+ if (retryAbortsInternally) {
+ res =
+ executeStatementAsync(
+ statement,
+ new Callable() {
+ @Override
+ public ResultSet call() throws Exception {
+ return runWithRetry(
+ new Callable() {
+ @Override
+ public ResultSet call() throws Exception {
+ try {
+ getStatementExecutor()
+ .invokeInterceptors(
+ statement,
+ StatementExecutionStep.EXECUTE_STATEMENT,
+ ReadWriteTransaction.this);
+ ResultSet delegate =
+ DirectExecuteResultSet.ofResultSet(
+ internalExecuteQuery(statement, analyzeMode, options));
+ return createAndAddRetryResultSet(
+ delegate, statement, analyzeMode, options);
+ } catch (AbortedException e) {
+ throw e;
+ } catch (SpannerException e) {
+ createAndAddFailedQuery(e, statement, analyzeMode, options);
+ throw e;
+ }
}
- }
- });
- }
- },
- InterceptorsUsage
- .IGNORE_INTERCEPTORS); // ignore interceptors here as they are invoked in the
- // Callable.
- } else {
- return super.executeQuery(statement, analyzeMode, options);
- }
- } catch (SpannerException e) {
- handlePossibleInvalidatingException(e);
- throw e;
+ });
+ }
+ },
+ // ignore interceptors here as they are invoked in the Callable.
+ InterceptorsUsage.IGNORE_INTERCEPTORS,
+ ImmutableList.>of(SpannerGrpc.getExecuteStreamingSqlMethod()));
+ } else {
+ res = super.executeQueryAsync(statement, analyzeMode, options);
}
+
+ ApiFutures.addCallback(
+ res,
+ new ApiFutureCallback() {
+ @Override
+ public void onFailure(Throwable t) {
+ if (t instanceof SpannerException) {
+ handlePossibleInvalidatingException((SpannerException) t);
+ }
+ }
+
+ @Override
+ public void onSuccess(ResultSet result) {}
+ },
+ MoreExecutors.directExecutor());
+ return res;
}
@Override
- public long executeUpdate(final ParsedStatement update) {
+ public ApiFuture executeUpdateAsync(final ParsedStatement update) {
Preconditions.checkNotNull(update);
Preconditions.checkArgument(update.isUpdate(), "The statement is not an update statement");
checkValidTransaction();
- try {
- if (retryAbortsInternally) {
- return asyncExecuteStatement(
- update,
- new Callable() {
- @Override
- public Long call() throws Exception {
- return runWithRetry(
- new Callable() {
- @Override
- public Long call() throws Exception {
- try {
- getStatementExecutor()
- .invokeInterceptors(
- update,
- StatementExecutionStep.EXECUTE_STATEMENT,
- ReadWriteTransaction.this);
- long updateCount = txContext.executeUpdate(update.getStatement());
- createAndAddRetriableUpdate(update, updateCount);
- return updateCount;
- } catch (AbortedException e) {
- throw e;
- } catch (SpannerException e) {
- createAndAddFailedUpdate(e, update);
- throw e;
+ ApiFuture res;
+ if (retryAbortsInternally) {
+ res =
+ executeStatementAsync(
+ update,
+ new Callable() {
+ @Override
+ public Long call() throws Exception {
+ return runWithRetry(
+ new Callable() {
+ @Override
+ public Long call() throws Exception {
+ try {
+ getStatementExecutor()
+ .invokeInterceptors(
+ update,
+ StatementExecutionStep.EXECUTE_STATEMENT,
+ ReadWriteTransaction.this);
+ long updateCount =
+ get(txContextFuture).executeUpdate(update.getStatement());
+ createAndAddRetriableUpdate(update, updateCount);
+ return updateCount;
+ } catch (AbortedException e) {
+ throw e;
+ } catch (SpannerException e) {
+ createAndAddFailedUpdate(e, update);
+ throw e;
+ }
}
- }
- });
- }
- },
- InterceptorsUsage
- .IGNORE_INTERCEPTORS); // ignore interceptors here as they are invoked in the
- // Callable.
- } else {
- return asyncExecuteStatement(
- update,
- new Callable() {
- @Override
- public Long call() throws Exception {
- return txContext.executeUpdate(update.getStatement());
- }
- });
- }
- } catch (SpannerException e) {
- handlePossibleInvalidatingException(e);
- throw e;
+ });
+ }
+ },
+ // ignore interceptors here as they are invoked in the Callable.
+ InterceptorsUsage.IGNORE_INTERCEPTORS,
+ ImmutableList.>of(SpannerGrpc.getExecuteSqlMethod()));
+ } else {
+ res =
+ executeStatementAsync(
+ update,
+ new Callable() {
+ @Override
+ public Long call() throws Exception {
+ checkAborted();
+ return get(txContextFuture).executeUpdate(update.getStatement());
+ }
+ },
+ SpannerGrpc.getExecuteSqlMethod());
}
+ ApiFutures.addCallback(
+ res,
+ new ApiFutureCallback() {
+ @Override
+ public void onFailure(Throwable t) {
+ if (t instanceof SpannerException) {
+ handlePossibleInvalidatingException((SpannerException) t);
+ }
+ }
+
+ @Override
+ public void onSuccess(Long result) {}
+ },
+ MoreExecutors.directExecutor());
+ return res;
}
/**
@@ -348,7 +433,7 @@ public Long call() throws Exception {
StatementParser.INSTANCE.parse(Statement.of("RUN BATCH"));
@Override
- public long[] executeBatchUpdate(final Iterable updates) {
+ public ApiFuture executeBatchUpdateAsync(Iterable updates) {
Preconditions.checkNotNull(updates);
final List updateStatements = new LinkedList<>();
for (ParsedStatement update : updates) {
@@ -358,69 +443,81 @@ public long[] executeBatchUpdate(final Iterable updates) {
updateStatements.add(update.getStatement());
}
checkValidTransaction();
- try {
- if (retryAbortsInternally) {
- return asyncExecuteStatement(
- EXECUTE_BATCH_UPDATE_STATEMENT,
- new Callable() {
- @Override
- public long[] call() throws Exception {
- return runWithRetry(
- new Callable() {
- @Override
- public long[] call() throws Exception {
- try {
- getStatementExecutor()
- .invokeInterceptors(
- EXECUTE_BATCH_UPDATE_STATEMENT,
- StatementExecutionStep.EXECUTE_STATEMENT,
- ReadWriteTransaction.this);
- long[] updateCounts = txContext.batchUpdate(updateStatements);
- createAndAddRetriableBatchUpdate(updateStatements, updateCounts);
- return updateCounts;
- } catch (AbortedException e) {
- throw e;
- } catch (SpannerException e) {
- createAndAddFailedBatchUpdate(e, updateStatements);
- throw e;
+
+ ApiFuture res;
+ if (retryAbortsInternally) {
+ res =
+ executeStatementAsync(
+ EXECUTE_BATCH_UPDATE_STATEMENT,
+ new Callable() {
+ @Override
+ public long[] call() throws Exception {
+ return runWithRetry(
+ new Callable() {
+ @Override
+ public long[] call() throws Exception {
+ try {
+ getStatementExecutor()
+ .invokeInterceptors(
+ EXECUTE_BATCH_UPDATE_STATEMENT,
+ StatementExecutionStep.EXECUTE_STATEMENT,
+ ReadWriteTransaction.this);
+ long[] updateCounts =
+ get(txContextFuture).batchUpdate(updateStatements);
+ createAndAddRetriableBatchUpdate(updateStatements, updateCounts);
+ return updateCounts;
+ } catch (AbortedException e) {
+ throw e;
+ } catch (SpannerException e) {
+ createAndAddFailedBatchUpdate(e, updateStatements);
+ throw e;
+ }
}
- }
- });
- }
- },
- InterceptorsUsage
- .IGNORE_INTERCEPTORS); // ignore interceptors here as they are invoked in the
- // Callable.
- } else {
- return asyncExecuteStatement(
- EXECUTE_BATCH_UPDATE_STATEMENT,
- new Callable() {
- @Override
- public long[] call() throws Exception {
- return txContext.batchUpdate(updateStatements);
- }
- });
- }
- } catch (SpannerException e) {
- handlePossibleInvalidatingException(e);
- throw e;
+ });
+ }
+ },
+ // ignore interceptors here as they are invoked in the Callable.
+ InterceptorsUsage.IGNORE_INTERCEPTORS,
+ ImmutableList.>of(SpannerGrpc.getExecuteBatchDmlMethod()));
+ } else {
+ res =
+ executeStatementAsync(
+ EXECUTE_BATCH_UPDATE_STATEMENT,
+ new Callable() {
+ @Override
+ public long[] call() throws Exception {
+ checkAborted();
+ return get(txContextFuture).batchUpdate(updateStatements);
+ }
+ },
+ SpannerGrpc.getExecuteBatchDmlMethod());
}
- }
- @Override
- public void write(Mutation mutation) {
- Preconditions.checkNotNull(mutation);
- checkValidTransaction();
- mutations.add(mutation);
+ ApiFutures.addCallback(
+ res,
+ new ApiFutureCallback() {
+ @Override
+ public void onFailure(Throwable t) {
+ if (t instanceof SpannerException) {
+ handlePossibleInvalidatingException((SpannerException) t);
+ }
+ }
+
+ @Override
+ public void onSuccess(long[] result) {}
+ },
+ MoreExecutors.directExecutor());
+ return res;
}
@Override
- public void write(Iterable mutations) {
+ public ApiFuture writeAsync(Iterable mutations) {
Preconditions.checkNotNull(mutations);
checkValidTransaction();
for (Mutation mutation : mutations) {
this.mutations.add(checkNotNull(mutation));
}
+ return ApiFutures.immediateFuture(null);
}
/**
@@ -440,51 +537,79 @@ public void write(Iterable mutations) {
new Callable() {
@Override
public Void call() throws Exception {
- txContext.buffer(mutations);
+ checkAborted();
+ get(txContextFuture).buffer(mutations);
txManager.commit();
+ commitTimestampFuture.set(txManager.getCommitTimestamp());
+ state = UnitOfWorkState.COMMITTED;
return null;
}
};
@Override
- public void commit() {
+ public ApiFuture commitAsync() {
checkValidTransaction();
- try {
- if (retryAbortsInternally) {
- asyncExecuteStatement(
- COMMIT_STATEMENT,
- new Callable() {
- @Override
- public Void call() throws Exception {
- return runWithRetry(
- new Callable() {
- @Override
- public Void call() throws Exception {
- getStatementExecutor()
- .invokeInterceptors(
- COMMIT_STATEMENT,
- StatementExecutionStep.EXECUTE_STATEMENT,
- ReadWriteTransaction.this);
- commitCallable.call();
- return null;
- }
- });
- }
- },
- InterceptorsUsage.IGNORE_INTERCEPTORS);
- } else {
- asyncExecuteStatement(COMMIT_STATEMENT, commitCallable);
- }
- ReadWriteTransaction.this.state = UnitOfWorkState.COMMITTED;
- } catch (SpannerException e) {
- try {
- txManager.close();
- } catch (Throwable t) {
- // ignore
- }
- this.state = UnitOfWorkState.COMMIT_FAILED;
- throw e;
+ state = UnitOfWorkState.COMMITTING;
+ commitTimestampFuture = SettableApiFuture.create();
+ ApiFuture res;
+ if (retryAbortsInternally) {
+ res =
+ executeStatementAsync(
+ COMMIT_STATEMENT,
+ new Callable() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ return runWithRetry(
+ new Callable() {
+ @Override
+ public Void call() throws Exception {
+ getStatementExecutor()
+ .invokeInterceptors(
+ COMMIT_STATEMENT,
+ StatementExecutionStep.EXECUTE_STATEMENT,
+ ReadWriteTransaction.this);
+ return commitCallable.call();
+ }
+ });
+ } catch (Throwable t) {
+ commitTimestampFuture.setException(t);
+ state = UnitOfWorkState.COMMIT_FAILED;
+ try {
+ txManager.close();
+ } catch (Throwable t2) {
+ // Ignore.
+ }
+ throw t;
+ }
+ }
+ },
+ InterceptorsUsage.IGNORE_INTERCEPTORS,
+ ImmutableList.>of(SpannerGrpc.getCommitMethod()));
+ } else {
+ res =
+ executeStatementAsync(
+ COMMIT_STATEMENT,
+ new Callable() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ return commitCallable.call();
+ } catch (Throwable t) {
+ commitTimestampFuture.setException(t);
+ state = UnitOfWorkState.COMMIT_FAILED;
+ try {
+ txManager.close();
+ } catch (Throwable t2) {
+ // Ignore.
+ }
+ throw t;
+ }
+ }
+ },
+ SpannerGrpc.getCommitMethod());
}
+ return res;
}
/**
@@ -508,18 +633,17 @@ public Void call() throws Exception {
*/
T runWithRetry(Callable callable) throws SpannerException {
while (true) {
- try {
- return callable.call();
- } catch (final AbortedException aborted) {
- if (retryAbortsInternally) {
+ synchronized (abortedLock) {
+ checkAborted();
+ try {
+ return callable.call();
+ } catch (final AbortedException aborted) {
handleAborted(aborted);
- } else {
- throw aborted;
+ } catch (SpannerException e) {
+ throw e;
+ } catch (Exception e) {
+ throw SpannerExceptionFactory.asSpannerException(e);
}
- } catch (SpannerException e) {
- throw e;
- } catch (Exception e) {
- throw SpannerExceptionFactory.newSpannerException(ErrorCode.UNKNOWN, e.getMessage(), e);
}
}
}
@@ -609,7 +733,7 @@ private void handleAborted(AbortedException aborted) {
ErrorCode.CANCELLED, "The statement was cancelled");
}
try {
- txContext = txManager.resetForRetry();
+ txContextFuture = ApiFutures.immediateFuture(txManager.resetForRetry());
// Inform listeners about the transaction retry that is about to start.
invokeTransactionRetryListenersOnStart();
// Then retry all transaction statements.
@@ -630,13 +754,14 @@ private void handleAborted(AbortedException aborted) {
RetryResult.RETRY_ABORTED_DUE_TO_CONCURRENT_MODIFICATION);
logger.fine(
toString() + ": Internal transaction retry aborted due to a concurrent modification");
- // Try to rollback the new transaction and ignore any exceptions.
+ // Do a fire-and-forget rollback.
try {
txManager.rollback();
} catch (Throwable t) {
// ignore
}
this.state = UnitOfWorkState.ABORTED;
+ this.abortedException = e;
throw e;
} catch (AbortedException e) {
// Retry aborted, do another retry of the transaction.
@@ -651,7 +776,7 @@ private void handleAborted(AbortedException aborted) {
Level.FINE,
toString() + ": Internal transaction retry failed due to an unexpected exception",
e);
- // Try to rollback the new transaction and ignore any exceptions.
+ // Do a fire-and-forget rollback.
try {
txManager.rollback();
} catch (Throwable t) {
@@ -659,6 +784,7 @@ private void handleAborted(AbortedException aborted) {
}
// Set transaction state to aborted as the retry failed.
this.state = UnitOfWorkState.ABORTED;
+ this.abortedException = aborted;
// Re-throw underlying exception.
throw e;
}
@@ -671,6 +797,7 @@ private void handleAborted(AbortedException aborted) {
}
// Internal retry is not enabled.
this.state = UnitOfWorkState.ABORTED;
+ this.abortedException = aborted;
throw aborted;
}
}
@@ -689,8 +816,11 @@ private void throwAbortWithRetryAttemptsExceeded() throws SpannerException {
// ignore
}
this.state = UnitOfWorkState.ABORTED;
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.ABORTED, MAX_INTERNAL_RETRIES_EXCEEDED);
+ this.abortedException =
+ (AbortedException)
+ SpannerExceptionFactory.newSpannerException(
+ ErrorCode.ABORTED, MAX_INTERNAL_RETRIES_EXCEEDED);
+ throw this.abortedException;
}
private void invokeTransactionRetryListenersOnStart() {
@@ -713,26 +843,30 @@ private void invokeTransactionRetryListenersOnFinish(RetryResult result) {
new Callable() {
@Override
public Void call() throws Exception {
- txManager.rollback();
- return null;
+ try {
+ if (state != UnitOfWorkState.ABORTED) {
+ // Make sure the transaction has actually started before we try to roll back.
+ get(txContextFuture);
+ txManager.rollback();
+ }
+ return null;
+ } finally {
+ txManager.close();
+ }
}
};
@Override
- public void rollback() {
+ public ApiFuture rollbackAsync() {
ConnectionPreconditions.checkState(
- state == UnitOfWorkState.STARTED, "This transaction has status " + state.name());
- try {
- asyncExecuteStatement(rollbackStatement, rollbackCallable);
- } finally {
- // Whatever happens, we should always call close in order to return the underlying session to
- // the session pool to avoid any session leaks.
- try {
- txManager.close();
- } catch (Throwable e) {
- // ignore
- }
- this.state = UnitOfWorkState.ROLLED_BACK;
+ state == UnitOfWorkState.STARTED || state == UnitOfWorkState.ABORTED,
+ "This transaction has status " + state.name());
+ state = UnitOfWorkState.ROLLED_BACK;
+ if (txContextFuture != null && state != UnitOfWorkState.ABORTED) {
+ return executeStatementAsync(
+ rollbackStatement, rollbackCallable, SpannerGrpc.getRollbackMethod());
+ } else {
+ return ApiFutures.immediateFuture(null);
}
}
diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SingleUseTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SingleUseTransaction.java
index 614d0c61e5..52011eb910 100644
--- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SingleUseTransaction.java
+++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SingleUseTransaction.java
@@ -16,15 +16,17 @@
package com.google.cloud.spanner.connection;
+import com.google.api.core.ApiFuture;
+import com.google.api.core.SettableApiFuture;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.Timestamp;
-import com.google.cloud.spanner.AbortedException;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.ReadOnlyTransaction;
import com.google.cloud.spanner.ResultSet;
+import com.google.cloud.spanner.SpannerApiFutures;
import com.google.cloud.spanner.SpannerBatchUpdateException;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.SpannerExceptionFactory;
@@ -36,13 +38,15 @@
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.cloud.spanner.connection.StatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.StatementParser.StatementType;
+import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.spanner.admin.database.v1.DatabaseAdminGrpc;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
+import com.google.spanner.v1.SpannerGrpc;
+import io.grpc.MethodDescriptor;
import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
/**
* Transaction that is used when a {@link Connection} is in autocommit mode. Each method on this
@@ -66,11 +70,11 @@ class SingleUseTransaction extends AbstractBaseUnitOfWork {
private final DatabaseClient dbClient;
private final TimestampBound readOnlyStaleness;
private final AutocommitDmlMode autocommitDmlMode;
- private Timestamp readTimestamp = null;
+ private volatile SettableApiFuture readTimestamp = null;
private volatile TransactionManager txManager;
- private TransactionRunner writeTransaction;
+ private volatile TransactionRunner writeTransaction;
private boolean used = false;
- private UnitOfWorkState state = UnitOfWorkState.STARTED;
+ private volatile UnitOfWorkState state = UnitOfWorkState.STARTED;
static class Builder extends AbstractBaseUnitOfWork.Builder {
private DdlClient ddlClient;
@@ -160,7 +164,7 @@ private void checkAndMarkUsed() {
}
@Override
- public ResultSet executeQuery(
+ public ApiFuture executeQueryAsync(
final ParsedStatement statement,
final AnalyzeMode analyzeMode,
final QueryOption... options) {
@@ -185,42 +189,43 @@ public ResultSet call() throws Exception {
}
// Return a DirectExecuteResultSet, which will directly do a next() call in order to
// ensure that the query is actually sent to Spanner.
- return DirectExecuteResultSet.ofResultSet(rs);
- } finally {
+ ResultSet directRs = DirectExecuteResultSet.ofResultSet(rs);
+ state = UnitOfWorkState.COMMITTED;
+ readTimestamp.set(currentTransaction.getReadTimestamp());
+ return directRs;
+ } catch (Throwable t) {
+ state = UnitOfWorkState.COMMIT_FAILED;
+ readTimestamp.set(null);
currentTransaction.close();
+ throw t;
}
}
};
- try {
- ResultSet res = asyncExecuteStatement(statement, callable);
- readTimestamp = currentTransaction.getReadTimestamp();
- state = UnitOfWorkState.COMMITTED;
- return res;
- } catch (Throwable e) {
- state = UnitOfWorkState.COMMIT_FAILED;
- throw e;
- } finally {
- currentTransaction.close();
- }
+ readTimestamp = SettableApiFuture.create();
+ ApiFuture res =
+ executeStatementAsync(statement, callable, SpannerGrpc.getExecuteStreamingSqlMethod());
+ return res;
}
@Override
public Timestamp getReadTimestamp() {
ConnectionPreconditions.checkState(
- readTimestamp != null, "There is no read timestamp available for this transaction.");
- return readTimestamp;
+ SpannerApiFutures.getOrNull(readTimestamp) != null,
+ "There is no read timestamp available for this transaction.");
+ return SpannerApiFutures.get(readTimestamp);
}
@Override
public Timestamp getReadTimestampOrNull() {
- return readTimestamp;
+ return SpannerApiFutures.getOrNull(readTimestamp);
}
private boolean hasCommitTimestamp() {
- return writeTransaction != null
- || (txManager != null
- && txManager.getState()
- == com.google.cloud.spanner.TransactionManager.TransactionState.COMMITTED);
+ return state == UnitOfWorkState.COMMITTED
+ && (writeTransaction != null
+ || (txManager != null
+ && txManager.getState()
+ == com.google.cloud.spanner.TransactionManager.TransactionState.COMMITTED));
}
@Override
@@ -247,7 +252,7 @@ public Timestamp getCommitTimestampOrNull() {
}
@Override
- public void executeDdl(final ParsedStatement ddl) {
+ public ApiFuture executeDdlAsync(final ParsedStatement ddl) {
Preconditions.checkNotNull(ddl);
Preconditions.checkArgument(
ddl.getType() == StatementType.DDL, "Statement is not a ddl statement");
@@ -255,70 +260,53 @@ public void executeDdl(final ParsedStatement ddl) {
!isReadOnly(), "DDL statements are not allowed in read-only mode");
checkAndMarkUsed();
- try {
- Callable callable =
- new Callable() {
- @Override
- public Void call() throws Exception {
+ Callable callable =
+ new Callable() {
+ @Override
+ public Void call() throws Exception {
+ try {
OperationFuture operation =
ddlClient.executeDdl(ddl.getSqlWithoutComments());
- return operation.get();
+ Void res = getWithStatementTimeout(operation, ddl);
+ state = UnitOfWorkState.COMMITTED;
+ return res;
+ } catch (Throwable t) {
+ state = UnitOfWorkState.COMMIT_FAILED;
+ throw t;
}
- };
- asyncExecuteStatement(ddl, callable);
- state = UnitOfWorkState.COMMITTED;
- } catch (Throwable e) {
- state = UnitOfWorkState.COMMIT_FAILED;
- throw e;
- }
+ }
+ };
+ return executeStatementAsync(ddl, callable, DatabaseAdminGrpc.getUpdateDatabaseDdlMethod());
}
@Override
- public long executeUpdate(final ParsedStatement update) {
+ public ApiFuture executeUpdateAsync(ParsedStatement update) {
Preconditions.checkNotNull(update);
Preconditions.checkArgument(update.isUpdate(), "Statement is not an update statement");
ConnectionPreconditions.checkState(
!isReadOnly(), "Update statements are not allowed in read-only mode");
checkAndMarkUsed();
- long res;
- try {
- switch (autocommitDmlMode) {
- case TRANSACTIONAL:
- res = executeAsyncTransactionalUpdate(update, new TransactionalUpdateCallable(update));
- break;
- case PARTITIONED_NON_ATOMIC:
- res = executeAsyncPartitionedUpdate(update);
- break;
- default:
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.FAILED_PRECONDITION, "Unknown dml mode: " + autocommitDmlMode);
- }
- } catch (Throwable e) {
- state = UnitOfWorkState.COMMIT_FAILED;
- throw e;
+ ApiFuture res;
+ switch (autocommitDmlMode) {
+ case TRANSACTIONAL:
+ res = executeTransactionalUpdateAsync(update);
+ break;
+ case PARTITIONED_NON_ATOMIC:
+ res = executePartitionedUpdateAsync(update);
+ break;
+ default:
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.FAILED_PRECONDITION, "Unknown dml mode: " + autocommitDmlMode);
}
- state = UnitOfWorkState.COMMITTED;
return res;
}
- /** Execute an update statement as a partitioned DML statement. */
- private long executeAsyncPartitionedUpdate(final ParsedStatement update) {
- Callable callable =
- new Callable() {
- @Override
- public Long call() throws Exception {
- return dbClient.executePartitionedUpdate(update.getStatement());
- }
- };
- return asyncExecuteStatement(update, callable);
- }
-
private final ParsedStatement executeBatchUpdateStatement =
StatementParser.INSTANCE.parse(Statement.of("RUN BATCH"));
@Override
- public long[] executeBatchUpdate(Iterable updates) {
+ public ApiFuture executeBatchUpdateAsync(Iterable updates) {
Preconditions.checkNotNull(updates);
for (ParsedStatement update : updates) {
Preconditions.checkArgument(
@@ -329,170 +317,157 @@ public long[] executeBatchUpdate(Iterable updates) {
!isReadOnly(), "Batch update statements are not allowed in read-only mode");
checkAndMarkUsed();
- long[] res;
- try {
- switch (autocommitDmlMode) {
- case TRANSACTIONAL:
- res =
- executeAsyncTransactionalUpdate(
- executeBatchUpdateStatement, new TransactionalBatchUpdateCallable(updates));
- break;
- case PARTITIONED_NON_ATOMIC:
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.FAILED_PRECONDITION,
- "Batch updates are not allowed in " + autocommitDmlMode);
- default:
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.FAILED_PRECONDITION, "Unknown dml mode: " + autocommitDmlMode);
- }
- } catch (SpannerBatchUpdateException e) {
- // Batch update exceptions does not cause a rollback.
- state = UnitOfWorkState.COMMITTED;
- throw e;
- } catch (Throwable e) {
- state = UnitOfWorkState.COMMIT_FAILED;
- throw e;
+ switch (autocommitDmlMode) {
+ case TRANSACTIONAL:
+ return executeTransactionalBatchUpdateAsync(updates);
+ case PARTITIONED_NON_ATOMIC:
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.FAILED_PRECONDITION, "Batch updates are not allowed in " + autocommitDmlMode);
+ default:
+ throw SpannerExceptionFactory.newSpannerException(
+ ErrorCode.FAILED_PRECONDITION, "Unknown dml mode: " + autocommitDmlMode);
}
- state = UnitOfWorkState.COMMITTED;
- return res;
}
- /** Base class for executing DML updates (both single statements and batches). */
- private abstract class AbstractUpdateCallable implements Callable {
- abstract T executeUpdate(TransactionContext txContext);
-
- @Override
- public T call() throws Exception {
- try {
- txManager = dbClient.transactionManager();
- // Check the interrupted state after each (possible) round-trip to the db to allow the
- // statement to be cancelled.
- checkInterrupted();
- try (TransactionContext txContext = txManager.begin()) {
- checkInterrupted();
- T res = executeUpdate(txContext);
- checkInterrupted();
- txManager.commit();
- checkInterrupted();
- return res;
- }
- } finally {
- if (txManager != null) {
- // Calling txManager.close() will rollback the transaction if it is still active, i.e. if
- // an error occurred before the commit() call returned successfully.
- txManager.close();
- }
- }
- }
- }
-
- /** {@link Callable} for a single update statement. */
- private final class TransactionalUpdateCallable extends AbstractUpdateCallable {
- private final ParsedStatement update;
-
- private TransactionalUpdateCallable(ParsedStatement update) {
- this.update = update;
- }
-
- @Override
- Long executeUpdate(TransactionContext txContext) {
- return txContext.executeUpdate(update.getStatement());
- }
- }
-
- /** {@link Callable} for a batch update. */
- private final class TransactionalBatchUpdateCallable extends AbstractUpdateCallable {
- private final List updates;
-
- private TransactionalBatchUpdateCallable(Iterable updates) {
- this.updates = new LinkedList<>();
- for (ParsedStatement update : updates) {
- this.updates.add(update.getStatement());
- }
- }
-
- @Override
- long[] executeUpdate(TransactionContext txContext) {
- return txContext.batchUpdate(updates);
- }
+ private ApiFuture executeTransactionalUpdateAsync(final ParsedStatement update) {
+ Callable callable =
+ new Callable() {
+ @Override
+ public Long call() throws Exception {
+ try {
+ writeTransaction = dbClient.readWriteTransaction();
+ Long res =
+ writeTransaction.run(
+ new TransactionCallable() {
+ @Override
+ public Long run(TransactionContext transaction) throws Exception {
+ return transaction.executeUpdate(update.getStatement());
+ }
+ });
+ state = UnitOfWorkState.COMMITTED;
+ return res;
+ } catch (Throwable t) {
+ state = UnitOfWorkState.COMMIT_FAILED;
+ throw t;
+ }
+ }
+ };
+ return executeStatementAsync(
+ update,
+ callable,
+ ImmutableList.>of(
+ SpannerGrpc.getExecuteSqlMethod(), SpannerGrpc.getCommitMethod()));
}
- private T executeAsyncTransactionalUpdate(
- final ParsedStatement update, final AbstractUpdateCallable callable) {
- long startedTime = System.currentTimeMillis();
- // This method uses a TransactionManager instead of the TransactionRunner in order to be able to
- // handle timeouts and canceling of a statement.
- while (true) {
- try {
- return asyncExecuteStatement(update, callable);
- } catch (AbortedException e) {
- try {
- Thread.sleep(e.getRetryDelayInMillis() / 1000);
- } catch (InterruptedException e1) {
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.CANCELLED, "Statement execution was interrupted", e1);
- }
- // Check whether the timeout time has been exceeded.
- long executionTime = System.currentTimeMillis() - startedTime;
- if (getStatementTimeout().hasTimeout()
- && executionTime > getStatementTimeout().getTimeoutValue(TimeUnit.MILLISECONDS)) {
- throw SpannerExceptionFactory.newSpannerException(
- ErrorCode.DEADLINE_EXCEEDED,
- "Statement execution timeout occurred for " + update.getSqlWithoutComments());
- }
- }
- }
+ private ApiFuture executePartitionedUpdateAsync(final ParsedStatement update) {
+ Callable callable =
+ new Callable() {
+ @Override
+ public Long call() throws Exception {
+ try {
+ Long res = dbClient.executePartitionedUpdate(update.getStatement());
+ state = UnitOfWorkState.COMMITTED;
+ return res;
+ } catch (Throwable t) {
+ state = UnitOfWorkState.COMMIT_FAILED;
+ throw t;
+ }
+ }
+ };
+ return executeStatementAsync(update, callable, SpannerGrpc.getExecuteStreamingSqlMethod());
}
- private void checkInterrupted() throws InterruptedException {
- if (Thread.currentThread().isInterrupted()) {
- throw new InterruptedException();
- }
+ private ApiFuture executeTransactionalBatchUpdateAsync(
+ final Iterable updates) {
+ Callable callable =
+ new Callable() {
+ @Override
+ public long[] call() throws Exception {
+ writeTransaction = dbClient.readWriteTransaction();
+ return writeTransaction.run(
+ new TransactionCallable() {
+ @Override
+ public long[] run(TransactionContext transaction) throws Exception {
+ try {
+ long[] res =
+ transaction.batchUpdate(
+ Iterables.transform(
+ updates,
+ new Function() {
+ @Override
+ public Statement apply(ParsedStatement input) {
+ return input.getStatement();
+ }
+ }));
+ state = UnitOfWorkState.COMMITTED;
+ return res;
+ } catch (Throwable t) {
+ if (t instanceof SpannerBatchUpdateException) {
+ // A batch update exception does not cause a rollback.
+ state = UnitOfWorkState.COMMITTED;
+ } else {
+ state = UnitOfWorkState.COMMIT_FAILED;
+ }
+ throw t;
+ }
+ }
+ });
+ }
+ };
+ return executeStatementAsync(
+ executeBatchUpdateStatement, callable, SpannerGrpc.getExecuteBatchDmlMethod());
}
- @Override
- public void write(final Mutation mutation) {
- write(Arrays.asList(mutation));
- }
+ private final ParsedStatement commitStatement =
+ StatementParser.INSTANCE.parse(Statement.of("COMMIT"));
@Override
- public void write(final Iterable mutations) {
+ public ApiFuture writeAsync(final Iterable mutations) {
Preconditions.checkNotNull(mutations);
ConnectionPreconditions.checkState(
!isReadOnly(), "Update statements are not allowed in read-only mode");
checkAndMarkUsed();
- writeTransaction = dbClient.readWriteTransaction();
- try {
- writeTransaction.run(
- new TransactionCallable() {
- @Override
- public Void run(TransactionContext transaction) throws Exception {
- transaction.buffer(mutations);
- return null;
+ Callable callable =
+ new Callable() {
+ @Override
+ public Void call() throws Exception {
+ try {
+ writeTransaction = dbClient.readWriteTransaction();
+ Void res =
+ writeTransaction.run(
+ new TransactionCallable