Skip to content

Commit

Permalink
feat: transaction callable as functional interface (#1066)
Browse files Browse the repository at this point in the history
* feat: transaction callable as functional interface

Marks the transaction callable as a functional interface.

* samples: uses lambdas in samples
  • Loading branch information
thiagotnunes committed Apr 15, 2021
1 parent 1d4eed4 commit b036a77
Show file tree
Hide file tree
Showing 27 changed files with 952 additions and 1,591 deletions.
Expand Up @@ -23,7 +23,6 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -60,16 +59,13 @@ public void run() {

private <R> R runTransaction(final AsyncWork<R> work) {
return delegate.run(
new TransactionCallable<R>() {
@Override
public R run(TransactionContext transaction) throws Exception {
try {
return work.doWorkAsync(transaction).get();
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause());
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
transaction -> {
try {
return work.doWorkAsync(transaction).get();
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause());
} catch (InterruptedException e) {
throw SpannerExceptionFactory.propagateInterrupt(e);
}
});
}
Expand Down
Expand Up @@ -141,12 +141,9 @@ public CommitResponse writeWithOptions(Iterable<Mutation> mutations, Transaction
? (Collection<Mutation>) mutations
: Lists.newArrayList(mutations);
runner.run(
new TransactionRunner.TransactionCallable<Void>() {
@Override
public Void run(TransactionContext ctx) {
ctx.buffer(finalMutations);
return null;
}
ctx -> {
ctx.buffer(finalMutations);
return null;
});
return runner.getCommitResponse();
}
Expand Down
Expand Up @@ -30,6 +30,7 @@
*/
public interface TransactionRunner {
/** A unit of work to be performed in the context of a transaction. */
@FunctionalInterface
interface TransactionCallable<T> {
/**
* Invoked by the library framework to perform a single attempt of a transaction. This method
Expand Down
Expand Up @@ -34,9 +34,7 @@
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.spanner.TransactionContext;
import com.google.cloud.spanner.TransactionRunner;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.cloud.spanner.connection.StatementParser.ParsedStatement;
import com.google.cloud.spanner.connection.StatementParser.StatementType;
import com.google.common.base.Function;
Expand Down Expand Up @@ -357,12 +355,7 @@ public Long call() throws Exception {
writeTransaction = createWriteTransaction();
Long res =
writeTransaction.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) throws Exception {
return transaction.executeUpdate(update.getStatement());
}
});
transaction -> transaction.executeUpdate(update.getStatement()));
state = UnitOfWorkState.COMMITTED;
return res;
} catch (Throwable t) {
Expand Down Expand Up @@ -404,31 +397,28 @@ private ApiFuture<long[]> executeTransactionalBatchUpdateAsync(
public long[] call() throws Exception {
writeTransaction = createWriteTransaction();
return writeTransaction.run(
new TransactionCallable<long[]>() {
@Override
public long[] run(TransactionContext transaction) throws Exception {
try {
long[] res =
transaction.batchUpdate(
Iterables.transform(
updates,
new Function<ParsedStatement, Statement>() {
@Override
public Statement apply(ParsedStatement input) {
return input.getStatement();
}
}));
transaction -> {
try {
long[] res =
transaction.batchUpdate(
Iterables.transform(
updates,
new Function<ParsedStatement, Statement>() {
@Override
public Statement apply(ParsedStatement input) {
return input.getStatement();
}
}));
state = UnitOfWorkState.COMMITTED;
return res;
} catch (Throwable t) {
if (t instanceof SpannerBatchUpdateException) {
// Batch update exceptions does not cause a rollback.
state = UnitOfWorkState.COMMITTED;
return res;
} catch (Throwable t) {
if (t instanceof SpannerBatchUpdateException) {
// Batch update exceptions does not cause a rollback.
state = UnitOfWorkState.COMMITTED;
} else {
state = UnitOfWorkState.COMMIT_FAILED;
}
throw t;
} else {
state = UnitOfWorkState.COMMIT_FAILED;
}
throw t;
}
});
}
Expand All @@ -455,12 +445,9 @@ public Void call() throws Exception {
writeTransaction = createWriteTransaction();
Void res =
writeTransaction.run(
new TransactionCallable<Void>() {
@Override
public Void run(TransactionContext transaction) throws Exception {
transaction.buffer(mutations);
return null;
}
transaction -> {
transaction.buffer(mutations);
return null;
});
state = UnitOfWorkState.COMMITTED;
return res;
Expand Down
Expand Up @@ -24,7 +24,6 @@
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.protobuf.ListValue;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.StructType;
Expand Down Expand Up @@ -205,13 +204,7 @@ private final class WriteRunnable implements Runnable {
@Override
public void run() {
TransactionRunner runner = client.readWriteTransaction();
runner.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) {
return transaction.executeUpdate(UPDATE_STATEMENT);
}
});
runner.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT));
}
}
}

0 comments on commit b036a77

Please sign in to comment.