diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java index 35ed4648af..d2c63bb78e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java @@ -252,6 +252,13 @@ public void run() { if (cursorReturnedDoneOrException) { break; } + if (state == State.CANCELLED) { + // The callback should always get at least one chance to catch the CANCELLED + // exception. It is however possible that the callback does not call tryNext(), and + // instead directly returns PAUSE or DONE. In those cases, the callback runner should + // also stop, even though the callback has not seen the CANCELLED state. + cursorReturnedDoneOrException = true; + } } CallbackResponse response; try { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java index 32e28295c5..49c698b06a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java @@ -16,7 +16,9 @@ package com.google.cloud.spanner; +import static com.google.cloud.spanner.SpannerApiFutures.get; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; @@ -371,6 +373,33 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable { } } + @Test + public void testCallbackIsNotCalledWhilePausedAndCanceled() + throws InterruptedException, ExecutionException { + Executor executor = Executors.newSingleThreadExecutor(); + ResultSet delegate = mock(ResultSet.class); + + final AtomicInteger callbackCounter = new AtomicInteger(); + ApiFuture callbackResult; + + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { + callbackResult = + rs.setCallback( + executor, + resultSet -> { + callbackCounter.getAndIncrement(); + return CallbackResponse.PAUSE; + }); + + rs.cancel(); + + SpannerException exception = assertThrows(SpannerException.class, () -> get(callbackResult)); + assertEquals(ErrorCode.CANCELLED, exception.getErrorCode()); + assertEquals(1, callbackCounter.get()); + } + } + @Test public void cancel() throws InterruptedException { Executor executor = Executors.newSingleThreadExecutor();