diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContextFutureImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContextFutureImpl.java index bc8262a535..be21a947d1 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContextFutureImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionContextFutureImpl.java @@ -121,6 +121,7 @@ public void onSuccess(I result) { @Override public void onFailure(Throwable t) { mgr.onError(t); + statementResult.setException(t); txnResult.setException(t); } @@ -132,6 +133,7 @@ public void onSuccess(O result) { MoreExecutors.directExecutor()); } catch (Throwable t) { mgr.onError(t); + statementResult.setException(t); txnResult.setException(t); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java index bf1f214a71..1d1b16d014 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerTest.java @@ -60,6 +60,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Test; import org.junit.runner.RunWith; @@ -1112,4 +1113,48 @@ public ApiFuture apply(TransactionContext txn, Struct input) } } } + + @Test + public void asyncTransactionManager_shouldPropagateStatementFailure() + throws ExecutionException, InterruptedException, TimeoutException { + DatabaseClient dbClient = client(); + try (AsyncTransactionManager transactionManager = dbClient.transactionManagerAsync()) { + TransactionContextFuture txnContextFuture = transactionManager.beginAsync(); + AsyncTransactionStep updateFuture = + txnContextFuture.then( + new AsyncTransactionFunction() { + @Override + public ApiFuture apply(TransactionContext txn, Void input) throws Exception { + return txn.executeUpdateAsync(INVALID_UPDATE_STATEMENT); + } + }, + executor); + final SettableApiFuture res = SettableApiFuture.create(); + ApiFutures.addCallback( + updateFuture, + new ApiFutureCallback() { + @Override + public void onFailure(Throwable throwable) { + // Check that we got the expected failure. + try { + assertThat(throwable).isInstanceOf(SpannerException.class); + SpannerException e = (SpannerException) throwable; + assertThat(e.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT); + assertThat(e.getMessage()).contains("invalid statement"); + res.set(null); + } catch (Throwable t) { + res.setException(t); + } + } + + @Override + public void onSuccess(Long aLong) { + res.setException(new AssertionError("Statement should not succeed.")); + } + }, + executor); + + assertThat(res.get(10L, TimeUnit.SECONDS)).isNull(); + } + } }