Skip to content

Commit

Permalink
feat!: add CommitStats to Connection API (#608)
Browse files Browse the repository at this point in the history
* feat!: add support for CommitStats

Adds support for returning CommitStats from read/write transactions.
  • Loading branch information
olavloite committed Feb 24, 2021
1 parent 6a41028 commit b2b1191
Show file tree
Hide file tree
Showing 27 changed files with 7,507 additions and 3,766 deletions.
17 changes: 17 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -476,6 +476,23 @@
<method>com.google.cloud.spanner.CommitResponse getCommitResponse()</method>
</difference>

<!-- Support for Commit Stats in Connection API -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>com.google.cloud.spanner.CommitResponse getCommitResponse()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>boolean isReturnCommitStats()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setReturnCommitStats(boolean)</method>
</difference>

<!-- PITR -->
<difference>
<differenceType>7013</differenceType>
Expand Down
Expand Up @@ -41,6 +41,11 @@ public Timestamp getCommitTimestamp() {
return Timestamp.fromProto(proto.getCommitTimestamp());
}

/** @return true if the {@link CommitResponse} includes {@link CommitStats} */
public boolean hasCommitStats() {
return proto.hasCommitStats();
}

/**
* Commit statistics are returned by a read/write transaction if specifically requested by passing
* in {@link Options#commitStats()} to the transaction.
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.spanner.AbortedDueToConcurrentModificationException;
import com.google.cloud.spanner.AbortedException;
import com.google.cloud.spanner.AsyncResultSet;
import com.google.cloud.spanner.CommitResponse;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options.QueryOption;
Expand Down Expand Up @@ -438,6 +439,15 @@ public interface Connection extends AutoCloseable {
*/
String getOptimizerVersion();

/**
* Sets whether this connection should request commit statistics from Cloud Spanner for read/write
* transactions and DML statements in autocommit mode.
*/
void setReturnCommitStats(boolean returnCommitStats);

/** @return true if this connection requests commit statistics from Cloud Spanner */
boolean isReturnCommitStats();

/**
* Commits the current transaction of this connection. All mutations that have been buffered
* during the current transaction will be written to the database.
Expand Down Expand Up @@ -623,15 +633,26 @@ public interface Connection extends AutoCloseable {

/**
* @return the commit timestamp of the last {@link TransactionMode#READ_WRITE_TRANSACTION}
* transaction. This method will throw a {@link SpannerException} if there is no last {@link
* TransactionMode#READ_WRITE_TRANSACTION} transaction (i.e. the last transaction was a {@link
* TransactionMode#READ_ONLY_TRANSACTION}), or if the last {@link
* TransactionMode#READ_WRITE_TRANSACTION} transaction rolled back. It will also throw a
* {@link SpannerException} if the last {@link TransactionMode#READ_WRITE_TRANSACTION}
* transaction was empty when committed.
* transaction. This method throws a {@link SpannerException} if there is no last {@link
* TransactionMode#READ_WRITE_TRANSACTION} transaction. That is, if the last transaction was a
* {@link TransactionMode#READ_ONLY_TRANSACTION}), or if the last {@link
* TransactionMode#READ_WRITE_TRANSACTION} transaction rolled back. It also throws a {@link
* SpannerException} if the last {@link TransactionMode#READ_WRITE_TRANSACTION} transaction
* was empty when committed.
*/
Timestamp getCommitTimestamp();

/**
* @return the {@link CommitResponse} of the last {@link TransactionMode#READ_WRITE_TRANSACTION}
* transaction. This method throws a {@link SpannerException} if there is no last {@link
* TransactionMode#READ_WRITE_TRANSACTION} transaction. That is, if the last transaction was a
* {@link TransactionMode#READ_ONLY_TRANSACTION}), or if the last {@link
* TransactionMode#READ_WRITE_TRANSACTION} transaction rolled back. It also throws a {@link
* SpannerException} if the last {@link TransactionMode#READ_WRITE_TRANSACTION} transaction
* was empty when committed.
*/
CommitResponse getCommitResponse();

/**
* Starts a new DDL batch on this connection. A DDL batch allows several DDL statements to be
* grouped into a batch that can be executed as a group. DDL statements that are issued during the
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AsyncResultSet;
import com.google.cloud.spanner.CommitResponse;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Mutation;
Expand Down Expand Up @@ -179,6 +180,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
private DatabaseClient dbClient;
private boolean autocommit;
private boolean readOnly;
private boolean returnCommitStats;

private UnitOfWork currentUnitOfWork = null;
/**
Expand Down Expand Up @@ -213,6 +215,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
this.readOnly = options.isReadOnly();
this.autocommit = options.isAutocommit();
this.queryOptions = this.queryOptions.toBuilder().mergeFrom(options.getQueryOptions()).build();
this.returnCommitStats = options.isReturnCommitStats();
this.ddlClient = createDdlClient();
setDefaultTransactionOptions();
}
Expand All @@ -237,6 +240,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
this.dbClient = dbClient;
setReadOnly(options.isReadOnly());
setAutocommit(options.isAutocommit());
setReturnCommitStats(options.isReturnCommitStats());
setDefaultTransactionOptions();
}

Expand Down Expand Up @@ -580,6 +584,31 @@ Timestamp getCommitTimestampOrNull() {
: this.currentUnitOfWork.getCommitTimestampOrNull();
}

@Override
public CommitResponse getCommitResponse() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(
this.currentUnitOfWork != null, "There is no transaction on this connection");
return this.currentUnitOfWork.getCommitResponse();
}

CommitResponse getCommitResponseOrNull() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
return this.currentUnitOfWork == null ? null : this.currentUnitOfWork.getCommitResponseOrNull();
}

@Override
public void setReturnCommitStats(boolean returnCommitStats) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
this.returnCommitStats = returnCommitStats;
}

@Override
public boolean isReturnCommitStats() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
return this.returnCommitStats;
}

/** Resets this connection to its default transaction options. */
private void setDefaultTransactionOptions() {
if (transactionStack.isEmpty()) {
Expand Down Expand Up @@ -954,6 +983,7 @@ private UnitOfWork createNewUnitOfWork() {
.setReadOnly(isReadOnly())
.setReadOnlyStaleness(readOnlyStaleness)
.setAutocommitDmlMode(autocommitDmlMode)
.setReturnCommitStats(returnCommitStats)
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.build();
Expand All @@ -970,6 +1000,7 @@ private UnitOfWork createNewUnitOfWork() {
return ReadWriteTransaction.newBuilder()
.setDatabaseClient(dbClient)
.setRetryAbortsInternally(retryAbortsInternally)
.setReturnCommitStats(returnCommitStats)
.setTransactionRetryListeners(transactionRetryListeners)
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
Expand Down
Expand Up @@ -155,6 +155,7 @@ public String[] getValidValues() {
private static final String DEFAULT_NUM_CHANNELS = null;
private static final String DEFAULT_USER_AGENT = null;
private static final String DEFAULT_OPTIMIZER_VERSION = "";
private static final boolean DEFAULT_RETURN_COMMIT_STATS = false;
private static final boolean DEFAULT_LENIENT = false;

private static final String PLAIN_TEXT_PROTOCOL = "http:";
Expand Down Expand Up @@ -229,6 +230,7 @@ public String[] getValidValues() {
ConnectionProperty.createStringProperty(
OPTIMIZER_VERSION_PROPERTY_NAME,
"Sets the default query optimizer version to use for this connection."),
ConnectionProperty.createBooleanProperty("returnCommitStats", "", false),
ConnectionProperty.createBooleanProperty(
LENIENT_PROPERTY_NAME,
"Silently ignore unknown properties in the connection string/properties (true/false)",
Expand Down Expand Up @@ -456,6 +458,7 @@ public static Builder newBuilder() {
private final Integer maxSessions;
private final String userAgent;
private final QueryOptions queryOptions;
private final boolean returnCommitStats;

private final boolean autocommit;
private final boolean readOnly;
Expand Down Expand Up @@ -485,6 +488,7 @@ private ConnectionOptions(Builder builder) {
QueryOptions.Builder queryOptionsBuilder = QueryOptions.newBuilder();
queryOptionsBuilder.setOptimizerVersion(parseOptimizerVersion(this.uri));
this.queryOptions = queryOptionsBuilder.build();
this.returnCommitStats = parseReturnCommitStats(this.uri);

this.host =
matcher.group(Builder.HOST_GROUP) == null
Expand Down Expand Up @@ -634,6 +638,12 @@ static String parseOptimizerVersion(String uri) {
return value != null ? value : DEFAULT_OPTIMIZER_VERSION;
}

@VisibleForTesting
static boolean parseReturnCommitStats(String uri) {
String value = parseUriProperty(uri, "returnCommitStats");
return value != null ? Boolean.valueOf(value) : false;
}

@VisibleForTesting
static boolean parseLenient(String uri) {
String value = parseUriProperty(uri, LENIENT_PROPERTY_NAME);
Expand Down Expand Up @@ -823,6 +833,11 @@ QueryOptions getQueryOptions() {
return queryOptions;
}

/** Whether connections created by this {@link ConnectionOptions} return commit stats. */
public boolean isReturnCommitStats() {
return returnCommitStats;
}

/** Interceptors that should be executed after each statement */
List<StatementExecutionInterceptor> getStatementExecutionInterceptors() {
return statementExecutionInterceptors;
Expand Down
Expand Up @@ -56,6 +56,8 @@ interface ConnectionStatementExecutor {

StatementResult statementShowCommitTimestamp();

StatementResult statementShowCommitResponse();

StatementResult statementSetReadOnlyStaleness(TimestampBound staleness);

StatementResult statementShowReadOnlyStaleness();
Expand All @@ -64,6 +66,10 @@ interface ConnectionStatementExecutor {

StatementResult statementShowOptimizerVersion();

StatementResult statementSetReturnCommitStats(Boolean returnCommitStats);

StatementResult statementShowReturnCommitStats();

StatementResult statementBeginTransaction();

StatementResult statementCommit();
Expand Down
Expand Up @@ -27,26 +27,37 @@
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_READONLY;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_READ_ONLY_STALENESS;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_RETRY_ABORTS_INTERNALLY;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_RETURN_COMMIT_STATS;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_STATEMENT_TIMEOUT;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_TRANSACTION_MODE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_AUTOCOMMIT;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_AUTOCOMMIT_DML_MODE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_COMMIT_RESPONSE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_COMMIT_TIMESTAMP;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_OPTIMIZER_VERSION;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_READONLY;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_READ_ONLY_STALENESS;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_READ_TIMESTAMP;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_RETRY_ABORTS_INTERNALLY;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_RETURN_COMMIT_STATS;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_STATEMENT_TIMEOUT;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.START_BATCH_DDL;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.START_BATCH_DML;
import static com.google.cloud.spanner.connection.StatementResultImpl.noResult;
import static com.google.cloud.spanner.connection.StatementResultImpl.resultSet;

import com.google.cloud.spanner.CommitResponse;
import com.google.cloud.spanner.CommitStats;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.ResultSets;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Type.StructField;
import com.google.cloud.spanner.connection.ReadOnlyStalenessUtil.DurationValueGetter;
import com.google.common.base.Preconditions;
import com.google.protobuf.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -170,6 +181,28 @@ public StatementResult statementShowCommitTimestamp() {
"COMMIT_TIMESTAMP", getConnection().getCommitTimestampOrNull(), SHOW_COMMIT_TIMESTAMP);
}

@Override
public StatementResult statementShowCommitResponse() {
CommitResponse response = getConnection().getCommitResponseOrNull();
CommitStats stats = null;
if (response != null && response.hasCommitStats()) {
stats = response.getCommitStats();
}
ResultSet resultSet =
ResultSets.forRows(
Type.struct(
StructField.of("COMMIT_TIMESTAMP", Type.timestamp()),
StructField.of("MUTATION_COUNT", Type.int64())),
Arrays.asList(
Struct.newBuilder()
.set("COMMIT_TIMESTAMP")
.to(response == null ? null : response.getCommitTimestamp())
.set("MUTATION_COUNT")
.to(stats == null ? null : stats.getMutationCount())
.build()));
return StatementResultImpl.of(resultSet, SHOW_COMMIT_RESPONSE);
}

@Override
public StatementResult statementSetReadOnlyStaleness(TimestampBound staleness) {
getConnection().setReadOnlyStaleness(staleness);
Expand Down Expand Up @@ -197,6 +230,18 @@ public StatementResult statementShowOptimizerVersion() {
"OPTIMIZER_VERSION", getConnection().getOptimizerVersion(), SHOW_OPTIMIZER_VERSION);
}

@Override
public StatementResult statementSetReturnCommitStats(Boolean returnCommitStats) {
getConnection().setReturnCommitStats(returnCommitStats);
return noResult(SET_RETURN_COMMIT_STATS);
}

@Override
public StatementResult statementShowReturnCommitStats() {
return resultSet(
"RETURN_COMMIT_STATS", getConnection().isReturnCommitStats(), SHOW_RETURN_COMMIT_STATS);
}

@Override
public StatementResult statementBeginTransaction() {
getConnection().beginTransaction();
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.CommitResponse;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Mutation;
Expand Down Expand Up @@ -168,6 +169,17 @@ public Timestamp getCommitTimestampOrNull() {
return null;
}

@Override
public CommitResponse getCommitResponse() {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "There is no commit response available for DDL batches.");
}

@Override
public CommitResponse getCommitResponseOrNull() {
return null;
}

@Override
public ApiFuture<Void> executeDdlAsync(ParsedStatement ddl) {
ConnectionPreconditions.checkState(
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.CommitResponse;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options.QueryOption;
Expand Down Expand Up @@ -119,6 +120,17 @@ public Timestamp getCommitTimestampOrNull() {
return null;
}

@Override
public CommitResponse getCommitResponse() {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.FAILED_PRECONDITION, "There is no commit response available for DML batches.");
}

@Override
public CommitResponse getCommitResponseOrNull() {
return null;
}

@Override
public ApiFuture<Void> executeDdlAsync(ParsedStatement ddl) {
throw SpannerExceptionFactory.newSpannerException(
Expand Down

0 comments on commit b2b1191

Please sign in to comment.