Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add support for CommitStats #522

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
54 changes: 54 additions & 0 deletions .github/workflows/approve-readme.yaml
@@ -0,0 +1,54 @@
on:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this file be part of this PR?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's part of the generated changes by running the synth tool. It should be removed before we merge it.

pull_request:
name: auto-merge-readme
jobs:
approve:
runs-on: ubuntu-latest
if: github.repository_owner == 'googleapis' && github.head_ref == 'autosynth-readme'
steps:
- uses: actions/github-script@v3.0.0
with:
github-token: ${{secrets.YOSHI_APPROVER_TOKEN}}
script: |
// only approve PRs from yoshi-automation
if (context.payload.pull_request.user.login !== "yoshi-automation") {
return;
}

// only approve PRs like "chore: release <release version>"
if (!context.payload.pull_request.title === "chore: regenerate README") {
return;
}

// only approve PRs with README.md and synth.metadata changes
const files = new Set(
(
await github.paginate(
github.pulls.listFiles.endpoint({
owner: context.repo.owner,
repo: context.repo.repo,
pull_number: context.payload.pull_request.number,
})
)
).map(file => file.filename)
);
if (files.size != 2 || !files.has("README.md") || !files.has(".github/readme/synth.metadata/synth.metadata")) {
return;
}

// approve README regeneration PR
await github.pulls.createReview({
owner: context.repo.owner,
repo: context.repo.repo,
body: 'Rubber stamped PR!',
pull_number: context.payload.pull_request.number,
event: 'APPROVE'
});

// attach automerge label
await github.issues.addLabels({
owner: context.repo.owner,
repo: context.repo.repo,
issue_number: context.payload.pull_request.number,
labels: ['automerge']
});
65 changes: 65 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -377,4 +377,69 @@
<className>com/google/cloud/spanner/AsyncTransactionManager</className>
<method>com.google.api.core.ApiFuture closeAsync()</method>
</difference>

<!-- CommitStats added -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionManager</className>
<method>com.google.cloud.spanner.CommitStats getCommitStats()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionRunner</className>
<method>com.google.cloud.spanner.CommitStats getCommitStats()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/AsyncTransactionManager</className>
<method>com.google.api.core.ApiFuture getCommitStats()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/AsyncRunner</className>
<method>com.google.api.core.ApiFuture getCommitStats()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.Timestamp write(java.lang.Iterable)</method>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.Timestamp write(java.lang.Iterable)</method>
<to>com.google.cloud.spanner.DatabaseClient$WriteResponse</to>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.Timestamp writeAtLeastOnce(java.lang.Iterable)</method>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.Timestamp writeAtLeastOnce(java.lang.Iterable)</method>
<to>com.google.cloud.spanner.DatabaseClient$WriteResponse</to>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.TransactionRunner readWriteTransaction()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.TransactionManager transactionManager()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.AsyncRunner runAsync()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.AsyncTransactionManager transactionManagerAsync()</method>
</difference>

</differences>
Expand Up @@ -56,4 +56,11 @@ interface AsyncWork<R> {
* {@link ExecutionException} if the transaction did not commit.
*/
ApiFuture<Timestamp> getCommitTimestamp();

/**
* Returns the {@link CommitStats} of this transaction. This method may only be called after the
* transaction has successfully committed, and only if {@link Options#commitStats()} was specified
* for the transaction.
*/
ApiFuture<CommitStats> getCommitStats();
}
Expand Up @@ -20,15 +20,19 @@
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.common.base.Preconditions;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

class AsyncRunnerImpl implements AsyncRunner {
private final TransactionRunnerImpl delegate;
private final Options options;
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create();
private final SettableApiFuture<CommitStats> commitStats = SettableApiFuture.create();

AsyncRunnerImpl(TransactionRunnerImpl delegate) {
this.delegate = delegate;
AsyncRunnerImpl(TransactionRunnerImpl delegate, Options options) {
this.delegate = Preconditions.checkNotNull(delegate);
this.options = Preconditions.checkNotNull(options);
}

@Override
Expand All @@ -43,7 +47,7 @@ public void run() {
} catch (Throwable t) {
res.setException(t);
} finally {
setCommitTimestamp();
setCommitTimestampAndStats();
}
}
});
Expand All @@ -66,16 +70,31 @@ public R run(TransactionContext transaction) throws Exception {
});
}

