diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index 7bbac03708..897d379143 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -65,8 +65,8 @@ * without offset, please use a simpler writer {@code DirectWriter}. * *

A {@link StreamWrier} provides built-in capabilities to: handle batching of messages; - * controlling memory utilization (through flow control); automatic connection re-establishment and - * request cleanup (only keeps write schema on first request in the stream). + * controlling memory utilization (through flow control) and request cleanup (only keeps write + * schema on first request in the stream). * *

With customizable options that control: * @@ -863,14 +863,20 @@ public void onStart(StreamController controller) { private void abortInflightRequests(Throwable t) { synchronized (this.inflightBatches) { + boolean first_error = true; while (!this.inflightBatches.isEmpty()) { InflightBatch inflightBatch = this.inflightBatches.poll(); - inflightBatch.onFailure( - new AbortedException( - "Request aborted due to previous failures", - t, - GrpcStatusCode.of(Status.Code.ABORTED), - true)); + if (first_error) { + inflightBatch.onFailure(t); + first_error = false; + } else { + inflightBatch.onFailure( + new AbortedException( + "Request aborted due to previous failures", + t, + GrpcStatusCode.of(Status.Code.ABORTED), + true)); + } streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); } } @@ -913,7 +919,12 @@ public void onResponse(AppendRowsResponse response) { response.getAppendResult().getOffset().getValue(), inflightBatch.getExpectedOffset())); inflightBatch.onFailure(exception); - abortInflightRequests(exception); + abortInflightRequests( + new AbortedException( + "Request aborted due to previous failures", + exception, + GrpcStatusCode.of(Status.Code.ABORTED), + true)); } else { inflightBatch.onSuccess(response); } @@ -931,56 +942,7 @@ public void onComplete() { @Override public void onError(Throwable t) { LOG.fine("OnError called"); - if (streamWriter.shutdown.get()) { - abortInflightRequests(t); - return; - } - InflightBatch inflightBatch = null; - synchronized (this.inflightBatches) { - if (inflightBatches.isEmpty()) { - // The batches could have been aborted. - return; - } - inflightBatch = this.inflightBatches.poll(); - } - streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); - if (isRecoverableError(t)) { - try { - if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts() - && !streamWriter.shutdown.get()) { - synchronized (streamWriter.currentRetries) { - streamWriter.currentRetries++; - } - LOG.info( - "Try to reestablish connection due to transient error: " - + t.toString() - + " retry times: " - + streamWriter.currentRetries); - streamWriter.refreshAppend(); - LOG.info("Resending requests on after connection established"); - streamWriter.writeBatch(inflightBatch); - } else { - inflightBatch.onFailure(t); - abortInflightRequests(t); - synchronized (streamWriter.currentRetries) { - streamWriter.currentRetries = 0; - } - } - } catch (InterruptedException e) { - LOG.info("Got exception while retrying: " + e.toString()); - inflightBatch.onFailure(new StatusRuntimeException(Status.ABORTED)); - abortInflightRequests(new StatusRuntimeException(Status.ABORTED)); - synchronized (streamWriter.currentRetries) { - streamWriter.currentRetries = 0; - } - } - } else { - inflightBatch.onFailure(t); - abortInflightRequests(t); - synchronized (streamWriter.currentRetries) { - streamWriter.currentRetries = 0; - } - } + abortInflightRequests(t); } }; diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/Waiter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/Waiter.java index fd2efc489c..4422f53b32 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/Waiter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/Waiter.java @@ -64,10 +64,16 @@ private void notifyNextAcquires() { } } - public synchronized void release(long messageSize) { + public synchronized void release(long messageSize) throws IllegalStateException { lock.lock(); --pendingCount; + if (pendingCount < 0) { + throw new IllegalStateException("pendingCount cannot be less than 0"); + } pendingSize -= messageSize; + if (pendingSize < 0) { + throw new IllegalStateException("pendingSize cannot be less than 0"); + } notifyNextAcquires(); lock.unlock(); notifyAll(); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index 2b67f9f3d1..aeb25a2fcc 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -32,7 +32,7 @@ import com.google.api.gax.grpc.testing.LocalChannelProvider; import com.google.api.gax.grpc.testing.MockGrpcService; import com.google.api.gax.grpc.testing.MockServiceHelper; -import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.AbortedException; import com.google.api.gax.rpc.DataLossException; import com.google.cloud.bigquery.storage.test.Test.FooType; import com.google.common.base.Strings; @@ -510,42 +510,6 @@ public void testFlowControlBehaviorException() throws Exception { } } - @Test - public void testStreamReconnectionTransient() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setDelayThreshold(Duration.ofSeconds(100000)) - .setElementCountThreshold(1L) - .setFlowControlSettings( - StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS - .toBuilder() - .setMaxOutstandingElementCount(1L) - .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) - .build()) - .build()) - .build(); - - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) - .build()); - testBigQueryWrite.addException(new StatusRuntimeException(Status.UNAVAILABLE)); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) - .build()); - ApiFuture future1 = sendTestMessage(writer, new String[] {"m1"}); - ApiFuture future2 = sendTestMessage(writer, new String[] {"m1"}); - assertEquals(0L, future1.get().getAppendResult().getOffset().getValue()); - assertEquals(1L, future2.get().getAppendResult().getOffset().getValue()); - writer.close(); - } - @Test public void testStreamReconnectionPermanant() throws Exception { StreamWriter writer = @@ -569,36 +533,6 @@ public void testStreamReconnectionPermanant() throws Exception { writer.close(); } - @Test - public void testStreamReconnectionExceedRetry() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setDelayThreshold(Duration.ofSeconds(100000)) - .setElementCountThreshold(1L) - .build()) - .setRetrySettings( - RetrySettings.newBuilder() - .setMaxRetryDelay(Duration.ofMillis(100)) - .setMaxAttempts(1) - .build()) - .build(); - assertEquals(1, writer.getRetrySettings().getMaxAttempts()); - StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE); - testBigQueryWrite.addException(transientError); - testBigQueryWrite.addException(transientError); - ApiFuture future3 = sendTestMessage(writer, new String[] {"toomanyretry"}); - try { - future3.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - assertEquals(transientError.toString(), e.getCause().getCause().toString()); - } - writer.close(); - } - @Test public void testOffset() throws Exception { try (StreamWriter writer = @@ -665,7 +599,7 @@ public void testOffsetMismatch() throws Exception { @Test public void testErrorPropagation() throws Exception { - try (StreamWriter writer = + StreamWriter writer = getTestStreamWriterBuilder() .setExecutorProvider(SINGLE_THREAD_EXECUTOR) .setBatchingSettings( @@ -674,13 +608,23 @@ public void testErrorPropagation() throws Exception { .setElementCountThreshold(1L) .setDelayThreshold(Duration.ofSeconds(5)) .build()) - .build()) { - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - sendTestMessage(writer, new String[] {"A"}).get(); + .build(); + testBigQueryWrite.addException(Status.DATA_LOSS.asException()); + testBigQueryWrite.addException(Status.DATA_LOSS.asException()); + ApiFuture future1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture future2 = sendTestMessage(writer, new String[] {"B"}); + try { + future1.get(); fail("should throw exception"); } catch (ExecutionException e) { assertThat(e.getCause()).isInstanceOf(DataLossException.class); } + try { + future2.get(); + fail("should throw exception"); + } catch (ExecutionException e) { + assertThat(e.getCause()).isInstanceOf(AbortedException.class); + } } @Test @@ -957,43 +901,6 @@ public void testFlushAll() throws Exception { writer.close(); } - @Test - public void testFlushAllFailed() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(100000)) - .build()) - .build(); - - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); - ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); - - assertFalse(appendFuture3.isDone()); - try { - writer.flushAll(100000); - fail("Should have thrown an Exception"); - } catch (Exception expected) { - if (expected.getCause() instanceof com.google.api.gax.rpc.DataLossException) { - LOG.info("got: " + expected.toString()); - } else { - fail("Unexpected exception:" + expected.toString()); - } - } - - assertTrue(appendFuture3.isDone()); - - writer.close(); - } - @Test public void testDatasetTraceId() throws Exception { StreamWriter writer = @@ -1032,10 +939,12 @@ public void testShutdownWithConnectionError() throws Exception { AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) .build()); testBigQueryWrite.addException(Status.DATA_LOSS.asException()); + testBigQueryWrite.addException(Status.DATA_LOSS.asException()); testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"B"}); Thread.sleep(5000L); // Move the needle for responses to be sent. fakeExecutor.advanceTime(Duration.ofSeconds(20)); @@ -1044,9 +953,15 @@ public void testShutdownWithConnectionError() throws Exception { assertEquals(1, appendFuture1.get().getAppendResult().getOffset().getValue()); try { appendFuture2.get(); - fail("Should fail with exception"); - } catch (java.util.concurrent.ExecutionException e) { - assertEquals("Request aborted due to previous failures", e.getCause().getMessage()); + fail("Should fail with exception future2"); + } catch (ExecutionException e) { + assertThat(e.getCause()).isInstanceOf(DataLossException.class); + } + try { + appendFuture3.get(); + fail("Should fail with exception future3"); + } catch (ExecutionException e) { + assertThat(e.getCause()).isInstanceOf(AbortedException.class); } } }