Skip to content

Commit

Permalink
fix:shutdown stuck when there is error on the flush path (#831)
Browse files Browse the repository at this point in the history
* fix:When the client is shutdown and the flush path has errors, we fail to release the requests in the queue, causing shutdown to wait forever

* .
  • Loading branch information
yirutang committed Feb 16, 2021
1 parent 94c7848 commit c2fd750
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
Expand Up @@ -932,6 +932,7 @@ public void onComplete() {
public void onError(Throwable t) {
LOG.fine("OnError called");
if (streamWriter.shutdown.get()) {
abortInflightRequests(t);
return;
}
InflightBatch inflightBatch = null;
Expand Down
Expand Up @@ -1015,4 +1015,38 @@ public void testDatasetTraceId() throws Exception {
assertEquals("Dataflow", testBigQueryWrite.getAppendRequests().get(0).getTraceId());
assertEquals("", testBigQueryWrite.getAppendRequests().get(1).getTraceId());
}

@Test
public void testShutdownWithConnectionError() throws Exception {
StreamWriter writer =
getTestStreamWriterBuilder()
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(1L)
.build())
.build();
testBigQueryWrite.addResponse(
AppendRowsResponse.newBuilder()
.setAppendResult(
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
.build());
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10));

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
Thread.sleep(5000L);
// Move the needle for responses to be sent.
fakeExecutor.advanceTime(Duration.ofSeconds(20));
// Shutdown writer immediately and there will be some error happened when flushing the queue.
writer.shutdown();
assertEquals(1, appendFuture1.get().getAppendResult().getOffset().getValue());
try {
appendFuture2.get();
fail("Should fail with exception");
} catch (java.util.concurrent.ExecutionException e) {
assertEquals("Request aborted due to previous failures", e.getCause().getMessage());
}
}
}

0 comments on commit c2fd750

Please sign in to comment.