diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java index a979e1fa61..bd1668c43f 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java @@ -27,6 +27,11 @@ /** Exceptions for Storage Client Libraries. */ public final class Exceptions { + public static class WriterClosedException extends Exception { + public WriterClosedException(String streamName) { + super("Writer closed on: " + streamName); + } + } /** Main Storage Exception. Might contain map of streams to errors for that stream. */ public static class StorageException extends RuntimeException { diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 1cac296a03..de6e8333d5 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -502,11 +502,13 @@ private AppendRowsRequest prepareRequestBasedOnPosition( } private void cleanupInflightRequests() { - Throwable finalStatus; + Throwable finalStatus = new Exceptions.WriterClosedException(streamName); Deque localQueue = new LinkedList(); this.lock.lock(); try { - finalStatus = this.connectionFinalStatus; + if (this.connectionFinalStatus != null) { + finalStatus = this.connectionFinalStatus; + } while (!this.inflightRequestQueue.isEmpty()) { localQueue.addLast(pollInflightRequestQueue()); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index 92740d2a4e..692f6ce9bc 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -638,4 +638,12 @@ public void testRetryAfterAllRecordsInflight() throws Exception { assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue()); } } + + @Test + public void testWriterClosedStream() throws Exception { + try (StreamWriter writer = getTestStreamWriter()) { + // Writer is closed without any traffic. + TimeUnit.SECONDS.sleep(1); + } + } }