From b036a77196886f16d2738e70f676ccc99a52874c Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Thu, 15 Apr 2021 19:04:07 +1000 Subject: [PATCH] feat: transaction callable as functional interface (#1066) * feat: transaction callable as functional interface Marks the transaction callable as a functional interface. * samples: uses lambdas in samples --- .../google/cloud/spanner/AsyncRunnerImpl.java | 18 +- .../com/google/cloud/spanner/SessionImpl.java | 9 +- .../cloud/spanner/TransactionRunner.java | 1 + .../connection/SingleUseTransaction.java | 61 +-- .../cloud/spanner/BackendExhaustedTest.java | 9 +- .../cloud/spanner/DatabaseClientImplTest.java | 271 ++++------ .../cloud/spanner/InlineBeginBenchmark.java | 19 +- .../spanner/InlineBeginTransactionTest.java | 472 +++++++----------- .../cloud/spanner/MockSpannerServiceImpl.java | 11 +- ...adWriteTransactionWithInlineBeginTest.java | 296 +++++------ .../RetryOnInvalidatedSessionTest.java | 106 ++-- .../google/cloud/spanner/SessionImplTest.java | 55 +- .../cloud/spanner/SessionPoolBenchmark.java | 19 +- .../cloud/spanner/SessionPoolLeakTest.java | 11 +- .../SessionPoolMaintainerBenchmark.java | 19 +- .../com/google/cloud/spanner/SpanTest.java | 19 +- .../cloud/spanner/SpannerGaxRetryTest.java | 69 +-- .../spanner/TransactionRunnerImplTest.java | 69 +-- .../cloud/spanner/it/ITBatchDmlTest.java | 94 ++-- .../cloud/spanner/it/ITClosedSessionTest.java | 31 +- .../google/cloud/spanner/it/ITDMLTest.java | 103 ++-- .../cloud/spanner/it/ITQueryOptionsTest.java | 36 +- .../cloud/spanner/it/ITTransactionTest.java | 300 +++++------ .../spanner/spi/v1/GapicSpannerRpcTest.java | 18 +- .../CustomTimeoutAndRetrySettingsExample.java | 23 +- .../com/example/spanner/SpannerSample.java | 388 +++++++------- .../spanner/StatementTimeoutExample.java | 16 +- 27 files changed, 952 insertions(+), 1591 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java index 7982f0d282..5e4face3bd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncRunnerImpl.java @@ -23,7 +23,6 @@ import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.cloud.Timestamp; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; import java.util.concurrent.ExecutionException; @@ -60,16 +59,13 @@ public void run() { private R runTransaction(final AsyncWork work) { return delegate.run( - new TransactionCallable() { - @Override - public R run(TransactionContext transaction) throws Exception { - try { - return work.doWorkAsync(transaction).get(); - } catch (ExecutionException e) { - throw SpannerExceptionFactory.newSpannerException(e.getCause()); - } catch (InterruptedException e) { - throw SpannerExceptionFactory.propagateInterrupt(e); - } + transaction -> { + try { + return work.doWorkAsync(transaction).get(); + } catch (ExecutionException e) { + throw SpannerExceptionFactory.newSpannerException(e.getCause()); + } catch (InterruptedException e) { + throw SpannerExceptionFactory.propagateInterrupt(e); } }); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 874c11a5d8..d658a91182 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -141,12 +141,9 @@ public CommitResponse writeWithOptions(Iterable mutations, Transaction ? (Collection) mutations : Lists.newArrayList(mutations); runner.run( - new TransactionRunner.TransactionCallable() { - @Override - public Void run(TransactionContext ctx) { - ctx.buffer(finalMutations); - return null; - } + ctx -> { + ctx.buffer(finalMutations); + return null; }); return runner.getCommitResponse(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java index 33cb57e90f..e7167451f0 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunner.java @@ -30,6 +30,7 @@ */ public interface TransactionRunner { /** A unit of work to be performed in the context of a transaction. */ + @FunctionalInterface interface TransactionCallable { /** * Invoked by the library framework to perform a single attempt of a transaction. This method diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SingleUseTransaction.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SingleUseTransaction.java index 922445fb97..fe75c1c7c3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SingleUseTransaction.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SingleUseTransaction.java @@ -34,9 +34,7 @@ import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.TimestampBound; -import com.google.cloud.spanner.TransactionContext; import com.google.cloud.spanner.TransactionRunner; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.cloud.spanner.connection.StatementParser.ParsedStatement; import com.google.cloud.spanner.connection.StatementParser.StatementType; import com.google.common.base.Function; @@ -357,12 +355,7 @@ public Long call() throws Exception { writeTransaction = createWriteTransaction(); Long res = writeTransaction.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(update.getStatement()); - } - }); + transaction -> transaction.executeUpdate(update.getStatement())); state = UnitOfWorkState.COMMITTED; return res; } catch (Throwable t) { @@ -404,31 +397,28 @@ private ApiFuture executeTransactionalBatchUpdateAsync( public long[] call() throws Exception { writeTransaction = createWriteTransaction(); return writeTransaction.run( - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) throws Exception { - try { - long[] res = - transaction.batchUpdate( - Iterables.transform( - updates, - new Function() { - @Override - public Statement apply(ParsedStatement input) { - return input.getStatement(); - } - })); + transaction -> { + try { + long[] res = + transaction.batchUpdate( + Iterables.transform( + updates, + new Function() { + @Override + public Statement apply(ParsedStatement input) { + return input.getStatement(); + } + })); + state = UnitOfWorkState.COMMITTED; + return res; + } catch (Throwable t) { + if (t instanceof SpannerBatchUpdateException) { + // Batch update exceptions does not cause a rollback. state = UnitOfWorkState.COMMITTED; - return res; - } catch (Throwable t) { - if (t instanceof SpannerBatchUpdateException) { - // Batch update exceptions does not cause a rollback. - state = UnitOfWorkState.COMMITTED; - } else { - state = UnitOfWorkState.COMMIT_FAILED; - } - throw t; + } else { + state = UnitOfWorkState.COMMIT_FAILED; } + throw t; } }); } @@ -455,12 +445,9 @@ public Void call() throws Exception { writeTransaction = createWriteTransaction(); Void res = writeTransaction.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - transaction.buffer(mutations); - return null; - } + transaction -> { + transaction.buffer(mutations); + return null; }); state = UnitOfWorkState.COMMITTED; return res; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BackendExhaustedTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BackendExhaustedTest.java index 7e4e41facc..dba6d76e91 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BackendExhaustedTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/BackendExhaustedTest.java @@ -24,7 +24,6 @@ import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.protobuf.ListValue; import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.StructType; @@ -205,13 +204,7 @@ private final class WriteRunnable implements Runnable { @Override public void run() { TransactionRunner runner = client.readWriteTransaction(); - runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - return transaction.executeUpdate(UPDATE_STATEMENT); - } - }); + runner.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)); } } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index 8c66b78ef3..fc4f8444b1 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -51,7 +51,6 @@ import com.google.cloud.spanner.ReadContext.QueryAnalyzeMode; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; import com.google.cloud.spanner.SpannerOptions.SpannerCallContextTimeoutConfigurator; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.SettableFuture; @@ -341,16 +340,12 @@ public void testReadWriteExecuteQueryWithTag() { TransactionRunner runner = client.readWriteTransaction(Options.tag("app=spanner,env=test,action=txn")); runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - try (ResultSet resultSet = - transaction.executeQuery( - SELECT1, Options.tag("app=spanner,env=test,action=query"))) { - while (resultSet.next()) {} - } - return null; + transaction -> { + try (ResultSet resultSet = + transaction.executeQuery(SELECT1, Options.tag("app=spanner,env=test,action=query"))) { + while (resultSet.next()) {} } + return null; }); List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); @@ -370,19 +365,16 @@ public void testReadWriteExecuteReadWithTag() { TransactionRunner runner = client.readWriteTransaction(Options.tag("app=spanner,env=test,action=txn")); runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - try (ResultSet resultSet = - transaction.read( - READ_TABLE_NAME, - KeySet.singleKey(Key.of(1L)), - READ_COLUMN_NAMES, - Options.tag("app=spanner,env=test,action=read"))) { - while (resultSet.next()) {} - } - return null; + transaction -> { + try (ResultSet resultSet = + transaction.read( + READ_TABLE_NAME, + KeySet.singleKey(Key.of(1L)), + READ_COLUMN_NAMES, + Options.tag("app=spanner,env=test,action=read"))) { + while (resultSet.next()) {} } + return null; }); List requests = mockSpanner.getRequestsOfType(ReadRequest.class); @@ -401,13 +393,9 @@ public void testExecuteUpdateWithTag() { spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); TransactionRunner runner = client.readWriteTransaction(); runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate( - UPDATE_STATEMENT, Options.tag("app=spanner,env=test,action=update")); - } - }); + transaction -> + transaction.executeUpdate( + UPDATE_STATEMENT, Options.tag("app=spanner,env=test,action=update"))); List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); assertThat(requests).hasSize(1); @@ -425,13 +413,9 @@ public void testBatchUpdateWithTag() { TransactionRunner runner = client.readWriteTransaction(Options.tag("app=spanner,env=test,action=txn")); runner.run( - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) throws Exception { - return transaction.batchUpdate( - Arrays.asList(UPDATE_STATEMENT), Options.tag("app=spanner,env=test,action=batch")); - } - }); + transaction -> + transaction.batchUpdate( + Arrays.asList(UPDATE_STATEMENT), Options.tag("app=spanner,env=test,action=batch"))); List requests = mockSpanner.getRequestsOfType(ExecuteBatchDmlRequest.class); @@ -467,12 +451,9 @@ public void testCommitWithTag() { TransactionRunner runner = client.readWriteTransaction(Options.tag("app=spanner,env=test,action=commit")); runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - transaction.buffer(Mutation.delete("TEST", KeySet.all())); - return null; - } + transaction -> { + transaction.buffer(Mutation.delete("TEST", KeySet.all())); + return null; }); List requests = mockSpanner.getRequestsOfType(CommitRequest.class); @@ -818,12 +799,9 @@ public void testReadWriteTransaction() { spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); TransactionRunner runner = client.readWriteTransaction(); runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - transaction.executeUpdate(UPDATE_STATEMENT); - return null; - } + transaction -> { + transaction.executeUpdate(UPDATE_STATEMENT); + return null; }); assertNotNull(runner.getCommitTimestamp()); } @@ -834,12 +812,9 @@ public void testReadWriteTransaction_returnsCommitStats() { spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); TransactionRunner runner = client.readWriteTransaction(Options.commitStats()); runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - transaction.buffer(Mutation.delete("FOO", Key.of("foo"))); - return null; - } + transaction -> { + transaction.buffer(Mutation.delete("FOO", Key.of("foo"))); + return null; }); assertNotNull(runner.getCommitResponse()); assertNotNull(runner.getCommitResponse().getCommitStats()); @@ -857,12 +832,9 @@ public void readWriteTransactionIsNonBlocking() { // transaction. mockSpanner.unfreeze(); runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - transaction.executeUpdate(UPDATE_STATEMENT); - return null; - } + transaction -> { + transaction.executeUpdate(UPDATE_STATEMENT); + return null; }); } @@ -1136,12 +1108,9 @@ public void testPartitionedDmlDoesNotTimeout() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - transaction.executeUpdate(UPDATE_STATEMENT); - return null; - } + transaction -> { + transaction.executeUpdate(UPDATE_STATEMENT); + return null; }); fail("expected DEADLINE_EXCEEDED"); } catch (SpannerException e) { @@ -1184,13 +1153,7 @@ public void testPartitionedDmlWithLowerTimeout() { long updateCount = client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - return transaction.executeUpdate(UPDATE_STATEMENT); - } - }); + .run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)); assertThat(updateCount).isEqualTo(UPDATE_COUNT); } } @@ -1234,13 +1197,7 @@ public void testPartitionedDmlWithHigherTimeout() { try { client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - return transaction.executeUpdate(UPDATE_STATEMENT); - } - }); + .run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)); fail("missing expected DEADLINE_EXCEEDED exception"); } catch (SpannerException e) { assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED); @@ -1442,15 +1399,7 @@ public void testDatabaseOrInstanceIsDeletedAndThenRecreated() throws Exception { } catch (DatabaseNotFoundException | InstanceNotFoundException e) { } try { - dbClient - .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - return null; - } - }); + dbClient.readWriteTransaction().run(transaction -> null); fail("missing expected exception"); } catch (DatabaseNotFoundException | InstanceNotFoundException e) { } @@ -1467,15 +1416,7 @@ public Void run(TransactionContext transaction) { } catch (DatabaseNotFoundException | InstanceNotFoundException e) { } try { - dbClient - .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - return null; - } - }); + dbClient.readWriteTransaction().run(transaction -> null); fail("missing expected exception"); } catch (DatabaseNotFoundException | InstanceNotFoundException e) { } @@ -1517,12 +1458,9 @@ public void testAllowNestedTransactions() throws InterruptedException { .readWriteTransaction() .allowNestedTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - assertThat(client.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions - 1); - return transaction.executeUpdate(UPDATE_STATEMENT); - } + transaction -> { + assertThat(client.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions - 1); + return transaction.executeUpdate(UPDATE_STATEMENT); }); assertThat(res).isEqualTo(UPDATE_COUNT); assertThat(client.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); @@ -1552,39 +1490,33 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio .readWriteTransaction() .allowNestedTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - // Client1 should have 1 session checked out. - // Client2 should have 0 sessions checked out. - assertThat(client1.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions - 1); - assertThat(client2.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); - Long add = - client2 - .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - // Both clients should now have 1 session checked out. - assertThat(client1.pool.getNumberOfSessionsInPool()) - .isEqualTo(minSessions - 1); - assertThat(client2.pool.getNumberOfSessionsInPool()) - .isEqualTo(minSessions - 1); - try (ResultSet rs = transaction.executeQuery(SELECT1)) { - if (rs.next()) { - return rs.getLong(0); - } - return 0L; - } + transaction -> { + // Client1 should have 1 session checked out. + // Client2 should have 0 sessions checked out. + assertThat(client1.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions - 1); + assertThat(client2.pool.getNumberOfSessionsInPool()).isEqualTo(minSessions); + Long add = + client2 + .readWriteTransaction() + .run( + transaction1 -> { + // Both clients should now have 1 session checked out. + assertThat(client1.pool.getNumberOfSessionsInPool()) + .isEqualTo(minSessions - 1); + assertThat(client2.pool.getNumberOfSessionsInPool()) + .isEqualTo(minSessions - 1); + try (ResultSet rs = transaction1.executeQuery(SELECT1)) { + if (rs.next()) { + return rs.getLong(0); } - }); - try (ResultSet rs = transaction.executeQuery(SELECT1)) { - if (rs.next()) { - return add + rs.getLong(0); - } - return add + 0L; + return 0L; + } + }); + try (ResultSet rs = transaction.executeQuery(SELECT1)) { + if (rs.next()) { + return add + rs.getLong(0); } + return add + 0L; } }); assertThat(res).isEqualTo(2L); @@ -1903,13 +1835,7 @@ public void run() { // Update should succeed. client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(UPDATE_STATEMENT); - } - }); + .run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)); } }); } @@ -2037,15 +1963,12 @@ public void testReadWriteExecuteQueryWithPriority() { spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); TransactionRunner runner = client.readWriteTransaction(); runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - try (ResultSet resultSet = - transaction.executeQuery(SELECT1, Options.priority(RpcPriority.HIGH))) { - while (resultSet.next()) {} - } - return null; + transaction -> { + try (ResultSet resultSet = + transaction.executeQuery(SELECT1, Options.priority(RpcPriority.HIGH))) { + while (resultSet.next()) {} } + return null; }); List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); @@ -2061,19 +1984,16 @@ public void testReadWriteExecuteReadWithPriority() { spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); TransactionRunner runner = client.readWriteTransaction(); runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - try (ResultSet resultSet = - transaction.read( - READ_TABLE_NAME, - KeySet.singleKey(Key.of(1L)), - READ_COLUMN_NAMES, - Options.priority(RpcPriority.HIGH))) { - while (resultSet.next()) {} - } - return null; + transaction -> { + try (ResultSet resultSet = + transaction.read( + READ_TABLE_NAME, + KeySet.singleKey(Key.of(1L)), + READ_COLUMN_NAMES, + Options.priority(RpcPriority.HIGH))) { + while (resultSet.next()) {} } + return null; }); List requests = mockSpanner.getRequestsOfType(ReadRequest.class); @@ -2089,12 +2009,8 @@ public void testExecuteUpdateWithPriority() { spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); TransactionRunner runner = client.readWriteTransaction(); runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(UPDATE_STATEMENT, Options.priority(RpcPriority.HIGH)); - } - }); + transaction -> + transaction.executeUpdate(UPDATE_STATEMENT, Options.priority(RpcPriority.HIGH))); List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); assertThat(requests).hasSize(1); @@ -2109,13 +2025,9 @@ public void testBatchUpdateWithPriority() { spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); TransactionRunner runner = client.readWriteTransaction(); runner.run( - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) throws Exception { - return transaction.batchUpdate( - Arrays.asList(UPDATE_STATEMENT), Options.priority(RpcPriority.HIGH)); - } - }); + transaction -> + transaction.batchUpdate( + Arrays.asList(UPDATE_STATEMENT), Options.priority(RpcPriority.HIGH))); List requests = mockSpanner.getRequestsOfType(ExecuteBatchDmlRequest.class); @@ -2144,12 +2056,9 @@ public void testCommitWithPriority() { spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); TransactionRunner runner = client.readWriteTransaction(Options.priority(RpcPriority.HIGH)); runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - transaction.buffer(Mutation.delete("TEST", KeySet.all())); - return null; - } + transaction -> { + transaction.buffer(Mutation.delete("TEST", KeySet.all())); + return null; }); List requests = mockSpanner.getRequestsOfType(CommitRequest.class); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginBenchmark.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginBenchmark.java index ecd8f4410d..e38b6a104c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginBenchmark.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginBenchmark.java @@ -20,7 +20,6 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.NoCredentials; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -195,13 +194,8 @@ public Long call() throws Exception { Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); TransactionRunner runner = server.client.readWriteTransaction(); return runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate( - StandardBenchmarkMockServer.UPDATE_STATEMENT); - } - }); + transaction -> + transaction.executeUpdate(StandardBenchmarkMockServer.UPDATE_STATEMENT)); } })); } @@ -231,13 +225,8 @@ public Long call() throws Exception { Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); TransactionRunner runner = server.client.readWriteTransaction(); return runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate( - StandardBenchmarkMockServer.UPDATE_STATEMENT); - } - }); + transaction -> + transaction.executeUpdate(StandardBenchmarkMockServer.UPDATE_STATEMENT)); } })); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index d8a780ef7d..6e595bf9b0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -539,13 +539,7 @@ public void testInlinedBeginTx() { long updateCount = client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(UPDATE_STATEMENT); - } - }); + .run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)); assertThat(updateCount).isEqualTo(UPDATE_COUNT); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(1); @@ -562,15 +556,12 @@ public void testInlinedBeginTxAborted() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - long res = transaction.executeUpdate(UPDATE_STATEMENT); - if (firstAttempt.getAndSet(false)) { - mockSpanner.abortTransaction(transaction); - } - return res; + transaction -> { + long res = transaction.executeUpdate(UPDATE_STATEMENT); + if (firstAttempt.getAndSet(false)) { + mockSpanner.abortTransaction(transaction); } + return res; }); assertThat(updateCount).isEqualTo(UPDATE_COUNT); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); @@ -658,17 +649,14 @@ public void testInlinedBeginFirstQueryReturnsUnavailable() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - // The first attempt will return UNAVAILABLE and retry internally. - try (ResultSet rs = transaction.executeQuery(SELECT1)) { - while (rs.next()) { - return rs.getLong(0); - } + transaction -> { + // The first attempt will return UNAVAILABLE and retry internally. + try (ResultSet rs = transaction.executeQuery(SELECT1)) { + while (rs.next()) { + return rs.getLong(0); } - return 0L; } + return 0L; }); assertThat(value).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); @@ -685,18 +673,15 @@ public void testInlinedBeginFirstReadReturnsUnavailable() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - // The first attempt will return UNAVAILABLE and retry internally. - try (ResultSet rs = - transaction.read("FOO", KeySet.all(), Arrays.asList("ID"))) { - while (rs.next()) { - return rs.getLong(0); - } + transaction -> { + // The first attempt will return UNAVAILABLE and retry internally. + try (ResultSet rs = + transaction.read("FOO", KeySet.all(), Arrays.asList("ID"))) { + while (rs.next()) { + return rs.getLong(0); } - return 0L; } + return 0L; }); assertThat(value).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); @@ -712,16 +697,13 @@ public void testInlinedBeginTxWithQuery() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - try (ResultSet rs = transaction.executeQuery(SELECT1)) { - while (rs.next()) { - return rs.getLong(0); - } + transaction -> { + try (ResultSet rs = transaction.executeQuery(SELECT1)) { + while (rs.next()) { + return rs.getLong(0); } - return 0L; } + return 0L; }); assertThat(updateCount).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); @@ -737,17 +719,14 @@ public void testInlinedBeginTxWithRead() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - try (ResultSet rs = - transaction.read("FOO", KeySet.all(), Arrays.asList("ID"))) { - while (rs.next()) { - return rs.getLong(0); - } + transaction -> { + try (ResultSet rs = + transaction.read("FOO", KeySet.all(), Arrays.asList("ID"))) { + while (rs.next()) { + return rs.getLong(0); } - return 0L; } + return 0L; }); assertThat(updateCount).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); @@ -764,13 +743,8 @@ public void testInlinedBeginTxWithBatchDml() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) throws Exception { - return transaction.batchUpdate( - Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT)); - } - }); + transaction -> + transaction.batchUpdate(Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT))); assertThat(updateCounts).asList().containsExactly(UPDATE_COUNT, UPDATE_COUNT); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); assertThat(countRequests(ExecuteBatchDmlRequest.class)).isEqualTo(1); @@ -786,17 +760,14 @@ public void testInlinedBeginTxWithError() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - try { - transaction.executeUpdate(INVALID_UPDATE_STATEMENT); - fail("missing expected exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); - } - return transaction.executeUpdate(UPDATE_STATEMENT); + transaction -> { + try { + transaction.executeUpdate(INVALID_UPDATE_STATEMENT); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); } + return transaction.executeUpdate(UPDATE_STATEMENT); }); assertThat(updateCount).isEqualTo(UPDATE_COUNT); // The transaction will be retried because the first statement that also tried to include the @@ -831,17 +802,14 @@ public void testInlinedBeginTxWithErrorOnFirstStatement_andThenErrorOnBeginTrans client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - try { - transaction.executeUpdate(INVALID_UPDATE_STATEMENT); - fail("missing expected exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); - } - return null; + transaction -> { + try { + transaction.executeUpdate(INVALID_UPDATE_STATEMENT); + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); } + return null; }); fail("Missing expected exception"); } catch (SpannerException e) { @@ -865,13 +833,7 @@ public void testInlinedBeginTxWithUncaughtError() { try { client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(INVALID_UPDATE_STATEMENT); - } - }); + .run(transaction -> transaction.executeUpdate(INVALID_UPDATE_STATEMENT)); fail("missing expected exception"); } catch (SpannerException e) { assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); @@ -896,14 +858,11 @@ public void testInlinedBeginTxWithUncaughtErrorAfterSuccessfulBegin() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - // This statement will start a transaction. - transaction.executeUpdate(UPDATE_STATEMENT); - // This statement will fail and cause a rollback as the exception is not caught. - return transaction.executeUpdate(INVALID_UPDATE_STATEMENT); - } + transaction -> { + // This statement will start a transaction. + transaction.executeUpdate(UPDATE_STATEMENT); + // This statement will fail and cause a rollback as the exception is not caught. + return transaction.executeUpdate(INVALID_UPDATE_STATEMENT); }); fail("missing expected exception"); } catch (SpannerException e) { @@ -924,19 +883,16 @@ public void testInlinedBeginTxBatchDmlWithErrorOnFirstStatement() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - try { - transaction.batchUpdate( - ImmutableList.of(INVALID_UPDATE_STATEMENT, UPDATE_STATEMENT)); - fail("missing expected exception"); - } catch (SpannerBatchUpdateException e) { - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); - assertThat(e.getUpdateCounts()).hasLength(0); - } - return null; + transaction -> { + try { + transaction.batchUpdate( + ImmutableList.of(INVALID_UPDATE_STATEMENT, UPDATE_STATEMENT)); + fail("missing expected exception"); + } catch (SpannerBatchUpdateException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + assertThat(e.getUpdateCounts()).hasLength(0); } + return null; }); assertThat(res).isNull(); // The first statement failed and could not return a transaction. The entire transaction is @@ -955,21 +911,18 @@ public void testInlinedBeginTxBatchDmlWithErrorOnSecondStatement() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - try { - transaction.batchUpdate( - ImmutableList.of(UPDATE_STATEMENT, INVALID_UPDATE_STATEMENT)); - fail("missing expected exception"); - // The following line is needed as the compiler does not know that this is - // unreachable. - return -1L; - } catch (SpannerBatchUpdateException e) { - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); - assertThat(e.getUpdateCounts()).hasLength(1); - return e.getUpdateCounts()[0]; - } + transaction -> { + try { + transaction.batchUpdate( + ImmutableList.of(UPDATE_STATEMENT, INVALID_UPDATE_STATEMENT)); + fail("missing expected exception"); + // The following line is needed as the compiler does not know that this is + // unreachable. + return -1L; + } catch (SpannerBatchUpdateException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + assertThat(e.getUpdateCounts()).hasLength(1); + return e.getUpdateCounts()[0]; } }); assertThat(updateCount).isEqualTo(UPDATE_COUNT); @@ -990,17 +943,14 @@ public void testInlinedBeginTxWithErrorOnStreamingSql() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - try (ResultSet rs = transaction.executeQuery(INVALID_SELECT)) { - while (rs.next()) {} - fail("missing expected exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); - } - return null; + transaction -> { + try (ResultSet rs = transaction.executeQuery(INVALID_SELECT)) { + while (rs.next()) {} + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); } + return null; }); assertThat(res).isNull(); // The transaction will be retried because the first statement that also tried to include the @@ -1031,17 +981,14 @@ public void testInlinedBeginTxWithErrorOnSecondPartialResultSet() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - try (ResultSet rs = transaction.executeQuery(statement)) { - while (rs.next()) {} - fail("missing expected exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DATA_LOSS); - } - return null; + transaction -> { + try (ResultSet rs = transaction.executeQuery(statement)) { + while (rs.next()) {} + fail("missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DATA_LOSS); } + return null; }); assertThat(res).isNull(); // The transaction will not be retried, as the first PartialResultSet returns the transaction @@ -1062,31 +1009,28 @@ public void testInlinedBeginTxWithParallelQueries() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(final TransactionContext transaction) throws Exception { - List> futures = new ArrayList<>(numQueries); - for (int i = 0; i < numQueries; i++) { - futures.add( - executor.submit( - new Callable() { - @Override - public Long call() throws Exception { - try (ResultSet rs = transaction.executeQuery(SELECT1)) { - while (rs.next()) { - return rs.getLong(0); - } + transaction -> { + List> futures = new ArrayList<>(numQueries); + for (int i = 0; i < numQueries; i++) { + futures.add( + executor.submit( + new Callable() { + @Override + public Long call() throws Exception { + try (ResultSet rs = transaction.executeQuery(SELECT1)) { + while (rs.next()) { + return rs.getLong(0); } - return 0L; } - })); - } - Long res = 0L; - for (Future f : futures) { - res += f.get(); - } - return res; + return 0L; + } + })); + } + Long res = 0L; + for (Future f : futures) { + res += f.get(); } + return res; }); assertThat(updateCount).isEqualTo(1L * numQueries); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); @@ -1100,15 +1044,12 @@ public void testInlinedBeginTxWithOnlyMutations() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - transaction.buffer( - Arrays.asList( - Mutation.newInsertBuilder("FOO").set("ID").to(1L).build(), - Mutation.delete("FOO", Key.of(1L)))); - return null; - } + transaction -> { + transaction.buffer( + Arrays.asList( + Mutation.newInsertBuilder("FOO").set("ID").to(1L).build(), + Mutation.delete("FOO", Key.of(1L)))); + return null; }); // There should be 1 call to BeginTransaction because there is no statement that we can use to // inline the BeginTransaction call with. @@ -1305,15 +1246,12 @@ public void queryWithoutNext() { assertThat( client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - // This will not actually send an RPC, so it will also not request a - // transaction. - transaction.executeQuery(SELECT1); - return transaction.executeUpdate(UPDATE_STATEMENT); - } + .run( + transaction -> { + // This will not actually send an RPC, so it will also not request a + // transaction. + transaction.executeQuery(SELECT1); + return transaction.executeUpdate(UPDATE_STATEMENT); })) .isEqualTo(UPDATE_COUNT); assertThat(mockSpanner.countRequestsOfType(BeginTransactionRequest.class)).isEqualTo(0L); @@ -1327,13 +1265,10 @@ public void queryAsyncWithoutCallback() { assertThat( client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - transaction.executeQueryAsync(SELECT1); - return transaction.executeUpdate(UPDATE_STATEMENT); - } + .run( + transaction -> { + transaction.executeQueryAsync(SELECT1); + return transaction.executeUpdate(UPDATE_STATEMENT); })) .isEqualTo(UPDATE_COUNT); assertThat(mockSpanner.countRequestsOfType(BeginTransactionRequest.class)).isEqualTo(0L); @@ -1347,13 +1282,10 @@ public void readWithoutNext() { assertThat( client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - transaction.read("FOO", KeySet.all(), Arrays.asList("ID")); - return transaction.executeUpdate(UPDATE_STATEMENT); - } + .run( + transaction -> { + transaction.read("FOO", KeySet.all(), Arrays.asList("ID")); + return transaction.executeUpdate(UPDATE_STATEMENT); })) .isEqualTo(UPDATE_COUNT); assertThat(mockSpanner.countRequestsOfType(BeginTransactionRequest.class)).isEqualTo(0L); @@ -1368,13 +1300,10 @@ public void readAsyncWithoutCallback() { assertThat( client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - transaction.readAsync("FOO", KeySet.all(), Arrays.asList("ID")); - return transaction.executeUpdate(UPDATE_STATEMENT); - } + .run( + transaction -> { + transaction.readAsync("FOO", KeySet.all(), Arrays.asList("ID")); + return transaction.executeUpdate(UPDATE_STATEMENT); })) .isEqualTo(UPDATE_COUNT); assertThat(mockSpanner.countRequestsOfType(BeginTransactionRequest.class)).isEqualTo(0L); @@ -1390,16 +1319,13 @@ public void query_ThenUpdate_ThenConsumeResultSet() assertThat( client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - ResultSet rs = transaction.executeQuery(SELECT1); - long updateCount = transaction.executeUpdate(UPDATE_STATEMENT); - // Consume the result set. - while (rs.next()) {} - return updateCount; - } + .run( + transaction -> { + ResultSet rs = transaction.executeQuery(SELECT1); + long updateCount = transaction.executeUpdate(UPDATE_STATEMENT); + // Consume the result set. + while (rs.next()) {} + return updateCount; })) .isEqualTo(UPDATE_COUNT); // The update statement should start the transaction, and the query should use the transaction @@ -1425,14 +1351,11 @@ public void testInlinedBeginTxWithStreamRetry() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - try (ResultSet rs = transaction.executeQuery(SELECT1_UNION_ALL_SELECT2)) { - while (rs.next()) {} - } - return null; + transaction -> { + try (ResultSet rs = transaction.executeQuery(SELECT1_UNION_ALL_SELECT2)) { + while (rs.next()) {} } + return null; }); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2); @@ -1509,14 +1432,11 @@ public void testQueryWithInlineBeginDidNotReturnTransaction() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - try (ResultSet rs = transaction.executeQuery(SELECT1_UNION_ALL_SELECT2)) { - while (rs.next()) {} - } - return null; + transaction -> { + try (ResultSet rs = transaction.executeQuery(SELECT1_UNION_ALL_SELECT2)) { + while (rs.next()) {} } + return null; }); fail("missing expected exception"); } catch (SpannerException e) { @@ -1538,12 +1458,9 @@ public void testReadWithInlineBeginDidNotReturnTransaction() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - transaction.readRow("FOO", Key.of(1L), Arrays.asList("BAR")); - return null; - } + transaction -> { + transaction.readRow("FOO", Key.of(1L), Arrays.asList("BAR")); + return null; }); fail("missing expected exception"); } catch (SpannerException e) { @@ -1565,12 +1482,9 @@ public void testUpdateWithInlineBeginDidNotReturnTransaction() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - transaction.executeUpdate(UPDATE_STATEMENT); - return null; - } + transaction -> { + transaction.executeUpdate(UPDATE_STATEMENT); + return null; }); fail("missing expected exception"); } catch (SpannerException e) { @@ -1592,12 +1506,9 @@ public void testBatchUpdateWithInlineBeginDidNotReturnTransaction() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - transaction.batchUpdate(Arrays.asList(UPDATE_STATEMENT)); - return null; - } + transaction -> { + transaction.batchUpdate(Arrays.asList(UPDATE_STATEMENT)); + return null; }); fail("missing expected exception"); } catch (SpannerException e) { @@ -1620,34 +1531,31 @@ public void testQueryAsyncWithInlineBeginDidNotReturnTransaction() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - try (AsyncResultSet rs = - transaction.executeQueryAsync(SELECT1_UNION_ALL_SELECT2)) { - return SpannerApiFutures.get( - rs.setCallback( - executor, - new ReadyCallback() { - @Override - public CallbackResponse cursorReady(AsyncResultSet resultSet) { - try { - while (true) { - switch (resultSet.tryNext()) { - case OK: - break; - case DONE: - return CallbackResponse.DONE; - case NOT_READY: - return CallbackResponse.CONTINUE; - } + transaction -> { + try (AsyncResultSet rs = + transaction.executeQueryAsync(SELECT1_UNION_ALL_SELECT2)) { + return SpannerApiFutures.get( + rs.setCallback( + executor, + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + try { + while (true) { + switch (resultSet.tryNext()) { + case OK: + break; + case DONE: + return CallbackResponse.DONE; + case NOT_READY: + return CallbackResponse.CONTINUE; } - } catch (SpannerException e) { - return CallbackResponse.DONE; } + } catch (SpannerException e) { + return CallbackResponse.DONE; } - })); - } + } + })); } }); fail("missing expected exception"); @@ -1670,12 +1578,8 @@ public void testUpdateAsyncWithInlineBeginDidNotReturnTransaction() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return SpannerApiFutures.get(transaction.executeUpdateAsync(UPDATE_STATEMENT)); - } - }); + transaction -> + SpannerApiFutures.get(transaction.executeUpdateAsync(UPDATE_STATEMENT))); fail("missing expected exception"); } catch (SpannerException e) { assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION); @@ -1696,13 +1600,9 @@ public void testBatchUpdateAsyncWithInlineBeginDidNotReturnTransaction() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) throws Exception { - return SpannerApiFutures.get( - transaction.batchUpdateAsync(Arrays.asList(UPDATE_STATEMENT))); - } - }); + transaction -> + SpannerApiFutures.get( + transaction.batchUpdateAsync(Arrays.asList(UPDATE_STATEMENT)))); fail("missing expected exception"); } catch (SpannerException e) { assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION); @@ -1774,15 +1674,7 @@ public void testInlinedBeginTx_withStickyCancelledOnFirstStatement() { // The CANCELLED error is thrown both on the first and second attempt. The second attempt will // not be retried, as it did not include a BeginTransaction option. try { - client - .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(statement); - } - }); + client.readWriteTransaction().run(transaction -> transaction.executeUpdate(statement)); fail("missing expected exception"); } catch (SpannerException e) { assertEquals(ErrorCode.CANCELLED, e.getErrorCode()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index f09d86a429..4850bce5ad 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -174,14 +174,9 @@ * long updateCount = * dbClient * .readWriteTransaction() - * .run( - * new TransactionCallable() { - * @Override - * public Long run(TransactionContext transaction) throws Exception { - * return transaction.executeUpdate( - * Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2")); - * } - * }); + * .run(transaction -> + * transaction.executeUpdate(Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2")) + * ); * System.out.println("Update count: " + updateCount); * spannerClient.close(); * } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadWriteTransactionWithInlineBeginTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadWriteTransactionWithInlineBeginTest.java index 4690a30aa7..79663b9a3f 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadWriteTransactionWithInlineBeginTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadWriteTransactionWithInlineBeginTest.java @@ -22,7 +22,6 @@ import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.cloud.NoCredentials; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.protobuf.AbstractMessage; import com.google.protobuf.ListValue; import com.google.spanner.v1.BeginTransactionRequest; @@ -145,13 +144,7 @@ public void singleUpdate() { Long updateCount = client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(UPDATE_STATEMENT); - } - }); + .run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)); assertThat(updateCount).isEqualTo(UPDATE_COUNT); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); assertThat(countTransactionsStarted()).isEqualTo(1); @@ -163,13 +156,8 @@ public void singleBatchUpdate() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) throws Exception { - return transaction.batchUpdate( - Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT)); - } - }); + transaction -> + transaction.batchUpdate(Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT))); assertThat(updateCounts).isEqualTo(new long[] {UPDATE_COUNT, UPDATE_COUNT}); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); assertThat(countTransactionsStarted()).isEqualTo(1); @@ -181,16 +169,13 @@ public void singleQuery() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - try (ResultSet rs = transaction.executeQuery(SELECT1)) { - while (rs.next()) { - return rs.getLong(0); - } + transaction -> { + try (ResultSet rs = transaction.executeQuery(SELECT1)) { + while (rs.next()) { + return rs.getLong(0); } - return 0L; } + return 0L; }); assertThat(value).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); @@ -203,18 +188,15 @@ public void updateAndQuery() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) throws Exception { - long updateCount = transaction.executeUpdate(UPDATE_STATEMENT); - long val = 0L; - try (ResultSet rs = transaction.executeQuery(SELECT1)) { - while (rs.next()) { - val = rs.getLong(0); - } + transaction -> { + long updateCount = transaction.executeUpdate(UPDATE_STATEMENT); + long val = 0L; + try (ResultSet rs = transaction.executeQuery(SELECT1)) { + while (rs.next()) { + val = rs.getLong(0); } - return new long[] {updateCount, val}; } + return new long[] {updateCount, val}; }); assertThat(res).isEqualTo(new long[] {UPDATE_COUNT, 1L}); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); @@ -229,26 +211,23 @@ public void concurrentUpdates() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(final TransactionContext transaction) throws Exception { - List> list = new ArrayList<>(updates); - for (int i = 0; i < updates; i++) { - list.add( - service.submit( - new Callable() { - @Override - public Long call() throws Exception { - return transaction.executeUpdate(UPDATE_STATEMENT); - } - })); - } - long totalUpdateCount = 0L; - for (Future fut : list) { - totalUpdateCount += fut.get(); - } - return totalUpdateCount; + transaction -> { + List> list = new ArrayList<>(updates); + for (int i = 0; i < updates; i++) { + list.add( + service.submit( + new Callable() { + @Override + public Long call() throws Exception { + return transaction.executeUpdate(UPDATE_STATEMENT); + } + })); } + long totalUpdateCount = 0L; + for (Future fut : list) { + totalUpdateCount += fut.get(); + } + return totalUpdateCount; }); assertThat(updateCount).isEqualTo(UPDATE_COUNT * updates); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); @@ -263,29 +242,26 @@ public void concurrentBatchUpdates() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(final TransactionContext transaction) throws Exception { - List> list = new ArrayList<>(updates); - for (int i = 0; i < updates; i++) { - list.add( - service.submit( - new Callable() { - @Override - public long[] call() throws Exception { - return transaction.batchUpdate( - Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT)); - } - })); - } - long totalUpdateCount = 0L; - for (Future fut : list) { - for (long l : fut.get()) { - totalUpdateCount += l; - } + transaction -> { + List> list = new ArrayList<>(updates); + for (int i = 0; i < updates; i++) { + list.add( + service.submit( + new Callable() { + @Override + public long[] call() throws Exception { + return transaction.batchUpdate( + Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT)); + } + })); + } + long totalUpdateCount = 0L; + for (Future fut : list) { + for (long l : fut.get()) { + totalUpdateCount += l; } - return totalUpdateCount; } + return totalUpdateCount; }); assertThat(updateCount).isEqualTo(UPDATE_COUNT * updates * 2); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); @@ -300,31 +276,28 @@ public void concurrentQueries() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(final TransactionContext transaction) throws Exception { - List> list = new ArrayList<>(queries); - for (int i = 0; i < queries; i++) { - list.add( - service.submit( - new Callable() { - @Override - public Long call() throws Exception { - try (ResultSet rs = transaction.executeQuery(SELECT1)) { - while (rs.next()) { - return rs.getLong(0); - } + transaction -> { + List> list = new ArrayList<>(queries); + for (int i = 0; i < queries; i++) { + list.add( + service.submit( + new Callable() { + @Override + public Long call() throws Exception { + try (ResultSet rs = transaction.executeQuery(SELECT1)) { + while (rs.next()) { + return rs.getLong(0); } - return 0L; } - })); - } - long selectedTotal = 0L; - for (Future fut : list) { - selectedTotal += fut.get(); - } - return selectedTotal; + return 0L; + } + })); + } + long selectedTotal1 = 0L; + for (Future fut : list) { + selectedTotal1 += fut.get(); } + return selectedTotal1; }); assertThat(selectedTotal).isEqualTo(queries); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); @@ -336,13 +309,7 @@ public void failedUpdate() { try { client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(INVALID_UPDATE_STATEMENT); - } - }); + .run(transaction -> transaction.executeUpdate(INVALID_UPDATE_STATEMENT)); fail("missing expected exception"); } catch (SpannerException e) { assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); @@ -357,13 +324,9 @@ public void failedBatchUpdate() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) throws Exception { - return transaction.batchUpdate( - Arrays.asList(INVALID_UPDATE_STATEMENT, UPDATE_STATEMENT)); - } - }); + transaction -> + transaction.batchUpdate( + Arrays.asList(INVALID_UPDATE_STATEMENT, UPDATE_STATEMENT))); fail("missing expected exception"); } catch (SpannerException e) { assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); @@ -378,14 +341,11 @@ public void failedQuery() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - try (ResultSet rs = transaction.executeQuery(INVALID_SELECT_STATEMENT)) { - rs.next(); - } - return null; + transaction -> { + try (ResultSet rs = transaction.executeQuery(INVALID_SELECT_STATEMENT)) { + rs.next(); } + return null; }); fail("missing expected exception"); } catch (SpannerException e) { @@ -401,21 +361,18 @@ public void failedUpdateAndThenUpdate() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - try { - // This update statement carries the BeginTransaction, but fails. This will - // cause the entire transaction to be retried with an explicit - // BeginTransaction RPC to ensure all statements in the transaction are - // actually executed against the same transaction. - transaction.executeUpdate(INVALID_UPDATE_STATEMENT); - fail("Missing expected exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); - } - return transaction.executeUpdate(UPDATE_STATEMENT); + transaction -> { + try { + // This update statement carries the BeginTransaction, but fails. This will + // cause the entire transaction to be retried with an explicit + // BeginTransaction RPC to ensure all statements in the transaction are + // actually executed against the same transaction. + transaction.executeUpdate(INVALID_UPDATE_STATEMENT); + fail("Missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); } + return transaction.executeUpdate(UPDATE_STATEMENT); }); assertThat(updateCount).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); @@ -428,22 +385,19 @@ public void failedBatchUpdateAndThenUpdate() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - try { - // This update statement carries the BeginTransaction, but fails. This will - // cause the entire transaction to be retried with an explicit - // BeginTransaction RPC to ensure all statements in the transaction are - // actually executed against the same transaction. - transaction.batchUpdate( - Arrays.asList(INVALID_UPDATE_STATEMENT, UPDATE_STATEMENT)); - fail("Missing expected exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); - } - return transaction.executeUpdate(UPDATE_STATEMENT); + transaction -> { + try { + // This update statement carries the BeginTransaction, but fails. This will + // cause the entire transaction to be retried with an explicit + // BeginTransaction RPC to ensure all statements in the transaction are + // actually executed against the same transaction. + transaction.batchUpdate( + Arrays.asList(INVALID_UPDATE_STATEMENT, UPDATE_STATEMENT)); + fail("Missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); } + return transaction.executeUpdate(UPDATE_STATEMENT); }); assertThat(updateCount).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); @@ -456,19 +410,16 @@ public void failedQueryAndThenUpdate() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - // This query carries the BeginTransaction, but fails. The BeginTransaction will - // then be carried by the subsequent statement. - try (ResultSet rs = transaction.executeQuery(INVALID_SELECT_STATEMENT)) { - rs.next(); - fail("Missing expected exception"); - } catch (SpannerException e) { - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); - } - return transaction.executeUpdate(UPDATE_STATEMENT); + transaction -> { + // This query carries the BeginTransaction, but fails. The BeginTransaction will + // then be carried by the subsequent statement. + try (ResultSet rs = transaction.executeQuery(INVALID_SELECT_STATEMENT)) { + rs.next(); + fail("Missing expected exception"); + } catch (SpannerException e) { + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); } + return transaction.executeUpdate(UPDATE_STATEMENT); }); assertThat(updateCount).isEqualTo(1L); assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); @@ -482,16 +433,13 @@ public void abortedUpdate() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - if (attempt.incrementAndGet() == 1) { - // We use abortNextTransaction here, as the transaction context does not yet - // have a transaction (it will be requested by the first update statement). - mockSpanner.abortNextTransaction(); - } - return transaction.executeUpdate(UPDATE_STATEMENT); + transaction -> { + if (attempt.incrementAndGet() == 1) { + // We use abortNextTransaction here, as the transaction context does not yet + // have a transaction (it will be requested by the first update statement). + mockSpanner.abortNextTransaction(); } + return transaction.executeUpdate(UPDATE_STATEMENT); }); assertThat(updateCount).isEqualTo(UPDATE_COUNT); assertThat(attempt.get()).isEqualTo(2); @@ -506,17 +454,13 @@ public void abortedBatchUpdate() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) throws Exception { - if (attempt.incrementAndGet() == 1) { - // We use abortNextTransaction here, as the transaction context does not yet - // have a transaction (it will be requested by the first update statement). - mockSpanner.abortNextTransaction(); - } - return transaction.batchUpdate( - Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT)); + transaction -> { + if (attempt.incrementAndGet() == 1) { + // We use abortNextTransaction here, as the transaction context does not yet + // have a transaction (it will be requested by the first update statement). + mockSpanner.abortNextTransaction(); } + return transaction.batchUpdate(Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT)); }); assertThat(updateCounts).isEqualTo(new long[] {UPDATE_COUNT, UPDATE_COUNT}); assertThat(attempt.get()).isEqualTo(2); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java index 9ea77366b1..6102013780 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java @@ -598,17 +598,14 @@ public void readWriteTransactionReadOnlySessionInPool() throws InterruptedExcept TransactionRunner runner = client.readWriteTransaction(); int count = runner.run( - new TransactionCallable() { - @Override - public Integer run(TransactionContext transaction) { - int count = 0; - try (ResultSet rs = transaction.executeQuery(SELECT1AND2)) { - while (rs.next()) { - count++; - } + transaction -> { + int count1 = 0; + try (ResultSet rs = transaction.executeQuery(SELECT1AND2)) { + while (rs.next()) { + count1++; } - return count; } + return count1; }); assertThat(count).isEqualTo(2); assertThat(failOnInvalidatedSession).isFalse(); @@ -624,17 +621,14 @@ public void readWriteTransactionSelect() throws InterruptedException { TransactionRunner runner = client.readWriteTransaction(); int count = runner.run( - new TransactionCallable() { - @Override - public Integer run(TransactionContext transaction) { - int count = 0; - try (ResultSet rs = transaction.executeQuery(SELECT1AND2)) { - while (rs.next()) { - count++; - } + transaction -> { + int count1 = 0; + try (ResultSet rs = transaction.executeQuery(SELECT1AND2)) { + while (rs.next()) { + count1++; } - return count; } + return count1; }); assertThat(count).isEqualTo(2); assertThat(failOnInvalidatedSession).isFalse(); @@ -650,17 +644,14 @@ public void readWriteTransactionRead() throws InterruptedException { TransactionRunner runner = client.readWriteTransaction(); int count = runner.run( - new TransactionCallable() { - @Override - public Integer run(TransactionContext transaction) { - int count = 0; - try (ResultSet rs = transaction.read("FOO", KeySet.all(), Arrays.asList("BAR"))) { - while (rs.next()) { - count++; - } + transaction -> { + int count1 = 0; + try (ResultSet rs = transaction.read("FOO", KeySet.all(), Arrays.asList("BAR"))) { + while (rs.next()) { + count1++; } - return count; } + return count1; }); assertThat(count).isEqualTo(2); assertThat(failOnInvalidatedSession).isFalse(); @@ -676,19 +667,15 @@ public void readWriteTransactionReadUsingIndex() throws InterruptedException { TransactionRunner runner = client.readWriteTransaction(); int count = runner.run( - new TransactionCallable() { - @Override - public Integer run(TransactionContext transaction) { - int count = 0; - try (ResultSet rs = - transaction.readUsingIndex( - "FOO", "IDX", KeySet.all(), Arrays.asList("BAR"))) { - while (rs.next()) { - count++; - } + transaction -> { + int count1 = 0; + try (ResultSet rs = + transaction.readUsingIndex("FOO", "IDX", KeySet.all(), Arrays.asList("BAR"))) { + while (rs.next()) { + count1++; } - return count; } + return count1; }); assertThat(count).isEqualTo(2); assertThat(failOnInvalidatedSession).isFalse(); @@ -703,13 +690,7 @@ public void readWriteTransactionReadRow() throws InterruptedException { try { TransactionRunner runner = client.readWriteTransaction(); Struct row = - runner.run( - new TransactionCallable() { - @Override - public Struct run(TransactionContext transaction) { - return transaction.readRow("FOO", Key.of(), Arrays.asList("BAR")); - } - }); + runner.run(transaction -> transaction.readRow("FOO", Key.of(), Arrays.asList("BAR"))); assertThat(row.getLong(0)).isEqualTo(1L); assertThat(failOnInvalidatedSession).isFalse(); } catch (SessionNotFoundException e) { @@ -724,13 +705,8 @@ public void readWriteTransactionReadRowUsingIndex() throws InterruptedException TransactionRunner runner = client.readWriteTransaction(); Struct row = runner.run( - new TransactionCallable() { - @Override - public Struct run(TransactionContext transaction) { - return transaction.readRowUsingIndex( - "FOO", "IDX", Key.of(), Arrays.asList("BAR")); - } - }); + transaction -> + transaction.readRowUsingIndex("FOO", "IDX", Key.of(), Arrays.asList("BAR"))); assertThat(row.getLong(0)).isEqualTo(1L); assertThat(failOnInvalidatedSession).isFalse(); } catch (SessionNotFoundException e) { @@ -743,14 +719,7 @@ public void readWriteTransactionUpdate() throws InterruptedException { invalidateSessionPool(); try { TransactionRunner runner = client.readWriteTransaction(); - long count = - runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - return transaction.executeUpdate(UPDATE_STATEMENT); - } - }); + long count = runner.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)); assertThat(count).isEqualTo(UPDATE_COUNT); assertThat(failOnInvalidatedSession).isFalse(); } catch (SessionNotFoundException e) { @@ -764,13 +733,7 @@ public void readWriteTransactionBatchUpdate() throws InterruptedException { try { TransactionRunner runner = client.readWriteTransaction(); long[] count = - runner.run( - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) { - return transaction.batchUpdate(Arrays.asList(UPDATE_STATEMENT)); - } - }); + runner.run(transaction -> transaction.batchUpdate(Arrays.asList(UPDATE_STATEMENT))); assertThat(count.length).isEqualTo(1); assertThat(count[0]).isEqualTo(UPDATE_COUNT); assertThat(failOnInvalidatedSession).isFalse(); @@ -785,12 +748,9 @@ public void readWriteTransactionBuffer() throws InterruptedException { try { TransactionRunner runner = client.readWriteTransaction(); runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - transaction.buffer(Mutation.newInsertBuilder("FOO").set("BAR").to(1L).build()); - return null; - } + transaction -> { + transaction.buffer(Mutation.newInsertBuilder("FOO").set("BAR").to(1L).build()); + return null; }); assertThat(runner.getCommitTimestamp()).isNotNull(); assertThat(failOnInvalidatedSession).isFalse(); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index 5388af76c4..3a3f8b7ce8 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -28,7 +28,6 @@ import com.google.cloud.Timestamp; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; @@ -124,21 +123,10 @@ private void doNestedRwTransaction() { session .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws SpannerException { - session - .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - return null; - } - }); + transaction -> { + session.readWriteTransaction().run(transaction1 -> null); - return null; - } + return null; }); } @@ -159,13 +147,10 @@ public void nestedReadOnlyTxnThrows() { session .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws SpannerException { - session.readOnlyTransaction().getReadTimestamp(); + transaction -> { + session.readOnlyTransaction().getReadTimestamp(); - return null; - } + return null; }); fail("Expected exception"); } catch (SpannerException e) { @@ -180,12 +165,9 @@ public void nestedSingleUseReadTxnThrows() { session .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws SpannerException { - session.singleUseReadOnlyTransaction(); - return null; - } + transaction -> { + session.singleUseReadOnlyTransaction(); + return null; }); fail("Expected exception"); } catch (SpannerException e) { @@ -200,12 +182,9 @@ public void nestedTxnSucceedsWhenAllowed() { .readWriteTransaction() .allowNestedTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws SpannerException { - session.singleUseReadOnlyTransaction(); - return null; - } + transaction -> { + session.singleUseReadOnlyTransaction(); + return null; }); } @@ -371,13 +350,9 @@ public void singleUseContextClosesTransaction() { session.singleUse(TimestampBound.strong()); try { runner.run( - new TransactionRunner.TransactionCallable() { - @Nullable - @Override - public Void run(TransactionContext transaction) throws SpannerException { - fail("Unexpected call to transaction body"); - return null; - } + transaction -> { + fail("Unexpected call to transaction body"); + return null; }); fail("Expected exception"); } catch (IllegalStateException ex) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java index fe5599b32c..6eb3992471 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolBenchmark.java @@ -20,7 +20,6 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.NoCredentials; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; @@ -199,13 +198,8 @@ public Long call() throws Exception { Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); TransactionRunner runner = client.readWriteTransaction(); return runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate( - StandardBenchmarkMockServer.UPDATE_STATEMENT); - } - }); + transaction -> + transaction.executeUpdate(StandardBenchmarkMockServer.UPDATE_STATEMENT)); } })); } @@ -236,13 +230,8 @@ public Long call() throws Exception { Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); TransactionRunner runner = client.readWriteTransaction(); return runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate( - StandardBenchmarkMockServer.UPDATE_STATEMENT); - } - }); + transaction -> + transaction.executeUpdate(StandardBenchmarkMockServer.UPDATE_STATEMENT)); } })); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java index f559a04b94..77b0a461d5 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolLeakTest.java @@ -24,7 +24,6 @@ import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.cloud.NoCredentials; import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import io.grpc.Server; import io.grpc.StatusRuntimeException; import io.grpc.inprocess.InProcessServerBuilder; @@ -125,15 +124,7 @@ private void readWriteTransactionTest( assertThat(pool.getNumberOfSessionsInPool(), is(equalTo(0))); setup.run(); try { - client - .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - return null; - } - }); + client.readWriteTransaction().run(transaction -> null); fail("missing FAILED_PRECONDITION exception"); } catch (SpannerException e) { assertThat(e.getErrorCode(), is(equalTo(ErrorCode.FAILED_PRECONDITION))); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerBenchmark.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerBenchmark.java index baba73f9e9..1b919f9276 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerBenchmark.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolMaintainerBenchmark.java @@ -20,7 +20,6 @@ import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.NoCredentials; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; @@ -196,13 +195,8 @@ public Long call() throws Exception { Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); TransactionRunner runner = client.readWriteTransaction(); return runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate( - StandardBenchmarkMockServer.UPDATE_STATEMENT); - } - }); + transaction -> + transaction.executeUpdate(StandardBenchmarkMockServer.UPDATE_STATEMENT)); } })); } @@ -235,13 +229,8 @@ public Long call() throws Exception { Thread.sleep(RND.nextInt(RND_WAIT_TIME_BETWEEN_REQUESTS)); TransactionRunner runner = client.readWriteTransaction(); return runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate( - StandardBenchmarkMockServer.UPDATE_STATEMENT); - } - }); + transaction -> + transaction.executeUpdate(StandardBenchmarkMockServer.UPDATE_STATEMENT)); } })); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java index 75552c52e1..11276c68dc 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpanTest.java @@ -26,7 +26,6 @@ import com.google.cloud.NoCredentials; import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.protobuf.ListValue; import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.StructType; @@ -270,12 +269,9 @@ public void multiUse() { public void transactionRunner() { TransactionRunner runner = client.readWriteTransaction(); runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - transaction.executeUpdate(UPDATE_STATEMENT); - return null; - } + transaction -> { + transaction.executeUpdate(UPDATE_STATEMENT); + return null; }); Map spans = failOnOverkillTraceComponent.getSpans(); assertThat(spans).containsEntry("CloudSpanner.ReadWriteTransaction", true); @@ -290,12 +286,9 @@ public void transactionRunnerWithError() { TransactionRunner runner = client.readWriteTransaction(); try { runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - transaction.executeUpdate(INVALID_UPDATE_STATEMENT); - return null; - } + transaction -> { + transaction.executeUpdate(INVALID_UPDATE_STATEMENT); + return null; }); fail("missing expected exception"); } catch (SpannerException e) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerGaxRetryTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerGaxRetryTest.java index cda4cf5f8f..6773f8a5b5 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerGaxRetryTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerGaxRetryTest.java @@ -30,7 +30,6 @@ import com.google.cloud.NoCredentials; import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.protobuf.ListValue; import com.google.spanner.v1.ResultSetMetadata; import com.google.spanner.v1.StructType; @@ -197,14 +196,7 @@ private void warmUpSessionPool(DatabaseClient client) { while (true) { try { TransactionRunner runner = client.readWriteTransaction(); - long updateCount = - runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - return transaction.executeUpdate(UPDATE_STATEMENT); - } - }); + long updateCount = runner.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)); assertThat(updateCount, is(equalTo(UPDATE_COUNT))); break; } catch (SpannerException e) { @@ -321,13 +313,7 @@ public void readWriteTransactionTimeout() { mockSpanner.setBeginTransactionExecutionTime(ONE_SECOND); try { TransactionRunner runner = clientWithTimeout.readWriteTransaction(); - runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - return null; - } - }); + runner.run(transaction -> null); fail("Expected exception"); } catch (SpannerException ex) { assertEquals(ErrorCode.DEADLINE_EXCEEDED, ex.getErrorCode()); @@ -339,14 +325,7 @@ public void readWriteTransactionUnavailable() { warmUpSessionPool(client); mockSpanner.addException(UNAVAILABLE); TransactionRunner runner = client.readWriteTransaction(); - long updateCount = - runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - return transaction.executeUpdate(UPDATE_STATEMENT); - } - }); + long updateCount = runner.run(transaction -> transaction.executeUpdate(UPDATE_STATEMENT)); assertThat(updateCount, is(equalTo(UPDATE_COUNT))); } @@ -356,14 +335,11 @@ public void readWriteTransactionStatementAborted() { final AtomicInteger attempts = new AtomicInteger(); long updateCount = runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - if (attempts.getAndIncrement() == 0) { - mockSpanner.abortNextStatement(); - } - return transaction.executeUpdate(UPDATE_STATEMENT); + transaction -> { + if (attempts.getAndIncrement() == 0) { + mockSpanner.abortNextStatement(); } + return transaction.executeUpdate(UPDATE_STATEMENT); }); assertThat(updateCount, is(equalTo(UPDATE_COUNT))); assertThat(attempts.get(), is(equalTo(2))); @@ -375,15 +351,12 @@ public void readWriteTransactionCommitAborted() { final AtomicInteger attempts = new AtomicInteger(); long updateCount = runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - long res = transaction.executeUpdate(UPDATE_STATEMENT); - if (attempts.getAndIncrement() == 0) { - mockSpanner.abortTransaction(transaction); - } - return res; + transaction -> { + long res = transaction.executeUpdate(UPDATE_STATEMENT); + if (attempts.getAndIncrement() == 0) { + mockSpanner.abortTransaction(transaction); } + return res; }); assertThat(updateCount, is(equalTo(UPDATE_COUNT))); assertThat(attempts.get(), is(equalTo(2))); @@ -393,12 +366,9 @@ public Long run(TransactionContext transaction) { public void readWriteTransactionCheckedException() { TransactionRunner runner = client.readWriteTransaction(); runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - transaction.executeUpdate(UPDATE_STATEMENT); - throw new Exception("test"); - } + transaction -> { + transaction.executeUpdate(UPDATE_STATEMENT); + throw new Exception("test"); }); } @@ -406,12 +376,9 @@ public Long run(TransactionContext transaction) throws Exception { public void readWriteTransactionUncheckedException() { TransactionRunner runner = client.readWriteTransaction(); runner.run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - transaction.executeUpdate(UPDATE_STATEMENT); - throw SpannerExceptionFactory.newSpannerException(ErrorCode.INVALID_ARGUMENT, "test"); - } + transaction -> { + transaction.executeUpdate(UPDATE_STATEMENT); + throw SpannerExceptionFactory.newSpannerException(ErrorCode.INVALID_ARGUMENT, "test"); }); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java index b65f70b384..6ca8f88ab9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/TransactionRunnerImplTest.java @@ -30,7 +30,6 @@ import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.SessionClient.SessionId; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.base.Preconditions; @@ -191,15 +190,7 @@ public ApiFuture answer(InvocationOnMock invocation) DatabaseId db = DatabaseId.of("test", "test", "test"); try (SpannerImpl spanner = new SpannerImpl(rpc, options)) { DatabaseClient client = spanner.getDatabaseClient(db); - client - .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - return null; - } - }); + client.readWriteTransaction().run(transaction -> null); verify(rpc, times(1)) .beginTransactionAsync(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap()); } @@ -209,12 +200,9 @@ public Void run(TransactionContext transaction) { public void commitSucceeds() { final AtomicInteger numCalls = new AtomicInteger(0); transactionRunner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - numCalls.incrementAndGet(); - return null; - } + transaction -> { + numCalls.incrementAndGet(); + return null; }); assertThat(numCalls.get()).isEqualTo(1); verify(txn, never()).ensureTxn(); @@ -235,12 +223,9 @@ public void commitAbort() { doThrow(error).doNothing().when(txn).commit(); final AtomicInteger numCalls = new AtomicInteger(0); transactionRunner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - numCalls.incrementAndGet(); - return null; - } + transaction -> { + numCalls.incrementAndGet(); + return null; }); assertThat(numCalls.get()).isEqualTo(2); // ensureTxn() is only called during retry. @@ -256,12 +241,9 @@ public void commitFailsWithNonAbort() { final AtomicInteger numCalls = new AtomicInteger(0); try { transactionRunner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - numCalls.incrementAndGet(); - return null; - } + transaction -> { + numCalls.incrementAndGet(); + return null; }); fail("Expected exception"); } catch (SpannerException e) { @@ -327,12 +309,9 @@ public void prepareReadWriteTransaction() { runner.setSpan(mock(Span.class)); assertThat(usedInlinedBegin).isFalse(); runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - transaction.executeUpdate(Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2")); - return null; - } + transaction -> { + transaction.executeUpdate(Statement.of("UPDATE FOO SET BAR=1 WHERE BAZ=2")); + return null; }); verify(rpc, Mockito.never()) .beginTransaction(Mockito.any(BeginTransactionRequest.class), Mockito.anyMap()); @@ -388,12 +367,9 @@ private long[] batchDmlException(int status) { final AtomicInteger numCalls = new AtomicInteger(0); long updateCount[] = runner.run( - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) { - numCalls.incrementAndGet(); - return transaction.batchUpdate(Arrays.asList(statement, statement)); - } + transaction1 -> { + numCalls.incrementAndGet(); + return transaction1.batchUpdate(Arrays.asList(statement, statement)); }); if (status == Code.ABORTED_VALUE) { // Assert that the method ran twice because the first response aborted. @@ -404,15 +380,12 @@ public long[] run(TransactionContext transaction) { private void runTransaction(final Exception exception) { transactionRunner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - if (firstRun) { - firstRun = false; - throw SpannerExceptionFactory.newSpannerException(exception); - } - return null; + transaction -> { + if (firstRun) { + firstRun = false; + throw SpannerExceptionFactory.newSpannerException(exception); } + return null; }); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBatchDmlTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBatchDmlTest.java index b2cf021445..dada7a0aff 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBatchDmlTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBatchDmlTest.java @@ -27,7 +27,6 @@ import com.google.cloud.spanner.SpannerBatchUpdateException; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Statement; -import com.google.cloud.spanner.TransactionContext; import com.google.cloud.spanner.TransactionRunner; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; @@ -82,21 +81,18 @@ public void dropTable() throws Exception { @Test public void noStatementsInRequest() { final TransactionCallable callable = - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) { - List stmts = new ArrayList<>(); - long[] rowCounts; - try { - rowCounts = transaction.batchUpdate(stmts); - Assert.fail("Expecting an exception."); - } catch (SpannerException e) { - assertThat(e instanceof SpannerBatchUpdateException).isFalse(); - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); - rowCounts = new long[0]; - } - return rowCounts; + transaction -> { + List stmts = new ArrayList<>(); + long[] rowCounts; + try { + rowCounts = transaction.batchUpdate(stmts); + Assert.fail("Expecting an exception."); + } catch (SpannerException e) { + assertThat(e instanceof SpannerBatchUpdateException).isFalse(); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + rowCounts = new long[0]; } + return rowCounts; }; TransactionRunner runner = client.readWriteTransaction(); long[] rowCounts = runner.run(callable); @@ -106,15 +102,12 @@ public long[] run(TransactionContext transaction) { @Test public void batchDml() { final TransactionCallable callable = - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) { - List stmts = new ArrayList<>(); - stmts.add(Statement.of(INSERT_DML)); - stmts.add(Statement.of(UPDATE_DML)); - stmts.add(Statement.of(DELETE_DML)); - return transaction.batchUpdate(stmts); - } + transaction -> { + List stmts = new ArrayList<>(); + stmts.add(Statement.of(INSERT_DML)); + stmts.add(Statement.of(UPDATE_DML)); + stmts.add(Statement.of(DELETE_DML)); + return transaction.batchUpdate(stmts); }; TransactionRunner runner = client.readWriteTransaction(); long[] rowCounts = runner.run(callable); @@ -127,19 +120,16 @@ public long[] run(TransactionContext transaction) { @Test public void mixedBatchDmlAndDml() { final TransactionCallable callable = - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) { - long rowCount = transaction.executeUpdate(Statement.of(INSERT_DML)); - List stmts = new ArrayList<>(); - stmts.add(Statement.of(UPDATE_DML)); - stmts.add(Statement.of(DELETE_DML)); - long[] batchRowCounts = transaction.batchUpdate(stmts); - long[] rowCounts = new long[batchRowCounts.length + 1]; - System.arraycopy(batchRowCounts, 0, rowCounts, 0, batchRowCounts.length); - rowCounts[batchRowCounts.length] = rowCount; - return rowCounts; - } + transaction -> { + long rowCount = transaction.executeUpdate(Statement.of(INSERT_DML)); + List stmts = new ArrayList<>(); + stmts.add(Statement.of(UPDATE_DML)); + stmts.add(Statement.of(DELETE_DML)); + long[] batchRowCounts = transaction.batchUpdate(stmts); + long[] rowCounts = new long[batchRowCounts.length + 1]; + System.arraycopy(batchRowCounts, 0, rowCounts, 0, batchRowCounts.length); + rowCounts[batchRowCounts.length] = rowCount; + return rowCounts; }; TransactionRunner runner = client.readWriteTransaction(); long[] rowCounts = runner.run(callable); @@ -152,15 +142,12 @@ public long[] run(TransactionContext transaction) { @Test public void errorBatchDmlIllegalStatement() { final TransactionCallable callable = - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) { - List stmts = new ArrayList<>(); - stmts.add(Statement.of(INSERT_DML)); - stmts.add(Statement.of("some illegal statement")); - stmts.add(Statement.of(UPDATE_DML)); - return transaction.batchUpdate(stmts); - } + transaction -> { + List stmts = new ArrayList<>(); + stmts.add(Statement.of(INSERT_DML)); + stmts.add(Statement.of("some illegal statement")); + stmts.add(Statement.of(UPDATE_DML)); + return transaction.batchUpdate(stmts); }; TransactionRunner runner = client.readWriteTransaction(); try { @@ -180,15 +167,12 @@ public long[] run(TransactionContext transaction) { @Test public void errorBatchDmlAlreadyExist() { final TransactionCallable callable = - new TransactionCallable() { - @Override - public long[] run(TransactionContext transaction) { - List stmts = new ArrayList<>(); - stmts.add(Statement.of(INSERT_DML)); - stmts.add(Statement.of(INSERT_DML)); // should fail - stmts.add(Statement.of(UPDATE_DML)); - return transaction.batchUpdate(stmts); - } + transaction -> { + List stmts = new ArrayList<>(); + stmts.add(Statement.of(INSERT_DML)); + stmts.add(Statement.of(INSERT_DML)); // should fail + stmts.add(Statement.of(UPDATE_DML)); + return transaction.batchUpdate(stmts); }; TransactionRunner runner = client.readWriteTransaction(); try { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITClosedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITClosedSessionTest.java index f846dd3deb..aeb0256285 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITClosedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITClosedSessionTest.java @@ -33,7 +33,6 @@ import com.google.cloud.spanner.TransactionContext; import com.google.cloud.spanner.TransactionManager; import com.google.cloud.spanner.TransactionRunner; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.BeforeClass; @@ -200,18 +199,15 @@ public void testReadWriteTransaction() { for (int run = 0; run < RUNS_PER_TEST_CASE; run++) { TransactionRunner txn = client.readWriteTransaction(); txn.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - for (int i = 0; i < 2; i++) { - try (ResultSet rs = transaction.executeQuery(Statement.of("SELECT 1"))) { - assertThat(rs.next()).isTrue(); - assertThat(rs.getLong(0)).isEqualTo(1L); - assertThat(rs.next()).isFalse(); - } + transaction -> { + for (int i = 0; i < 2; i++) { + try (ResultSet rs = transaction.executeQuery(Statement.of("SELECT 1"))) { + assertThat(rs.next()).isTrue(); + assertThat(rs.getLong(0)).isEqualTo(1L); + assertThat(rs.next()).isFalse(); } - return null; } + return null; }); } } @@ -223,15 +219,12 @@ public void testReadWriteTransactionNoRecreation() { try { TransactionRunner txn = client.readWriteTransaction(); txn.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - try (ResultSet rs = transaction.executeQuery(Statement.of("SELECT 1"))) { - rs.next(); - fail("Expected exception"); - } - return null; + transaction -> { + try (ResultSet rs = transaction.executeQuery(Statement.of("SELECT 1"))) { + rs.next(); + fail("Expected exception"); } + return null; }); fail("Expected exception"); } catch (SessionNotFoundException ex) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDMLTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDMLTest.java index 915efa604e..81452e07d0 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDMLTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDMLTest.java @@ -34,7 +34,6 @@ import com.google.cloud.spanner.SpannerExceptionFactory; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.TimestampBound; -import com.google.cloud.spanner.TransactionContext; import com.google.cloud.spanner.TransactionRunner; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import java.util.Arrays; @@ -103,21 +102,17 @@ private String deleteDml() { private void executeUpdate(long expectedCount, final String... stmts) { final TransactionCallable callable = - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - long rowCount = 0; - for (String stmt : stmts) { - if (throwAbortOnce) { - throwAbortOnce = false; - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.ABORTED, "Abort in test"); - } - - rowCount += transaction.executeUpdate(Statement.of(stmt)); + transaction -> { + long rowCount = 0; + for (String stmt : stmts) { + if (throwAbortOnce) { + throwAbortOnce = false; + throw SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, "Abort in test"); } - return rowCount; + + rowCount += transaction.executeUpdate(Statement.of(stmt)); } + return rowCount; }; TransactionRunner runner = client.readWriteTransaction(); Long rowCount = runner.run(callable); @@ -227,20 +222,17 @@ public void standardDMLReadYourWrites() { executeUpdate(DML_COUNT, insertDml()); final TransactionCallable callable = - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - long rowCount = - transaction.executeUpdate( - Statement.of(String.format("UPDATE T SET v = v * 2 WHERE k = '%d-boo2';", id))); - assertThat(rowCount).isEqualTo(1); - assertThat( - transaction - .readRow("T", Key.of(String.format("%d-boo2", id)), Arrays.asList("v")) - .getLong(0)) - .isEqualTo(2 * 2); - return null; - } + transaction -> { + long rowCount = + transaction.executeUpdate( + Statement.of(String.format("UPDATE T SET v = v * 2 WHERE k = '%d-boo2';", id))); + assertThat(rowCount).isEqualTo(1); + assertThat( + transaction + .readRow("T", Key.of(String.format("%d-boo2", id)), Arrays.asList("v")) + .getLong(0)) + .isEqualTo(2 * 2); + return null; }; TransactionRunner runner = client.readWriteTransaction(); runner.run(callable); @@ -256,13 +248,10 @@ class UserException extends Exception { } } final TransactionCallable callable = - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws UserException { - long rowCount = transaction.executeUpdate(Statement.of(insertDml())); - assertThat(rowCount).isEqualTo(DML_COUNT); - throw new UserException("failing to commit"); - } + transaction -> { + long rowCount = transaction.executeUpdate(Statement.of(insertDml())); + assertThat(rowCount).isEqualTo(DML_COUNT); + throw new UserException("failing to commit"); }; try { @@ -290,20 +279,17 @@ public void standardDMLAndMutations() { final String key1 = uniqueKey(); final String key2 = uniqueKey(); final TransactionCallable callable = - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - // DML - long rowCount = - transaction.executeUpdate( - Statement.of("INSERT INTO T (k, v) VALUES ('" + key1 + "', 1)")); - assertThat(rowCount).isEqualTo(1); - - // Mutations - transaction.buffer( - Mutation.newInsertOrUpdateBuilder("T").set("K").to(key2).set("V").to(2).build()); - return null; - } + transaction -> { + // DML + long rowCount = + transaction.executeUpdate( + Statement.of("INSERT INTO T (k, v) VALUES ('" + key1 + "', 1)")); + assertThat(rowCount).isEqualTo(1); + + // Mutations + transaction.buffer( + Mutation.newInsertOrUpdateBuilder("T").set("K").to(key2).set("V").to(2).build()); + return null; }; TransactionRunner runner = client.readWriteTransaction(); runner.run(callable); @@ -321,18 +307,15 @@ public Void run(TransactionContext transaction) { private void executeQuery(long expectedCount, final String... stmts) { final TransactionCallable callable = - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - long rowCount = 0; - for (final String stmt : stmts) { - ResultSet resultSet = transaction.executeQuery(Statement.of(stmt)); - assertThat(resultSet.next()).isFalse(); - assertThat(resultSet.getStats()).isNotNull(); - rowCount += resultSet.getStats().getRowCountExact(); - } - return rowCount; + transaction -> { + long rowCount = 0; + for (final String stmt : stmts) { + ResultSet resultSet = transaction.executeQuery(Statement.of(stmt)); + assertThat(resultSet.next()).isFalse(); + assertThat(resultSet.getStats()).isNotNull(); + rowCount += resultSet.getStats().getRowCountExact(); } + return rowCount; }; TransactionRunner runner = client.readWriteTransaction(); Long rowCount = runner.run(callable); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITQueryOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITQueryOptionsTest.java index 910effb0cd..1ff035fca8 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITQueryOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITQueryOptionsTest.java @@ -30,8 +30,6 @@ import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Statement; -import com.google.cloud.spanner.TransactionContext; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -109,11 +107,9 @@ public void executeUpdate() { assertThat( client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - return transaction.executeUpdate( + .run( + transaction -> + transaction.executeUpdate( Statement.newBuilder("INSERT INTO TEST (ID, NAME) VALUES (@id, @name)") .bind("id") .to(1L) @@ -121,20 +117,16 @@ public Long run(TransactionContext transaction) { .to("One") .withQueryOptions( QueryOptions.newBuilder().setOptimizerVersion("1").build()) - .build()); - } - })) + .build()))) .isEqualTo(1L); // Version 'latest' should also work. assertThat( client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - return transaction.executeUpdate( + .run( + transaction -> + transaction.executeUpdate( Statement.newBuilder("INSERT INTO TEST (ID, NAME) VALUES (@id, @name)") .bind("id") .to(2L) @@ -142,9 +134,7 @@ public Long run(TransactionContext transaction) { .to("Two") .withQueryOptions( QueryOptions.newBuilder().setOptimizerVersion("latest").build()) - .build()); - } - })) + .build()))) .isEqualTo(1L); // Version '100000' is an invalid value and should cause an error. @@ -152,10 +142,8 @@ public Long run(TransactionContext transaction) { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) { - return transaction.executeUpdate( + transaction -> + transaction.executeUpdate( Statement.newBuilder("INSERT INTO TEST (ID, NAME) VALUES (@id, @name)") .bind("id") .to(3L) @@ -163,9 +151,7 @@ public Long run(TransactionContext transaction) { .to("Three") .withQueryOptions( QueryOptions.newBuilder().setOptimizerVersion("100000").build()) - .build()); - } - }); + .build())); fail("missing expected exception"); } catch (SpannerException e) { assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java index ed2bc564b0..dd5746fd42 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java @@ -45,7 +45,6 @@ import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.TimestampBound; -import com.google.cloud.spanner.TransactionContext; import com.google.cloud.spanner.TransactionRunner; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.cloud.spanner.testing.EmulatorSpannerHelper; @@ -115,18 +114,15 @@ private void doBasicsTest(final ReadStrategy strategy) throws InterruptedExcepti final CountDownLatch complete = new CountDownLatch(numThreads); final TransactionCallable callable = - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws SpannerException { - Struct row = strategy.read(transaction, key); - long newValue = row.getLong(0) + 1; - transaction.buffer( - Mutation.newUpdateBuilder("T").set("K").to(key).set("V").to(newValue).build()); - commitBarrier.countDown(); - // Synchronize so that all threads attempt to commit at the same time. - Uninterruptibles.awaitUninterruptibly(commitBarrier); - return newValue; - } + transaction -> { + Struct row = strategy.read(transaction, key); + long newValue = row.getLong(0) + 1; + transaction.buffer( + Mutation.newUpdateBuilder("T").set("K").to(key).set("V").to(newValue).build()); + commitBarrier.countDown(); + // Synchronize so that all threads attempt to commit at the same time. + Uninterruptibles.awaitUninterruptibly(commitBarrier); + return newValue; }; // We start multiple threads all attempting to update the same value concurrently. We expect @@ -210,12 +206,9 @@ class UserException extends Exception { final String key = uniqueKey(); TransactionCallable callable = - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws UserException { - transaction.buffer(Mutation.newInsertOrUpdateBuilder("T").set("K").to(key).build()); - throw new UserException("User failure"); - } + transaction -> { + transaction.buffer(Mutation.newInsertOrUpdateBuilder("T").set("K").to(key).build()); + throw new UserException("User failure"); }; try { @@ -237,12 +230,9 @@ public void userExceptionIsSpannerException() { final String key = uniqueKey(); TransactionCallable callable = - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - transaction.buffer(Mutation.newInsertOrUpdateBuilder("T").set("K").to(key).build()); - throw newSpannerException(ErrorCode.OUT_OF_RANGE, "User failure"); - } + transaction -> { + transaction.buffer(Mutation.newInsertOrUpdateBuilder("T").set("K").to(key).build()); + throw newSpannerException(ErrorCode.OUT_OF_RANGE, "User failure"); }; try { @@ -293,29 +283,26 @@ public void run() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws SpannerException { - try { - Struct row = transaction.readRow("T", Key.of(key1), Arrays.asList("V")); - t1Started.countDown(); - Uninterruptibles.awaitUninterruptibly(t2Running); - transaction.buffer( - Mutation.newUpdateBuilder("T") - .set("K") - .to(key1) - .set("V") - .to(row.getLong(0) + 1) - .build()); - return null; - } catch (SpannerException e) { - if (e.getErrorCode() == ErrorCode.ABORTED) { - assertThat(e).isInstanceOf(AbortedException.class); - assertThat(((AbortedException) e).getRetryDelayInMillis()) - .isNotEqualTo(-1L); - } - throw new RuntimeException("Swallowed exception: " + e.getMessage()); + transaction -> { + try { + Struct row = transaction.readRow("T", Key.of(key1), Arrays.asList("V")); + t1Started.countDown(); + Uninterruptibles.awaitUninterruptibly(t2Running); + transaction.buffer( + Mutation.newUpdateBuilder("T") + .set("K") + .to(key1) + .set("V") + .to(row.getLong(0) + 1) + .build()); + return null; + } catch (SpannerException e) { + if (e.getErrorCode() == ErrorCode.ABORTED) { + assertThat(e).isInstanceOf(AbortedException.class); + assertThat(((AbortedException) e).getRetryDelayInMillis()) + .isNotEqualTo(-1L); } + throw new RuntimeException("Swallowed exception: " + e.getMessage()); } }); t1Result.set(null); @@ -334,30 +321,27 @@ public void run() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws SpannerException { - try { - Struct r1 = transaction.readRow("T", Key.of(key1), Arrays.asList("V")); - t2Running.countDown(); - Uninterruptibles.awaitUninterruptibly(t1Done); - Struct r2 = transaction.readRow("T", Key.of(key2), Arrays.asList("V")); - transaction.buffer( - Mutation.newUpdateBuilder("T") - .set("K") - .to(key2) - .set("V") - .to(r1.getLong(0) + r2.getLong(0)) - .build()); - return null; - } catch (SpannerException e) { - if (e.getErrorCode() == ErrorCode.ABORTED) { - assertThat(e).isInstanceOf(AbortedException.class); - assertThat(((AbortedException) e).getRetryDelayInMillis()) - .isNotEqualTo(-1L); - } - throw new RuntimeException("Swallowed exception: " + e.getMessage()); + transaction -> { + try { + Struct r1 = transaction.readRow("T", Key.of(key1), Arrays.asList("V")); + t2Running.countDown(); + Uninterruptibles.awaitUninterruptibly(t1Done); + Struct r2 = transaction.readRow("T", Key.of(key2), Arrays.asList("V")); + transaction.buffer( + Mutation.newUpdateBuilder("T") + .set("K") + .to(key2) + .set("V") + .to(r1.getLong(0) + r2.getLong(0)) + .build()); + return null; + } catch (SpannerException e) { + if (e.getErrorCode() == ErrorCode.ABORTED) { + assertThat(e).isInstanceOf(AbortedException.class); + assertThat(((AbortedException) e).getRetryDelayInMillis()) + .isNotEqualTo(-1L); } + throw new RuntimeException("Swallowed exception: " + e.getMessage()); } }); t2Result.set(null); @@ -396,21 +380,10 @@ private void doNestedRwTransaction() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws SpannerException { - client - .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) { - return null; - } - }); + transaction -> { + client.readWriteTransaction().run(transaction1 -> null); - return null; - } + return null; }); } @@ -431,15 +404,12 @@ public void nestedReadOnlyTxnThrows() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws SpannerException { - try (ReadOnlyTransaction tx = client.readOnlyTransaction()) { - tx.getReadTimestamp(); - } - - return null; + transaction -> { + try (ReadOnlyTransaction tx = client.readOnlyTransaction()) { + tx.getReadTimestamp(); } + + return null; }); fail("Expected exception"); } catch (SpannerException e) { @@ -454,21 +424,18 @@ public void nestedBatchTxnThrows() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws SpannerException { - BatchClient batchClient = env.getTestHelper().getBatchClient(db); - BatchReadOnlyTransaction batchTxn = - batchClient.batchReadOnlyTransaction(TimestampBound.strong()); - batchTxn.partitionReadUsingIndex( - PartitionOptions.getDefaultInstance(), - "Test", - "Index", - KeySet.all(), - Arrays.asList("Fingerprint")); - - return null; - } + transaction -> { + BatchClient batchClient = env.getTestHelper().getBatchClient(db); + BatchReadOnlyTransaction batchTxn = + batchClient.batchReadOnlyTransaction(TimestampBound.strong()); + batchTxn.partitionReadUsingIndex( + PartitionOptions.getDefaultInstance(), + "Test", + "Index", + KeySet.all(), + Arrays.asList("Fingerprint")); + + return null; }); fail("Expected exception"); } catch (SpannerException e) { @@ -483,18 +450,13 @@ public void nestedSingleUseReadTxnThrows() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws SpannerException { - try (ResultSet rs = - client - .singleUseReadOnlyTransaction() - .executeQuery(Statement.of("SELECT 1"))) { - rs.next(); - } - - return null; + transaction -> { + try (ResultSet rs = + client.singleUseReadOnlyTransaction().executeQuery(Statement.of("SELECT 1"))) { + rs.next(); } + + return null; }); fail("Expected exception"); } catch (SpannerException e) { @@ -511,13 +473,10 @@ public void nestedTxnSucceedsWhenAllowed() { .readWriteTransaction() .allowNestedTransaction() .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws SpannerException { - client.singleUseReadOnlyTransaction(); + transaction -> { + client.singleUseReadOnlyTransaction(); - return null; - } + return null; }); } @@ -531,22 +490,19 @@ public void testTxWithCaughtError() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - try { - transaction.executeUpdate(Statement.of("UPDATE T SET V=2 WHERE")); - fail("missing expected exception"); - } catch (SpannerException e) { - if (e.getErrorCode() == ErrorCode.ABORTED) { - // Aborted -> Let the transaction be retried - throw e; - } - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + transaction -> { + try { + transaction.executeUpdate(Statement.of("UPDATE T SET V=2 WHERE")); + fail("missing expected exception"); + } catch (SpannerException e) { + if (e.getErrorCode() == ErrorCode.ABORTED) { + // Aborted -> Let the transaction be retried + throw e; } - return transaction.executeUpdate( - Statement.of("INSERT INTO T (K, V) VALUES ('One', 1)")); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); } + return transaction.executeUpdate( + Statement.of("INSERT INTO T (K, V) VALUES ('One', 1)")); }); assertThat(updateCount).isEqualTo(1L); } @@ -566,30 +522,26 @@ public void testTxWithConstraintError() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - try { - // Try to insert a duplicate row. This statement will fail. When the statement - // is executed against an already existing transaction (i.e. - // inlineBegin=false), the entire transaction will remain invalid and cannot - // be committed. When it is executed as the first statement of a transaction - // that also tries to start a transaction, then no transaction will be started - // and the next statement will start the transaction. This will cause the - // transaction to succeed. - transaction.executeUpdate( - Statement.of("INSERT INTO T (K, V) VALUES ('One', 1)")); - fail("missing expected exception"); - } catch (SpannerException e) { - if (e.getErrorCode() == ErrorCode.ABORTED) { - // Aborted -> Let the transaction be retried - throw e; - } - assertThat(e.getErrorCode()).isEqualTo(ErrorCode.ALREADY_EXISTS); + transaction -> { + try { + // Try to insert a duplicate row. This statement will fail. When the statement + // is executed against an already existing transaction (i.e. + // inlineBegin=false), the entire transaction will remain invalid and cannot + // be committed. When it is executed as the first statement of a transaction + // that also tries to start a transaction, then no transaction will be started + // and the next statement will start the transaction. This will cause the + // transaction to succeed. + transaction.executeUpdate(Statement.of("INSERT INTO T (K, V) VALUES ('One', 1)")); + fail("missing expected exception"); + } catch (SpannerException e) { + if (e.getErrorCode() == ErrorCode.ABORTED) { + // Aborted -> Let the transaction be retried + throw e; } - return transaction.executeUpdate( - Statement.of("INSERT INTO T (K, V) VALUES ('Two', 2)")); + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.ALREADY_EXISTS); } + return transaction.executeUpdate( + Statement.of("INSERT INTO T (K, V) VALUES ('Two', 2)")); }); fail("missing expected ALREADY_EXISTS error"); } catch (SpannerException e) { @@ -602,13 +554,7 @@ public void testTxWithUncaughtError() { try { client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(Statement.of("UPDATE T SET V=2 WHERE")); - } - }); + .run(transaction -> transaction.executeUpdate(Statement.of("UPDATE T SET V=2 WHERE"))); fail("missing expected exception"); } catch (SpannerException e) { assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); @@ -621,12 +567,9 @@ public void testTxWithUncaughtErrorAfterSuccessfulBegin() { client .readWriteTransaction() .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - transaction.executeUpdate(Statement.of("INSERT INTO T (K, V) VALUES ('One', 1)")); - return transaction.executeUpdate(Statement.of("UPDATE T SET V=2 WHERE")); - } + transaction -> { + transaction.executeUpdate(Statement.of("INSERT INTO T (K, V) VALUES ('One', 1)")); + return transaction.executeUpdate(Statement.of("UPDATE T SET V=2 WHERE")); }); fail("missing expected exception"); } catch (SpannerException e) { @@ -640,13 +583,10 @@ public void testTransactionRunnerReturnsCommitStats() { final String key = uniqueKey(); TransactionRunner runner = client.readWriteTransaction(Options.commitStats()); runner.run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - transaction.buffer( - Mutation.newInsertBuilder("T").set("K").to(key).set("V").to(0).build()); - return null; - } + transaction -> { + transaction.buffer( + Mutation.newInsertBuilder("T").set("K").to(key).set("V").to(0).build()); + return null; }); assertNotNull(runner.getCommitResponse().getCommitStats()); // MutationCount = 2 (2 columns). diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index 6aa754d5f1..d5d4bf6d18 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -45,8 +45,6 @@ import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator; import com.google.cloud.spanner.SpannerOptions.CallCredentialsProvider; import com.google.cloud.spanner.Statement; -import com.google.cloud.spanner.TransactionContext; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.cloud.spanner.admin.database.v1.MockDatabaseAdminImpl; import com.google.cloud.spanner.admin.instance.v1.MockInstanceAdminImpl; import com.google.cloud.spanner.spi.v1.GapicSpannerRpc.AdminRequestsLimitExceededRetryAlgorithm; @@ -437,13 +435,7 @@ public void run() { timeoutHolder.timeout = Duration.ofNanos(1L); client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(UPDATE_FOO_STATEMENT); - } - }); + .run(transaction -> transaction.executeUpdate(UPDATE_FOO_STATEMENT)); fail("missing expected timeout exception"); } catch (SpannerException e) { assertThat(e.getErrorCode()).isEqualTo(ErrorCode.DEADLINE_EXCEEDED); @@ -454,13 +446,7 @@ public Long run(TransactionContext transaction) throws Exception { Long updateCount = client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Long run(TransactionContext transaction) throws Exception { - return transaction.executeUpdate(UPDATE_FOO_STATEMENT); - } - }); + .run(transaction -> transaction.executeUpdate(UPDATE_FOO_STATEMENT)); assertThat(updateCount).isEqualTo(1L); } }); diff --git a/samples/snippets/src/main/java/com/example/spanner/CustomTimeoutAndRetrySettingsExample.java b/samples/snippets/src/main/java/com/example/spanner/CustomTimeoutAndRetrySettingsExample.java index dac0da5306..ad3fb8067c 100644 --- a/samples/snippets/src/main/java/com/example/spanner/CustomTimeoutAndRetrySettingsExample.java +++ b/samples/snippets/src/main/java/com/example/spanner/CustomTimeoutAndRetrySettingsExample.java @@ -17,6 +17,7 @@ package com.example.spanner; //[START spanner_set_custom_timeout_and_retry] + import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.spanner.DatabaseClient; @@ -24,8 +25,6 @@ import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.Statement; -import com.google.cloud.spanner.TransactionContext; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import org.threeten.bp.Duration; class CustomTimeoutAndRetrySettingsExample { @@ -70,18 +69,14 @@ static void executeSqlWithCustomTimeoutAndRetrySettings( spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId)); client .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - String sql = - "INSERT Singers (SingerId, FirstName, LastName)\n" - + "VALUES (20, 'George', 'Washington')"; - long rowCount = transaction.executeUpdate(Statement.of(sql)); - System.out.printf("%d record inserted.%n", rowCount); - return null; - } - }); + .run(transaction -> { + String sql = + "INSERT Singers (SingerId, FirstName, LastName)\n" + + "VALUES (20, 'George', 'Washington')"; + long rowCount = transaction.executeUpdate(Statement.of(sql)); + System.out.printf("%d record inserted.%n", rowCount); + return null; + }); } } } diff --git a/samples/snippets/src/main/java/com/example/spanner/SpannerSample.java b/samples/snippets/src/main/java/com/example/spanner/SpannerSample.java index f7ac1113c3..0d19078102 100644 --- a/samples/snippets/src/main/java/com/example/spanner/SpannerSample.java +++ b/samples/snippets/src/main/java/com/example/spanner/SpannerSample.java @@ -16,7 +16,6 @@ package com.example.spanner; -import static com.google.cloud.spanner.TransactionRunner.TransactionCallable; import static com.google.cloud.spanner.Type.StructField; import com.google.api.gax.longrunning.OperationFuture; @@ -52,7 +51,6 @@ import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.TimestampBound; -import com.google.cloud.spanner.TransactionContext; import com.google.cloud.spanner.Type; import com.google.cloud.spanner.Value; import com.google.common.io.BaseEncoding; @@ -490,48 +488,44 @@ static void update(DatabaseClient dbClient) { static void writeWithTransaction(DatabaseClient dbClient) { dbClient .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - // Transfer marketing budget from one album to another. We do it in a transaction to - // ensure that the transfer is atomic. - Struct row = - transaction.readRow("Albums", Key.of(2, 2), Arrays.asList("MarketingBudget")); - long album2Budget = row.getLong(0); - // Transaction will only be committed if this condition still holds at the time of - // commit. Otherwise it will be aborted and the callable will be rerun by the - // client library. - long transfer = 200000; - if (album2Budget >= transfer) { - long album1Budget = - transaction - .readRow("Albums", Key.of(1, 1), Arrays.asList("MarketingBudget")) - .getLong(0); - album1Budget += transfer; - album2Budget -= transfer; - transaction.buffer( - Mutation.newUpdateBuilder("Albums") - .set("SingerId") - .to(1) - .set("AlbumId") - .to(1) - .set("MarketingBudget") - .to(album1Budget) - .build()); - transaction.buffer( - Mutation.newUpdateBuilder("Albums") - .set("SingerId") - .to(2) - .set("AlbumId") - .to(2) - .set("MarketingBudget") - .to(album2Budget) - .build()); - } - return null; - } - }); + .run(transaction -> { + // Transfer marketing budget from one album to another. We do it in a transaction to + // ensure that the transfer is atomic. + Struct row = + transaction.readRow("Albums", Key.of(2, 2), Arrays.asList("MarketingBudget")); + long album2Budget = row.getLong(0); + // Transaction will only be committed if this condition still holds at the time of + // commit. Otherwise it will be aborted and the callable will be rerun by the + // client library. + long transfer = 200000; + if (album2Budget >= transfer) { + long album1Budget = + transaction + .readRow("Albums", Key.of(1, 1), Arrays.asList("MarketingBudget")) + .getLong(0); + album1Budget += transfer; + album2Budget -= transfer; + transaction.buffer( + Mutation.newUpdateBuilder("Albums") + .set("SingerId") + .to(1) + .set("AlbumId") + .to(1) + .set("MarketingBudget") + .to(album1Budget) + .build()); + transaction.buffer( + Mutation.newUpdateBuilder("Albums") + .set("SingerId") + .to(2) + .set("AlbumId") + .to(2) + .set("MarketingBudget") + .to(album2Budget) + .build()); + } + return null; + }); } // [END spanner_read_write_transaction] @@ -1004,18 +998,14 @@ static void queryNestedStructField(DatabaseClient dbClient) { static void insertUsingDml(DatabaseClient dbClient) { dbClient .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - String sql = - "INSERT INTO Singers (SingerId, FirstName, LastName) " - + " VALUES (10, 'Virginia', 'Watson')"; - long rowCount = transaction.executeUpdate(Statement.of(sql)); - System.out.printf("%d record inserted.\n", rowCount); - return null; - } - }); + .run(transaction -> { + String sql = + "INSERT INTO Singers (SingerId, FirstName, LastName) " + + " VALUES (10, 'Virginia', 'Watson')"; + long rowCount = transaction.executeUpdate(Statement.of(sql)); + System.out.printf("%d record inserted.\n", rowCount); + return null; + }); } // [END spanner_dml_standard_insert] @@ -1023,19 +1013,15 @@ public Void run(TransactionContext transaction) throws Exception { static void updateUsingDml(DatabaseClient dbClient) { dbClient .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - String sql = - "UPDATE Albums " - + "SET MarketingBudget = MarketingBudget * 2 " - + "WHERE SingerId = 1 and AlbumId = 1"; - long rowCount = transaction.executeUpdate(Statement.of(sql)); - System.out.printf("%d record updated.\n", rowCount); - return null; - } - }); + .run(transaction -> { + String sql = + "UPDATE Albums " + + "SET MarketingBudget = MarketingBudget * 2 " + + "WHERE SingerId = 1 and AlbumId = 1"; + long rowCount = transaction.executeUpdate(Statement.of(sql)); + System.out.printf("%d record updated.\n", rowCount); + return null; + }); } // [END spanner_dml_standard_update] @@ -1043,16 +1029,12 @@ public Void run(TransactionContext transaction) throws Exception { static void deleteUsingDml(DatabaseClient dbClient) { dbClient .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - String sql = "DELETE FROM Singers WHERE FirstName = 'Alice'"; - long rowCount = transaction.executeUpdate(Statement.of(sql)); - System.out.printf("%d record deleted.\n", rowCount); - return null; - } - }); + .run(transaction -> { + String sql = "DELETE FROM Singers WHERE FirstName = 'Alice'"; + long rowCount = transaction.executeUpdate(Statement.of(sql)); + System.out.printf("%d record deleted.\n", rowCount); + return null; + }); } // [END spanner_dml_standard_delete] @@ -1060,18 +1042,14 @@ public Void run(TransactionContext transaction) throws Exception { static void updateUsingDmlWithTimestamp(DatabaseClient dbClient) { dbClient .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - String sql = - "UPDATE Albums " - + "SET LastUpdateTime = PENDING_COMMIT_TIMESTAMP() WHERE SingerId = 1"; - long rowCount = transaction.executeUpdate(Statement.of(sql)); - System.out.printf("%d records updated.\n", rowCount); - return null; - } - }); + .run(transaction -> { + String sql = + "UPDATE Albums " + + "SET LastUpdateTime = PENDING_COMMIT_TIMESTAMP() WHERE SingerId = 1"; + long rowCount = transaction.executeUpdate(Statement.of(sql)); + System.out.printf("%d records updated.\n", rowCount); + return null; + }); } // [END spanner_dml_standard_update_with_timestamp] @@ -1079,30 +1057,26 @@ public Void run(TransactionContext transaction) throws Exception { static void writeAndReadUsingDml(DatabaseClient dbClient) { dbClient .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - // Insert record. - String sql = - "INSERT INTO Singers (SingerId, FirstName, LastName) " - + " VALUES (11, 'Timothy', 'Campbell')"; - long rowCount = transaction.executeUpdate(Statement.of(sql)); - System.out.printf("%d record inserted.\n", rowCount); - // Read newly inserted record. - sql = "SELECT FirstName, LastName FROM Singers WHERE SingerId = 11"; - // We use a try-with-resource block to automatically release resources held by - // ResultSet. - try (ResultSet resultSet = transaction.executeQuery(Statement.of(sql))) { - while (resultSet.next()) { - System.out.printf( - "%s %s\n", - resultSet.getString("FirstName"), resultSet.getString("LastName")); - } - } - return null; - } - }); + .run(transaction -> { + // Insert record. + String sql = + "INSERT INTO Singers (SingerId, FirstName, LastName) " + + " VALUES (11, 'Timothy', 'Campbell')"; + long rowCount = transaction.executeUpdate(Statement.of(sql)); + System.out.printf("%d record inserted.\n", rowCount); + // Read newly inserted record. + sql = "SELECT FirstName, LastName FROM Singers WHERE SingerId = 11"; + // We use a try-with-resource block to automatically release resources held by + // ResultSet. + try (ResultSet resultSet = transaction.executeQuery(Statement.of(sql))) { + while (resultSet.next()) { + System.out.printf( + "%s %s\n", + resultSet.getString("FirstName"), resultSet.getString("LastName")); + } + } + return null; + }); } // [END spanner_dml_write_then_read] @@ -1120,15 +1094,11 @@ static void updateUsingDmlWithStruct(DatabaseClient dbClient) { .build(); dbClient .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - long rowCount = transaction.executeUpdate(s); - System.out.printf("%d record updated.\n", rowCount); - return null; - } - }); + .run(transaction -> { + long rowCount = transaction.executeUpdate(s); + System.out.printf("%d record updated.\n", rowCount); + return null; + }); } // [END spanner_dml_structs] @@ -1137,21 +1107,17 @@ static void writeUsingDml(DatabaseClient dbClient) { // Insert 4 singer records dbClient .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - String sql = - "INSERT INTO Singers (SingerId, FirstName, LastName) VALUES " - + "(12, 'Melissa', 'Garcia'), " - + "(13, 'Russell', 'Morales'), " - + "(14, 'Jacqueline', 'Long'), " - + "(15, 'Dylan', 'Shaw')"; - long rowCount = transaction.executeUpdate(Statement.of(sql)); - System.out.printf("%d records inserted.\n", rowCount); - return null; - } - }); + .run(transaction -> { + String sql = + "INSERT INTO Singers (SingerId, FirstName, LastName) VALUES " + + "(12, 'Melissa', 'Garcia'), " + + "(13, 'Russell', 'Morales'), " + + "(14, 'Jacqueline', 'Long'), " + + "(15, 'Dylan', 'Shaw')"; + long rowCount = transaction.executeUpdate(Statement.of(sql)); + System.out.printf("%d records inserted.\n", rowCount); + return null; + }); } // [END spanner_dml_getting_started_insert] @@ -1181,55 +1147,51 @@ static void queryWithParameter(DatabaseClient dbClient) { static void writeWithTransactionUsingDml(DatabaseClient dbClient) { dbClient .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - // Transfer marketing budget from one album to another. We do it in a transaction to - // ensure that the transfer is atomic. - String sql1 = - "SELECT MarketingBudget from Albums WHERE SingerId = 2 and AlbumId = 2"; - ResultSet resultSet = transaction.executeQuery(Statement.of(sql1)); - long album2Budget = 0; - while (resultSet.next()) { - album2Budget = resultSet.getLong("MarketingBudget"); - } - // Transaction will only be committed if this condition still holds at the time of - // commit. Otherwise it will be aborted and the callable will be rerun by the - // client library. - long transfer = 200000; - if (album2Budget >= transfer) { - String sql2 = - "SELECT MarketingBudget from Albums WHERE SingerId = 1 and AlbumId = 1"; - ResultSet resultSet2 = transaction.executeQuery(Statement.of(sql2)); - long album1Budget = 0; - while (resultSet2.next()) { - album1Budget = resultSet2.getLong("MarketingBudget"); - } - album1Budget += transfer; - album2Budget -= transfer; - Statement updateStatement = - Statement.newBuilder( - "UPDATE Albums " - + "SET MarketingBudget = @AlbumBudget " - + "WHERE SingerId = 1 and AlbumId = 1") - .bind("AlbumBudget") - .to(album1Budget) - .build(); - transaction.executeUpdate(updateStatement); - Statement updateStatement2 = - Statement.newBuilder( - "UPDATE Albums " - + "SET MarketingBudget = @AlbumBudget " - + "WHERE SingerId = 2 and AlbumId = 2") - .bind("AlbumBudget") - .to(album2Budget) - .build(); - transaction.executeUpdate(updateStatement2); - } - return null; - } - }); + .run(transaction -> { + // Transfer marketing budget from one album to another. We do it in a transaction to + // ensure that the transfer is atomic. + String sql1 = + "SELECT MarketingBudget from Albums WHERE SingerId = 2 and AlbumId = 2"; + ResultSet resultSet = transaction.executeQuery(Statement.of(sql1)); + long album2Budget = 0; + while (resultSet.next()) { + album2Budget = resultSet.getLong("MarketingBudget"); + } + // Transaction will only be committed if this condition still holds at the time of + // commit. Otherwise it will be aborted and the callable will be rerun by the + // client library. + long transfer = 200000; + if (album2Budget >= transfer) { + String sql2 = + "SELECT MarketingBudget from Albums WHERE SingerId = 1 and AlbumId = 1"; + ResultSet resultSet2 = transaction.executeQuery(Statement.of(sql2)); + long album1Budget = 0; + while (resultSet2.next()) { + album1Budget = resultSet2.getLong("MarketingBudget"); + } + album1Budget += transfer; + album2Budget -= transfer; + Statement updateStatement = + Statement.newBuilder( + "UPDATE Albums " + + "SET MarketingBudget = @AlbumBudget " + + "WHERE SingerId = 1 and AlbumId = 1") + .bind("AlbumBudget") + .to(album1Budget) + .build(); + transaction.executeUpdate(updateStatement); + Statement updateStatement2 = + Statement.newBuilder( + "UPDATE Albums " + + "SET MarketingBudget = @AlbumBudget " + + "WHERE SingerId = 2 and AlbumId = 2") + .bind("AlbumBudget") + .to(album2Budget) + .build(); + transaction.executeUpdate(updateStatement2); + } + return null; + }); } // [END spanner_dml_getting_started_update] @@ -1253,33 +1215,29 @@ static void deleteUsingPartitionedDml(DatabaseClient dbClient) { static void updateUsingBatchDml(DatabaseClient dbClient) { dbClient .readWriteTransaction() - .run( - new TransactionCallable() { - @Override - public Void run(TransactionContext transaction) throws Exception { - List stmts = new ArrayList(); - String sql = - "INSERT INTO Albums " - + "(SingerId, AlbumId, AlbumTitle, MarketingBudget) " - + "VALUES (1, 3, 'Test Album Title', 10000) "; - stmts.add(Statement.of(sql)); - sql = - "UPDATE Albums " - + "SET MarketingBudget = MarketingBudget * 2 " - + "WHERE SingerId = 1 and AlbumId = 3"; - stmts.add(Statement.of(sql)); - long[] rowCounts; - try { - rowCounts = transaction.batchUpdate(stmts); - } catch (SpannerBatchUpdateException e) { - rowCounts = e.getUpdateCounts(); - } - for (int i = 0; i < rowCounts.length; i++) { - System.out.printf("%d record updated by stmt %d.\n", rowCounts[i], i); - } - return null; - } - }); + .run(transaction -> { + List stmts = new ArrayList(); + String sql = + "INSERT INTO Albums " + + "(SingerId, AlbumId, AlbumTitle, MarketingBudget) " + + "VALUES (1, 3, 'Test Album Title', 10000) "; + stmts.add(Statement.of(sql)); + sql = + "UPDATE Albums " + + "SET MarketingBudget = MarketingBudget * 2 " + + "WHERE SingerId = 1 and AlbumId = 3"; + stmts.add(Statement.of(sql)); + long[] rowCounts; + try { + rowCounts = transaction.batchUpdate(stmts); + } catch (SpannerBatchUpdateException e) { + rowCounts = e.getUpdateCounts(); + } + for (int i = 0; i < rowCounts.length; i++) { + System.out.printf("%d record updated by stmt %d.\n", rowCounts[i], i); + } + return null; + }); } // [END spanner_dml_batch_update] diff --git a/samples/snippets/src/main/java/com/example/spanner/StatementTimeoutExample.java b/samples/snippets/src/main/java/com/example/spanner/StatementTimeoutExample.java index 9c3a8e5021..c62d5adb23 100644 --- a/samples/snippets/src/main/java/com/example/spanner/StatementTimeoutExample.java +++ b/samples/snippets/src/main/java/com/example/spanner/StatementTimeoutExample.java @@ -24,8 +24,6 @@ import com.google.cloud.spanner.SpannerOptions; import com.google.cloud.spanner.SpannerOptions.CallContextConfigurator; import com.google.cloud.spanner.Statement; -import com.google.cloud.spanner.TransactionContext; -import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.spanner.v1.SpannerGrpc; import io.grpc.CallOptions; import io.grpc.Context; @@ -67,14 +65,12 @@ public ApiCallContext configure(ApiCallContext context, ReqT reque // Run the transaction in the custom context. context.run(new Runnable() { public void run() { - client.readWriteTransaction().run(new TransactionCallable() { - public long[] run(TransactionContext transaction) throws Exception { - String sql = "INSERT Singers (SingerId, FirstName, LastName)\n" - + "VALUES (20, 'George', 'Washington')"; - long rowCount = transaction.executeUpdate(Statement.of(sql)); - System.out.printf("%d record inserted.%n", rowCount); - return null; - } + client.readWriteTransaction().run(transaction -> { + String sql = "INSERT Singers (SingerId, FirstName, LastName)\n" + + "VALUES (20, 'George', 'Washington')"; + long rowCount = transaction.executeUpdate(Statement.of(sql)); + System.out.printf("%d record inserted.%n", rowCount); + return null; }); } });