Skip to content

Commit

Permalink
fix: stop invoking callback after pausing and cancelling result set (#…
Browse files Browse the repository at this point in the history
…1192)

An AsyncResultSet that was PAUSED and then CANCELLED, would continue to invoke the
callback until the callback would call tryNext(). If the callback never called tryNext(),
the spinning would continue until the entire result set had been consumed.

Fixes #1191
  • Loading branch information
olavloite committed May 19, 2021
1 parent 9935066 commit 78e6784
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
Expand Up @@ -252,6 +252,13 @@ public void run() {
if (cursorReturnedDoneOrException) {
break;
}
if (state == State.CANCELLED) {
// The callback should always get at least one chance to catch the CANCELLED
// exception. It is however possible that the callback does not call tryNext(), and
// instead directly returns PAUSE or DONE. In those cases, the callback runner should
// also stop, even though the callback has not seen the CANCELLED state.
cursorReturnedDoneOrException = true;
}
}
CallbackResponse response;
try {
Expand Down
Expand Up @@ -16,7 +16,9 @@

package com.google.cloud.spanner;

import static com.google.cloud.spanner.SpannerApiFutures.get;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -371,6 +373,33 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
}
}

@Test
public void testCallbackIsNotCalledWhilePausedAndCanceled()
throws InterruptedException, ExecutionException {
Executor executor = Executors.newSingleThreadExecutor();
ResultSet delegate = mock(ResultSet.class);

final AtomicInteger callbackCounter = new AtomicInteger();
ApiFuture<Void> callbackResult;

try (AsyncResultSetImpl rs =
new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
callbackResult =
rs.setCallback(
executor,
resultSet -> {
callbackCounter.getAndIncrement();
return CallbackResponse.PAUSE;
});

rs.cancel();

SpannerException exception = assertThrows(SpannerException.class, () -> get(callbackResult));
assertEquals(ErrorCode.CANCELLED, exception.getErrorCode());
assertEquals(1, callbackCounter.get());
}
}

@Test
public void cancel() throws InterruptedException {
Executor executor = Executors.newSingleThreadExecutor();
Expand Down

0 comments on commit 78e6784

Please sign in to comment.