New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add support for tagging to Connection API #623
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
a87dfdf
feat: add support for tagging in Connection API
olavloite 6c14164
fix: disallow statement tags for commit/rollback/run
olavloite 510fdb3
chore: cleanup after rebase
olavloite d9f9ca8
test: add generated tests + cleanup
olavloite 2a6c635
build: add new methods to clirr
olavloite 0868a05
fix: add default implementations for new methods
olavloite df0b11d
Merge branch 'master' into connection-api-tagging
olavloite File filter
Filter by extension
Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,7 +26,9 @@ | |
import com.google.cloud.spanner.DatabaseClient; | ||
import com.google.cloud.spanner.ErrorCode; | ||
import com.google.cloud.spanner.Mutation; | ||
import com.google.cloud.spanner.Options; | ||
import com.google.cloud.spanner.Options.QueryOption; | ||
import com.google.cloud.spanner.Options.UpdateOption; | ||
import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode; | ||
import com.google.cloud.spanner.ResultSet; | ||
import com.google.cloud.spanner.ResultSets; | ||
|
@@ -45,6 +47,7 @@ | |
import com.google.common.util.concurrent.MoreExecutors; | ||
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.Collections; | ||
import java.util.Iterator; | ||
import java.util.LinkedList; | ||
|
@@ -206,6 +209,9 @@ static UnitOfWorkType of(TransactionMode transactionMode) { | |
private TimestampBound readOnlyStaleness = TimestampBound.strong(); | ||
private QueryOptions queryOptions = QueryOptions.getDefaultInstance(); | ||
|
||
private String transactionTag; | ||
private String statementTag; | ||
|
||
/** Create a connection and register it in the SpannerPool. */ | ||
ConnectionImpl(ConnectionOptions options) { | ||
Preconditions.checkNotNull(options); | ||
|
@@ -512,6 +518,47 @@ public void setTransactionMode(TransactionMode transactionMode) { | |
this.unitOfWorkType = UnitOfWorkType.of(transactionMode); | ||
} | ||
|
||
@Override | ||
public String getTransactionTag() { | ||
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG); | ||
ConnectionPreconditions.checkState(!isDdlBatchActive(), "This connection is in a DDL batch"); | ||
return transactionTag; | ||
} | ||
|
||
@Override | ||
public void setTransactionTag(String tag) { | ||
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG); | ||
ConnectionPreconditions.checkState( | ||
!isBatchActive(), "Cannot set transaction tag while in a batch"); | ||
ConnectionPreconditions.checkState(isInTransaction(), "This connection has no transaction"); | ||
ConnectionPreconditions.checkState( | ||
!isTransactionStarted(), | ||
"The transaction tag cannot be set after the transaction has started"); | ||
ConnectionPreconditions.checkState( | ||
getTransactionMode() == TransactionMode.READ_WRITE_TRANSACTION, | ||
"Transaction tag can only be set for a read/write transaction"); | ||
|
||
this.transactionBeginMarked = true; | ||
this.transactionTag = tag; | ||
} | ||
|
||
@Override | ||
public String getStatementTag() { | ||
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG); | ||
ConnectionPreconditions.checkState( | ||
!isBatchActive(), "Statement tags are not allowed inside a batch"); | ||
return statementTag; | ||
} | ||
|
||
@Override | ||
public void setStatementTag(String tag) { | ||
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG); | ||
ConnectionPreconditions.checkState( | ||
!isBatchActive(), "Statement tags are not allowed inside a batch"); | ||
|
||
this.statementTag = tag; | ||
} | ||
|
||
/** | ||
* Throws an {@link SpannerException} with code {@link ErrorCode#FAILED_PRECONDITION} if the | ||
* current state of this connection does not allow changing the setting for retryAbortsInternally. | ||
|
@@ -643,6 +690,7 @@ private void setDefaultTransactionOptions() { | |
? UnitOfWorkType.READ_ONLY_TRANSACTION | ||
: UnitOfWorkType.READ_WRITE_TRANSACTION; | ||
batchMode = BatchMode.NONE; | ||
transactionTag = null; | ||
} else { | ||
popUnitOfWorkFromTransactionStack(); | ||
} | ||
|
@@ -717,6 +765,8 @@ public ApiFuture<Void> rollbackAsync() { | |
private ApiFuture<Void> endCurrentTransactionAsync(EndTransactionMethod endTransactionMethod) { | ||
ConnectionPreconditions.checkState(!isBatchActive(), "This connection has an active batch"); | ||
ConnectionPreconditions.checkState(isInTransaction(), "This connection has no transaction"); | ||
ConnectionPreconditions.checkState( | ||
statementTag == null, "Statement tags are not supported for COMMIT or ROLLBACK"); | ||
ApiFuture<Void> res; | ||
try { | ||
if (isTransactionStarted()) { | ||
|
@@ -954,14 +1004,43 @@ public ApiFuture<long[]> executeBatchUpdateAsync(Iterable<Statement> updates) { | |
return internalExecuteBatchUpdateAsync(parsedStatements); | ||
} | ||
|
||
private QueryOption[] mergeQueryStatementTag(QueryOption... options) { | ||
if (this.statementTag != null) { | ||
// Shortcut for the most common scenario. | ||
if (options == null || options.length == 0) { | ||
options = new QueryOption[] {Options.tag(statementTag)}; | ||
} else { | ||
options = Arrays.copyOf(options, options.length + 1); | ||
options[options.length - 1] = Options.tag(statementTag); | ||
} | ||
this.statementTag = null; | ||
} | ||
return options; | ||
} | ||
|
||
private UpdateOption[] mergeUpdateStatementTag(UpdateOption... options) { | ||
if (this.statementTag != null) { | ||
// Shortcut for the most common scenario. | ||
if (options == null || options.length == 0) { | ||
options = new UpdateOption[] {Options.tag(statementTag)}; | ||
} else { | ||
options = Arrays.copyOf(options, options.length + 1); | ||
options[options.length - 1] = Options.tag(statementTag); | ||
} | ||
this.statementTag = null; | ||
} | ||
return options; | ||
} | ||
|
||
private ResultSet internalExecuteQuery( | ||
final ParsedStatement statement, | ||
final AnalyzeMode analyzeMode, | ||
final QueryOption... options) { | ||
Preconditions.checkArgument( | ||
statement.getType() == StatementType.QUERY, "Statement must be a query"); | ||
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork(); | ||
return get(transaction.executeQueryAsync(statement, analyzeMode, options)); | ||
return get( | ||
transaction.executeQueryAsync(statement, analyzeMode, mergeQueryStatementTag(options))); | ||
} | ||
|
||
private AsyncResultSet internalExecuteQueryAsync( | ||
|
@@ -972,21 +1051,23 @@ private AsyncResultSet internalExecuteQueryAsync( | |
statement.getType() == StatementType.QUERY, "Statement must be a query"); | ||
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork(); | ||
return ResultSets.toAsyncResultSet( | ||
transaction.executeQueryAsync(statement, analyzeMode, options), | ||
transaction.executeQueryAsync(statement, analyzeMode, mergeQueryStatementTag(options)), | ||
spanner.getAsyncExecutorProvider(), | ||
options); | ||
} | ||
|
||
private ApiFuture<Long> internalExecuteUpdateAsync(final ParsedStatement update) { | ||
private ApiFuture<Long> internalExecuteUpdateAsync( | ||
final ParsedStatement update, UpdateOption... options) { | ||
Preconditions.checkArgument( | ||
update.getType() == StatementType.UPDATE, "Statement must be an update"); | ||
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork(); | ||
return transaction.executeUpdateAsync(update); | ||
return transaction.executeUpdateAsync(update, mergeUpdateStatementTag(options)); | ||
} | ||
|
||
private ApiFuture<long[]> internalExecuteBatchUpdateAsync(List<ParsedStatement> updates) { | ||
private ApiFuture<long[]> internalExecuteBatchUpdateAsync( | ||
List<ParsedStatement> updates, UpdateOption... options) { | ||
UnitOfWork transaction = getCurrentUnitOfWorkOrStartNewUnitOfWork(); | ||
return transaction.executeBatchUpdateAsync(updates); | ||
return transaction.executeBatchUpdateAsync(updates, mergeUpdateStatementTag(options)); | ||
} | ||
|
||
/** | ||
|
@@ -1001,7 +1082,8 @@ UnitOfWork getCurrentUnitOfWorkOrStartNewUnitOfWork() { | |
return this.currentUnitOfWork; | ||
} | ||
|
||
private UnitOfWork createNewUnitOfWork() { | ||
@VisibleForTesting | ||
UnitOfWork createNewUnitOfWork() { | ||
if (isAutocommit() && !isInTransaction() && !isInBatch()) { | ||
return SingleUseTransaction.newBuilder() | ||
.setDdlClient(ddlClient) | ||
|
@@ -1021,6 +1103,7 @@ private UnitOfWork createNewUnitOfWork() { | |
.setReadOnlyStaleness(readOnlyStaleness) | ||
.setStatementTimeout(statementTimeout) | ||
.withStatementExecutor(statementExecutor) | ||
.setTransactionTag(transactionTag) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How will this behave if the user never set a transaction tag? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is nullable, so that is not a problem. It is verified by this test case. |
||
.build(); | ||
case READ_WRITE_TRANSACTION: | ||
return ReadWriteTransaction.newBuilder() | ||
|
@@ -1030,6 +1113,7 @@ private UnitOfWork createNewUnitOfWork() { | |
.setTransactionRetryListeners(transactionRetryListeners) | ||
.setStatementTimeout(statementTimeout) | ||
.withStatementExecutor(statementExecutor) | ||
.setTransactionTag(transactionTag) | ||
.build(); | ||
case DML_BATCH: | ||
// A DML batch can run inside the current transaction. It should therefore only | ||
|
@@ -1039,6 +1123,7 @@ private UnitOfWork createNewUnitOfWork() { | |
.setTransaction(currentUnitOfWork) | ||
.setStatementTimeout(statementTimeout) | ||
.withStatementExecutor(statementExecutor) | ||
.setStatementTag(statementTag) | ||
.build(); | ||
case DDL_BATCH: | ||
return DdlBatch.newBuilder() | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of curiosity, could we prevent the calling of this for a
COMMIT
? The user should not be able to do a statement tag in that case.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. We can't prevent it before the
COMMIT
, but we can throw an exception ifCOMMIT
is called after a statement tag has been set, and I think that makes sense as we are quite strict in checking the order of other statements (e.g. you are only allowed to get a commit timestamp if you actually committed etc.).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've changed the implementation to check for this, and to throw an error if the application tries to set a statement tag for a
COMMIT
orROLLBACK
statement. This change also adds a change relating to DML batches: DML batches can only include one statement tag, as the statements are sent as oneExecuteBatchDmlRequest
, which only allows one statement tag. This statement tag must be set before callingSTART BATCH DML
, e.g.