diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java index 2ba66d0c86..7dda260102 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java @@ -66,7 +66,9 @@ public ApiFuture closeAsync() { if (txnState == TransactionState.STARTED) { res = rollbackAsync(); } - txn.close(); + if (txn != null) { + txn.close(); + } return MoreObjects.firstNonNull(res, ApiFutures.immediateFuture(null)); } @@ -172,7 +174,7 @@ public ApiFuture apply(Empty input) throws Exception { @Override public TransactionContextFuture resetForRetryAsync() { - if (txn == null || !txn.isAborted() && txnState != TransactionState.ABORTED) { + if (txn == null || (!txn.isAborted() && txnState != TransactionState.ABORTED)) { throw new IllegalStateException( "resetForRetry can only be called if the previous attempt aborted"); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java index be96f651ac..f2f8601516 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java @@ -38,6 +38,7 @@ import static com.google.cloud.spanner.MetricRegistryConstants.SPANNER_LABEL_KEYS_WITH_TYPE; import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException; +import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; @@ -540,177 +541,248 @@ public Timestamp getReadTimestamp() { } } - private static class AutoClosingTransactionManager implements TransactionManager { - private class SessionPoolResultSet extends ForwardingResultSet { - private SessionPoolResultSet(ResultSet delegate) { - super(delegate); - } + interface SessionNotFoundHandler { + /** + * Handles the given {@link SessionNotFoundException} by possibly converting it to a different + * exception that should be thrown. + */ + SpannerException handleSessionNotFound(SessionNotFoundException notFound); + } - @Override - public boolean next() { - try { - return super.next(); - } catch (SessionNotFoundException e) { - throw handleSessionNotFound(e); - } + static class SessionPoolResultSet extends ForwardingResultSet { + private final SessionNotFoundHandler handler; + + private SessionPoolResultSet(SessionNotFoundHandler handler, ResultSet delegate) { + super(delegate); + this.handler = Preconditions.checkNotNull(handler); + } + + @Override + public boolean next() { + try { + return super.next(); + } catch (SessionNotFoundException e) { + throw handler.handleSessionNotFound(e); } } + } - /** - * {@link TransactionContext} that is used in combination with an {@link - * AutoClosingTransactionManager}. This {@link TransactionContext} handles {@link - * SessionNotFoundException}s by replacing the underlying session with a fresh one, and then - * throws an {@link AbortedException} to trigger the retry-loop that has been created by the - * caller. - */ - private class SessionPoolTransactionContext implements TransactionContext { - private final TransactionContext delegate; + static class AsyncSessionPoolResultSet extends ForwardingAsyncResultSet { + private final SessionNotFoundHandler handler; - private SessionPoolTransactionContext(TransactionContext delegate) { - this.delegate = delegate; - } + private AsyncSessionPoolResultSet(SessionNotFoundHandler handler, AsyncResultSet delegate) { + super(delegate); + this.handler = Preconditions.checkNotNull(handler); + } - @Override - public ResultSet read( - String table, KeySet keys, Iterable columns, ReadOption... options) { - return new SessionPoolResultSet(delegate.read(table, keys, columns, options)); - } + @Override + public ApiFuture setCallback(Executor executor, final ReadyCallback callback) { + return super.setCallback( + executor, + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + try { + return callback.cursorReady(resultSet); + } catch (SessionNotFoundException e) { + throw handler.handleSessionNotFound(e); + } + } + }); + } - @Override - public AsyncResultSet readAsync( - String table, KeySet keys, Iterable columns, ReadOption... options) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.UNIMPLEMENTED, "not yet implemented"); + @Override + public boolean next() { + try { + return super.next(); + } catch (SessionNotFoundException e) { + throw handler.handleSessionNotFound(e); } + } - @Override - public ResultSet readUsingIndex( - String table, - String index, - KeySet keys, - Iterable columns, - ReadOption... options) { - return new SessionPoolResultSet( - delegate.readUsingIndex(table, index, keys, columns, options)); + @Override + public CursorState tryNext() { + try { + return super.tryNext(); + } catch (SessionNotFoundException e) { + throw handler.handleSessionNotFound(e); } + } + } - @Override - public AsyncResultSet readUsingIndexAsync( - String table, - String index, - KeySet keys, - Iterable columns, - ReadOption... options) { - throw SpannerExceptionFactory.newSpannerException( - ErrorCode.UNIMPLEMENTED, "not yet implemented"); - } + /** + * {@link TransactionContext} that is used in combination with an {@link + * AutoClosingTransactionManager}. This {@link TransactionContext} handles {@link + * SessionNotFoundException}s by replacing the underlying session with a fresh one, and then + * throws an {@link AbortedException} to trigger the retry-loop that has been created by the + * caller. + */ + static class SessionPoolTransactionContext implements TransactionContext { + private final SessionNotFoundHandler handler; + final TransactionContext delegate; - @Override - public Struct readRow(String table, Key key, Iterable columns) { - try { - return delegate.readRow(table, key, columns); - } catch (SessionNotFoundException e) { - throw handleSessionNotFound(e); - } - } + SessionPoolTransactionContext(SessionNotFoundHandler handler, TransactionContext delegate) { + this.handler = Preconditions.checkNotNull(handler); + this.delegate = delegate; + } - @Override - public ApiFuture readRowAsync(String table, Key key, Iterable columns) { - try (AsyncResultSet rs = readAsync(table, KeySet.singleKey(key), columns)) { - return AbstractReadContext.consumeSingleRowAsync(rs); - } - } + @Override + public ResultSet read( + String table, KeySet keys, Iterable columns, ReadOption... options) { + return new SessionPoolResultSet(handler, delegate.read(table, keys, columns, options)); + } - @Override - public void buffer(Mutation mutation) { - delegate.buffer(mutation); - } + @Override + public AsyncResultSet readAsync( + String table, KeySet keys, Iterable columns, ReadOption... options) { + return new AsyncSessionPoolResultSet( + handler, delegate.readAsync(table, keys, columns, options)); + } - @Override - public Struct readRowUsingIndex( - String table, String index, Key key, Iterable columns) { - try { - return delegate.readRowUsingIndex(table, index, key, columns); - } catch (SessionNotFoundException e) { - throw handleSessionNotFound(e); - } - } + @Override + public ResultSet readUsingIndex( + String table, String index, KeySet keys, Iterable columns, ReadOption... options) { + return new SessionPoolResultSet( + handler, delegate.readUsingIndex(table, index, keys, columns, options)); + } - @Override - public ApiFuture readRowUsingIndexAsync( - String table, String index, Key key, Iterable columns) { - try (AsyncResultSet rs = - readUsingIndexAsync(table, index, KeySet.singleKey(key), columns)) { - return AbstractReadContext.consumeSingleRowAsync(rs); - } - } + @Override + public AsyncResultSet readUsingIndexAsync( + String table, String index, KeySet keys, Iterable columns, ReadOption... options) { + return new AsyncSessionPoolResultSet( + handler, delegate.readUsingIndexAsync(table, index, keys, columns, options)); + } - @Override - public void buffer(Iterable mutations) { - delegate.buffer(mutations); + @Override + public Struct readRow(String table, Key key, Iterable columns) { + try { + return delegate.readRow(table, key, columns); + } catch (SessionNotFoundException e) { + throw handler.handleSessionNotFound(e); } + } - @Override - public long executeUpdate(Statement statement) { - try { - return delegate.executeUpdate(statement); - } catch (SessionNotFoundException e) { - throw handleSessionNotFound(e); - } + @Override + public ApiFuture readRowAsync(String table, Key key, Iterable columns) { + try (AsyncResultSet rs = readAsync(table, KeySet.singleKey(key), columns)) { + return ApiFutures.catching( + AbstractReadContext.consumeSingleRowAsync(rs), + SessionNotFoundException.class, + new ApiFunction() { + @Override + public Struct apply(SessionNotFoundException input) { + throw handler.handleSessionNotFound(input); + } + }, + MoreExecutors.directExecutor()); } + } - @Override - public ApiFuture executeUpdateAsync(Statement statement) { - try { - return delegate.executeUpdateAsync(statement); - } catch (SessionNotFoundException e) { - throw handleSessionNotFound(e); - } - } + @Override + public void buffer(Mutation mutation) { + delegate.buffer(mutation); + } - @Override - public long[] batchUpdate(Iterable statements) { - try { - return delegate.batchUpdate(statements); - } catch (SessionNotFoundException e) { - throw handleSessionNotFound(e); - } + @Override + public Struct readRowUsingIndex(String table, String index, Key key, Iterable columns) { + try { + return delegate.readRowUsingIndex(table, index, key, columns); + } catch (SessionNotFoundException e) { + throw handler.handleSessionNotFound(e); } + } - @Override - public ApiFuture batchUpdateAsync(Iterable statements) { - try { - return delegate.batchUpdateAsync(statements); - } catch (SessionNotFoundException e) { - throw handleSessionNotFound(e); - } + @Override + public ApiFuture readRowUsingIndexAsync( + String table, String index, Key key, Iterable columns) { + try (AsyncResultSet rs = readUsingIndexAsync(table, index, KeySet.singleKey(key), columns)) { + return ApiFutures.catching( + AbstractReadContext.consumeSingleRowAsync(rs), + SessionNotFoundException.class, + new ApiFunction() { + @Override + public Struct apply(SessionNotFoundException input) { + throw handler.handleSessionNotFound(input); + } + }, + MoreExecutors.directExecutor()); } + } - @Override - public ResultSet executeQuery(Statement statement, QueryOption... options) { - return new SessionPoolResultSet(delegate.executeQuery(statement, options)); - } + @Override + public void buffer(Iterable mutations) { + delegate.buffer(mutations); + } - @Override - public AsyncResultSet executeQueryAsync(Statement statement, QueryOption... options) { - try { - return delegate.executeQueryAsync(statement, options); - } catch (SessionNotFoundException e) { - throw handleSessionNotFound(e); - } + @Override + public long executeUpdate(Statement statement) { + try { + return delegate.executeUpdate(statement); + } catch (SessionNotFoundException e) { + throw handler.handleSessionNotFound(e); } + } - @Override - public ResultSet analyzeQuery(Statement statement, QueryAnalyzeMode queryMode) { - return new SessionPoolResultSet(delegate.analyzeQuery(statement, queryMode)); - } + @Override + public ApiFuture executeUpdateAsync(Statement statement) { + return ApiFutures.catching( + delegate.executeUpdateAsync(statement), + SessionNotFoundException.class, + new ApiFunction() { + @Override + public Long apply(SessionNotFoundException input) { + throw handler.handleSessionNotFound(input); + } + }, + MoreExecutors.directExecutor()); + } - @Override - public void close() { - delegate.close(); + @Override + public long[] batchUpdate(Iterable statements) { + try { + return delegate.batchUpdate(statements); + } catch (SessionNotFoundException e) { + throw handler.handleSessionNotFound(e); } } + @Override + public ApiFuture batchUpdateAsync(Iterable statements) { + return ApiFutures.catching( + delegate.batchUpdateAsync(statements), + SessionNotFoundException.class, + new ApiFunction() { + @Override + public long[] apply(SessionNotFoundException input) { + throw handler.handleSessionNotFound(input); + } + }, + MoreExecutors.directExecutor()); + } + + @Override + public ResultSet executeQuery(Statement statement, QueryOption... options) { + return new SessionPoolResultSet(handler, delegate.executeQuery(statement, options)); + } + + @Override + public AsyncResultSet executeQueryAsync(Statement statement, QueryOption... options) { + return new AsyncSessionPoolResultSet(handler, delegate.executeQueryAsync(statement, options)); + } + + @Override + public ResultSet analyzeQuery(Statement statement, QueryAnalyzeMode queryMode) { + return new SessionPoolResultSet(handler, delegate.analyzeQuery(statement, queryMode)); + } + + @Override + public void close() { + delegate.close(); + } + } + + private static class AutoClosingTransactionManager + implements TransactionManager, SessionNotFoundHandler { private TransactionManager delegate; private final SessionPool sessionPool; private PooledSessionFuture session; @@ -731,12 +803,13 @@ public TransactionContext begin() { } private TransactionContext internalBegin() { - TransactionContext res = new SessionPoolTransactionContext(delegate.begin()); + TransactionContext res = new SessionPoolTransactionContext(this, delegate.begin()); session.get().markUsed(); return res; } - private SpannerException handleSessionNotFound(SessionNotFoundException notFound) { + @Override + public SpannerException handleSessionNotFound(SessionNotFoundException notFound) { session = sessionPool.replaceSession(notFound, session); PooledSession pooledSession = session.get(); delegate = pooledSession.delegate.transactionManager(); @@ -772,11 +845,11 @@ public TransactionContext resetForRetry() { while (true) { try { if (restartedAfterSessionNotFound) { - TransactionContext res = new SessionPoolTransactionContext(delegate.begin()); + TransactionContext res = new SessionPoolTransactionContext(this, delegate.begin()); restartedAfterSessionNotFound = false; return res; } else { - return new SessionPoolTransactionContext(delegate.resetForRetry()); + return new SessionPoolTransactionContext(this, delegate.resetForRetry()); } } catch (SessionNotFoundException e) { session = sessionPool.replaceSession(e, session); @@ -1145,7 +1218,7 @@ public AsyncRunner runAsync() { @Override public AsyncTransactionManager transactionManagerAsync() { - return new SessionPoolAsyncTransactionManager(this); + return new SessionPoolAsyncTransactionManager(SessionPool.this, this); } @Override diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java index 54b621b93b..515286fb11 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolAsyncTransactionManager.java @@ -17,30 +17,40 @@ package com.google.cloud.spanner; import com.google.api.core.ApiAsyncFunction; +import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.cloud.Timestamp; import com.google.cloud.spanner.SessionPool.PooledSessionFuture; +import com.google.cloud.spanner.SessionPool.SessionNotFoundHandler; import com.google.cloud.spanner.TransactionContextFutureImpl.CommittableAsyncTransactionManager; import com.google.cloud.spanner.TransactionManager.TransactionState; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.MoreExecutors; import javax.annotation.concurrent.GuardedBy; -class SessionPoolAsyncTransactionManager implements CommittableAsyncTransactionManager { +class SessionPoolAsyncTransactionManager + implements CommittableAsyncTransactionManager, SessionNotFoundHandler { private final Object lock = new Object(); @GuardedBy("lock") private TransactionState txnState; + private final SessionPool pool; private volatile PooledSessionFuture session; - private final SettableApiFuture delegate = - SettableApiFuture.create(); + private volatile SettableApiFuture delegate; + private boolean restartedAfterSessionNotFound; - SessionPoolAsyncTransactionManager(PooledSessionFuture session) { + SessionPoolAsyncTransactionManager(SessionPool pool, PooledSessionFuture session) { + this.pool = Preconditions.checkNotNull(pool); + createTransaction(session); + } + + private void createTransaction(PooledSessionFuture session) { this.session = session; + this.delegate = SettableApiFuture.create(); this.session.addListener( new Runnable() { @Override @@ -56,6 +66,16 @@ public void run() { MoreExecutors.directExecutor()); } + @Override + public SpannerException handleSessionNotFound(SessionNotFoundException notFound) { + // Restart the entire transaction with a new session and throw an AbortedException to force the + // client application to retry. + createTransaction(pool.replaceSession(notFound, session)); + restartedAfterSessionNotFound = true; + return SpannerExceptionFactory.newSpannerException( + ErrorCode.ABORTED, notFound.getMessage(), notFound); + } + @Override public void close() { SpannerApiFutures.get(closeAsync()); @@ -122,7 +142,9 @@ public void onFailure(Throwable t) { @Override public void onSuccess(TransactionContext result) { - delegateTxnFuture.set(result); + delegateTxnFuture.set( + new SessionPool.SessionPoolTransactionContext( + SessionPoolAsyncTransactionManager.this, result)); } }, MoreExecutors.directExecutor()); @@ -215,19 +237,33 @@ public void run() { public TransactionContextFuture resetForRetryAsync() { synchronized (lock) { Preconditions.checkState( - txnState == TransactionState.ABORTED, + txnState == TransactionState.ABORTED || restartedAfterSessionNotFound, "resetForRetry can only be called after the transaction aborted."); txnState = TransactionState.STARTED; } return new TransactionContextFutureImpl( this, - ApiFutures.transformAsync( - delegate, - new ApiAsyncFunction() { + ApiFutures.transform( + ApiFutures.transformAsync( + delegate, + new ApiAsyncFunction() { + @Override + public ApiFuture apply(AsyncTransactionManagerImpl input) + throws Exception { + if (restartedAfterSessionNotFound) { + restartedAfterSessionNotFound = false; + return input.beginAsync(); + } + return input.resetForRetryAsync(); + } + }, + MoreExecutors.directExecutor()), + new ApiFunction() { + @Override - public ApiFuture apply(AsyncTransactionManagerImpl input) - throws Exception { - return input.resetForRetryAsync(); + public TransactionContext apply(TransactionContext input) { + return new SessionPool.SessionPoolTransactionContext( + SessionPoolAsyncTransactionManager.this, input); } }, MoreExecutors.directExecutor())); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java index ddf8f580a5..aed844ed9a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java @@ -36,6 +36,7 @@ import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.Options.ReadOption; +import com.google.cloud.spanner.SessionPool.SessionPoolTransactionContext; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.common.base.Function; import com.google.common.base.Predicate; @@ -191,7 +192,9 @@ public void asyncTransactionManager_shouldRollbackOnCloseAsync() throws Exceptio AsyncTransactionManager manager = client().transactionManagerAsync(); TransactionContext txn = manager.beginAsync().get(); txn.executeUpdateAsync(UPDATE_STATEMENT).get(); - final TransactionSelector selector = ((TransactionContextImpl) txn).getTransactionSelector(); + final TransactionSelector selector = + ((TransactionContextImpl) ((SessionPoolTransactionContext) txn).delegate) + .getTransactionSelector(); SpannerApiFutures.get(manager.closeAsync()); // The mock server should already have the Rollback request, as we are waiting for the returned diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 85e935a75a..d271561447 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -20,6 +20,7 @@ import com.google.cloud.ByteArray; import com.google.cloud.Date; import com.google.cloud.spanner.AbstractResultSet.GrpcStruct; +import com.google.cloud.spanner.SessionPool.SessionPoolTransactionContext; import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -664,6 +665,9 @@ public void setAbortProbability(double probability) { */ public void abortTransaction(TransactionContext transactionContext) { Preconditions.checkNotNull(transactionContext); + if (transactionContext instanceof SessionPoolTransactionContext) { + transactionContext = ((SessionPoolTransactionContext) transactionContext).delegate; + } if (transactionContext instanceof TransactionContextImpl) { TransactionContextImpl impl = (TransactionContextImpl) transactionContext; ByteString id = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java index 4c7f6d26c9..e49c7e739c 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java @@ -30,6 +30,10 @@ import com.google.cloud.spanner.AsyncResultSet.CallbackResponse; import com.google.cloud.spanner.AsyncResultSet.ReadyCallback; import com.google.cloud.spanner.AsyncRunner.AsyncWork; +import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionFunction; +import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionStep; +import com.google.cloud.spanner.AsyncTransactionManager.CommitTimestampFuture; +import com.google.cloud.spanner.AsyncTransactionManager.TransactionContextFuture; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.TransactionRunner.TransactionCallable; import com.google.cloud.spanner.v1.SpannerClient; @@ -1655,4 +1659,332 @@ public ApiFuture doWorkAsync(TransactionContext txn) { assertThat(failOnInvalidatedSession).isTrue(); } } + + @Test + public void asyncTransactionManagerAsyncSelect() throws InterruptedException { + asyncTransactionManager_readAsync( + new Function() { + @Override + public AsyncResultSet apply(TransactionContext input) { + return input.executeQueryAsync(SELECT1AND2); + } + }); + } + + @Test + public void asyncTransactionManagerAsyncRead() throws InterruptedException { + asyncTransactionManager_readAsync( + new Function() { + @Override + public AsyncResultSet apply(TransactionContext input) { + return input.readAsync("FOO", KeySet.all(), Arrays.asList("BAR")); + } + }); + } + + @Test + public void asyncTransactionManagerAsyncReadUsingIndex() throws InterruptedException { + asyncTransactionManager_readAsync( + new Function() { + @Override + public AsyncResultSet apply(TransactionContext input) { + return input.readUsingIndexAsync("FOO", "idx", KeySet.all(), Arrays.asList("BAR")); + } + }); + } + + private void asyncTransactionManager_readAsync( + final Function fn) throws InterruptedException { + invalidateSessionPool(); + final ExecutorService queryExecutor = Executors.newSingleThreadExecutor(); + try (AsyncTransactionManager manager = client.transactionManagerAsync()) { + TransactionContextFuture context = manager.beginAsync(); + while (true) { + try { + final AtomicLong counter = new AtomicLong(); + AsyncTransactionStep count = + context.then( + new AsyncTransactionFunction() { + @Override + public ApiFuture apply(TransactionContext txn, Void input) + throws Exception { + AsyncResultSet rs = fn.apply(txn); + ApiFuture fut = + rs.setCallback( + queryExecutor, + new ReadyCallback() { + @Override + public CallbackResponse cursorReady(AsyncResultSet resultSet) { + while (true) { + switch (resultSet.tryNext()) { + case OK: + counter.incrementAndGet(); + break; + case DONE: + return CallbackResponse.DONE; + case NOT_READY: + return CallbackResponse.CONTINUE; + } + } + } + }); + return ApiFutures.transform( + fut, + new ApiFunction() { + @Override + public Long apply(Void input) { + return counter.get(); + } + }, + MoreExecutors.directExecutor()); + } + }, + executor); + CommitTimestampFuture ts = count.commitAsync(); + assertThat(get(ts)).isNotNull(); + assertThat(get(count)).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + break; + } catch (AbortedException e) { + context = manager.resetForRetryAsync(); + } + } + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } finally { + queryExecutor.shutdown(); + } + } + + @Test + public void asyncTransactionManagerSelect() throws InterruptedException { + asyncTransactionManager_readSync( + new Function() { + @Override + public ResultSet apply(TransactionContext input) { + return input.executeQuery(SELECT1AND2); + } + }); + } + + @Test + public void asyncTransactionManagerRead() throws InterruptedException { + asyncTransactionManager_readSync( + new Function() { + @Override + public ResultSet apply(TransactionContext input) { + return input.read("FOO", KeySet.all(), Arrays.asList("BAR")); + } + }); + } + + @Test + public void asyncTransactionManagerReadUsingIndex() throws InterruptedException { + asyncTransactionManager_readSync( + new Function() { + @Override + public ResultSet apply(TransactionContext input) { + return input.readUsingIndex("FOO", "idx", KeySet.all(), Arrays.asList("BAR")); + } + }); + } + + private void asyncTransactionManager_readSync(final Function fn) + throws InterruptedException { + invalidateSessionPool(); + final ExecutorService queryExecutor = Executors.newSingleThreadExecutor(); + try (AsyncTransactionManager manager = client.transactionManagerAsync()) { + TransactionContextFuture context = manager.beginAsync(); + while (true) { + try { + AsyncTransactionStep count = + context.then( + new AsyncTransactionFunction() { + @Override + public ApiFuture apply(TransactionContext txn, Void input) + throws Exception { + long counter = 0L; + try (ResultSet rs = fn.apply(txn)) { + while (rs.next()) { + counter++; + } + } + return ApiFutures.immediateFuture(counter); + } + }, + executor); + CommitTimestampFuture ts = count.commitAsync(); + assertThat(get(ts)).isNotNull(); + assertThat(get(count)).isEqualTo(2); + assertThat(failOnInvalidatedSession).isFalse(); + break; + } catch (AbortedException e) { + context = manager.resetForRetryAsync(); + } + } + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } finally { + queryExecutor.shutdown(); + } + } + + @Test + public void asyncTransactionManagerReadRow() throws InterruptedException { + asyncTransactionManager_readRowFunction( + new Function>() { + @Override + public ApiFuture apply(TransactionContext input) { + return ApiFutures.immediateFuture( + input.readRow("FOO", Key.of("foo"), Arrays.asList("BAR"))); + } + }); + } + + @Test + public void asyncTransactionManagerReadRowUsingIndex() throws InterruptedException { + asyncTransactionManager_readRowFunction( + new Function>() { + @Override + public ApiFuture apply(TransactionContext input) { + return ApiFutures.immediateFuture( + input.readRowUsingIndex("FOO", "idx", Key.of("foo"), Arrays.asList("BAR"))); + } + }); + } + + @Test + public void asyncTransactionManagerReadRowAsync() throws InterruptedException { + asyncTransactionManager_readRowFunction( + new Function>() { + @Override + public ApiFuture apply(TransactionContext input) { + return input.readRowAsync("FOO", Key.of("foo"), Arrays.asList("BAR")); + } + }); + } + + @Test + public void asyncTransactionManagerReadRowUsingIndexAsync() throws InterruptedException { + asyncTransactionManager_readRowFunction( + new Function>() { + @Override + public ApiFuture apply(TransactionContext input) { + return input.readRowUsingIndexAsync("FOO", "idx", Key.of("foo"), Arrays.asList("BAR")); + } + }); + } + + private void asyncTransactionManager_readRowFunction( + final Function> fn) throws InterruptedException { + invalidateSessionPool(); + final ExecutorService queryExecutor = Executors.newSingleThreadExecutor(); + try (AsyncTransactionManager manager = client.transactionManagerAsync()) { + TransactionContextFuture context = manager.beginAsync(); + while (true) { + try { + AsyncTransactionStep row = + context.then( + new AsyncTransactionFunction() { + @Override + public ApiFuture apply(TransactionContext txn, Void input) + throws Exception { + return fn.apply(txn); + } + }, + executor); + CommitTimestampFuture ts = row.commitAsync(); + assertThat(get(ts)).isNotNull(); + assertThat(get(row)).isEqualTo(Struct.newBuilder().set("BAR").to(1L).build()); + assertThat(failOnInvalidatedSession).isFalse(); + break; + } catch (AbortedException e) { + context = manager.resetForRetryAsync(); + } + } + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } finally { + queryExecutor.shutdown(); + } + } + + @Test + public void asyncTransactionManagerUpdateAsync() throws InterruptedException { + asyncTransactionManager_updateFunction( + new Function>() { + @Override + public ApiFuture apply(TransactionContext input) { + return input.executeUpdateAsync(UPDATE_STATEMENT); + } + }, + UPDATE_COUNT); + } + + @Test + public void asyncTransactionManagerUpdate() throws InterruptedException { + asyncTransactionManager_updateFunction( + new Function>() { + @Override + public ApiFuture apply(TransactionContext input) { + return ApiFutures.immediateFuture(input.executeUpdate(UPDATE_STATEMENT)); + } + }, + UPDATE_COUNT); + } + + @Test + public void asyncTransactionManagerBatchUpdateAsync() throws InterruptedException { + asyncTransactionManager_updateFunction( + new Function>() { + @Override + public ApiFuture apply(TransactionContext input) { + return input.batchUpdateAsync(Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT)); + } + }, + new long[] {UPDATE_COUNT, UPDATE_COUNT}); + } + + @Test + public void asyncTransactionManagerBatchUpdate() throws InterruptedException { + asyncTransactionManager_updateFunction( + new Function>() { + @Override + public ApiFuture apply(TransactionContext input) { + return ApiFutures.immediateFuture( + input.batchUpdate(Arrays.asList(UPDATE_STATEMENT, UPDATE_STATEMENT))); + } + }, + new long[] {UPDATE_COUNT, UPDATE_COUNT}); + } + + private void asyncTransactionManager_updateFunction( + final Function> fn, T expected) throws InterruptedException { + invalidateSessionPool(); + try (AsyncTransactionManager manager = client.transactionManagerAsync()) { + TransactionContextFuture transaction = manager.beginAsync(); + while (true) { + try { + AsyncTransactionStep res = + transaction.then( + new AsyncTransactionFunction() { + @Override + public ApiFuture apply(TransactionContext txn, Void input) throws Exception { + return fn.apply(txn); + } + }, + executor); + CommitTimestampFuture ts = res.commitAsync(); + assertThat(get(res)).isEqualTo(expected); + assertThat(get(ts)).isNotNull(); + break; + } catch (AbortedException e) { + transaction = manager.resetForRetryAsync(); + } + } + assertThat(failOnInvalidatedSession).isFalse(); + } catch (SessionNotFoundException e) { + assertThat(failOnInvalidatedSession).isTrue(); + } + } }