Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add support for CommitStats #544

Merged
merged 25 commits into from Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
3872199
feat!: add support for CommitStats
olavloite Oct 24, 2020
def8768
fix: add clirr ignored differences
olavloite Oct 24, 2020
5372dae
Merge branch 'master' into commit-stats2
olavloite Oct 31, 2020
34fbda6
fix: error message should start with getCommitResponse
olavloite Oct 31, 2020
8f61c2a
Merge branch 'master' into commit-stats2
olavloite Nov 6, 2020
041b34d
Merge branch 'master' into commit-stats2
olavloite Dec 5, 2020
8299054
fix: remove overload delay
olavloite Dec 10, 2020
9b710a7
Merge branch 'master' into commit-stats2
olavloite Dec 10, 2020
8c15641
Merge branch 'master' into commit-stats2
olavloite Jan 23, 2021
919cf02
chore: cleanup after merge
olavloite Jan 23, 2021
97ec917
fix: update copyright years of new files
olavloite Jan 23, 2021
afeb0fd
test: fix flaky test
olavloite Jan 23, 2021
8524526
test: skip commit stats tests on emulator
olavloite Jan 23, 2021
06b3e22
test: missed one commit stats tests against emulator
olavloite Jan 23, 2021
1c17d16
test: skip another emulator test
olavloite Jan 23, 2021
664b87d
test: add missing test cases
olavloite Jan 23, 2021
6573a0f
fix: address review comments
olavloite Jan 30, 2021
0b448c3
Merge branch 'master' into commit-stats2
olavloite Feb 1, 2021
83039fb
Merge branch 'master' into commit-stats2
olavloite Feb 3, 2021
57ea714
chore: use junit assertion instead of truth
olavloite Feb 3, 2021
66c5f88
chore: replace truth asserts with junit asserts
olavloite Feb 4, 2021
1d17973
chore: replace truth assertions with junit
olavloite Feb 5, 2021
48ef21b
chore: cleanup test and variable names
olavloite Feb 8, 2021
1f33c42
fix: rename test method and variables
olavloite Feb 9, 2021
7d45ace
fix: address review comments
olavloite Feb 16, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 23 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -453,4 +453,27 @@
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture executeUpdateAsync(com.google.cloud.spanner.Statement)</method>
</difference>

<!-- Support for CommitStats added -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/AsyncTransactionManager</className>
<method>com.google.api.core.ApiFuture getCommitResponse()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/AsyncRunner</className>
<method>com.google.api.core.ApiFuture getCommitResponse()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionManager</className>
<method>com.google.cloud.spanner.CommitResponse getCommitResponse()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionRunner</className>
<method>com.google.cloud.spanner.CommitResponse getCommitResponse()</method>
</difference>

</differences>
Expand Up @@ -56,4 +56,10 @@ interface AsyncWork<R> {
* {@link ExecutionException} if the transaction did not commit.
*/
ApiFuture<Timestamp> getCommitTimestamp();

/**
* Returns the {@link CommitResponse} of this transaction. {@link ApiFuture#get()} will throw an
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will throw --> throws
per Google style

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* {@link ExecutionException} if the transaction did not commit.
*/
ApiFuture<CommitResponse> getCommitResponse();
}
Expand Up @@ -16,23 +16,31 @@

package com.google.cloud.spanner;

import static com.google.common.base.Preconditions.checkState;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

class AsyncRunnerImpl implements AsyncRunner {
private final TransactionRunnerImpl delegate;
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create();
private SettableApiFuture<CommitResponse> commitResponse;

AsyncRunnerImpl(TransactionRunnerImpl delegate) {
this.delegate = delegate;
this.delegate = Preconditions.checkNotNull(delegate);
}

@Override
public <R> ApiFuture<R> runAsync(final AsyncWork<R> work, Executor executor) {
Preconditions.checkState(commitResponse == null, "runAsync() can only be called once");
commitResponse = SettableApiFuture.create();
final SettableApiFuture<R> res = SettableApiFuture.create();
executor.execute(
new Runnable() {
Expand All @@ -43,7 +51,7 @@ public void run() {
} catch (Throwable t) {
res.setException(t);
} finally {
setCommitTimestamp();
setCommitResponse();
}
}
});
Expand All @@ -66,16 +74,30 @@ public R run(TransactionContext transaction) throws Exception {
});
}

