New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix:shutdown stuck when there is error on the flush path #831
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1015,4 +1015,37 @@ public void testDatasetTraceId() throws Exception { | |
assertEquals("Dataflow", testBigQueryWrite.getAppendRequests().get(0).getTraceId()); | ||
assertEquals("", testBigQueryWrite.getAppendRequests().get(1).getTraceId()); | ||
} | ||
|
||
@Test | ||
public void testShutdownWithConnectionError() throws Exception { | ||
StreamWriter writer = | ||
getTestStreamWriterBuilder() | ||
.setBatchingSettings( | ||
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS | ||
.toBuilder() | ||
.setElementCountThreshold(1L) | ||
.build()) | ||
.build(); | ||
testBigQueryWrite.addResponse( | ||
AppendRowsResponse.newBuilder() | ||
.setAppendResult( | ||
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) | ||
.build()); | ||
testBigQueryWrite.addException(Status.DATA_LOSS.asException()); | ||
testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); | ||
|
||
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"}); | ||
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"}); | ||
Thread.sleep(5000L); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is this for? |
||
fakeExecutor.advanceTime(Duration.ofSeconds(20)); | ||
// Shutdown writer immediately and there will be some error happened when flushing the queue. | ||
writer.shutdown(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand this. You advance the time before shutdown. So even in the old code path before your fix, you can reduce the only inflight requests in the queue and unblock. Will this unit test block before your fix? Please think carefully how to write a unit test. |
||
assertEquals(1, appendFuture1.get().getAppendResult().getOffset().getValue()); | ||
try { | ||
appendFuture2.get(); | ||
fail("Should fail with exception"); | ||
} catch (java.util.concurrent.ExecutionException e) { | ||
LOG.info("got: " + e.toString()); | ||
yirutang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are two more issues: