Skip to content

Commit

Permalink
feat: add bufferAsync methods (#1145)
Browse files Browse the repository at this point in the history
* feat: add bufferAsync methods

Adds bufferAsync methods to TransactionContext. The existing buffer methods
were already non-blocking, but the async versions also return an ApiFuture,
which make them easier to use when chaining multiple async calls together.

Also changes some calls in the AsyncTransactionManagerTest to use lambdas
instead of the test helper methods.

Fixes #1126

* fix: do not take lock on async method

* build: remove custom skip tests variable

* test: add test for committing twice

* fix: synchronize buffering and committing
  • Loading branch information
olavloite committed May 18, 2021
1 parent e70b009 commit 7d6816f
Show file tree
Hide file tree
Showing 9 changed files with 574 additions and 326 deletions.
13 changes: 13 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -605,4 +605,17 @@
<className>com/google/cloud/spanner/StructReader</className>
<method>com.google.cloud.spanner.Value getValue(java.lang.String)</method>
</difference>

<!-- Adds bufferAsync to DatabaseClient -->
<!-- These are not breaking changes, since we provide default interface implementation -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture bufferAsync(com.google.cloud.spanner.Mutation)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture bufferAsync(java.lang.Iterable)</method>
</difference>
</differences>
Expand Up @@ -18,6 +18,7 @@

import com.google.api.core.ApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -98,31 +99,21 @@ Timestamp get(long timeout, TimeUnit unit)
* <p>Example usage:
*
* <pre>{@code
* TransactionContextFuture txnFuture = manager.beginAsync();
* final String column = "FirstName";
* txnFuture.then(
* new AsyncTransactionFunction<Void, Struct>() {
* @Override
* public ApiFuture<Struct> apply(TransactionContext txn, Void input)
* throws Exception {
* return txn.readRowAsync(
* "Singers", Key.of(singerId), Collections.singleton(column));
* }
* })
* .then(
* new AsyncTransactionFunction<Struct, Void>() {
* @Override
* public ApiFuture<Void> apply(TransactionContext txn, Struct input)
* throws Exception {
* String name = input.getString(column);
* txn.buffer(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
* .build());
* return ApiFutures.immediateFuture(null);
* }
* })
* final long singerId = 1L;
* AsyncTransactionManager manager = client.transactionManagerAsync();
* TransactionContextFuture txnFuture = manager.beginAsync();
* txnFuture
* .then((transaction, ignored) ->
* transaction.readRowAsync("Singers", Key.of(singerId), Collections.singleton(column)),
* executor)
* .then((transaction, row) ->
* transaction.bufferAsync(
* Mutation.newUpdateBuilder("Singers")
* .set(column).to(row.getString(column).toUpperCase())
* .build()),
* executor)
* .commitAsync();
* }</pre>
*/
interface AsyncTransactionStep<I, O> extends ApiFuture<O> {
Expand Down
Expand Up @@ -431,8 +431,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* lifecycle. This API is meant for advanced users. Most users should instead use the {@link
* #runAsync()} API instead.
*
* <p>Example of using {@link AsyncTransactionManager} with lambda expressions (Java 8 and
* higher).
* <p>Example of using {@link AsyncTransactionManager}.
*
* <pre>{@code
* long singerId = 1L;
Expand All @@ -449,56 +448,11 @@ CommitResponse writeAtLeastOnceWithOptions(
* .then(
* (transaction, row) -> {
* String name = row.getString(column);
* transaction.buffer(
* return transaction.bufferAsync(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
* .build());
* return ApiFutures.immediateFuture(null);
* })
* .commitAsync();
* try {
* commitTimestamp.get();
* break;
* } catch (AbortedException e) {
* Thread.sleep(e.getRetryDelayInMillis());
* transactionFuture = manager.resetForRetryAsync();
* }
* }
* }
* }</pre>
*
* <p>Example of using {@link AsyncTransactionManager} (Java 7).
*
* <pre>{@code
* final long singerId = 1L;
* try (AsyncTransactionManager manager = client().transactionManagerAsync()) {
* TransactionContextFuture transactionFuture = manager.beginAsync();
* while (true) {
* final String column = "FirstName";
* CommitTimestampFuture commitTimestamp =
* transactionFuture.then(
* new AsyncTransactionFunction<Void, Struct>() {
* @Override
* public ApiFuture<Struct> apply(TransactionContext transaction, Void input)
* throws Exception {
* return transaction.readRowAsync(
* "Singers", Key.of(singerId), Collections.singleton(column));
* }
* })
* .then(
* new AsyncTransactionFunction<Struct, Void>() {
* @Override
* public ApiFuture<Void> apply(TransactionContext transaction, Struct input)
* throws Exception {
* String name = input.getString(column);
* transaction.buffer(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
* .build());
* return ApiFutures.immediateFuture(null);
* }
* })
* .commitAsync();
* try {
Expand Down
Expand Up @@ -675,6 +675,11 @@ public void buffer(Mutation mutation) {
delegate.buffer(mutation);
}

@Override
public ApiFuture<Void> bufferAsync(Mutation mutation) {
return delegate.bufferAsync(mutation);
}

@Override
public Struct readRowUsingIndex(String table, String index, Key key, Iterable<String> columns) {
try {
Expand Down Expand Up @@ -703,6 +708,11 @@ public void buffer(Iterable<Mutation> mutations) {
delegate.buffer(mutations);
}

@Override
public ApiFuture<Void> bufferAsync(Iterable<Mutation> mutations) {
return delegate.bufferAsync(mutations);
}

@Override
public long executeUpdate(Statement statement, UpdateOption... options) {
try {
Expand Down
Expand Up @@ -91,13 +91,23 @@ public interface TransactionContext extends ReadContext {
*/
void buffer(Mutation mutation);

/** Same as {@link #buffer(Mutation)}, but is guaranteed to be non-blocking. */
default ApiFuture<Void> bufferAsync(Mutation mutation) {
throw new UnsupportedOperationException("method should be overwritten");
}

/**
* Buffers mutations to be applied if the transaction commits successfully. The effects of the
* mutations will not be visible to subsequent operations in the transaction. All buffered
* mutations will be applied atomically.
*/
void buffer(Iterable<Mutation> mutations);

/** Same as {@link #buffer(Iterable)}, but is guaranteed to be non-blocking. */
default ApiFuture<Void> bufferAsync(Iterable<Mutation> mutations) {
throw new UnsupportedOperationException("method should be overwritten");
}

/**
* Executes the DML statement(s) and returns the number of rows modified. For non-DML statements,
* it will result in an {@code IllegalArgumentException}. The effects of the DML statement will be
Expand Down
Expand Up @@ -54,7 +54,9 @@
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand All @@ -75,6 +77,9 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
*/
private static final String TRANSACTION_CANCELLED_MESSAGE = "invalidated by a later transaction";

private static final String TRANSACTION_ALREADY_COMMITTED_MESSAGE =
"Transaction has already committed";

@VisibleForTesting
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {
static class Builder extends AbstractReadContext.Builder<Builder, TransactionContextImpl> {
Expand Down Expand Up @@ -146,7 +151,9 @@ public void removeListener(Runnable listener) {
}
}

@GuardedBy("lock")
private final Object committingLock = new Object();

@GuardedBy("committingLock")
private volatile boolean committing;

@GuardedBy("lock")
Expand All @@ -155,8 +162,7 @@ public void removeListener(Runnable listener) {
@GuardedBy("lock")
private volatile int runningAsyncOperations;

@GuardedBy("lock")
private List<Mutation> mutations = new ArrayList<>();
private final Queue<Mutation> mutations = new ConcurrentLinkedQueue<>();

@GuardedBy("lock")
private boolean aborted;
Expand Down Expand Up @@ -280,6 +286,16 @@ void commit() {
volatile ApiFuture<CommitResponse> commitFuture;

ApiFuture<CommitResponse> commitAsync() {
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
synchronized (committingLock) {
if (committing) {
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
}
committing = true;
if (!mutations.isEmpty()) {
Mutation.toProto(mutations, mutationsProto);
}
}
final SettableApiFuture<CommitResponse> res = SettableApiFuture.create();
final SettableApiFuture<Void> finishOps;
CommitRequest.Builder builder =
Expand All @@ -303,14 +319,8 @@ ApiFuture<CommitResponse> commitAsync() {
} else {
finishOps = finishedAsyncOperations;
}
if (!mutations.isEmpty()) {
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutations, mutationsProto);
builder.addAllMutations(mutationsProto);
}
// Ensure that no call to buffer mutations that would be lost can succeed.
mutations = null;
}
builder.addAllMutations(mutationsProto);
finishOps.addListener(
new CommitRunnable(res, finishOps, builder), MoreExecutors.directExecutor());
return res;
Expand Down Expand Up @@ -603,22 +613,44 @@ public void onDone(boolean withBeginTransaction) {

@Override
public void buffer(Mutation mutation) {
synchronized (lock) {
checkNotNull(mutations, "Context is closed");
synchronized (committingLock) {
if (committing) {
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
}
mutations.add(checkNotNull(mutation));
}
}

@Override
public ApiFuture<Void> bufferAsync(Mutation mutation) {
// Normally, we would call the async method from the sync method, but this is also safe as
// both are non-blocking anyways, and this prevents the creation of an ApiFuture that is not
// really used when the sync method is called.
buffer(mutation);
return ApiFutures.immediateFuture(null);
}

@Override
public void buffer(Iterable<Mutation> mutations) {
synchronized (lock) {
checkNotNull(this.mutations, "Context is closed");
synchronized (committingLock) {
if (committing) {
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
}
for (Mutation mutation : mutations) {
this.mutations.add(checkNotNull(mutation));
}
}
}

@Override
public ApiFuture<Void> bufferAsync(Iterable<Mutation> mutations) {
// Normally, we would call the async method from the sync method, but this is also safe as
// both are non-blocking anyways, and this prevents the creation of an ApiFuture that is not
// really used when the sync method is called.
buffer(mutations);
return ApiFutures.immediateFuture(null);
}

@Override
public long executeUpdate(Statement statement, UpdateOption... options) {
beforeReadOrQuery();
Expand Down

0 comments on commit 7d6816f

Please sign in to comment.