diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java index a306e7caa2..f6085adf38 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java @@ -39,14 +39,16 @@ final class AsyncTransactionManagerImpl private final SessionImpl session; private Span span; + private final boolean inlineBegin; private TransactionRunnerImpl.TransactionContextImpl txn; private TransactionState txnState; private final SettableApiFuture commitTimestamp = SettableApiFuture.create(); - AsyncTransactionManagerImpl(SessionImpl session, Span span) { + AsyncTransactionManagerImpl(SessionImpl session, Span span, boolean inlineBegin) { this.session = session; this.span = span; + this.inlineBegin = inlineBegin; } @Override @@ -74,7 +76,12 @@ private ApiFuture internalBeginAsync(boolean setActive) { session.setActive(this); } final SettableApiFuture res = SettableApiFuture.create(); - final ApiFuture fut = txn.ensureTxnAsync(); + final ApiFuture fut; + if (inlineBegin) { + fut = ApiFutures.immediateFuture(null); + } else { + fut = txn.ensureTxnAsync(); + } ApiFutures.addCallback( fut, new ApiFutureCallback() { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java index dd16e0a616..3883134582 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java @@ -256,6 +256,12 @@ private AsyncRunner inlinedRunAsync() { @Override public AsyncTransactionManager transactionManagerAsync() { + return inlineBeginReadWriteTransactions + ? inlinedTransactionManagerAsync() + : preparedTransactionManagerAsync(); + } + + private AsyncTransactionManager preparedTransactionManagerAsync() { Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan(); try (Scope s = tracer.withSpan(span)) { return getReadWriteSession().transactionManagerAsync(); @@ -265,6 +271,16 @@ public AsyncTransactionManager transactionManagerAsync() { } } + private AsyncTransactionManager inlinedTransactionManagerAsync() { + Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN).startSpan(); + try (Scope s = tracer.withSpan(span)) { + return getReadSession().transactionManagerAsync(); + } catch (RuntimeException e) { + TraceUtil.endSpanWithFailure(span, e); + throw e; + } + } + @Override public long executePartitionedUpdate(final Statement stmt) { Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan(); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 65c4b3a64d..61e86ca421 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -252,7 +252,8 @@ public TransactionManager transactionManager() { @Override public AsyncTransactionManagerImpl transactionManagerAsync() { - return new AsyncTransactionManagerImpl(this, currentSpan); + return new AsyncTransactionManagerImpl( + this, currentSpan, spanner.getOptions().isInlineBeginForReadWriteTransaction()); } @Override diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java index f9f6b0cd2d..9759f15361 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/InlineBeginTransactionTest.java @@ -28,6 +28,10 @@ import com.google.cloud.spanner.AsyncResultSet.CallbackResponse; import com.google.cloud.spanner.AsyncResultSet.ReadyCallback; import com.google.cloud.spanner.AsyncRunner.AsyncWork; +import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionFunction; +import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionStep; +import com.google.cloud.spanner.AsyncTransactionManager.CommitTimestampFuture; +import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.common.util.concurrent.MoreExecutors; @@ -656,6 +660,164 @@ public ApiFuture doWorkAsync(TransactionContext transaction) { assertThat(countTransactionsStarted()).isEqualTo(1); } + @Test + public void testAsyncTransactionManagerInlinedBeginTx() + throws InterruptedException, ExecutionException { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + try (AsyncTransactionManager txMgr = client.transactionManagerAsync()) { + TransactionContextFuture txn = txMgr.beginAsync(); + while (true) { + AsyncTransactionStep updateCount = + txn.then( + new AsyncTransactionFunction() { + @Override + public ApiFuture apply(TransactionContext txn, Void input) + throws Exception { + return txn.executeUpdateAsync(UPDATE_STATEMENT); + } + }, + executor); + CommitTimestampFuture commitTimestamp = updateCount.commitAsync(); + try { + assertThat(updateCount.get()).isEqualTo(UPDATE_COUNT); + assertThat(commitTimestamp.get()).isNotNull(); + break; + } catch (AbortedException e) { + txn = txMgr.resetForRetryAsync(); + } + } + } + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countTransactionsStarted()).isEqualTo(1); + } + + @Test + public void testAsyncTransactionManagerInlinedBeginTxAborted() + throws InterruptedException, ExecutionException { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + try (AsyncTransactionManager txMgr = client.transactionManagerAsync()) { + TransactionContextFuture txn = txMgr.beginAsync(); + boolean first = true; + while (true) { + try { + AsyncTransactionStep updateCount = + txn.then( + new AsyncTransactionFunction() { + @Override + public ApiFuture apply(TransactionContext txn, Void input) + throws Exception { + return txn.executeUpdateAsync(UPDATE_STATEMENT); + } + }, + executor); + if (first) { + // Abort the transaction after the statement has been executed to ensure that the + // transaction has actually been started before the test tries to abort it. + updateCount.then( + new AsyncTransactionFunction() { + @Override + public ApiFuture apply(TransactionContext txn, Long input) + throws Exception { + mockSpanner.abortAllTransactions(); + return ApiFutures.immediateFuture(null); + } + }, + MoreExecutors.directExecutor()); + first = false; + } + assertThat(updateCount.commitAsync().get()).isNotNull(); + assertThat(updateCount.get()).isEqualTo(UPDATE_COUNT); + break; + } catch (AbortedException e) { + txn = txMgr.resetForRetryAsync(); + } + } + } + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countTransactionsStarted()).isEqualTo(2); + } + + @Test + public void testAsyncTransactionManagerInlinedBeginTxWithOnlyMutations() + throws InterruptedException, ExecutionException { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + try (AsyncTransactionManager txMgr = client.transactionManagerAsync()) { + TransactionContextFuture txn = txMgr.beginAsync(); + while (true) { + try { + txn.then( + new AsyncTransactionFunction() { + @Override + public ApiFuture apply(TransactionContext txn, Void input) + throws Exception { + txn.buffer(Mutation.delete("FOO", Key.of(1L))); + return ApiFutures.immediateFuture(null); + } + }, + executor) + .commitAsync() + .get(); + break; + } catch (AbortedException e) { + txn = txMgr.resetForRetryAsync(); + } + } + } + // There should be 1 call to BeginTransaction because there is no statement that we can use to + // inline the BeginTransaction call with. + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1); + assertThat(countTransactionsStarted()).isEqualTo(1); + } + + @Test + public void testAsyncTransactionManagerInlinedBeginTxWithError() + throws InterruptedException, ExecutionException { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]")); + try (AsyncTransactionManager txMgr = client.transactionManagerAsync()) { + TransactionContextFuture txn = txMgr.beginAsync(); + while (true) { + try { + AsyncTransactionStep updateCount = + txn.then( + new AsyncTransactionFunction() { + @Override + public ApiFuture apply(TransactionContext txn, Void input) + throws Exception { + return txn.executeUpdateAsync(INVALID_UPDATE_STATEMENT); + } + }, + executor) + .then( + new AsyncTransactionFunction() { + @Override + public ApiFuture apply(TransactionContext txn, Long input) + throws Exception { + return txn.executeUpdateAsync(UPDATE_STATEMENT); + } + }, + executor); + try { + updateCount.commitAsync().get(); + fail("missing expected exception"); + } catch (ExecutionException e) { + assertThat(e.getCause()).isInstanceOf(SpannerException.class); + SpannerException se = (SpannerException) e.getCause(); + assertThat(se.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + } + break; + } catch (AbortedException e) { + txn = txMgr.resetForRetryAsync(); + } + } + } + assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0); + assertThat(countTransactionsStarted()).isEqualTo(1); + } + private int countRequests(Class requestType) { int count = 0; for (AbstractMessage msg : mockSpanner.getRequests()) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerAsyncTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerAsyncTest.java index c802493dec..f51ac4b480 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerAsyncTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerAsyncTest.java @@ -35,6 +35,7 @@ import com.google.cloud.spanner.Key; import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.TransactionContext; @@ -46,6 +47,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import org.junit.After; import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -58,21 +60,29 @@ @RunWith(Parameterized.class) public class ITTransactionManagerAsyncTest { - @Parameter public Executor executor; + @Parameter(0) + public Executor executor; - @Parameters(name = "executor = {0}") + @Parameter(1) + public boolean inlineBegin; + + @Parameters(name = "executor = {0}, inlineBegin = {1}") public static Collection data() { return Arrays.asList( new Object[][] { - {MoreExecutors.directExecutor()}, - {Executors.newSingleThreadExecutor()}, - {Executors.newFixedThreadPool(4)} + {MoreExecutors.directExecutor(), false}, + {MoreExecutors.directExecutor(), true}, + {Executors.newSingleThreadExecutor(), false}, + {Executors.newSingleThreadExecutor(), true}, + {Executors.newFixedThreadPool(4), false}, + {Executors.newFixedThreadPool(4), true} }); } @ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv(); private static Database db; - private static DatabaseClient client; + private Spanner spanner; + private DatabaseClient client; @BeforeClass public static void setUpDatabase() { @@ -84,14 +94,26 @@ public static void setUpDatabase() { + " K STRING(MAX) NOT NULL," + " BoolValue BOOL," + ") PRIMARY KEY (K)"); - client = env.getTestHelper().getDatabaseClient(db); } @Before public void clearTable() { + spanner = + env.getTestHelper() + .getOptions() + .toBuilder() + .setInlineBeginForReadWriteTransaction(inlineBegin) + .build() + .getService(); + client = spanner.getDatabaseClient(db.getId()); client.write(ImmutableList.of(Mutation.delete("T", KeySet.all()))); } + @After + public void closeSpanner() { + spanner.close(); + } + @Test public void testSimpleInsert() throws ExecutionException, InterruptedException { try (AsyncTransactionManager manager = client.transactionManagerAsync()) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java index 281977af6a..f912c0d710 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionManagerTest.java @@ -26,28 +26,45 @@ import com.google.cloud.spanner.DatabaseClient; import com.google.cloud.spanner.IntegrationTestEnv; import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.KeySet; import com.google.cloud.spanner.Mutation; import com.google.cloud.spanner.ParallelIntegrationTest; +import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Struct; import com.google.cloud.spanner.TransactionContext; import com.google.cloud.spanner.TransactionManager; import com.google.cloud.spanner.TransactionManager.TransactionState; +import com.google.common.collect.ImmutableList; import java.util.Arrays; +import java.util.Collection; +import org.junit.After; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; @Category(ParallelIntegrationTest.class) -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) public class ITTransactionManagerTest { + @Parameter(0) + public boolean inlineBegin; + + @Parameters(name = "inlineBegin = {0}") + public static Collection data() { + return Arrays.asList(new Object[][] {{false}, {true}}); + } + @ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv(); private static Database db; - private static DatabaseClient client; + private Spanner spanner; + private DatabaseClient client; @BeforeClass public static void setUpDatabase() { @@ -59,7 +76,24 @@ public static void setUpDatabase() { + " K STRING(MAX) NOT NULL," + " BoolValue BOOL," + ") PRIMARY KEY (K)"); - client = env.getTestHelper().getDatabaseClient(db); + } + + @Before + public void setupClient() { + spanner = + env.getTestHelper() + .getOptions() + .toBuilder() + .setInlineBeginForReadWriteTransaction(inlineBegin) + .build() + .getService(); + client = spanner.getDatabaseClient(db.getId()); + client.write(ImmutableList.of(Mutation.delete("T", KeySet.all()))); + } + + @After + public void closeClient() { + spanner.close(); } @SuppressWarnings("resource") @@ -200,6 +234,7 @@ public void abortAndRetry() throws InterruptedException { Struct row = client.singleUse().readRow("T", Key.of("Key3"), Arrays.asList("K", "BoolValue")); assertThat(row.getString(0)).isEqualTo("Key3"); assertThat(row.getBoolean(1)).isTrue(); + manager2.close(); } } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java index 5e3c1483e7..672a9e27f3 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java @@ -36,7 +36,9 @@ import com.google.cloud.spanner.ParallelIntegrationTest; import com.google.cloud.spanner.PartitionOptions; import com.google.cloud.spanner.ReadContext; +import com.google.cloud.spanner.ReadOnlyTransaction; import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Spanner; import com.google.cloud.spanner.SpannerException; import com.google.cloud.spanner.Statement; import com.google.cloud.spanner.Struct; @@ -48,24 +50,39 @@ import com.google.common.util.concurrent.Uninterruptibles; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Vector; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; +import org.junit.runners.Parameterized.Parameters; /** Integration tests for read-write transactions. */ @Category(ParallelIntegrationTest.class) -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) public class ITTransactionTest { + + @Parameter(0) + public boolean inlineBegin; + + @Parameters(name = "inlineBegin = {0}") + public static Collection data() { + return Arrays.asList(new Object[][] {{false}, {true}}); + } + @ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv(); private static Database db; - private static DatabaseClient client; + private Spanner spanner; + private DatabaseClient client; /** Sequence for assigning unique keys to test cases. */ private static int seq; @@ -78,7 +95,23 @@ public static void setUpDatabase() { + " K STRING(MAX) NOT NULL," + " V INT64," + ") PRIMARY KEY (K)"); - client = env.getTestHelper().getDatabaseClient(db); + } + + @Before + public void setupClient() { + spanner = + env.getTestHelper() + .getOptions() + .toBuilder() + .setInlineBeginForReadWriteTransaction(inlineBegin) + .build() + .getService(); + client = spanner.getDatabaseClient(db.getId()); + } + + @After + public void closeClient() { + spanner.close(); } private static String uniqueKey() { @@ -427,7 +460,9 @@ public void nestedReadOnlyTxnThrows() { new TransactionCallable() { @Override public Void run(TransactionContext transaction) throws SpannerException { - client.readOnlyTransaction().getReadTimestamp(); + try (ReadOnlyTransaction tx = client.readOnlyTransaction()) { + tx.getReadTimestamp(); + } return null; }