Skip to content

Commit

Permalink
feat!: add support for CommitStats (#544)
Browse files Browse the repository at this point in the history
* feat!: add support for CommitStats

Adds support for returning CommitStats from read/write transactions.

* fix: add clirr ignored differences

* fix: error message should start with getCommitResponse

Co-authored-by: skuruppu <skuruppu@google.com>

* fix: remove overload delay

* chore: cleanup after merge

* fix: update copyright years of new files

* test: fix flaky test

* test: skip commit stats tests on emulator

* test: missed one commit stats tests against emulator

* test: skip another emulator test

* test: add missing test cases

* fix: address review comments

* chore: use junit assertion instead of truth

* chore: replace truth asserts with junit asserts

* chore: replace truth assertions with junit

* chore: cleanup test and variable names

* fix: rename test method and variables

* fix: address review comments

Co-authored-by: skuruppu <skuruppu@google.com>
  • Loading branch information
olavloite and skuruppu committed Feb 17, 2021
1 parent 5e07e4e commit 44aa384
Show file tree
Hide file tree
Showing 37 changed files with 1,111 additions and 153 deletions.
23 changes: 23 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -453,4 +453,27 @@
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture executeUpdateAsync(com.google.cloud.spanner.Statement)</method>
</difference>

<!-- Support for CommitStats added -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/AsyncTransactionManager</className>
<method>com.google.api.core.ApiFuture getCommitResponse()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/AsyncRunner</className>
<method>com.google.api.core.ApiFuture getCommitResponse()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionManager</className>
<method>com.google.cloud.spanner.CommitResponse getCommitResponse()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionRunner</className>
<method>com.google.cloud.spanner.CommitResponse getCommitResponse()</method>
</difference>

</differences>
Expand Up @@ -56,4 +56,10 @@ interface AsyncWork<R> {
* {@link ExecutionException} if the transaction did not commit.
*/
ApiFuture<Timestamp> getCommitTimestamp();

/**
* Returns the {@link CommitResponse} of this transaction. {@link ApiFuture#get()} throws an
* {@link ExecutionException} if the transaction did not commit.
*/
ApiFuture<CommitResponse> getCommitResponse();
}
Expand Up @@ -16,23 +16,31 @@

package com.google.cloud.spanner;

import static com.google.common.base.Preconditions.checkState;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

class AsyncRunnerImpl implements AsyncRunner {
private final TransactionRunnerImpl delegate;
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create();
private SettableApiFuture<CommitResponse> commitResponse;

AsyncRunnerImpl(TransactionRunnerImpl delegate) {
this.delegate = delegate;
this.delegate = Preconditions.checkNotNull(delegate);
}

@Override
public <R> ApiFuture<R> runAsync(final AsyncWork<R> work, Executor executor) {
Preconditions.checkState(commitResponse == null, "runAsync() can only be called once");
commitResponse = SettableApiFuture.create();
final SettableApiFuture<R> res = SettableApiFuture.create();
executor.execute(
new Runnable() {
Expand All @@ -43,7 +51,7 @@ public void run() {
} catch (Throwable t) {
res.setException(t);
} finally {
setCommitTimestamp();
setCommitResponse();
}
}
});
Expand All @@ -66,16 +74,30 @@ public R run(TransactionContext transaction) throws Exception {
});
}

