Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat: async work as functional interface (#1068)
Marks the async work interface as a functional interface.
  • Loading branch information
thiagotnunes committed Apr 15, 2021
1 parent b036a77 commit 734fb60
Show file tree
Hide file tree
Showing 11 changed files with 376 additions and 601 deletions.
Expand Up @@ -27,6 +27,7 @@ public interface AsyncRunner {
* Functional interface for executing a read/write transaction asynchronously that returns a
* result of type R.
*/
@FunctionalInterface
interface AsyncWork<R> {
/**
* Performs a single transaction attempt. All reads/writes should be performed using {@code
Expand Down
Expand Up @@ -398,21 +398,18 @@ CommitResponse writeAtLeastOnceWithOptions(
* AsyncRunner runner = client.runAsync();
* ApiFuture<Long> rowCount =
* runner.runAsync(
* new AsyncWork<Long>() {
* @Override
* public ApiFuture<Long> doWorkAsync(TransactionContext txn) {
* String column = "FirstName";
* Struct row =
* txn.readRow("Singers", Key.of(singerId), Collections.singleton("Name"));
* String name = row.getString("Name");
* return txn.executeUpdateAsync(
* Statement.newBuilder("UPDATE Singers SET Name=@name WHERE SingerId=@id")
* .bind("id")
* .to(singerId)
* .bind("name")
* .to(name.toUpperCase())
* .build());
* }
* () -> {
* String column = "FirstName";
* Struct row =
* txn.readRow("Singers", Key.of(singerId), Collections.singleton("Name"));
* String name = row.getString("Name");
* return txn.executeUpdateAsync(
* Statement.newBuilder("UPDATE Singers SET Name=@name WHERE SingerId=@id")
* .bind("id")
* .to(singerId)
* .bind("name")
* .to(name.toUpperCase())
* .build());
* },
* executor);
* </code></pre>
Expand Down
Expand Up @@ -27,7 +27,6 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.AsyncRunner.AsyncWork;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -57,14 +56,7 @@ public void testAsyncRunReturnsResultAndCommitResponse() {

AsyncRunnerImpl runner = new AsyncRunnerImpl(delegate);
ApiFuture<Object> result =
runner.runAsync(
new AsyncWork<Object>() {
@Override
public ApiFuture<Object> doWorkAsync(TransactionContext txn) {
return ApiFutures.immediateFuture(expectedResult);
}
},
executor);
runner.runAsync(txn -> ApiFutures.immediateFuture(expectedResult), executor);

assertSame(expectedResult, get(result));
assertSame(expectedCommitResponse, get(runner.getCommitResponse()));
Expand Down Expand Up @@ -105,14 +97,7 @@ public void testGetCommitResponseReturnsErrorIfRunFails() {
when(delegate.getCommitResponse()).thenThrow(expectedException);

AsyncRunnerImpl runner = new AsyncRunnerImpl(delegate);
runner.runAsync(
new AsyncWork<Void>() {
@Override
public ApiFuture<Void> doWorkAsync(TransactionContext txn) {
return ApiFutures.immediateFailedFuture(expectedException);
}
},
executor);
runner.runAsync(txn -> ApiFutures.immediateFailedFuture(expectedException), executor);

try {
get(runner.getCommitResponse());
Expand All @@ -130,24 +115,10 @@ public void testRunAyncFailsIfCalledMultipleTimes() {
when(delegate.run(any(TransactionCallable.class))).thenReturn(result);

AsyncRunnerImpl runner = new AsyncRunnerImpl(delegate);
runner.runAsync(
new AsyncWork<Object>() {
@Override
public ApiFuture<Object> doWorkAsync(TransactionContext txn) {
return ApiFutures.immediateFuture(result);
}
},
executor);
runner.runAsync(txn -> ApiFutures.immediateFuture(result), executor);

try {
runner.runAsync(
new AsyncWork<Object>() {
@Override
public ApiFuture<Object> doWorkAsync(TransactionContext txn) {
return ApiFutures.immediateFuture(null);
}
},
executor);
runner.runAsync(txn -> ApiFutures.immediateFuture(null), executor);
fail("missing expected exception");
} catch (IllegalStateException e) {
assertTrue(e.getMessage().contains("runAsync() can only be called once"));
Expand Down

0 comments on commit 734fb60

Please sign in to comment.