Skip to content

Commit

Permalink
feat: introduce TransactionOptions and UpdateOptions (#716)
Browse files Browse the repository at this point in the history
* feat: introduce TransactionOptions and UpdateOptions

Adds TransactionOptions and UpdateOptions for read/write transactions and statements.
These can be used in the future to specify options to affect how transactions and
statements will be executed.

* test: add options tests
  • Loading branch information
olavloite authored and thiagotnunes committed May 6, 2021
1 parent 35f0c11 commit eed83c7
Show file tree
Hide file tree
Showing 21 changed files with 423 additions and 178 deletions.
49 changes: 48 additions & 1 deletion google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -406,7 +406,54 @@
<className>com/google/cloud/spanner/AbstractLazyInitializer</className>
<method>java.lang.Object initialize()</method>
</difference>


<!-- TransactionOptions and UpdateOptions -->
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>long executePartitionedUpdate(com.google.cloud.spanner.Statement)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.TransactionRunner readWriteTransaction()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.AsyncRunner runAsync()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.TransactionManager transactionManager()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.AsyncTransactionManager transactionManagerAsync()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>long[] batchUpdate(java.lang.Iterable)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture batchUpdateAsync(java.lang.Iterable)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>long executeUpdate(com.google.cloud.spanner.Statement)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture executeUpdateAsync(com.google.cloud.spanner.Statement)</method>
</difference>

<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/spanner/InstanceInfo$Builder</className>
Expand Down
Expand Up @@ -554,7 +554,8 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
return builder.build();
}

ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, QueryMode queryMode) {
ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
Statement statement, QueryMode queryMode, Options options) {
ExecuteSqlRequest.Builder builder =
ExecuteSqlRequest.newBuilder()
.setSql(statement.getSql())
Expand All @@ -577,7 +578,8 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, Query
return builder;
}

ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(Iterable<Statement> statements) {
ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(
Iterable<Statement> statements, Options options) {
ExecuteBatchDmlRequest.Builder builder =
ExecuteBatchDmlRequest.newBuilder().setSession(session.getName());
int idx = 0;
Expand Down Expand Up @@ -609,7 +611,7 @@ ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(Iterable<Stateme
ResultSet executeQueryInternalWithOptions(
final Statement statement,
final com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode,
Options options,
final Options options,
final ByteString partitionToken) {
beforeReadOrQuery();
final int prefetchChunks =
Expand All @@ -620,7 +622,7 @@ ResultSet executeQueryInternalWithOptions(
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
final ExecuteSqlRequest.Builder request =
getExecuteSqlRequestBuilder(statement, queryMode);
getExecuteSqlRequestBuilder(statement, queryMode, options);
if (partitionToken != null) {
request.setPartitionToken(partitionToken);
}
Expand Down Expand Up @@ -707,7 +709,7 @@ ResultSet readInternalWithOptions(
@Nullable String index,
KeySet keys,
Iterable<String> columns,
Options readOptions,
final Options readOptions,
ByteString partitionToken) {
beforeReadOrQuery();
final ReadRequest.Builder builder =
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager;
import com.google.cloud.spanner.TransactionManager.TransactionState;
Expand All @@ -40,14 +41,16 @@ final class AsyncTransactionManagerImpl

private final SessionImpl session;
private Span span;
private final Options options;

private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create();

AsyncTransactionManagerImpl(SessionImpl session, Span span) {
AsyncTransactionManagerImpl(SessionImpl session, Span span, TransactionOption... options) {
this.session = session;
this.span = span;
this.options = Options.fromTransactionOptions(options);
}

@Override
Expand Down Expand Up @@ -82,7 +85,7 @@ public TransactionContextFutureImpl beginAsync() {

private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
txnState = TransactionState.STARTED;
txn = session.newTransaction();
txn = session.newTransaction(options);
if (firstAttempt) {
session.setActive(this);
}
Expand Down
Expand Up @@ -18,6 +18,7 @@

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;

/**
* Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An
Expand Down Expand Up @@ -308,7 +309,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* });
* </code></pre>
*/
TransactionRunner readWriteTransaction();
TransactionRunner readWriteTransaction(TransactionOption... options);

/**
* Returns a transaction manager which allows manual management of transaction lifecycle. This API
Expand Down Expand Up @@ -338,7 +339,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* }
* }</pre>
*/
TransactionManager transactionManager();
TransactionManager transactionManager(TransactionOption... options);

/**
* Returns an asynchronous transaction runner for executing a single logical transaction with
Expand Down Expand Up @@ -371,7 +372,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* executor);
* </code></pre>
*/
AsyncRunner runAsync();
AsyncRunner runAsync(TransactionOption... options);

/**
* Returns an asynchronous transaction manager which allows manual management of transaction
Expand Down Expand Up @@ -459,7 +460,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* }
* }</pre>
*/
AsyncTransactionManager transactionManagerAsync();
AsyncTransactionManager transactionManagerAsync(TransactionOption... options);

/**
* Returns the lower bound of rows modified by this DML statement.
Expand Down Expand Up @@ -508,5 +509,5 @@ CommitResponse writeAtLeastOnceWithOptions(
* <p>Given the above, Partitioned DML is good fit for large, database-wide, operations that are
* idempotent, such as deleting old rows from a very large table.
*/
long executePartitionedUpdate(Statement stmt);
long executePartitionedUpdate(Statement stmt, UpdateOption... options);
}
Expand Up @@ -18,6 +18,7 @@

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -54,13 +55,20 @@ PooledSessionFuture getSession() {

@Override
public Timestamp write(final Iterable<Mutation> mutations) throws SpannerException {
return writeWithOptions(mutations).getCommitTimestamp();
}

@Override
public CommitResponse writeWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
new Function<Session, Timestamp>() {
new Function<Session, CommitResponse>() {
@Override
public Timestamp apply(Session session) {
return session.write(mutations);
public CommitResponse apply(Session session) {
return session.writeWithOptions(mutations, options);
}
});
} catch (RuntimeException e) {
Expand All @@ -72,21 +80,21 @@ public Timestamp apply(Session session) {
}

@Override
public CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
throws SpannerException {
final Timestamp commitTimestamp = write(mutations);
return new CommitResponse(commitTimestamp);
public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws SpannerException {
return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
}

@Override
public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws SpannerException {
public CommitResponse writeAtLeastOnceWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
new Function<Session, Timestamp>() {
new Function<Session, CommitResponse>() {
@Override
public Timestamp apply(Session session) {
return session.writeAtLeastOnce(mutations);
public CommitResponse apply(Session session) {
return session.writeAtLeastOnceWithOptions(mutations, options);
}
});
} catch (RuntimeException e) {
Expand All @@ -97,13 +105,6 @@ public Timestamp apply(Session session) {
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
final Timestamp commitTimestamp = writeAtLeastOnce(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public ReadContext singleUse() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
Expand Down Expand Up @@ -171,10 +172,10 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
}

@Override
public TransactionRunner readWriteTransaction() {
public TransactionRunner readWriteTransaction(TransactionOption... options) {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getSession().readWriteTransaction();
return getSession().readWriteTransaction(options);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -184,47 +185,47 @@ public TransactionRunner readWriteTransaction() {
}

@Override
public TransactionManager transactionManager() {
public TransactionManager transactionManager(TransactionOption... options) {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getSession().transactionManager();
return getSession().transactionManager(options);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public AsyncRunner runAsync() {
public AsyncRunner runAsync(TransactionOption... options) {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getSession().runAsync();
return getSession().runAsync(options);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public AsyncTransactionManager transactionManagerAsync() {
public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getSession().transactionManagerAsync();
return getSession().transactionManagerAsync(options);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public long executePartitionedUpdate(final Statement stmt) {
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
new Function<Session, Long>() {
@Override
public Long apply(Session session) {
return session.executePartitionedUpdate(stmt);
return session.executePartitionedUpdate(stmt, options);
}
});
} catch (RuntimeException e) {
Expand Down
Expand Up @@ -30,12 +30,19 @@ public interface ReadAndQueryOption extends ReadOption, QueryOption {}
/** Marker interface to mark options applicable to read operation */
public interface ReadOption {}

/** Marker interface to mark options applicable to Read, Query, Update and Write operations */
public interface ReadQueryUpdateTransactionOption
extends ReadOption, QueryOption, UpdateOption, TransactionOption {}

/** Marker interface to mark options applicable to query operation. */
public interface QueryOption {}

/** Marker interface to mark options applicable to write operations */
public interface TransactionOption {}

/** Marker interface to mark options applicable to update operation. */
public interface UpdateOption {}

/** Marker interface to mark options applicable to list operations in admin API. */
public interface ListOption {}

Expand Down Expand Up @@ -287,6 +294,26 @@ static Options fromQueryOptions(QueryOption... options) {
return readOptions;
}

static Options fromUpdateOptions(UpdateOption... options) {
Options updateOptions = new Options();
for (UpdateOption option : options) {
if (option instanceof InternalOption) {
((InternalOption) option).appendToOptions(updateOptions);
}
}
return updateOptions;
}

static Options fromTransactionOptions(TransactionOption... options) {
Options transactionOptions = new Options();
for (TransactionOption option : options) {
if (option instanceof InternalOption) {
((InternalOption) option).appendToOptions(transactionOptions);
}
}
return transactionOptions;
}

static Options fromListOptions(ListOption... options) {
Options listOptions = new Options();
for (ListOption option : options) {
Expand Down

0 comments on commit eed83c7

Please sign in to comment.