private void setCommitTimestamp() {
private void setCommitResponse() {
try {
commitTimestamp.set(delegate.getCommitTimestamp());
commitResponse.set(delegate.getCommitResponse());
} catch (Throwable t) {
commitTimestamp.setException(t);
commitResponse.setException(t);
}
}

@Override
public ApiFuture<Timestamp> getCommitTimestamp() {
return commitTimestamp;
checkState(commitResponse != null, "runAsync() has not yet been called");
return ApiFutures.transform(
commitResponse,
new ApiFunction<CommitResponse, Timestamp>() {
@Override
public Timestamp apply(CommitResponse input) {
return input.getCommitTimestamp();
}
},
MoreExecutors.directExecutor());
}

public ApiFuture<CommitResponse> getCommitResponse() {
checkState(commitResponse != null, "runAsync() has not yet been called");
return commitResponse;
}
}
Expand Up @@ -191,6 +191,9 @@ public interface AsyncTransactionFunction<I, O> {
/** Returns the state of the transaction. */
TransactionState getState();

/** Returns the {@link CommitResponse} of this transaction. */
ApiFuture<CommitResponse> getCommitResponse();

/**
* Closes the manager. If there is an active transaction, it will be rolled back. Underlying
* session will be released back to the session pool.
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
Expand Down Expand Up @@ -45,7 +46,7 @@ final class AsyncTransactionManagerImpl

private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create();
private final SettableApiFuture<CommitResponse> commitResponse = SettableApiFuture.create();

AsyncTransactionManagerImpl(SessionImpl session, Span span, TransactionOption... options) {
this.session = session;
Expand Down Expand Up @@ -132,29 +133,37 @@ public ApiFuture<Timestamp> commitAsync() {
SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED, "Transaction already aborted"));
}
ApiFuture<Timestamp> res = txn.commitAsync();
ApiFuture<CommitResponse> commitResponseFuture = txn.commitAsync();
txnState = TransactionState.COMMITTED;

ApiFutures.addCallback(
res,
new ApiFutureCallback<Timestamp>() {
commitResponseFuture,
new ApiFutureCallback<CommitResponse>() {
@Override
public void onFailure(Throwable t) {
if (t instanceof AbortedException) {
txnState = TransactionState.ABORTED;
} else {
txnState = TransactionState.COMMIT_FAILED;
commitTimestamp.setException(t);
commitResponse.setException(t);
}
}

@Override
public void onSuccess(Timestamp result) {
commitTimestamp.set(result);
public void onSuccess(CommitResponse result) {
commitResponse.set(result);
}
},
MoreExecutors.directExecutor());
return ApiFutures.transform(
commitResponseFuture,
new ApiFunction<CommitResponse, Timestamp>() {
@Override
public Timestamp apply(CommitResponse input) {
return input.getCommitTimestamp();
}
},
MoreExecutors.directExecutor());
return res;
}

@Override
Expand Down Expand Up @@ -187,6 +196,11 @@ public TransactionState getState() {
return txnState;
}

@Override
public ApiFuture<CommitResponse> getCommitResponse() {
return commitResponse;
}

@Override
public void invalidate() {
if (txnState == TransactionState.STARTED || txnState == null) {
Expand Down
Expand Up @@ -17,20 +17,38 @@
package com.google.cloud.spanner;

import com.google.cloud.Timestamp;
import com.google.common.base.Preconditions;
import java.util.Objects;

/** Represents a response from a commit operation. */
public class CommitResponse {

private final Timestamp commitTimestamp;
private final com.google.spanner.v1.CommitResponse proto;

public CommitResponse(Timestamp commitTimestamp) {
this.commitTimestamp = commitTimestamp;
this.proto =
com.google.spanner.v1.CommitResponse.newBuilder()
.setCommitTimestamp(commitTimestamp.toProto())
.build();
}

/** Returns a {@link Timestamp} representing the commit time of the write operation. */
CommitResponse(com.google.spanner.v1.CommitResponse proto) {
this.proto = Preconditions.checkNotNull(proto);
}

/** Returns a {@link Timestamp} representing the commit time of the transaction. */
public Timestamp getCommitTimestamp() {
return commitTimestamp;
return Timestamp.fromProto(proto.getCommitTimestamp());
}

/**
* Commit statistics are returned by a read/write transaction if specifically requested by passing
* in {@link Options#commitStats()} to the transaction.
*/
public CommitStats getCommitStats() {
Preconditions.checkState(
proto.hasCommitStats(), "The CommitResponse does not contain any commit statistics.");
return CommitStats.fromProto(proto.getCommitStats());
}

@Override
Expand All @@ -42,11 +60,11 @@ public boolean equals(Object o) {
return false;
}
CommitResponse that = (CommitResponse) o;
return Objects.equals(commitTimestamp, that.commitTimestamp);
return Objects.equals(proto, that.proto);
}

@Override
public int hashCode() {
return Objects.hash(commitTimestamp);
return Objects.hash(proto);
}
}
@@ -0,0 +1,54 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.common.base.Preconditions;

