From b56892d50637a545456fe4217557ecbb9fd56a50 Mon Sep 17 00:00:00 2001 From: Olav Loite Date: Fri, 11 Dec 2020 11:36:46 +0100 Subject: [PATCH 1/2] feat: introduce TransactionOptions and UpdateOptions Adds TransactionOptions and UpdateOptions for read/write transactions and statements. These can be used in the future to specify options to affect how transactions and statements will be executed. --- .../clirr-ignored-differences.xml | 47 ++++++ .../cloud/spanner/AbstractReadContext.java | 12 +- .../spanner/AsyncTransactionManagerImpl.java | 7 +- .../google/cloud/spanner/DatabaseClient.java | 11 +- .../cloud/spanner/DatabaseClientImpl.java | 57 +++---- .../com/google/cloud/spanner/Options.java | 27 ++++ .../spanner/PartitionedDmlTransaction.java | 21 ++- .../com/google/cloud/spanner/SessionImpl.java | 58 +++---- .../com/google/cloud/spanner/SessionPool.java | 141 ++++++++++-------- .../SessionPoolAsyncTransactionManager.java | 11 +- .../cloud/spanner/TransactionContext.java | 9 +- .../cloud/spanner/TransactionManagerImpl.java | 9 +- .../cloud/spanner/TransactionRunnerImpl.java | 46 ++++-- .../spanner/AbstractReadContextTest.java | 6 +- .../cloud/spanner/BaseSessionPoolTest.java | 6 + .../cloud/spanner/DatabaseClientImplTest.java | 57 +++++++ .../google/cloud/spanner/SessionPoolTest.java | 12 +- .../spanner/TransactionContextImplTest.java | 1 + .../spanner/TransactionManagerImplTest.java | 18 +-- .../spanner/TransactionRunnerImplTest.java | 5 +- 20 files changed, 384 insertions(+), 177 deletions(-) diff --git a/google-cloud-spanner/clirr-ignored-differences.xml b/google-cloud-spanner/clirr-ignored-differences.xml index 3968118b1c..8dd5d9dad6 100644 --- a/google-cloud-spanner/clirr-ignored-differences.xml +++ b/google-cloud-spanner/clirr-ignored-differences.xml @@ -406,4 +406,51 @@ com/google/cloud/spanner/AbstractLazyInitializer java.lang.Object initialize() + + + + 7004 + com/google/cloud/spanner/DatabaseClient + long executePartitionedUpdate(com.google.cloud.spanner.Statement) + + + 7004 + com/google/cloud/spanner/DatabaseClient + com.google.cloud.spanner.TransactionRunner readWriteTransaction() + + + 7004 + com/google/cloud/spanner/DatabaseClient + com.google.cloud.spanner.AsyncRunner runAsync() + + + 7004 + com/google/cloud/spanner/DatabaseClient + com.google.cloud.spanner.TransactionManager transactionManager() + + + 7004 + com/google/cloud/spanner/DatabaseClient + com.google.cloud.spanner.AsyncTransactionManager transactionManagerAsync() + + + 7004 + com/google/cloud/spanner/TransactionContext + long[] batchUpdate(java.lang.Iterable) + + + 7004 + com/google/cloud/spanner/TransactionContext + com.google.api.core.ApiFuture batchUpdateAsync(java.lang.Iterable) + + + 7004 + com/google/cloud/spanner/TransactionContext + long executeUpdate(com.google.cloud.spanner.Statement) + + + 7004 + com/google/cloud/spanner/TransactionContext + com.google.api.core.ApiFuture executeUpdateAsync(com.google.cloud.spanner.Statement) + diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index cff8ec1f57..7381a489af 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -554,7 +554,8 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) { return builder.build(); } - ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, QueryMode queryMode) { + ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder( + Statement statement, QueryMode queryMode, Options options) { ExecuteSqlRequest.Builder builder = ExecuteSqlRequest.newBuilder() .setSql(statement.getSql()) @@ -577,7 +578,8 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, Query return builder; } - ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(Iterable statements) { + ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder( + Iterable statements, Options options) { ExecuteBatchDmlRequest.Builder builder = ExecuteBatchDmlRequest.newBuilder().setSession(session.getName()); int idx = 0; @@ -609,7 +611,7 @@ ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(Iterable startStream(@Nullable ByteString resumeToken) { GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks); final ExecuteSqlRequest.Builder request = - getExecuteSqlRequestBuilder(statement, queryMode); + getExecuteSqlRequestBuilder(statement, queryMode, options); if (partitionToken != null) { request.setPartitionToken(partitionToken); } @@ -707,7 +709,7 @@ ResultSet readInternalWithOptions( @Nullable String index, KeySet keys, Iterable columns, - Options readOptions, + final Options readOptions, ByteString partitionToken) { beforeReadOrQuery(); final ReadRequest.Builder builder = diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java index d4bf7f4a47..8dab813015 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java @@ -22,6 +22,7 @@ import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.SessionImpl.SessionTransaction; import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager; import com.google.cloud.spanner.TransactionManager.TransactionState; @@ -40,14 +41,16 @@ final class AsyncTransactionManagerImpl private final SessionImpl session; private Span span; + private final Options options; private TransactionRunnerImpl.TransactionContextImpl txn; private TransactionState txnState; private final SettableApiFuture commitTimestamp = SettableApiFuture.create(); - AsyncTransactionManagerImpl(SessionImpl session, Span span) { + AsyncTransactionManagerImpl(SessionImpl session, Span span, TransactionOption... options) { this.session = session; this.span = span; + this.options = Options.fromTransactionOptions(options); } @Override @@ -82,7 +85,7 @@ public TransactionContextFutureImpl beginAsync() { private ApiFuture internalBeginAsync(boolean firstAttempt) { txnState = TransactionState.STARTED; - txn = session.newTransaction(); + txn = session.newTransaction(options); if (firstAttempt) { session.setActive(this); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java index dc1f2a80c7..8d0e0ea0e3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java @@ -18,6 +18,7 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.Options.TransactionOption; +import com.google.cloud.spanner.Options.UpdateOption; /** * Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An @@ -308,7 +309,7 @@ CommitResponse writeAtLeastOnceWithOptions( * }); * */ - TransactionRunner readWriteTransaction(); + TransactionRunner readWriteTransaction(TransactionOption... options); /** * Returns a transaction manager which allows manual management of transaction lifecycle. This API @@ -338,7 +339,7 @@ CommitResponse writeAtLeastOnceWithOptions( * } * } */ - TransactionManager transactionManager(); + TransactionManager transactionManager(TransactionOption... options); /** * Returns an asynchronous transaction runner for executing a single logical transaction with @@ -371,7 +372,7 @@ CommitResponse writeAtLeastOnceWithOptions( * executor); * */ - AsyncRunner runAsync(); + AsyncRunner runAsync(TransactionOption... options); /** * Returns an asynchronous transaction manager which allows manual management of transaction @@ -459,7 +460,7 @@ CommitResponse writeAtLeastOnceWithOptions( * } * } */ - AsyncTransactionManager transactionManagerAsync(); + AsyncTransactionManager transactionManagerAsync(TransactionOption... options); /** * Returns the lower bound of rows modified by this DML statement. @@ -508,5 +509,5 @@ CommitResponse writeAtLeastOnceWithOptions( *

Given the above, Partitioned DML is good fit for large, database-wide, operations that are * idempotent, such as deleting old rows from a very large table. */ - long executePartitionedUpdate(Statement stmt); + long executePartitionedUpdate(Statement stmt, UpdateOption... options); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index a6dd90713b..55002d405f 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -18,6 +18,7 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.Options.TransactionOption; +import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SpannerImpl.ClosedException; import com.google.common.annotations.VisibleForTesting; @@ -54,13 +55,20 @@ PooledSessionFuture getSession() { @Override public Timestamp write(final Iterable mutations) throws SpannerException { + return writeWithOptions(mutations).getCommitTimestamp(); + } + + @Override + public CommitResponse writeWithOptions( + final Iterable mutations, final TransactionOption... options) + throws SpannerException { Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); try (Scope s = tracer.withSpan(span)) { return runWithSessionRetry( - new Function() { + new Function() { @Override - public Timestamp apply(Session session) { - return session.write(mutations); + public CommitResponse apply(Session session) { + return session.writeWithOptions(mutations, options); } }); } catch (RuntimeException e) { @@ -72,21 +80,21 @@ public Timestamp apply(Session session) { } @Override - public CommitResponse writeWithOptions(Iterable mutations, TransactionOption... options) - throws SpannerException { - final Timestamp commitTimestamp = write(mutations); - return new CommitResponse(commitTimestamp); + public Timestamp writeAtLeastOnce(final Iterable mutations) throws SpannerException { + return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp(); } @Override - public Timestamp writeAtLeastOnce(final Iterable mutations) throws SpannerException { + public CommitResponse writeAtLeastOnceWithOptions( + final Iterable mutations, final TransactionOption... options) + throws SpannerException { Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); try (Scope s = tracer.withSpan(span)) { return runWithSessionRetry( - new Function() { + new Function() { @Override - public Timestamp apply(Session session) { - return session.writeAtLeastOnce(mutations); + public CommitResponse apply(Session session) { + return session.writeAtLeastOnceWithOptions(mutations, options); } }); } catch (RuntimeException e) { @@ -97,13 +105,6 @@ public Timestamp apply(Session session) { } } - @Override - public CommitResponse writeAtLeastOnceWithOptions( - Iterable mutations, TransactionOption... options) throws SpannerException { - final Timestamp commitTimestamp = writeAtLeastOnce(mutations); - return new CommitResponse(commitTimestamp); - } - @Override public ReadContext singleUse() { Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan(); @@ -171,10 +172,10 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { } @Override - public TransactionRunner readWriteTransaction() { + public TransactionRunner readWriteTransaction(TransactionOption... options) { Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); try (Scope s = tracer.withSpan(span)) { - return getSession().readWriteTransaction(); + return getSession().readWriteTransaction(options); } catch (RuntimeException e) { TraceUtil.endSpanWithFailure(span, e); throw e; @@ -184,10 +185,10 @@ public TransactionRunner readWriteTransaction() { } @Override - public TransactionManager transactionManager() { + public TransactionManager transactionManager(TransactionOption... options) { Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); try (Scope s = tracer.withSpan(span)) { - return getSession().transactionManager(); + return getSession().transactionManager(options); } catch (RuntimeException e) { TraceUtil.endSpanWithFailure(span, e); throw e; @@ -195,10 +196,10 @@ public TransactionManager transactionManager() { } @Override - public AsyncRunner runAsync() { + public AsyncRunner runAsync(TransactionOption... options) { Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); try (Scope s = tracer.withSpan(span)) { - return getSession().runAsync(); + return getSession().runAsync(options); } catch (RuntimeException e) { TraceUtil.endSpanWithFailure(span, e); throw e; @@ -206,10 +207,10 @@ public AsyncRunner runAsync() { } @Override - public AsyncTransactionManager transactionManagerAsync() { + public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) { Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); try (Scope s = tracer.withSpan(span)) { - return getSession().transactionManagerAsync(); + return getSession().transactionManagerAsync(options); } catch (RuntimeException e) { TraceUtil.endSpanWithFailure(span, e); throw e; @@ -217,14 +218,14 @@ public AsyncTransactionManager transactionManagerAsync() { } @Override - public long executePartitionedUpdate(final Statement stmt) { + public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) { Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan(); try (Scope s = tracer.withSpan(span)) { return runWithSessionRetry( new Function() { @Override public Long apply(Session session) { - return session.executePartitionedUpdate(stmt); + return session.executePartitionedUpdate(stmt, options); } }); } catch (RuntimeException e) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index 217d81b886..26d3cd6832 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -30,12 +30,19 @@ public interface ReadAndQueryOption extends ReadOption, QueryOption {} /** Marker interface to mark options applicable to read operation */ public interface ReadOption {} + /** Marker interface to mark options applicable to Read, Query, Update and Write operations */ + public interface ReadQueryUpdateTransactionOption + extends ReadOption, QueryOption, UpdateOption, TransactionOption {} + /** Marker interface to mark options applicable to query operation. */ public interface QueryOption {} /** Marker interface to mark options applicable to write operations */ public interface TransactionOption {} + /** Marker interface to mark options applicable to update operation. */ + public interface UpdateOption {} + /** Marker interface to mark options applicable to list operations in admin API. */ public interface ListOption {} @@ -287,6 +294,26 @@ static Options fromQueryOptions(QueryOption... options) { return readOptions; } + static Options fromUpdateOptions(UpdateOption... options) { + Options updateOptions = new Options(); + for (UpdateOption option : options) { + if (option instanceof InternalOption) { + ((InternalOption) option).appendToOptions(updateOptions); + } + } + return updateOptions; + } + + static Options fromTransactionOptions(TransactionOption... options) { + Options transactionOptions = new Options(); + for (TransactionOption option : options) { + if (option instanceof InternalOption) { + ((InternalOption) option).appendToOptions(transactionOptions); + } + } + return transactionOptions; + } + static Options fromListOptions(ListOption... options) { Options listOptions = new Options(); for (ListOption option : options) { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java index da92b6f9c6..b3aa4dea3d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java @@ -24,6 +24,7 @@ import com.google.api.gax.rpc.InternalException; import com.google.api.gax.rpc.ServerStream; import com.google.api.gax.rpc.UnavailableException; +import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.base.Stopwatch; import com.google.common.base.Ticker; @@ -66,7 +67,8 @@ public class PartitionedDmlTransaction implements SessionImpl.SessionTransaction * statement, and will retry the stream if an {@link UnavailableException} is thrown, using the * last seen resume token if the server returns any. */ - long executeStreamingPartitionedUpdate(final Statement statement, final Duration timeout) { + long executeStreamingPartitionedUpdate( + final Statement statement, final Duration timeout, final UpdateOption... updateOptions) { checkState(isValid, "Partitioned DML has been invalidated by a new operation on the session"); LOGGER.log(Level.FINER, "Starting PartitionedUpdate statement"); @@ -74,9 +76,10 @@ long executeStreamingPartitionedUpdate(final Statement statement, final Duration boolean foundStats = false; long updateCount = 0L; Stopwatch stopwatch = Stopwatch.createStarted(ticker); + Options options = Options.fromUpdateOptions(updateOptions); try { - ExecuteSqlRequest request = newTransactionRequestFrom(statement); + ExecuteSqlRequest request = newTransactionRequestFrom(statement, options); while (true) { final Duration remainingTimeout = tryUpdateTimeout(timeout, stopwatch); @@ -98,7 +101,7 @@ long executeStreamingPartitionedUpdate(final Statement statement, final Duration } catch (UnavailableException e) { LOGGER.log( Level.FINER, "Retrying PartitionedDml transaction after UnavailableException", e); - request = resumeOrRestartRequest(resumeToken, statement, request); + request = resumeOrRestartRequest(resumeToken, statement, request, options); } catch (InternalException e) { if (!isRetryableInternalErrorPredicate.apply(e)) { throw e; @@ -106,13 +109,13 @@ long executeStreamingPartitionedUpdate(final Statement statement, final Duration LOGGER.log( Level.FINER, "Retrying PartitionedDml transaction after InternalException - EOS", e); - request = resumeOrRestartRequest(resumeToken, statement, request); + request = resumeOrRestartRequest(resumeToken, statement, request, options); } catch (AbortedException e) { LOGGER.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e); resumeToken = ByteString.EMPTY; foundStats = false; updateCount = 0L; - request = newTransactionRequestFrom(statement); + request = newTransactionRequestFrom(statement, options); } } if (!foundStats) { @@ -150,15 +153,17 @@ private Duration tryUpdateTimeout(final Duration timeout, final Stopwatch stopwa private ExecuteSqlRequest resumeOrRestartRequest( final ByteString resumeToken, final Statement statement, - final ExecuteSqlRequest originalRequest) { + final ExecuteSqlRequest originalRequest, + final Options options) { if (resumeToken.isEmpty()) { - return newTransactionRequestFrom(statement); + return newTransactionRequestFrom(statement, options); } else { return ExecuteSqlRequest.newBuilder(originalRequest).setResumeToken(resumeToken).build(); } } - private ExecuteSqlRequest newTransactionRequestFrom(final Statement statement) { + private ExecuteSqlRequest newTransactionRequestFrom( + final Statement statement, final Options options) { ByteString transactionId = initTransaction(); final TransactionSelector transactionSelector = diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 971dfc2ab1..8b37204d97 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -26,6 +26,7 @@ import com.google.cloud.spanner.AbstractReadContext.SingleReadContext; import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction; import com.google.cloud.spanner.Options.TransactionOption; +import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionId; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; @@ -113,17 +114,23 @@ void setCurrentSpan(Span span) { } @Override - public long executePartitionedUpdate(Statement stmt) { + public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { setActive(null); PartitionedDmlTransaction txn = new PartitionedDmlTransaction(this, spanner.getRpc(), Ticker.systemTicker()); return txn.executeStreamingPartitionedUpdate( - stmt, spanner.getOptions().getPartitionedDmlTimeout()); + stmt, spanner.getOptions().getPartitionedDmlTimeout(), options); } @Override public Timestamp write(Iterable mutations) throws SpannerException { - TransactionRunner runner = readWriteTransaction(); + return writeWithOptions(mutations).getCommitTimestamp(); + } + + @Override + public CommitResponse writeWithOptions(Iterable mutations, TransactionOption... options) + throws SpannerException { + TransactionRunner runner = readWriteTransaction(options); final Collection finalMutations = mutations instanceof java.util.Collection ? (Collection) mutations @@ -136,18 +143,18 @@ public Void run(TransactionContext ctx) { return null; } }); - return runner.getCommitTimestamp(); + return new CommitResponse(runner.getCommitTimestamp()); } @Override - public CommitResponse writeWithOptions(Iterable mutations, TransactionOption... options) - throws SpannerException { - final Timestamp commitTimestamp = write(mutations); - return new CommitResponse(commitTimestamp); + public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { + return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp(); } @Override - public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { + public CommitResponse writeAtLeastOnceWithOptions( + Iterable mutations, TransactionOption... transactionOptions) + throws SpannerException { setActive(null); List mutationsProto = new ArrayList<>(); Mutation.toProto(mutations, mutationsProto); @@ -161,9 +168,10 @@ public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerEx .build(); Span span = tracer.spanBuilder(SpannerImpl.COMMIT).startSpan(); try (Scope s = tracer.withSpan(span)) { - com.google.spanner.v1.CommitResponse response = spanner.getRpc().commit(request, options); + com.google.spanner.v1.CommitResponse response = + spanner.getRpc().commit(request, this.options); Timestamp t = Timestamp.fromProto(response.getCommitTimestamp()); - return t; + return new CommitResponse(t); } catch (IllegalArgumentException e) { TraceUtil.setWithFailure(span, e); throw newSpannerException(ErrorCode.INTERNAL, "Could not parse commit timestamp", e); @@ -175,13 +183,6 @@ public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerEx } } - @Override - public CommitResponse writeAtLeastOnceWithOptions( - Iterable mutations, TransactionOption... options) throws SpannerException { - final Timestamp commitTimestamp = writeAtLeastOnce(mutations); - return new CommitResponse(commitTimestamp); - } - @Override public ReadContext singleUse() { return singleUse(TimestampBound.strong()); @@ -240,26 +241,28 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { } @Override - public TransactionRunner readWriteTransaction() { + public TransactionRunner readWriteTransaction(TransactionOption... options) { return setActive( - new TransactionRunnerImpl(this, spanner.getRpc(), spanner.getDefaultPrefetchChunks())); + new TransactionRunnerImpl( + this, spanner.getRpc(), spanner.getDefaultPrefetchChunks(), options)); } @Override - public AsyncRunner runAsync() { + public AsyncRunner runAsync(TransactionOption... options) { return new AsyncRunnerImpl( setActive( - new TransactionRunnerImpl(this, spanner.getRpc(), spanner.getDefaultPrefetchChunks()))); + new TransactionRunnerImpl( + this, spanner.getRpc(), spanner.getDefaultPrefetchChunks(), options))); } @Override - public TransactionManager transactionManager() { - return new TransactionManagerImpl(this, currentSpan); + public TransactionManager transactionManager(TransactionOption... options) { + return new TransactionManagerImpl(this, currentSpan, options); } @Override - public AsyncTransactionManagerImpl transactionManagerAsync() { - return new AsyncTransactionManagerImpl(this, currentSpan); + public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options) { + return new AsyncTransactionManagerImpl(this, currentSpan, options); } @Override @@ -340,10 +343,11 @@ public void run() { return res; } - TransactionContextImpl newTransaction() { + TransactionContextImpl newTransaction(Options options) { return TransactionContextImpl.newBuilder() .setSession(this) .setTransactionId(readyTransactionId) + .setOptions(options) .setRpc(spanner.getRpc()) .setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId)) .setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks()) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index f2f8601516..fcb87304f6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -49,6 +49,7 @@ import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.ReadOption; import com.google.cloud.spanner.Options.TransactionOption; +import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionClient.SessionConsumer; import com.google.cloud.spanner.SpannerException.ResourceNotFoundException; import com.google.cloud.spanner.SpannerImpl.ClosedException; @@ -109,7 +110,7 @@ * Maintains a pool of sessions. This class itself is thread safe and is meant to be used * concurrently across multiple threads. */ -final class SessionPool { +class SessionPool { private static final Logger logger = Logger.getLogger(SessionPool.class.getName()); private static final Tracer tracer = Tracing.getTracer(); @@ -715,18 +716,18 @@ public void buffer(Iterable mutations) { } @Override - public long executeUpdate(Statement statement) { + public long executeUpdate(Statement statement, UpdateOption... options) { try { - return delegate.executeUpdate(statement); + return delegate.executeUpdate(statement, options); } catch (SessionNotFoundException e) { throw handler.handleSessionNotFound(e); } } @Override - public ApiFuture executeUpdateAsync(Statement statement) { + public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... options) { return ApiFutures.catching( - delegate.executeUpdateAsync(statement), + delegate.executeUpdateAsync(statement, options), SessionNotFoundException.class, new ApiFunction() { @Override @@ -738,18 +739,19 @@ public Long apply(SessionNotFoundException input) { } @Override - public long[] batchUpdate(Iterable statements) { + public long[] batchUpdate(Iterable statements, UpdateOption... options) { try { - return delegate.batchUpdate(statements); + return delegate.batchUpdate(statements, options); } catch (SessionNotFoundException e) { throw handler.handleSessionNotFound(e); } } @Override - public ApiFuture batchUpdateAsync(Iterable statements) { + public ApiFuture batchUpdateAsync( + Iterable statements, UpdateOption... options) { return ApiFutures.catching( - delegate.batchUpdateAsync(statements), + delegate.batchUpdateAsync(statements, options), SessionNotFoundException.class, new ApiFunction() { @Override @@ -786,17 +788,20 @@ private static class AutoClosingTransactionManager private TransactionManager delegate; private final SessionPool sessionPool; private PooledSessionFuture session; + private final TransactionOption[] options; private boolean closed; private boolean restartedAfterSessionNotFound; - AutoClosingTransactionManager(SessionPool sessionPool, PooledSessionFuture session) { + AutoClosingTransactionManager( + SessionPool sessionPool, PooledSessionFuture session, TransactionOption... options) { this.sessionPool = sessionPool; this.session = session; + this.options = options; } @Override public TransactionContext begin() { - this.delegate = session.get().transactionManager(); + this.delegate = session.get().transactionManager(options); // This cannot throw a SessionNotFoundException, as it does not call the BeginTransaction RPC. // Instead, the BeginTransaction will be included with the first statement of the transaction. return internalBegin(); @@ -812,7 +817,7 @@ private TransactionContext internalBegin() { public SpannerException handleSessionNotFound(SessionNotFoundException notFound) { session = sessionPool.replaceSession(notFound, session); PooledSession pooledSession = session.get(); - delegate = pooledSession.delegate.transactionManager(); + delegate = pooledSession.delegate.transactionManager(options); restartedAfterSessionNotFound = true; return SpannerExceptionFactory.newSpannerException( ErrorCode.ABORTED, notFound.getMessage(), notFound); @@ -897,16 +902,19 @@ public TransactionState getState() { private static final class SessionPoolTransactionRunner implements TransactionRunner { private final SessionPool sessionPool; private PooledSessionFuture session; + private final TransactionOption[] options; private TransactionRunner runner; - private SessionPoolTransactionRunner(SessionPool sessionPool, PooledSessionFuture session) { + private SessionPoolTransactionRunner( + SessionPool sessionPool, PooledSessionFuture session, TransactionOption... options) { this.sessionPool = sessionPool; this.session = session; + this.options = options; } private TransactionRunner getRunner() { if (this.runner == null) { - this.runner = session.get().readWriteTransaction(); + this.runner = session.get().readWriteTransaction(options); } return runner; } @@ -950,11 +958,14 @@ public TransactionRunner allowNestedTransaction() { private static class SessionPoolAsyncRunner implements AsyncRunner { private final SessionPool sessionPool; private volatile PooledSessionFuture session; + private final TransactionOption[] options; private final SettableApiFuture commitTimestamp = SettableApiFuture.create(); - private SessionPoolAsyncRunner(SessionPool sessionPool, PooledSessionFuture session) { + private SessionPoolAsyncRunner( + SessionPool sessionPool, PooledSessionFuture session, TransactionOption... options) { this.sessionPool = sessionPool; this.session = session; + this.options = options; } @Override @@ -970,7 +981,7 @@ public void run() { while (true) { SpannerException se = null; try { - runner = session.get().runAsync(); + runner = session.get().runAsync(options); r = runner.runAsync(work, MoreExecutors.directExecutor()).get(); break; } catch (ExecutionException e) { @@ -1044,14 +1055,15 @@ private PooledSessionFuture createPooledSessionFuture( return new PooledSessionFuture(future, span); } - final class PooledSessionFuture extends SimpleForwardingListenableFuture + class PooledSessionFuture extends SimpleForwardingListenableFuture implements Session { private volatile LeakedSessionException leakedException; private volatile AtomicBoolean inUse = new AtomicBoolean(); private volatile CountDownLatch initialized = new CountDownLatch(1); private final Span span; - private PooledSessionFuture(ListenableFuture delegate, Span span) { + @VisibleForTesting + PooledSessionFuture(ListenableFuture delegate, Span span) { super(delegate); this.span = span; } @@ -1067,34 +1079,32 @@ private void markCheckedOut() { @Override public Timestamp write(Iterable mutations) throws SpannerException { - try { - return get().write(mutations); - } finally { - close(); - } + return writeWithOptions(mutations).getCommitTimestamp(); } @Override public CommitResponse writeWithOptions( Iterable mutations, TransactionOption... options) throws SpannerException { - final Timestamp commitTimestamp = write(mutations); - return new CommitResponse(commitTimestamp); - } - - @Override - public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { try { - return get().writeAtLeastOnce(mutations); + return get().writeWithOptions(mutations, options); } finally { close(); } } + @Override + public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { + return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp(); + } + @Override public CommitResponse writeAtLeastOnceWithOptions( Iterable mutations, TransactionOption... options) throws SpannerException { - final Timestamp commitTimestamp = writeAtLeastOnce(mutations); - return new CommitResponse(commitTimestamp); + try { + return get().writeAtLeastOnceWithOptions(mutations, options); + } finally { + close(); + } } @Override @@ -1202,29 +1212,29 @@ private ReadOnlyTransaction internalReadOnlyTransaction( } @Override - public TransactionRunner readWriteTransaction() { - return new SessionPoolTransactionRunner(SessionPool.this, this); + public TransactionRunner readWriteTransaction(TransactionOption... options) { + return new SessionPoolTransactionRunner(SessionPool.this, this, options); } @Override - public TransactionManager transactionManager() { - return new AutoClosingTransactionManager(SessionPool.this, this); + public TransactionManager transactionManager(TransactionOption... options) { + return new AutoClosingTransactionManager(SessionPool.this, this, options); } @Override - public AsyncRunner runAsync() { - return new SessionPoolAsyncRunner(SessionPool.this, this); + public AsyncRunner runAsync(TransactionOption... options) { + return new SessionPoolAsyncRunner(SessionPool.this, this, options); } @Override - public AsyncTransactionManager transactionManagerAsync() { - return new SessionPoolAsyncTransactionManager(SessionPool.this, this); + public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) { + return new SessionPoolAsyncTransactionManager(SessionPool.this, this, options); } @Override - public long executePartitionedUpdate(Statement stmt) { + public long executePartitionedUpdate(Statement stmt, UpdateOption... options) { try { - return get().executePartitionedUpdate(stmt); + return get().executePartitionedUpdate(stmt, options); } finally { close(); } @@ -1330,43 +1340,42 @@ void setAllowReplacing(boolean allowReplacing) { @Override public Timestamp write(Iterable mutations) throws SpannerException { + return writeWithOptions(mutations).getCommitTimestamp(); + } + + @Override + public CommitResponse writeWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { try { markUsed(); - return delegate.write(mutations); + return delegate.writeWithOptions(mutations, options); } catch (SpannerException e) { throw lastException = e; } } @Override - public CommitResponse writeWithOptions( - Iterable mutations, TransactionOption... options) throws SpannerException { - final Timestamp commitTimestamp = write(mutations); - return new CommitResponse(commitTimestamp); + public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { + return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp(); } @Override - public Timestamp writeAtLeastOnce(Iterable mutations) throws SpannerException { + public CommitResponse writeAtLeastOnceWithOptions( + Iterable mutations, TransactionOption... options) throws SpannerException { try { markUsed(); - return delegate.writeAtLeastOnce(mutations); + return delegate.writeAtLeastOnceWithOptions(mutations, options); } catch (SpannerException e) { throw lastException = e; } } @Override - public CommitResponse writeAtLeastOnceWithOptions( - Iterable mutations, TransactionOption... options) throws SpannerException { - final Timestamp commitTimestamp = writeAtLeastOnce(mutations); - return new CommitResponse(commitTimestamp); - } - - @Override - public long executePartitionedUpdate(Statement stmt) throws SpannerException { + public long executePartitionedUpdate(Statement stmt, UpdateOption... options) + throws SpannerException { try { markUsed(); - return delegate.executePartitionedUpdate(stmt); + return delegate.executePartitionedUpdate(stmt, options); } catch (SpannerException e) { throw lastException = e; } @@ -1403,18 +1412,18 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { } @Override - public TransactionRunner readWriteTransaction() { - return delegate.readWriteTransaction(); + public TransactionRunner readWriteTransaction(TransactionOption... options) { + return delegate.readWriteTransaction(options); } @Override - public AsyncRunner runAsync() { - return delegate.runAsync(); + public AsyncRunner runAsync(TransactionOption... options) { + return delegate.runAsync(options); } @Override - public AsyncTransactionManagerImpl transactionManagerAsync() { - return delegate.transactionManagerAsync(); + public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options) { + return delegate.transactionManagerAsync(options); } @Override @@ -1485,8 +1494,8 @@ void markUsed() { } @Override - public TransactionManager transactionManager() { - return delegate.transactionManager(); + public TransactionManager transactionManager(TransactionOption... options) { + return delegate.transactionManager(options); } } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java index 9dc947332f..e618e2b44a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java @@ -23,6 +23,7 @@ import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SessionPool.SessionNotFoundHandler; import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager; @@ -42,12 +43,15 @@ class SessionPoolAsyncTransactionManager private AbortedException abortedException; private final SessionPool pool; + private final TransactionOption[] options; private volatile PooledSessionFuture session; private volatile SettableApiFuture delegate; private boolean restartedAfterSessionNotFound; - SessionPoolAsyncTransactionManager(SessionPool pool, PooledSessionFuture session) { + SessionPoolAsyncTransactionManager( + SessionPool pool, PooledSessionFuture session, TransactionOption... options) { this.pool = Preconditions.checkNotNull(pool); + this.options = options; createTransaction(session); } @@ -60,7 +64,10 @@ private void createTransaction(PooledSessionFuture session) { public void run() { try { delegate.set( - SessionPoolAsyncTransactionManager.this.session.get().transactionManagerAsync()); + SessionPoolAsyncTransactionManager.this + .session + .get() + .transactionManagerAsync(options)); } catch (Throwable t) { delegate.setException(t); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java index 0b4a92f989..64c45b12c0 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContext.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner; import com.google.api.core.ApiFuture; +import com.google.cloud.spanner.Options.UpdateOption; /** * Context for a single attempt of a locking read-write transaction. This type of transaction is the @@ -102,7 +103,7 @@ public interface TransactionContext extends ReadContext { * it will result in an {@code IllegalArgumentException}. The effects of the DML statement will be * visible to subsequent operations in the transaction. */ - long executeUpdate(Statement statement); + long executeUpdate(Statement statement, UpdateOption... options); /** * Same as {@link #executeUpdate(Statement)}, but is guaranteed to be non-blocking. If multiple @@ -113,7 +114,7 @@ public interface TransactionContext extends ReadContext { * parallel. If you rely on the results of a previous statement, you should block until the result * of that statement is known and has been returned to the client. */ - ApiFuture executeUpdateAsync(Statement statement); + ApiFuture executeUpdateAsync(Statement statement, UpdateOption... options); /** * Executes a list of DML statements in a single request. The statements will be executed in order @@ -130,7 +131,7 @@ public interface TransactionContext extends ReadContext { * 2nd statement, and an array of length 1 that contains the number of rows modified by the 1st * statement. The 3rd statement will not run. */ - long[] batchUpdate(Iterable statements); + long[] batchUpdate(Iterable statements, UpdateOption... options); /** * Same as {@link #batchUpdate(Iterable)}, but is guaranteed to be non-blocking. If multiple @@ -141,5 +142,5 @@ public interface TransactionContext extends ReadContext { * parallel. If you rely on the results of a previous statement, you should block until the result * of that statement is known and has been returned to the client. */ - ApiFuture batchUpdateAsync(Iterable statements); + ApiFuture batchUpdateAsync(Iterable statements, UpdateOption... options); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java index b18e2f25d9..1cef304e48 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner; import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.SessionImpl.SessionTransaction; import com.google.common.base.Preconditions; import io.opencensus.common.Scope; @@ -30,13 +31,15 @@ final class TransactionManagerImpl implements TransactionManager, SessionTransac private final SessionImpl session; private Span span; + private final Options options; private TransactionRunnerImpl.TransactionContextImpl txn; private TransactionState txnState; - TransactionManagerImpl(SessionImpl session, Span span) { + TransactionManagerImpl(SessionImpl session, Span span, TransactionOption... options) { this.session = session; this.span = span; + this.options = Options.fromTransactionOptions(options); } Span getSpan() { @@ -52,7 +55,7 @@ public void setSpan(Span span) { public TransactionContext begin() { Preconditions.checkState(txn == null, "begin can only be called once"); try (Scope s = tracer.withSpan(span)) { - txn = session.newTransaction(); + txn = session.newTransaction(options); session.setActive(this); txnState = TransactionState.STARTED; return txn; @@ -101,7 +104,7 @@ public TransactionContext resetForRetry() { } try (Scope s = tracer.withSpan(span)) { boolean useInlinedBegin = txn.transactionId != null; - txn = session.newTransaction(); + txn = session.newTransaction(options); if (!useInlinedBegin) { txn.ensureTxn(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 02119b13a1..23405af8f4 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -28,9 +28,12 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.ReadOption; +import com.google.cloud.spanner.Options.TransactionOption; +import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.SessionImpl.SessionTransaction; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; @@ -72,6 +75,7 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner { static class TransactionContextImpl extends AbstractReadContext implements TransactionContext { static class Builder extends AbstractReadContext.Builder { private ByteString transactionId; + private Options options; private Builder() {} @@ -80,8 +84,14 @@ Builder setTransactionId(ByteString transactionId) { return self(); } + Builder setOptions(Options options) { + this.options = Preconditions.checkNotNull(options); + return self(); + } + @Override TransactionContextImpl build() { + Preconditions.checkState(this.options != null, "Options must be set"); return new TransactionContextImpl(this); } } @@ -147,6 +157,8 @@ public void removeListener(Runnable listener) { @GuardedBy("lock") private boolean aborted; + private final Options options; + /** Default to -1 to indicate not available. */ @GuardedBy("lock") private long retryDelayInMillis = -1L; @@ -165,6 +177,7 @@ public void removeListener(Runnable listener) { private TransactionContextImpl(Builder builder) { super(builder); this.transactionId = builder.transactionId; + this.options = builder.options; this.finishedAsyncOperations.set(null); } @@ -512,10 +525,11 @@ public void buffer(Iterable mutations) { } @Override - public long executeUpdate(Statement statement) { + public long executeUpdate(Statement statement, UpdateOption... options) { beforeReadOrQuery(); final ExecuteSqlRequest.Builder builder = - getExecuteSqlRequestBuilder(statement, QueryMode.NORMAL); + getExecuteSqlRequestBuilder( + statement, QueryMode.NORMAL, Options.fromUpdateOptions(options)); try { com.google.spanner.v1.ResultSet resultSet = rpc.executeQuery(builder.build(), session.getOptions()); @@ -535,10 +549,11 @@ public long executeUpdate(Statement statement) { } @Override - public ApiFuture executeUpdateAsync(Statement statement) { + public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... options) { beforeReadOrQuery(); final ExecuteSqlRequest.Builder builder = - getExecuteSqlRequestBuilder(statement, QueryMode.NORMAL); + getExecuteSqlRequestBuilder( + statement, QueryMode.NORMAL, Options.fromUpdateOptions(options)); final ApiFuture resultSet; try { // Register the update as an async operation that must finish before the transaction may @@ -598,9 +613,10 @@ public void run() { } @Override - public long[] batchUpdate(Iterable statements) { + public long[] batchUpdate(Iterable statements, UpdateOption... options) { beforeReadOrQuery(); - final ExecuteBatchDmlRequest.Builder builder = getExecuteBatchDmlRequestBuilder(statements); + final ExecuteBatchDmlRequest.Builder builder = + getExecuteBatchDmlRequestBuilder(statements, Options.fromUpdateOptions(options)); try { com.google.spanner.v1.ExecuteBatchDmlResponse response = rpc.executeBatchDml(builder.build(), session.getOptions()); @@ -631,9 +647,11 @@ public long[] batchUpdate(Iterable statements) { } @Override - public ApiFuture batchUpdateAsync(Iterable statements) { + public ApiFuture batchUpdateAsync( + Iterable statements, UpdateOption... options) { beforeReadOrQuery(); - final ExecuteBatchDmlRequest.Builder builder = getExecuteBatchDmlRequestBuilder(statements); + final ExecuteBatchDmlRequest.Builder builder = + getExecuteBatchDmlRequestBuilder(statements, Options.fromUpdateOptions(options)); ApiFuture response; try { // Register the update as an async operation that must finish before the transaction may @@ -723,6 +741,7 @@ public ListenableAsyncResultSet executeQueryAsync( private boolean blockNestedTxn = true; private final SessionImpl session; + private final Options options; private Span span; private TransactionContextImpl txn; private volatile boolean isValid = true; @@ -733,9 +752,14 @@ public TransactionRunner allowNestedTransaction() { return this; } - TransactionRunnerImpl(SessionImpl session, SpannerRpc rpc, int defaultPrefetchChunks) { + TransactionRunnerImpl( + SessionImpl session, + SpannerRpc rpc, + int defaultPrefetchChunks, + TransactionOption... options) { this.session = session; - this.txn = session.newTransaction(); + this.options = Options.fromTransactionOptions(options); + this.txn = session.newTransaction(this.options); } @Override @@ -773,7 +797,7 @@ public T call() { // Do not inline the BeginTransaction during a retry if the initial attempt did not // actually start a transaction. useInlinedBegin = txn.transactionId != null; - txn = session.newTransaction(); + txn = session.newTransaction(options); } checkState( isValid, diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java index f9cee1c488..1ca3164e2f 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java @@ -89,7 +89,8 @@ public void setup() { public void executeSqlRequestBuilderWithoutQueryOptions() { ExecuteSqlRequest request = context - .getExecuteSqlRequestBuilder(Statement.of("SELECT FOO FROM BAR"), QueryMode.NORMAL) + .getExecuteSqlRequestBuilder( + Statement.of("SELECT FOO FROM BAR"), QueryMode.NORMAL, Options.fromQueryOptions()) .build(); assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR"); assertThat(request.getQueryOptions()).isEqualTo(defaultQueryOptions); @@ -103,7 +104,8 @@ public void executeSqlRequestBuilderWithQueryOptions() { Statement.newBuilder("SELECT FOO FROM BAR") .withQueryOptions(QueryOptions.newBuilder().setOptimizerVersion("2.0").build()) .build(), - QueryMode.NORMAL) + QueryMode.NORMAL, + Options.fromQueryOptions()) .build(); assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR"); assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("2.0"); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java index 1bcb303f72..6fd6a4383e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BaseSessionPoolTest.java @@ -24,6 +24,7 @@ import static org.mockito.Mockito.when; import com.google.api.core.ApiFutures; +import com.google.cloud.Timestamp; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.SessionPool.Clock; import com.google.protobuf.Empty; @@ -58,12 +59,17 @@ public void release(ScheduledExecutorService executor) { } } + @SuppressWarnings("unchecked") SessionImpl mockSession() { final SessionImpl session = mock(SessionImpl.class); when(session.getName()) .thenReturn( "projects/dummy/instances/dummy/database/dummy/sessions/session" + sessionIndex); when(session.asyncClose()).thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); + when(session.writeWithOptions(any(Iterable.class))) + .thenReturn(new CommitResponse(Timestamp.now())); + when(session.writeAtLeastOnceWithOptions(any(Iterable.class))) + .thenReturn(new CommitResponse(Timestamp.now())); sessionIndex++; return session; } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 2747dc314f..0a0a3bff69 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -19,6 +19,9 @@ import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; @@ -31,7 +34,9 @@ import com.google.cloud.spanner.AsyncRunner.AsyncWork; import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; +import com.google.cloud.spanner.Options.TransactionOption; import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode; +import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.common.base.Stopwatch; @@ -1483,4 +1488,56 @@ public void testBatchCreateSessionsFailure_shouldNotPropagateToCloseMethod() { mockSpanner.setBatchCreateSessionsExecutionTime(SimulatedExecutionTime.none()); } } + + @Test + public void testReadWriteTransaction_usesOptions() { + SessionPool pool = mock(SessionPool.class); + PooledSessionFuture session = mock(PooledSessionFuture.class); + when(pool.getSession()).thenReturn(session); + TransactionOption option = mock(TransactionOption.class); + + DatabaseClientImpl client = new DatabaseClientImpl(pool); + client.readWriteTransaction(option); + + verify(session).readWriteTransaction(option); + } + + @Test + public void testTransactionManager_usesOptions() { + SessionPool pool = mock(SessionPool.class); + PooledSessionFuture session = mock(PooledSessionFuture.class); + when(pool.getSession()).thenReturn(session); + TransactionOption option = mock(TransactionOption.class); + + DatabaseClientImpl client = new DatabaseClientImpl(pool); + client.transactionManager(option); + + verify(session).transactionManager(option); + } + + @Test + public void testRunAsync_usesOptions() { + SessionPool pool = mock(SessionPool.class); + PooledSessionFuture session = mock(PooledSessionFuture.class); + when(pool.getSession()).thenReturn(session); + TransactionOption option = mock(TransactionOption.class); + + DatabaseClientImpl client = new DatabaseClientImpl(pool); + client.runAsync(option); + + verify(session).runAsync(option); + } + + @Test + public void testTransactionManagerAsync_usesOptions() { + SessionPool pool = mock(SessionPool.class); + PooledSessionFuture session = mock(PooledSessionFuture.class); + when(pool.getSession()).thenReturn(session); + TransactionOption option = mock(TransactionOption.class); + + DatabaseClientImpl client = new DatabaseClientImpl(pool); + client.transactionManagerAsync(option); + + verify(session).transactionManagerAsync(option); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java index 0620bfb0e9..a163b82914 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java @@ -890,10 +890,15 @@ public void testSessionNotFoundReadWriteTransaction() { when(closedSession.getName()) .thenReturn("projects/dummy/instances/dummy/database/dummy/sessions/session-closed"); final TransactionContextImpl closedTransactionContext = - TransactionContextImpl.newBuilder().setSession(closedSession).setRpc(rpc).build(); + TransactionContextImpl.newBuilder() + .setSession(closedSession) + .setOptions(Options.fromTransactionOptions()) + .setRpc(rpc) + .build(); when(closedSession.asyncClose()) .thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance())); - when(closedSession.newTransaction()).thenReturn(closedTransactionContext); + when(closedSession.newTransaction(Options.fromTransactionOptions())) + .thenReturn(closedTransactionContext); when(closedSession.beginTransactionAsync()).thenThrow(sessionNotFound); TransactionRunnerImpl closedTransactionRunner = new TransactionRunnerImpl(closedSession, rpc, 10); @@ -906,7 +911,8 @@ public void testSessionNotFoundReadWriteTransaction() { when(openSession.getName()) .thenReturn("projects/dummy/instances/dummy/database/dummy/sessions/session-open"); final TransactionContextImpl openTransactionContext = mock(TransactionContextImpl.class); - when(openSession.newTransaction()).thenReturn(openTransactionContext); + when(openSession.newTransaction(Options.fromTransactionOptions())) + .thenReturn(openTransactionContext); when(openSession.beginTransactionAsync()) .thenReturn(ApiFutures.immediateFuture(ByteString.copyFromUtf8("open-txn"))); TransactionRunnerImpl openTransactionRunner = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java index 077b660576..21f2a59fb3 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionContextImplTest.java @@ -63,6 +63,7 @@ private void batchDml(int status) { .setSession(session) .setRpc(rpc) .setTransactionId(ByteString.copyFromUtf8("test")) + .setOptions(Options.fromTransactionOptions()) .build()) { impl.batchUpdate(Arrays.asList(statement)); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java index 149002531a..39e65e21d9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionManagerImplTest.java @@ -87,7 +87,7 @@ public void setUp() { @Test public void beginCalledTwiceFails() { - when(session.newTransaction()).thenReturn(txn); + when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); assertThat(manager.begin()).isEqualTo(txn); assertThat(manager.getState()).isEqualTo(TransactionState.STARTED); try { @@ -130,7 +130,7 @@ public void resetBeforeBeginFails() { @Test public void transactionRolledBackOnClose() { - when(session.newTransaction()).thenReturn(txn); + when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); when(txn.isAborted()).thenReturn(false); manager.begin(); manager.close(); @@ -139,7 +139,7 @@ public void transactionRolledBackOnClose() { @Test public void commitSucceeds() { - when(session.newTransaction()).thenReturn(txn); + when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); Timestamp commitTimestamp = Timestamp.ofTimeMicroseconds(1); when(txn.commitTimestamp()).thenReturn(commitTimestamp); manager.begin(); @@ -150,7 +150,7 @@ public void commitSucceeds() { @Test public void resetAfterSuccessfulCommitFails() { - when(session.newTransaction()).thenReturn(txn); + when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); manager.begin(); manager.commit(); try { @@ -163,7 +163,7 @@ public void resetAfterSuccessfulCommitFails() { @Test public void resetAfterAbortSucceeds() { - when(session.newTransaction()).thenReturn(txn); + when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); manager.begin(); doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, "")).when(txn).commit(); try { @@ -173,14 +173,14 @@ public void resetAfterAbortSucceeds() { assertThat(manager.getState()).isEqualTo(TransactionState.ABORTED); } txn = Mockito.mock(TransactionRunnerImpl.TransactionContextImpl.class); - when(session.newTransaction()).thenReturn(txn); + when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); assertThat(manager.resetForRetry()).isEqualTo(txn); assertThat(manager.getState()).isEqualTo(TransactionState.STARTED); } @Test public void resetAfterErrorFails() { - when(session.newTransaction()).thenReturn(txn); + when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); manager.begin(); doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.UNKNOWN, "")).when(txn).commit(); try { @@ -199,7 +199,7 @@ public void resetAfterErrorFails() { @Test public void rollbackAfterCommitFails() { - when(session.newTransaction()).thenReturn(txn); + when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); manager.begin(); manager.commit(); try { @@ -212,7 +212,7 @@ public void rollbackAfterCommitFails() { @Test public void commitAfterRollbackFails() { - when(session.newTransaction()).thenReturn(txn); + when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); manager.begin(); manager.rollback(); try { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index 71a34950bb..9b42bab344 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -101,7 +101,7 @@ public void release(ScheduledExecutorService exec) { public void setUp() { MockitoAnnotations.initMocks(this); firstRun = true; - when(session.newTransaction()).thenReturn(txn); + when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(txn); when(rpc.executeQuery(Mockito.any(ExecuteSqlRequest.class), Mockito.anyMap())) .thenAnswer( new Answer() { @@ -347,9 +347,10 @@ private long[] batchDmlException(int status) { TransactionContextImpl.newBuilder() .setSession(session) .setTransactionId(ByteString.copyFromUtf8(UUID.randomUUID().toString())) + .setOptions(Options.fromTransactionOptions()) .setRpc(rpc) .build(); - when(session.newTransaction()).thenReturn(transaction); + when(session.newTransaction(Options.fromTransactionOptions())).thenReturn(transaction); when(session.beginTransactionAsync()) .thenReturn( ApiFutures.immediateFuture(ByteString.copyFromUtf8(UUID.randomUUID().toString()))); From ae2cf93393f4f254c973b27149bcf5353214b7df Mon Sep 17 00:00:00 2001 From: Olav Loite Date: Fri, 11 Dec 2020 21:05:03 +0100 Subject: [PATCH 2/2] test: add options tests --- .../com/google/cloud/spanner/OptionsTest.java | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java index 0931eba174..5a2777570b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java @@ -219,4 +219,42 @@ public void queryEquality() { o3 = Options.fromReadOptions(Options.prefetchChunks(2)); assertThat(o2.equals(o3)).isFalse(); } + + @Test + public void testFromTransactionOptions() { + Options opts = Options.fromTransactionOptions(); + assertThat(opts.toString()).isEqualTo(""); + } + + @Test + public void testTransactionOptionsEquality() { + Options o1; + Options o2; + + o1 = Options.fromTransactionOptions(); + o2 = Options.fromTransactionOptions(); + assertThat(o1.equals(o2)).isTrue(); + + o2 = Options.fromReadOptions(Options.prefetchChunks(1)); + assertThat(o1.equals(o2)).isFalse(); + } + + @Test + public void testFromUpdateOptions() { + Options opts = Options.fromUpdateOptions(); + assertThat(opts.toString()).isEqualTo(""); + } + + @Test + public void testUpdateOptionsEquality() { + Options o1; + Options o2; + + o1 = Options.fromUpdateOptions(); + o2 = Options.fromUpdateOptions(); + assertThat(o1.equals(o2)).isTrue(); + + o2 = Options.fromReadOptions(Options.prefetchChunks(1)); + assertThat(o1.equals(o2)).isFalse(); + } }