Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce TransactionOptions and UpdateOptions #716

Merged
merged 2 commits into from Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 47 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -406,4 +406,51 @@
<className>com/google/cloud/spanner/AbstractLazyInitializer</className>
<method>java.lang.Object initialize()</method>
</difference>

<!-- TransactionOptions and UpdateOptions -->
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>long executePartitionedUpdate(com.google.cloud.spanner.Statement)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.TransactionRunner readWriteTransaction()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.AsyncRunner runAsync()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.TransactionManager transactionManager()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/DatabaseClient</className>
<method>com.google.cloud.spanner.AsyncTransactionManager transactionManagerAsync()</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>long[] batchUpdate(java.lang.Iterable)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture batchUpdateAsync(java.lang.Iterable)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>long executeUpdate(com.google.cloud.spanner.Statement)</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture executeUpdateAsync(com.google.cloud.spanner.Statement)</method>
</difference>
</differences>
Expand Up @@ -554,7 +554,8 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
return builder.build();
}

ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, QueryMode queryMode) {
ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
Statement statement, QueryMode queryMode, Options options) {
ExecuteSqlRequest.Builder builder =
ExecuteSqlRequest.newBuilder()
.setSql(statement.getSql())
Expand All @@ -577,7 +578,8 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(Statement statement, Query
return builder;
}

ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(Iterable<Statement> statements) {
ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(
Iterable<Statement> statements, Options options) {
ExecuteBatchDmlRequest.Builder builder =
ExecuteBatchDmlRequest.newBuilder().setSession(session.getName());
int idx = 0;
Expand Down Expand Up @@ -609,7 +611,7 @@ ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(Iterable<Stateme
ResultSet executeQueryInternalWithOptions(
final Statement statement,
final com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode,
Options options,
final Options options,
final ByteString partitionToken) {
beforeReadOrQuery();
final int prefetchChunks =
Expand All @@ -620,7 +622,7 @@ ResultSet executeQueryInternalWithOptions(
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
final ExecuteSqlRequest.Builder request =
getExecuteSqlRequestBuilder(statement, queryMode);
getExecuteSqlRequestBuilder(statement, queryMode, options);
if (partitionToken != null) {
request.setPartitionToken(partitionToken);
}
Expand Down Expand Up @@ -707,7 +709,7 @@ ResultSet readInternalWithOptions(
@Nullable String index,
KeySet keys,
Iterable<String> columns,
Options readOptions,
final Options readOptions,
ByteString partitionToken) {
beforeReadOrQuery();
final ReadRequest.Builder builder =
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager;
import com.google.cloud.spanner.TransactionManager.TransactionState;
Expand All @@ -40,14 +41,16 @@ final class AsyncTransactionManagerImpl

private final SessionImpl session;
private Span span;
private final Options options;

private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create();

AsyncTransactionManagerImpl(SessionImpl session, Span span) {
AsyncTransactionManagerImpl(SessionImpl session, Span span, TransactionOption... options) {
this.session = session;
this.span = span;
this.options = Options.fromTransactionOptions(options);
}

@Override
Expand Down Expand Up @@ -82,7 +85,7 @@ public TransactionContextFutureImpl beginAsync() {

private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
txnState = TransactionState.STARTED;
txn = session.newTransaction();
txn = session.newTransaction(options);
if (firstAttempt) {
session.setActive(this);
}
Expand Down
Expand Up @@ -18,6 +18,7 @@

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;

/**
* Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An
Expand Down Expand Up @@ -308,7 +309,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* });
* </code></pre>
*/
TransactionRunner readWriteTransaction();
TransactionRunner readWriteTransaction(TransactionOption... options);

/**
* Returns a transaction manager which allows manual management of transaction lifecycle. This API
Expand Down Expand Up @@ -338,7 +339,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* }
* }</pre>
*/
TransactionManager transactionManager();
TransactionManager transactionManager(TransactionOption... options);

/**
* Returns an asynchronous transaction runner for executing a single logical transaction with
Expand Down Expand Up @@ -371,7 +372,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* executor);
* </code></pre>
*/
AsyncRunner runAsync();
AsyncRunner runAsync(TransactionOption... options);

/**
* Returns an asynchronous transaction manager which allows manual management of transaction
Expand Down Expand Up @@ -459,7 +460,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* }
* }</pre>
*/
AsyncTransactionManager transactionManagerAsync();
AsyncTransactionManager transactionManagerAsync(TransactionOption... options);

/**
* Returns the lower bound of rows modified by this DML statement.
Expand Down Expand Up @@ -508,5 +509,5 @@ CommitResponse writeAtLeastOnceWithOptions(
* <p>Given the above, Partitioned DML is good fit for large, database-wide, operations that are
* idempotent, such as deleting old rows from a very large table.
*/
long executePartitionedUpdate(Statement stmt);
long executePartitionedUpdate(Statement stmt, UpdateOption... options);
}
Expand Up @@ -18,6 +18,7 @@

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -54,13 +55,20 @@ PooledSessionFuture getSession() {

@Override
public Timestamp write(final Iterable<Mutation> mutations) throws SpannerException {
return writeWithOptions(mutations).getCommitTimestamp();
}

@Override
public CommitResponse writeWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
new Function<Session, Timestamp>() {
new Function<Session, CommitResponse>() {
@Override
public Timestamp apply(Session session) {
return session.write(mutations);
public CommitResponse apply(Session session) {
return session.writeWithOptions(mutations, options);
}
});
} catch (RuntimeException e) {
Expand All @@ -72,21 +80,21 @@ public Timestamp apply(Session session) {
}

@Override
public CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
throws SpannerException {
final Timestamp commitTimestamp = write(mutations);
return new CommitResponse(commitTimestamp);
public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws SpannerException {
return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
}

@Override
public Timestamp writeAtLeastOnce(final Iterable<Mutation> mutations) throws SpannerException {
public CommitResponse writeAtLeastOnceWithOptions(
final Iterable<Mutation> mutations, final TransactionOption... options)
throws SpannerException {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
new Function<Session, Timestamp>() {
new Function<Session, CommitResponse>() {
@Override
public Timestamp apply(Session session) {
return session.writeAtLeastOnce(mutations);
public CommitResponse apply(Session session) {
return session.writeAtLeastOnceWithOptions(mutations, options);
}
});
} catch (RuntimeException e) {
Expand All @@ -97,13 +105,6 @@ public Timestamp apply(Session session) {
}
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
final Timestamp commitTimestamp = writeAtLeastOnce(mutations);
return new CommitResponse(commitTimestamp);
}

@Override
public ReadContext singleUse() {
Span span = tracer.spanBuilder(READ_ONLY_TRANSACTION).startSpan();
Expand Down Expand Up @@ -171,10 +172,10 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
}

@Override
public TransactionRunner readWriteTransaction() {
public TransactionRunner readWriteTransaction(TransactionOption... options) {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getSession().readWriteTransaction();
return getSession().readWriteTransaction(options);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
Expand All @@ -184,47 +185,47 @@ public TransactionRunner readWriteTransaction() {
}

@Override
public TransactionManager transactionManager() {
public TransactionManager transactionManager(TransactionOption... options) {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getSession().transactionManager();
return getSession().transactionManager(options);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public AsyncRunner runAsync() {
public AsyncRunner runAsync(TransactionOption... options) {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getSession().runAsync();
return getSession().runAsync(options);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public AsyncTransactionManager transactionManagerAsync() {
public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getSession().transactionManagerAsync();
return getSession().transactionManagerAsync(options);
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public long executePartitionedUpdate(final Statement stmt) {
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return runWithSessionRetry(
new Function<Session, Long>() {
@Override
public Long apply(Session session) {
return session.executePartitionedUpdate(stmt);
return session.executePartitionedUpdate(stmt, options);
}
});
} catch (RuntimeException e) {
Expand Down
Expand Up @@ -30,12 +30,19 @@ public interface ReadAndQueryOption extends ReadOption, QueryOption {}
/** Marker interface to mark options applicable to read operation */
public interface ReadOption {}

/** Marker interface to mark options applicable to Read, Query, Update and Write operations */
public interface ReadQueryUpdateTransactionOption
extends ReadOption, QueryOption, UpdateOption, TransactionOption {}

/** Marker interface to mark options applicable to query operation. */
public interface QueryOption {}

/** Marker interface to mark options applicable to write operations */
public interface TransactionOption {}

/** Marker interface to mark options applicable to update operation. */
public interface UpdateOption {}

/** Marker interface to mark options applicable to list operations in admin API. */
public interface ListOption {}

Expand Down Expand Up @@ -287,6 +294,26 @@ static Options fromQueryOptions(QueryOption... options) {
return readOptions;
}

static Options fromUpdateOptions(UpdateOption... options) {
olavloite marked this conversation as resolved.
Show resolved Hide resolved
Options updateOptions = new Options();
for (UpdateOption option : options) {
if (option instanceof InternalOption) {
((InternalOption) option).appendToOptions(updateOptions);
}
}
return updateOptions;
}

static Options fromTransactionOptions(TransactionOption... options) {
olavloite marked this conversation as resolved.
Show resolved Hide resolved
Options transactionOptions = new Options();
for (TransactionOption option : options) {
if (option instanceof InternalOption) {
((InternalOption) option).appendToOptions(transactionOptions);
}
}
return transactionOptions;
}

static Options fromListOptions(ListOption... options) {
Options listOptions = new Options();
for (ListOption option : options) {
Expand Down