From f8f97701e5ca698a170a1d3b6ecb3886e186f9d5 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Sat, 16 Jan 2021 18:08:25 -0800 Subject: [PATCH] fix: StreamWriter hang when we reach the inflight limit control and is doing a retry (#799) * fix:a hang in StreamWriter when inflight request reached a limit and we try to resend a message * . * . * . * . * . --- .../storage/v1beta2/StreamWriter.java | 74 ++++++++++--------- .../storage/v1alpha2/StreamWriterTest.java | 1 - .../storage/v1beta2/StreamWriterTest.java | 19 +++-- 3 files changed, 54 insertions(+), 40 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java index 88837e8582..64b11321de 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java @@ -187,7 +187,14 @@ private StreamWriter(Builder builder) this.onSchemaUpdateRunnable.setStreamWriter(this); } - refreshAppend(); + bidiStreamingCallable = stub.appendRowsCallable(); + clientStream = bidiStreamingCallable.splitCall(responseObserver); + try { + while (!clientStream.isSendReady()) { + Thread.sleep(10); + } + } catch (InterruptedException e) { + } } /** Stream name we are writing to. */ @@ -296,9 +303,9 @@ public void flushAll(long timeoutMillis) throws Exception { /** * Re-establishes a stream connection. * - * @throws IOException + * @throws InterruptedException */ - public void refreshAppend() throws IOException, InterruptedException { + public void refreshAppend() throws InterruptedException { appendAndRefreshAppendLock.lock(); if (shutdown.get()) { LOG.warning("Cannot refresh on a already shutdown writer."); @@ -313,11 +320,8 @@ public void refreshAppend() throws IOException, InterruptedException { messagesBatch.resetAttachSchema(); bidiStreamingCallable = stub.appendRowsCallable(); clientStream = bidiStreamingCallable.splitCall(responseObserver); - try { - while (!clientStream.isSendReady()) { - Thread.sleep(10); - } - } catch (InterruptedException expected) { + while (!clientStream.isSendReady()) { + Thread.sleep(10); } Thread.sleep(this.retrySettings.getInitialRetryDelay().toMillis()); // Can only unlock here since need to sleep the full 7 seconds before stream can allow appends. @@ -922,41 +926,43 @@ public void onError(Throwable t) { } inflightBatch = this.inflightBatches.poll(); } - try { - if (isRecoverableError(t)) { - try { - if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts() - && !streamWriter.shutdown.get()) { - streamWriter.refreshAppend(); - LOG.info("Resending requests on transient error:" + streamWriter.currentRetries); - streamWriter.writeBatch(inflightBatch); - synchronized (streamWriter.currentRetries) { - streamWriter.currentRetries++; - } - } else { - inflightBatch.onFailure(t); - abortInflightRequests(t); - synchronized (streamWriter.currentRetries) { - streamWriter.currentRetries = 0; - } + streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); + if (isRecoverableError(t)) { + try { + if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts() + && !streamWriter.shutdown.get()) { + synchronized (streamWriter.currentRetries) { + streamWriter.currentRetries++; } - } catch (IOException | InterruptedException e) { - LOG.info("Got exception while retrying."); - inflightBatch.onFailure(e); - abortInflightRequests(e); + LOG.info( + "Try to reestablish connection due to transient error: " + + t.toString() + + " retry times: " + + streamWriter.currentRetries); + streamWriter.refreshAppend(); + LOG.info("Resending requests on after connection established"); + streamWriter.writeBatch(inflightBatch); + } else { + inflightBatch.onFailure(t); + abortInflightRequests(t); synchronized (streamWriter.currentRetries) { streamWriter.currentRetries = 0; } } - } else { - inflightBatch.onFailure(t); - abortInflightRequests(t); + } catch (InterruptedException e) { + LOG.info("Got exception while retrying: " + e.toString()); + inflightBatch.onFailure(new StatusRuntimeException(Status.ABORTED)); + abortInflightRequests(new StatusRuntimeException(Status.ABORTED)); synchronized (streamWriter.currentRetries) { streamWriter.currentRetries = 0; } } - } finally { - streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); + } else { + inflightBatch.onFailure(t); + abortInflightRequests(t); + synchronized (streamWriter.currentRetries) { + streamWriter.currentRetries = 0; + } } } }; diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java index 6a265c3910..7a96b87829 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java @@ -855,7 +855,6 @@ public void testFlushAll() throws Exception { ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); - assertFalse(appendFuture3.isDone()); writer.flushAll(100000); assertTrue(appendFuture3.isDone()); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java index ddaee5f34b..7cc101de19 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java @@ -519,21 +519,30 @@ public void testStreamReconnectionTransient() throws Exception { .toBuilder() .setDelayThreshold(Duration.ofSeconds(100000)) .setElementCountThreshold(1L) + .setFlowControlSettings( + StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS + .toBuilder() + .setMaxOutstandingElementCount(1L) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) + .build()) .build()) .build(); - StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE); - testBigQueryWrite.addException(transientError); testBigQueryWrite.addResponse( AppendRowsResponse.newBuilder() .setAppendResult( AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) .build()); + testBigQueryWrite.addException(new StatusRuntimeException(Status.UNAVAILABLE)); + testBigQueryWrite.addResponse( + AppendRowsResponse.newBuilder() + .setAppendResult( + AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) + .build()); ApiFuture future1 = sendTestMessage(writer, new String[] {"m1"}); - assertEquals(false, future1.isDone()); - // Retry is scheduled to be 7 seconds later. + ApiFuture future2 = sendTestMessage(writer, new String[] {"m1"}); assertEquals(0L, future1.get().getAppendResult().getOffset().getValue()); - future1.get(); + assertEquals(1L, future2.get().getAppendResult().getOffset().getValue()); writer.close(); }