Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: transaction callable as functional interface #1066

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -23,7 +23,6 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -60,16 +59,13 @@ public void run() {

private <R> R runTransaction(final AsyncWork<R> work) {
return delegate.run(
new TransactionCallable<R>() {
@Override
public R run(TransactionContext transaction) throws Exception {
try {
return work.doWorkAsync(transaction).get();
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause());
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
transaction -> {
try {
return work.doWorkAsync(transaction).get();
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause());
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
});
}
Expand Down
Expand Up @@ -141,12 +141,9 @@ public CommitResponse writeWithOptions(Iterable<Mutation> mutations, Transaction
? (Collection<Mutation>) mutations
: Lists.newArrayList(mutations);
runner.run(
new TransactionRunner.TransactionCallable<Void>() {
@Override
public Void run(TransactionContext ctx) {
ctx.buffer(finalMutations);
return null;
}
ctx -> {
ctx.buffer(finalMutations);
return null;
});
return runner.getCommitResponse();
}
Expand Down
Expand Up @@ -30,6 +30,7 @@
*/
public interface TransactionRunner {
/** A unit of work to be performed in the context of a transaction. */
@FunctionalInterface
interface TransactionCallable<T> {
/**
* Invoked by the library framework to perform a single attempt of a transaction. This method
Expand Down
Expand Up @@ -34,9 +34,7 @@
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.cloud.spanner.connection.StatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.StatementParser.StatementType;
import com.google.common.base.Function;
Expand Down Expand Up @@ -357,12 +355,7 @@ public Long call() throws Exception {
writeTransaction = createWriteTransaction();
Long res =
writeTransaction.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) throws Exception {
return transaction.executeUpdate(update.getStatement());
}
});
transaction -> transaction.executeUpdate(update.getStatement()));
state = UnitOfWorkState.COMMITTED;
return res;
} catch (Throwable t) {
Expand Down Expand Up @@ -404,31 +397,28 @@ private ApiFuture<long[]> executeTransactionalBatchUpdateAsync(
public long[] call() throws Exception {
writeTransaction = createWriteTransaction();
return writeTransaction.run(
new TransactionCallable<long[]>() {
@Override
public long[] run(TransactionContext transaction) throws Exception {
try {
long[] res =
transaction.batchUpdate(
Iterables.transform(
updates,
new Function<ParsedStatement, Statement>() {
@Override
public Statement apply(ParsedStatement input) {
return input.getStatement();
}
}));
transaction -> {
try {
long[] res =
transaction.batchUpdate(
Iterables.transform(
updates,
new Function<ParsedStatement, Statement>() {
@Override
public Statement apply(ParsedStatement input) {
return input.getStatement();
}
}));
state = UnitOfWorkState.COMMITTED;
return res;
} catch (Throwable t) {
if (t instanceof SpannerBatchUpdateException) {
// Batch update exceptions does not cause a rollback.
state = UnitOfWorkState.COMMITTED;
return res;
} catch (Throwable t) {
if (t instanceof SpannerBatchUpdateException) {
// Batch update exceptions does not cause a rollback.
state = UnitOfWorkState.COMMITTED;
} else {
state = UnitOfWorkState.COMMIT_FAILED;
}
throw t;
} else {
state = UnitOfWorkState.COMMIT_FAILED;
}
throw t;
}
});
}
Expand All @@ -455,12 +445,9 @@ public Void call() throws Exception {
writeTransaction = createWriteTransaction();
Void res =
writeTransaction.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws Exception {
transaction.buffer(mutations);
return null;
}
transaction -> {
transaction.buffer(mutations);
return null;
});
state = UnitOfWorkState.COMMITTED;
return res;
Expand Down
Expand Up @@ -24,7 +24,6 @@
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.protobuf.ListValue;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.StructType;
Expand Down Expand Up @@ -205,13 +204,7 @@ private final class WriteRunnable implements Runnable {
@Override
public void run() {
TransactionRunner runner = client.readWriteTransaction();
runner.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) {
return transaction.executeUpdate(UPDATE_STATEMENT);
}
});
runner.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT));
}
}
}