diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java index 32be0411ed..21c9f60492 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java @@ -57,17 +57,10 @@ interface AsyncWork { */ ApiFuture getCommitTimestamp(); - /** - * Indicates that the {@link AsyncRunner} should request the backend to return {@link - * CommitStats}. The {@link CommitStats} can be retrieved by calling {@link #getCommitStats()} - * after the transaction has successfully committed. - */ - AsyncRunner withCommitStats(); - /** * Returns the {@link CommitStats} of this transaction. This method may only be called after the - * transaction has successfully committed, and only if {@link #withCommitStats()} was called - * before executing the transaction. + * transaction has successfully committed, and only if {@link Options#commitStats()} was specified + * for the transaction. */ ApiFuture getCommitStats(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java index 06383dc0dc..5b11c53820 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java @@ -26,12 +26,13 @@ class AsyncRunnerImpl implements AsyncRunner { private final TransactionRunnerImpl delegate; + private final Options options; private final SettableApiFuture commitTimestamp = SettableApiFuture.create(); private final SettableApiFuture commitStats = SettableApiFuture.create(); - private boolean returnCommitStats; - AsyncRunnerImpl(TransactionRunnerImpl delegate) { - this.delegate = delegate; + AsyncRunnerImpl(TransactionRunnerImpl delegate, Options options) { + this.delegate = Preconditions.checkNotNull(delegate); + this.options = Preconditions.checkNotNull(options); } @Override @@ -75,7 +76,7 @@ private void setCommitTimestampAndStats() { } catch (Throwable t) { commitTimestamp.setException(t); } - if (returnCommitStats) { + if (options.withCommitStats()) { try { commitStats.set(delegate.getCommitStats()); } catch (Throwable t) { @@ -89,18 +90,11 @@ public ApiFuture getCommitTimestamp() { return commitTimestamp; } - @Override - public AsyncRunner withCommitStats() { - delegate.withCommitStats(); - returnCommitStats = true; - return this; - } - @Override public ApiFuture getCommitStats() { Preconditions.checkState( - returnCommitStats, - "getCommitStats may only be invoked if withCommitStats has been invoked before executing the transaction"); + options.withCommitStats(), + "getCommitStats may only be invoked if Options.commitStats() was specified for the transaction"); return commitStats; } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java index 8ceb486b9e..7b55eb1048 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManager.java @@ -191,17 +191,10 @@ public interface AsyncTransactionFunction { /** Returns the state of the transaction. */ TransactionState getState(); - /** - * Indicates that the {@link AsyncTransactionManager} should request the backend to return {@link - * CommitStats}. The {@link CommitStats} can be retrieved by calling {@link #getCommitStats()} - * after the transaction has successfully committed. - */ - AsyncTransactionManager withCommitStats(); - /** * Returns the {@link CommitStats} of this transaction. This method may only be called after the - * transaction has successfully committed, and only if {@link #withCommitStats()} was called - * before committing the transaction. + * transaction has successfully committed, and only if {@link Options#commitStats()} was specified + * for the transaction. */ ApiFuture getCommitStats(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java index 192990a888..1da763a6fb 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java @@ -37,16 +37,19 @@ final class AsyncTransactionManagerImpl private static final Tracer tracer = Tracing.getTracer(); private final SessionImpl session; + private final Options options; private Span span; - private boolean returnCommitStats; private TransactionRunnerImpl.TransactionContextImpl txn; private TransactionState txnState; private final SettableApiFuture commitTimestamp = SettableApiFuture.create(); private SettableApiFuture commitStats; - AsyncTransactionManagerImpl(SessionImpl session, Span span) { + AsyncTransactionManagerImpl(SessionImpl session, Span span, Options options) { + Preconditions.checkNotNull(session); + Preconditions.checkNotNull(options); this.session = session; + this.options = options; this.span = span; } @@ -55,20 +58,14 @@ public void setSpan(Span span) { this.span = span; } - @Override - public AsyncTransactionManager withCommitStats() { - this.returnCommitStats = true; - return this; - } - @Override public ApiFuture getCommitStats() { Preconditions.checkState( txnState == TransactionState.COMMITTED, "getCommitStats can only be invoked if the transaction committed successfully"); Preconditions.checkState( - returnCommitStats, - "getCommitStats can only be invoked if withCommitStats() was invoked before committing the transaction"); + options.withCommitStats(), + "getCommitStats can only be invoked if Options.commitStats() was specified for the transaction"); return commitStats; } @@ -138,8 +135,8 @@ public ApiFuture commitAsync() { SpannerExceptionFactory.newSpannerException( ErrorCode.ABORTED, "Transaction already aborted")); } - ApiFuture res = txn.commitAsync(returnCommitStats); - if (returnCommitStats) { + ApiFuture res = txn.commitAsync(options.withCommitStats()); + if (options.withCommitStats()) { commitStats = SettableApiFuture.create(); } txnState = TransactionState.COMMITTED; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java index 290265da83..c8f350634a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Options.TransactionOption; /** * Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An @@ -24,6 +25,21 @@ */ public interface DatabaseClient { + /** + * Response for write methods that return both a commit {@link Timestamp} and {@link CommitStats}. + */ + interface WriteResponse { + /** The commit timestamp of the transaction. */ + Timestamp getCommitTimestamp(); + + /** + * The commit statistics of the transaction (if requested). + * + * @throws IllegalStateException if no {@link CommitStats} were requested for the transaction. + */ + CommitStats getCommitStats(); + } + /** * Writes the given mutations atomically to the database. * @@ -33,7 +49,7 @@ public interface DatabaseClient { * is not possible to know whether the mutations were applied without performing a subsequent * database operation, but the mutations will have been applied at most once. * - *

Example of blind write. + *

Example of blind write that returns {@link CommitStats}. * *

{@code
    * long singerId = my_singer_id;
@@ -45,12 +61,14 @@ public interface DatabaseClient {
    *         .set("LastName")
    *         .to("Joel")
    *         .build();
-   * dbClient.write(Collections.singletonList(mutation));
+   * dbClient.write(Collections.singletonList(mutation), Options.commitStats());
    * }
* - * @return the timestamp at which the write was committed + * @return the timestamp at which the write was committed and the {@link CommitStats} if those + * were requested. */ - Timestamp write(Iterable mutations) throws SpannerException; + WriteResponse write(Iterable mutations, TransactionOption... options) + throws SpannerException; /** * Writes the given mutations atomically to the database without replay protection. @@ -64,7 +82,7 @@ public interface DatabaseClient { * requires two RPCs (one of which may be performed in advance), and so this method may be * appropriate for latency sensitive and/or high throughput blind writing. * - *

Example of unprotected blind write. + *

Example of unprotected blind write that returns {@link CommitStats}. * *

{@code
    * long singerId = my_singer_id;
@@ -76,35 +94,13 @@ public interface DatabaseClient {
    *         .set("LastName")
    *         .to("Joel")
    *         .build();
-   * dbClient.writeAtLeastOnce(Collections.singletonList(mutation));
+   * dbClient.writeAtLeastOnce(Collections.singletonList(mutation), Options.commitStats());
    * }
* - * @return the timestamp at which the write was committed - */ - Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException; - - /** - * Response for write methods that return both a commit {@link Timestamp} and {@link CommitStats}. - */ - interface WriteResponse { - /** The commit timestamp of the transaction. */ - Timestamp getCommitTimestamp(); - - /** The commit statistics of the transaction. */ - CommitStats getCommitStats(); - } - - /** - * Same as {@link #write(Iterable)}, but requests the backend to return both a commit timestamp - * and {@link CommitStats}. - */ - WriteResponse writeWithCommitStats(Iterable mutations) throws SpannerException; - - /** - * Same as {@link #writeAtLeastOnce(Iterable)}, but requests the backend to return both a commit - * timestamp and {@link CommitStats}. + * @return the timestamp at which the write was committed and the {@link CommitStats} if those + * were requested. */ - WriteResponse writeAtLeastOnceWithCommitStats(Iterable mutations) + WriteResponse writeAtLeastOnce(Iterable mutations, TransactionOption... options) throws SpannerException; /** @@ -270,7 +266,7 @@ WriteResponse writeAtLeastOnceWithCommitStats(Iterable mutations) * }); * */ - TransactionRunner readWriteTransaction(); + TransactionRunner readWriteTransaction(TransactionOption... options); /** * Returns a transaction manager which allows manual management of transaction lifecycle. This API @@ -300,7 +296,7 @@ WriteResponse writeAtLeastOnceWithCommitStats(Iterable mutations) * } * } */ - TransactionManager transactionManager(); + TransactionManager transactionManager(TransactionOption... options); /** * Returns an asynchronous transaction runner for executing a single logical transaction with @@ -333,7 +329,7 @@ WriteResponse writeAtLeastOnceWithCommitStats(Iterable mutations) * executor); * */ - AsyncRunner runAsync(); + AsyncRunner runAsync(TransactionOption... options); /** * Returns an asynchronous transaction manager which allows manual management of transaction @@ -421,7 +417,7 @@ WriteResponse writeAtLeastOnceWithCommitStats(Iterable mutations) * } * } */ - AsyncTransactionManager transactionManagerAsync(); + AsyncTransactionManager transactionManagerAsync(TransactionOption... options); /** * Returns the lower bound of rows modified by this DML statement. diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index 458368ec24..3793ed1156 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -16,7 +16,7 @@ package com.google.cloud.spanner; -import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SpannerImpl.ClosedException; import com.google.common.annotations.VisibleForTesting; @@ -62,26 +62,7 @@ PooledSessionFuture getReadWriteSession() { } @Override - public Timestamp write(final Iterable mutations) throws SpannerException { - Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); - try (Scope s = tracer.withSpan(span)) { - return runWithSessionRetry( - SessionMode.READ_WRITE, - new Function() { - @Override - public Timestamp apply(Session session) { - return session.write(mutations); - } - }); - } catch (RuntimeException e) { - TraceUtil.setWithFailure(span, e); - throw e; - } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); - } - } - - public WriteResponse writeWithCommitStats(final Iterable mutations) + public WriteResponse write(final Iterable mutations, final TransactionOption... options) throws SpannerException { Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); try (Scope s = tracer.withSpan(span)) { @@ -90,27 +71,7 @@ public WriteResponse writeWithCommitStats(final Iterable mutations) new Function() { @Override public WriteResponse apply(Session session) { - return session.writeWithCommitStats(mutations); - } - }); - } catch (RuntimeException e) { - TraceUtil.setWithFailure(span, e); - throw e; - } finally { - span.end(TraceUtil.END_SPAN_OPTIONS); - } - } - - @Override - public Timestamp writeAtLeastOnce(final Iterable mutations) throws SpannerException { - Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); - try (Scope s = tracer.withSpan(span)) { - return runWithSessionRetry( - SessionMode.READ_WRITE, - new Function() { - @Override - public Timestamp apply(Session session) { - return session.writeAtLeastOnce(mutations); + return session.write(mutations, options); } }); } catch (RuntimeException e) { @@ -122,7 +83,8 @@ public Timestamp apply(Session session) { } @Override - public WriteResponse writeAtLeastOnceWithCommitStats(final Iterable mutations) + public WriteResponse writeAtLeastOnce( + final Iterable mutations, final TransactionOption... options) throws SpannerException { Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); try (Scope s = tracer.withSpan(span)) { @@ -131,7 +93,7 @@ public WriteResponse writeAtLeastOnceWithCommitStats(final Iterable mu new Function() { @Override public WriteResponse apply(Session session) { - return session.writeAtLeastOnceWithCommitStats(mutations); + return session.writeAtLeastOnce(mutations, options); } }); } catch (RuntimeException e) { @@ -209,10 +171,10 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { } @Override - public TransactionRunner readWriteTransaction() { + public TransactionRunner readWriteTransaction(TransactionOption... options) { Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); try (Scope s = tracer.withSpan(span)) { - return getReadWriteSession().readWriteTransaction(); + return getReadWriteSession().readWriteTransaction(options); } catch (RuntimeException e) { TraceUtil.setWithFailure(span, e); throw e; @@ -222,10 +184,10 @@ public TransactionRunner readWriteTransaction() { } @Override - public TransactionManager transactionManager() { + public TransactionManager transactionManager(TransactionOption... options) { Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); try (Scope s = tracer.withSpan(span)) { - return getReadWriteSession().transactionManager(); + return getReadWriteSession().transactionManager(options); } catch (RuntimeException e) { TraceUtil.endSpanWithFailure(span, e); throw e; @@ -233,10 +195,10 @@ public TransactionManager transactionManager() { } @Override - public AsyncRunner runAsync() { + public AsyncRunner runAsync(TransactionOption... options) { Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); try (Scope s = tracer.withSpan(span)) { - return getReadWriteSession().runAsync(); + return getReadWriteSession().runAsync(options); } catch (RuntimeException e) { TraceUtil.endSpanWithFailure(span, e); throw e; @@ -244,10 +206,10 @@ public AsyncRunner runAsync() { } @Override - public AsyncTransactionManager transactionManagerAsync() { + public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) { Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); try (Scope s = tracer.withSpan(span)) { - return getReadWriteSession().transactionManagerAsync(); + return getReadWriteSession().transactionManagerAsync(options); } catch (RuntimeException e) { TraceUtil.endSpanWithFailure(span, e); throw e; diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index 879b632d17..323ee6cfc9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -24,6 +24,9 @@ public final class Options implements Serializable { private static final long serialVersionUID = 8067099123096783941L; + /** Marker interface to mark options applicable to read/write transactions. */ + public interface TransactionOption {} + /** Marker interface to mark options applicable to both Read and Query operations */ public interface ReadAndQueryOption extends ReadOption, QueryOption {} @@ -36,6 +39,11 @@ public interface QueryOption {} /** Marker interface to mark options applicable to list operations in admin API. */ public interface ListOption {} + /** Specifying this will cause the transaction to request {@link CommitStats} from the backend. */ + public static TransactionOption commitStats() { + return COMMIT_STATS_OPTION; + } + /** * Specifying this will cause the read to yield at most this many rows. This should be greater * than 0. @@ -106,6 +114,16 @@ public static ListOption filter(String filter) { return new FilterOption(filter); } + /** Option to request {@link CommitStats} for read/write transactions. */ + static final class CommitStatsOption extends InternalOption implements TransactionOption { + @Override + void appendToOptions(Options options) { + options.withCommitStats = true; + } + } + + static final CommitStatsOption COMMIT_STATS_OPTION = new CommitStatsOption(); + /** Option pertaining to flow control. */ static final class FlowControlOption extends InternalOption implements ReadAndQueryOption { final int prefetchChunks; @@ -133,6 +151,7 @@ void appendToOptions(Options options) { } } + private boolean withCommitStats; private Long limit; private Integer prefetchChunks; private Integer bufferRows; @@ -143,6 +162,10 @@ void appendToOptions(Options options) { // Construction is via factory methods below. private Options() {} + boolean withCommitStats() { + return withCommitStats; + } + boolean hasLimit() { return limit != null; } @@ -194,6 +217,9 @@ String filter() { @Override public String toString() { StringBuilder b = new StringBuilder(); + if (withCommitStats) { + b.append("withCommitStats: ").append(withCommitStats).append(' '); + } if (limit != null) { b.append("limit: ").append(limit).append(' '); } @@ -224,7 +250,8 @@ public boolean equals(Object o) { } Options that = (Options) o; - return (!hasLimit() && !that.hasLimit() + return Objects.equals(withCommitStats, that.withCommitStats) + && (!hasLimit() && !that.hasLimit() || hasLimit() && that.hasLimit() && Objects.equals(limit(), that.limit())) && (!hasPrefetchChunks() && !that.hasPrefetchChunks() || hasPrefetchChunks() @@ -243,6 +270,9 @@ public boolean equals(Object o) { @Override public int hashCode() { int result = 31; + if (withCommitStats) { + result = 31 * result + Boolean.valueOf(withCommitStats).hashCode(); + } if (limit != null) { result = 31 * result + limit.hashCode(); } @@ -264,6 +294,16 @@ public int hashCode() { return result; } + static Options fromTransactionOptions(TransactionOption... options) { + Options transactionOptions = new Options(); + for (TransactionOption option : options) { + if (option instanceof InternalOption) { + ((InternalOption) option).appendToOptions(transactionOptions); + } + } + return transactionOptions; + } + static Options fromReadOptions(ReadOption... options) { Options readOptions = new Options(); for (ReadOption option : options) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 1d99a544a2..121a3c3512 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -25,9 +25,11 @@ import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction; import com.google.cloud.spanner.AbstractReadContext.SingleReadContext; import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.SessionClient.SessionId; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.common.base.Preconditions; import com.google.common.base.Ticker; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; @@ -88,14 +90,22 @@ private static class WriteResponseImpl implements WriteResponse { private final Timestamp commitTimestamp; private final CommitStats commitStats; - private WriteResponseImpl(TransactionRunner runner) { + private WriteResponseImpl(TransactionRunner runner, Options options) { this.commitTimestamp = runner.getCommitTimestamp(); - this.commitStats = runner.getCommitStats(); + if (options.withCommitStats()) { + this.commitStats = runner.getCommitStats(); + } else { + this.commitStats = null; + } } private WriteResponseImpl(CommitResponse response) { this.commitTimestamp = Timestamp.fromProto(response.getCommitTimestamp()); - this.commitStats = CommitStats.fromProto(response.getCommitStats()); + if (response.hasCommitStats()) { + this.commitStats = CommitStats.fromProto(response.getCommitStats()); + } else { + this.commitStats = null; + } } @Override @@ -105,6 +115,9 @@ public Timestamp getCommitTimestamp() { @Override public CommitStats getCommitStats() { + Preconditions.checkState( + this.commitStats != null, + "No CommitStats were requested for the transaction. Use Options.commitStats() to request CommitStats for a read/write transaction."); return this.commitStats; } } @@ -147,20 +160,9 @@ public long executePartitionedUpdate(Statement stmt) { } @Override - public Timestamp write(Iterable mutations) throws SpannerException { - return write(mutations, false).getCommitTimestamp(); - } - - @Override - public WriteResponse writeWithCommitStats(Iterable mutations) throws SpannerException { - return new WriteResponseImpl(write(mutations, true)); - } - - private TransactionRunner write(Iterable mutations, boolean withCommitStats) { - TransactionRunner runner = readWriteTransaction(); - if (withCommitStats) { - runner = runner.withCommitStats(); - } + public WriteResponse write(Iterable mutations, TransactionOption... options) + throws SpannerException { + TransactionRunner runner = readWriteTransaction(options); final Collection finalMutations = mutations instanceof java.util.Collection ? (Collection) mutations @@ -173,28 +175,20 @@ public Void run(TransactionContext ctx) { return null; } }); - return runner; + return new WriteResponseImpl(runner, Options.fromTransactionOptions(options)); } @Override - public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { - return Timestamp.fromProto(writeAtLeastOnce(mutations, false).getCommitTimestamp()); - } - - @Override - public WriteResponse writeAtLeastOnceWithCommitStats(Iterable mutations) + public WriteResponse writeAtLeastOnce(Iterable mutations, TransactionOption... options) throws SpannerException { - return new WriteResponseImpl(writeAtLeastOnce(mutations, true)); - } - - private CommitResponse writeAtLeastOnce(Iterable mutations, boolean withCommitStats) { setActive(null); + Options opts = Options.fromTransactionOptions(options); List mutationsProto = new ArrayList<>(); Mutation.toProto(mutations, mutationsProto); final CommitRequest request = CommitRequest.newBuilder() .setSession(name) - .setReturnCommitStats(withCommitStats) + .setReturnCommitStats(opts.withCommitStats()) .addAllMutations(mutationsProto) .setSingleUseTransaction( TransactionOptions.newBuilder() @@ -202,7 +196,7 @@ private CommitResponse writeAtLeastOnce(Iterable mutations, boolean wi .build(); Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan(); try (Scope s = tracer.withSpan(span)) { - return spanner.getRpc().commit(request, options); + return new WriteResponseImpl(spanner.getRpc().commit(request, this.options)); } catch (IllegalArgumentException e) { TraceUtil.setWithFailure(span, e); throw newSpannerException(ErrorCode.INTERNAL, "Could not parse commit timestamp", e); @@ -272,26 +266,25 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { } @Override - public TransactionRunner readWriteTransaction() { - return setActive( - new TransactionRunnerImpl(this, spanner.getRpc(), spanner.getDefaultPrefetchChunks())); + public TransactionRunner readWriteTransaction(TransactionOption... options) { + return setActive(new TransactionRunnerImpl(this, Options.fromTransactionOptions(options))); } @Override - public AsyncRunner runAsync() { - return new AsyncRunnerImpl( - setActive( - new TransactionRunnerImpl(this, spanner.getRpc(), spanner.getDefaultPrefetchChunks()))); + public AsyncRunner runAsync(TransactionOption... options) { + Options opts = Options.fromTransactionOptions(options); + return new AsyncRunnerImpl(setActive(new TransactionRunnerImpl(this, opts)), opts); } @Override - public TransactionManager transactionManager() { - return new TransactionManagerImpl(this, currentSpan); + public TransactionManager transactionManager(TransactionOption... options) { + return new TransactionManagerImpl(this, currentSpan, Options.fromTransactionOptions(options)); } @Override - public AsyncTransactionManagerImpl transactionManagerAsync() { - return new AsyncTransactionManagerImpl(this, currentSpan); + public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options) { + return new AsyncTransactionManagerImpl( + this, currentSpan, Options.fromTransactionOptions(options)); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index 0d8f23a3c6..b1a55e89de 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -47,6 +47,7 @@ import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.ReadOption; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.SessionClient.SessionConsumer; import com.google.cloud.spanner.SpannerException.ResourceNotFoundException; import com.google.cloud.spanner.SpannerImpl.ClosedException; @@ -720,21 +721,20 @@ public void close() { private TransactionManager delegate; private final SessionPool sessionPool; private PooledSessionFuture session; - private boolean returnCommitStats; + private final TransactionOption[] options; private boolean closed; private boolean restartedAfterSessionNotFound; - AutoClosingTransactionManager(SessionPool sessionPool, PooledSessionFuture session) { - this.sessionPool = sessionPool; - this.session = session; + AutoClosingTransactionManager( + SessionPool sessionPool, PooledSessionFuture session, TransactionOption... options) { + this.sessionPool = Preconditions.checkNotNull(sessionPool); + this.session = Preconditions.checkNotNull(session); + this.options = Preconditions.checkNotNull(options); } @Override public TransactionContext begin() { - this.delegate = session.get().transactionManager(); - if (returnCommitStats) { - this.delegate.withCommitStats(); - } + this.delegate = session.get().transactionManager(options); while (true) { try { return internalBegin(); @@ -747,10 +747,7 @@ public TransactionContext begin() { @SuppressWarnings("resource") private void refreshDelegateTransactionManager() { - delegate = session.get().delegate.transactionManager(); - if (returnCommitStats) { - delegate.withCommitStats(); - } + delegate = session.get().delegate.transactionManager(options); } private TransactionContext internalBegin() { @@ -813,12 +810,6 @@ public Timestamp getCommitTimestamp() { return delegate.getCommitTimestamp(); } - @Override - public TransactionManager withCommitStats() { - this.returnCommitStats = true; - return this; - } - @Override public CommitStats getCommitStats() { return delegate.getCommitStats(); @@ -855,17 +846,20 @@ public TransactionState getState() { */ private static final class SessionPoolTransactionRunner implements TransactionRunner { private final SessionPool sessionPool; + private final TransactionOption[] options; private PooledSessionFuture session; private TransactionRunner runner; - private SessionPoolTransactionRunner(SessionPool sessionPool, PooledSessionFuture session) { - this.sessionPool = sessionPool; - this.session = session; + private SessionPoolTransactionRunner( + SessionPool sessionPool, PooledSessionFuture session, TransactionOption... options) { + this.sessionPool = Preconditions.checkNotNull(sessionPool); + this.session = Preconditions.checkNotNull(session); + this.options = Preconditions.checkNotNull(options); } private TransactionRunner getRunner() { if (this.runner == null) { - this.runner = session.get().readWriteTransaction(); + this.runner = session.get().readWriteTransaction(options); } return runner; } @@ -881,7 +875,7 @@ public T run(TransactionCallable callable) { break; } catch (SessionNotFoundException e) { session = sessionPool.replaceReadWriteSession(e, session); - runner = session.get().delegate.readWriteTransaction(); + runner = session.get().delegate.readWriteTransaction(options); } } session.get().markUsed(); @@ -904,12 +898,6 @@ public TransactionRunner allowNestedTransaction() { return this; } - @Override - public TransactionRunner withCommitStats() { - getRunner().withCommitStats(); - return this; - } - @Override public CommitStats getCommitStats() { return getRunner().getCommitStats(); @@ -918,14 +906,21 @@ public CommitStats getCommitStats() { private static class SessionPoolAsyncRunner implements AsyncRunner { private final SessionPool sessionPool; + private final TransactionOption[] options; private volatile PooledSessionFuture session; private final SettableApiFuture commitTimestamp = SettableApiFuture.create(); - private final SettableApiFuture commitStats = SettableApiFuture.create(); - private boolean returnCommitStats; - - private SessionPoolAsyncRunner(SessionPool sessionPool, PooledSessionFuture session) { - this.sessionPool = sessionPool; - this.session = session; + private final SettableApiFuture commitStats; + + private SessionPoolAsyncRunner( + SessionPool sessionPool, PooledSessionFuture session, TransactionOption... options) { + this.sessionPool = Preconditions.checkNotNull(sessionPool); + this.session = Preconditions.checkNotNull(session); + this.options = options; + if (Options.fromTransactionOptions(options).withCommitStats()) { + commitStats = SettableApiFuture.create(); + } else { + commitStats = null; + } } @Override @@ -940,10 +935,7 @@ public void run() { AsyncRunner runner = null; while (true) { try { - runner = session.get().runAsync(); - if (returnCommitStats) { - runner.withCommitStats(); - } + runner = session.get().runAsync(options); r = runner.runAsync(work, MoreExecutors.directExecutor()).get(); break; } catch (ExecutionException e) { @@ -980,7 +972,7 @@ private void setCommitTimestampAndStats(AsyncRunner delegate) { } catch (Throwable t) { commitTimestamp.setException(t); } - if (returnCommitStats) { + if (commitStats != null) { try { commitStats.set(delegate.getCommitStats().get()); } catch (Throwable t) { @@ -994,16 +986,10 @@ public ApiFuture getCommitTimestamp() { return commitTimestamp; } - @Override - public AsyncRunner withCommitStats() { - this.returnCommitStats = true; - return this; - } - @Override public ApiFuture getCommitStats() { Preconditions.checkState( - returnCommitStats, + commitStats != null, "getCommitStats may only be invoked if withCommitStats has been invoked before executing the transaction"); return commitStats; } @@ -1155,38 +1141,20 @@ private void markCheckedOut() { } @Override - public Timestamp write(Iterable mutations) throws SpannerException { - try { - return get().write(mutations); - } finally { - close(); - } - } - - @Override - public WriteResponse writeWithCommitStats(Iterable mutations) + public WriteResponse write(Iterable mutations, TransactionOption... options) throws SpannerException { try { - return get().writeWithCommitStats(mutations); - } finally { - close(); - } - } - - @Override - public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { - try { - return get().writeAtLeastOnce(mutations); + return get().write(mutations, options); } finally { close(); } } @Override - public WriteResponse writeAtLeastOnceWithCommitStats(Iterable mutations) - throws SpannerException { + public WriteResponse writeAtLeastOnce( + Iterable mutations, TransactionOption... options) throws SpannerException { try { - return get().writeAtLeastOnceWithCommitStats(mutations); + return get().writeAtLeastOnce(mutations, options); } finally { close(); } @@ -1291,23 +1259,23 @@ private ReadOnlyTransaction internalReadOnlyTransaction( } @Override - public TransactionRunner readWriteTransaction() { - return new SessionPoolTransactionRunner(SessionPool.this, this); + public TransactionRunner readWriteTransaction(TransactionOption... options) { + return new SessionPoolTransactionRunner(SessionPool.this, this, options); } @Override - public TransactionManager transactionManager() { - return new AutoClosingTransactionManager(SessionPool.this, this); + public TransactionManager transactionManager(TransactionOption... options) { + return new AutoClosingTransactionManager(SessionPool.this, this, options); } @Override - public AsyncRunner runAsync() { - return new SessionPoolAsyncRunner(SessionPool.this, this); + public AsyncRunner runAsync(TransactionOption... options) { + return new SessionPoolAsyncRunner(SessionPool.this, this, options); } @Override - public AsyncTransactionManager transactionManagerAsync() { - return new SessionPoolAsyncTransactionManager(this); + public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) { + return new SessionPoolAsyncTransactionManager(this, options); } @Override @@ -1418,42 +1386,22 @@ void setAllowReplacing(boolean allowReplacing) { } @Override - public Timestamp write(Iterable mutations) throws SpannerException { - try { - markUsed(); - return delegate.write(mutations); - } catch (SpannerException e) { - throw lastException = e; - } - } - - @Override - public WriteResponse writeWithCommitStats(Iterable mutations) + public WriteResponse write(Iterable mutations, TransactionOption... options) throws SpannerException { try { markUsed(); - return delegate.writeWithCommitStats(mutations); - } catch (SpannerException e) { - throw lastException = e; - } - } - - @Override - public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { - try { - markUsed(); - return delegate.writeAtLeastOnce(mutations); + return delegate.write(mutations, options); } catch (SpannerException e) { throw lastException = e; } } @Override - public WriteResponse writeAtLeastOnceWithCommitStats(Iterable mutations) - throws SpannerException { + public WriteResponse writeAtLeastOnce( + Iterable mutations, TransactionOption... options) throws SpannerException { try { markUsed(); - return delegate.writeAtLeastOnceWithCommitStats(mutations); + return delegate.writeAtLeastOnce(mutations, options); } catch (SpannerException e) { throw lastException = e; } @@ -1500,18 +1448,18 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { } @Override - public TransactionRunner readWriteTransaction() { - return delegate.readWriteTransaction(); + public TransactionRunner readWriteTransaction(TransactionOption... options) { + return delegate.readWriteTransaction(options); } @Override - public AsyncRunner runAsync() { - return delegate.runAsync(); + public AsyncRunner runAsync(TransactionOption... options) { + return delegate.runAsync(options); } @Override - public AsyncTransactionManagerImpl transactionManagerAsync() { - return delegate.transactionManagerAsync(); + public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options) { + return delegate.transactionManagerAsync(options); } @Override @@ -1582,8 +1530,8 @@ void markUsed() { } @Override - public TransactionManager transactionManager() { - return delegate.transactionManager(); + public TransactionManager transactionManager(TransactionOption... options) { + return delegate.transactionManager(options); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java index ceeea512b7..09594a7c0f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java @@ -22,6 +22,7 @@ import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager; import com.google.cloud.spanner.TransactionManager.TransactionState; @@ -39,7 +40,8 @@ class SessionPoolAsyncTransactionManager implements CommittableAsyncTransactionM private final SettableApiFuture delegate = SettableApiFuture.create(); - SessionPoolAsyncTransactionManager(PooledSessionFuture session) { + SessionPoolAsyncTransactionManager( + PooledSessionFuture session, final TransactionOption... options) { this.session = session; this.session.addListener( new Runnable() { @@ -47,7 +49,10 @@ class SessionPoolAsyncTransactionManager implements CommittableAsyncTransactionM public void run() { try { delegate.set( - SessionPoolAsyncTransactionManager.this.session.get().transactionManagerAsync()); + SessionPoolAsyncTransactionManager.this + .session + .get() + .transactionManagerAsync(options)); } catch (Throwable t) { delegate.setException(t); } @@ -240,26 +245,6 @@ public TransactionState getState() { } } - @Override - public AsyncTransactionManager withCommitStats() { - ApiFutures.addCallback( - delegate, - new ApiFutureCallback() { - @Override - public void onFailure(Throwable t) { - // Ignore this error as there is no underlying AsyncTransactionManager to instruct that - // it should return CommitStats. - } - - @Override - public void onSuccess(AsyncTransactionManagerImpl result) { - result.withCommitStats(); - } - }, - MoreExecutors.directExecutor()); - return this; - } - @Override public ApiFuture getCommitStats() { return ApiFutures.transformAsync( diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java index e2e6ba7471..2bd7b67c53 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManager.java @@ -90,13 +90,9 @@ public enum TransactionState { /** Returns the state of the transaction. */ TransactionState getState(); - /** Instructs the {@link TransactionManager} to request {@link CommitStats} from the backend. */ - TransactionManager withCommitStats(); - /** - * Returns the {@link CommitStats} if {@link #withCommitStats()} was called before the transaction - * was committed and the transaction committed successfully. Otherwise it will throw {@code - * IllegalStateException}. + * Returns the {@link CommitStats} if {@link Options#commitStats()} was specified for the + * transaction. Otherwise it will throw {@code IllegalStateException}. */ CommitStats getCommitStats(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java index c7875c8314..547828f7e7 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -29,27 +29,22 @@ final class TransactionManagerImpl implements TransactionManager, SessionTransac private static final Tracer tracer = Tracing.getTracer(); private final SessionImpl session; + private final Options options; private Span span; - private boolean returnCommitStats; private TransactionRunnerImpl.TransactionContextImpl txn; private TransactionState txnState; - TransactionManagerImpl(SessionImpl session, Span span) { + TransactionManagerImpl(SessionImpl session, Span span, Options options) { this.session = session; this.span = span; + this.options = options; } Span getSpan() { return span; } - @Override - public TransactionManager withCommitStats() { - this.returnCommitStats = true; - return this; - } - @Override public void setSpan(Span span) { this.span = span; @@ -78,7 +73,7 @@ public void commit() { ErrorCode.ABORTED, "Transaction already aborted"); } try { - txn.commit(returnCommitStats); + txn.commit(options.withCommitStats()); txnState = TransactionState.COMMITTED; } catch (AbortedException e1) { txnState = TransactionState.ABORTED; @@ -129,8 +124,8 @@ public CommitStats getCommitStats() { txnState == TransactionState.COMMITTED, "getCommitStats can only be invoked if the transaction committed successfully"); Preconditions.checkState( - returnCommitStats, - "getCommitStats can only be invoked if withCommitStats() was invoked before committing the transaction"); + options.withCommitStats(), + "getCommitStats can only be invoked if Options.commitStats() was specified for the transaction"); return txn.commitStats(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java index 928079cdba..6ba3deaff2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java @@ -92,17 +92,10 @@ interface TransactionCallable { */ TransactionRunner allowNestedTransaction(); - /** - * Indicates that the {@link TransactionRunner} should request the backend to return {@link - * CommitStats}. The {@link CommitStats} can be retrieved by calling {@link #getCommitStats()} - * after the transaction has successfully committed. - */ - TransactionRunner withCommitStats(); - /** * Returns the {@link CommitStats} of this transaction. This method may only be called after the - * transaction has successfully committed, and only if {@link #withCommitStats()} was called - * before executing the transaction. + * transaction has successfully committed, and only if {@link Options#commitStats()} was specified + * for the transaction. */ CommitStats getCommitStats(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 2d0a731d2f..3aafc251dc 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -30,8 +30,8 @@ import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.ReadOption; import com.google.cloud.spanner.SessionImpl.SessionTransaction; -import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; @@ -635,8 +635,8 @@ public ListenableAsyncResultSet executeQueryAsync( } private boolean blockNestedTxn = true; - private boolean returnCommitStats = false; private final SessionImpl session; + private final Options options; private Span span; private TransactionContextImpl txn; private volatile boolean isValid = true; @@ -647,14 +647,9 @@ public TransactionRunner allowNestedTransaction() { return this; } - @Override - public TransactionRunner withCommitStats() { - returnCommitStats = true; - return this; - } - - TransactionRunnerImpl(SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks) { - this.session = session; + TransactionRunnerImpl(SessionImpl session, Options options) { + this.session = Preconditions.checkNotNull(session); + this.options = Preconditions.checkNotNull(options); this.txn = session.newTransaction(); } @@ -741,7 +736,7 @@ public T call() { } try { - txn.commit(returnCommitStats); + txn.commit(options.withCommitStats()); span.addAnnotation( "Transaction Attempt Succeeded", ImmutableMap.of( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java index 7d3f051053..c7869f31e5 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java @@ -35,22 +35,24 @@ public class AsyncTransactionManagerImplTest { @Mock private SessionImpl session; @Mock TransactionRunnerImpl.TransactionContextImpl txn; - private AsyncTransactionManagerImpl manager; @Before public void setUp() { initMocks(this); - manager = new AsyncTransactionManagerImpl(session, mock(Span.class)); } @Test public void commitReturnsCommitStats() { - when(session.newTransaction()).thenReturn(txn); - when(txn.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null)); - Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1); - when(txn.commitAsync(true)).thenReturn(ApiFutures.immediateFuture(commitTimestamp)); - manager.withCommitStats().beginAsync(); - manager.commitAsync(); - verify(txn).commitAsync(true); + try (AsyncTransactionManagerImpl manager = + new AsyncTransactionManagerImpl( + session, mock(Span.class), Options.fromTransactionOptions(Options.commitStats()))) { + when(session.newTransaction()).thenReturn(txn); + when(txn.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null)); + Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1); + when(txn.commitAsync(true)).thenReturn(ApiFutures.immediateFuture(commitTimestamp)); + manager.beginAsync(); + manager.commitAsync(); + verify(txn).commitAsync(true); + } } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java index eba0c55c8f..ae1aad6de7 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java @@ -212,7 +212,8 @@ public boolean apply(AbstractMessage input) { @Test public void asyncTransactionManager_returnsCommitStats() throws Exception { - try (AsyncTransactionManager manager = client().transactionManagerAsync().withCommitStats()) { + try (AsyncTransactionManager manager = + client().transactionManagerAsync(Options.commitStats())) { TransactionContextFuture txn = manager.beginAsync(); while (true) { try { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 975058a524..91706009bc 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -163,9 +163,10 @@ public void writeWithCommitStats() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); WriteResponse response = - client.writeWithCommitStats( + client.write( Arrays.asList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.commitStats()); assertThat(response).isNotNull(); assertThat(response.getCommitTimestamp()).isNotNull(); assertThat(response.getCommitStats()).isNotNull(); @@ -185,9 +186,10 @@ public void writeAtLeastOnceWithCommitStats() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); WriteResponse response = - client.writeAtLeastOnceWithCommitStats( + client.writeAtLeastOnce( Arrays.asList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build())); + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build()), + Options.commitStats()); assertThat(response).isNotNull(); assertThat(response.getCommitTimestamp()).isNotNull(); assertThat(response.getCommitStats()).isNotNull(); @@ -465,7 +467,7 @@ public Void run(TransactionContext transaction) throws Exception { public void readWriteTransaction_returnsCommitStats() { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - TransactionRunner runner = client.readWriteTransaction().withCommitStats(); + TransactionRunner runner = client.readWriteTransaction(Options.commitStats()); runner.run( new TransactionCallable() { @Override @@ -522,7 +524,7 @@ public void runAsync_returnsCommitStats() throws Exception { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); ExecutorService executor = Executors.newSingleThreadExecutor(); - AsyncRunner runner = client.runAsync().withCommitStats(); + AsyncRunner runner = client.runAsync(Options.commitStats()); ApiFuture fut = runner.runAsync( new AsyncWork() { @@ -610,7 +612,7 @@ public void transactionManager() throws Exception { public void transactionManager_returnsCommitStats() throws Exception { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); - try (TransactionManager txManager = client.transactionManager().withCommitStats()) { + try (TransactionManager txManager = client.transactionManager(Options.commitStats())) { while (true) { TransactionContext tx = txManager.begin(); try { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java index fcf1c6e35b..489fe43187 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java @@ -23,7 +23,7 @@ import com.google.api.gax.core.NoCredentialsProvider; import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.cloud.NoCredentials; -import com.google.cloud.Timestamp; +import com.google.cloud.spanner.DatabaseClient.WriteResponse; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.cloud.spanner.v1.SpannerClient; @@ -1432,8 +1432,9 @@ public void write() throws InterruptedException { initReadWriteSessionPool(); invalidateSessionPool(); try { - Timestamp timestamp = client.write(Arrays.asList(Mutation.delete("FOO", KeySet.all()))); - assertThat(timestamp).isNotNull(); + WriteResponse response = client.write(Arrays.asList(Mutation.delete("FOO", KeySet.all()))); + assertThat(response).isNotNull(); + assertThat(response.getCommitTimestamp()).isNotNull(); assertThat(failOnInvalidatedSession).isFalse(); } catch (SessionNotFoundException e) { assertThat(failOnInvalidatedSession).isTrue(); @@ -1445,9 +1446,10 @@ public void writeAtLeastOnce() throws InterruptedException { initReadWriteSessionPool(); invalidateSessionPool(); try { - Timestamp timestamp = + WriteResponse response = client.writeAtLeastOnce(Arrays.asList(Mutation.delete("FOO", KeySet.all()))); - assertThat(timestamp).isNotNull(); + assertThat(response).isNotNull(); + assertThat(response.getCommitTimestamp()).isNotNull(); assertThat(failOnInvalidatedSession).isFalse(); } catch (SessionNotFoundException e) { assertThat(failOnInvalidatedSession).isTrue(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index c756a7898a..04496d88c0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -213,8 +213,10 @@ public void writeAtLeastOnce() throws ParseException { Mockito.when(rpc.commit(commit.capture(), Mockito.eq(options))).thenReturn(response); Timestamp timestamp = - session.writeAtLeastOnce( - Arrays.asList(Mutation.newInsertBuilder("T").set("C").to("x").build())); + session + .writeAtLeastOnce( + Arrays.asList(Mutation.newInsertBuilder("T").set("C").to("x").build())) + .getCommitTimestamp(); assertThat(timestamp.getSeconds()) .isEqualTo(utcTimeSeconds(2015, Calendar.OCTOBER, 1, 10, 54, 20)); assertThat(timestamp.getNanos()).isEqualTo(TimeUnit.MILLISECONDS.toNanos(21)); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index d5ea648bbd..d09bfac23e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -40,6 +40,7 @@ import com.google.api.core.ApiFutures; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.DatabaseClient.WriteResponse; import com.google.cloud.spanner.MetricRegistryTestUtils.FakeMetricRegistry; import com.google.cloud.spanner.MetricRegistryTestUtils.MetricsRecord; import com.google.cloud.spanner.MetricRegistryTestUtils.PointWithFunction; @@ -1360,7 +1361,7 @@ public void testSessionNotFoundReadWriteTransaction() { when(closedSession.newTransaction()).thenReturn(closedTransactionContext); when(closedSession.beginTransactionAsync()).thenThrow(sessionNotFound); TransactionRunnerImpl closedTransactionRunner = - new TransactionRunnerImpl(closedSession, rpc, 10); + new TransactionRunnerImpl(closedSession, Options.fromTransactionOptions()); closedTransactionRunner.setSpan(mock(Span.class)); when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner); @@ -1374,7 +1375,7 @@ public void testSessionNotFoundReadWriteTransaction() { when(openSession.beginTransactionAsync()) .thenReturn(ApiFutures.immediateFuture(ByteString.copyFromUtf8("open-txn"))); TransactionRunnerImpl openTransactionRunner = - new TransactionRunnerImpl(openSession, mock(SpannerRpc.class), 10); + new TransactionRunnerImpl(openSession, Options.fromTransactionOptions()); openTransactionRunner.setSpan(mock(Span.class)); when(openSession.readWriteTransaction()).thenReturn(openTransactionRunner); @@ -1578,7 +1579,9 @@ public void testSessionNotFoundWrite() { when(closedSession.write(mutations)).thenThrow(sessionNotFound); final SessionImpl openSession = mockSession(); - when(openSession.write(mutations)).thenReturn(Timestamp.now()); + WriteResponse response = mock(WriteResponse.class); + when(response.getCommitTimestamp()).thenReturn(Timestamp.now()); + when(openSession.write(mutations)).thenReturn(response); doAnswer( new Answer() { @Override @@ -1630,7 +1633,9 @@ public void testSessionNotFoundWriteAtLeastOnce() { when(closedSession.writeAtLeastOnce(mutations)).thenThrow(sessionNotFound); final SessionImpl openSession = mockSession(); - when(openSession.writeAtLeastOnce(mutations)).thenReturn(Timestamp.now()); + WriteResponse response = mock(WriteResponse.class); + when(response.getCommitTimestamp()).thenReturn(Timestamp.now()); + when(openSession.writeAtLeastOnce(mutations)).thenReturn(response); doAnswer( new Answer() { @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index 1bfea218f6..e67d524b52 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -78,7 +78,8 @@ public void release(ScheduledExecutorService exec) { @Before public void setUp() { initMocks(this); - manager = new TransactionManagerImpl(session, mock(Span.class)); + manager = + new TransactionManagerImpl(session, mock(Span.class), Options.fromTransactionOptions()); } @Test @@ -146,12 +147,16 @@ public void commitSucceeds() { @Test public void commitReturnsCommitStats() { - when(session.newTransaction()).thenReturn(txn); - Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1); - when(txn.commitTimestamp()).thenReturn(commitTimestamp); - manager.withCommitStats().begin(); - manager.commit(); - verify(txn).commit(true); + try (TransactionManager manager = + new TransactionManagerImpl( + session, mock(Span.class), Options.fromTransactionOptions(Options.commitStats()))) { + when(session.newTransaction()).thenReturn(txn); + Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1); + when(txn.commitTimestamp()).thenReturn(commitTimestamp); + manager.begin(); + manager.commit(); + verify(txn).commit(true); + } } @Test diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index 60b2063c95..28202fc398 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -97,7 +97,7 @@ public void setUp() { MockitoAnnotations.initMocks(this); firstRun = true; when(session.newTransaction()).thenReturn(txn); - transactionRunner = new TransactionRunnerImpl(session, rpc, 1); + transactionRunner = new TransactionRunnerImpl(session, Options.fromTransactionOptions()); when(rpc.commitAsync(Mockito.any(CommitRequest.class), Mockito.anyMap())) .thenReturn( ApiFutures.immediateFuture( @@ -195,15 +195,16 @@ public Void run(TransactionContext transaction) { @Test public void testReturnCommitStats() { - transactionRunner - .withCommitStats() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - return null; - } - }); + TransactionRunnerImpl transactionRunner = + new TransactionRunnerImpl(session, Options.fromTransactionOptions(Options.commitStats())); + transactionRunner.setSpan(mock(Span.class)); + transactionRunner.run( + new TransactionCallable() { + @Override + public Void run(TransactionContext transaction) throws Exception { + return null; + } + }); verify(txn).commit(true); } @@ -303,7 +304,8 @@ private long[] batchDmlException(int status) { .thenReturn( ApiFutures.immediateFuture(ByteString.copyFromUtf8(UUID.randomUUID().toString()))); when(session.getName()).thenReturn(SessionId.of("p", "i", "d", "test").getName()); - TransactionRunnerImpl runner = new TransactionRunnerImpl(session, rpc, 10); + TransactionRunnerImpl runner = + new TransactionRunnerImpl(session, Options.fromTransactionOptions()); runner.setSpan(mock(Span.class)); ExecuteBatchDmlResponse response1 = ExecuteBatchDmlResponse.newBuilder() diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionImplTest.java index 7da737ca3f..15702f6d5f 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ConnectionImplTest.java @@ -131,11 +131,6 @@ public void close() { } } - @Override - public TransactionManager withCommitStats() { - return this; - } - @Override public CommitStats getCommitStats() { throw new UnsupportedOperationException(); @@ -352,11 +347,6 @@ public TransactionRunner allowNestedTransaction() { return this; } - @Override - public TransactionRunner withCommitStats() { - return this; - } - @Override public CommitStats getCommitStats() { throw new UnsupportedOperationException(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java index 44245fc9e1..355de39f6f 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/ReadWriteTransactionTest.java @@ -131,11 +131,6 @@ public void close() { } } - @Override - public TransactionManager withCommitStats() { - return this; - } - @Override public CommitStats getCommitStats() { throw new UnsupportedOperationException(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SingleUseTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SingleUseTransactionTest.java index 3024db5924..464596fd68 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SingleUseTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/connection/SingleUseTransactionTest.java @@ -159,11 +159,6 @@ public void close() { } } - @Override - public TransactionManager withCommitStats() { - return this; - } - @Override public CommitStats getCommitStats() { throw new UnsupportedOperationException(); @@ -433,11 +428,6 @@ public TransactionRunner allowNestedTransaction() { return this; } - @Override - public TransactionRunner withCommitStats() { - return this; - } - @Override public CommitStats getCommitStats() { throw new UnsupportedOperationException(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java index c894f0fbb3..ea818594b8 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java @@ -41,6 +41,7 @@ import com.google.cloud.spanner.KeyRange; import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Options; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; @@ -318,7 +319,7 @@ public ApiFuture doWorkAsync(TransactionContext txn) { @Test public void asyncRunnerReturnsCommitStats() { - AsyncRunner runner = client.runAsync().withCommitStats(); + AsyncRunner runner = client.runAsync(Options.commitStats()); runner.runAsync( new AsyncWork() { @Override @@ -341,7 +342,7 @@ public ApiFuture doWorkAsync(TransactionContext txn) { @Test public void asyncTransactionManagerReturnsCommitStats() throws InterruptedException { - try (AsyncTransactionManager mgr = client.transactionManagerAsync().withCommitStats()) { + try (AsyncTransactionManager mgr = client.transactionManagerAsync(Options.commitStats())) { TransactionContextFuture ctx = mgr.beginAsync(); while (true) { try { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBackupTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBackupTest.java index 0fbd0702ec..a2b7b05864 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBackupTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBackupTest.java @@ -306,14 +306,16 @@ public void testBackups() throws InterruptedException, ExecutionException { } // Insert some more data into db2 to get a timestamp from the server. Timestamp commitTs = - client.writeAtLeastOnce( - Arrays.asList( - Mutation.newInsertOrUpdateBuilder("BAR") - .set("ID") - .to(2L) - .set("NAME") - .to("TEST2") - .build())); + client + .writeAtLeastOnce( + Arrays.asList( + Mutation.newInsertOrUpdateBuilder("BAR") + .set("ID") + .to(2L) + .set("NAME") + .to("TEST2") + .build())) + .getCommitTimestamp(); // Test listing operations. // List all backups. diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITCommitTimestampTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITCommitTimestampTest.java index 187f1e9f12..2feb2f60a2 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITCommitTimestampTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITCommitTimestampTest.java @@ -89,7 +89,7 @@ public void deleteAllTestRecords() { } private Timestamp write(Mutation m) { - return client.write(Arrays.asList(m)); + return client.write(Arrays.asList(m)).getCommitTimestamp(); } private Struct readRow(DatabaseClient client, String table, Key key, String... columns) { @@ -255,11 +255,13 @@ private void alterColumnOption(String databaseId, String table, String opt) thro private void writeAndVerify(DatabaseClient client, Timestamp ts) { Timestamp commitTimestamp = - client.write( - Arrays.asList( - Mutation.newInsertOrUpdateBuilder("T1").set("ts").to(ts).build(), - Mutation.newInsertOrUpdateBuilder("T2").set("ts").to(ts).build(), - Mutation.newInsertOrUpdateBuilder("T3").set("ts").to(ts).build())); + client + .write( + Arrays.asList( + Mutation.newInsertOrUpdateBuilder("T1").set("ts").to(ts).build(), + Mutation.newInsertOrUpdateBuilder("T2").set("ts").to(ts).build(), + Mutation.newInsertOrUpdateBuilder("T3").set("ts").to(ts).build())) + .getCommitTimestamp(); if (ts == Value.COMMIT_TIMESTAMP) { ts = commitTimestamp; } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITReadOnlyTxnTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITReadOnlyTxnTest.java index db68b5ec88..f1c8e5e6b7 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITReadOnlyTxnTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITReadOnlyTxnTest.java @@ -101,7 +101,7 @@ private static void writeNewValue( String value = "v" + i; Mutation m = Mutation.newInsertOrUpdateBuilder(TABLE_NAME).set("StringValue").to(value).build(); long minCommitNanoTime = System.nanoTime(); - Timestamp timestamp = client.writeAtLeastOnce(Arrays.asList(m)); + Timestamp timestamp = client.writeAtLeastOnce(Arrays.asList(m)).getCommitTimestamp(); if (historyBuilder != null) { historyBuilder.add(new History(timestamp, value, minCommitNanoTime)); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java index d80589d4b8..91639c94cb 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java @@ -28,6 +28,7 @@ import com.google.cloud.spanner.IntegrationTestEnv; import com.google.cloud.spanner.Key; import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Options; import com.google.cloud.spanner.ParallelIntegrationTest; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Struct; @@ -207,7 +208,7 @@ public void abortAndRetry() throws InterruptedException { @SuppressWarnings("resource") @Test public void transactionManagerReturnsCommitStats() throws InterruptedException { - try (TransactionManager manager = client.transactionManager().withCommitStats()) { + try (TransactionManager manager = client.transactionManager(Options.commitStats())) { TransactionContext txn = manager.begin(); while (true) { txn.buffer( diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java index 42dc57918d..3a3e0dc81d 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java @@ -34,6 +34,7 @@ import com.google.cloud.spanner.Key; import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Options; import com.google.cloud.spanner.ParallelIntegrationTest; import com.google.cloud.spanner.PartitionOptions; import com.google.cloud.spanner.ReadContext; @@ -510,7 +511,7 @@ public Void run(TransactionContext transaction) throws SpannerException { @Test public void transactionRunnerReturnsCommitStats() { final String key = uniqueKey(); - TransactionRunner runner = client.readWriteTransaction().withCommitStats(); + TransactionRunner runner = client.readWriteTransaction(Options.commitStats()); runner.run( new TransactionCallable() { @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java index c80056bc2d..38dcc4693e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITWriteTest.java @@ -33,6 +33,7 @@ import com.google.cloud.spanner.Key; import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Options; import com.google.cloud.spanner.ParallelIntegrationTest; import com.google.cloud.spanner.ResultSet; import com.google.cloud.spanner.SpannerException; @@ -130,7 +131,7 @@ private static String uniqueString() { private String lastKey; private Timestamp write(Mutation m) { - return client.write(Arrays.asList(m)); + return client.write(Arrays.asList(m)).getCommitTimestamp(); } private Mutation.WriteBuilder baseInsert() { @@ -161,14 +162,15 @@ public void writeAtLeastOnce() { @Test public void writeReturnsCommitStats() { WriteResponse response = - client.writeWithCommitStats( + client.write( Arrays.asList( Mutation.newInsertOrUpdateBuilder("T") .set("K") .to(lastKey = uniqueString()) .set("StringValue") .to("v1") - .build())); + .build()), + Options.commitStats()); assertThat(response).isNotNull(); assertThat(response.getCommitTimestamp()).isNotNull(); assertThat(response.getCommitStats()).isNotNull(); @@ -178,14 +180,15 @@ public void writeReturnsCommitStats() { @Test public void writeAtLeastOnceReturnsCommitStats() { WriteResponse response = - client.writeAtLeastOnceWithCommitStats( + client.writeAtLeastOnce( Arrays.asList( Mutation.newInsertOrUpdateBuilder("T") .set("K") .to(lastKey = uniqueString()) .set("StringValue") .to("v1") - .build())); + .build()), + Options.commitStats()); assertThat(response).isNotNull(); assertThat(response.getCommitTimestamp()).isNotNull(); assertThat(response.getCommitStats()).isNotNull();