Skip to content

Commit

Permalink
test: add additional tests and ITs
Browse files Browse the repository at this point in the history
  • Loading branch information
olavloite committed Jul 17, 2020
1 parent 436833b commit 93b72f2
Show file tree
Hide file tree
Showing 7 changed files with 297 additions and 19 deletions.
Expand Up @@ -39,14 +39,16 @@ final class AsyncTransactionManagerImpl

private final SessionImpl session;
private Span span;
private final boolean inlineBegin;

private TransactionRunnerImpl.TransactionContextImpl txn;
private TransactionState txnState;
private final SettableApiFuture<Timestamp> commitTimestamp = SettableApiFuture.create();

AsyncTransactionManagerImpl(SessionImpl session, Span span) {
AsyncTransactionManagerImpl(SessionImpl session, Span span, boolean inlineBegin) {
this.session = session;
this.span = span;
this.inlineBegin = inlineBegin;
}

@Override
Expand Down Expand Up @@ -74,7 +76,12 @@ private ApiFuture<TransactionContext> internalBeginAsync(boolean setActive) {
session.setActive(this);
}
final SettableApiFuture<TransactionContext> res = SettableApiFuture.create();
final ApiFuture<Void> fut = txn.ensureTxnAsync();
final ApiFuture<Void> fut;
if (inlineBegin) {
fut = ApiFutures.immediateFuture(null);
} else {
fut = txn.ensureTxnAsync();
}
ApiFutures.addCallback(
fut,
new ApiFutureCallback<Void>() {
Expand Down
Expand Up @@ -256,6 +256,12 @@ private AsyncRunner inlinedRunAsync() {

@Override
public AsyncTransactionManager transactionManagerAsync() {
return inlineBeginReadWriteTransactions
? inlinedTransactionManagerAsync()
: preparedTransactionManagerAsync();
}

private AsyncTransactionManager preparedTransactionManagerAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadWriteSession().transactionManagerAsync();
Expand All @@ -265,6 +271,16 @@ public AsyncTransactionManager transactionManagerAsync() {
}
}

private AsyncTransactionManager inlinedTransactionManagerAsync() {
Span span = tracer.spanBuilder(READ_WRITE_TRANSACTION_WITH_INLINE_BEGIN).startSpan();
try (Scope s = tracer.withSpan(span)) {
return getReadSession().transactionManagerAsync();
} catch (RuntimeException e) {
TraceUtil.endSpanWithFailure(span, e);
throw e;
}
}

@Override
public long executePartitionedUpdate(final Statement stmt) {
Span span = tracer.spanBuilder(PARTITION_DML_TRANSACTION).startSpan();
Expand Down
Expand Up @@ -252,7 +252,8 @@ public TransactionManager transactionManager() {

@Override
public AsyncTransactionManagerImpl transactionManagerAsync() {
return new AsyncTransactionManagerImpl(this, currentSpan);
return new AsyncTransactionManagerImpl(
this, currentSpan, spanner.getOptions().isInlineBeginForReadWriteTransaction());
}

@Override
Expand Down
Expand Up @@ -28,6 +28,10 @@
import com.google.cloud.spanner.AsyncResultSet.CallbackResponse;
import com.google.cloud.spanner.AsyncResultSet.ReadyCallback;
import com.google.cloud.spanner.AsyncRunner.AsyncWork;
import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionFunction;
import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionStep;
import com.google.cloud.spanner.AsyncTransactionManager.CommitTimestampFuture;
import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture;
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
import com.google.cloud.spanner.TransactionRunner.TransactionCallable;
import com.google.common.util.concurrent.MoreExecutors;
Expand Down Expand Up @@ -656,6 +660,164 @@ public ApiFuture<Void> doWorkAsync(TransactionContext transaction) {
assertThat(countTransactionsStarted()).isEqualTo(1);
}

@Test
public void testAsyncTransactionManagerInlinedBeginTx()
throws InterruptedException, ExecutionException {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
try (AsyncTransactionManager txMgr = client.transactionManagerAsync()) {
TransactionContextFuture txn = txMgr.beginAsync();
while (true) {
AsyncTransactionStep<Void, Long> updateCount =
txn.then(
new AsyncTransactionFunction<Void, Long>() {
@Override
public ApiFuture<Long> apply(TransactionContext txn, Void input)
throws Exception {
return txn.executeUpdateAsync(UPDATE_STATEMENT);
}
},
executor);
CommitTimestampFuture commitTimestamp = updateCount.commitAsync();
try {
assertThat(updateCount.get()).isEqualTo(UPDATE_COUNT);
assertThat(commitTimestamp.get()).isNotNull();
break;
} catch (AbortedException e) {
txn = txMgr.resetForRetryAsync();
}
}
}
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countTransactionsStarted()).isEqualTo(1);
}

@Test
public void testAsyncTransactionManagerInlinedBeginTxAborted()
throws InterruptedException, ExecutionException {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
try (AsyncTransactionManager txMgr = client.transactionManagerAsync()) {
TransactionContextFuture txn = txMgr.beginAsync();
boolean first = true;
while (true) {
try {
AsyncTransactionStep<Void, Long> updateCount =
txn.then(
new AsyncTransactionFunction<Void, Long>() {
@Override
public ApiFuture<Long> apply(TransactionContext txn, Void input)
throws Exception {
return txn.executeUpdateAsync(UPDATE_STATEMENT);
}
},
executor);
if (first) {
// Abort the transaction after the statement has been executed to ensure that the
// transaction has actually been started before the test tries to abort it.
updateCount.then(
new AsyncTransactionFunction<Long, Void>() {
@Override
public ApiFuture<Void> apply(TransactionContext txn, Long input)
throws Exception {
mockSpanner.abortAllTransactions();
return ApiFutures.immediateFuture(null);
}
},
MoreExecutors.directExecutor());
first = false;
}
assertThat(updateCount.commitAsync().get()).isNotNull();
assertThat(updateCount.get()).isEqualTo(UPDATE_COUNT);
break;
} catch (AbortedException e) {
txn = txMgr.resetForRetryAsync();
}
}
}
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countTransactionsStarted()).isEqualTo(2);
}

@Test
public void testAsyncTransactionManagerInlinedBeginTxWithOnlyMutations()
throws InterruptedException, ExecutionException {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
try (AsyncTransactionManager txMgr = client.transactionManagerAsync()) {
TransactionContextFuture txn = txMgr.beginAsync();
while (true) {
try {
txn.then(
new AsyncTransactionFunction<Void, Void>() {
@Override
public ApiFuture<Void> apply(TransactionContext txn, Void input)
throws Exception {
txn.buffer(Mutation.delete("FOO", Key.of(1L)));
return ApiFutures.immediateFuture(null);
}
},
executor)
.commitAsync()
.get();
break;
} catch (AbortedException e) {
txn = txMgr.resetForRetryAsync();
}
}
}
// There should be 1 call to BeginTransaction because there is no statement that we can use to
// inline the BeginTransaction call with.
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countTransactionsStarted()).isEqualTo(1);
}

@Test
public void testAsyncTransactionManagerInlinedBeginTxWithError()
throws InterruptedException, ExecutionException {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
try (AsyncTransactionManager txMgr = client.transactionManagerAsync()) {
TransactionContextFuture txn = txMgr.beginAsync();
while (true) {
try {
AsyncTransactionStep<Long, Long> updateCount =
txn.then(
new AsyncTransactionFunction<Void, Long>() {
@Override
public ApiFuture<Long> apply(TransactionContext txn, Void input)
throws Exception {
return txn.executeUpdateAsync(INVALID_UPDATE_STATEMENT);
}
},
executor)
.then(
new AsyncTransactionFunction<Long, Long>() {
@Override
public ApiFuture<Long> apply(TransactionContext txn, Long input)
throws Exception {
return txn.executeUpdateAsync(UPDATE_STATEMENT);
}
},
executor);
try {
updateCount.commitAsync().get();
fail("missing expected exception");
} catch (ExecutionException e) {
assertThat(e.getCause()).isInstanceOf(SpannerException.class);
SpannerException se = (SpannerException) e.getCause();
assertThat(se.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT);
}
break;
} catch (AbortedException e) {
txn = txMgr.resetForRetryAsync();
}
}
}
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countTransactionsStarted()).isEqualTo(1);
}

private int countRequests(Class<? extends AbstractMessage> requestType) {
int count = 0;
for (AbstractMessage msg : mockSpanner.getRequests()) {
Expand Down
Expand Up @@ -35,6 +35,7 @@
import com.google.cloud.spanner.Key;
import com.google.cloud.spanner.KeySet;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerException;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TransactionContext;
Expand All @@ -46,6 +47,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
Expand All @@ -58,21 +60,29 @@
@RunWith(Parameterized.class)
public class ITTransactionManagerAsyncTest {

@Parameter public Executor executor;
@Parameter(0)
public Executor executor;

@Parameters(name = "executor = {0}")
@Parameter(1)
public boolean inlineBegin;

@Parameters(name = "executor = {0}, inlineBegin = {1}")
public static Collection<Object[]> data() {
return Arrays.asList(
new Object[][] {
{MoreExecutors.directExecutor()},
{Executors.newSingleThreadExecutor()},
{Executors.newFixedThreadPool(4)}
{MoreExecutors.directExecutor(), false},
{MoreExecutors.directExecutor(), true},
{Executors.newSingleThreadExecutor(), false},
{Executors.newSingleThreadExecutor(), true},
{Executors.newFixedThreadPool(4), false},
{Executors.newFixedThreadPool(4), true}
});
}

@ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv();
private static Database db;
private static DatabaseClient client;
private Spanner spanner;
private DatabaseClient client;

@BeforeClass
public static void setUpDatabase() {
Expand All @@ -84,14 +94,26 @@ public static void setUpDatabase() {
+ " K STRING(MAX) NOT NULL,"
+ " BoolValue BOOL,"
+ ") PRIMARY KEY (K)");
client = env.getTestHelper().getDatabaseClient(db);
}

@Before
public void clearTable() {
spanner =
env.getTestHelper()
.getOptions()
.toBuilder()
.setInlineBeginForReadWriteTransaction(inlineBegin)
.build()
.getService();
client = spanner.getDatabaseClient(db.getId());
client.write(ImmutableList.of(Mutation.delete("T", KeySet.all())));
}

@After
public void closeSpanner() {
spanner.close();
}

@Test
public void testSimpleInsert() throws ExecutionException, InterruptedException {
try (AsyncTransactionManager manager = client.transactionManagerAsync()) {
Expand Down

0 comments on commit 93b72f2

Please sign in to comment.