private void setCommitTimestamp() {
private void setCommitResponse() {
try {
commitTimestamp.set(delegate.getCommitTimestamp());
commitResponse.set(delegate.getCommitResponse());
} catch (Throwable t) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

letting it slide because it isn't changed in this PR, but catching Throwable is only rarely what you want. This is probably worth filing a bug on.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added issue: #875

commitTimestamp.setException(t);
commitResponse.setException(t);
}
}

@Override
public ApiFuture<Timestamp> getCommitTimestamp() {
return commitTimestamp;
checkState(commitResponse != null, "runAsync() has not yet been called");
return ApiFutures.transform(
commitResponse,
new ApiFunction<CommitResponse, Timestamp>() {
@Override
public Timestamp apply(CommitResponse input) {
return input.getCommitTimestamp();
}
},
MoreExecutors.directExecutor());
}

public ApiFuture<CommitResponse> getCommitResponse() {
checkState(commitResponse != null, "runAsync() has not yet been called");
return commitResponse;
}
}
Expand Up @@ -191,6 +191,9 @@ public interface AsyncTransactionFunction<I, O> {
/** Returns the state of the transaction. */
TransactionState getState();

/** Returns the {@link CommitResponse} of this transaction. */
ApiFuture<CommitResponse> getCommitResponse();

/**
* Closes the manager. If there is an active transaction, it will be rolled back. Underlying
* session will be released back to the session pool.
Expand Down
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import com.google.api.core.ApiAsyncFunction;
import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
Expand Down Expand Up @@ -45,7 +46,7 @@ final class AsyncTransactionManagerImpl

private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create();
private final SettableApiFuture<CommitResponse> commitResponse = SettableApiFuture.create();

AsyncTransactionManagerImpl(SessionImpl session, Span span, TransactionOption... options) {
this.session = session;
Expand Down Expand Up @@ -132,29 +133,37 @@ public ApiFuture<Timestamp> commitAsync() {
SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED, "Transaction already aborted"));
}
ApiFuture<Timestamp> res = txn.commitAsync();
ApiFuture<CommitResponse> res = txn.commitAsync();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no abbreviated variable names per google style.
Concretely I did not know what this was when I read it below and had to scroll up to find out.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to commitResponseFuture

txnState = TransactionState.COMMITTED;

ApiFutures.addCallback(
res,
new ApiFutureCallback<Timestamp>() {
new ApiFutureCallback<CommitResponse>() {
@Override
public void onFailure(Throwable t) {
if (t instanceof AbortedException) {
txnState = TransactionState.ABORTED;
} else {
txnState = TransactionState.COMMIT_FAILED;
commitTimestamp.setException(t);
commitResponse.setException(t);
}
}

@Override
public void onSuccess(Timestamp result) {
commitTimestamp.set(result);
public void onSuccess(CommitResponse result) {
commitResponse.set(result);
}
},
MoreExecutors.directExecutor());
return ApiFutures.transform(
res,
new ApiFunction<CommitResponse, Timestamp>() {
@Override
public Timestamp apply(CommitResponse input) {
return input.getCommitTimestamp();
}
},
MoreExecutors.directExecutor());
return res;
}

@Override
Expand Down Expand Up @@ -187,6 +196,11 @@ public TransactionState getState() {
return txnState;
}

@Override
public ApiFuture<CommitResponse> getCommitResponse() {
return commitResponse;
}

@Override
public void invalidate() {
if (txnState == TransactionState.STARTED || txnState == null) {
Expand Down
Expand Up @@ -17,20 +17,38 @@
package com.google.cloud.spanner;

import com.google.cloud.Timestamp;
import com.google.common.base.Preconditions;
import java.util.Objects;

/** Represents a response from a commit operation. */
public class CommitResponse {

private final Timestamp commitTimestamp;
private final com.google.spanner.v1.CommitResponse proto;

public CommitResponse(Timestamp commitTimestamp) {
this.commitTimestamp = commitTimestamp;
this.proto =
com.google.spanner.v1.CommitResponse.newBuilder()
.setCommitTimestamp(commitTimestamp.toProto())
.build();
}

/** Returns a {@link Timestamp} representing the commit time of the write operation. */
CommitResponse(com.google.spanner.v1.CommitResponse proto) {
this.proto = proto;
}

/** Returns a {@link Timestamp} representing the commit time of the transaction. */
public Timestamp getCommitTimestamp() {
return commitTimestamp;
return Timestamp.fromProto(proto.getCommitTimestamp());
}

/**
* Commit statistics are returned by a read/write transaction if specifically requested by passing
* in {@link Options#commitStats()} to the transaction.
*/
public CommitStats getCommitStats() {
Preconditions.checkState(
proto.hasCommitStats(), "The CommitResponse does not contain any commit statistics.");
return CommitStats.fromProto(proto.getCommitStats());
}

@Override
Expand All @@ -42,11 +60,11 @@ public boolean equals(Object o) {
return false;
}
CommitResponse that = (CommitResponse) o;
return Objects.equals(commitTimestamp, that.commitTimestamp);
return Objects.equals(proto, that.proto);
}

@Override
public int hashCode() {
return Objects.hash(commitTimestamp);
return Objects.hash(proto);
}
}
@@ -0,0 +1,54 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.common.base.Preconditions;

