Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: do not close delegate rs in callback runnable #425

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -275,7 +275,7 @@ public void run() {
switch (response) {
case DONE:
state = State.DONE;
closeDelegateResultSet();
cursorReturnedDoneOrException = true;
return;
case PAUSE:
state = State.PAUSED;
Expand Down
Expand Up @@ -284,6 +284,47 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) {
}
}

@Test
public void returnDoneBeforeEnd() throws Exception {
ExecutorProvider executorProvider = SpannerOptions.createDefaultAsyncExecutorProvider();
final Random random = new Random();
for (Executor executor :
new Executor[] {
MoreExecutors.directExecutor(), createExecService(), createExecService(32)
}) {
for (int bufferSize = 1; bufferSize < resultSetSize * 2; bufferSize *= 2) {
for (int i = 0; i < TEST_RUNS; i++) {
try (AsyncResultSetImpl impl =
new AsyncResultSetImpl(executorProvider, createResultSet(), bufferSize)) {
ApiFuture<Void> res =
impl.setCallback(
executor,
new ReadyCallback() {
@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
switch (resultSet.tryNext()) {
case DONE:
return CallbackResponse.DONE;
case NOT_READY:
return random.nextBoolean()
? CallbackResponse.DONE
: CallbackResponse.CONTINUE;
case OK:
return random.nextInt(resultSetSize) <= 2
? CallbackResponse.DONE
: CallbackResponse.CONTINUE;
default:
throw new IllegalStateException();
}
}
});
assertThat(res.get(10L, TimeUnit.SECONDS)).isNull();
}
}
}
}
}

@Test
public void pauseResume() throws Exception {
ExecutorProvider executorProvider = SpannerOptions.createDefaultAsyncExecutorProvider();
Expand Down
Expand Up @@ -440,4 +440,26 @@ public CallbackResponse cursorReady(AsyncResultSet resultSet) {
assertThat(callbackCounter.get()).isEqualTo(1);
}
}

@Test
public void callbackReturnsDoneBeforeEnd_shouldStopIteration() throws Exception {
Executor executor = Executors.newSingleThreadExecutor();
ResultSet delegate = mock(ResultSet.class);
when(delegate.next()).thenReturn(true, true, true, false);
when(delegate.getCurrentRowAsStruct()).thenReturn(mock(Struct.class));
try (AsyncResultSetImpl rs =
new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
rs.setCallback(
executor,
new ReadyCallback() {
@Override
public CallbackResponse cursorReady(AsyncResultSet resultSet) {
// Not calling resultSet.tryNext() means that it will also never return DONE.
// Instead the callback indicates that it does not want any more rows.
return CallbackResponse.DONE;
}
});
rs.getResult().get(10L, TimeUnit.SECONDS);
}
}
}