From 80d68cf8a9810ea9ffa0075522d5c40e457623d2 Mon Sep 17 00:00:00 2001 From: yirutang Date: Wed, 17 Feb 2021 15:44:57 -0800 Subject: [PATCH 1/9] fix: Remove reconnection logic --- .../storage/v1beta2/StreamWriter.java | 40 +---------- .../storage/v1beta2/StreamWriterTest.java | 66 ------------------- 2 files changed, 2 insertions(+), 104 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index 7bbac03708..f7598e4c4c 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -943,44 +943,8 @@ public void onError(Throwable t) { } inflightBatch = this.inflightBatches.poll(); } - streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); - if (isRecoverableError(t)) { - try { - if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts() - && !streamWriter.shutdown.get()) { - synchronized (streamWriter.currentRetries) { - streamWriter.currentRetries++; - } - LOG.info( - "Try to reestablish connection due to transient error: " - + t.toString() - + " retry times: " - + streamWriter.currentRetries); - streamWriter.refreshAppend(); - LOG.info("Resending requests on after connection established"); - streamWriter.writeBatch(inflightBatch); - } else { - inflightBatch.onFailure(t); - abortInflightRequests(t); - synchronized (streamWriter.currentRetries) { - streamWriter.currentRetries = 0; - } - } - } catch (InterruptedException e) { - LOG.info("Got exception while retrying: " + e.toString()); - inflightBatch.onFailure(new StatusRuntimeException(Status.ABORTED)); - abortInflightRequests(new StatusRuntimeException(Status.ABORTED)); - synchronized (streamWriter.currentRetries) { - streamWriter.currentRetries = 0; - } - } - } else { - inflightBatch.onFailure(t); - abortInflightRequests(t); - synchronized (streamWriter.currentRetries) { - streamWriter.currentRetries = 0; - } - } + inflightBatch.onFailure(t); + abortInflightRequests(t); } }; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index 2b67f9f3d1..f26840e7d3 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -510,42 +510,6 @@ public void testFlowControlBehaviorException() throws Exception { } } - @Test - public void testStreamReconnectionTransient() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setDelayThreshold(Duration.ofSeconds(100000)) - .setElementCountThreshold(1L) - .setFlowControlSettings( - StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS - .toBuilder() - .setMaxOutstandingElementCount(1L) - .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) - .build()) - .build()) - .build(); - - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) - .build()); - testBigQueryWrite.addException(new StatusRuntimeException(Status.UNAVAILABLE)); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) - .build()); - ApiFuture future1 = sendTestMessage(writer, new String[] {"m1"}); - ApiFuture future2 = sendTestMessage(writer, new String[] {"m1"}); - assertEquals(0L, future1.get().getAppendResult().getOffset().getValue()); - assertEquals(1L, future2.get().getAppendResult().getOffset().getValue()); - writer.close(); - } - @Test public void testStreamReconnectionPermanant() throws Exception { StreamWriter writer = @@ -569,36 +533,6 @@ public void testStreamReconnectionPermanant() throws Exception { writer.close(); } - @Test - public void testStreamReconnectionExceedRetry() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setDelayThreshold(Duration.ofSeconds(100000)) - .setElementCountThreshold(1L) - .build()) - .setRetrySettings( - RetrySettings.newBuilder() - .setMaxRetryDelay(Duration.ofMillis(100)) - .setMaxAttempts(1) - .build()) - .build(); - assertEquals(1, writer.getRetrySettings().getMaxAttempts()); - StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE); - testBigQueryWrite.addException(transientError); - testBigQueryWrite.addException(transientError); - ApiFuture future3 = sendTestMessage(writer, new String[] {"toomanyretry"}); - try { - future3.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - assertEquals(transientError.toString(), e.getCause().getCause().toString()); - } - writer.close(); - } - @Test public void testOffset() throws Exception { try (StreamWriter writer = From d209b1f11cff60a9044e605ce553666fad692c83 Mon Sep 17 00:00:00 2001 From: yirutang Date: Wed, 17 Feb 2021 15:57:38 -0800 Subject: [PATCH 2/9] . --- .../storage/v1beta2/StreamWriter.java | 5 +-- .../storage/v1beta2/StreamWriterTest.java | 38 ------------------- 2 files changed, 2 insertions(+), 41 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index f7598e4c4c..6d1232ed60 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -65,8 +65,7 @@ * without offset, please use a simpler writer {@code DirectWriter}. * *

A {@link StreamWrier} provides built-in capabilities to: handle batching of messages; - * controlling memory utilization (through flow control); automatic connection re-establishment and - * request cleanup (only keeps write schema on first request in the stream). + * controlling memory utilization (through flow control); request cleanup (only keeps write schema on first request in the stream). * *

With customizable options that control: * @@ -496,6 +495,7 @@ private void onFailure(Throwable t) { return; } else { LOG.info("Setting " + t.toString() + " on response"); + streamWriter.messagesWaiter.release(this.getByteSize()); this.streamWriter.setException(t); } @@ -871,7 +871,6 @@ private void abortInflightRequests(Throwable t) { t, GrpcStatusCode.of(Status.Code.ABORTED), true)); - streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); } } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index f26840e7d3..5670eed541 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -32,7 +32,6 @@ import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.api.gax.grpc.testing.MockServiceHelper; -import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.DataLossException; import com.google.cloud.bigquery.storage.test.Test.FooType; import com.google.common.base.Strings; @@ -891,43 +890,6 @@ public void testFlushAll() throws Exception { writer.close(); } - @Test - public void testFlushAllFailed() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(100000)) - .build()) - .build(); - - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); - ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); - - assertFalse(appendFuture3.isDone()); - try { - writer.flushAll(100000); - fail("Should have thrown an Exception"); - } catch (Exception expected) { - if (expected.getCause() instanceof com.google.api.gax.rpc.DataLossException) { - LOG.info("got: " + expected.toString()); - } else { - fail("Unexpected exception:" + expected.toString()); - } - } - - assertTrue(appendFuture3.isDone()); - - writer.close(); - } - @Test public void testDatasetTraceId() throws Exception { StreamWriter writer = From 76a4ab65bbbffe8937da0881a04f085e9c6b3382 Mon Sep 17 00:00:00 2001 From: yirutang Date: Wed, 17 Feb 2021 16:11:55 -0800 Subject: [PATCH 3/9] . --- .../google/cloud/bigquery/storage/v1beta2/StreamWriter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index 6d1232ed60..396cd5ac22 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -65,7 +65,8 @@ * without offset, please use a simpler writer {@code DirectWriter}. * *

A {@link StreamWrier} provides built-in capabilities to: handle batching of messages; - * controlling memory utilization (through flow control); request cleanup (only keeps write schema on first request in the stream). + * controlling memory utilization (through flow control); request cleanup (only keeps write schema + * on first request in the stream). * *

With customizable options that control: * From 9183173cf3d415ed3c50670dde70d734499c40de Mon Sep 17 00:00:00 2001 From: yirutang Date: Wed, 17 Feb 2021 17:20:41 -0800 Subject: [PATCH 4/9] . --- .../cloud/bigquery/storage/v1beta2/StreamWriter.java | 10 +++++++--- .../google/cloud/bigquery/storage/v1beta2/Waiter.java | 8 +++++++- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index 396cd5ac22..4717791664 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -65,8 +65,8 @@ * without offset, please use a simpler writer {@code DirectWriter}. * *

A {@link StreamWrier} provides built-in capabilities to: handle batching of messages; - * controlling memory utilization (through flow control); request cleanup (only keeps write schema - * on first request in the stream). + * controlling memory utilization (through flow control) and request cleanup (only keeps write + * schema on first request in the stream). * *

With customizable options that control: * @@ -496,7 +496,6 @@ private void onFailure(Throwable t) { return; } else { LOG.info("Setting " + t.toString() + " on response"); - streamWriter.messagesWaiter.release(this.getByteSize()); this.streamWriter.setException(t); } @@ -872,6 +871,7 @@ private void abortInflightRequests(Throwable t) { t, GrpcStatusCode.of(Status.Code.ABORTED), true)); + streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); } } } @@ -943,8 +943,12 @@ public void onError(Throwable t) { } inflightBatch = this.inflightBatches.poll(); } + streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); inflightBatch.onFailure(t); abortInflightRequests(t); + synchronized (streamWriter.currentRetries) { + streamWriter.currentRetries = 0; + } } }; diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/Waiter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/Waiter.java index fd2efc489c..4422f53b32 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/Waiter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/Waiter.java @@ -64,10 +64,16 @@ private void notifyNextAcquires() { } } - public synchronized void release(long messageSize) { + public synchronized void release(long messageSize) throws IllegalStateException { lock.lock(); --pendingCount; + if (pendingCount < 0) { + throw new IllegalStateException("pendingCount cannot be less than 0"); + } pendingSize -= messageSize; + if (pendingSize < 0) { + throw new IllegalStateException("pendingSize cannot be less than 0"); + } notifyNextAcquires(); lock.unlock(); notifyAll(); From 3d081e2b7f9fedd2f5ac05751629df6e416015b9 Mon Sep 17 00:00:00 2001 From: yirutang Date: Wed, 17 Feb 2021 17:29:37 -0800 Subject: [PATCH 5/9] . --- .../storage/v1beta2/StreamWriter.java | 41 ++++++++----------- .../storage/v1beta2/StreamWriterTest.java | 2 +- 2 files changed, 18 insertions(+), 25 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index 4717791664..45521c0b9d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -863,14 +863,20 @@ public void onStart(StreamController controller) { private void abortInflightRequests(Throwable t) { synchronized (this.inflightBatches) { + boolean first_error = true; while (!this.inflightBatches.isEmpty()) { InflightBatch inflightBatch = this.inflightBatches.poll(); - inflightBatch.onFailure( - new AbortedException( - "Request aborted due to previous failures", - t, - GrpcStatusCode.of(Status.Code.ABORTED), - true)); + if (first_error) { + inflightBatch.onFailure(t); + first_error = true; + } else { + inflightBatch.onFailure( + new AbortedException( + "Request aborted due to previous failures", + t, + GrpcStatusCode.of(Status.Code.ABORTED), + true)); + } streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); } } @@ -913,7 +919,11 @@ public void onResponse(AppendRowsResponse response) { response.getAppendResult().getOffset().getValue(), inflightBatch.getExpectedOffset())); inflightBatch.onFailure(exception); - abortInflightRequests(exception); + abortInflightRequests(new AbortedException( + "Request aborted due to previous failures", + exception, + GrpcStatusCode.of(Status.Code.ABORTED), + true)); } else { inflightBatch.onSuccess(response); } @@ -931,24 +941,7 @@ public void onComplete() { @Override public void onError(Throwable t) { LOG.fine("OnError called"); - if (streamWriter.shutdown.get()) { - abortInflightRequests(t); - return; - } - InflightBatch inflightBatch = null; - synchronized (this.inflightBatches) { - if (inflightBatches.isEmpty()) { - // The batches could have been aborted. - return; - } - inflightBatch = this.inflightBatches.poll(); - } - streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); - inflightBatch.onFailure(t); abortInflightRequests(t); - synchronized (streamWriter.currentRetries) { - streamWriter.currentRetries = 0; - } } }; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index 5670eed541..86e3ea7b6d 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -942,7 +942,7 @@ public void testShutdownWithConnectionError() throws Exception { appendFuture2.get(); fail("Should fail with exception"); } catch (java.util.concurrent.ExecutionException e) { - assertEquals("Request aborted due to previous failures", e.getCause().getMessage()); + assertEquals("io.grpc.StatusRuntimeException: DATA_LOSS", e.getCause().getMessage()); } } } From 38c720f438c42eac7dca07dc6eac1f3ba947d19a Mon Sep 17 00:00:00 2001 From: yirutang Date: Wed, 17 Feb 2021 17:36:06 -0800 Subject: [PATCH 6/9] . --- .../cloud/bigquery/storage/v1beta2/StreamWriter.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index 45521c0b9d..f1d7069690 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -919,11 +919,12 @@ public void onResponse(AppendRowsResponse response) { response.getAppendResult().getOffset().getValue(), inflightBatch.getExpectedOffset())); inflightBatch.onFailure(exception); - abortInflightRequests(new AbortedException( - "Request aborted due to previous failures", - exception, - GrpcStatusCode.of(Status.Code.ABORTED), - true)); + abortInflightRequests( + new AbortedException( + "Request aborted due to previous failures", + exception, + GrpcStatusCode.of(Status.Code.ABORTED), + true)); } else { inflightBatch.onSuccess(response); } From 3a96467d09b9b4aa54421d31a9c547e9d0ceef01 Mon Sep 17 00:00:00 2001 From: yirutang Date: Wed, 17 Feb 2021 18:22:27 -0800 Subject: [PATCH 7/9] . --- .../storage/v1beta2/StreamWriter.java | 2 +- .../storage/v1beta2/StreamWriterTest.java | 29 +++++++++++++++---- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index f1d7069690..897d379143 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -868,7 +868,7 @@ private void abortInflightRequests(Throwable t) { InflightBatch inflightBatch = this.inflightBatches.poll(); if (first_error) { inflightBatch.onFailure(t); - first_error = true; + first_error = false; } else { inflightBatch.onFailure( new AbortedException( diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index 86e3ea7b6d..dfbfba6eb0 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -32,6 +32,7 @@ import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.api.gax.grpc.testing.MockServiceHelper; +import com.google.api.gax.rpc.AbortedException; import com.google.api.gax.rpc.DataLossException; import com.google.cloud.bigquery.storage.test.Test.FooType; import com.google.common.base.Strings; @@ -598,7 +599,7 @@ public void testOffsetMismatch() throws Exception { @Test public void testErrorPropagation() throws Exception { - try (StreamWriter writer = + StreamWriter writer = getTestStreamWriterBuilder() .setExecutorProvider(SINGLE_THREAD_EXECUTOR) .setBatchingSettings( @@ -607,13 +608,23 @@ public void testErrorPropagation() throws Exception { .setElementCountThreshold(1L) .setDelayThreshold(Duration.ofSeconds(5)) .build()) - .build()) { - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - sendTestMessage(writer, new String[] {"A"}).get(); + .build(); + testBigQueryWrite.addException(Status.DATA_LOSS.asException()); + testBigQueryWrite.addException(Status.DATA_LOSS.asException()); + ApiFuture future1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture future2 = sendTestMessage(writer, new String[] {"B"}); + try { + future1.get(); fail("should throw exception"); } catch (ExecutionException e) { assertThat(e.getCause()).isInstanceOf(DataLossException.class); } + try { + future2.get(); + fail("should throw exception"); + } catch (ExecutionException e) { + assertThat(e.getCause()).isInstanceOf(AbortedException.class); + } } @Test @@ -928,10 +939,12 @@ public void testShutdownWithConnectionError() throws Exception { AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) .build()); testBigQueryWrite.addException(Status.DATA_LOSS.asException()); + testBigQueryWrite.addException(Status.DATA_LOSS.asException()); testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"B"}); Thread.sleep(5000L); // Move the needle for responses to be sent. fakeExecutor.advanceTime(Duration.ofSeconds(20)); @@ -940,9 +953,15 @@ public void testShutdownWithConnectionError() throws Exception { assertEquals(1, appendFuture1.get().getAppendResult().getOffset().getValue()); try { appendFuture2.get(); - fail("Should fail with exception"); + fail("Should fail with exception future2"); } catch (java.util.concurrent.ExecutionException e) { assertEquals("io.grpc.StatusRuntimeException: DATA_LOSS", e.getCause().getMessage()); } + try { + appendFuture3.get(); + fail("Should fail with exception future3"); + } catch (java.util.concurrent.ExecutionException e) { + assertEquals("Request aborted due to previous failures", e.getCause().getMessage()); + } } } From e080c4daef22f6b763550cdaf6d47f1458a08770 Mon Sep 17 00:00:00 2001 From: yirutang Date: Wed, 17 Feb 2021 18:24:30 -0800 Subject: [PATCH 8/9] . --- .../cloud/bigquery/storage/v1beta2/StreamWriterTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index dfbfba6eb0..e6e4919f6c 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -955,13 +955,13 @@ public void testShutdownWithConnectionError() throws Exception { appendFuture2.get(); fail("Should fail with exception future2"); } catch (java.util.concurrent.ExecutionException e) { - assertEquals("io.grpc.StatusRuntimeException: DATA_LOSS", e.getCause().getMessage()); + assertThat(e.getCause()).isInstanceOf(DataLossException.class); } try { appendFuture3.get(); fail("Should fail with exception future3"); - } catch (java.util.concurrent.ExecutionException e) { - assertEquals("Request aborted due to previous failures", e.getCause().getMessage()); + } catch (ExecutionException e) { + assertThat(e.getCause()).isInstanceOf(AbortedException.class); } } } From 57426c44c63b51864a44f2b3f2c8b160a9a35a75 Mon Sep 17 00:00:00 2001 From: yirutang Date: Wed, 17 Feb 2021 18:24:59 -0800 Subject: [PATCH 9/9] . --- .../google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index e6e4919f6c..aeb25a2fcc 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -954,7 +954,7 @@ public void testShutdownWithConnectionError() throws Exception { try { appendFuture2.get(); fail("Should fail with exception future2"); - } catch (java.util.concurrent.ExecutionException e) { + } catch (ExecutionException e) { assertThat(e.getCause()).isInstanceOf(DataLossException.class); } try {