diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java index c9dec98d55..1703ef1ab2 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunner.java @@ -27,6 +27,7 @@ public interface AsyncRunner { * Functional interface for executing a read/write transaction asynchronously that returns a * result of type R. */ + @FunctionalInterface interface AsyncWork { /** * Performs a single transaction attempt. All reads/writes should be performed using {@code diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java index 215d86c825..9b20980f29 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java @@ -398,21 +398,18 @@ CommitResponse writeAtLeastOnceWithOptions( * AsyncRunner runner = client.runAsync(); * ApiFuture rowCount = * runner.runAsync( - * new AsyncWork() { - * @Override - * public ApiFuture doWorkAsync(TransactionContext txn) { - * String column = "FirstName"; - * Struct row = - * txn.readRow("Singers", Key.of(singerId), Collections.singleton("Name")); - * String name = row.getString("Name"); - * return txn.executeUpdateAsync( - * Statement.newBuilder("UPDATE Singers SET Name=@name WHERE SingerId=@id") - * .bind("id") - * .to(singerId) - * .bind("name") - * .to(name.toUpperCase()) - * .build()); - * } + * () -> { + * String column = "FirstName"; + * Struct row = + * txn.readRow("Singers", Key.of(singerId), Collections.singleton("Name")); + * String name = row.getString("Name"); + * return txn.executeUpdateAsync( + * Statement.newBuilder("UPDATE Singers SET Name=@name WHERE SingerId=@id") + * .bind("id") + * .to(singerId) + * .bind("name") + * .to(name.toUpperCase()) + * .build()); * }, * executor); * diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerImplTest.java index fab29d2900..dd5d4606a4 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerImplTest.java @@ -27,7 +27,6 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; -import com.google.cloud.spanner.AsyncRunner.AsyncWork; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -57,14 +56,7 @@ public void testAsyncRunReturnsResultAndCommitResponse() { AsyncRunnerImpl runner = new AsyncRunnerImpl(delegate); ApiFuture result = - runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return ApiFutures.immediateFuture(expectedResult); - } - }, - executor); + runner.runAsync(txn -> ApiFutures.immediateFuture(expectedResult), executor); assertSame(expectedResult, get(result)); assertSame(expectedCommitResponse, get(runner.getCommitResponse())); @@ -105,14 +97,7 @@ public void testGetCommitResponseReturnsErrorIfRunFails() { when(delegate.getCommitResponse()).thenThrow(expectedException); AsyncRunnerImpl runner = new AsyncRunnerImpl(delegate); - runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return ApiFutures.immediateFailedFuture(expectedException); - } - }, - executor); + runner.runAsync(txn -> ApiFutures.immediateFailedFuture(expectedException), executor); try { get(runner.getCommitResponse()); @@ -130,24 +115,10 @@ public void testRunAyncFailsIfCalledMultipleTimes() { when(delegate.run(any(TransactionCallable.class))).thenReturn(result); AsyncRunnerImpl runner = new AsyncRunnerImpl(delegate); - runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return ApiFutures.immediateFuture(result); - } - }, - executor); + runner.runAsync(txn -> ApiFutures.immediateFuture(result), executor); try { - runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return ApiFutures.immediateFuture(null); - } - }, - executor); + runner.runAsync(txn -> ApiFutures.immediateFuture(null), executor); fail("missing expected exception"); } catch (IllegalStateException e) { assertTrue(e.getMessage().contains("runAsync() can only be called once")); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java index 0c67871e89..085128c9ab 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncRunnerTest.java @@ -28,7 +28,6 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.AsyncResultSet.CallbackResponse; import com.google.cloud.spanner.AsyncResultSet.ReadyCallback; -import com.google.cloud.spanner.AsyncRunner.AsyncWork; import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.common.base.Function; @@ -81,14 +80,7 @@ public void testAsyncRunner_doesNotReturnCommitResponseBeforeCommit() { public void asyncRunnerUpdate() throws Exception { AsyncRunner runner = client().runAsync(); ApiFuture updateCount = - runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return txn.executeUpdateAsync(UPDATE_STATEMENT); - } - }, - executor); + runner.runAsync(txn -> txn.executeUpdateAsync(UPDATE_STATEMENT), executor); assertThat(updateCount.get()).isEqualTo(UPDATE_COUNT); } @@ -98,12 +90,9 @@ public void asyncRunnerIsNonBlocking() throws Exception { AsyncRunner runner = clientWithEmptySessionPool().runAsync(); ApiFuture res = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - txn.executeUpdateAsync(UPDATE_STATEMENT); - return ApiFutures.immediateFuture(null); - } + txn -> { + txn.executeUpdateAsync(UPDATE_STATEMENT); + return ApiFutures.immediateFuture(null); }, executor); ApiFuture ts = runner.getCommitTimestamp(); @@ -116,14 +105,7 @@ public ApiFuture doWorkAsync(TransactionContext txn) { public void asyncRunnerInvalidUpdate() throws Exception { AsyncRunner runner = client().runAsync(); ApiFuture updateCount = - runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return txn.executeUpdateAsync(INVALID_UPDATE_STATEMENT); - } - }, - executor); + runner.runAsync(txn -> txn.executeUpdateAsync(INVALID_UPDATE_STATEMENT), executor); try { updateCount.get(); fail("missing expected exception"); @@ -140,12 +122,9 @@ public void asyncRunnerFireAndForgetInvalidUpdate() throws Exception { AsyncRunner runner = client().runAsync(); ApiFuture res = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - txn.executeUpdateAsync(INVALID_UPDATE_STATEMENT); - return txn.executeUpdateAsync(UPDATE_STATEMENT); - } + txn -> { + txn.executeUpdateAsync(INVALID_UPDATE_STATEMENT); + return txn.executeUpdateAsync(UPDATE_STATEMENT); }, executor); assertThat(res.get()).isEqualTo(UPDATE_COUNT); @@ -160,18 +139,15 @@ public void asyncRunnerUpdateAborted() throws Exception { AsyncRunner runner = client().runAsync(); ApiFuture updateCount = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - if (attempt.incrementAndGet() == 1) { - mockSpanner.abortNextStatement(); - } else { - // Set the result of the update statement back to 1 row. - mockSpanner.putStatementResult( - StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); - } - return txn.executeUpdateAsync(UPDATE_STATEMENT); + txn -> { + if (attempt.incrementAndGet() == 1) { + mockSpanner.abortNextStatement(); + } else { + // Set the result of the update statement back to 1 row. + mockSpanner.putStatementResult( + StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); } + return txn.executeUpdateAsync(UPDATE_STATEMENT); }, executor); assertThat(updateCount.get()).isEqualTo(UPDATE_COUNT); @@ -190,20 +166,17 @@ public void asyncRunnerCommitAborted() throws Exception { AsyncRunner runner = client().runAsync(); ApiFuture updateCount = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(final TransactionContext txn) { - if (attempt.get() > 0) { - // Set the result of the update statement back to 1 row. - mockSpanner.putStatementResult( - StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); - } - ApiFuture updateCount = txn.executeUpdateAsync(UPDATE_STATEMENT); - if (attempt.incrementAndGet() == 1) { - mockSpanner.abortTransaction(txn); - } - return updateCount; + txn -> { + if (attempt.get() > 0) { + // Set the result of the update statement back to 1 row. + mockSpanner.putStatementResult( + StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); + } + ApiFuture updateCount1 = txn.executeUpdateAsync(UPDATE_STATEMENT); + if (attempt.incrementAndGet() == 1) { + mockSpanner.abortTransaction(txn); } + return updateCount1; }, executor); assertThat(updateCount.get()).isEqualTo(UPDATE_COUNT); @@ -219,21 +192,18 @@ public void asyncRunnerUpdateAbortedWithoutGettingResult() throws Exception { AsyncRunner runner = clientWithEmptySessionPool().runAsync(); ApiFuture result = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - if (attempt.incrementAndGet() == 1) { - mockSpanner.abortNextStatement(); - } - // This update statement will be aborted, but the error will not propagated to the - // transaction runner and cause the transaction to retry. Instead, the commit call - // will do that. - txn.executeUpdateAsync(UPDATE_STATEMENT); - // Resolving this future will not resolve the result of the entire transaction. The - // transaction result will be resolved when the commit has actually finished - // successfully. - return ApiFutures.immediateFuture(null); + txn -> { + if (attempt.incrementAndGet() == 1) { + mockSpanner.abortNextStatement(); } + // This update statement will be aborted, but the error will not propagated to the + // transaction runner and cause the transaction to retry. Instead, the commit call + // will do that. + txn.executeUpdateAsync(UPDATE_STATEMENT); + // Resolving this future will not resolve the result of the entire transaction. The + // transaction result will be resolved when the commit has actually finished + // successfully. + return ApiFutures.immediateFuture(null); }, executor); assertThat(result.get()).isNull(); @@ -259,14 +229,11 @@ public void asyncRunnerCommitFails() throws Exception { AsyncRunner runner = client().runAsync(); ApiFuture updateCount = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - // This statement will succeed, but the commit will fail. The error from the commit - // will bubble up to the future that is returned by the transaction, and the update - // count returned here will never reach the user application. - return txn.executeUpdateAsync(UPDATE_STATEMENT); - } + txn -> { + // This statement will succeed, but the commit will fail. The error from the commit + // will bubble up to the future that is returned by the transaction, and the update + // count returned here will never reach the user application. + return txn.executeUpdateAsync(UPDATE_STATEMENT); }, executor); try { @@ -285,12 +252,9 @@ public void asyncRunnerWaitsUntilAsyncUpdateHasFinished() throws Exception { AsyncRunner runner = clientWithEmptySessionPool().runAsync(); ApiFuture res = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - txn.executeUpdateAsync(UPDATE_STATEMENT); - return ApiFutures.immediateFuture(null); - } + txn -> { + txn.executeUpdateAsync(UPDATE_STATEMENT); + return ApiFutures.immediateFuture(null); }, executor); res.get(); @@ -304,12 +268,7 @@ public void asyncRunnerBatchUpdate() throws Exception { AsyncRunner runner = client().runAsync(); ApiFuture updateCount = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT, UPDATE_STATEMENT)); - } - }, + txn -> txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT, UPDATE_STATEMENT)), executor); assertThat(updateCount.get()).asList().containsExactly(UPDATE_COUNT, UPDATE_COUNT); } @@ -320,12 +279,9 @@ public void asyncRunnerIsNonBlockingWithBatchUpdate() throws Exception { AsyncRunner runner = clientWithEmptySessionPool().runAsync(); ApiFuture res = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT)); - return ApiFutures.immediateFuture(null); - } + txn -> { + txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT)); + return ApiFutures.immediateFuture(null); }, executor); ApiFuture ts = runner.getCommitTimestamp(); @@ -339,13 +295,8 @@ public void asyncRunnerInvalidBatchUpdate() throws Exception { AsyncRunner runner = client().runAsync(); ApiFuture updateCount = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return txn.batchUpdateAsync( - ImmutableList.of(UPDATE_STATEMENT, INVALID_UPDATE_STATEMENT)); - } - }, + txn -> + txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT, INVALID_UPDATE_STATEMENT)), executor); try { updateCount.get(); @@ -363,12 +314,9 @@ public void asyncRunnerFireAndForgetInvalidBatchUpdate() throws Exception { AsyncRunner runner = client().runAsync(); ApiFuture res = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT, INVALID_UPDATE_STATEMENT)); - return txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT, UPDATE_STATEMENT)); - } + txn -> { + txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT, INVALID_UPDATE_STATEMENT)); + return txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT, UPDATE_STATEMENT)); }, executor); assertThat(res.get()).asList().containsExactly(UPDATE_COUNT, UPDATE_COUNT); @@ -380,15 +328,12 @@ public void asyncRunnerBatchUpdateAborted() throws Exception { AsyncRunner runner = client().runAsync(); ApiFuture updateCount = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - if (attempt.incrementAndGet() == 1) { - return txn.batchUpdateAsync( - ImmutableList.of(UPDATE_STATEMENT, UPDATE_ABORTED_STATEMENT)); - } else { - return txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT, UPDATE_STATEMENT)); - } + txn -> { + if (attempt.incrementAndGet() == 1) { + return txn.batchUpdateAsync( + ImmutableList.of(UPDATE_STATEMENT, UPDATE_ABORTED_STATEMENT)); + } else { + return txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT, UPDATE_STATEMENT)); } }, executor); @@ -405,21 +350,18 @@ public void asyncRunnerWithBatchUpdateCommitAborted() throws Exception { AsyncRunner runner = client().runAsync(); ApiFuture updateCount = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(final TransactionContext txn) { - if (attempt.get() > 0) { - // Set the result of the update statement back to 1 row. - mockSpanner.putStatementResult( - StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); - } - ApiFuture updateCount = - txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT, UPDATE_STATEMENT)); - if (attempt.incrementAndGet() == 1) { - mockSpanner.abortTransaction(txn); - } - return updateCount; + txn -> { + if (attempt.get() > 0) { + // Set the result of the update statement back to 1 row. + mockSpanner.putStatementResult( + StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT)); + } + ApiFuture updateCount1 = + txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT, UPDATE_STATEMENT)); + if (attempt.incrementAndGet() == 1) { + mockSpanner.abortTransaction(txn); } + return updateCount1; }, executor); assertThat(updateCount.get()).asList().containsExactly(UPDATE_COUNT, UPDATE_COUNT); @@ -435,26 +377,23 @@ public void asyncRunnerBatchUpdateAbortedWithoutGettingResult() throws Exception AsyncRunner runner = clientWithEmptySessionPool().runAsync(); ApiFuture result = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - if (attempt.incrementAndGet() == 1) { - mockSpanner.abortNextTransaction(); - } - // This statement will succeed and return a transaction id. The transaction will be - // marked as aborted on the mock server. - txn.executeUpdate(UPDATE_STATEMENT); - - // This batch update statement will be aborted, but the error will not propagated to - // the - // transaction runner and cause the transaction to retry. Instead, the commit call - // will do that. - txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT, UPDATE_STATEMENT)); - // Resolving this future will not resolve the result of the entire transaction. The - // transaction result will be resolved when the commit has actually finished - // successfully. - return ApiFutures.immediateFuture(null); + txn -> { + if (attempt.incrementAndGet() == 1) { + mockSpanner.abortNextTransaction(); } + // This statement will succeed and return a transaction id. The transaction will be + // marked as aborted on the mock server. + txn.executeUpdate(UPDATE_STATEMENT); + + // This batch update statement will be aborted, but the error will not propagated to + // the + // transaction runner and cause the transaction to retry. Instead, the commit call + // will do that. + txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT, UPDATE_STATEMENT)); + // Resolving this future will not resolve the result of the entire transaction. The + // transaction result will be resolved when the commit has actually finished + // successfully. + return ApiFutures.immediateFuture(null); }, executor); assertThat(result.get()).isNull(); @@ -480,14 +419,11 @@ public void asyncRunnerWithBatchUpdateCommitFails() throws Exception { AsyncRunner runner = client().runAsync(); ApiFuture updateCount = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - // This statement will succeed, but the commit will fail. The error from the commit - // will bubble up to the future that is returned by the transaction, and the update - // count returned here will never reach the user application. - return txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT, UPDATE_STATEMENT)); - } + txn -> { + // This statement will succeed, but the commit will fail. The error from the commit + // will bubble up to the future that is returned by the transaction, and the update + // count returned here will never reach the user application. + return txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT, UPDATE_STATEMENT)); }, executor); try { @@ -506,12 +442,9 @@ public void asyncRunnerWaitsUntilAsyncBatchUpdateHasFinished() throws Exception AsyncRunner runner = clientWithEmptySessionPool().runAsync(); ApiFuture res = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT)); - return ApiFutures.immediateFuture(null); - } + txn -> { + txn.batchUpdateAsync(ImmutableList.of(UPDATE_STATEMENT)); + return ApiFutures.immediateFuture(null); }, executor); res.get(); @@ -534,45 +467,42 @@ public void closeTransactionBeforeEndOfAsyncQuery() throws Exception { final CountDownLatch dataChecked = new CountDownLatch(1); ApiFuture res = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - try (AsyncResultSet rs = - txn.readAsync( - READ_TABLE_NAME, KeySet.all(), READ_COLUMN_NAMES, Options.bufferRows(1))) { - rs.setCallback( - Executors.newSingleThreadExecutor(), - new ReadyCallback() { - @Override - public CallbackResponse cursorReady(AsyncResultSet resultSet) { - dataReceived.countDown(); - try { - while (true) { - switch (resultSet.tryNext()) { - case DONE: - finished.set(true); - return CallbackResponse.DONE; - case NOT_READY: - return CallbackResponse.CONTINUE; - case OK: - dataChecked.await(); - results.put(resultSet.getString(0)); - } + txn -> { + try (AsyncResultSet rs = + txn.readAsync( + READ_TABLE_NAME, KeySet.all(), READ_COLUMN_NAMES, Options.bufferRows(1))) { + rs.setCallback( + Executors.newSingleThreadExecutor(), + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + dataReceived.countDown(); + try { + while (true) { + switch (resultSet.tryNext()) { + case DONE: + finished.set(true); + return CallbackResponse.DONE; + case NOT_READY: + return CallbackResponse.CONTINUE; + case OK: + dataChecked.await(); + results.put(resultSet.getString(0)); } - } catch (Throwable t) { - finished.setException(t); - return CallbackResponse.DONE; } + } catch (Throwable t) { + finished.setException(t); + return CallbackResponse.DONE; } - }); - } - try { - dataReceived.await(); - return ApiFutures.immediateFuture(null); - } catch (InterruptedException e) { - return ApiFutures.immediateFailedFuture( - SpannerExceptionFactory.propagateInterrupt(e)); - } + } + }); + } + try { + dataReceived.await(); + return ApiFutures.immediateFuture(null); + } catch (InterruptedException e) { + return ApiFutures.immediateFailedFuture( + SpannerExceptionFactory.propagateInterrupt(e)); } }, executor); @@ -598,10 +528,8 @@ public void asyncRunnerReadRow() throws Exception { AsyncRunner runner = client().runAsync(); ApiFuture val = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return ApiFutures.transform( + txn -> + ApiFutures.transform( txn.readRowAsync(READ_TABLE_NAME, Key.of(1L), READ_COLUMN_NAMES), new ApiFunction() { @Override @@ -609,9 +537,7 @@ public String apply(Struct input) { return input.getString("Value"); } }, - MoreExecutors.directExecutor()); - } - }, + MoreExecutors.directExecutor()), executor); assertThat(val.get()).isEqualTo("v1"); } @@ -621,10 +547,8 @@ public void asyncRunnerRead() throws Exception { AsyncRunner runner = client().runAsync(); ApiFuture> val = runner.runAsync( - new AsyncWork>() { - @Override - public ApiFuture> doWorkAsync(TransactionContext txn) { - return txn.readAsync(READ_TABLE_NAME, KeySet.all(), READ_COLUMN_NAMES) + txn -> + txn.readAsync(READ_TABLE_NAME, KeySet.all(), READ_COLUMN_NAMES) .toListAsync( new Function() { @Override @@ -632,9 +556,7 @@ public String apply(StructReader input) { return input.getString("Value"); } }, - MoreExecutors.directExecutor()); - } - }, + MoreExecutors.directExecutor()), executor); assertThat(val.get()).containsExactly("v1", "v2", "v3"); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index fc4f8444b1..8336bc1b98 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -41,7 +41,6 @@ import com.google.cloud.spanner.AbstractResultSet.GrpcStreamIterator; import com.google.cloud.spanner.AsyncResultSet.CallbackResponse; import com.google.cloud.spanner.AsyncResultSet.ReadyCallback; -import com.google.cloud.spanner.AsyncRunner.AsyncWork; import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionFunction; import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture; import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; @@ -491,12 +490,9 @@ public void testAsyncRunnerCommitWithTag() { AsyncRunner runner = client.runAsync(Options.tag("app=spanner,env=test,action=runner")); get( runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - txn.buffer(Mutation.delete("TEST", KeySet.all())); - return ApiFutures.immediateFuture(null); - } + txn -> { + txn.buffer(Mutation.delete("TEST", KeySet.all())); + return ApiFutures.immediateFuture(null); }, executor)); @@ -846,13 +842,7 @@ public void testRunAsync() throws Exception { AsyncRunner runner = client.runAsync(); ApiFuture result = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return ApiFutures.immediateFuture(txn.executeUpdate(UPDATE_STATEMENT)); - } - }, - executor); + txn -> ApiFutures.immediateFuture(txn.executeUpdate(UPDATE_STATEMENT)), executor); assertEquals(UPDATE_COUNT, result.get().longValue()); assertNotNull(runner.getCommitTimestamp().get()); executor.shutdown(); @@ -866,12 +856,9 @@ public void testRunAsync_returnsCommitStats() { AsyncRunner runner = client.runAsync(Options.commitStats()); ApiFuture result = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - txn.buffer(Mutation.delete("FOO", Key.of("foo"))); - return ApiFutures.immediateFuture(null); - } + txn -> { + txn.buffer(Mutation.delete("FOO", Key.of("foo"))); + return ApiFutures.immediateFuture(null); }, executor); assertNull(get(result)); @@ -891,13 +878,7 @@ public void runAsyncIsNonBlocking() throws Exception { AsyncRunner runner = client.runAsync(); ApiFuture fut = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return ApiFutures.immediateFuture(txn.executeUpdate(UPDATE_STATEMENT)); - } - }, - executor); + txn -> ApiFutures.immediateFuture(txn.executeUpdate(UPDATE_STATEMENT)), executor); mockSpanner.unfreeze(); assertThat(fut.get()).isEqualTo(UPDATE_COUNT); executor.shutdown(); @@ -911,12 +892,7 @@ public void runAsyncWithException() throws Exception { AsyncRunner runner = client.runAsync(); ApiFuture fut = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return ApiFutures.immediateFuture(txn.executeUpdate(INVALID_UPDATE_STATEMENT)); - } - }, + txn -> ApiFutures.immediateFuture(txn.executeUpdate(INVALID_UPDATE_STATEMENT)), executor); try { fut.get(); @@ -2091,12 +2067,9 @@ public void testAsyncRunnerCommitWithPriority() { AsyncRunner runner = client.runAsync(Options.priority(RpcPriority.HIGH)); get( runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - txn.buffer(Mutation.delete("TEST", KeySet.all())); - return ApiFutures.immediateFuture(null); - } + txn -> { + txn.buffer(Mutation.delete("TEST", KeySet.all())); + return ApiFutures.immediateFuture(null); }, executor)); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index 6e595bf9b0..11eef95588 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -28,7 +28,6 @@ import com.google.cloud.NoCredentials; import com.google.cloud.spanner.AsyncResultSet.CallbackResponse; import com.google.cloud.spanner.AsyncResultSet.ReadyCallback; -import com.google.cloud.spanner.AsyncRunner.AsyncWork; import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionFunction; import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionStep; import com.google.cloud.spanner.AsyncTransactionManager.CommitTimestampFuture; @@ -218,16 +217,7 @@ public void testInlinedBeginAsyncTx() throws InterruptedException, ExecutionExce DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); ApiFuture updateCount = - client - .runAsync() - .runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return txn.executeUpdateAsync(UPDATE_STATEMENT); - } - }, - executor); + client.runAsync().runAsync(txn -> txn.executeUpdateAsync(UPDATE_STATEMENT), executor); assertThat(updateCount.get()).isEqualTo(UPDATE_COUNT); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); assertThat(countTransactionsStarted()).isEqualTo(1); @@ -242,15 +232,12 @@ public void testInlinedBeginAsyncTxAborted() throws InterruptedException, Execut client .runAsync() .runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - ApiFuture res = txn.executeUpdateAsync(UPDATE_STATEMENT); - if (firstAttempt.getAndSet(false)) { - mockSpanner.abortTransaction(txn); - } - return res; + txn -> { + ApiFuture res = txn.executeUpdateAsync(UPDATE_STATEMENT); + if (firstAttempt.getAndSet(false)) { + mockSpanner.abortTransaction(txn); } + return res; }, executor); assertThat(updateCount.get()).isEqualTo(UPDATE_COUNT); @@ -268,31 +255,28 @@ public void testInlinedBeginAsyncTxWithQuery() throws InterruptedException, Exec client .runAsync() .runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - final SettableApiFuture res = SettableApiFuture.create(); - try (AsyncResultSet rs = txn.executeQueryAsync(SELECT1)) { - rs.setCallback( - executor, - new ReadyCallback() { - @Override - public CallbackResponse cursorReady(AsyncResultSet resultSet) { - switch (resultSet.tryNext()) { - case DONE: - return CallbackResponse.DONE; - case NOT_READY: - return CallbackResponse.CONTINUE; - case OK: - res.set(resultSet.getLong(0)); - default: - throw new IllegalStateException(); - } + txn -> { + final SettableApiFuture res = SettableApiFuture.create(); + try (AsyncResultSet rs = txn.executeQueryAsync(SELECT1)) { + rs.setCallback( + executor, + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + switch (resultSet.tryNext()) { + case DONE: + return CallbackResponse.DONE; + case NOT_READY: + return CallbackResponse.CONTINUE; + case OK: + res.set(resultSet.getLong(0)); + default: + throw new IllegalStateException(); } - }); - } - return res; + } + }); } + return res; }, queryExecutor); assertThat(updateCount.get()).isEqualTo(1L); @@ -310,13 +294,9 @@ public void testInlinedBeginAsyncTxWithBatchDml() client .runAsync() .runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext transaction) { - return transaction.batchUpdateAsync( - Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT)); - } - }, + transaction -> + transaction.batchUpdateAsync( + Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT)), executor); assertThat(updateCounts.get()).asList().containsExactly(UPDATE_COUNT, UPDATE_COUNT); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); @@ -331,12 +311,9 @@ public void testInlinedBeginAsyncTxWithError() throws InterruptedException, Exec client .runAsync() .runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext transaction) { - transaction.executeUpdateAsync(INVALID_UPDATE_STATEMENT); - return transaction.executeUpdateAsync(UPDATE_STATEMENT); - } + transaction -> { + transaction.executeUpdateAsync(INVALID_UPDATE_STATEMENT); + return transaction.executeUpdateAsync(UPDATE_STATEMENT); }, executor); assertThat(updateCount.get()).isEqualTo(UPDATE_COUNT); @@ -357,12 +334,9 @@ public void testInlinedBeginAsyncTxWithOnlyMutations() client .runAsync() .runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext transaction) { - transaction.buffer(Mutation.newInsertBuilder("FOO").set("ID").to(1L).build()); - return ApiFutures.immediateFuture(null); - } + transaction -> { + transaction.buffer(Mutation.newInsertBuilder("FOO").set("ID").to(1L).build()); + return ApiFutures.immediateFuture(null); }, executor) .get(); @@ -1192,47 +1166,44 @@ public void testInlinedBeginAsyncTxWithParallelQueries() client .runAsync() .runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(final TransactionContext txn) { - List> futures = new ArrayList<>(numQueries); - for (int i = 0; i < numQueries; i++) { - final SettableApiFuture res = SettableApiFuture.create(); - try (AsyncResultSet rs = txn.executeQueryAsync(SELECT1)) { - rs.setCallback( - executor, - new ReadyCallback() { - @Override - public CallbackResponse cursorReady(AsyncResultSet resultSet) { - switch (resultSet.tryNext()) { - case DONE: - return CallbackResponse.DONE; - case NOT_READY: - return CallbackResponse.CONTINUE; - case OK: - res.set(resultSet.getLong(0)); - default: - throw new IllegalStateException(); - } + txn -> { + List> futures = new ArrayList<>(numQueries); + for (int i = 0; i < numQueries; i++) { + final SettableApiFuture res = SettableApiFuture.create(); + try (AsyncResultSet rs = txn.executeQueryAsync(SELECT1)) { + rs.setCallback( + executor, + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + switch (resultSet.tryNext()) { + case DONE: + return CallbackResponse.DONE; + case NOT_READY: + return CallbackResponse.CONTINUE; + case OK: + res.set(resultSet.getLong(0)); + default: + throw new IllegalStateException(); } - }); - } - futures.add(res); - } - return ApiFutures.transformAsync( - ApiFutures.allAsList(futures), - new ApiAsyncFunction, Long>() { - @Override - public ApiFuture apply(List input) throws Exception { - long sum = 0L; - for (Long l : input) { - sum += l; } - return ApiFutures.immediateFuture(sum); - } - }, - MoreExecutors.directExecutor()); + }); + } + futures.add(res); } + return ApiFutures.transformAsync( + ApiFutures.allAsList(futures), + new ApiAsyncFunction, Long>() { + @Override + public ApiFuture apply(List input) throws Exception { + long sum = 0L; + for (Long l : input) { + sum += l; + } + return ApiFutures.immediateFuture(sum); + } + }, + MoreExecutors.directExecutor()); }, executor); assertThat(updateCount.get()).isEqualTo(1L * numQueries); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java index 6102013780..4ce21ac90e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java @@ -33,7 +33,6 @@ import com.google.cloud.Timestamp; import com.google.cloud.spanner.AsyncResultSet.CallbackResponse; import com.google.cloud.spanner.AsyncResultSet.ReadyCallback; -import com.google.cloud.spanner.AsyncRunner.AsyncWork; import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionFunction; import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionStep; import com.google.cloud.spanner.AsyncTransactionManager.CommitTimestampFuture; @@ -1473,39 +1472,36 @@ private void asyncRunner_withReadFunction( final AtomicLong counter = new AtomicLong(); ApiFuture count = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - AsyncResultSet rs = readFunction.apply(txn); - ApiFuture fut = - rs.setCallback( - queryExecutor, - new ReadyCallback() { - @Override - public CallbackResponse cursorReady(AsyncResultSet resultSet) { - while (true) { - switch (resultSet.tryNext()) { - case OK: - counter.incrementAndGet(); - break; - case DONE: - return CallbackResponse.DONE; - case NOT_READY: - return CallbackResponse.CONTINUE; - } + txn -> { + AsyncResultSet rs = readFunction.apply(txn); + ApiFuture fut = + rs.setCallback( + queryExecutor, + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + while (true) { + switch (resultSet.tryNext()) { + case OK: + counter.incrementAndGet(); + break; + case DONE: + return CallbackResponse.DONE; + case NOT_READY: + return CallbackResponse.CONTINUE; } } - }); - return ApiFutures.transform( - fut, - new ApiFunction() { - @Override - public Long apply(Void input) { - return counter.get(); - } - }, - MoreExecutors.directExecutor()); - } + } + }); + return ApiFutures.transform( + fut, + new ApiFunction() { + @Override + public Long apply(Void input) { + return counter.get(); + } + }, + MoreExecutors.directExecutor()); }, executor); assertThat(get(count)).isEqualTo(2); @@ -1523,14 +1519,7 @@ public void asyncRunnerReadRow() throws InterruptedException { try { AsyncRunner runner = client.runAsync(); ApiFuture row = - runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return txn.readRowAsync("FOO", Key.of(), Arrays.asList("BAR")); - } - }, - executor); + runner.runAsync(txn -> txn.readRowAsync("FOO", Key.of(), Arrays.asList("BAR")), executor); assertThat(get(row).getLong(0)).isEqualTo(1L); assertThat(failOnInvalidatedSession).isFalse(); } catch (SessionNotFoundException e) { @@ -1545,12 +1534,7 @@ public void asyncRunnerReadRowUsingIndex() throws InterruptedException { AsyncRunner runner = client.runAsync(); ApiFuture row = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return txn.readRowUsingIndexAsync("FOO", "IDX", Key.of(), Arrays.asList("BAR")); - } - }, + txn -> txn.readRowUsingIndexAsync("FOO", "IDX", Key.of(), Arrays.asList("BAR")), executor); assertThat(get(row).getLong(0)).isEqualTo(1L); assertThat(failOnInvalidatedSession).isFalse(); @@ -1565,14 +1549,7 @@ public void asyncRunnerUpdate() throws InterruptedException { try { AsyncRunner runner = client.runAsync(); ApiFuture count = - runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return txn.executeUpdateAsync(UPDATE_STATEMENT); - } - }, - executor); + runner.runAsync(txn -> txn.executeUpdateAsync(UPDATE_STATEMENT), executor); assertThat(get(count)).isEqualTo(UPDATE_COUNT); assertThat(failOnInvalidatedSession).isFalse(); } catch (SessionNotFoundException e) { @@ -1587,12 +1564,7 @@ public void asyncRunnerBatchUpdate() throws InterruptedException { AsyncRunner runner = client.runAsync(); ApiFuture count = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return txn.batchUpdateAsync(Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT)); - } - }, + txn -> txn.batchUpdateAsync(Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT)), executor); assertThat(get(count)).hasLength(2); assertThat(get(count)).asList().containsExactly(UPDATE_COUNT, UPDATE_COUNT); @@ -1609,12 +1581,9 @@ public void asyncRunnerBuffer() throws InterruptedException { AsyncRunner runner = client.runAsync(); ApiFuture res = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - txn.buffer(Mutation.newInsertBuilder("FOO").set("BAR").to(1L).build()); - return ApiFutures.immediateFuture(null); - } + txn -> { + txn.buffer(Mutation.newInsertBuilder("FOO").set("BAR").to(1L).build()); + return ApiFutures.immediateFuture(null); }, executor); assertThat(get(res)).isNull(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java index 5bdb685c8b..c32846725c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncAPITest.java @@ -31,7 +31,6 @@ import com.google.cloud.spanner.AsyncResultSet.CallbackResponse; import com.google.cloud.spanner.AsyncResultSet.ReadyCallback; import com.google.cloud.spanner.AsyncRunner; -import com.google.cloud.spanner.AsyncRunner.AsyncWork; import com.google.cloud.spanner.AsyncTransactionManager; import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionFunction; import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture; @@ -299,16 +298,13 @@ public void asyncRunnerFireAndForgetInvalidUpdate() throws Exception { AsyncRunner runner = client.runAsync(); ApiFuture res = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - // The error returned by this update statement will not bubble up and fail the - // transaction. - txn.executeUpdateAsync(Statement.of("UPDATE BadTableName SET FOO=1 WHERE ID=2")); - return txn.executeUpdateAsync( - Statement.of( - "INSERT INTO TestTable (Key, StringValue) VALUES ('k999', 'v999')")); - } + txn -> { + // The error returned by this update statement will not bubble up and fail the + // transaction. + txn.executeUpdateAsync(Statement.of("UPDATE BadTableName SET FOO=1 WHERE ID=2")); + return txn.executeUpdateAsync( + Statement.of( + "INSERT INTO TestTable (Key, StringValue) VALUES ('k999', 'v999')")); }, executor); assertThat(res.get()).isEqualTo(1L); @@ -324,18 +320,15 @@ public void testAsyncRunnerReturnsCommitStats() { assumeFalse("Emulator does not return commit statistics", isUsingEmulator()); AsyncRunner runner = client.runAsync(Options.commitStats()); runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext transaction) { - transaction.buffer( - Mutation.newInsertOrUpdateBuilder(TABLE_NAME) - .set("Key") - .to("k_commit_stats") - .set("StringValue") - .to("Should return commit stats") - .build()); - return ApiFutures.immediateFuture(null); - } + transaction -> { + transaction.buffer( + Mutation.newInsertOrUpdateBuilder(TABLE_NAME) + .set("Key") + .to("k_commit_stats") + .set("StringValue") + .to("Should return commit stats") + .build()); + return ApiFutures.immediateFuture(null); }, executor); assertNotNull(get(runner.getCommitResponse()).getCommitStats()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java index 5849176bbb..e08d59ab29 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java @@ -27,7 +27,6 @@ import com.google.cloud.spanner.AsyncResultSet.CallbackResponse; import com.google.cloud.spanner.AsyncResultSet.ReadyCallback; import com.google.cloud.spanner.AsyncRunner; -import com.google.cloud.spanner.AsyncRunner.AsyncWork; import com.google.cloud.spanner.Database; import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.ErrorCode; @@ -41,7 +40,6 @@ import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.StructReader; -import com.google.cloud.spanner.TransactionContext; import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -244,35 +242,28 @@ public void runAsync() throws Exception { AsyncRunner runner = client.runAsync(); ApiFuture insertCount = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - // Even though this is a shoot-and-forget asynchronous DML statement, it is - // guaranteed to be executed within the transaction before the commit is executed. - return txn.executeUpdateAsync( - Statement.newBuilder( - "INSERT INTO TestTable (Key, StringValue) VALUES (@key, @value)") - .bind("key") - .to("k999") - .bind("value") - .to("v999") - .build()); - } + txn -> { + // Even though this is a shoot-and-forget asynchronous DML statement, it is + // guaranteed to be executed within the transaction before the commit is executed. + return txn.executeUpdateAsync( + Statement.newBuilder( + "INSERT INTO TestTable (Key, StringValue) VALUES (@key, @value)") + .bind("key") + .to("k999") + .bind("value") + .to("v999") + .build()); }, executor); assertThat(insertCount.get()).isEqualTo(1L); ApiFuture deleteCount = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return txn.executeUpdateAsync( + txn -> + txn.executeUpdateAsync( Statement.newBuilder("DELETE FROM TestTable WHERE Key=@key") .bind("key") .to("k999") - .build()); - } - }, + .build()), executor); assertThat(deleteCount.get()).isEqualTo(1L); } @@ -282,44 +273,39 @@ public void runAsyncBatchUpdate() throws Exception { AsyncRunner runner = client.runAsync(); ApiFuture insertCount = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - // Even though this is a shoot-and-forget asynchronous DML statement, it is - // guaranteed to be executed within the transaction before the commit is executed. - return txn.batchUpdateAsync( - ImmutableList.of( - Statement.newBuilder( - "INSERT INTO TestTable (Key, StringValue) VALUES (@key, @value)") - .bind("key") - .to("k997") - .bind("value") - .to("v997") - .build(), - Statement.newBuilder( - "INSERT INTO TestTable (Key, StringValue) VALUES (@key, @value)") - .bind("key") - .to("k998") - .bind("value") - .to("v998") - .build(), - Statement.newBuilder( - "INSERT INTO TestTable (Key, StringValue) VALUES (@key, @value)") - .bind("key") - .to("k999") - .bind("value") - .to("v999") - .build())); - } + txn -> { + // Even though this is a shoot-and-forget asynchronous DML statement, it is + // guaranteed to be executed within the transaction before the commit is executed. + return txn.batchUpdateAsync( + ImmutableList.of( + Statement.newBuilder( + "INSERT INTO TestTable (Key, StringValue) VALUES (@key, @value)") + .bind("key") + .to("k997") + .bind("value") + .to("v997") + .build(), + Statement.newBuilder( + "INSERT INTO TestTable (Key, StringValue) VALUES (@key, @value)") + .bind("key") + .to("k998") + .bind("value") + .to("v998") + .build(), + Statement.newBuilder( + "INSERT INTO TestTable (Key, StringValue) VALUES (@key, @value)") + .bind("key") + .to("k999") + .bind("value") + .to("v999") + .build())); }, executor); assertThat(insertCount.get()).asList().containsExactly(1L, 1L, 1L); ApiFuture deleteCount = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - return txn.batchUpdateAsync( + txn -> + txn.batchUpdateAsync( ImmutableList.of( Statement.newBuilder("DELETE FROM TestTable WHERE Key=@key") .bind("key") @@ -332,9 +318,7 @@ public ApiFuture doWorkAsync(TransactionContext txn) { Statement.newBuilder("DELETE FROM TestTable WHERE Key=@key") .bind("key") .to("k999") - .build())); - } - }, + .build())), executor); assertThat(deleteCount.get()).asList().containsExactly(1L, 1L, 1L); } diff --git a/samples/snippets/src/main/java/com/example/spanner/AsyncDmlExample.java b/samples/snippets/src/main/java/com/example/spanner/AsyncDmlExample.java index f2e53d7a22..83e28589ed 100644 --- a/samples/snippets/src/main/java/com/example/spanner/AsyncDmlExample.java +++ b/samples/snippets/src/main/java/com/example/spanner/AsyncDmlExample.java @@ -54,17 +54,14 @@ static void asyncDml(DatabaseClient client) AsyncRunner runner = client.runAsync(); ApiFuture rowCount = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - String sql = - "INSERT INTO Singers (SingerId, FirstName, LastName) VALUES " - + "(12, 'Melissa', 'Garcia'), " - + "(13, 'Russell', 'Morales'), " - + "(14, 'Jacqueline', 'Long'), " - + "(15, 'Dylan', 'Shaw')"; - return txn.executeUpdateAsync(Statement.of(sql)); - } + txn -> { + String sql = + "INSERT INTO Singers (SingerId, FirstName, LastName) VALUES " + + "(12, 'Melissa', 'Garcia'), " + + "(13, 'Russell', 'Morales'), " + + "(14, 'Jacqueline', 'Long'), " + + "(15, 'Dylan', 'Shaw')"; + return txn.executeUpdateAsync(Statement.of(sql)); }, executor); System.out.printf("%d records inserted.%n", rowCount.get()); diff --git a/samples/snippets/src/main/java/com/example/spanner/AsyncRunnerExample.java b/samples/snippets/src/main/java/com/example/spanner/AsyncRunnerExample.java index 575164e610..afdf8f6572 100644 --- a/samples/snippets/src/main/java/com/example/spanner/AsyncRunnerExample.java +++ b/samples/snippets/src/main/java/com/example/spanner/AsyncRunnerExample.java @@ -68,53 +68,50 @@ static void asyncRunner(DatabaseClient client) // longs. ApiFuture rowCounts = runner.runAsync( - new AsyncWork() { - @Override - public ApiFuture doWorkAsync(TransactionContext txn) { - // Transfer marketing budget from one album to another. We do it in a - // transaction to ensure that the transfer is atomic. - ApiFuture album1BudgetFut = - txn.readRowAsync("Albums", Key.of(1, 1), ImmutableList.of("MarketingBudget")); - ApiFuture album2BudgetFut = - txn.readRowAsync("Albums", Key.of(2, 2), ImmutableList.of("MarketingBudget")); + txn -> { + // Transfer marketing budget from one album to another. We do it in a + // transaction to ensure that the transfer is atomic. + ApiFuture album1BudgetFut = + txn.readRowAsync("Albums", Key.of(1, 1), ImmutableList.of("MarketingBudget")); + ApiFuture album2BudgetFut = + txn.readRowAsync("Albums", Key.of(2, 2), ImmutableList.of("MarketingBudget")); - try { - // Transaction will only be committed if this condition still holds at the - // time of commit. Otherwise it will be aborted and the AsyncWork will be - // rerun by the client library. - long transfer = 200_000; - if (album2BudgetFut.get().getLong(0) >= transfer) { - long album1Budget = album1BudgetFut.get().getLong(0); - long album2Budget = album2BudgetFut.get().getLong(0); + try { + // Transaction will only be committed if this condition still holds at the + // time of commit. Otherwise it will be aborted and the AsyncWork will be + // rerun by the client library. + long transfer = 200_000; + if (album2BudgetFut.get().getLong(0) >= transfer) { + long album1Budget = album1BudgetFut.get().getLong(0); + long album2Budget = album2BudgetFut.get().getLong(0); - album1Budget += transfer; - album2Budget -= transfer; - Statement updateStatement1 = - Statement.newBuilder( - "UPDATE Albums " - + "SET MarketingBudget = @AlbumBudget " - + "WHERE SingerId = 1 and AlbumId = 1") - .bind("AlbumBudget") - .to(album1Budget) - .build(); - Statement updateStatement2 = - Statement.newBuilder( - "UPDATE Albums " - + "SET MarketingBudget = @AlbumBudget " - + "WHERE SingerId = 2 and AlbumId = 2") - .bind("AlbumBudget") - .to(album2Budget) - .build(); - return txn.batchUpdateAsync( - ImmutableList.of(updateStatement1, updateStatement2)); - } else { - return ApiFutures.immediateFuture(new long[] {0L, 0L}); - } - } catch (ExecutionException e) { - throw SpannerExceptionFactory.newSpannerException(e.getCause()); - } catch (InterruptedException e) { - throw SpannerExceptionFactory.propagateInterrupt(e); + album1Budget += transfer; + album2Budget -= transfer; + Statement updateStatement1 = + Statement.newBuilder( + "UPDATE Albums " + + "SET MarketingBudget = @AlbumBudget " + + "WHERE SingerId = 1 and AlbumId = 1") + .bind("AlbumBudget") + .to(album1Budget) + .build(); + Statement updateStatement2 = + Statement.newBuilder( + "UPDATE Albums " + + "SET MarketingBudget = @AlbumBudget " + + "WHERE SingerId = 2 and AlbumId = 2") + .bind("AlbumBudget") + .to(album2Budget) + .build(); + return txn.batchUpdateAsync( + ImmutableList.of(updateStatement1, updateStatement2)); + } else { + return ApiFutures.immediateFuture(new long[] {0L, 0L}); } + } catch (ExecutionException e) { + throw SpannerExceptionFactory.newSpannerException(e.getCause()); + } catch (InterruptedException e) { + throw SpannerExceptionFactory.propagateInterrupt(e); } }, executor);