diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java index b9dd306935..c078fa41e3 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java @@ -250,10 +250,15 @@ public ApiFuture append(AppendRowsRequest message) { * @throws IOException */ public void refreshAppend() throws IOException, InterruptedException { + LOG.info("Establish a write connection."); synchronized (this) { - Preconditions.checkState(!shutdown.get(), "Cannot shut down on a shut-down writer."); + if (shutdown.get()) { + LOG.warning("Cannot refresh on a already shutdown writer."); + return; + } // There could be a moment, stub is not yet initialized. if (clientStream != null) { + LOG.info("Closing the stream"); clientStream.closeSend(); } messagesBatch.resetAttachSchema(); @@ -348,13 +353,15 @@ private static final class InflightBatch { final List inflightRequests; // A list tracks expected offset for each AppendRequest. Used to reconstruct the Response // future. - final ArrayList offsetList; - final long creationTime; - int attempt; - long batchSizeBytes; - long expectedOffset; - Boolean attachSchema; - String streamName; + private final ArrayList offsetList; + private final long creationTime; + private int attempt; + private long batchSizeBytes; + private long expectedOffset; + private Boolean attachSchema; + private String streamName; + + private final AtomicBoolean failed; InflightBatch( List inflightRequests, @@ -376,6 +383,7 @@ private static final class InflightBatch { this.batchSizeBytes = batchSizeBytes; this.attachSchema = attachSchema; this.streamName = streamName; + this.failed = new AtomicBoolean(false); } int count() { @@ -417,6 +425,13 @@ private AppendRowsRequest getMergedRequest() throws IllegalStateException { } private void onFailure(Throwable t) { + if (failed.getAndSet(true)) { + // Error has been set already. + LOG.warning("Ignore " + t.toString() + " since error has already been set"); + return; + } else { + LOG.fine("Setting " + t.toString() + " on response"); + } for (AppendRequestAndFutureResponse request : inflightRequests) { request.appendResult.setException(t); } @@ -838,8 +853,10 @@ public void onError(Throwable t) { } inflightBatch.onFailure(t); try { - // Establish a new connection. - streamWriter.refreshAppend(); + if (!streamWriter.shutdown.get()) { + // Establish a new connection. + streamWriter.refreshAppend(); + } } catch (IOException | InterruptedException e) { LOG.info("Failed to establish a new connection"); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java index f9a57ff760..7e8babb2dd 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java @@ -424,18 +424,25 @@ public void testFlowControlBehaviorException() throws Exception { .build()) .build()) .build()) { + assertEquals( + 1L, + writer + .getBatchingSettings() + .getFlowControlSettings() + .getMaxOutstandingElementCount() + .longValue()); testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build()); + testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + // Wait is necessary for response to be scheduled before timer is advanced. + Thread.sleep(5000L); + fakeExecutor.advanceTime(Duration.ofSeconds(10)); try { appendFuture2.get(); Assert.fail("This should fail"); } catch (Exception e) { - if (!e.getMessage().equals("The maximum number of batch elements: 1 have been reached.")) { - LOG.info("More error info:"); - e.printStackTrace(); - } assertEquals( "java.util.concurrent.ExecutionException: The maximum number of batch elements: 1 have been reached.", e.toString()); @@ -505,6 +512,7 @@ public void testStreamReconnectionExceedRetry() throws Exception { .setMaxAttempts(1) .build()) .build(); + assertEquals(1, writer.getRetrySettings().getMaxAttempts()); StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE); testBigQueryWrite.addException(transientError); testBigQueryWrite.addException(transientError); @@ -818,7 +826,7 @@ public void testAwaitTermination() throws Exception { testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); writer.shutdown(); - assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES)); + assertTrue(writer.awaitTermination(2, TimeUnit.MINUTES)); } @Test