From 6134818f7bc5ada324a78d5c048bb2eeb83f8ca8 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Wed, 19 Aug 2020 13:34:27 -0700 Subject: [PATCH] feat: Add a flushAll() method that will flush all the inflight request and make sure all responses returned (#492) * feat: Add a flushAll method that will make sure all the request in the pipe are sent and responded * . * . * . * . * Add timeout * Add timeout fix * . * . * . * Lock flush and append operations. * make sure lock is released * remove timeout test since it is flaky in public runs * remove timeout test since it is flaky in public runs * fix lint * remove a test * . --- .../storage/v1alpha2/StreamWriter.java | 109 +++++++++++++----- .../bigquery/storage/v1alpha2/Waiter.java | 14 ++- .../storage/v1alpha2/StreamWriterTest.java | 63 ++++++++++ 3 files changed, 151 insertions(+), 35 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java index 214dff3814..b30b16ea25 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java @@ -99,6 +99,10 @@ public class StreamWriter implements AutoCloseable { private final Lock appendAndRefreshAppendLock; private final MessagesBatch messagesBatch; + // Indicates if a stream has some non recoverable exception happened. + private final Lock exceptionLock; + private Throwable streamException; + private BackgroundResource backgroundResources; private List backgroundResourceList; @@ -145,10 +149,13 @@ private StreamWriter(Builder builder) this.batchingSettings = builder.batchingSettings; this.retrySettings = builder.retrySettings; - this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName); + this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName, this); messagesBatchLock = new ReentrantLock(); appendAndRefreshAppendLock = new ReentrantLock(); activeAlarm = new AtomicBoolean(false); + this.exceptionLock = new ReentrantLock(); + this.streamException = null; + executor = builder.executorProvider.getExecutor(); backgroundResourceList = new ArrayList<>(); if (builder.executorProvider.shouldAutoClose()) { @@ -212,6 +219,14 @@ public Boolean expired() { return createTime.plus(streamTTL).compareTo(Instant.now()) < 0; } + private void setException(Throwable t) { + exceptionLock.lock(); + if (this.streamException == null) { + this.streamException = t; + } + exceptionLock.unlock(); + } + /** * Schedules the writing of a message. The write of the message may occur immediately or be * delayed based on the writer batching options. @@ -265,6 +280,33 @@ public ApiFuture append(AppendRowsRequest message) { return outstandingAppend.appendResult; } + /** + * This is the general flush method for asynchronise append operation. When you have outstanding + * append requests, calling flush will make sure all outstanding append requests completed and + * successful. Otherwise there will be an exception thrown. + * + * @throws Exception + */ + public void flushAll(long timeoutMillis) throws Exception { + appendAndRefreshAppendLock.lock(); + try { + writeAllOutstanding(); + synchronized (messagesWaiter) { + messagesWaiter.waitComplete(timeoutMillis); + } + } finally { + appendAndRefreshAppendLock.unlock(); + } + exceptionLock.lock(); + try { + if (streamException != null) { + throw new Exception(streamException); + } + } finally { + exceptionLock.unlock(); + } + } + /** * Flush the rows on a BUFFERED stream, up to the specified offset. After flush, rows will be * available for read. If no exception is thrown, it means the flush happened. @@ -411,14 +453,15 @@ private static final class InflightBatch { private long expectedOffset; private Boolean attachSchema; private String streamName; - private final AtomicBoolean failed; + private final StreamWriter streamWriter; InflightBatch( List inflightRequests, long batchSizeBytes, String streamName, - Boolean attachSchema) { + Boolean attachSchema, + StreamWriter streamWriter) { this.inflightRequests = inflightRequests; this.offsetList = new ArrayList(inflightRequests.size()); for (AppendRequestAndFutureResponse request : inflightRequests) { @@ -435,6 +478,7 @@ private static final class InflightBatch { this.attachSchema = attachSchema; this.streamName = streamName; this.failed = new AtomicBoolean(false); + this.streamWriter = streamWriter; } int count() { @@ -482,7 +526,9 @@ private void onFailure(Throwable t) { return; } else { LOG.info("Setting " + t.toString() + " on response"); + this.streamWriter.setException(t); } + for (AppendRequestAndFutureResponse request : inflightRequests) { request.appendResult.setException(t); } @@ -552,8 +598,12 @@ protected void shutdown() { currentAlarmFuture.cancel(false); } writeAllOutstanding(); - synchronized (messagesWaiter) { - messagesWaiter.waitComplete(); + try { + synchronized (messagesWaiter) { + messagesWaiter.waitComplete(0); + } + } catch (InterruptedException e) { + LOG.warning("Failed to wait for messages to return " + e.toString()); } if (clientStream.isSendReady()) { clientStream.closeSend(); @@ -820,14 +870,14 @@ public void onStart(StreamController controller) { private void abortInflightRequests(Throwable t) { synchronized (this.inflightBatches) { while (!this.inflightBatches.isEmpty()) { - this.inflightBatches - .poll() - .onFailure( - new AbortedException( - "Request aborted due to previous failures", - t, - GrpcStatusCode.of(Status.Code.ABORTED), - true)); + InflightBatch inflightBatch = this.inflightBatches.poll(); + inflightBatch.onFailure( + new AbortedException( + "Request aborted due to previous failures", + t, + GrpcStatusCode.of(Status.Code.ABORTED), + true)); + streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); } } } @@ -850,13 +900,15 @@ public void onResponse(AppendRowsResponse response) { streamWriter.getOnSchemaUpdateRunnable(), 0L, TimeUnit.MILLISECONDS); } } - // TODO: Deal with in stream errors. + // Currently there is nothing retryable. If the error is already exists, then ignore it. if (response.hasError()) { - StatusRuntimeException exception = - new StatusRuntimeException( - Status.fromCodeValue(response.getError().getCode()) - .withDescription(response.getError().getMessage())); - inflightBatch.onFailure(exception); + if (response.getError().getCode() != 6 /* ALREADY_EXISTS */) { + StatusRuntimeException exception = + new StatusRuntimeException( + Status.fromCodeValue(response.getError().getCode()) + .withDescription(response.getError().getMessage())); + inflightBatch.onFailure(exception); + } } if (inflightBatch.getExpectedOffset() > 0 && response.getOffset() != inflightBatch.getExpectedOffset()) { @@ -907,6 +959,7 @@ public void onError(Throwable t) { } } else { inflightBatch.onFailure(t); + abortInflightRequests(t); synchronized (streamWriter.currentRetries) { streamWriter.currentRetries = 0; } @@ -914,23 +967,17 @@ public void onError(Throwable t) { } catch (IOException | InterruptedException e) { LOG.info("Got exception while retrying."); inflightBatch.onFailure(e); + abortInflightRequests(e); synchronized (streamWriter.currentRetries) { streamWriter.currentRetries = 0; } } } else { inflightBatch.onFailure(t); + abortInflightRequests(t); synchronized (streamWriter.currentRetries) { streamWriter.currentRetries = 0; } - try { - if (!streamWriter.shutdown.get()) { - // Establish a new connection. - streamWriter.refreshAppend(); - } - } catch (IOException | InterruptedException e) { - LOG.info("Failed to establish a new connection"); - } } } finally { streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); @@ -945,17 +992,21 @@ private static class MessagesBatch { private final BatchingSettings batchingSettings; private Boolean attachSchema = true; private final String streamName; + private final StreamWriter streamWriter; - private MessagesBatch(BatchingSettings batchingSettings, String streamName) { + private MessagesBatch( + BatchingSettings batchingSettings, String streamName, StreamWriter streamWriter) { this.batchingSettings = batchingSettings; this.streamName = streamName; + this.streamWriter = streamWriter; reset(); } // Get all the messages out in a batch. private InflightBatch popBatch() { InflightBatch batch = - new InflightBatch(messages, batchedBytes, this.streamName, this.attachSchema); + new InflightBatch( + messages, batchedBytes, this.streamName, this.attachSchema, this.streamWriter); this.attachSchema = false; reset(); return batch; diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java index 43830ae021..5da8a1a57c 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java @@ -146,20 +146,22 @@ public void acquire(long messageSize) throws FlowController.FlowControlException } } - public synchronized void waitComplete() { + public synchronized void waitComplete(long timeoutMillis) throws InterruptedException { + long end = System.currentTimeMillis() + timeoutMillis; lock.lock(); try { - while (pendingCount > 0) { + while (pendingCount > 0 && (timeoutMillis == 0 || end > System.currentTimeMillis())) { lock.unlock(); try { - wait(); + wait(timeoutMillis == 0 ? 0 : end - System.currentTimeMillis()); } catch (InterruptedException e) { - LOG.warning("Interrupted while waiting for completion"); + throw e; } lock.lock(); } - } catch (Exception e) { - LOG.warning(e.toString()); + if (pendingCount > 0) { + throw new InterruptedException("Wait timeout"); + } } finally { lock.unlock(); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java index 372b4ad97f..2f9160f156 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java @@ -834,4 +834,67 @@ public void testExistingClient() throws Exception { client.shutdown(); client.awaitTermination(1, TimeUnit.MINUTES); } + + @Test + public void testFlushAll() throws Exception { + StreamWriter writer = + getTestStreamWriterBuilder() + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .setDelayThreshold(Duration.ofSeconds(100000)) + .build()) + .build(); + + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build()); + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build()); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); + + assertFalse(appendFuture3.isDone()); + writer.flushAll(100000); + + assertTrue(appendFuture3.isDone()); + + writer.close(); + } + + @Test + public void testFlushAllFailed() throws Exception { + StreamWriter writer = + getTestStreamWriterBuilder() + .setBatchingSettings( + StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(2L) + .setDelayThreshold(Duration.ofSeconds(100000)) + .build()) + .build(); + + testBigQueryWrite.addException(Status.DATA_LOSS.asException()); + + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); + + assertFalse(appendFuture3.isDone()); + try { + writer.flushAll(100000); + fail("Should have thrown an Exception"); + } catch (Exception expected) { + if (expected.getCause() instanceof com.google.api.gax.rpc.DataLossException) { + LOG.info("got: " + expected.toString()); + } else { + fail("Unexpected exception:" + expected.toString()); + } + } + + assertTrue(appendFuture3.isDone()); + + writer.close(); + } }