Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add bufferAsync methods #1145

Merged
merged 7 commits into from May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -605,4 +605,17 @@
<className>com/google/cloud/spanner/StructReader</className>
<method>com.google.cloud.spanner.Value getValue(java.lang.String)</method>
</difference>

<!-- Adds bufferAsync to DatabaseClient -->
<!-- These are not breaking changes, since we provide default interface implementation -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture bufferAsync(com.google.cloud.spanner.Mutation)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture bufferAsync(java.lang.Iterable)</method>
</difference>
</differences>
Expand Up @@ -18,6 +18,7 @@

import com.google.api.core.ApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -98,31 +99,21 @@ Timestamp get(long timeout, TimeUnit unit)
* <p>Example usage:
*
* <pre>{@code
* TransactionContextFuture txnFuture = manager.beginAsync();
* final String column = "FirstName";
* txnFuture.then(
* new AsyncTransactionFunction<Void, Struct>() {
* @Override
* public ApiFuture<Struct> apply(TransactionContext txn, Void input)
* throws Exception {
* return txn.readRowAsync(
* "Singers", Key.of(singerId), Collections.singleton(column));
* }
* })
* .then(
* new AsyncTransactionFunction<Struct, Void>() {
* @Override
* public ApiFuture<Void> apply(TransactionContext txn, Struct input)
* throws Exception {
* String name = input.getString(column);
* txn.buffer(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
* .build());
* return ApiFutures.immediateFuture(null);
* }
* })
* final long singerId = 1L;
* AsyncTransactionManager manager = client.transactionManagerAsync();
* TransactionContextFuture txnFuture = manager.beginAsync();
* txnFuture
* .then((transaction, ignored) ->
* transaction.readRowAsync("Singers", Key.of(singerId), Collections.singleton(column)),
* executor)
* .then((transaction, row) ->
* transaction.bufferAsync(
* Mutation.newUpdateBuilder("Singers")
* .set(column).to(row.getString(column).toUpperCase())
* .build()),
* executor)
* .commitAsync();
* }</pre>
*/
interface AsyncTransactionStep<I, O> extends ApiFuture<O> {
Expand Down
Expand Up @@ -431,8 +431,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* lifecycle. This API is meant for advanced users. Most users should instead use the {@link
* #runAsync()} API instead.
*
* <p>Example of using {@link AsyncTransactionManager} with lambda expressions (Java 8 and
* higher).
* <p>Example of using {@link AsyncTransactionManager}.
*
* <pre>{@code
* long singerId = 1L;
Expand All @@ -449,56 +448,11 @@ CommitResponse writeAtLeastOnceWithOptions(
* .then(
* (transaction, row) -> {
* String name = row.getString(column);
* transaction.buffer(
* return transaction.bufferAsync(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
* .build());
* return ApiFutures.immediateFuture(null);
* })
* .commitAsync();
* try {
* commitTimestamp.get();
* break;
* } catch (AbortedException e) {
* Thread.sleep(e.getRetryDelayInMillis());
* transactionFuture = manager.resetForRetryAsync();
* }
* }
* }
* }</pre>
*
* <p>Example of using {@link AsyncTransactionManager} (Java 7).
*
* <pre>{@code
* final long singerId = 1L;
* try (AsyncTransactionManager manager = client().transactionManagerAsync()) {
* TransactionContextFuture transactionFuture = manager.beginAsync();
* while (true) {
* final String column = "FirstName";
* CommitTimestampFuture commitTimestamp =
* transactionFuture.then(
* new AsyncTransactionFunction<Void, Struct>() {
* @Override
* public ApiFuture<Struct> apply(TransactionContext transaction, Void input)
* throws Exception {
* return transaction.readRowAsync(
* "Singers", Key.of(singerId), Collections.singleton(column));
* }
* })
* .then(
* new AsyncTransactionFunction<Struct, Void>() {
* @Override
* public ApiFuture<Void> apply(TransactionContext transaction, Struct input)
* throws Exception {
* String name = input.getString(column);
* transaction.buffer(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
* .build());
* return ApiFutures.immediateFuture(null);
* }
* })
* .commitAsync();
* try {
Expand Down
Expand Up @@ -675,6 +675,11 @@ public void buffer(Mutation mutation) {
delegate.buffer(mutation);
}

@Override
public ApiFuture<Void> bufferAsync(Mutation mutation) {
return delegate.bufferAsync(mutation);
}

@Override
public Struct readRowUsingIndex(String table, String index, Key key, Iterable<String> columns) {
try {
Expand Down Expand Up @@ -703,6 +708,11 @@ public void buffer(Iterable<Mutation> mutations) {
delegate.buffer(mutations);
}

@Override
public ApiFuture<Void> bufferAsync(Iterable<Mutation> mutations) {
return delegate.bufferAsync(mutations);
}

@Override
public long executeUpdate(Statement statement, UpdateOption... options) {
try {
Expand Down
Expand Up @@ -91,13 +91,23 @@ public interface TransactionContext extends ReadContext {
*/
void buffer(Mutation mutation);

/** Same as {@link #buffer(Mutation)}, but is guaranteed to be non-blocking. */
default ApiFuture<Void> bufferAsync(Mutation mutation) {
throw new UnsupportedOperationException("method should be overwritten");
}

/**
* Buffers mutations to be applied if the transaction commits successfully. The effects of the
* mutations will not be visible to subsequent operations in the transaction. All buffered
* mutations will be applied atomically.
*/
void buffer(Iterable<Mutation> mutations);

/** Same as {@link #buffer(Iterable)}, but is guaranteed to be non-blocking. */
default ApiFuture<Void> bufferAsync(Iterable<Mutation> mutations) {
throw new UnsupportedOperationException("method should be overwritten");
}

/**
* Executes the DML statement(s) and returns the number of rows modified. For non-DML statements,
* it will result in an {@code IllegalArgumentException}. The effects of the DML statement will be
Expand Down
Expand Up @@ -54,7 +54,9 @@
import io.opencensus.trace.Tracing;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
Expand All @@ -75,6 +77,9 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
*/
private static final String TRANSACTION_CANCELLED_MESSAGE = "invalidated by a later transaction";

private static final String TRANSACTION_ALREADY_COMMITTED_MESSAGE =
"Transaction has already committed";

@VisibleForTesting
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {
static class Builder extends AbstractReadContext.Builder<Builder, TransactionContextImpl> {
Expand Down Expand Up @@ -146,7 +151,9 @@ public void removeListener(Runnable listener) {
}
}

@GuardedBy("lock")
private final Object committingLock = new Object();

@GuardedBy("committingLock")
private volatile boolean committing;

@GuardedBy("lock")
Expand All @@ -155,8 +162,7 @@ public void removeListener(Runnable listener) {
@GuardedBy("lock")
private volatile int runningAsyncOperations;

@GuardedBy("lock")
private List<Mutation> mutations = new ArrayList<>();
private final Queue<Mutation> mutations = new ConcurrentLinkedQueue<>();

@GuardedBy("lock")
private boolean aborted;
Expand Down Expand Up @@ -280,6 +286,16 @@ void commit() {
volatile ApiFuture<CommitResponse> commitFuture;

ApiFuture<CommitResponse> commitAsync() {
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
synchronized (committingLock) {
if (committing) {
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
}
committing = true;
if (!mutations.isEmpty()) {
Mutation.toProto(mutations, mutationsProto);
}
}
final SettableApiFuture<CommitResponse> res = SettableApiFuture.create();
final SettableApiFuture<Void> finishOps;
CommitRequest.Builder builder =
Expand All @@ -303,14 +319,8 @@ ApiFuture<CommitResponse> commitAsync() {
} else {
finishOps = finishedAsyncOperations;
}
if (!mutations.isEmpty()) {
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutations, mutationsProto);
builder.addAllMutations(mutationsProto);
}
// Ensure that no call to buffer mutations that would be lost can succeed.
mutations = null;
}
builder.addAllMutations(mutationsProto);
finishOps.addListener(
new CommitRunnable(res, finishOps, builder), MoreExecutors.directExecutor());
return res;
Expand Down Expand Up @@ -603,22 +613,44 @@ public void onDone(boolean withBeginTransaction) {

@Override
public void buffer(Mutation mutation) {
synchronized (lock) {
checkNotNull(mutations, "Context is closed");
synchronized (committingLock) {
if (committing) {
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
}
mutations.add(checkNotNull(mutation));
}
}

@Override
public ApiFuture<Void> bufferAsync(Mutation mutation) {
// Normally, we would call the async method from the sync method, but this is also safe as
// both are non-blocking anyways, and this prevents the creation of an ApiFuture that is not
// really used when the sync method is called.
buffer(mutation);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do have a worry here: it seems we acquire a lock in the buffer method, so that could potentially have some waiting which is not expected by the user.

How big of a problem do you think this is?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. We actually take this lock in a couple of the other async methods as well already, so in that sense this does not deviate from the existing methods. The lock is only held for a short period of time in all cases, as it is only held while executing local operations. So in that sense I don't think it is a big problem. Still, we should preferably not take any locks in a method that is marked as non-blocking, as there could in theory be some waiting, albeit short.

I'll have a look and see if I can fix this for all the async methods in this class.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've not been able to come up with a solution without any locking, but I've reduced it to an absolute minimum by introducing a separate lock that is only for ensuring that mutations that are buffered are either included in the commit, or will throw an exception if commit has already been called. It is not perfect, but the locking should be absolutely minimal as the lock is only held for a couple of simple statements in all cases.

return ApiFutures.immediateFuture(null);
}

@Override
public void buffer(Iterable<Mutation> mutations) {
synchronized (lock) {
checkNotNull(this.mutations, "Context is closed");
synchronized (committingLock) {
if (committing) {
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
}
for (Mutation mutation : mutations) {
this.mutations.add(checkNotNull(mutation));
}
}
}

@Override
public ApiFuture<Void> bufferAsync(Iterable<Mutation> mutations) {
// Normally, we would call the async method from the sync method, but this is also safe as
// both are non-blocking anyways, and this prevents the creation of an ApiFuture that is not
// really used when the sync method is called.
buffer(mutations);
return ApiFutures.immediateFuture(null);
}

@Override
public long executeUpdate(Statement statement, UpdateOption... options) {
beforeReadOrQuery();
Expand Down