fix:shutdown stuck when there is error on the flush path #831

Merged
merged 2 commits into from Feb 16, 2021
@@ -932,6 +932,7 @@ public void onComplete() {
public void onError(Throwable t) {
  LOG.fine("OnError called");
  if (streamWriter.shutdown.get()) {
    abortInflightRequests(t);
Contributor

There are two more issues:

  1. Even if you abort the inflight requests here, shutdown can still add more requests through writeAllOutstanding().
  2. Another thread may also be calling append at the same time, which would contribute another stuck request at https://screenshot.googleplex.com/7pmsyb2zyKDEaGF
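
The two races described above can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the StreamWriter implementation: the class `GuardedQueueSketch`, its `enqueue` method, and the `terminalError` field are hypothetical names, and only `abortInflightRequests` borrows its name from the diff. The idea is that once the connection error has been recorded, every path that would queue a request (an append from another thread, or a flush during shutdown) fails the request immediately instead of adding it to a queue that nothing will drain.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical model, not the real StreamWriter: one way to keep requests
// from being queued after the connection has already failed.
final class GuardedQueueSketch {
  private final Object lock = new Object();
  private final Deque<CompletableFuture<String>> inflight = new ArrayDeque<>();
  private Throwable terminalError; // guarded by lock

  // Stand-in for any path that queues a request (append or flush on shutdown).
  CompletableFuture<String> enqueue() {
    CompletableFuture<String> future = new CompletableFuture<>();
    synchronized (lock) {
      if (terminalError != null) {
        // Fail fast: nothing will drain the queue after the error.
        future.completeExceptionally(terminalError);
        return future;
      }
      inflight.add(future);
    }
    return future;
  }

  // Stand-in for the abort performed in onError.
  void abortInflightRequests(Throwable t) {
    Deque<CompletableFuture<String>> toFail;
    synchronized (lock) {
      terminalError = t;
      toFail = new ArrayDeque<>(inflight);
      inflight.clear();
    }
    // Complete outside the lock so callbacks cannot re-enter while it is held.
    for (CompletableFuture<String> f : toFail) {
      f.completeExceptionally(t);
    }
  }

  int inflightCount() {
    synchronized (lock) {
      return inflight.size();
    }
  }
}
```

Because the error flag and the queue are updated under the same lock, a request is either queued before the abort (and failed by it) or arrives after the abort (and is failed on entry); in this model there is no window where it can be left stuck.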

    return;
  }
  InflightBatch inflightBatch = null;
@@ -1015,4 +1015,37 @@ public void testDatasetTraceId() throws Exception {
    assertEquals("Dataflow", testBigQueryWrite.getAppendRequests().get(0).getTraceId());
    assertEquals("", testBigQueryWrite.getAppendRequests().get(1).getTraceId());
  }

  @Test
  public void testShutdownWithConnectionError() throws Exception {
    StreamWriter writer =
        getTestStreamWriterBuilder()
            .setBatchingSettings(
                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
                    .toBuilder()
                    .setElementCountThreshold(1L)
                    .build())
            .build();
    testBigQueryWrite.addResponse(
        AppendRowsResponse.newBuilder()
            .setAppendResult(
                AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
            .build());
    testBigQueryWrite.addException(Status.DATA_LOSS.asException());
    testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10));

    ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
    ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
    Thread.sleep(5000L);
Contributor

What is this for?

    fakeExecutor.advanceTime(Duration.ofSeconds(20));
    // Shut down the writer immediately; an error will occur while flushing the queue.
    writer.shutdown();
Contributor

I don't understand this. You advance the time before shutdown, so even on the old code path, before your fix, the only inflight request in the queue can complete and unblock shutdown. Would this unit test block without your fix?

Please think carefully about how to write a unit test.
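
One way to make a regression test for a hang meaningful is to have it fail, rather than block forever, when the call under test does not return. The helper below is a minimal sketch of that idea using only the JDK; `HangDetectorSketch` and `completesWithin` are hypothetical names and are not part of this PR or of the test class shown in the diff.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical test helper: reports whether a possibly blocking call returns in time.
final class HangDetectorSketch {
  // Returns true if the task finished (normally or by throwing) within the timeout,
  // false if it was still blocked when the timeout expired.
  static boolean completesWithin(Runnable task, long timeoutMillis) {
    ExecutorService executor =
        Executors.newSingleThreadExecutor(
            r -> {
              Thread t = new Thread(r, "hang-detector");
              t.setDaemon(true); // a stuck task must not keep the JVM alive
              return t;
            });
    Future<?> result = executor.submit(task);
    try {
      result.get(timeoutMillis, TimeUnit.MILLISECONDS);
      return true;
    } catch (TimeoutException e) {
      return false;
    } catch (ExecutionException e) {
      return true; // the task ended by throwing, which is not a hang
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      executor.shutdownNow();
    }
  }
}
```

A test could then assert something like `assertTrue(HangDetectorSketch.completesWithin(writer::shutdown, 5_000))` without first advancing the fake clock past the response delay, so that the check would be expected to fail on the old code path. Whether that setup actually reproduces the original hang depends on how the fake server and executor behave, which this sketch does not verify.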

    assertEquals(1, appendFuture1.get().getAppendResult().getOffset().getValue());
    try {
      appendFuture2.get();
      fail("Should fail with exception");
    } catch (java.util.concurrent.ExecutionException e) {
      LOG.info("got: " + e.toString());
yirutang marked this conversation as resolved.
    }
  }
}