/**
* Commit statistics are returned by a read/write transaction if specifically requested by passing
* in {@link Options#commitStats()} to the transaction.
*/
public class CommitStats {
private final long mutationCount;

private CommitStats(long mutationCount) {
this.mutationCount = mutationCount;
}

static CommitStats fromProto(com.google.spanner.v1.CommitResponse.CommitStats proto) {
Preconditions.checkNotNull(proto);
return new CommitStats(proto.getMutationCount());
}

/**
* The number of mutations that were executed by the transaction. Insert and update operations
* count with the multiplicity of the number of columns they affect. For example, inserting a new
* record may count as five mutations, if values are inserted into five columns. Delete and delete
* range operations count as one mutation regardless of the number of columns affected. Deleting a
* row from a parent table that has the ON DELETE CASCADE annotation is also counted as one
* mutation regardless of the number of interleaved child rows present. The exception to this is
* if there are secondary indexes defined on rows being deleted, then the changes to the secondary
* indexes will be counted individually. For example, if a table has 2 secondary indexes, deleting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be --> are

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* a range of rows in the table will count as 1 mutation for the table, plus 2 mutations for each
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will count --> counts

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

* row that is deleted because the rows in the secondary index might be scattered over the
* key-space, making it impossible for Cloud Spanner to call a single delete range operation on
* the secondary indexes. Secondary indexes include the foreign keys backing indexes.
*/
public long getMutationCount() {
return mutationCount;
}
}
Expand Up @@ -46,6 +46,11 @@ public interface UpdateOption {}
/** Marker interface to mark options applicable to list operations in admin API. */
public interface ListOption {}

/** Specifying this will cause the transaction to request {@link CommitStats} from the backend. */
public static TransactionOption commitStats() {
return COMMIT_STATS_OPTION;
}

/**
* Specifying this will cause the read to yield at most this many rows. This should be greater
* than 0.
Expand Down Expand Up @@ -116,6 +121,16 @@ public static ListOption filter(String filter) {
return new FilterOption(filter);
}

/** Option to request {@link CommitStats} for read/write transactions. */
static final class CommitStatsOption extends InternalOption implements TransactionOption {
@Override
void appendToOptions(Options options) {
options.withCommitStats = true;
}
}

static final CommitStatsOption COMMIT_STATS_OPTION = new CommitStatsOption();

/** Option pertaining to flow control. */
static final class FlowControlOption extends InternalOption implements ReadAndQueryOption {
final int prefetchChunks;
Expand Down Expand Up @@ -143,6 +158,7 @@ void appendToOptions(Options options) {
}
}

private boolean withCommitStats;
private Long limit;
private Integer prefetchChunks;
private Integer bufferRows;
Expand All @@ -153,6 +169,10 @@ void appendToOptions(Options options) {
// Construction is via factory methods below.
private Options() {}

boolean withCommitStats() {
return withCommitStats;
}

boolean hasLimit() {
return limit != null;
}
Expand Down Expand Up @@ -204,6 +224,9 @@ String filter() {
@Override
public String toString() {
StringBuilder b = new StringBuilder();
if (withCommitStats) {
b.append("withCommitStats: ").append(withCommitStats).append(' ');
}
if (limit != null) {
b.append("limit: ").append(limit).append(' ');
}
Expand Down Expand Up @@ -234,7 +257,8 @@ public boolean equals(Object o) {
}

Options that = (Options) o;
return (!hasLimit() && !that.hasLimit()
return Objects.equals(withCommitStats, that.withCommitStats)
&& (!hasLimit() && !that.hasLimit()
|| hasLimit() && that.hasLimit() && Objects.equals(limit(), that.limit()))
&& (!hasPrefetchChunks() && !that.hasPrefetchChunks()
|| hasPrefetchChunks()
Expand All @@ -253,6 +277,9 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
int result = 31;
if (withCommitStats) {
result = 31 * result + 1231;
}
if (limit != null) {
result = 31 * result + limit.hashCode();
}
Expand Down