From 78e678448782d5d16ba43ec7c10ab85b89059d88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 19 May 2021 10:11:33 +0200 Subject: [PATCH] fix: stop invoking callback after pausing and cancelling result set (#1192) An AsyncResultSet that was PAUSED and then CANCELLED, would continue to invoke the callback until the callback would call tryNext(). If the callback never called tryNext(), the spinning would continue until the entire result set had been consumed. Fixes #1191 --- .../cloud/spanner/AsyncResultSetImpl.java | 7 +++++ .../cloud/spanner/AsyncResultSetImplTest.java | 29 +++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java index 35ed4648af..d2c63bb78e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java @@ -252,6 +252,13 @@ public void run() { if (cursorReturnedDoneOrException) { break; } + if (state == State.CANCELLED) { + // The callback should always get at least one chance to catch the CANCELLED + // exception. It is however possible that the callback does not call tryNext(), and + // instead directly returns PAUSE or DONE. In those cases, the callback runner should + // also stop, even though the callback has not seen the CANCELLED state. + cursorReturnedDoneOrException = true; + } } CallbackResponse response; try { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java index 32e28295c5..49c698b06a 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java @@ -16,7 +16,9 @@ package com.google.cloud.spanner; +import static com.google.cloud.spanner.SpannerApiFutures.get; import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThrows; @@ -371,6 +373,33 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable { } } + @Test + public void testCallbackIsNotCalledWhilePausedAndCanceled() + throws InterruptedException, ExecutionException { + Executor executor = Executors.newSingleThreadExecutor(); + ResultSet delegate = mock(ResultSet.class); + + final AtomicInteger callbackCounter = new AtomicInteger(); + ApiFuture callbackResult; + + try (AsyncResultSetImpl rs = + new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) { + callbackResult = + rs.setCallback( + executor, + resultSet -> { + callbackCounter.getAndIncrement(); + return CallbackResponse.PAUSE; + }); + + rs.cancel(); + + SpannerException exception = assertThrows(SpannerException.class, () -> get(callbackResult)); + assertEquals(ErrorCode.CANCELLED, exception.getErrorCode()); + assertEquals(1, callbackCounter.get()); + } + } + @Test public void cancel() throws InterruptedException { Executor executor = Executors.newSingleThreadExecutor();