From c2fd750e1309fb7b6eb862dea1ad8546dcd78bef Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Tue, 16 Feb 2021 12:29:23 -0800 Subject: [PATCH] fix:shutdown stuck when there is error on the flush path (#831) * fix:When the client is shutdown and the flush path has errors, we fail to release the requests in the queue, causing shutdown to wait forever * . --- .../storage/v1beta2/StreamWriter.java | 1 + .../storage/v1beta2/StreamWriterTest.java | 34 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index ad6b6ddba5..7bbac03708 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -932,6 +932,7 @@ public void onComplete() { public void onError(Throwable t) { LOG.fine("OnError called"); if (streamWriter.shutdown.get()) { + abortInflightRequests(t); return; } InflightBatch inflightBatch = null; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index 169c815e91..2b67f9f3d1 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -1015,4 +1015,38 @@ public void testDatasetTraceId() throws Exception { assertEquals("Dataflow", testBigQueryWrite.getAppendRequests().get(0).getTraceId()); assertEquals("", testBigQueryWrite.getAppendRequests().get(1).getTraceId()); } + + @Test + public void testShutdownWithConnectionError() throws Exception { + StreamWriter writer = + getTestStreamWriterBuilder() + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .build()) + .build(); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) + .build()); + testBigQueryWrite.addException(Status.DATA_LOSS.asException()); + testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + Thread.sleep(5000L); + // Move the needle for responses to be sent. + fakeExecutor.advanceTime(Duration.ofSeconds(20)); + // Shutdown writer immediately and there will be some error happened when flushing the queue. + writer.shutdown(); + assertEquals(1, appendFuture1.get().getAppendResult().getOffset().getValue()); + try { + appendFuture2.get(); + fail("Should fail with exception"); + } catch (java.util.concurrent.ExecutionException e) { + assertEquals("Request aborted due to previous failures", e.getCause().getMessage()); + } + } }