Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: add support for tagging to Connection API (#623)
* feat: add support for tagging in Connection API

* fix: disallow statement tags for commit/rollback/run

* chore: cleanup after rebase

* test: add generated tests + cleanup

* build: add new methods to clirr

* fix: add default implementations for new methods
  • Loading branch information
olavloite committed Jul 5, 2021
1 parent f257671 commit 5722372
Show file tree
Hide file tree
Showing 26 changed files with 10,184 additions and 6,496 deletions.
22 changes: 22 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -592,6 +592,28 @@
<className>com/google/cloud/spanner/AsyncTransactionManager$CommitTimestampFuture</className>
<method>java.lang.Object get()</method>
</difference>
<!-- Support for tagging in Connection API -->
<!-- These are not breaking changes, since we provide default interface implementation -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>java.lang.String getStatementTag()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setStatementTag(java.lang.String)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>java.lang.String getTransactionTag()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setTransactionTag(java.lang.String)</method>
</difference>

<!-- Adds getValue to ResultSet -->
<!-- These are not breaking changes, since we provide default interface implementation -->
Expand Down
Expand Up @@ -51,6 +51,7 @@
abstract class AbstractBaseUnitOfWork implements UnitOfWork {
private final StatementExecutor statementExecutor;
private final StatementTimeout statementTimeout;
protected final String transactionTag;

/** Class for keeping track of the stacktrace of the caller of an async statement. */
static final class SpannerAsyncExecutionException extends RuntimeException {
Expand Down Expand Up @@ -82,6 +83,7 @@ enum InterceptorsUsage {
abstract static class Builder<B extends Builder<?, T>, T extends AbstractBaseUnitOfWork> {
private StatementExecutor statementExecutor;
private StatementTimeout statementTimeout = new StatementTimeout();
private String transactionTag;

Builder() {}

Expand All @@ -102,13 +104,19 @@ B setStatementTimeout(StatementTimeout timeout) {
return self();
}

B setTransactionTag(@Nullable String tag) {
this.transactionTag = tag;
return self();
}

abstract T build();
}

AbstractBaseUnitOfWork(Builder<?, ?> builder) {
Preconditions.checkState(builder.statementExecutor != null, "No statement executor specified");
this.statementExecutor = builder.statementExecutor;
this.statementTimeout = builder.statementTimeout;
this.transactionTag = builder.transactionTag;
}

StatementExecutor getStatementExecutor() {
Expand Down
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.connection.StatementResult.ResultType;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -330,6 +331,52 @@ public interface Connection extends AutoCloseable {
*/
TransactionMode getTransactionMode();

/**
* Sets the transaction tag to use for the current transaction. This method may only be called
* when in a transaction and before any statements have been executed in the transaction.
*
* <p>The tag will be set as the transaction tag of all statements during the transaction, and as
* the transaction tag of the commit.
*
* <p>The transaction tag will automatically be cleared after the transaction has ended.
*
* @param tag The tag to use.
*/
default void setTransactionTag(String tag) {
throw new UnsupportedOperationException();
}

/** @return The transaction tag of the current transaction. */
default String getTransactionTag() {
throw new UnsupportedOperationException();
}

/**
* Sets the statement tag to use for the next statement that is executed. The tag is automatically
* cleared after the statement is executed. Statement tags can be used both with autocommit=true
* and autocommit=false, and can be used for partitioned DML.
*
* <p>Statement tags are not allowed before COMMIT and ROLLBACK statements.
*
* <p>Statement tags are allowed before START BATCH DML statements and will be included in the
* {@link ExecuteBatchDmlRequest} that is sent to Spanner. Statement tags are not allowed inside a
* batch.
*
* @param tag The statement tag to use with the next statement that will be executed on this
* connection.
*/
default void setStatementTag(String tag) {
throw new UnsupportedOperationException();
}

/**
* @return The statement tag that will be used with the next statement that is executed on this
* connection.
*/
default String getStatementTag() {
throw new UnsupportedOperationException();
}

/**
* @return <code>true</code> if this connection will automatically retry read/write transactions
* that abort. This method may only be called when the connection is in read/write
Expand Down
Expand Up @@ -26,7 +26,9 @@
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.ResultSets;
Expand All @@ -45,6 +47,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
Expand Down Expand Up @@ -206,6 +209,9 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
private TimestampBound readOnlyStaleness = TimestampBound.strong();
private QueryOptions queryOptions = QueryOptions.getDefaultInstance();

private String transactionTag;
private String statementTag;

/** Create a connection and register it in the SpannerPool. */
ConnectionImpl(ConnectionOptions options) {
Preconditions.checkNotNull(options);
Expand Down Expand Up @@ -512,6 +518,47 @@ public void setTransactionMode(TransactionMode transactionMode) {
this.unitOfWorkType = UnitOfWorkType.of(transactionMode);
}

@Override
public String getTransactionTag() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(!isDdlBatchActive(), "This connection is in a DDL batch");
return transactionTag;
}

@Override
public void setTransactionTag(String tag) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(
!isBatchActive(), "Cannot set transaction tag while in a batch");
ConnectionPreconditions.checkState(isInTransaction(), "This connection has no transaction");
ConnectionPreconditions.checkState(
!isTransactionStarted(),
"The transaction tag cannot be set after the transaction has started");
ConnectionPreconditions.checkState(
getTransactionMode() == TransactionMode.READ_WRITE_TRANSACTION,
"Transaction tag can only be set for a read/write transaction");

this.transactionBeginMarked = true;
this.transactionTag = tag;
}

@Override
public String getStatementTag() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(
!isBatchActive(), "Statement tags are not allowed inside a batch");
return statementTag;
}

@Override
public void setStatementTag(String tag) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(
!isBatchActive(), "Statement tags are not allowed inside a batch");

this.statementTag = tag;
}

/**
* Throws an {@link SpannerException} with code {@link ErrorCode#FAILED_PRECONDITION} if the
* current state of this connection does not allow changing the setting for retryAbortsInternally.
Expand Down Expand Up @@ -643,6 +690,7 @@ private void setDefaultTransactionOptions() {
? UnitOfWorkType.READ_ONLY_TRANSACTION
: UnitOfWorkType.READ_WRITE_TRANSACTION;
batchMode = BatchMode.NONE;
transactionTag = null;
} else {
popUnitOfWorkFromTransactionStack();
}
Expand Down Expand Up @@ -717,6 +765,8 @@ public ApiFuture<Void> rollbackAsync() {
private ApiFuture<Void> endCurrentTransactionAsync(EndTransactionMethod endTransactionMethod) {
ConnectionPreconditions.checkState(!isBatchActive(), "This connection has an active batch");
ConnectionPreconditions.checkState(isInTransaction(), "This connection has no transaction");
ConnectionPreconditions.checkState(
statementTag == null, "Statement tags are not supported for COMMIT or ROLLBACK");
ApiFuture<Void> res;
try {
if (isTransactionStarted()) {
Expand Down Expand Up @@ -954,14 +1004,43 @@ public ApiFuture<long[]> executeBatchUpdateAsync(Iterable<Statement> updates) {
return internalExecuteBatchUpdateAsync(parsedStatements);
}

private QueryOption[] mergeQueryStatementTag(QueryOption... options) {
if (this.statementTag != null) {
// Shortcut for the most common scenario.
if (options == null || options.length == 0) {
options = new QueryOption[] {Options.tag(statementTag)};
} else {
options = Arrays.copyOf(options, options.length + 1);
options[options.length - 1] = Options.tag(statementTag);
}
this.statementTag = null;
}
return options;
}

private UpdateOption[] mergeUpdateStatementTag(UpdateOption... options) {
if (this.statementTag != null) {
// Shortcut for the most common scenario.
if (options == null || options.length == 0) {
options = new UpdateOption[] {Options.tag(statementTag)};
} else {
options = Arrays.copyOf(options, options.length + 1);
options[options.length - 1] = Options.tag(statementTag);
}
this.statementTag = null;
}
return options;
}

private ResultSet internalExecuteQuery(
final ParsedStatement statement,
final AnalyzeMode analyzeMode,
final QueryOption... options) {
Preconditions.checkArgument(
statement.getType() == StatementType.QUERY, "Statement must be a query");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return get(transaction.executeQueryAsync(statement, analyzeMode, options));
return get(
transaction.executeQueryAsync(statement, analyzeMode, mergeQueryStatementTag(options)));
}

private AsyncResultSet internalExecuteQueryAsync(
Expand All @@ -972,21 +1051,23 @@ private AsyncResultSet internalExecuteQueryAsync(
statement.getType() == StatementType.QUERY, "Statement must be a query");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return ResultSets.toAsyncResultSet(
transaction.executeQueryAsync(statement, analyzeMode, options),
transaction.executeQueryAsync(statement, analyzeMode, mergeQueryStatementTag(options)),
spanner.getAsyncExecutorProvider(),
options);
}

private ApiFuture<Long> internalExecuteUpdateAsync(final ParsedStatement update) {
private ApiFuture<Long> internalExecuteUpdateAsync(
final ParsedStatement update, UpdateOption... options) {
Preconditions.checkArgument(
update.getType() == StatementType.UPDATE, "Statement must be an update");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return transaction.executeUpdateAsync(update);
return transaction.executeUpdateAsync(update, mergeUpdateStatementTag(options));
}

private ApiFuture<long[]> internalExecuteBatchUpdateAsync(List<ParsedStatement> updates) {
private ApiFuture<long[]> internalExecuteBatchUpdateAsync(
List<ParsedStatement> updates, UpdateOption... options) {
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return transaction.executeBatchUpdateAsync(updates);
return transaction.executeBatchUpdateAsync(updates, mergeUpdateStatementTag(options));
}

/**
Expand All @@ -1001,7 +1082,8 @@ UnitOfWork getCurrentUnitOfWorkOrStartNewUnitOfWork() {
return this.currentUnitOfWork;
}

private UnitOfWork createNewUnitOfWork() {
@VisibleForTesting
UnitOfWork createNewUnitOfWork() {
if (isAutocommit() && !isInTransaction() && !isInBatch()) {
return SingleUseTransaction.newBuilder()
.setDdlClient(ddlClient)
Expand All @@ -1021,6 +1103,7 @@ private UnitOfWork createNewUnitOfWork() {
.setReadOnlyStaleness(readOnlyStaleness)
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setTransactionTag(transactionTag)
.build();
case READ_WRITE_TRANSACTION:
return ReadWriteTransaction.newBuilder()
Expand All @@ -1030,6 +1113,7 @@ private UnitOfWork createNewUnitOfWork() {
.setTransactionRetryListeners(transactionRetryListeners)
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setTransactionTag(transactionTag)
.build();
case DML_BATCH:
// A DML batch can run inside the current transaction. It should therefore only
Expand All @@ -1039,6 +1123,7 @@ private UnitOfWork createNewUnitOfWork() {
.setTransaction(currentUnitOfWork)
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setStatementTag(statementTag)
.build();
case DDL_BATCH:
return DdlBatch.newBuilder()
Expand Down
Expand Up @@ -74,6 +74,14 @@ interface ConnectionStatementExecutor {

StatementResult statementShowReturnCommitStats();

StatementResult statementSetStatementTag(String tag);

StatementResult statementShowStatementTag();

StatementResult statementSetTransactionTag(String tag);

StatementResult statementShowTransactionTag();

StatementResult statementBeginTransaction();

StatementResult statementCommit();
Expand Down

0 comments on commit 5722372

Please sign in to comment.