From 44aa384429056dd6c6563351c43fe7dcac451008 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 17 Feb 2021 04:20:11 +0100 Subject: [PATCH] feat!: add support for CommitStats (#544) * feat!: add support for CommitStats Adds support for returning CommitStats from read/write transactions. * fix: add clirr ignored differences * fix: error message should start with getCommitResponse Co-authored-by: skuruppu * fix: remove overload delay * chore: cleanup after merge * fix: update copyright years of new files * test: fix flaky test * test: skip commit stats tests on emulator * test: missed one commit stats tests against emulator * test: skip another emulator test * test: add missing test cases * fix: address review comments * chore: use junit assertion instead of truth * chore: replace truth asserts with junit asserts * chore: replace truth assertions with junit * chore: cleanup test and variable names * fix: rename test method and variables * fix: address review comments Co-authored-by: skuruppu --- .../clirr-ignored-differences.xml | 23 +++ .../com/google/cloud/spanner/AsyncRunner.java | 6 + .../google/cloud/spanner/AsyncRunnerImpl.java | 36 +++- .../spanner/AsyncTransactionManager.java | 3 + .../spanner/AsyncTransactionManagerImpl.java | 30 +++- .../google/cloud/spanner/CommitResponse.java | 30 +++- .../com/google/cloud/spanner/CommitStats.java | 54 ++++++ .../com/google/cloud/spanner/Options.java | 29 +++- .../com/google/cloud/spanner/SessionImpl.java | 20 +-- .../com/google/cloud/spanner/SessionPool.java | 41 ++++- .../SessionPoolAsyncTransactionManager.java | 18 ++ .../cloud/spanner/TransactionManager.java | 3 + .../cloud/spanner/TransactionManagerImpl.java | 10 +- .../cloud/spanner/TransactionRunner.java | 3 + .../cloud/spanner/TransactionRunnerImpl.java | 47 +++--- .../cloud/spanner/AsyncRunnerImplTest.java | 156 ++++++++++++++++++ .../google/cloud/spanner/AsyncRunnerTest.java | 24 +++ .../AsyncTransactionManagerImplTest.java | 53 ++++++ .../spanner/AsyncTransactionManagerTest.java | 28 ++++ .../cloud/spanner/BaseSessionPoolTest.java | 5 +- .../cloud/spanner/CommitResponseTest.java | 86 ++++++++++ .../cloud/spanner/DatabaseClientImplTest.java | 139 +++++++++++++--- .../spanner/InlineBeginTransactionTest.java | 8 +- .../cloud/spanner/MockSpannerServiceImpl.java | 12 +- .../com/google/cloud/spanner/OptionsTest.java | 55 +++++- .../RetryOnInvalidatedSessionTest.java | 13 +- .../google/cloud/spanner/SessionPoolTest.java | 22 ++- .../spanner/TransactionContextImplTest.java | 34 ++++ .../spanner/TransactionManagerImplTest.java | 20 +-- .../spanner/TransactionRunnerImplTest.java | 6 +- .../connection/ConnectionImplTest.java | 29 +++- .../connection/ReadWriteTransactionTest.java | 11 +- .../connection/SingleUseTransactionTest.java | 28 +++- .../cloud/spanner/it/ITAsyncAPITest.java | 89 +++++++++- .../spanner/it/ITTransactionManagerTest.java | 29 ++++ .../cloud/spanner/it/ITTransactionTest.java | 22 +++ .../google/cloud/spanner/it/ITWriteTest.java | 42 +++++ 37 files changed, 1111 insertions(+), 153 deletions(-) create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/CommitStats.java create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerImplTest.java create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/CommitResponseTest.java diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 8dd5d9dad6..fcafcb7c3a 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -453,4 +453,27 @@ com/google/cloud/spanner/TransactionContext com.google.api.core.ApiFuture executeUpdateAsync(com.google.cloud.spanner.Statement) + + + + 7012 + com/google/cloud/spanner/AsyncTransactionManager + com.google.api.core.ApiFuture getCommitResponse() + + + 7012 + com/google/cloud/spanner/AsyncRunner + com.google.api.core.ApiFuture getCommitResponse() + + + 7012 + com/google/cloud/spanner/TransactionManager + com.google.cloud.spanner.CommitResponse getCommitResponse() + + + 7012 + com/google/cloud/spanner/TransactionRunner + com.google.cloud.spanner.CommitResponse getCommitResponse() + + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java index 3cae49e65b..c9dec98d55 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java @@ -56,4 +56,10 @@ interface AsyncWork { * {@link ExecutionException} if the transaction did not commit. */ ApiFuture getCommitTimestamp(); + + /** + * Returns the {@link CommitResponse} of this transaction. {@link ApiFuture#get()} throws an + * {@link ExecutionException} if the transaction did not commit. + */ + ApiFuture getCommitResponse(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java index 5b83402919..7982f0d282 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java @@ -16,23 +16,31 @@ package com.google.cloud.spanner; +import static com.google.common.base.Preconditions.checkState; + +import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.cloud.Timestamp; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; class AsyncRunnerImpl implements AsyncRunner { private final TransactionRunnerImpl delegate; - private final SettableApiFuture commitTimestamp = SettableApiFuture.create(); + private SettableApiFuture commitResponse; AsyncRunnerImpl(TransactionRunnerImpl delegate) { - this.delegate = delegate; + this.delegate = Preconditions.checkNotNull(delegate); } @Override public ApiFuture runAsync(final AsyncWork work, Executor executor) { + Preconditions.checkState(commitResponse == null, "runAsync() can only be called once"); + commitResponse = SettableApiFuture.create(); final SettableApiFuture res = SettableApiFuture.create(); executor.execute( new Runnable() { @@ -43,7 +51,7 @@ public void run() { } catch (Throwable t) { res.setException(t); } finally { - setCommitTimestamp(); + setCommitResponse(); } } }); @@ -66,16 +74,30 @@ public R run(TransactionContext transaction) throws Exception { }); } - private void setCommitTimestamp() { + private void setCommitResponse() { try { - commitTimestamp.set(delegate.getCommitTimestamp()); + commitResponse.set(delegate.getCommitResponse()); } catch (Throwable t) { - commitTimestamp.setException(t); + commitResponse.setException(t); } } @Override public ApiFuture getCommitTimestamp() { - return commitTimestamp; + checkState(commitResponse != null, "runAsync() has not yet been called"); + return ApiFutures.transform( + commitResponse, + new ApiFunction() { + @Override + public Timestamp apply(CommitResponse input) { + return input.getCommitTimestamp(); + } + }, + MoreExecutors.directExecutor()); + } + + public ApiFuture getCommitResponse() { + checkState(commitResponse != null, "runAsync() has not yet been called"); + return commitResponse; } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java index 02d4a9dbd2..a4741dd329 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java @@ -191,6 +191,9 @@ public interface AsyncTransactionFunction { /** Returns the state of the transaction. */ TransactionState getState(); + /** Returns the {@link CommitResponse} of this transaction. */ + ApiFuture getCommitResponse(); + /** * Closes the manager. If there is an active transaction, it will be rolled back. Underlying * session will be released back to the session pool. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java index 8dab813015..2ec914c24a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner; import com.google.api.core.ApiAsyncFunction; +import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; @@ -45,7 +46,7 @@ final class AsyncTransactionManagerImpl private TransactionRunnerImpl.TransactionContextImpl txn; private TransactionState txnState; - private final SettableApiFuture commitTimestamp = SettableApiFuture.create(); + private final SettableApiFuture commitResponse = SettableApiFuture.create(); AsyncTransactionManagerImpl(SessionImpl session, Span span, TransactionOption... options) { this.session = session; @@ -132,29 +133,37 @@ public ApiFuture commitAsync() { SpannerExceptionFactory.newSpannerException( ErrorCode.ABORTED, "Transaction already aborted")); } - ApiFuture res = txn.commitAsync(); + ApiFuture commitResponseFuture = txn.commitAsync(); txnState = TransactionState.COMMITTED; ApiFutures.addCallback( - res, - new ApiFutureCallback() { + commitResponseFuture, + new ApiFutureCallback() { @Override public void onFailure(Throwable t) { if (t instanceof AbortedException) { txnState = TransactionState.ABORTED; } else { txnState = TransactionState.COMMIT_FAILED; - commitTimestamp.setException(t); + commitResponse.setException(t); } } @Override - public void onSuccess(Timestamp result) { - commitTimestamp.set(result); + public void onSuccess(CommitResponse result) { + commitResponse.set(result); + } + }, + MoreExecutors.directExecutor()); + return ApiFutures.transform( + commitResponseFuture, + new ApiFunction() { + @Override + public Timestamp apply(CommitResponse input) { + return input.getCommitTimestamp(); } }, MoreExecutors.directExecutor()); - return res; } @Override @@ -187,6 +196,11 @@ public TransactionState getState() { return txnState; } + @Override + public ApiFuture getCommitResponse() { + return commitResponse; + } + @Override public void invalidate() { if (txnState == TransactionState.STARTED || txnState == null) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CommitResponse.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CommitResponse.java index dd5534d7c3..00505cf8d1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CommitResponse.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CommitResponse.java @@ -17,20 +17,38 @@ package com.google.cloud.spanner; import com.google.cloud.Timestamp; +import com.google.common.base.Preconditions; import java.util.Objects; /** Represents a response from a commit operation. */ public class CommitResponse { - private final Timestamp commitTimestamp; + private final com.google.spanner.v1.CommitResponse proto; public CommitResponse(Timestamp commitTimestamp) { - this.commitTimestamp = commitTimestamp; + this.proto = + com.google.spanner.v1.CommitResponse.newBuilder() + .setCommitTimestamp(commitTimestamp.toProto()) + .build(); } - /** Returns a {@link Timestamp} representing the commit time of the write operation. */ + CommitResponse(com.google.spanner.v1.CommitResponse proto) { + this.proto = Preconditions.checkNotNull(proto); + } + + /** Returns a {@link Timestamp} representing the commit time of the transaction. */ public Timestamp getCommitTimestamp() { - return commitTimestamp; + return Timestamp.fromProto(proto.getCommitTimestamp()); + } + + /** + * Commit statistics are returned by a read/write transaction if specifically requested by passing + * in {@link Options#commitStats()} to the transaction. + */ + public CommitStats getCommitStats() { + Preconditions.checkState( + proto.hasCommitStats(), "The CommitResponse does not contain any commit statistics."); + return CommitStats.fromProto(proto.getCommitStats()); } @Override @@ -42,11 +60,11 @@ public boolean equals(Object o) { return false; } CommitResponse that = (CommitResponse) o; - return Objects.equals(commitTimestamp, that.commitTimestamp); + return Objects.equals(proto, that.proto); } @Override public int hashCode() { - return Objects.hash(commitTimestamp); + return Objects.hash(proto); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CommitStats.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CommitStats.java new file mode 100644 index 0000000000..eaf1a78819 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/CommitStats.java @@ -0,0 +1,54 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import com.google.common.base.Preconditions; + +/** + * Commit statistics are returned by a read/write transaction if specifically requested by passing + * in {@link Options#commitStats()} to the transaction. + */ +public class CommitStats { + private final long mutationCount; + + private CommitStats(long mutationCount) { + this.mutationCount = mutationCount; + } + + static CommitStats fromProto(com.google.spanner.v1.CommitResponse.CommitStats proto) { + Preconditions.checkNotNull(proto); + return new CommitStats(proto.getMutationCount()); + } + + /** + * The number of mutations that were executed by the transaction. Insert and update operations + * count with the multiplicity of the number of columns they affect. For example, inserting a new + * record may count as five mutations, if values are inserted into five columns. Delete and delete + * range operations count as one mutation regardless of the number of columns affected. Deleting a + * row from a parent table that has the ON DELETE CASCADE annotation is also counted as one + * mutation regardless of the number of interleaved child rows present. The exception to this is + * if there are secondary indexes defined on rows being deleted, then the changes to the secondary + * indexes are counted individually. For example, if a table has 2 secondary indexes, deleting a + * range of rows in the table counts as 1 mutation for the table, plus 2 mutations for each row + * that is deleted because the rows in the secondary index might be scattered over the key-space, + * making it impossible for Cloud Spanner to call a single delete range operation on the secondary + * indexes. Secondary indexes include the foreign keys backing indexes. + */ + public long getMutationCount() { + return mutationCount; + } +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index 26d3cd6832..38932bf8a8 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -46,6 +46,11 @@ public interface UpdateOption {} /** Marker interface to mark options applicable to list operations in admin API. */ public interface ListOption {} + /** Specifying this instructs the transaction to request {@link CommitStats} from the backend. */ + public static TransactionOption commitStats() { + return COMMIT_STATS_OPTION; + } + /** * Specifying this will cause the read to yield at most this many rows. This should be greater * than 0. @@ -116,6 +121,16 @@ public static ListOption filter(String filter) { return new FilterOption(filter); } + /** Option to request {@link CommitStats} for read/write transactions. */ + static final class CommitStatsOption extends InternalOption implements TransactionOption { + @Override + void appendToOptions(Options options) { + options.withCommitStats = true; + } + } + + static final CommitStatsOption COMMIT_STATS_OPTION = new CommitStatsOption(); + /** Option pertaining to flow control. */ static final class FlowControlOption extends InternalOption implements ReadAndQueryOption { final int prefetchChunks; @@ -143,6 +158,7 @@ void appendToOptions(Options options) { } } + private boolean withCommitStats; private Long limit; private Integer prefetchChunks; private Integer bufferRows; @@ -153,6 +169,10 @@ void appendToOptions(Options options) { // Construction is via factory methods below. private Options() {} + boolean withCommitStats() { + return withCommitStats; + } + boolean hasLimit() { return limit != null; } @@ -204,6 +224,9 @@ String filter() { @Override public String toString() { StringBuilder b = new StringBuilder(); + if (withCommitStats) { + b.append("withCommitStats: ").append(withCommitStats).append(' '); + } if (limit != null) { b.append("limit: ").append(limit).append(' '); } @@ -234,7 +257,8 @@ public boolean equals(Object o) { } Options that = (Options) o; - return (!hasLimit() && !that.hasLimit() + return Objects.equals(withCommitStats, that.withCommitStats) + && (!hasLimit() && !that.hasLimit() || hasLimit() && that.hasLimit() && Objects.equals(limit(), that.limit())) && (!hasPrefetchChunks() && !that.hasPrefetchChunks() || hasPrefetchChunks() @@ -253,6 +277,9 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = 31; + if (withCommitStats) { + result = 31 * result + 1231; + } if (limit != null) { result = 31 * result + limit.hashCode(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index a20a6b7b24..51ec7b4d50 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -147,7 +147,7 @@ public Void run(TransactionContext ctx) { return null; } }); - return new CommitResponse(runner.getCommitTimestamp()); + return runner.getCommitResponse(); } @Override @@ -165,6 +165,8 @@ public CommitResponse writeAtLeastOnceWithOptions( final CommitRequest request = CommitRequest.newBuilder() .setSession(name) + .setReturnCommitStats( + Options.fromTransactionOptions(transactionOptions).withCommitStats()) .addAllMutations(mutationsProto) .setSingleUseTransaction( TransactionOptions.newBuilder() @@ -174,11 +176,7 @@ public CommitResponse writeAtLeastOnceWithOptions( try (Scope s = tracer.withSpan(span)) { com.google.spanner.v1.CommitResponse response = spanner.getRpc().commit(request, this.options); - Timestamp t = Timestamp.fromProto(response.getCommitTimestamp()); - return new CommitResponse(t); - } catch (IllegalArgumentException e) { - TraceUtil.setWithFailure(span, e); - throw newSpannerException(ErrorCode.INTERNAL, "Could not parse commit timestamp", e); + return new CommitResponse(response); } catch (RuntimeException e) { TraceUtil.setWithFailure(span, e); throw e; @@ -246,17 +244,12 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { @Override public TransactionRunner readWriteTransaction(TransactionOption... options) { - return setActive( - new TransactionRunnerImpl( - this, spanner.getRpc(), spanner.getDefaultPrefetchChunks(), options)); + return setActive(new TransactionRunnerImpl(this, options)); } @Override public AsyncRunner runAsync(TransactionOption... options) { - return new AsyncRunnerImpl( - setActive( - new TransactionRunnerImpl( - this, spanner.getRpc(), spanner.getDefaultPrefetchChunks(), options))); + return new AsyncRunnerImpl(setActive(new TransactionRunnerImpl(this, options))); } @Override @@ -350,6 +343,7 @@ public void run() { TransactionContextImpl newTransaction(Options options) { return TransactionContextImpl.newBuilder() .setSession(this) + .setOptions(options) .setTransactionId(readyTransactionId) .setOptions(options) .setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter()) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index b24393340a..51a42525bf 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -37,6 +37,7 @@ import static com.google.cloud.spanner.MetricRegistryConstants.SPANNER_LABEL_KEYS; import static com.google.cloud.spanner.MetricRegistryConstants.SPANNER_LABEL_KEYS_WITH_TYPE; import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; +import static com.google.common.base.Preconditions.checkState; import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; @@ -860,7 +861,7 @@ public TransactionContext resetForRetry() { } catch (SessionNotFoundException e) { session = sessionPool.replaceSession(e, session); PooledSession pooledSession = session.get(); - delegate = pooledSession.delegate.transactionManager(); + delegate = pooledSession.delegate.transactionManager(options); restartedAfterSessionNotFound = true; } } @@ -871,6 +872,11 @@ public Timestamp getCommitTimestamp() { return delegate.getCommitTimestamp(); } + @Override + public CommitResponse getCommitResponse() { + return delegate.getCommitResponse(); + } + @Override public void close() { if (closed) { @@ -949,6 +955,11 @@ public Timestamp getCommitTimestamp() { return getRunner().getCommitTimestamp(); } + @Override + public CommitResponse getCommitResponse() { + return getRunner().getCommitResponse(); + } + @Override public TransactionRunner allowNestedTransaction() { getRunner().allowNestedTransaction(); @@ -960,7 +971,7 @@ private static class SessionPoolAsyncRunner implements AsyncRunner { private final SessionPool sessionPool; private volatile PooledSessionFuture session; private final TransactionOption[] options; - private final SettableApiFuture commitTimestamp = SettableApiFuture.create(); + private SettableApiFuture commitResponse; private SessionPoolAsyncRunner( SessionPool sessionPool, PooledSessionFuture session, TransactionOption... options) { @@ -971,6 +982,7 @@ private SessionPoolAsyncRunner( @Override public ApiFuture runAsync(final AsyncWork work, Executor executor) { + commitResponse = SettableApiFuture.create(); final SettableApiFuture res = SettableApiFuture.create(); executor.execute( new Runnable() { @@ -1010,7 +1022,7 @@ public void run() { } session.get().markUsed(); session.close(); - setCommitTimestamp(runner); + setCommitResponse(runner); if (exception != null) { res.setException(exception); } else { @@ -1021,17 +1033,32 @@ public void run() { return res; } - private void setCommitTimestamp(AsyncRunner delegate) { + private void setCommitResponse(AsyncRunner delegate) { try { - commitTimestamp.set(delegate.getCommitTimestamp().get()); + commitResponse.set(delegate.getCommitResponse().get()); } catch (Throwable t) { - commitTimestamp.setException(t); + commitResponse.setException(t); } } @Override public ApiFuture getCommitTimestamp() { - return commitTimestamp; + checkState(commitResponse != null, "runAsync() has not yet been called"); + return ApiFutures.transform( + commitResponse, + new ApiFunction() { + @Override + public Timestamp apply(CommitResponse input) { + return input.getCommitTimestamp(); + } + }, + MoreExecutors.directExecutor()); + } + + @Override + public ApiFuture getCommitResponse() { + checkState(commitResponse != null, "runAsync() has not yet been called"); + return commitResponse; } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java index e618e2b44a..83056603f2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java @@ -290,4 +290,22 @@ public TransactionState getState() { return txnState; } } + + public ApiFuture getCommitResponse() { + synchronized (lock) { + Preconditions.checkState( + txnState == TransactionState.COMMITTED, + "commit can only be invoked if the transaction was successfully committed"); + } + return ApiFutures.transformAsync( + delegate, + new ApiAsyncFunction() { + @Override + public ApiFuture apply(AsyncTransactionManagerImpl input) + throws Exception { + return input.getCommitResponse(); + } + }, + MoreExecutors.directExecutor()); + } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java index 34e2165715..c9a56d8100 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java @@ -87,6 +87,9 @@ public enum TransactionState { */ Timestamp getCommitTimestamp(); + /** Returns the {@link CommitResponse} of this transaction. */ + CommitResponse getCommitResponse(); + /** Returns the state of the transaction. */ TransactionState getState(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java index 1cef304e48..2d47fcd5c7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -118,7 +118,15 @@ public Timestamp getCommitTimestamp() { Preconditions.checkState( txnState == TransactionState.COMMITTED, "getCommitTimestamp can only be invoked if the transaction committed successfully"); - return txn.commitTimestamp(); + return txn.getCommitResponse().getCommitTimestamp(); + } + + @Override + public CommitResponse getCommitResponse() { + Preconditions.checkState( + txnState == TransactionState.COMMITTED, + "getCommitResponse can only be invoked if the transaction committed successfully"); + return txn.getCommitResponse(); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java index 2ad27fcd7b..33cb57e90f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java @@ -74,6 +74,9 @@ interface TransactionCallable { */ Timestamp getCommitTimestamp(); + /** Returns the {@link CommitResponse} of this transaction. */ + CommitResponse getCommitResponse(); + /** * Allows overriding the default behaviour of blocking nested transactions. * diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index a4973c2934..dbbbca068a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -31,7 +31,6 @@ import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionImpl.SessionTransaction; -import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; @@ -40,7 +39,6 @@ import com.google.protobuf.Empty; import com.google.rpc.Code; import com.google.spanner.v1.CommitRequest; -import com.google.spanner.v1.CommitResponse; import com.google.spanner.v1.ExecuteBatchDmlRequest; import com.google.spanner.v1.ExecuteBatchDmlResponse; import com.google.spanner.v1.ExecuteSqlRequest; @@ -184,7 +182,7 @@ public void removeListener(Runnable listener) { volatile ByteString transactionId; - private Timestamp commitTimestamp; + private CommitResponse commitResponse; private TransactionContextImpl(Builder builder) { super(builder); @@ -275,7 +273,7 @@ public void run() { void commit() { try { - commitTimestamp = commitAsync().get(); + commitResponse = commitAsync().get(); } catch (InterruptedException e) { if (commitFuture != null) { commitFuture.cancel(true); @@ -288,10 +286,13 @@ void commit() { volatile ApiFuture commitFuture; - ApiFuture commitAsync() { - final SettableApiFuture res = SettableApiFuture.create(); + ApiFuture commitAsync() { + final SettableApiFuture res = SettableApiFuture.create(); final SettableApiFuture finishOps; - CommitRequest.Builder builder = CommitRequest.newBuilder().setSession(session.getName()); + CommitRequest.Builder builder = + CommitRequest.newBuilder() + .setSession(session.getName()) + .setReturnCommitStats(options.withCommitStats()); synchronized (lock) { if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) { finishOps = SettableApiFuture.create(); @@ -313,12 +314,12 @@ ApiFuture commitAsync() { } private final class CommitRunnable implements Runnable { - private final SettableApiFuture res; + private final SettableApiFuture res; private final ApiFuture prev; private final CommitRequest.Builder requestBuilder; CommitRunnable( - SettableApiFuture res, + SettableApiFuture res, ApiFuture prev, CommitRequest.Builder requestBuilder) { this.res = res; @@ -342,7 +343,7 @@ public void run() { span.addAnnotation("Starting Commit"); final Span opSpan = tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span).startSpan(); - final ApiFuture commitFuture = + final ApiFuture commitFuture = rpc.commitAsync(commitRequest, session.getOptions()); commitFuture.addListener( tracer.withSpan( @@ -351,15 +352,14 @@ public void run() { @Override public void run() { try { - CommitResponse commitResponse = commitFuture.get(); - if (!commitResponse.hasCommitTimestamp()) { + com.google.spanner.v1.CommitResponse proto = commitFuture.get(); + if (!proto.hasCommitTimestamp()) { throw newSpannerException( ErrorCode.INTERNAL, "Missing commitTimestamp:\n" + session.getName()); } - Timestamp ts = Timestamp.fromProto(commitResponse.getCommitTimestamp()); span.addAnnotation("Commit Done"); opSpan.end(TraceUtil.END_SPAN_OPTIONS); - res.set(ts); + res.set(new CommitResponse(proto)); } catch (Throwable e) { if (e instanceof ExecutionException) { e = @@ -387,9 +387,9 @@ public void run() { } } - Timestamp commitTimestamp() { - checkState(commitTimestamp != null, "run() has not yet returned normally"); - return commitTimestamp; + CommitResponse getCommitResponse() { + checkState(commitResponse != null, "run() has not yet returned normally"); + return commitResponse; } boolean isAborted() { @@ -826,11 +826,7 @@ public TransactionRunner allowNestedTransaction() { return this; } - TransactionRunnerImpl( - SessionImpl session, - SpannerRpc rpc, - int defaultPrefetchChunks, - TransactionOption... options) { + TransactionRunnerImpl(SessionImpl session, TransactionOption... options) { this.session = session; this.options = Options.fromTransactionOptions(options); this.txn = session.newTransaction(this.options); @@ -955,7 +951,12 @@ public T call() { @Override public Timestamp getCommitTimestamp() { checkState(txn != null, "run() has not yet returned normally"); - return txn.commitTimestamp(); + return txn.getCommitResponse().getCommitTimestamp(); + } + + public CommitResponse getCommitResponse() { + checkState(txn != null, "run() has not yet returned normally"); + return txn.getCommitResponse(); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerImplTest.java new file mode 100644 index 0000000000..fab29d2900 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerImplTest.java @@ -0,0 +1,156 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.cloud.spanner.SpannerApiFutures.get; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.cloud.spanner.AsyncRunner.AsyncWork; +import com.google.cloud.spanner.TransactionRunner.TransactionCallable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.AfterClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class AsyncRunnerImplTest { + private static final ExecutorService executor = Executors.newSingleThreadExecutor(); + + @AfterClass + public static void teardown() { + executor.shutdown(); + } + + @SuppressWarnings("unchecked") + @Test + public void testAsyncRunReturnsResultAndCommitResponse() { + final Object expectedResult = new Object(); + final CommitResponse expectedCommitResponse = mock(CommitResponse.class); + + TransactionRunnerImpl delegate = mock(TransactionRunnerImpl.class); + when(delegate.run(any(TransactionCallable.class))).thenReturn(expectedResult); + when(delegate.getCommitResponse()).thenReturn(expectedCommitResponse); + + AsyncRunnerImpl runner = new AsyncRunnerImpl(delegate); + ApiFuture result = + runner.runAsync( + new AsyncWork() { + @Override + public ApiFuture doWorkAsync(TransactionContext txn) { + return ApiFutures.immediateFuture(expectedResult); + } + }, + executor); + + assertSame(expectedResult, get(result)); + assertSame(expectedCommitResponse, get(runner.getCommitResponse())); + assertEquals( + get(runner.getCommitResponse()).getCommitTimestamp(), get(runner.getCommitTimestamp())); + } + + @Test + public void testGetCommitTimestampReturnsErrorBeforeRun() { + TransactionRunnerImpl delegate = mock(TransactionRunnerImpl.class); + AsyncRunnerImpl runner = new AsyncRunnerImpl(delegate); + try { + runner.getCommitTimestamp(); + fail("missing expected exception"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("runAsync() has not yet been called")); + } + } + + @Test + public void testGetCommitResponseReturnsErrorBeforeRun() { + TransactionRunnerImpl delegate = mock(TransactionRunnerImpl.class); + AsyncRunnerImpl runner = new AsyncRunnerImpl(delegate); + try { + runner.getCommitResponse(); + fail("missing expected exception"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("runAsync() has not yet been called")); + } + } + + @Test + public void testGetCommitResponseReturnsErrorIfRunFails() { + final SpannerException expectedException = + SpannerExceptionFactory.newSpannerException(ErrorCode.ALREADY_EXISTS, "Row already exists"); + + TransactionRunnerImpl delegate = mock(TransactionRunnerImpl.class); + when(delegate.getCommitResponse()).thenThrow(expectedException); + + AsyncRunnerImpl runner = new AsyncRunnerImpl(delegate); + runner.runAsync( + new AsyncWork() { + @Override + public ApiFuture doWorkAsync(TransactionContext txn) { + return ApiFutures.immediateFailedFuture(expectedException); + } + }, + executor); + + try { + get(runner.getCommitResponse()); + fail("missing expected exception"); + } catch (SpannerException e) { + assertSame(expectedException, e); + } + } + + @SuppressWarnings("unchecked") + @Test + public void testRunAyncFailsIfCalledMultipleTimes() { + final Object result = new Object(); + TransactionRunnerImpl delegate = mock(TransactionRunnerImpl.class); + when(delegate.run(any(TransactionCallable.class))).thenReturn(result); + + AsyncRunnerImpl runner = new AsyncRunnerImpl(delegate); + runner.runAsync( + new AsyncWork() { + @Override + public ApiFuture doWorkAsync(TransactionContext txn) { + return ApiFutures.immediateFuture(result); + } + }, + executor); + + try { + runner.runAsync( + new AsyncWork() { + @Override + public ApiFuture doWorkAsync(TransactionContext txn) { + return ApiFutures.immediateFuture(null); + } + }, + executor); + fail("missing expected exception"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("runAsync() can only be called once")); + } + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java index 2af185ae14..0c67871e89 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java @@ -18,6 +18,7 @@ import static com.google.cloud.spanner.MockSpannerTestUtil.*; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.google.api.core.ApiFunction; @@ -53,6 +54,29 @@ @RunWith(JUnit4.class) public class AsyncRunnerTest extends AbstractAsyncTransactionTest { + + @Test + public void testAsyncRunner_doesNotReturnCommitTimestampBeforeCommit() { + AsyncRunner runner = client().runAsync(); + try { + runner.getCommitTimestamp(); + fail("missing expected exception"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("runAsync() has not yet been called")); + } + } + + @Test + public void testAsyncRunner_doesNotReturnCommitResponseBeforeCommit() { + AsyncRunner runner = client().runAsync(); + try { + runner.getCommitResponse(); + fail("missing expected exception"); + } catch (IllegalStateException e) { + assertTrue(e.getMessage().contains("runAsync() has not yet been called")); + } + } + @Test public void asyncRunnerUpdate() throws Exception { AsyncRunner runner = client().runAsync(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java new file mode 100644 index 0000000000..c60363e793 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.api.core.ApiFutures; +import com.google.cloud.Timestamp; +import io.opencensus.trace.Span; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class AsyncTransactionManagerImplTest { + + @Mock private SessionImpl session; + @Mock TransactionRunnerImpl.TransactionContextImpl transaction; + + @Test + public void testCommitReturnsCommitStats() { + try (AsyncTransactionManagerImpl manager = + new AsyncTransactionManagerImpl(session, mock(Span.class), Options.commitStats())) { + when(session.newTransaction(Options.fromTransactionOptions(Options.commitStats()))) + .thenReturn(transaction); + when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null)); + Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1); + CommitResponse response = mock(CommitResponse.class); + when(response.getCommitTimestamp()).thenReturn(commitTimestamp); + when(transaction.commitAsync()).thenReturn(ApiFutures.immediateFuture(response)); + manager.beginAsync(); + manager.commitAsync(); + verify(transaction).commitAsync(); + } + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java index aed844ed9a..748d6f7640 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java @@ -23,6 +23,8 @@ import static com.google.cloud.spanner.MockSpannerTestUtil.UPDATE_COUNT; import static com.google.cloud.spanner.MockSpannerTestUtil.UPDATE_STATEMENT; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import com.google.api.core.ApiFuture; @@ -213,6 +215,32 @@ public boolean apply(AbstractMessage input) { 0L); } + @Test + public void testAsyncTransactionManager_returnsCommitStats() throws Exception { + try (AsyncTransactionManager manager = + client().transactionManagerAsync(Options.commitStats())) { + TransactionContextFuture transaction = manager.beginAsync(); + while (true) { + try { + CommitTimestampFuture commitTimestamp = + transaction + .then( + AsyncTransactionManagerHelper.buffer( + Mutation.delete("FOO", Key.of("foo"))), + executor) + .commitAsync(); + assertNotNull(commitTimestamp.get()); + assertNotNull(manager.getCommitResponse().get()); + assertNotNull(manager.getCommitResponse().get().getCommitStats()); + assertEquals(1L, manager.getCommitResponse().get().getCommitStats().getMutationCount()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetryAsync(); + } + } + } + } + @Test public void asyncTransactionManagerUpdate() throws Exception { final SettableApiFuture updateCount = SettableApiFuture.create(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java index 6fd6a4383e..3db2bdb1c4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.when; import com.google.api.core.ApiFutures; -import com.google.cloud.Timestamp; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.SessionPool.Clock; import com.google.protobuf.Empty; @@ -67,9 +66,9 @@ SessionImpl mockSession() { "projects/dummy/instances/dummy/database/dummy/sessions/session" + sessionIndex); when(session.asyncClose()).thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); when(session.writeWithOptions(any(Iterable.class))) - .thenReturn(new CommitResponse(Timestamp.now())); + .thenReturn(new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance())); when(session.writeAtLeastOnceWithOptions(any(Iterable.class))) - .thenReturn(new CommitResponse(Timestamp.now())); + .thenReturn(new CommitResponse(com.google.spanner.v1.CommitResponse.getDefaultInstance())); sessionIndex++; return session; } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/CommitResponseTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/CommitResponseTest.java new file mode 100644 index 0000000000..7fb4c559d4 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/CommitResponseTest.java @@ -0,0 +1,86 @@ +/* + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import com.google.cloud.Timestamp; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class CommitResponseTest { + + @Test + public void testConstructWithTimestamp() { + Timestamp timestamp = Timestamp.ofTimeSecondsAndNanos(100L, 100); + CommitResponse response = new CommitResponse(timestamp); + assertEquals(timestamp, response.getCommitTimestamp()); + } + + @Test + public void testFromProto() { + long mutationCount = 5L; + com.google.protobuf.Timestamp timestamp = + com.google.protobuf.Timestamp.newBuilder().setSeconds(123L).setNanos(456).build(); + com.google.spanner.v1.CommitResponse proto = + com.google.spanner.v1.CommitResponse.newBuilder() + .setCommitStats( + com.google.spanner.v1.CommitResponse.CommitStats.newBuilder() + .setMutationCount(mutationCount) + .build()) + .setCommitTimestamp(timestamp) + .build(); + + CommitResponse response = new CommitResponse(proto); + + assertEquals(Timestamp.ofTimeSecondsAndNanos(123L, 456), response.getCommitTimestamp()); + assertEquals(mutationCount, response.getCommitStats().getMutationCount()); + } + + @Test + public void testEqualsAndHashCode() { + com.google.spanner.v1.CommitResponse proto1 = + com.google.spanner.v1.CommitResponse.newBuilder() + .setCommitTimestamp(com.google.protobuf.Timestamp.newBuilder().setSeconds(1L).build()) + .build(); + com.google.spanner.v1.CommitResponse proto2 = + com.google.spanner.v1.CommitResponse.newBuilder() + .setCommitTimestamp(com.google.protobuf.Timestamp.newBuilder().setSeconds(2L).build()) + .build(); + com.google.spanner.v1.CommitResponse proto3 = + com.google.spanner.v1.CommitResponse.newBuilder() + .setCommitTimestamp(com.google.protobuf.Timestamp.newBuilder().setSeconds(1L).build()) + .build(); + + CommitResponse response1 = new CommitResponse(proto1); + CommitResponse response2 = new CommitResponse(proto2); + CommitResponse response3 = new CommitResponse(proto3); + + assertEquals(response3, response1); + assertNotEquals(response2, response1); + assertNotEquals(response3, response2); + assertNotEquals(response1, null); + assertNotEquals(response1, new Object()); + + assertEquals(response3.hashCode(), response1.hashCode()); + assertNotEquals(response2.hashCode(), response1.hashCode()); + assertNotEquals(response3.hashCode(), response2.hashCode()); + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 8ed55e0b9e..42a158d755 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -17,7 +17,11 @@ package com.google.cloud.spanner; import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1; +import static com.google.cloud.spanner.SpannerApiFutures.get; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -28,6 +32,7 @@ import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.retrying.RetrySettings; import com.google.cloud.NoCredentials; +import com.google.cloud.Timestamp; import com.google.cloud.spanner.AbstractResultSet.GrpcStreamIterator; import com.google.cloud.spanner.AsyncResultSet.CallbackResponse; import com.google.cloud.spanner.AsyncResultSet.ReadyCallback; @@ -155,21 +160,53 @@ public void tearDown() { } @Test - public void write() { + public void testWrite() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - client.write( - Arrays.asList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + Timestamp timestamp = + client.write( + Arrays.asList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + assertNotNull(timestamp); } @Test - public void writeAtLeastOnce() { + public void testWriteWithCommitStats() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - client.writeAtLeastOnce( - Arrays.asList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + CommitResponse response = + client.writeWithOptions( + Arrays.asList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.commitStats()); + assertNotNull(response); + assertNotNull(response.getCommitTimestamp()); + assertNotNull(response.getCommitStats()); + } + + @Test + public void testWriteAtLeastOnce() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + Timestamp timestamp = + client.writeAtLeastOnce( + Arrays.asList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + assertNotNull(timestamp); + } + + @Test + public void testWriteAtLeastOnceWithCommitStats() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + CommitResponse response = + client.writeAtLeastOnceWithOptions( + Arrays.asList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.commitStats()); + assertNotNull(response); + assertNotNull(response.getCommitTimestamp()); + assertNotNull(response.getCommitStats()); } @Test @@ -426,7 +463,7 @@ public void readOnlyTransactionBoundIsNonBlocking() { } @Test - public void readWriteTransaction() { + public void testReadWriteTransaction() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); TransactionRunner runner = client.readWriteTransaction(); @@ -438,6 +475,25 @@ public Void run(TransactionContext transaction) throws Exception { return null; } }); + assertNotNull(runner.getCommitTimestamp()); + } + + @Test + public void testReadWriteTransaction_returnsCommitStats() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(Options.commitStats()); + runner.run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + transaction.buffer(Mutation.delete("FOO", Key.of("foo"))); + return null; + } + }); + assertNotNull(runner.getCommitResponse()); + assertNotNull(runner.getCommitResponse().getCommitStats()); + assertEquals(1L, runner.getCommitResponse().getCommitStats().getMutationCount()); } @Test @@ -461,12 +517,12 @@ public Void run(TransactionContext transaction) throws Exception { } @Test - public void runAsync() throws Exception { + public void testRunAsync() throws Exception { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); ExecutorService executor = Executors.newSingleThreadExecutor(); AsyncRunner runner = client.runAsync(); - ApiFuture fut = + ApiFuture result = runner.runAsync( new AsyncWork() { @Override @@ -475,7 +531,31 @@ public ApiFuture doWorkAsync(TransactionContext txn) { } }, executor); - assertThat(fut.get()).isEqualTo(UPDATE_COUNT); + assertEquals(UPDATE_COUNT, result.get().longValue()); + assertNotNull(runner.getCommitTimestamp().get()); + executor.shutdown(); + } + + @Test + public void testRunAsync_returnsCommitStats() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + ExecutorService executor = Executors.newSingleThreadExecutor(); + AsyncRunner runner = client.runAsync(Options.commitStats()); + ApiFuture result = + runner.runAsync( + new AsyncWork() { + @Override + public ApiFuture doWorkAsync(TransactionContext txn) { + txn.buffer(Mutation.delete("FOO", Key.of("foo"))); + return ApiFutures.immediateFuture(null); + } + }, + executor); + assertNull(get(result)); + assertNotNull(get(runner.getCommitResponse())); + assertNotNull(get(runner.getCommitResponse()).getCommitStats()); + assertEquals(1L, get(runner.getCommitResponse()).getCommitStats().getMutationCount()); executor.shutdown(); } @@ -528,19 +608,40 @@ public ApiFuture doWorkAsync(TransactionContext txn) { } @Test - public void transactionManager() throws Exception { + public void testTransactionManager() throws Exception { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - try (TransactionManager txManager = client.transactionManager()) { + try (TransactionManager manager = client.transactionManager()) { while (true) { - TransactionContext tx = txManager.begin(); + TransactionContext transaction = manager.begin(); try { - tx.executeUpdate(UPDATE_STATEMENT); - txManager.commit(); + transaction.executeUpdate(UPDATE_STATEMENT); + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); break; } catch (AbortedException e) { - Thread.sleep(e.getRetryDelayInMillis() / 1000); - tx = txManager.resetForRetry(); + transaction = manager.resetForRetry(); + } + } + } + } + + @Test + public void testTransactionManager_returnsCommitStats() throws InterruptedException { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (TransactionManager manager = client.transactionManager(Options.commitStats())) { + while (true) { + TransactionContext transaction = manager.begin(); + try { + transaction.buffer(Mutation.delete("FOO", Key.of("foo"))); + manager.commit(); + assertNotNull(manager.getCommitResponse()); + assertNotNull(manager.getCommitResponse().getCommitStats()); + assertEquals(1L, manager.getCommitResponse().getCommitStats().getMutationCount()); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetry(); } } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index aeeac5b3d1..5aa2948b51 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -1473,13 +1473,12 @@ public Void run(TransactionContext transaction) throws Exception { if (attempt == 1) { impl.waitForTransactionTimeoutMillis = 1L; // Freeze the mock server to prevent the first (async) statement from returning - // a - // transaction. + // a transaction. mockSpanner.freeze(); } else { impl.waitForTransactionTimeoutMillis = 60_000L; } - transaction.executeUpdateAsync(UPDATE_STATEMENT); + ApiFuture updateCount = transaction.executeUpdateAsync(UPDATE_STATEMENT); // Try to execute a query. This will timeout during the first attempt while // waiting @@ -1489,6 +1488,9 @@ public Void run(TransactionContext transaction) throws Exception { while (rs.next()) {} } catch (Throwable t) { mockSpanner.unfreeze(); + // Wait until the update actually finishes so it has returned a transaction. + // This ensures that the retry does not issue a BeginTransaction RPC. + SpannerApiFutures.get(updateCount); throw t; } return null; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 17714ebae0..424e979692 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -1809,8 +1809,16 @@ public void commit(CommitRequest request, StreamObserver respons } simulateAbort(session, request.getTransactionId()); commitTransaction(transaction.getId()); - responseObserver.onNext( - CommitResponse.newBuilder().setCommitTimestamp(getCurrentGoogleTimestamp()).build()); + CommitResponse.Builder responseBuilder = + CommitResponse.newBuilder().setCommitTimestamp(getCurrentGoogleTimestamp()); + if (request.getReturnCommitStats()) { + responseBuilder.setCommitStats( + com.google.spanner.v1.CommitResponse.CommitStats.newBuilder() + // This is not really always equal, but at least it returns a value. + .setMutationCount(request.getMutationsCount()) + .build()); + } + responseObserver.onNext(responseBuilder.build()); responseObserver.onCompleted(); } catch (StatusRuntimeException t) { responseObserver.onError(t); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java index 5a2777570b..db642aedd1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java @@ -17,6 +17,8 @@ package com.google.cloud.spanner; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -221,22 +223,57 @@ public void queryEquality() { } @Test - public void testFromTransactionOptions() { + public void testFromTransactionOptions_toStringNoOptions() { Options opts = Options.fromTransactionOptions(); assertThat(opts.toString()).isEqualTo(""); } @Test - public void testTransactionOptionsEquality() { - Options o1; - Options o2; + public void testFromTransactionOptions_toStringWithCommitStats() { + Options options = Options.fromTransactionOptions(Options.commitStats()); + assertThat(options.toString()).contains("withCommitStats: true"); + } - o1 = Options.fromTransactionOptions(); - o2 = Options.fromTransactionOptions(); - assertThat(o1.equals(o2)).isTrue(); + @Test + public void testTransactionOptions_noOptionsAreEqual() { + Options option1 = Options.fromTransactionOptions(); + Options option2 = Options.fromTransactionOptions(); + assertEquals(option1, option2); + } - o2 = Options.fromReadOptions(Options.prefetchChunks(1)); - assertThat(o1.equals(o2)).isFalse(); + @Test + public void testTransactionOptions_withCommitStatsAreEqual() { + Options option1 = Options.fromTransactionOptions(Options.commitStats()); + Options option2 = Options.fromTransactionOptions(Options.commitStats()); + assertEquals(option1, option2); + } + + @Test + public void testTransactionOptions_withCommitStatsAndOtherOptionAreNotEqual() { + Options option1 = Options.fromTransactionOptions(Options.commitStats()); + Options option2 = Options.fromQueryOptions(Options.prefetchChunks(10)); + assertNotEquals(option1, option2); + } + + @Test + public void testTransactionOptions_noOptionsHashCode() { + Options option1 = Options.fromTransactionOptions(); + Options option2 = Options.fromTransactionOptions(); + assertEquals(option2.hashCode(), option1.hashCode()); + } + + @Test + public void testTransactionOptions_withCommitStatsHashCode() { + Options option1 = Options.fromTransactionOptions(Options.commitStats()); + Options option2 = Options.fromTransactionOptions(Options.commitStats()); + assertEquals(option2.hashCode(), option1.hashCode()); + } + + @Test + public void testTransactionOptions_withCommitStatsAndOtherOptionHashCode() { + Options option1 = Options.fromTransactionOptions(Options.commitStats()); + Options option2 = Options.fromQueryOptions(Options.prefetchChunks(10)); + assertNotEquals(option2.hashCode(), option1.hashCode()); } @Test diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java index e49c7e739c..157d71bf36 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java @@ -18,6 +18,10 @@ import static com.google.cloud.spanner.SpannerApiFutures.get; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.google.api.core.ApiFunction; @@ -1151,7 +1155,7 @@ public void transactionManagerReadRowUsingIndex() throws InterruptedException { @Test public void transactionManagerUpdate() throws InterruptedException { invalidateSessionPool(); - try (TransactionManager manager = client.transactionManager()) { + try (TransactionManager manager = client.transactionManager(Options.commitStats())) { long count; TransactionContext transaction = manager.begin(); while (true) { @@ -1164,10 +1168,11 @@ public void transactionManagerUpdate() throws InterruptedException { transaction = manager.resetForRetry(); } } - assertThat(count).isEqualTo(UPDATE_COUNT); - assertThat(failOnInvalidatedSession).isFalse(); + assertEquals(UPDATE_COUNT, count); + assertNotNull(manager.getCommitResponse().getCommitStats()); + assertFalse(failOnInvalidatedSession); } catch (SessionNotFoundException e) { - assertThat(failOnInvalidatedSession).isTrue(); + assertTrue(failOnInvalidatedSession); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index a163b82914..337510757d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -676,7 +676,7 @@ public void run() { clock.currentTimeMillis += clock.currentTimeMillis + (options.getKeepAliveIntervalMinutes() + 5) * 60 * 1000; session1 = pool.getSession(); - session1.writeAtLeastOnce(new ArrayList()); + session1.writeAtLeastOnceWithOptions(new ArrayList()); session1.close(); runMaintainanceLoop(clock, pool, pool.poolMaintainer.numKeepAliveCycles); // The session pool only keeps MinSessions + MaxIdleSessions alive. @@ -900,8 +900,7 @@ public void testSessionNotFoundReadWriteTransaction() { when(closedSession.newTransaction(Options.fromTransactionOptions())) .thenReturn(closedTransactionContext); when(closedSession.beginTransactionAsync()).thenThrow(sessionNotFound); - TransactionRunnerImpl closedTransactionRunner = - new TransactionRunnerImpl(closedSession, rpc, 10); + TransactionRunnerImpl closedTransactionRunner = new TransactionRunnerImpl(closedSession); closedTransactionRunner.setSpan(mock(Span.class)); when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner); @@ -915,8 +914,7 @@ public void testSessionNotFoundReadWriteTransaction() { .thenReturn(openTransactionContext); when(openSession.beginTransactionAsync()) .thenReturn(ApiFutures.immediateFuture(ByteString.copyFromUtf8("open-txn"))); - TransactionRunnerImpl openTransactionRunner = - new TransactionRunnerImpl(openSession, mock(SpannerRpc.class), 10); + TransactionRunnerImpl openTransactionRunner = new TransactionRunnerImpl(openSession); openTransactionRunner.setSpan(mock(Span.class)); when(openSession.readWriteTransaction()).thenReturn(openTransactionRunner); @@ -1047,10 +1045,13 @@ public void testSessionNotFoundWrite() { SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName); List mutations = Arrays.asList(Mutation.newInsertBuilder("FOO").build()); final SessionImpl closedSession = mockSession(); - when(closedSession.write(mutations)).thenThrow(sessionNotFound); + when(closedSession.writeWithOptions(mutations)).thenThrow(sessionNotFound); final SessionImpl openSession = mockSession(); - when(openSession.write(mutations)).thenReturn(Timestamp.now()); + com.google.cloud.spanner.CommitResponse response = + mock(com.google.cloud.spanner.CommitResponse.class); + when(response.getCommitTimestamp()).thenReturn(Timestamp.now()); + when(openSession.writeWithOptions(mutations)).thenReturn(response); doAnswer( new Answer() { @Override @@ -1099,10 +1100,13 @@ public void testSessionNotFoundWriteAtLeastOnce() { SpannerExceptionFactoryTest.newSessionNotFoundException(sessionName); List mutations = Arrays.asList(Mutation.newInsertBuilder("FOO").build()); final SessionImpl closedSession = mockSession(); - when(closedSession.writeAtLeastOnce(mutations)).thenThrow(sessionNotFound); + when(closedSession.writeAtLeastOnceWithOptions(mutations)).thenThrow(sessionNotFound); final SessionImpl openSession = mockSession(); - when(openSession.writeAtLeastOnce(mutations)).thenReturn(Timestamp.now()); + com.google.cloud.spanner.CommitResponse response = + mock(com.google.cloud.spanner.CommitResponse.class); + when(response.getCommitTimestamp()).thenReturn(Timestamp.now()); + when(openSession.writeAtLeastOnceWithOptions(mutations)).thenReturn(response); doAnswer( new Answer() { @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java index 21f2a59fb3..1ca9e0e667 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java @@ -16,14 +16,19 @@ package com.google.cloud.spanner; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyMap; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.api.core.ApiFutures; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.protobuf.ByteString; import com.google.rpc.Code; import com.google.rpc.Status; +import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.ExecuteBatchDmlRequest; import com.google.spanner.v1.ExecuteBatchDmlResponse; import java.util.Arrays; @@ -45,6 +50,35 @@ public void batchDmlException() { batchDml(Code.FAILED_PRECONDITION_VALUE); } + @SuppressWarnings("unchecked") + @Test + public void testReturnCommitStats() { + SessionImpl session = mock(SessionImpl.class); + when(session.getName()).thenReturn("test"); + ByteString transactionId = ByteString.copyFromUtf8("test"); + SpannerRpc rpc = mock(SpannerRpc.class); + when(rpc.commitAsync(any(CommitRequest.class), anyMap())) + .thenReturn( + ApiFutures.immediateFuture(com.google.spanner.v1.CommitResponse.getDefaultInstance())); + + try (TransactionContextImpl context = + TransactionContextImpl.newBuilder() + .setSession(session) + .setRpc(rpc) + .setTransactionId(transactionId) + .setOptions(Options.fromTransactionOptions(Options.commitStats())) + .build()) { + context.commitAsync(); + CommitRequest request = + CommitRequest.newBuilder() + .setReturnCommitStats(true) + .setSession(session.getName()) + .setTransactionId(transactionId) + .build(); + verify(rpc).commitAsync(Mockito.eq(request), anyMap()); + } + } + @SuppressWarnings("unchecked") private void batchDml(int status) { SessionImpl session = mock(SessionImpl.class); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index 39e65e21d9..4ffed254cb 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -37,7 +37,6 @@ import com.google.protobuf.Empty; import com.google.spanner.v1.BeginTransactionRequest; import com.google.spanner.v1.CommitRequest; -import com.google.spanner.v1.CommitResponse; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions; import com.google.spanner.v1.ResultSetMetadata; @@ -141,7 +140,8 @@ public void transactionRolledBackOnClose() { public void commitSucceeds() { when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1); - when(txn.commitTimestamp()).thenReturn(commitTimestamp); + CommitResponse response = new CommitResponse(commitTimestamp); + when(txn.getCommitResponse()).thenReturn(response); manager.begin(); manager.commit(); assertThat(manager.getState()).isEqualTo(TransactionState.COMMITTED); @@ -266,12 +266,12 @@ public ApiFuture answer(InvocationOnMock invocation) { }); when(rpc.commitAsync(Mockito.any(CommitRequest.class), Mockito.anyMap())) .thenAnswer( - new Answer>() { + new Answer>() { @Override - public ApiFuture answer(InvocationOnMock invocation) - throws Throwable { + public ApiFuture answer( + InvocationOnMock invocation) throws Throwable { return ApiFutures.immediateFuture( - CommitResponse.newBuilder() + com.google.spanner.v1.CommitResponse.newBuilder() .setCommitTimestamp( com.google.protobuf.Timestamp.newBuilder() .setSeconds(System.currentTimeMillis() * 1000)) @@ -360,12 +360,12 @@ public com.google.spanner.v1.ResultSet answer(InvocationOnMock invocation) }); when(rpc.commitAsync(Mockito.any(CommitRequest.class), Mockito.anyMap())) .thenAnswer( - new Answer>() { + new Answer>() { @Override - public ApiFuture answer(InvocationOnMock invocation) - throws Throwable { + public ApiFuture answer( + InvocationOnMock invocation) throws Throwable { return ApiFutures.immediateFuture( - CommitResponse.newBuilder() + com.google.spanner.v1.CommitResponse.newBuilder() .setCommitTimestamp( com.google.protobuf.Timestamp.newBuilder() .setSeconds(System.currentTimeMillis() * 1000)) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index fc7dde22f4..b65f70b384 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -123,7 +123,7 @@ public ResultSet answer(InvocationOnMock invocation) throws Throwable { return builder.build(); } }); - transactionRunner = new TransactionRunnerImpl(session, rpc, 1); + transactionRunner = new TransactionRunnerImpl(session); when(rpc.commitAsync(Mockito.any(CommitRequest.class), Mockito.anyMap())) .thenReturn( ApiFutures.immediateFuture( @@ -323,7 +323,7 @@ public void prepareReadWriteTransaction() { } }; session.setCurrentSpan(mock(Span.class)); - TransactionRunnerImpl runner = new TransactionRunnerImpl(session, rpc, 10); + TransactionRunnerImpl runner = new TransactionRunnerImpl(session); runner.setSpan(mock(Span.class)); assertThat(usedInlinedBegin).isFalse(); runner.run( @@ -356,7 +356,7 @@ private long[] batchDmlException(int status) { .thenReturn( ApiFutures.immediateFuture(ByteString.copyFromUtf8(UUID.randomUUID().toString()))); when(session.getName()).thenReturn(SessionId.of("p", "i", "d", "test").getName()); - TransactionRunnerImpl runner = new TransactionRunnerImpl(session, rpc, 10); + TransactionRunnerImpl runner = new TransactionRunnerImpl(session); runner.setSpan(mock(Span.class)); ExecuteBatchDmlResponse response1 = ExecuteBatchDmlResponse.newBuilder() diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionImplTest.java index 88f942122a..113738bf05 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionImplTest.java @@ -37,6 +37,7 @@ import com.google.api.gax.longrunning.OperationFuture; import com.google.cloud.NoCredentials; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.CommitResponse; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.ForwardingResultSet; @@ -84,7 +85,7 @@ public class ConnectionImplTest { static class SimpleTransactionManager implements TransactionManager { private TransactionState state; - private Timestamp commitTimestamp; + private CommitResponse commitResponse; private TransactionContext txContext; private SimpleTransactionManager(TransactionContext txContext) { @@ -99,7 +100,7 @@ public TransactionContext begin() { @Override public void commit() { - commitTimestamp = Timestamp.now(); + commitResponse = new CommitResponse(Timestamp.ofTimeSecondsAndNanos(1, 1)); state = TransactionState.COMMITTED; } @@ -115,7 +116,12 @@ public TransactionContext resetForRetry() { @Override public Timestamp getCommitTimestamp() { - return commitTimestamp; + return commitResponse == null ? null : commitResponse.getCommitTimestamp(); + } + + @Override + public CommitResponse getCommitResponse() { + return commitResponse; } @Override @@ -317,15 +323,15 @@ public Timestamp answer(InvocationOnMock invocation) { public TransactionRunner answer(InvocationOnMock invocation) { TransactionRunner runner = new TransactionRunner() { - private Timestamp commitTimestamp; + private CommitResponse commitResponse; @Override public T run(TransactionCallable callable) { - this.commitTimestamp = Timestamp.now(); - TransactionContext tx = mock(TransactionContext.class); - when(tx.executeUpdate(Statement.of(UPDATE))).thenReturn(1L); + commitResponse = new CommitResponse(Timestamp.ofTimeSecondsAndNanos(1, 1)); + TransactionContext transaction = mock(TransactionContext.class); + when(transaction.executeUpdate(Statement.of(UPDATE))).thenReturn(1L); try { - return callable.run(tx); + return callable.run(transaction); } catch (Exception e) { throw SpannerExceptionFactory.newSpannerException(e); } @@ -333,7 +339,12 @@ public T run(TransactionCallable callable) { @Override public Timestamp getCommitTimestamp() { - return commitTimestamp; + return commitResponse == null ? null : commitResponse.getCommitTimestamp(); + } + + @Override + public CommitResponse getCommitResponse() { + return commitResponse; } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java index 1e094eaeb6..de5c9fbdeb 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java @@ -32,6 +32,7 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.AbortedException; +import com.google.cloud.spanner.CommitResponse; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode; @@ -69,7 +70,7 @@ private enum CommitBehavior { private static class SimpleTransactionManager implements TransactionManager { private TransactionState state; - private Timestamp commitTimestamp; + private CommitResponse commitResponse; private TransactionContext txContext; private CommitBehavior commitBehavior; @@ -88,7 +89,7 @@ public TransactionContext begin() { public void commit() { switch (commitBehavior) { case SUCCEED: - commitTimestamp = Timestamp.now(); + commitResponse = new CommitResponse(Timestamp.ofTimeSecondsAndNanos(1, 1)); state = TransactionState.COMMITTED; break; case FAIL: @@ -115,7 +116,11 @@ public TransactionContext resetForRetry() { @Override public Timestamp getCommitTimestamp() { - return commitTimestamp; + return commitResponse == null ? null : commitResponse.getCommitTimestamp(); + } + + public CommitResponse getCommitResponse() { + return commitResponse; } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SingleUseTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SingleUseTransactionTest.java index 76ef62a21a..af761d4cbe 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SingleUseTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SingleUseTransactionTest.java @@ -29,6 +29,7 @@ import com.google.api.gax.longrunning.OperationFuture; import com.google.cloud.Timestamp; import com.google.cloud.spanner.AsyncResultSet; +import com.google.cloud.spanner.CommitResponse; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.Key; @@ -97,7 +98,7 @@ static StatementTimeout timeout(long timeout, TimeUnit unit) { private static class SimpleTransactionManager implements TransactionManager { private TransactionState state; - private Timestamp commitTimestamp; + private CommitResponse commitResponse; private TransactionContext txContext; private CommitBehavior commitBehavior; @@ -116,7 +117,7 @@ public TransactionContext begin() { public void commit() { switch (commitBehavior) { case SUCCEED: - commitTimestamp = Timestamp.now(); + commitResponse = new CommitResponse(Timestamp.ofTimeSecondsAndNanos(1, 1)); state = TransactionState.COMMITTED; break; case FAIL: @@ -143,7 +144,12 @@ public TransactionContext resetForRetry() { @Override public Timestamp getCommitTimestamp() { - return commitTimestamp; + return commitResponse.getCommitTimestamp(); + } + + @Override + public CommitResponse getCommitResponse() { + return commitResponse; } @Override @@ -387,7 +393,7 @@ public Long answer(InvocationOnMock invocation) throws Throwable { public TransactionRunner answer(InvocationOnMock invocation) { TransactionRunner runner = new TransactionRunner() { - private Timestamp commitTimestamp; + private CommitResponse commitResponse; @Override public T run(TransactionCallable callable) { @@ -398,7 +404,8 @@ public T run(TransactionCallable callable) { } catch (Exception e) { throw SpannerExceptionFactory.newSpannerException(e); } - this.commitTimestamp = Timestamp.now(); + commitResponse = + new CommitResponse(Timestamp.ofTimeSecondsAndNanos(1, 1)); return res; } else if (commitBehavior == CommitBehavior.FAIL) { throw SpannerExceptionFactory.newSpannerException( @@ -411,10 +418,17 @@ public T run(TransactionCallable callable) { @Override public Timestamp getCommitTimestamp() { - if (commitTimestamp == null) { + if (commitResponse == null) { throw new IllegalStateException("no commit timestamp"); } - return commitTimestamp; + return commitResponse.getCommitTimestamp(); + } + + public CommitResponse getCommitResponse() { + if (commitResponse == null) { + throw new IllegalStateException("no commit response"); + } + return commitResponse; } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java index 721536cb6b..5bdb685c8b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java @@ -16,15 +16,25 @@ package com.google.cloud.spanner.it; +import static com.google.cloud.spanner.SpannerApiFutures.get; +import static com.google.cloud.spanner.testing.EmulatorSpannerHelper.isUsingEmulator; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; +import static org.junit.Assume.assumeFalse; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; +import com.google.cloud.spanner.AbortedException; import com.google.cloud.spanner.AsyncResultSet; import com.google.cloud.spanner.AsyncResultSet.CallbackResponse; import com.google.cloud.spanner.AsyncResultSet.ReadyCallback; import com.google.cloud.spanner.AsyncRunner; import com.google.cloud.spanner.AsyncRunner.AsyncWork; +import com.google.cloud.spanner.AsyncTransactionManager; +import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionFunction; +import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture; import com.google.cloud.spanner.Database; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.DatabaseId; @@ -35,6 +45,7 @@ import com.google.cloud.spanner.KeyRange; import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Options; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; @@ -51,6 +62,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -86,7 +98,17 @@ public static void setUpDatabase() { "CREATE INDEX TestTableByValue ON TestTable(StringValue)", "CREATE INDEX TestTableByValueDesc ON TestTable(StringValue DESC)"); client = env.getTestHelper().getDatabaseClient(db); + executor = Executors.newSingleThreadExecutor(); + } + @AfterClass + public static void cleanup() { + executor.shutdown(); + } + + @Before + public void setupData() { + client.write(Arrays.asList(Mutation.delete(TABLE_NAME, KeySet.all()))); // Includes k0..k14. Note that strings k{10,14} sort between k1 and k2. List mutations = new ArrayList<>(); for (int i = 0; i < 15; ++i) { @@ -99,12 +121,6 @@ public static void setUpDatabase() { .build()); } client.write(mutations); - executor = Executors.newSingleThreadExecutor(); - } - - @AfterClass - public static void cleanup() { - executor.shutdown(); } @Test @@ -302,4 +318,65 @@ public ApiFuture doWorkAsync(TransactionContext txn) { assertThat(client.singleUse().readRow("TestTable", Key.of("k999"), ALL_COLUMNS)).isNull(); } } + + @Test + public void testAsyncRunnerReturnsCommitStats() { + assumeFalse("Emulator does not return commit statistics", isUsingEmulator()); + AsyncRunner runner = client.runAsync(Options.commitStats()); + runner.runAsync( + new AsyncWork() { + @Override + public ApiFuture doWorkAsync(TransactionContext transaction) { + transaction.buffer( + Mutation.newInsertOrUpdateBuilder(TABLE_NAME) + .set("Key") + .to("k_commit_stats") + .set("StringValue") + .to("Should return commit stats") + .build()); + return ApiFutures.immediateFuture(null); + } + }, + executor); + assertNotNull(get(runner.getCommitResponse()).getCommitStats()); + // MutationCount = 2 columns + 2 secondary indexes. + assertEquals(4L, get(runner.getCommitResponse()).getCommitStats().getMutationCount()); + } + + @Test + public void testAsyncTransactionManagerReturnsCommitStats() throws InterruptedException { + assumeFalse("Emulator does not return commit statistics", isUsingEmulator()); + try (AsyncTransactionManager manager = client.transactionManagerAsync(Options.commitStats())) { + TransactionContextFuture context = manager.beginAsync(); + while (true) { + try { + get( + context + .then( + new AsyncTransactionFunction() { + @Override + public ApiFuture apply(TransactionContext transaction, Void input) + throws Exception { + transaction.buffer( + Mutation.newInsertOrUpdateBuilder(TABLE_NAME) + .set("Key") + .to("k_commit_stats") + .set("StringValue") + .to("Should return commit stats") + .build()); + return ApiFutures.immediateFuture(null); + } + }, + executor) + .commitAsync()); + assertNotNull(get(manager.getCommitResponse()).getCommitStats()); + assertEquals(4L, get(manager.getCommitResponse()).getCommitStats().getMutationCount()); + break; + } catch (AbortedException e) { + Thread.sleep(e.getRetryDelayInMillis()); + context = manager.resetForRetryAsync(); + } + } + } + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java index 4d65af67ed..3ea4a06771 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java @@ -18,6 +18,7 @@ import static com.google.cloud.spanner.testing.EmulatorSpannerHelper.isUsingEmulator; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import static org.junit.Assume.assumeFalse; @@ -29,6 +30,7 @@ import com.google.cloud.spanner.Key; import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Options; import com.google.cloud.spanner.ParallelIntegrationTest; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Struct; @@ -212,4 +214,31 @@ public void abortAndRetry() throws InterruptedException { manager2.close(); } } + + @SuppressWarnings("resource") + @Test + public void testTransactionManagerReturnsCommitStats() throws InterruptedException { + assumeFalse("Emulator does not return commit statistics", isUsingEmulator()); + try (TransactionManager manager = client.transactionManager(Options.commitStats())) { + TransactionContext transaction = manager.begin(); + while (true) { + transaction.buffer( + Mutation.newInsertBuilder("T") + .set("K") + .to("KeyCommitStats") + .set("BoolValue") + .to(true) + .build()); + try { + manager.commit(); + assertNotNull(manager.getCommitResponse().getCommitStats()); + assertEquals(2L, manager.getCommitResponse().getCommitStats().getMutationCount()); + break; + } catch (AbortedException e) { + Thread.sleep(e.getRetryDelayInMillis()); + transaction = manager.resetForRetry(); + } + } + } + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java index 503f0ddf90..ed2bc564b0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java @@ -19,6 +19,8 @@ import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; import static com.google.cloud.spanner.testing.EmulatorSpannerHelper.isUsingEmulator; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import static org.junit.Assume.assumeFalse; @@ -33,6 +35,7 @@ import com.google.cloud.spanner.Key; import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Options; import com.google.cloud.spanner.ParallelIntegrationTest; import com.google.cloud.spanner.PartitionOptions; import com.google.cloud.spanner.ReadContext; @@ -630,4 +633,23 @@ public Long run(TransactionContext transaction) throws Exception { assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); } } + + @Test + public void testTransactionRunnerReturnsCommitStats() { + assumeFalse("Emulator does not return commit statistics", isUsingEmulator()); + final String key = uniqueKey(); + TransactionRunner runner = client.readWriteTransaction(Options.commitStats()); + runner.run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + transaction.buffer( + Mutation.newInsertBuilder("T").set("K").to(key).set("V").to(0).build()); + return null; + } + }); + assertNotNull(runner.getCommitResponse().getCommitStats()); + // MutationCount = 2 (2 columns). + assertEquals(2L, runner.getCommitResponse().getCommitStats().getMutationCount()); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java index 6f11fc6b43..3cf9d387a1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java @@ -17,7 +17,9 @@ package com.google.cloud.spanner.it; import static com.google.cloud.spanner.SpannerMatchers.isSpannerException; +import static com.google.cloud.spanner.testing.EmulatorSpannerHelper.isUsingEmulator; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import static org.junit.Assume.assumeFalse; @@ -25,6 +27,7 @@ import com.google.cloud.ByteArray; import com.google.cloud.Date; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.CommitResponse; import com.google.cloud.spanner.Database; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.ErrorCode; @@ -32,6 +35,7 @@ import com.google.cloud.spanner.Key; import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Options; import com.google.cloud.spanner.ParallelIntegrationTest; import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.SpannerException; @@ -157,6 +161,44 @@ public void writeAtLeastOnce() { assertThat(row.getString(0)).isEqualTo("v1"); } + @Test + public void testWriteReturnsCommitStats() { + assumeFalse("Emulator does not return commit statistics", isUsingEmulator()); + CommitResponse response = + client.writeWithOptions( + Arrays.asList( + Mutation.newInsertOrUpdateBuilder("T") + .set("K") + .to(lastKey = uniqueString()) + .set("StringValue") + .to("v1") + .build()), + Options.commitStats()); + assertNotNull(response); + assertNotNull(response.getCommitTimestamp()); + assertNotNull(response.getCommitStats()); + assertEquals(2L, response.getCommitStats().getMutationCount()); + } + + @Test + public void testWriteAtLeastOnceReturnsCommitStats() { + assumeFalse("Emulator does not return commit statistics", isUsingEmulator()); + CommitResponse response = + client.writeAtLeastOnceWithOptions( + Arrays.asList( + Mutation.newInsertOrUpdateBuilder("T") + .set("K") + .to(lastKey = uniqueString()) + .set("StringValue") + .to("v1") + .build()), + Options.commitStats()); + assertNotNull(response); + assertNotNull(response.getCommitTimestamp()); + assertNotNull(response.getCommitStats()); + assertEquals(2L, response.getCommitStats().getMutationCount()); + } + @Test public void writeAlreadyExists() { client.write(