Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for tagging to Connection API #623

Merged
merged 7 commits into from Jul 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 22 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -592,6 +592,28 @@
<className>com/google/cloud/spanner/AsyncTransactionManager$CommitTimestampFuture</className>
<method>java.lang.Object get()</method>
</difference>
<!-- Support for tagging in Connection API -->
<!-- These are not breaking changes, since we provide default interface implementation -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>java.lang.String getStatementTag()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setStatementTag(java.lang.String)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>java.lang.String getTransactionTag()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setTransactionTag(java.lang.String)</method>
</difference>

<!-- Adds getValue to ResultSet -->
<!-- These are not breaking changes, since we provide default interface implementation -->
Expand Down
Expand Up @@ -51,6 +51,7 @@
abstract class AbstractBaseUnitOfWork implements UnitOfWork {
private final StatementExecutor statementExecutor;
private final StatementTimeout statementTimeout;
protected final String transactionTag;

/** Class for keeping track of the stacktrace of the caller of an async statement. */
static final class SpannerAsyncExecutionException extends RuntimeException {
Expand Down Expand Up @@ -82,6 +83,7 @@ enum InterceptorsUsage {
abstract static class Builder<B extends Builder<?, T>, T extends AbstractBaseUnitOfWork> {
private StatementExecutor statementExecutor;
private StatementTimeout statementTimeout = new StatementTimeout();
private String transactionTag;

Builder() {}

Expand All @@ -102,13 +104,19 @@ B setStatementTimeout(StatementTimeout timeout) {
return self();
}

B setTransactionTag(@Nullable String tag) {
this.transactionTag = tag;
return self();
}

abstract T build();
}

AbstractBaseUnitOfWork(Builder<?, ?> builder) {
Preconditions.checkState(builder.statementExecutor != null, "No statement executor specified");
this.statementExecutor = builder.statementExecutor;
this.statementTimeout = builder.statementTimeout;
this.transactionTag = builder.transactionTag;
}

StatementExecutor getStatementExecutor() {
Expand Down
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.connection.StatementResult.ResultType;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import java.util.Iterator;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -330,6 +331,52 @@ public interface Connection extends AutoCloseable {
*/
TransactionMode getTransactionMode();

/**
* Sets the transaction tag to use for the current transaction. This method may only be called
* when in a transaction and before any statements have been executed in the transaction.
*
* <p>The tag will be set as the transaction tag of all statements during the transaction, and as
* the transaction tag of the commit.
*
* <p>The transaction tag will automatically be cleared after the transaction has ended.
*
* @param tag The tag to use.
*/
default void setTransactionTag(String tag) {
throw new UnsupportedOperationException();
}

/** @return The transaction tag of the current transaction. */
default String getTransactionTag() {
throw new UnsupportedOperationException();
}

/**
* Sets the statement tag to use for the next statement that is executed. The tag is automatically
* cleared after the statement is executed. Statement tags can be used both with autocommit=true
* and autocommit=false, and can be used for partitioned DML.
*
* <p>Statement tags are not allowed before COMMIT and ROLLBACK statements.
*
* <p>Statement tags are allowed before START BATCH DML statements and will be included in the
* {@link ExecuteBatchDmlRequest} that is sent to Spanner. Statement tags are not allowed inside a
* batch.
*
* @param tag The statement tag to use with the next statement that will be executed on this
* connection.
*/
default void setStatementTag(String tag) {
throw new UnsupportedOperationException();
}

/**
* @return The statement tag that will be used with the next statement that is executed on this
* connection.
*/
default String getStatementTag() {
throw new UnsupportedOperationException();
}

/**
* @return <code>true</code> if this connection will automatically retry read/write transactions
* that abort. This method may only be called when the connection is in read/write
Expand Down
Expand Up @@ -26,7 +26,9 @@
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.ResultSets;
Expand All @@ -45,6 +47,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
Expand Down Expand Up @@ -206,6 +209,9 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
private TimestampBound readOnlyStaleness = TimestampBound.strong();
private QueryOptions queryOptions = QueryOptions.getDefaultInstance();

private String transactionTag;
private String statementTag;

/** Create a connection and register it in the SpannerPool. */
ConnectionImpl(ConnectionOptions options) {
Preconditions.checkNotNull(options);
Expand Down Expand Up @@ -512,6 +518,47 @@ public void setTransactionMode(TransactionMode transactionMode) {
this.unitOfWorkType = UnitOfWorkType.of(transactionMode);
}

@Override
public String getTransactionTag() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(!isDdlBatchActive(), "This connection is in a DDL batch");
return transactionTag;
}

@Override
public void setTransactionTag(String tag) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(
!isBatchActive(), "Cannot set transaction tag while in a batch");
ConnectionPreconditions.checkState(isInTransaction(), "This connection has no transaction");
ConnectionPreconditions.checkState(
!isTransactionStarted(),
"The transaction tag cannot be set after the transaction has started");
ConnectionPreconditions.checkState(
getTransactionMode() == TransactionMode.READ_WRITE_TRANSACTION,
"Transaction tag can only be set for a read/write transaction");

this.transactionBeginMarked = true;
this.transactionTag = tag;
}

@Override
public String getStatementTag() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(
!isBatchActive(), "Statement tags are not allowed inside a batch");
return statementTag;
}

@Override
public void setStatementTag(String tag) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, could we prevent the calling of this for a COMMIT? The user should not be able to do a statement tag in that case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. We can't prevent it before the COMMIT, but we can throw an exception if COMMIT is called after a statement tag has been set, and I think that makes sense as we are quite strict in checking the order of other statements (e.g. you are only allowed to get a commit timestamp if you actually committed etc.).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've changed the implementation to check for this, and to throw an error if the application tries to set a statement tag for a COMMIT or ROLLBACK statement. This change also adds a change relating to DML batches: DML batches can only include one statement tag, as the statements are sent as one ExecuteBatchDmlRequest, which only allows one statement tag. This statement tag must be set before calling START BATCH DML, e.g.

SET STATEMENT_TAG = 'tag-1';
START BATCH DML;
INSERT INTO Singers (SingerId, Name) VALUES (1, 'Morrison');
INSERT INTO Singers (SingerId, Name) VALUES (2, 'Pieterson');
RUN BATCH;

ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(
!isBatchActive(), "Statement tags are not allowed inside a batch");

this.statementTag = tag;
}

/**
* Throws an {@link SpannerException} with code {@link ErrorCode#FAILED_PRECONDITION} if the
* current state of this connection does not allow changing the setting for retryAbortsInternally.
Expand Down Expand Up @@ -643,6 +690,7 @@ private void setDefaultTransactionOptions() {
? UnitOfWorkType.READ_ONLY_TRANSACTION
: UnitOfWorkType.READ_WRITE_TRANSACTION;
batchMode = BatchMode.NONE;
transactionTag = null;
} else {
popUnitOfWorkFromTransactionStack();
}
Expand Down Expand Up @@ -717,6 +765,8 @@ public ApiFuture<Void> rollbackAsync() {
private ApiFuture<Void> endCurrentTransactionAsync(EndTransactionMethod endTransactionMethod) {
ConnectionPreconditions.checkState(!isBatchActive(), "This connection has an active batch");
ConnectionPreconditions.checkState(isInTransaction(), "This connection has no transaction");
ConnectionPreconditions.checkState(
statementTag == null, "Statement tags are not supported for COMMIT or ROLLBACK");
ApiFuture<Void> res;
try {
if (isTransactionStarted()) {
Expand Down Expand Up @@ -954,14 +1004,43 @@ public ApiFuture<long[]> executeBatchUpdateAsync(Iterable<Statement> updates) {
return internalExecuteBatchUpdateAsync(parsedStatements);
}

private QueryOption[] mergeQueryStatementTag(QueryOption... options) {
if (this.statementTag != null) {
// Shortcut for the most common scenario.
if (options == null || options.length == 0) {
options = new QueryOption[] {Options.tag(statementTag)};
} else {
options = Arrays.copyOf(options, options.length + 1);
options[options.length - 1] = Options.tag(statementTag);
}
this.statementTag = null;
}
return options;
}

private UpdateOption[] mergeUpdateStatementTag(UpdateOption... options) {
if (this.statementTag != null) {
// Shortcut for the most common scenario.
if (options == null || options.length == 0) {
options = new UpdateOption[] {Options.tag(statementTag)};
} else {
options = Arrays.copyOf(options, options.length + 1);
options[options.length - 1] = Options.tag(statementTag);
}
this.statementTag = null;
}
return options;
}

private ResultSet internalExecuteQuery(
final ParsedStatement statement,
final AnalyzeMode analyzeMode,
final QueryOption... options) {
Preconditions.checkArgument(
statement.getType() == StatementType.QUERY, "Statement must be a query");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return get(transaction.executeQueryAsync(statement, analyzeMode, options));
return get(
transaction.executeQueryAsync(statement, analyzeMode, mergeQueryStatementTag(options)));
}

private AsyncResultSet internalExecuteQueryAsync(
Expand All @@ -972,21 +1051,23 @@ private AsyncResultSet internalExecuteQueryAsync(
statement.getType() == StatementType.QUERY, "Statement must be a query");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return ResultSets.toAsyncResultSet(
transaction.executeQueryAsync(statement, analyzeMode, options),
transaction.executeQueryAsync(statement, analyzeMode, mergeQueryStatementTag(options)),
spanner.getAsyncExecutorProvider(),
options);
}

private ApiFuture<Long> internalExecuteUpdateAsync(final ParsedStatement update) {
private ApiFuture<Long> internalExecuteUpdateAsync(
final ParsedStatement update, UpdateOption... options) {
Preconditions.checkArgument(
update.getType() == StatementType.UPDATE, "Statement must be an update");
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return transaction.executeUpdateAsync(update);
return transaction.executeUpdateAsync(update, mergeUpdateStatementTag(options));
}

private ApiFuture<long[]> internalExecuteBatchUpdateAsync(List<ParsedStatement> updates) {
private ApiFuture<long[]> internalExecuteBatchUpdateAsync(
List<ParsedStatement> updates, UpdateOption... options) {
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork();
return transaction.executeBatchUpdateAsync(updates);
return transaction.executeBatchUpdateAsync(updates, mergeUpdateStatementTag(options));
}

/**
Expand All @@ -1001,7 +1082,8 @@ UnitOfWork getCurrentUnitOfWorkOrStartNewUnitOfWork() {
return this.currentUnitOfWork;
}

private UnitOfWork createNewUnitOfWork() {
@VisibleForTesting
UnitOfWork createNewUnitOfWork() {
if (isAutocommit() && !isInTransaction() && !isInBatch()) {
return SingleUseTransaction.newBuilder()
.setDdlClient(ddlClient)
Expand All @@ -1021,6 +1103,7 @@ private UnitOfWork createNewUnitOfWork() {
.setReadOnlyStaleness(readOnlyStaleness)
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setTransactionTag(transactionTag)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will this behave if the user never set a transaction tag?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is nullable, so that is not a problem. It is verified by this test case.

.build();
case READ_WRITE_TRANSACTION:
return ReadWriteTransaction.newBuilder()
Expand All @@ -1030,6 +1113,7 @@ private UnitOfWork createNewUnitOfWork() {
.setTransactionRetryListeners(transactionRetryListeners)
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setTransactionTag(transactionTag)
.build();
case DML_BATCH:
// A DML batch can run inside the current transaction. It should therefore only
Expand All @@ -1039,6 +1123,7 @@ private UnitOfWork createNewUnitOfWork() {
.setTransaction(currentUnitOfWork)
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setStatementTag(statementTag)
.build();
case DDL_BATCH:
return DdlBatch.newBuilder()
Expand Down
Expand Up @@ -74,6 +74,14 @@ interface ConnectionStatementExecutor {

StatementResult statementShowReturnCommitStats();

StatementResult statementSetStatementTag(String tag);

StatementResult statementShowStatementTag();

StatementResult statementSetTransactionTag(String tag);

StatementResult statementShowTransactionTag();

StatementResult statementBeginTransaction();

StatementResult statementCommit();
Expand Down