private void setCommitTimestamp() {
private void setCommitTimestampAndStats() {
try {
commitTimestamp.set(delegate.getCommitTimestamp());
} catch (Throwable t) {
commitTimestamp.setException(t);
}
if (options.withCommitStats()) {
try {
commitStats.set(delegate.getCommitStats());
} catch (Throwable t) {
commitStats.setException(t);
}
}
}

@Override
public ApiFuture<Timestamp> getCommitTimestamp() {
return commitTimestamp;
}

@Override
public ApiFuture<CommitStats> getCommitStats() {
Preconditions.checkState(
options.withCommitStats(),
"getCommitStats may only be invoked if Options.commitStats() was specified for the transaction");
return commitStats;
}
}
Expand Up @@ -191,6 +191,13 @@ public interface AsyncTransactionFunction<I, O> {
/** Returns the state of the transaction. */
TransactionState getState();

/**
* Returns the {@link CommitStats} of this transaction. This method may only be called after the
* transaction has successfully committed, and only if {@link Options#commitStats()} was specified
* for the transaction.
*/
ApiFuture<CommitStats> getCommitStats();

/**
* Closes the manager. If there is an active transaction, it will be rolled back. Underlying
* session will be released back to the session pool.
Expand Down
Expand Up @@ -37,14 +37,19 @@ final class AsyncTransactionManagerImpl
private static final Tracer tracer = Tracing.getTracer();

private final SessionImpl session;
private final Options options;
private Span span;

private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create();
private SettableApiFuture<CommitStats> commitStats;

AsyncTransactionManagerImpl(SessionImpl session, Span span) {
AsyncTransactionManagerImpl(SessionImpl session, Span span, Options options) {
Preconditions.checkNotNull(session);
Preconditions.checkNotNull(options);
this.session = session;
this.options = options;
this.span = span;
}

Expand All @@ -53,6 +58,17 @@ public void setSpan(Span span) {
this.span = span;
}

@Override
public ApiFuture<CommitStats> getCommitStats() {
Preconditions.checkState(
txnState == TransactionState.COMMITTED,
"getCommitStats can only be invoked if the transaction committed successfully");
Preconditions.checkState(
options.withCommitStats(),
"getCommitStats can only be invoked if Options.commitStats() was specified for the transaction");
return commitStats;
}

@Override
public void close() {
closeAsync();
Expand Down Expand Up @@ -119,7 +135,10 @@ public ApiFuture<Timestamp> commitAsync() {
SpannerExceptionFactory.newSpannerException(
ErrorCode.ABORTED, "Transaction already aborted"));
}
ApiFuture<Timestamp> res = txn.commitAsync();
ApiFuture<Timestamp> res = txn.commitAsync(options.withCommitStats());
if (options.withCommitStats()) {
commitStats = SettableApiFuture.create();
}
txnState = TransactionState.COMMITTED;
ApiFutures.addCallback(
res,
Expand All @@ -131,12 +150,14 @@ public void onFailure(Throwable t) {
} else {
txnState = TransactionState.COMMIT_FAILED;
commitTimestamp.setException(t);
commitStats.setException(t);
}
}

@Override
public void onSuccess(Timestamp result) {
commitTimestamp.set(result);
commitStats.set(txn.commitStats());
}
},
MoreExecutors.directExecutor());
Expand Down
@@ -0,0 +1,76 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.common.base.Preconditions;
import org.threeten.bp.Duration;
import org.threeten.bp.temporal.ChronoUnit;

/**
* Commit statistics are returned by a read/write transaction if specifically requested.
*
* <p>Use one of the following options to run a transaction that returns {@link CommitStats}:
*
* <ul>
* <li>{@link TransactionRunner#withCommitStats()}
* <li>{@link TransactionManager#withCommitStats()}
* <li>{@link AsyncRunner#withCommitStats()}
* <li>{@link AsyncTransactionManager#withCommitStats()}
* <li>{@link DatabaseClient#writeWithCommitStats(Iterable)}
* <li>{@link DatabaseClient#writeAtLeastOnceWithCommitStats(Iterable)}
* </ul>
*/
public class CommitStats {
private final long mutationCount;
private final Duration overloadDelay;

private CommitStats(long mutationCount, Duration overloadDelay) {
this.mutationCount = mutationCount;
this.overloadDelay = overloadDelay;
}

static CommitStats fromProto(com.google.spanner.v1.CommitResponse.CommitStats proto) {
Preconditions.checkNotNull(proto);
return new CommitStats(
proto.getMutationCount(),
Duration.of(proto.getOverloadDelay().getSeconds(), ChronoUnit.SECONDS)
.plusNanos(proto.getOverloadDelay().getNanos()));
}

/**
* The number of mutations that were executed by the transaction. Insert and update operations
* count with the multiplicity of the number of columns they affect. For example, inserting a new
* record may count as five mutations, if values are inserted into five columns. Delete and delete
* range operations count as one mutation regardless of the number of columns affected. Deleting a
* row from a parent table that has the ON DELETE CASCADE annotation is also counted as one
* mutation regardless of the number of interleaved child rows present. The exception to this is
* if there are secondary indexes defined on rows being deleted, then the changes to the secondary
* indexes will be counted individually. For example, if a table has 2 secondary indexes, deleting
* a range of rows in the table will count as 1 mutation for the table, plus 2 mutations for each
* row that is deleted because the rows in the secondary index might be scattered over the
* key-space, making it impossible for Cloud Spanner to call a single delete range operation on
* the secondary indexes. Secondary indexes include the foreign keys backing indexes.
*/
public long getMutationCount() {
return mutationCount;
}

/** The duration that the commit was delayed due to overloaded servers. */
public Duration getOverloadDelay() {
return overloadDelay;
}
}