/**
* Commit statistics are returned by a read/write transaction if specifically requested by passing
* in {@link Options#commitStats()} to the transaction.
*/
public class CommitStats {
private final long mutationCount;

private CommitStats(long mutationCount) {
this.mutationCount = mutationCount;
}

static CommitStats fromProto(com.google.spanner.v1.CommitResponse.CommitStats proto) {
Preconditions.checkNotNull(proto);
return new CommitStats(proto.getMutationCount());
}

/**
* The number of mutations that were executed by the transaction. Insert and update operations
* count with the multiplicity of the number of columns they affect. For example, inserting a new
* record may count as five mutations, if values are inserted into five columns. Delete and delete
* range operations count as one mutation regardless of the number of columns affected. Deleting a
* row from a parent table that has the ON DELETE CASCADE annotation is also counted as one
* mutation regardless of the number of interleaved child rows present. The exception to this is
* if there are secondary indexes defined on rows being deleted, then the changes to the secondary
* indexes are counted individually. For example, if a table has 2 secondary indexes, deleting a
* range of rows in the table counts as 1 mutation for the table, plus 2 mutations for each row
* that is deleted because the rows in the secondary index might be scattered over the key-space,
* making it impossible for Cloud Spanner to call a single delete range operation on the secondary
* indexes. Secondary indexes include the foreign keys backing indexes.
*/
public long getMutationCount() {
return mutationCount;
}
}
Expand Up @@ -46,6 +46,11 @@ public interface UpdateOption {}
/** Marker interface to mark options applicable to list operations in admin API. */
public interface ListOption {}

/** Specifying this instructs the transaction to request {@link CommitStats} from the backend. */
public static TransactionOption commitStats() {
return COMMIT_STATS_OPTION;
}

/**
* Specifying this will cause the read to yield at most this many rows. This should be greater
* than 0.
Expand Down Expand Up @@ -116,6 +121,16 @@ public static ListOption filter(String filter) {
return new FilterOption(filter);
}

/** Option to request {@link CommitStats} for read/write transactions. */
static final class CommitStatsOption extends InternalOption implements TransactionOption {
@Override
void appendToOptions(Options options) {
options.withCommitStats = true;
}
}

static final CommitStatsOption COMMIT_STATS_OPTION = new CommitStatsOption();

/** Option pertaining to flow control. */
static final class FlowControlOption extends InternalOption implements ReadAndQueryOption {
final int prefetchChunks;
Expand Down Expand Up @@ -143,6 +158,7 @@ void appendToOptions(Options options) {
}
}

private boolean withCommitStats;
private Long limit;
private Integer prefetchChunks;
private Integer bufferRows;
Expand All @@ -153,6 +169,10 @@ void appendToOptions(Options options) {
// Construction is via factory methods below.
private Options() {}

boolean withCommitStats() {
return withCommitStats;
}

boolean hasLimit() {
return limit != null;
}
Expand Down Expand Up @@ -204,6 +224,9 @@ String filter() {
@Override
public String toString() {
StringBuilder b = new StringBuilder();
if (withCommitStats) {
b.append("withCommitStats: ").append(withCommitStats).append(' ');
}
if (limit != null) {
b.append("limit: ").append(limit).append(' ');
}
Expand Down Expand Up @@ -234,7 +257,8 @@ public boolean equals(Object o) {
}

Options that = (Options) o;
return (!hasLimit() && !that.hasLimit()
return Objects.equals(withCommitStats, that.withCommitStats)
&& (!hasLimit() && !that.hasLimit()
|| hasLimit() && that.hasLimit() && Objects.equals(limit(), that.limit()))
&& (!hasPrefetchChunks() && !that.hasPrefetchChunks()
|| hasPrefetchChunks()
Expand All @@ -253,6 +277,9 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
int result = 31;
if (withCommitStats) {
result = 31 * result + 1231;
}
if (limit != null) {
result = 31 * result + limit.hashCode();
}
Expand Down

0 comments on commit 44aa384

Please sign in to comment.