From b80386394f3082e9695712343f37afc4d29df76f Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Fri, 24 Apr 2020 14:45:03 -0700 Subject: [PATCH] fix: several StreamWriter issues (#213) * fix: several StreamWriter issues - Make messages waiting in flow control be delivered in order - Avoid recreating the BigQueryWriteClient stub during reconnection, which is not necessary. - Allow user to pass in BigQueryWriteClient stub so that it can be shared with other API calls. modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java * Wait doesn't need to be synchronized * fix: unlock is not called in exception case modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java --- .../storage/v1alpha2/StreamWriter.java | 103 +++++----- .../bigquery/storage/v1alpha2/Waiter.java | 177 +++++++++++------- .../storage/v1alpha2/StreamWriterTest.java | 43 +++-- 3 files changed, 184 insertions(+), 139 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java index 78e23458ab..cd27c741e5 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java @@ -85,7 +85,7 @@ public class StreamWriter implements AutoCloseable { private final BatchingSettings batchingSettings; private final RetrySettings retrySettings; - 
private final BigQueryWriteSettings stubSettings; + private BigQueryWriteSettings stubSettings; private final Lock messagesBatchLock; private final MessagesBatch messagesBatch; @@ -142,13 +142,21 @@ private StreamWriter(Builder builder) messagesWaiter = new Waiter(this.batchingSettings.getFlowControlSettings()); responseObserver = new AppendResponseObserver(this); - stubSettings = - BigQueryWriteSettings.newBuilder() - .setCredentialsProvider(builder.credentialsProvider) - .setExecutorProvider(builder.executorProvider) - .setTransportChannelProvider(builder.channelProvider) - .setEndpoint(builder.endpoint) - .build(); + if (builder.client == null) { + stubSettings = + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(builder.credentialsProvider) + .setExecutorProvider(builder.executorProvider) + .setTransportChannelProvider(builder.channelProvider) + .setEndpoint(builder.endpoint) + .build(); + stub = BigQueryWriteClient.create(stubSettings); + backgroundResourceList.add(stub); + } else { + stub = builder.client; + } + backgroundResources = new BackgroundResourceAggregation(backgroundResourceList); + shutdown = new AtomicBoolean(false); refreshAppend(); Stream.WriteStream stream = @@ -240,15 +248,10 @@ public ApiFuture append(AppendRowsRequest message) { public void refreshAppend() throws IOException, InterruptedException { synchronized (this) { Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer."); - if (stub != null) { + // clientStream may not be initialized yet (e.g. on the first call), so guard closeSend(). 
+ if (clientStream != null) { clientStream.closeSend(); - stub.shutdown(); - stub.awaitTermination(1, TimeUnit.MINUTES); } - backgroundResourceList.remove(stub); - stub = BigQueryWriteClient.create(stubSettings); - backgroundResourceList.add(stub); - backgroundResources = new BackgroundResourceAggregation(backgroundResourceList); messagesBatch.resetAttachSchema(); bidiStreamingCallable = stub.appendRowsCallable(); clientStream = bidiStreamingCallable.splitCall(responseObserver); @@ -314,14 +317,12 @@ public void writeAllOutstanding() { private void writeBatch(final InflightBatch inflightBatch) { if (inflightBatch != null) { AppendRowsRequest request = inflightBatch.getMergedRequest(); - messagesWaiter.waitOnElementCount(); - messagesWaiter.waitOnSizeLimit(inflightBatch.getByteSize()); - responseObserver.addInflightBatch(inflightBatch); - clientStream.send(request); - - synchronized (messagesWaiter) { - messagesWaiter.incrementPendingCount(1); - messagesWaiter.incrementPendingSize(inflightBatch.getByteSize()); + try { + messagesWaiter.acquire(inflightBatch.getByteSize()); + responseObserver.addInflightBatch(inflightBatch); + clientStream.send(request); + } catch (FlowController.FlowControlException ex) { + inflightBatch.onFailure(ex); } } } @@ -346,14 +347,14 @@ private static final class InflightBatch { final ArrayList offsetList; final long creationTime; int attempt; - int batchSizeBytes; + long batchSizeBytes; long expectedOffset; Boolean attachSchema; String streamName; InflightBatch( List inflightRequests, - int batchSizeBytes, + long batchSizeBytes, String streamName, Boolean attachSchema) { this.inflightRequests = inflightRequests; @@ -377,7 +378,7 @@ int count() { return inflightRequests.size(); } - int getByteSize() { + long getByteSize() { return this.batchSizeBytes; } @@ -478,7 +479,9 @@ public void shutdown() { currentAlarmFuture.cancel(false); } writeAllOutstanding(); - messagesWaiter.waitComplete(); + synchronized (messagesWaiter) { + 
messagesWaiter.waitComplete(); + } if (clientStream.isSendReady()) { clientStream.closeSend(); } @@ -496,7 +499,7 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted } /** - * Constructs a new {@link Builder} using the given topic. + * Constructs a new {@link Builder} using the given stream. * *

Example of creating a {@code WriteStream}. * @@ -514,7 +517,15 @@ public boolean awaitTermination(long duration, TimeUnit unit) throws Interrupted * } */ public static Builder newBuilder(String streamName) { - return new Builder(streamName); + return new Builder(streamName, null); + } + + /** + * Constructs a new {@link Builder} using the given stream and an existing BigQueryWriteClient. + */ + public static Builder newBuilder(String streamName, BigQueryWriteClient client) { + Preconditions.checkArgument(client != null); + return new Builder(streamName, client); } /** A builder of {@link StreamWriter}s. */ @@ -523,9 +534,6 @@ public static final class Builder { static final Duration MIN_RPC_TIMEOUT = Duration.ofMillis(10); // Meaningful defaults. - static final long DEFAULT_ELEMENT_COUNT_THRESHOLD = 100L; - static final long DEFAULT_REQUEST_BYTES_THRESHOLD = 100 * 1024L; // 100 kB - static final Duration DEFAULT_DELAY_THRESHOLD = Duration.ofMillis(10); static final FlowControlSettings DEFAULT_FLOW_CONTROL_SETTINGS = FlowControlSettings.newBuilder() .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) @@ -534,9 +542,9 @@ public static final class Builder { .build(); public static final BatchingSettings DEFAULT_BATCHING_SETTINGS = BatchingSettings.newBuilder() - .setDelayThreshold(DEFAULT_DELAY_THRESHOLD) - .setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD) - .setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD) + .setDelayThreshold(Duration.ofMillis(10)) + .setRequestByteThreshold(100 * 1024L) // 100 kB + .setElementCountThreshold(100L) .setFlowControlSettings(DEFAULT_FLOW_CONTROL_SETTINGS) .build(); public static final RetrySettings DEFAULT_RETRY_SETTINGS = @@ -555,6 +563,8 @@ public static final class Builder { private String streamName; private String endpoint = BigQueryWriteSettings.getDefaultEndpoint(); + private BigQueryWriteClient client = null; + // Batching options BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS; 
@@ -569,8 +579,9 @@ public static final class Builder { private CredentialsProvider credentialsProvider = BigQueryWriteSettings.defaultCredentialsProviderBuilder().build(); - private Builder(String stream) { + private Builder(String stream, BigQueryWriteClient client) { this.streamName = Preconditions.checkNotNull(stream); + this.client = client; } /** @@ -771,11 +782,7 @@ public void onResponse(AppendRowsResponse response) { inflightBatch.onSuccess(response); } } finally { - synchronized (streamWriter.messagesWaiter) { - streamWriter.messagesWaiter.incrementPendingCount(-1); - streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize()); - streamWriter.messagesWaiter.notifyAll(); - } + streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); } } @@ -805,11 +812,11 @@ public void onError(Throwable t) { && !streamWriter.shutdown.get()) { streamWriter.refreshAppend(); // Currently there is a bug that it took reconnected stream 5 seconds to pick up - // stream count. So wait at least 5 seconds before sending a new request. + // stream count. So wait at least 7 seconds before sending a new request. Thread.sleep( Math.min( streamWriter.getRetrySettings().getInitialRetryDelay().toMillis(), - Duration.ofSeconds(5).toMillis())); + Duration.ofSeconds(7).toMillis())); streamWriter.writeBatch(inflightBatch); synchronized (streamWriter.currentRetries) { streamWriter.currentRetries++; @@ -837,11 +844,7 @@ public void onError(Throwable t) { } } } finally { - synchronized (streamWriter.messagesWaiter) { - streamWriter.messagesWaiter.incrementPendingCount(-1); - streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize()); - streamWriter.messagesWaiter.notifyAll(); - } + streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); } } }; @@ -849,7 +852,7 @@ public void onError(Throwable t) { // This class controls how many messages are going to be sent out in a batch. 
private static class MessagesBatch { private List messages; - private int batchedBytes; + private long batchedBytes; private final BatchingSettings batchingSettings; private Boolean attachSchema = true; private final String streamName; @@ -882,7 +885,7 @@ private boolean isEmpty() { return messages.isEmpty(); } - private int getBatchedBytes() { + private long getBatchedBytes() { return batchedBytes; } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java index 0e15d6c726..43830ae021 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java @@ -18,9 +18,11 @@ import com.google.api.core.InternalApi; import com.google.api.gax.batching.FlowControlSettings; -import com.google.api.gax.grpc.GrpcStatusCode; -import com.google.api.gax.rpc.UnimplementedException; -import io.grpc.Status; +import com.google.api.gax.batching.FlowController; +import java.util.LinkedList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Logger; /** @@ -30,105 +32,146 @@ class Waiter { private static final Logger LOG = Logger.getLogger(Waiter.class.getName()); - private int pendingCount; - private int pendingSize; - FlowControlSettings flowControlSettings; + private long pendingCount; + private long pendingSize; + private long countLimit; + private long sizeLimit; + private FlowController.LimitExceededBehavior behavior; + private LinkedList awaitingMessageAcquires; + private LinkedList awaitingBytesAcquires; + private final Lock lock; Waiter(FlowControlSettings flowControlSettings) { pendingCount = 0; pendingSize = 0; - this.flowControlSettings = flowControlSettings; + 
this.awaitingMessageAcquires = new LinkedList(); + this.awaitingBytesAcquires = new LinkedList(); + this.countLimit = flowControlSettings.getMaxOutstandingElementCount(); + this.sizeLimit = flowControlSettings.getMaxOutstandingRequestBytes(); + this.behavior = flowControlSettings.getLimitExceededBehavior(); + this.lock = new ReentrantLock(); } - public synchronized void incrementPendingCount(int delta) { - this.pendingCount += delta; - if (pendingCount == 0) { - notifyAll(); + private void notifyNextAcquires() { + if (!awaitingMessageAcquires.isEmpty()) { + CountDownLatch awaitingAcquire = awaitingMessageAcquires.getFirst(); + awaitingAcquire.countDown(); + } + if (!awaitingBytesAcquires.isEmpty()) { + CountDownLatch awaitingAcquire = awaitingBytesAcquires.getFirst(); + awaitingAcquire.countDown(); } } - public synchronized void incrementPendingSize(int delta) { - this.pendingSize += delta; + public synchronized void release(long messageSize) { + lock.lock(); + --pendingCount; + pendingSize -= messageSize; + notifyNextAcquires(); + lock.unlock(); + notifyAll(); } - private void wait(String message) { - boolean interrupted = false; + public void acquire(long messageSize) throws FlowController.FlowControlException { + lock.lock(); try { - LOG.fine("Wait on: " + message); - wait(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } + if (pendingCount >= countLimit + && behavior == FlowController.LimitExceededBehavior.ThrowException) { + throw new FlowController.MaxOutstandingElementCountReachedException(countLimit); + } + if (pendingSize + messageSize >= sizeLimit + && behavior == FlowController.LimitExceededBehavior.ThrowException) { + throw new FlowController.MaxOutstandingRequestBytesReachedException(sizeLimit); + } - private void handleOverLimit(String message) { - boolean interrupted = false; - switch (this.flowControlSettings.getLimitExceededBehavior()) { - case Block: - wait(message); - break; - case ThrowException: - throw new 
IllegalStateException("FlowControl limit exceeded: " + message); - case Ignore: - return; - default: - throw new UnimplementedException( - "Unknown behavior setting: " - + this.flowControlSettings.getLimitExceededBehavior().toString(), - null, - GrpcStatusCode.of(Status.Code.UNIMPLEMENTED), - false); - } - } + CountDownLatch messageWaiter = null; + while (pendingCount >= countLimit) { + if (messageWaiter == null) { + messageWaiter = new CountDownLatch(1); + awaitingMessageAcquires.addLast(messageWaiter); + } else { + // This message already in line stays at the head of the line. + messageWaiter = new CountDownLatch(1); + awaitingMessageAcquires.set(0, messageWaiter); + } + lock.unlock(); + try { + messageWaiter.await(); + } catch (InterruptedException e) { + LOG.warning("Interrupted while waiting to acquire flow control tokens"); + } + lock.lock(); + } + ++pendingCount; + if (messageWaiter != null) { + awaitingMessageAcquires.removeFirst(); + } - public synchronized void waitOnElementCount() { - LOG.finer( - "Waiting on element count " - + this.pendingCount - + " " - + this.flowControlSettings.getMaxOutstandingElementCount()); - while (this.pendingCount >= this.flowControlSettings.getMaxOutstandingElementCount()) { - handleOverLimit("Element count"); - } - } + if (!awaitingMessageAcquires.isEmpty() && pendingCount < countLimit) { + awaitingMessageAcquires.getFirst().countDown(); + } - public synchronized void waitOnSizeLimit(int incomingSize) { - LOG.finer( - "Waiting on size limit " - + (this.pendingSize + incomingSize) - + " " - + this.flowControlSettings.getMaxOutstandingRequestBytes()); - while (this.pendingSize + incomingSize - >= this.flowControlSettings.getMaxOutstandingRequestBytes()) { - handleOverLimit("Byte size"); + // Now acquire space for bytes. + CountDownLatch bytesWaiter = null; + Long bytesRemaining = messageSize; + while (pendingSize + messageSize >= sizeLimit) { + if (bytesWaiter == null) { + // This message gets added to the back of the line. 
+ bytesWaiter = new CountDownLatch(1); + awaitingBytesAcquires.addLast(bytesWaiter); + } else { + // This message already in line stays at the head of the line. + bytesWaiter = new CountDownLatch(1); + awaitingBytesAcquires.set(0, bytesWaiter); + } + lock.unlock(); + try { + bytesWaiter.await(); + } catch (InterruptedException e) { + LOG.warning("Interrupted while waiting to acquire flow control tokens"); + } + lock.lock(); + } + + pendingSize += messageSize; + if (bytesWaiter != null) { + awaitingBytesAcquires.removeFirst(); + } + // There may be some surplus bytes left; let the next message waiting for bytes have some. + if (!awaitingBytesAcquires.isEmpty() && pendingSize < sizeLimit) { + awaitingBytesAcquires.getFirst().countDown(); + } + } finally { + lock.unlock(); } } public synchronized void waitComplete() { - boolean interrupted = false; + lock.lock(); try { while (pendingCount > 0) { + lock.unlock(); try { wait(); } catch (InterruptedException e) { - // Ignored, uninterruptibly. 
- interrupted = true; + LOG.warning("Interrupted while waiting for completion"); } + lock.lock(); } + } catch (Exception e) { + LOG.warning(e.toString()); } finally { - if (interrupted) { - Thread.currentThread().interrupt(); - } + lock.unlock(); } } @InternalApi - public int pendingCount() { + public long pendingCount() { return pendingCount; } @InternalApi - public int pendingSize() { + public long pendingSize() { return pendingSize; } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java index 38394a7479..950419fdc9 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java @@ -425,11 +425,12 @@ public void testFlowControlBehaviorException() throws Exception { testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build()); ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); try { - ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); + appendFuture2.get(); Assert.fail("This should fail"); - } catch (IllegalStateException e) { - assertEquals("FlowControl limit exceeded: Element count", e.getMessage()); + } catch (ExecutionException e) { + assertEquals("The maximum number of batch elements: 1 have been reached.", e.getMessage()); } assertEquals(1L, appendFuture1.get().getOffset()); } @@ -453,7 +454,7 @@ public void testStreamReconnection() throws Exception { testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build()); ApiFuture future1 = sendTestMessage(writer, new String[] {"m1"}); assertEquals(false, future1.isDone()); - // Retry is scheduled to be 5 seconds 
later. + // Retry is scheduled to be 7 seconds later. assertEquals(0L, future1.get().getOffset()); LOG.info("======CASE II"); @@ -469,16 +470,6 @@ public void testStreamReconnection() throws Exception { } LOG.info("======CASE III"); - // Writer needs to be recreated since the previous error is not recoverable. - writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setDelayThreshold(Duration.ofSeconds(100000)) - .setElementCountThreshold(1L) - .build()) - .build(); // Case 3: Failed after retried max retry times. testBigQueryWrite.addException(transientError); testBigQueryWrite.addException(transientError); @@ -614,14 +605,9 @@ public void testWriterGetters() throws Exception { public void testBuilderParametersAndDefaults() { StreamWriter.Builder builder = StreamWriter.newBuilder(TEST_STREAM); assertEquals(StreamWriter.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider); - assertEquals( - StreamWriter.Builder.DEFAULT_REQUEST_BYTES_THRESHOLD, - builder.batchingSettings.getRequestByteThreshold().longValue()); - assertEquals( - StreamWriter.Builder.DEFAULT_DELAY_THRESHOLD, builder.batchingSettings.getDelayThreshold()); - assertEquals( - StreamWriter.Builder.DEFAULT_ELEMENT_COUNT_THRESHOLD, - builder.batchingSettings.getElementCountThreshold().longValue()); + assertEquals(100 * 1024L, builder.batchingSettings.getRequestByteThreshold().longValue()); + assertEquals(Duration.ofMillis(10), builder.batchingSettings.getDelayThreshold()); + assertEquals(100L, builder.batchingSettings.getElementCountThreshold().longValue()); assertEquals(StreamWriter.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings); assertEquals(Duration.ofMillis(100), builder.retrySettings.getInitialRetryDelay()); assertEquals(3, builder.retrySettings.getMaxAttempts()); @@ -814,4 +800,17 @@ public void testClose() throws Exception { assertEquals("Cannot shut down a writer already shut-down.", e.getMessage()); } } + + 
@Test + public void testExistingClient() throws Exception { + BigQueryWriteSettings settings = + BigQueryWriteSettings.newBuilder() + .setTransportChannelProvider(channelProvider) + .setCredentialsProvider(NoCredentialsProvider.create()) + .build(); + BigQueryWriteClient client = BigQueryWriteClient.create(settings); + StreamWriter writer = StreamWriter.newBuilder(TEST_STREAM, client).build(); + writer.close(); + assertFalse(client.isShutdown()); + } }