Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add bufferAsync methods #1145

Merged
merged 7 commits into from May 18, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Expand Up @@ -605,4 +605,17 @@
<className>com/google/cloud/spanner/StructReader</className>
<method>com.google.cloud.spanner.Value getValue(java.lang.String)</method>
</difference>

<!-- Adds bufferAsync to DatabaseClient -->
<!-- These are not breaking changes, since we provide default interface implementation -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture bufferAsync(com.google.cloud.spanner.Mutation)</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/TransactionContext</className>
<method>com.google.api.core.ApiFuture bufferAsync(java.lang.Iterable)</method>
</difference>
</differences>
Expand Up @@ -18,6 +18,7 @@

import com.google.api.core.ApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -98,31 +99,21 @@ Timestamp get(long timeout, TimeUnit unit)
* <p>Example usage:
*
* <pre>{@code
* TransactionContextFuture txnFuture = manager.beginAsync();
* final String column = "FirstName";
* txnFuture.then(
* new AsyncTransactionFunction<Void, Struct>() {
* @Override
* public ApiFuture<Struct> apply(TransactionContext txn, Void input)
* throws Exception {
* return txn.readRowAsync(
* "Singers", Key.of(singerId), Collections.singleton(column));
* }
* })
* .then(
* new AsyncTransactionFunction<Struct, Void>() {
* @Override
* public ApiFuture<Void> apply(TransactionContext txn, Struct input)
* throws Exception {
* String name = input.getString(column);
* txn.buffer(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
* .build());
* return ApiFutures.immediateFuture(null);
* }
* })
* final long singerId = 1L;
* AsyncTransactionManager manager = client.transactionManagerAsync();
* TransactionContextFuture txnFuture = manager.beginAsync();
* txnFuture
* .then((transaction, ignored) ->
* transaction.readRowAsync("Singers", Key.of(singerId), Collections.singleton(column)),
* executor)
* .then((transaction, row) ->
* transaction.bufferAsync(
* Mutation.newUpdateBuilder("Singers")
* .set(column).to(row.getString(column).toUpperCase())
* .build()),
* executor)
* .commitAsync();
* }</pre>
*/
interface AsyncTransactionStep<I, O> extends ApiFuture<O> {
Expand Down
Expand Up @@ -431,8 +431,7 @@ CommitResponse writeAtLeastOnceWithOptions(
* lifecycle. This API is meant for advanced users. Most users should instead use the {@link
* #runAsync()} API instead.
*
* <p>Example of using {@link AsyncTransactionManager} with lambda expressions (Java 8 and
* higher).
* <p>Example of using {@link AsyncTransactionManager}.
*
* <pre>{@code
* long singerId = 1L;
Expand All @@ -449,56 +448,11 @@ CommitResponse writeAtLeastOnceWithOptions(
* .then(
* (transaction, row) -> {
* String name = row.getString(column);
* transaction.buffer(
* return transaction.bufferAsync(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
* .build());
* return ApiFutures.immediateFuture(null);
* })
* .commitAsync();
* try {
* commitTimestamp.get();
* break;
* } catch (AbortedException e) {
* Thread.sleep(e.getRetryDelayInMillis());
* transactionFuture = manager.resetForRetryAsync();
* }
* }
* }
* }</pre>
*
* <p>Example of using {@link AsyncTransactionManager} (Java 7).
*
* <pre>{@code
* final long singerId = 1L;
* try (AsyncTransactionManager manager = client().transactionManagerAsync()) {
* TransactionContextFuture transactionFuture = manager.beginAsync();
* while (true) {
* final String column = "FirstName";
* CommitTimestampFuture commitTimestamp =
* transactionFuture.then(
* new AsyncTransactionFunction<Void, Struct>() {
* @Override
* public ApiFuture<Struct> apply(TransactionContext transaction, Void input)
* throws Exception {
* return transaction.readRowAsync(
* "Singers", Key.of(singerId), Collections.singleton(column));
* }
* })
* .then(
* new AsyncTransactionFunction<Struct, Void>() {
* @Override
* public ApiFuture<Void> apply(TransactionContext transaction, Struct input)
* throws Exception {
* String name = input.getString(column);
* transaction.buffer(
* Mutation.newUpdateBuilder("Singers")
* .set(column)
* .to(name.toUpperCase())
* .build());
* return ApiFutures.immediateFuture(null);
* }
* })
* .commitAsync();
* try {
Expand Down
Expand Up @@ -675,6 +675,11 @@ public void buffer(Mutation mutation) {
delegate.buffer(mutation);
}

@Override
public ApiFuture<Void> bufferAsync(Mutation mutation) {
return delegate.bufferAsync(mutation);
}

@Override
public Struct readRowUsingIndex(String table, String index, Key key, Iterable<String> columns) {
try {
Expand Down Expand Up @@ -703,6 +708,11 @@ public void buffer(Iterable<Mutation> mutations) {
delegate.buffer(mutations);
}

@Override
public ApiFuture<Void> bufferAsync(Iterable<Mutation> mutations) {
return delegate.bufferAsync(mutations);
}

@Override
public long executeUpdate(Statement statement, UpdateOption... options) {
try {
Expand Down
Expand Up @@ -91,13 +91,23 @@ public interface TransactionContext extends ReadContext {
*/
void buffer(Mutation mutation);

/** Same as {@link #buffer(Mutation)}, but is guaranteed to be non-blocking. */
default ApiFuture<Void> bufferAsync(Mutation mutation) {
throw new UnsupportedOperationException("method should be overwritten");
}

/**
* Buffers mutations to be applied if the transaction commits successfully. The effects of the
* mutations will not be visible to subsequent operations in the transaction. All buffered
* mutations will be applied atomically.
*/
void buffer(Iterable<Mutation> mutations);

/** Same as {@link #buffer(Iterable)}, but is guaranteed to be non-blocking. */
default ApiFuture<Void> bufferAsync(Iterable<Mutation> mutations) {
throw new UnsupportedOperationException("method should be overwritten");
}

/**
* Executes the DML statement(s) and returns the number of rows modified. For non-DML statements,
* it will result in an {@code IllegalArgumentException}. The effects of the DML statement will be
Expand Down
Expand Up @@ -609,6 +609,15 @@ public void buffer(Mutation mutation) {
}
}

@Override
public ApiFuture<Void> bufferAsync(Mutation mutation) {
// Normally, we would call the async method from the sync method, but this is also safe as
// both are non-blocking anyways, and this prevents the creation of an ApiFuture that is not
// really used when the sync method is called.
buffer(mutation);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do have a worry here: it seems we acquire a lock in the buffer method, so that could potentially have some waiting which is not expected by the user.

How big of a problem do you think this is?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. We actually take this lock in a couple of the other async methods as well already, so in that sense this does not deviate from the existing methods. The lock is only held for a short period of time in all cases, as it is only held while executing local operations. So in that sense I don't think it is a big problem. Still, we should preferably not take any locks in a method that is marked as non-blocking, as there could in theory be some waiting, albeit short.

I'll have a look and see if I can fix this for all the async methods in this class.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've not been able to come up with a solution without any locking, but I've reduced it to an absolute minimum by introducing a separate lock that is only for ensuring that mutations that are buffered are either included in the commit, or will throw an exception if commit has already been called. It is not perfect, but the locking should be absolutely minimal as the lock is only held for a couple of simple statements in all cases.

return ApiFutures.immediateFuture(null);
}

@Override
public void buffer(Iterable<Mutation> mutations) {
synchronized (lock) {
Expand All @@ -619,6 +628,15 @@ public void buffer(Iterable<Mutation> mutations) {
}
}

@Override
public ApiFuture<Void> bufferAsync(Iterable<Mutation> mutations) {
// Normally, we would call the async method from the sync method, but this is also safe as
// both are non-blocking anyways, and this prevents the creation of an ApiFuture that is not
// really used when the sync method is called.
buffer(mutations);
return ApiFutures.immediateFuture(null);
}

@Override
public long executeUpdate(Statement statement, UpdateOption... options) {
beforeReadOrQuery();
Expand Down