Skip to content

Commit

Permalink
fix: stop invoking callback after pausing and cancelling result set
Browse files Browse the repository at this point in the history
An AsyncResultSet that was PAUSED and then CANCELLED, would continue to invoke the
callback until the callback would call tryNext(). If the callback never called tryNext(),
the spinning would continue until the entire result set had been consumed.

Fixes #1191
  • Loading branch information
olavloite committed May 18, 2021
1 parent e70b009 commit c9c4b94
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
Expand Up @@ -252,6 +252,13 @@ public void run() {
if (cursorReturnedDoneOrException) {
break;
}
if (state == State.CANCELLED) {
// The callback should always get at least one chance to catch the CANCELLED
// exception. It is however possible that the callback does not call tryNext(), and
// instead directly returns PAUSE or DONE. In those cases, the callback runner should
// also stop, even though the callback has not seen the CANCELLED state.
cursorReturnedDoneOrException = true;
}
}
CallbackResponse response;
try {
Expand Down
Expand Up @@ -16,7 +16,9 @@

package com.google.cloud.spanner;

import static com.google.cloud.spanner.SpannerApiFutures.get;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
Expand Down Expand Up @@ -371,6 +373,33 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
}
}

@Test
public void testCallbackIsNotCalledWhilePausedAndCanceled()
throws InterruptedException, ExecutionException {
Executor executor = Executors.newSingleThreadExecutor();
ResultSet delegate = mock(ResultSet.class);

final AtomicInteger callbackCounter = new AtomicInteger();
ApiFuture<Void> callbackResult;

try (AsyncResultSetImpl rs =
new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
callbackResult =
rs.setCallback(
executor,
resultSet -> {
callbackCounter.getAndIncrement();
return CallbackResponse.PAUSE;
});

rs.cancel();

SpannerException exception = assertThrows(SpannerException.class, () -> get(callbackResult));
assertEquals(ErrorCode.CANCELLED, exception.getErrorCode());
assertEquals(1, callbackCounter.get());
}
}

@Test
public void cancel() throws InterruptedException {
Executor executor = Executors.newSingleThreadExecutor();
Expand Down

0 comments on commit c9c4b94

Please sign in to comment.