From 98851e96f7c20228cf888e4a847ac98f3da2e4b7 Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Thu, 2 Apr 2020 17:45:37 -0700 Subject: [PATCH] feat: create manual client for Write API (#112) * First writeapi manual client First version, test to be developed. * Manual client with e2e * StreamWriter library * ProtoSchema convert library WriteApi client library addition. This library helps to convert a ProtoDescriptorProto out of ProtoDescriptor into a self contained ProtoDescriptorProto, that can be passed into API and reconstructed on the server side. modified: google-cloud-bigquerystorage/pom.xml new file: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverter.java new file: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverterTest.java new file: google-cloud-bigquerystorage/src/test/proto/test.proto modified: pom.xml * First writeapi manual client First version, test to be developed. * . * incremental development * . modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeScheduledExecutorService.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java * . * . * . * feat: create manual java client for writeapi modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java * feat: add integration test final verification. * . * . modified: google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java modified: google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java * . * . * . * . * avoid duplicate count++ and make it easier to read * Update google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java Co-Authored-By: Stephanie Wang * Update google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java Co-Authored-By: Stephanie Wang * Update google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java Co-Authored-By: Stephanie Wang * . * . * . * . * . 
* dep: add undeclared dep in parent pom * dep: remove duplicate and add undeclared remove dupe gax; add undeclared commons-lang3 Co-authored-by: Stephanie Wang --- google-cloud-bigquerystorage/pom.xml | 4 + .../v1alpha2/ProtoSchemaConverter.java | 3 +- .../storage/v1alpha2/StreamWriter.java | 881 ++++++++++++++++++ .../bigquery/storage/v1alpha2/Waiter.java | 134 +++ .../storage/v1alpha2/FakeBigQueryWrite.java | 78 ++ .../v1alpha2/FakeBigQueryWriteImpl.java | 165 ++++ .../bigquery/storage/v1alpha2/FakeClock.java | 42 + .../FakeScheduledExecutorService.java | 347 +++++++ .../v1alpha2/ProtoSchemaConverterTest.java | 16 +- .../storage/v1alpha2/StreamWriterTest.java | 802 ++++++++++++++++ .../it/ITBigQueryWriteManualClientTest.java | 351 +++++++ .../src/test/proto/test.proto | 8 +- pom.xml | 8 +- 13 files changed, 2827 insertions(+), 12 deletions(-) create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeClock.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeScheduledExecutorService.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java diff --git a/google-cloud-bigquerystorage/pom.xml b/google-cloud-bigquerystorage/pom.xml index 37c7987345..66009152c4 100644 --- a/google-cloud-bigquerystorage/pom.xml +++ b/google-cloud-bigquerystorage/pom.xml @@ -100,6 +100,10 @@ org.threeten threetenbp + + org.apache.commons + commons-lang3 + diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverter.java index 81fa5fd419..7969ad5f23 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverter.java @@ -31,7 +31,8 @@ public class ProtoSchemaConverter { private static class StructName { public String getName() { - return "__S" + (count++); + count++; + return count == 1 ? "__ROOT__" : "__S" + count; } private int count = 0; diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java new file mode 100644 index 0000000000..921fb2be83 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriter.java @@ -0,0 +1,881 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1alpha2; + +import com.google.api.core.ApiFuture; +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.batching.FlowController; +import com.google.api.gax.core.BackgroundResource; +import com.google.api.gax.core.BackgroundResourceAggregation; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.core.ExecutorAsBackgroundResource; +import com.google.api.gax.core.ExecutorProvider; +import com.google.api.gax.core.InstantiatingExecutorProvider; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.*; +import com.google.auth.oauth2.GoogleCredentials; +import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest; +import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsResponse; +import com.google.common.base.Preconditions; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.threeten.bp.Duration; + +/** + * A BigQuery Stream Writer that can be used to write data into BigQuery Table. + * + *

A {@link StreamWriter} provides built-in capabilities to: handle batching of messages; + * control memory utilization (through flow control); and automatically re-establish connections and + * clean up requests (only the first request in the stream keeps the write schema). + * + *
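To make these capabilities concrete, here is a minimal end-to-end sketch of driving the writer, using only the API surface this patch introduces (newBuilder, append, close). The stream name and the empty request are hypothetical placeholders; a real request must carry serialized proto rows and, on the first request of a connection, the writer schema.

```java
package com.google.cloud.bigquery.storage.v1alpha2;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.AppendRowsResponse;

public class StreamWriterSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical stream, created beforehand with BigQueryWriteClient.createWriteStream().
    String stream = "projects/p/datasets/d/tables/t/streams/s";

    // Placeholder so the sketch compiles; a real request needs serialized proto rows
    // plus, on the first request in the stream, a writer schema.
    AppendRowsRequest request = AppendRowsRequest.newBuilder().build();

    // StreamWriter implements AutoCloseable; close() flushes outstanding batches and
    // shuts down background resources.
    try (StreamWriter writer = StreamWriter.newBuilder(stream).build()) {
      // append() may buffer the message; the returned future completes once the batch
      // containing it has been sent and acknowledged.
      ApiFuture<AppendRowsResponse> future = writer.append(request);
      System.out.println("appended at offset: " + future.get().getOffset());
    }
  }
}
```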

With customizable options that control: + * + *

    + *
  • Message batching: such as number of messages or max batch byte size, and batching deadline + *
  • Inflight message control: such as number of messages or max batch byte size (see the + * configuration sketch after this list) + *
+ * + *
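The options above are wired through {@code Builder#setBatchingSettings}. A hedged configuration sketch follows, with purely illustrative threshold values (the builder validates them, clamps byte and element limits to the API maximums of 10 MB per request and 5000 in-flight requests, and rejects LimitExceededBehavior.Ignore):

```java
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.cloud.bigquery.storage.v1alpha2.StreamWriter;
import java.io.IOException;
import org.threeten.bp.Duration;

class WriterConfigSketch {
  // Thresholds below are examples, not recommendations.
  static StreamWriter newCustomWriter(String stream) throws IOException {
    return StreamWriter.newBuilder(stream)
        .setBatchingSettings(
            BatchingSettings.newBuilder()
                .setElementCountThreshold(500L) // flush after 500 buffered messages...
                .setRequestByteThreshold(512 * 1024L) // ...or 512 KiB of data...
                .setDelayThreshold(Duration.ofMillis(10)) // ...or 10 ms, whichever comes first
                .setFlowControlSettings(
                    FlowControlSettings.newBuilder()
                        .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
                        .setMaxOutstandingElementCount(2000L) // block past 2000 in-flight messages
                        .setMaxOutstandingRequestBytes(50 * 1024 * 1024L) // or 50 MiB in flight
                        .build())
                .build())
        .build();
  }
}
```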

{@link StreamWriter} will use the credentials set on the channel, which uses application + * default credentials through {@link GoogleCredentials#getApplicationDefault} by default. + */ +public class StreamWriter implements AutoCloseable { + private static final Logger LOG = Logger.getLogger(StreamWriter.class.getName()); + + private final String streamName; + + private final BatchingSettings batchingSettings; + private final RetrySettings retrySettings; + private final BigQueryWriteSettings stubSettings; + + private final Lock messagesBatchLock; + private final MessagesBatch messagesBatch; + + private BackgroundResource backgroundResources; + private List backgroundResourceList; + + private BigQueryWriteClient stub; + BidiStreamingCallable bidiStreamingCallable; + ClientStream clientStream; + private final AppendResponseObserver responseObserver; + + private final ScheduledExecutorService executor; + + private final AtomicBoolean shutdown; + private final Waiter messagesWaiter; + private final AtomicBoolean activeAlarm; + private ScheduledFuture currentAlarmFuture; + + private Integer currentRetries = 0; + + /** The maximum size of one request. Defined by the API. */ + public static long getApiMaxRequestBytes() { + return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) + } + + /** The maximum size of in flight requests. Defined by the API. */ + public static long getApiMaxInflightRequests() { + return 5000L; + } + + private StreamWriter(Builder builder) throws IOException { + streamName = builder.streamName; + + this.batchingSettings = builder.batchingSettings; + this.retrySettings = builder.retrySettings; + this.messagesBatch = new MessagesBatch(batchingSettings); + messagesBatchLock = new ReentrantLock(); + activeAlarm = new AtomicBoolean(false); + executor = builder.executorProvider.getExecutor(); + backgroundResourceList = new ArrayList<>(); + if (builder.executorProvider.shouldAutoClose()) { + backgroundResourceList.add(new ExecutorAsBackgroundResource(executor)); + } + messagesWaiter = new Waiter(this.batchingSettings.getFlowControlSettings()); + responseObserver = new AppendResponseObserver(this); + + stubSettings = + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(builder.credentialsProvider) + .setExecutorProvider(builder.executorProvider) + .setTransportChannelProvider(builder.channelProvider) + .setEndpoint(builder.endpoint) + .build(); + shutdown = new AtomicBoolean(false); + refreshAppend(); + } + + /** Stream name we are writing to. */ + public String getStreamNameString() { + return streamName; + } + + /** + * Schedules the writing of a message. The write of the message may occur immediately or be + * delayed based on the writer batching options. + * + *

Example of writing a message. + * + *

{@code
+   * AppendRowsRequest message;
+   * ApiFuture<AppendRowsResponse> messageIdFuture = writer.append(message);
+   * ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<AppendRowsResponse>() {
+   *   public void onSuccess(AppendRowsResponse response) {
+   *     if (response.hasOffset()) {
+   *       System.out.println("written with offset: " + response.getOffset());
+   *     } else {
+   *       System.out.println("received an in stream error: " + response.getError().toString());
+   *     }
+   *   }
+   *
+   *   public void onFailure(Throwable t) {
+   *     System.out.println("failed to write: " + t);
+   *   }
+   * }, MoreExecutors.directExecutor());
+   * }
+ * + * @param message the message in serialized format to write to BigQuery. + * @return the message ID wrapped in a future. + */ + public ApiFuture append(AppendRowsRequest message) { + Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer."); + + final AppendRequestAndFutureResponse outstandingAppend = + new AppendRequestAndFutureResponse(message); + List batchesToSend; + messagesBatchLock.lock(); + try { + batchesToSend = messagesBatch.add(outstandingAppend); + // Setup the next duration based delivery alarm if there are messages batched. + setupAlarm(); + if (!batchesToSend.isEmpty()) { + for (final InflightBatch batch : batchesToSend) { + LOG.fine("Scheduling a batch for immediate sending."); + writeBatch(batch); + } + } + } finally { + messagesBatchLock.unlock(); + } + + return outstandingAppend.appendResult; + } + + /** + * Re-establishes a stream connection. + * + * @throws IOException + */ + private void refreshAppend() throws IOException { + synchronized (this) { + Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer."); + if (stub != null) { + stub.shutdown(); + } + backgroundResourceList.remove(stub); + stub = BigQueryWriteClient.create(stubSettings); + backgroundResourceList.add(stub); + backgroundResources = new BackgroundResourceAggregation(backgroundResourceList); + messagesBatch.resetAttachSchema(); + bidiStreamingCallable = stub.appendRowsCallable(); + clientStream = bidiStreamingCallable.splitCall(responseObserver); + } + try { + while (!clientStream.isSendReady()) { + Thread.sleep(10); + } + } catch (InterruptedException expected) { + } + } + + private void setupAlarm() { + if (!messagesBatch.isEmpty()) { + if (!activeAlarm.getAndSet(true)) { + long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis(); + LOG.log(Level.INFO, "Setting up alarm for the next {0} ms.", delayThresholdMs); + currentAlarmFuture = + executor.schedule( + new Runnable() { + @Override + public void run() { + LOG.fine("Sending messages based on schedule"); + activeAlarm.getAndSet(false); + messagesBatchLock.lock(); + try { + writeBatch(messagesBatch.popBatch()); + } finally { + messagesBatchLock.unlock(); + } + } + }, + delayThresholdMs, + TimeUnit.MILLISECONDS); + } + } else if (currentAlarmFuture != null) { + LOG.log(Level.FINER, "Cancelling alarm, no more messages"); + if (activeAlarm.getAndSet(false)) { + currentAlarmFuture.cancel(false); + } + } + } + + /** + * Write any outstanding batches if non-empty. This method sends buffered messages, but does not + * wait for the send operations to complete. To wait for messages to send, call {@code get} on the + * futures returned from {@code append}. + */ + public void writeAllOutstanding() { + InflightBatch unorderedOutstandingBatch = null; + messagesBatchLock.lock(); + try { + if (!messagesBatch.isEmpty()) { + writeBatch(messagesBatch.popBatch()); + } + messagesBatch.reset(); + } finally { + messagesBatchLock.unlock(); + } + } + + private void writeBatch(final InflightBatch inflightBatch) { + if (inflightBatch != null) { + AppendRowsRequest request = inflightBatch.getMergedRequest(); + messagesWaiter.waitOnElementCount(); + messagesWaiter.waitOnSizeLimit(inflightBatch.getByteSize()); + responseObserver.addInflightBatch(inflightBatch); + clientStream.send(request); + + synchronized (messagesWaiter) { + messagesWaiter.incrementPendingCount(1); + messagesWaiter.incrementPendingSize(inflightBatch.getByteSize()); + } + } + } + + /** Close the stream writer. Shut down all resources. 
*/ + @Override + public void close() { + shutdown(); + // There is a problem waiting for these resources to shut down; leaving the wait in causes a + // roughly one-minute hang, so the statement below is commented out. + // awaitTermination(1, TimeUnit.MINUTES); + } + + // The batch of messages that is being sent/processed. + private static final class InflightBatch { + // List of requests that are going to be batched. + final List<AppendRequestAndFutureResponse> inflightRequests; + // A list that tracks the expected offset for each AppendRequest. Used to reconstruct the + // Response future. + final ArrayList<Long> offsetList; + final long creationTime; + int attempt; + int batchSizeBytes; + long expectedOffset; + Boolean attachSchema; + + InflightBatch( + List<AppendRequestAndFutureResponse> inflightRequests, + int batchSizeBytes, + Boolean attachSchema) { + this.inflightRequests = inflightRequests; + this.offsetList = new ArrayList<Long>(inflightRequests.size()); + for (AppendRequestAndFutureResponse request : inflightRequests) { + if (request.message.getOffset().getValue() > 0) { + offsetList.add(new Long(request.message.getOffset().getValue())); + } else { + offsetList.add(new Long(-1)); + } + } + this.expectedOffset = offsetList.get(0).longValue(); + attempt = 1; + creationTime = System.currentTimeMillis(); + this.batchSizeBytes = batchSizeBytes; + this.attachSchema = attachSchema; + } + + int count() { + return inflightRequests.size(); + } + + int getByteSize() { + return this.batchSizeBytes; + } + + long getExpectedOffset() { + return expectedOffset; + } + + private AppendRowsRequest getMergedRequest() throws IllegalStateException { + if (inflightRequests.size() == 0) { + throw new IllegalStateException("Unexpected empty message batch"); + } + ProtoBufProto.ProtoRows.Builder rowsBuilder = + inflightRequests.get(0).message.getProtoRows().getRows().toBuilder(); + for (int i = 1; i < inflightRequests.size(); i++) { + rowsBuilder.addAllSerializedRows( + inflightRequests.get(i).message.getProtoRows().getRows().getSerializedRowsList()); + } + AppendRowsRequest.ProtoData.Builder data = + inflightRequests.get(0).message.getProtoRows().toBuilder().setRows(rowsBuilder.build()); + if (!attachSchema) { + data.clearWriterSchema(); + } else { + if (!data.hasWriterSchema()) { + throw new IllegalStateException( + "The first message on the connection must have writer schema set"); + } + } + return inflightRequests.get(0).message.toBuilder().setProtoRows(data.build()).build(); + } + + private void onFailure(Throwable t) { + for (AppendRequestAndFutureResponse request : inflightRequests) { + request.appendResult.setException(t); + } + } + + // Disassembles the batched response and sets the future on each individual request. + private void onSuccess(AppendRowsResponse response) { + for (int i = 0; i < inflightRequests.size(); i++) { + AppendRowsResponse.Builder singleResponse = response.toBuilder(); + if (offsetList.get(i) > 0) { + singleResponse.setOffset(offsetList.get(i)); + } else { + long actualOffset = response.getOffset(); + for (int j = 0; j < i; j++) { + actualOffset += + inflightRequests.get(j).message.getProtoRows().getRows().getSerializedRowsCount(); + } + singleResponse.setOffset(actualOffset); + } + inflightRequests.get(i).appendResult.set(singleResponse.build()); + } + } + } + + // Class that wraps AppendRowsRequest and its corresponding Response future.
+ private static final class AppendRequestAndFutureResponse { + final SettableApiFuture<AppendRowsResponse> appendResult; + final AppendRowsRequest message; + final int messageSize; + + AppendRequestAndFutureResponse(AppendRowsRequest message) { + this.appendResult = SettableApiFuture.create(); + this.message = message; + this.messageSize = message.getProtoRows().getSerializedSize(); + if (this.messageSize > getApiMaxRequestBytes()) { + throw new StatusRuntimeException( + Status.fromCode(Status.Code.FAILED_PRECONDITION) + .withDescription("Message exceeded max size limit: " + getApiMaxRequestBytes())); + } + } + } + + /** The batching settings configured on this {@code StreamWriter}. */ + public BatchingSettings getBatchingSettings() { + return batchingSettings; + } + + /** The retry settings configured on this {@code StreamWriter}. */ + public RetrySettings getRetrySettings() { + return retrySettings; + } + + /** + * Schedules an immediate flush of any outstanding messages and waits until all are processed. + * + *

Sends remaining outstanding messages and prevents further appends. This method + * should be invoked prior to deleting the {@link WriteStream} object in order to ensure that no + * pending messages are lost. + */ + public void shutdown() { + Preconditions.checkState( + !shutdown.getAndSet(true), "Cannot shut down a writer already shut-down."); + if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) { + currentAlarmFuture.cancel(false); + } + writeAllOutstanding(); + messagesWaiter.waitComplete(); + backgroundResources.shutdown(); + } + + /** + * Waits until all work has completed execution after a {@link #shutdown()} request, or until the + * timeout occurs, or the current thread is interrupted. + * + *

Call this method to make sure all resources are freed properly. + */ + public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { + return backgroundResources.awaitTermination(duration, unit); + } + + /** + * Constructs a new {@link Builder} using the given stream name. + * + *

Example of creating a {@code StreamWriter}. + *

{@code
+   * String table = "projects/my_project/datasets/my_dataset/tables/my_table";
+   * String stream;
+   * try (BigQueryWriteClient bigQueryWriteClient = BigQueryWriteClient.create()) {
+   *     CreateWriteStreamRequest request = CreateWriteStreamRequest.newBuilder().setParent(table).build();
+   *     WriteStream response = bigQueryWriteClient.createWriteStream(request);
+   *     stream = response.getName();
+   * }
+   * StreamWriter writer = StreamWriter.newBuilder(stream).build();
+   * try {
+   *   // ...
+   * } finally {
+   *   // When finished with the writer, make sure to shut it down to free up resources.
+   *   writer.shutdown();
+   *   writer.awaitTermination(1, TimeUnit.MINUTES);
+   * }
+   * }
+ */ + public static Builder newBuilder(String streamName) { + return new Builder(streamName); + } + + /** A builder of {@link StreamWriter}s. */ + public static final class Builder { + static final Duration MIN_TOTAL_TIMEOUT = Duration.ofSeconds(10); + static final Duration MIN_RPC_TIMEOUT = Duration.ofMillis(10); + + // Meaningful defaults. + static final long DEFAULT_ELEMENT_COUNT_THRESHOLD = 100L; + static final long DEFAULT_REQUEST_BYTES_THRESHOLD = 100 * 1024L; // 100 KiB + static final Duration DEFAULT_DELAY_THRESHOLD = Duration.ofMillis(1); + static final FlowControlSettings DEFAULT_FLOW_CONTROL_SETTINGS = + FlowControlSettings.newBuilder() + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) + .setMaxOutstandingElementCount(1000L) + .setMaxOutstandingRequestBytes(100 * 1024 * 1024L) // 100 MiB + .build(); + public static final BatchingSettings DEFAULT_BATCHING_SETTINGS = + BatchingSettings.newBuilder() + .setDelayThreshold(DEFAULT_DELAY_THRESHOLD) + .setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD) + .setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD) + .setFlowControlSettings(DEFAULT_FLOW_CONTROL_SETTINGS) + .build(); + public static final RetrySettings DEFAULT_RETRY_SETTINGS = + RetrySettings.newBuilder() + .setMaxRetryDelay(Duration.ofSeconds(60)) + .setInitialRetryDelay(Duration.ofMillis(100)) + .setMaxAttempts(3) + .build(); + static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false; + private static final int THREADS_PER_CPU = 5; + static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = + InstantiatingExecutorProvider.newBuilder() + .setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors()) + .build(); + + private String streamName; + private String endpoint = BigQueryWriteSettings.getDefaultEndpoint(); + + // Batching options + BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS; + + RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS; + + private boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING; + + private TransportChannelProvider channelProvider = + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build(); + + private HeaderProvider headerProvider = new NoHeaderProvider(); + private HeaderProvider internalHeaderProvider = + BigQueryWriteSettings.defaultApiClientHeaderProviderBuilder().build(); + ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; + private CredentialsProvider credentialsProvider = + BigQueryWriteSettings.defaultCredentialsProviderBuilder().build(); + + private Builder(String stream) { + this.streamName = Preconditions.checkNotNull(stream); + } + + /** + * {@code ChannelProvider} to use to create Channels, which must point at the Cloud BigQuery + * Storage API endpoint. + * + *

For performance, this client benefits from having multiple underlying connections. See + * {@link com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder#setPoolSize(int)}. + */ + public Builder setChannelProvider(TransportChannelProvider channelProvider) { + this.channelProvider = Preconditions.checkNotNull(channelProvider); + return this; + } + + /** {@code CredentialsProvider} to use to create Credentials to authenticate calls. */ + public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { + this.credentialsProvider = Preconditions.checkNotNull(credentialsProvider); + return this; + } + + /** + * Sets the {@code BatchSettings} on the writer. + * + * @param batchingSettings + * @return + */ + public Builder setBatchingSettings(BatchingSettings batchingSettings) { + Preconditions.checkNotNull(batchingSettings); + + BatchingSettings.Builder builder = batchingSettings.toBuilder(); + Preconditions.checkNotNull(batchingSettings.getElementCountThreshold()); + Preconditions.checkArgument(batchingSettings.getElementCountThreshold() > 0); + Preconditions.checkNotNull(batchingSettings.getRequestByteThreshold()); + Preconditions.checkArgument(batchingSettings.getRequestByteThreshold() > 0); + if (batchingSettings.getRequestByteThreshold() > getApiMaxRequestBytes()) { + builder.setRequestByteThreshold(getApiMaxRequestBytes()); + } + Preconditions.checkNotNull(batchingSettings.getDelayThreshold()); + Preconditions.checkArgument(batchingSettings.getDelayThreshold().toMillis() > 0); + if (batchingSettings.getFlowControlSettings() == null) { + builder.setFlowControlSettings(DEFAULT_FLOW_CONTROL_SETTINGS); + } else { + + if (batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount() == null) { + builder.setFlowControlSettings( + batchingSettings + .getFlowControlSettings() + .toBuilder() + .setMaxOutstandingElementCount( + DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingElementCount()) + .build()); + } else { + Preconditions.checkArgument( + batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount() > 0); + if (batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount() + > getApiMaxInflightRequests()) { + builder.setFlowControlSettings( + batchingSettings + .getFlowControlSettings() + .toBuilder() + .setMaxOutstandingElementCount(getApiMaxInflightRequests()) + .build()); + } + } + if (batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes() == null) { + builder.setFlowControlSettings( + batchingSettings + .getFlowControlSettings() + .toBuilder() + .setMaxOutstandingRequestBytes( + DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingRequestBytes()) + .build()); + } else { + Preconditions.checkArgument( + batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes() > 0); + } + if (batchingSettings.getFlowControlSettings().getLimitExceededBehavior() == null) { + builder.setFlowControlSettings( + batchingSettings + .getFlowControlSettings() + .toBuilder() + .setLimitExceededBehavior( + DEFAULT_FLOW_CONTROL_SETTINGS.getLimitExceededBehavior()) + .build()); + } else { + Preconditions.checkArgument( + batchingSettings.getFlowControlSettings().getLimitExceededBehavior() + != FlowController.LimitExceededBehavior.Ignore); + } + } + this.batchingSettings = builder.build(); + return this; + } + + /** + * Sets the {@code RetrySettings} on the writer. 
+ * + * @param retrySettings + * @return + */ + public Builder setRetrySettings(RetrySettings retrySettings) { + Preconditions.checkNotNull(retrySettings); + Preconditions.checkArgument( + retrySettings.getTotalTimeout().compareTo(MIN_TOTAL_TIMEOUT) >= 0); + Preconditions.checkArgument( + retrySettings.getInitialRpcTimeout().compareTo(MIN_RPC_TIMEOUT) >= 0); + this.retrySettings = retrySettings; + return this; + } + + /** Gives the ability to set a custom executor to be used by the library. */ + public Builder setExecutorProvider(ExecutorProvider executorProvider) { + this.executorProvider = Preconditions.checkNotNull(executorProvider); + return this; + } + + /** Gives the ability to override the gRPC endpoint. */ + public Builder setEndpoint(String endpoint) { + this.endpoint = endpoint; + return this; + } + + /** Builds the {@code StreamWriter}. */ + public StreamWriter build() throws IOException { + return new StreamWriter(this); + } + } + + private static final class AppendResponseObserver + implements ResponseObserver { + private Queue inflightBatches = new LinkedList(); + private StreamWriter streamWriter; + + public void addInflightBatch(InflightBatch batch) { + synchronized (this.inflightBatches) { + this.inflightBatches.add(batch); + } + } + + public AppendResponseObserver(StreamWriter streamWriter) { + this.streamWriter = streamWriter; + } + + private boolean isRecoverableError(Throwable t) { + Status status = Status.fromThrowable(t); + return status.getCode() == Status.Code.UNAVAILABLE; + } + + @Override + public void onStart(StreamController controller) { + // no-op + } + + private void abortInflightRequests(Throwable t) { + synchronized (this.inflightBatches) { + while (!this.inflightBatches.isEmpty()) { + this.inflightBatches + .poll() + .onFailure( + new AbortedException( + "Request aborted due to previous failures", + t, + GrpcStatusCode.of(Status.Code.ABORTED), + true)); + } + } + } + + @Override + public void onResponse(AppendRowsResponse response) { + InflightBatch inflightBatch = null; + synchronized (this.inflightBatches) { + inflightBatch = this.inflightBatches.poll(); + } + try { + streamWriter.currentRetries = 0; + if (response == null) { + inflightBatch.onFailure(new IllegalStateException("Response is null")); + } + // TODO: Deal with in stream errors. 
+ if (response.hasError()) { + StatusRuntimeException exception = + new StatusRuntimeException( + Status.fromCodeValue(response.getError().getCode()) + .withDescription(response.getError().getMessage())); + inflightBatch.onFailure(exception); + } + if (inflightBatch.getExpectedOffset() > 0 + && response.getOffset() != inflightBatch.getExpectedOffset()) { + IllegalStateException exception = + new IllegalStateException( + String.format( + "The append result offset %s does not match " + "the expected offset %s.", + response.getOffset(), inflightBatch.getExpectedOffset())); + inflightBatch.onFailure(exception); + abortInflightRequests(exception); + } else { + inflightBatch.onSuccess(response); + } + } finally { + synchronized (streamWriter.messagesWaiter) { + streamWriter.messagesWaiter.incrementPendingCount(-1); + streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize()); + streamWriter.messagesWaiter.notifyAll(); + } + } + } + + @Override + public void onComplete() { + LOG.info("OnComplete called"); + } + + @Override + public void onError(Throwable t) { + LOG.info("OnError called"); + if (streamWriter.shutdown.get()) { + return; + } + InflightBatch inflightBatch = null; + synchronized (this.inflightBatches) { + if (inflightBatches.isEmpty()) { + // The batches could have been aborted. + return; + } + inflightBatch = this.inflightBatches.poll(); + } + if (isRecoverableError(t)) { + try { + if (streamWriter.currentRetries < streamWriter.getRetrySettings().getMaxAttempts() + && !streamWriter.shutdown.get()) { + streamWriter.refreshAppend(); + // Currently there is a bug that it took reconnected stream 5 seconds to pick up + // stream count. So wait at least 5 seconds before sending a new request. + Thread.sleep( + Math.min( + streamWriter.getRetrySettings().getInitialRetryDelay().toMillis(), + Duration.ofSeconds(5).toMillis())); + streamWriter.writeBatch(inflightBatch); + synchronized (streamWriter.currentRetries) { + streamWriter.currentRetries++; + } + } else { + synchronized (streamWriter.currentRetries) { + streamWriter.currentRetries = 0; + } + inflightBatch.onFailure(t); + } + } catch (IOException | InterruptedException e) { + streamWriter.currentRetries = 0; + inflightBatch.onFailure(e); + synchronized (streamWriter.messagesWaiter) { + streamWriter.messagesWaiter.incrementPendingCount(-1); + streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize()); + streamWriter.messagesWaiter.notifyAll(); + } + } + } else { + LOG.info("Set error response"); + try { + synchronized (streamWriter.currentRetries) { + streamWriter.currentRetries = 0; + } + inflightBatch.onFailure(t); + try { + // Establish a new connection. + streamWriter.refreshAppend(); + } catch (IOException e) { + LOG.info("Failed to establish a new connection, shutdown writer"); + streamWriter.shutdown(); + } + } finally { + synchronized (streamWriter.messagesWaiter) { + streamWriter.messagesWaiter.incrementPendingCount(-1); + streamWriter.messagesWaiter.incrementPendingSize(0 - inflightBatch.getByteSize()); + streamWriter.messagesWaiter.notifyAll(); + } + } + } + } + }; + + // This class controls how many messages are going to be sent out in a batch. 
+ private static class MessagesBatch { + private List messages; + private int batchedBytes; + private final BatchingSettings batchingSettings; + private Boolean attachSchema = true; + + private MessagesBatch(BatchingSettings batchingSettings) { + this.batchingSettings = batchingSettings; + reset(); + } + + // Get all the messages out in a batch. + private InflightBatch popBatch() { + InflightBatch batch = new InflightBatch(messages, batchedBytes, this.attachSchema); + this.attachSchema = false; + reset(); + return batch; + } + + private void reset() { + messages = new LinkedList<>(); + batchedBytes = 0; + } + + private void resetAttachSchema() { + attachSchema = true; + } + + private boolean isEmpty() { + return messages.isEmpty(); + } + + private int getBatchedBytes() { + return batchedBytes; + } + + private int getMessagesCount() { + return messages.size(); + } + + private boolean hasBatchingBytes() { + return getMaxBatchBytes() > 0; + } + + private long getMaxBatchBytes() { + return batchingSettings.getRequestByteThreshold(); + } + + // The message batch returned could contain the previous batch of messages plus the current + // message. + // if the message is too large. + private List add(AppendRequestAndFutureResponse outstandingAppend) { + List batchesToSend = new ArrayList<>(); + // Check if the next message makes the current batch exceed the max batch byte size. + if (!isEmpty() + && hasBatchingBytes() + && getBatchedBytes() + outstandingAppend.messageSize >= getMaxBatchBytes()) { + batchesToSend.add(popBatch()); + } + + messages.add(outstandingAppend); + batchedBytes += outstandingAppend.messageSize; + + // Border case: If the message to send is greater or equals to the max batch size then send it + // immediately. + // Alternatively if after adding the message we have reached the batch max messages then we + // have a batch to send. + if ((hasBatchingBytes() && outstandingAppend.messageSize >= getMaxBatchBytes()) + || getMessagesCount() == batchingSettings.getElementCountThreshold()) { + batchesToSend.add(popBatch()); + } + + return batchesToSend; + } + } +} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java new file mode 100644 index 0000000000..0e15d6c726 --- /dev/null +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/Waiter.java @@ -0,0 +1,134 @@ +/* + * Copyright 2016 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1alpha2; + +import com.google.api.core.InternalApi; +import com.google.api.gax.batching.FlowControlSettings; +import com.google.api.gax.grpc.GrpcStatusCode; +import com.google.api.gax.rpc.UnimplementedException; +import io.grpc.Status; +import java.util.logging.Logger; + +/** + * A barrier kind of object that helps keep track of pending actions and synchronously wait until + * all have completed. 
+ */ +class Waiter { + private static final Logger LOG = Logger.getLogger(Waiter.class.getName()); + + private int pendingCount; + private int pendingSize; + FlowControlSettings flowControlSettings; + + Waiter(FlowControlSettings flowControlSettings) { + pendingCount = 0; + pendingSize = 0; + this.flowControlSettings = flowControlSettings; + } + + public synchronized void incrementPendingCount(int delta) { + this.pendingCount += delta; + if (pendingCount == 0) { + notifyAll(); + } + } + + public synchronized void incrementPendingSize(int delta) { + this.pendingSize += delta; + } + + private void wait(String message) { + boolean interrupted = false; + try { + LOG.fine("Wait on: " + message); + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private void handleOverLimit(String message) { + boolean interrupted = false; + switch (this.flowControlSettings.getLimitExceededBehavior()) { + case Block: + wait(message); + break; + case ThrowException: + throw new IllegalStateException("FlowControl limit exceeded: " + message); + case Ignore: + return; + default: + throw new UnimplementedException( + "Unknown behavior setting: " + + this.flowControlSettings.getLimitExceededBehavior().toString(), + null, + GrpcStatusCode.of(Status.Code.UNIMPLEMENTED), + false); + } + } + + public synchronized void waitOnElementCount() { + LOG.finer( + "Waiting on element count " + + this.pendingCount + + " " + + this.flowControlSettings.getMaxOutstandingElementCount()); + while (this.pendingCount >= this.flowControlSettings.getMaxOutstandingElementCount()) { + handleOverLimit("Element count"); + } + } + + public synchronized void waitOnSizeLimit(int incomingSize) { + LOG.finer( + "Waiting on size limit " + + (this.pendingSize + incomingSize) + + " " + + this.flowControlSettings.getMaxOutstandingRequestBytes()); + while (this.pendingSize + incomingSize + >= this.flowControlSettings.getMaxOutstandingRequestBytes()) { + handleOverLimit("Byte size"); + } + } + + public synchronized void waitComplete() { + boolean interrupted = false; + try { + while (pendingCount > 0) { + try { + wait(); + } catch (InterruptedException e) { + // Ignored, uninterruptibly. + interrupted = true; + } + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + } + + @InternalApi + public int pendingCount() { + return pendingCount; + } + + @InternalApi + public int pendingSize() { + return pendingSize; + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java new file mode 100644 index 0000000000..5298e80ae4 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWrite.java @@ -0,0 +1,78 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.bigquery.storage.v1alpha2; + +import com.google.api.gax.grpc.testing.MockGrpcService; +import com.google.cloud.bigquery.storage.v1alpha2.Storage.*; +import com.google.protobuf.AbstractMessage; +import io.grpc.ServerServiceDefinition; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import org.threeten.bp.Duration; + +/** + * A fake implementation of {@link MockGrpcService} that can be used to test clients of a + * StreamWriter. It forwards calls to the real implementation, {@link FakeBigQueryWriteImpl}. + */ +public class FakeBigQueryWrite implements MockGrpcService { + private final FakeBigQueryWriteImpl serviceImpl; + + public FakeBigQueryWrite() { + serviceImpl = new FakeBigQueryWriteImpl(); + } + + @Override + public List<AbstractMessage> getRequests() { + return new LinkedList<AbstractMessage>(serviceImpl.getCapturedRequests()); + } + + public List<AppendRowsRequest> getAppendRequests() { + return serviceImpl.getCapturedRequests(); + } + + @Override + public void addResponse(AbstractMessage response) { + if (response instanceof AppendRowsResponse) { + serviceImpl.addResponse((AppendRowsResponse) response); + } else { + throw new IllegalStateException("Unsupported service"); + } + } + + @Override + public void addException(Exception exception) { + serviceImpl.addConnectionError(exception); + } + + @Override + public ServerServiceDefinition getServiceDefinition() { + return serviceImpl.bindService(); + } + + @Override + public void reset() { + serviceImpl.reset(); + } + + public void setResponseDelay(Duration delay) { + serviceImpl.setResponseDelay(delay); + } + + public void setExecutor(ScheduledExecutorService executor) { + serviceImpl.setExecutor(executor); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java new file mode 100644 index 0000000000..aa3f7e734d --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeBigQueryWriteImpl.java @@ -0,0 +1,165 @@ +/* + * Copyright 2016 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1alpha2; + +import com.google.cloud.bigquery.storage.v1alpha2.Storage.*; +import com.google.common.base.Optional; +import io.grpc.stub.StreamObserver; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; +import org.threeten.bp.Duration; + +/** + * A fake implementation of {@link BigQueryWriteImplBase} that acts like a server in StreamWriter + * unit tests.
+ */ +class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase { + private static final Logger LOG = Logger.getLogger(FakeBigQueryWriteImpl.class.getName()); + + private final LinkedBlockingQueue requests = new LinkedBlockingQueue<>(); + private final LinkedBlockingQueue responses = new LinkedBlockingQueue<>(); + private final AtomicInteger nextMessageId = new AtomicInteger(1); + private boolean autoPublishResponse; + private ScheduledExecutorService executor = null; + private Duration responseDelay = Duration.ZERO; + + /** Class used to save the state of a possible response. */ + private static class Response { + Optional appendResponse; + Optional error; + + public Response(AppendRowsResponse appendResponse) { + this.appendResponse = Optional.of(appendResponse); + this.error = Optional.absent(); + } + + public Response(Throwable exception) { + this.appendResponse = Optional.absent(); + this.error = Optional.of(exception); + } + + public AppendRowsResponse getResponse() { + return appendResponse.get(); + } + + public Throwable getError() { + return error.get(); + } + + boolean isError() { + return error.isPresent(); + } + + @Override + public String toString() { + if (isError()) { + return error.get().toString(); + } + return appendResponse.get().toString(); + } + } + + @Override + public StreamObserver appendRows( + final StreamObserver responseObserver) { + StreamObserver requestObserver = + new StreamObserver() { + @Override + public void onNext(AppendRowsRequest value) { + LOG.info("Get request:" + value.toString()); + final Response response = responses.remove(); + requests.add(value); + if (responseDelay == Duration.ZERO) { + sendResponse(response, responseObserver); + } else { + final Response responseToSend = response; + LOG.info("Schedule a response to be sent at delay"); + executor.schedule( + new Runnable() { + @Override + public void run() { + sendResponse(responseToSend, responseObserver); + } + }, + responseDelay.toMillis(), + TimeUnit.MILLISECONDS); + } + } + + @Override + public void onError(Throwable t) { + responseObserver.onError(t); + } + + @Override + public void onCompleted() { + responseObserver.onCompleted(); + } + }; + return requestObserver; + } + + private void sendResponse( + Response response, StreamObserver responseObserver) { + LOG.info("Sending response: " + response.toString()); + if (response.isError()) { + responseObserver.onError(response.getError()); + } else { + responseObserver.onNext(response.getResponse()); + } + } + + /** Set an executor to use to delay publish responses. */ + public FakeBigQueryWriteImpl setExecutor(ScheduledExecutorService executor) { + this.executor = executor; + return this; + } + + /** Set an amount of time by which to delay publish responses. 
*/ + public FakeBigQueryWriteImpl setResponseDelay(Duration responseDelay) { + this.responseDelay = responseDelay; + return this; + } + + public FakeBigQueryWriteImpl addResponse(AppendRowsResponse appendRowsResponse) { + responses.add(new Response(appendRowsResponse)); + return this; + } + + public FakeBigQueryWriteImpl addResponse(AppendRowsResponse.Builder appendResponseBuilder) { + return addResponse(appendResponseBuilder.build()); + } + + public FakeBigQueryWriteImpl addConnectionError(Throwable error) { + responses.add(new Response(error)); + return this; + } + + public List getCapturedRequests() { + return new ArrayList(requests); + } + + public void reset() { + requests.clear(); + responses.clear(); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeClock.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeClock.java new file mode 100644 index 0000000000..ee8ee3221b --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeClock.java @@ -0,0 +1,42 @@ +/* + * Copyright 2016 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.bigquery.storage.v1alpha2; + +import com.google.api.core.ApiClock; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** A Clock to help with testing time-based logic. */ +public class FakeClock implements ApiClock { + + private final AtomicLong millis = new AtomicLong(); + + // Advances the clock value by {@code time} in {@code timeUnit}. + public void advance(long time, TimeUnit timeUnit) { + millis.addAndGet(timeUnit.toMillis(time)); + } + + @Override + public long nanoTime() { + return millisTime() * 1000_000L; + } + + @Override + public long millisTime() { + return millis.get(); + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeScheduledExecutorService.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeScheduledExecutorService.java new file mode 100644 index 0000000000..8ee37cc0ba --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/FakeScheduledExecutorService.java @@ -0,0 +1,347 @@ +/* + * Copyright 2016 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.google.cloud.bigquery.storage.v1alpha2; + +import com.google.api.core.ApiClock; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.SettableFuture; +import java.util.ArrayList; +import java.util.Deque; +import java.util.LinkedList; +import java.util.List; +import java.util.PriorityQueue; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; +import java.util.concurrent.Delayed; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; +import org.threeten.bp.Duration; +import org.threeten.bp.Instant; + +/** + * Fake implementation of {@link ScheduledExecutorService} that allows tests control the reference + * time of the executor and decide when to execute any outstanding task. + */ +public class FakeScheduledExecutorService extends AbstractExecutorService + implements ScheduledExecutorService { + private static final Logger LOG = Logger.getLogger(FakeScheduledExecutorService.class.getName()); + + private final AtomicBoolean shutdown = new AtomicBoolean(false); + private final PriorityQueue> pendingCallables = new PriorityQueue<>(); + private final FakeClock clock = new FakeClock(); + private final Deque expectedWorkQueue = new LinkedList<>(); + + public ApiClock getClock() { + return clock; + } + + @Override + public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { + return schedulePendingCallable( + new PendingCallable<>( + Duration.ofMillis(unit.toMillis(delay)), command, PendingCallableType.NORMAL)); + } + + @Override + public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { + return schedulePendingCallable( + new PendingCallable<>( + Duration.ofMillis(unit.toMillis(delay)), callable, PendingCallableType.NORMAL)); + } + + @Override + public ScheduledFuture scheduleAtFixedRate( + Runnable command, long initialDelay, long period, TimeUnit unit) { + return schedulePendingCallable( + new PendingCallable<>( + Duration.ofMillis(unit.toMillis(initialDelay)), + command, + PendingCallableType.FIXED_RATE)); + } + + @Override + public ScheduledFuture scheduleWithFixedDelay( + Runnable command, long initialDelay, long delay, TimeUnit unit) { + return schedulePendingCallable( + new PendingCallable<>( + Duration.ofMillis(unit.toMillis(initialDelay)), + command, + PendingCallableType.FIXED_DELAY)); + } + + /** + * This will advance the reference time of the executor and execute (in the same thread) any + * outstanding callable which execution time has passed. 
+ */ + public void advanceTime(Duration toAdvance) { + LOG.info( + "Advance to time to:" + + Instant.ofEpochMilli(clock.millisTime() + toAdvance.toMillis()).toString()); + clock.advance(toAdvance.toMillis(), TimeUnit.MILLISECONDS); + work(); + } + + private void work() { + for (; ; ) { + PendingCallable callable = null; + Instant cmpTime = Instant.ofEpochMilli(clock.millisTime()); + if (!pendingCallables.isEmpty()) { + LOG.info( + "Going to call: Current time: " + + cmpTime.toString() + + " Scheduled time: " + + pendingCallables.peek().getScheduledTime().toString() + + " Creation time:" + + pendingCallables.peek().getCreationTime().toString()); + } + synchronized (pendingCallables) { + if (pendingCallables.isEmpty() + || pendingCallables.peek().getScheduledTime().isAfter(cmpTime)) { + break; + } + callable = pendingCallables.poll(); + } + if (callable != null) { + try { + callable.call(); + } catch (Exception e) { + // We ignore any callable exception, which should be set to the future but not relevant to + // advanceTime. + } + } + } + + synchronized (pendingCallables) { + if (shutdown.get() && pendingCallables.isEmpty()) { + pendingCallables.notifyAll(); + } + } + } + + @Override + public void shutdown() { + if (shutdown.getAndSet(true)) { + throw new IllegalStateException("This executor has been shutdown already"); + } + } + + @Override + public List shutdownNow() { + if (shutdown.getAndSet(true)) { + throw new IllegalStateException("This executor has been shutdown already"); + } + List pending = new ArrayList<>(); + for (final PendingCallable pendingCallable : pendingCallables) { + pending.add( + new Runnable() { + @Override + public void run() { + pendingCallable.call(); + } + }); + } + synchronized (pendingCallables) { + pendingCallables.notifyAll(); + pendingCallables.clear(); + } + return pending; + } + + @Override + public boolean isShutdown() { + return shutdown.get(); + } + + @Override + public boolean isTerminated() { + return pendingCallables.isEmpty(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + synchronized (pendingCallables) { + if (pendingCallables.isEmpty()) { + return true; + } + LOG.info("Wating on pending callables" + pendingCallables.size()); + pendingCallables.wait(unit.toMillis(timeout)); + return pendingCallables.isEmpty(); + } + } + + @Override + public void execute(Runnable command) { + if (shutdown.get()) { + throw new IllegalStateException("This executor has been shutdown"); + } + command.run(); + } + + ScheduledFuture schedulePendingCallable(PendingCallable callable) { + LOG.info( + "Schedule pending callable called " + callable.delay + " " + callable.getScheduledTime()); + if (shutdown.get()) { + throw new IllegalStateException("This executor has been shutdown"); + } + synchronized (pendingCallables) { + pendingCallables.add(callable); + } + work(); + synchronized (expectedWorkQueue) { + // We compare by the callable delay in order decide when to remove expectations from the + // expected work queue, i.e. only the expected work that matches the delay of the scheduled + // callable is removed from the queue. + if (!expectedWorkQueue.isEmpty() && expectedWorkQueue.peek().equals(callable.delay)) { + expectedWorkQueue.poll(); + } + expectedWorkQueue.notifyAll(); + } + + return callable.getScheduledFuture(); + } + + enum PendingCallableType { + NORMAL, + FIXED_RATE, + FIXED_DELAY + } + + /** Class that saves the state of an scheduled pending callable. 
*/ + class PendingCallable implements Comparable> { + Instant creationTime = Instant.ofEpochMilli(clock.millisTime()); + Duration delay; + Callable pendingCallable; + SettableFuture future = SettableFuture.create(); + AtomicBoolean cancelled = new AtomicBoolean(false); + AtomicBoolean done = new AtomicBoolean(false); + PendingCallableType type; + + PendingCallable(Duration delay, final Runnable runnable, PendingCallableType type) { + pendingCallable = + new Callable() { + @Override + public T call() { + runnable.run(); + return null; + } + }; + this.type = type; + this.delay = delay; + } + + PendingCallable(Duration delay, Callable callable, PendingCallableType type) { + pendingCallable = callable; + this.type = type; + this.delay = delay; + } + + private Instant getScheduledTime() { + return creationTime.plus(delay); + } + + private Instant getCreationTime() { + return creationTime; + } + + ScheduledFuture getScheduledFuture() { + return new ScheduledFuture() { + @Override + public long getDelay(TimeUnit unit) { + return unit.convert( + getScheduledTime().toEpochMilli() - clock.millisTime(), TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(Delayed o) { + return Ints.saturatedCast( + getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + synchronized (this) { + cancelled.set(true); + return !done.get(); + } + } + + @Override + public boolean isCancelled() { + return cancelled.get(); + } + + @Override + public boolean isDone() { + return done.get(); + } + + @Override + public T get() throws InterruptedException, ExecutionException { + return future.get(); + } + + @Override + public T get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return future.get(timeout, unit); + } + }; + } + + T call() { + T result = null; + synchronized (this) { + if (cancelled.get()) { + return null; + } + try { + result = pendingCallable.call(); + future.set(result); + } catch (Exception e) { + future.setException(e); + } finally { + switch (type) { + case NORMAL: + done.set(true); + break; + case FIXED_DELAY: + this.creationTime = Instant.ofEpochMilli(clock.millisTime()); + schedulePendingCallable(this); + break; + case FIXED_RATE: + this.creationTime = this.creationTime.plus(delay); + schedulePendingCallable(this); + break; + default: + // Nothing to do + } + } + } + return result; + } + + @Override + public int compareTo(PendingCallable other) { + return getScheduledTime().compareTo(other.getScheduledTime()); + } + } +} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverterTest.java index de1e7f4888..1cf7263628 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/ProtoSchemaConverterTest.java @@ -26,7 +26,7 @@ public void convertSimple() { ProtoBufProto.ProtoSchema protoSchema = ProtoSchemaConverter.convert(testProto.getDescriptorForType()); Assert.assertEquals( - "name: \"__S0\"\n" + "name: \"__ROOT__\"\n" + "field {\n" + " name: \"int32_value\"\n" + " number: 1\n" @@ -102,32 +102,32 @@ public void convertNested() { ProtoBufProto.ProtoSchema protoSchema = 
ProtoSchemaConverter.convert(testProto.getDescriptorForType()); Assert.assertEquals( - "name: \"__S0\"\n" + "name: \"__ROOT__\"\n" + "field {\n" + " name: \"nested_repeated_type\"\n" + " number: 1\n" + " label: LABEL_REPEATED\n" + " type: TYPE_MESSAGE\n" - + " type_name: \"__S1\"\n" + + " type_name: \"__S2\"\n" + "}\n" + "field {\n" + " name: \"inner_type\"\n" + " number: 2\n" + " label: LABEL_OPTIONAL\n" + " type: TYPE_MESSAGE\n" - + " type_name: \"__S3\"\n" + + " type_name: \"__S4\"\n" + "}\n" + "nested_type {\n" - + " name: \"__S1\"\n" + + " name: \"__S2\"\n" + " field {\n" + " name: \"inner_type\"\n" + " number: 1\n" + " label: LABEL_REPEATED\n" + " type: TYPE_MESSAGE\n" - + " type_name: \"__S2\"\n" + + " type_name: \"__S3\"\n" + " }\n" + " nested_type {\n" - + " name: \"__S2\"\n" + + " name: \"__S3\"\n" + " field {\n" + " name: \"value\"\n" + " number: 1\n" @@ -137,7 +137,7 @@ public void convertNested() { + " }\n" + "}\n" + "nested_type {\n" - + " name: \"__S3\"\n" + + " name: \"__S4\"\n" + " field {\n" + " name: \"value\"\n" + " number: 1\n" diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java new file mode 100644 index 0000000000..324599a05a --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/StreamWriterTest.java @@ -0,0 +1,802 @@ +/* + * Copyright 2016 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package com.google.cloud.bigquery.storage.v1alpha2;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.gax.batching.BatchingSettings;
+import com.google.api.gax.batching.FlowControlSettings;
+import com.google.api.gax.batching.FlowController;
+import com.google.api.gax.core.ExecutorProvider;
+import com.google.api.gax.core.FixedExecutorProvider;
+import com.google.api.gax.core.InstantiatingExecutorProvider;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.testing.LocalChannelProvider;
+import com.google.api.gax.grpc.testing.MockGrpcService;
+import com.google.api.gax.grpc.testing.MockServiceHelper;
+import com.google.api.gax.rpc.DataLossException;
+import com.google.cloud.bigquery.storage.test.Test.FooType;
+import com.google.cloud.bigquery.storage.v1alpha2.Storage.*;
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Int64Value;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.util.Arrays;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.threeten.bp.Duration;
+
+@RunWith(JUnit4.class)
+public class StreamWriterTest {
+  private static final Logger LOG = Logger.getLogger(StreamWriterTest.class.getName());
+  private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
+  private static final ExecutorProvider SINGLE_THREAD_EXECUTOR =
+      InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
+  private static LocalChannelProvider channelProvider;
+  private FakeScheduledExecutorService fakeExecutor;
+  private FakeBigQueryWrite testBigQueryWrite;
+  private static MockServiceHelper serviceHelper;
+
+  @Before
+  public void setUp() throws Exception {
+    testBigQueryWrite = new FakeBigQueryWrite();
+    serviceHelper =
+        new MockServiceHelper(
+            UUID.randomUUID().toString(), Arrays.<MockGrpcService>asList(testBigQueryWrite));
+    serviceHelper.start();
+    channelProvider = serviceHelper.createChannelProvider();
+    fakeExecutor = new FakeScheduledExecutorService();
+    testBigQueryWrite.setExecutor(fakeExecutor);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LOG.info("tearDown called");
+    serviceHelper.stop();
+  }
+
+  private StreamWriter.Builder getTestStreamWriterBuilder() {
+    return StreamWriter.newBuilder(TEST_STREAM)
+        .setChannelProvider(channelProvider)
+        .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
+        .setCredentialsProvider(NoCredentialsProvider.create());
+  }
+
+  private AppendRowsRequest createAppendRequest(String[] messages, long offset) {
+    AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
+    AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
+    dataBuilder.setWriterSchema(
+        ProtoBufProto.ProtoSchema.newBuilder()
+            .setProtoDescriptor(
+                DescriptorProtos.DescriptorProto.newBuilder()
+                    .setName("Message")
+                    .addField(
+                        DescriptorProtos.FieldDescriptorProto.newBuilder()
+                            .setName("foo")
+                            .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
+                            .setNumber(1)
+                            .build())
+                    .build()));
+    ProtoBufProto.ProtoRows.Builder rows = ProtoBufProto.ProtoRows.newBuilder();
+    for (String message : messages) {
+      FooType foo = FooType.newBuilder().setFoo(message).build();
+      rows.addSerializedRows(foo.toByteString());
+    }
+    if (offset > 0) {
+      requestBuilder.setOffset(Int64Value.of(offset));
+    }
+    return requestBuilder
+        .setProtoRows(dataBuilder.setRows(rows.build()).build())
+        .setWriteStream(TEST_STREAM)
+        .build();
+  }
+
+  private ApiFuture<AppendRowsResponse> sendTestMessage(StreamWriter writer, String[] messages) {
+    return writer.append(createAppendRequest(messages, -1));
+  }
+
+  @Test
+  public void testAppendByDuration() throws Exception {
+    StreamWriter writer =
+        getTestStreamWriterBuilder()
+            .setBatchingSettings(
+                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+                    .toBuilder()
+                    .setDelayThreshold(Duration.ofSeconds(5))
+                    .setElementCountThreshold(10L)
+                    .build())
+            .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor))
+            .build();
+
+    testBigQueryWrite.addResponse(Storage.AppendRowsResponse.newBuilder().setOffset(0).build());
+    ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+    ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
+
+    assertFalse(appendFuture1.isDone());
+    assertFalse(appendFuture2.isDone());
+    fakeExecutor.advanceTime(Duration.ofSeconds(10));
+
+    assertEquals(0L, appendFuture1.get().getOffset());
+    assertEquals(1L, appendFuture2.get().getOffset());
+
+    assertEquals(1, testBigQueryWrite.getAppendRequests().size());
+
+    assertEquals(
+        2,
+        testBigQueryWrite
+            .getAppendRequests()
+            .get(0)
+            .getProtoRows()
+            .getRows()
+            .getSerializedRowsCount());
+    assertEquals(
+        true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
+    writer.shutdown();
+  }
+
+  @Test
+  public void testAppendByNumBatchedMessages() throws Exception {
+    StreamWriter writer =
+        getTestStreamWriterBuilder()
+            .setBatchingSettings(
+                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+                    .toBuilder()
+                    .setElementCountThreshold(2L)
+                    .setDelayThreshold(Duration.ofSeconds(100))
+                    .build())
+            .build();
+
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
+
+    ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+    ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
+    ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});
+
+    assertEquals(0L, appendFuture1.get().getOffset());
+    assertEquals(1L, appendFuture2.get().getOffset());
+
+    assertFalse(appendFuture3.isDone());
+
+    ApiFuture<AppendRowsResponse> appendFuture4 = sendTestMessage(writer, new String[] {"D"});
+
+    assertEquals(2L, appendFuture3.get().getOffset());
+    assertEquals(3L, appendFuture4.get().getOffset());
+
+    assertEquals(2, testBigQueryWrite.getAppendRequests().size());
+    assertEquals(
+        2,
+        testBigQueryWrite
+            .getAppendRequests()
+            .get(0)
+            .getProtoRows()
+            .getRows()
+            .getSerializedRowsCount());
+    assertEquals(
+        true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
+    assertEquals(
+        2,
+        testBigQueryWrite
+            .getAppendRequests()
+            .get(1)
+            .getProtoRows()
+            .getRows()
+            .getSerializedRowsCount());
+    assertEquals(
+        false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema());
+    writer.shutdown();
+  }
+
+  @Test
+  public void testAppendByNumBytes() throws Exception {
+    StreamWriter writer =
+        getTestStreamWriterBuilder()
+            .setBatchingSettings(
+                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+                    .toBuilder()
+                    // Each message is 32 bytes, setting batch size to 70 bytes allows 2 messages.
+                    .setRequestByteThreshold(70L)
+                    .setDelayThreshold(Duration.ofSeconds(100000))
+                    .build())
+            .build();
+
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2).build());
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3).build());
+
+    ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+    ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
+    ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});
+
+    assertEquals(0L, appendFuture1.get().getOffset());
+    assertEquals(1L, appendFuture2.get().getOffset());
+    assertFalse(appendFuture3.isDone());
+
+    // This message is big enough to trigger a send of both the previous message and itself.
+    ApiFuture<AppendRowsResponse> appendFuture4 =
+        sendTestMessage(writer, new String[] {StringUtils.repeat('A', 100)});
+    assertEquals(2L, appendFuture3.get().getOffset());
+    assertEquals(3L, appendFuture4.get().getOffset());
+
+    assertEquals(3, testBigQueryWrite.getAppendRequests().size());
+
+    writer.shutdown();
+  }
+
+  @Test
+  public void testWriteByShutdown() throws Exception {
+    StreamWriter writer =
+        getTestStreamWriterBuilder()
+            .setBatchingSettings(
+                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+                    .toBuilder()
+                    .setDelayThreshold(Duration.ofSeconds(100))
+                    .setElementCountThreshold(10L)
+                    .build())
+            .build();
+
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build());
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build());
+
+    ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+    ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
+
+    // Note we are not advancing time or reaching the count threshold, but the messages should
+    // still get written by the call to shutdown.
+
+    writer.shutdown();
+    LOG.info("Wait for termination");
+    writer.awaitTermination(10, TimeUnit.SECONDS);
+
+    // Verify the appends completed.
+    assertTrue(appendFuture1.isDone());
+    assertTrue(appendFuture2.isDone());
+    assertEquals(0L, appendFuture1.get().getOffset());
+    assertEquals(1L, appendFuture2.get().getOffset());
+  }
+
+  @Test
+  public void testWriteMixedSizeAndDuration() throws Exception {
+    StreamWriter writer =
+        getTestStreamWriterBuilder()
+            .setBatchingSettings(
+                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+                    .toBuilder()
+                    .setElementCountThreshold(2L)
+                    .setDelayThreshold(Duration.ofSeconds(5))
+                    .build())
+            .build();
+
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0L).build());
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build());
+
+    ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+
+    fakeExecutor.advanceTime(Duration.ofSeconds(2));
+    assertFalse(appendFuture1.isDone());
+
+    ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B", "C"});
+
+    // Write triggered by batch size.
+    assertEquals(0L, appendFuture1.get().getOffset());
+    assertEquals(1L, appendFuture2.get().getOffset());
+
+    ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"D"});
+
+    assertFalse(appendFuture3.isDone());
+
+    // Write triggered by time.
+    fakeExecutor.advanceTime(Duration.ofSeconds(5));
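+    // "D" was batched on its own, so it completes at the second response's base offset (2).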
+    assertEquals(2L, appendFuture3.get().getOffset());
+
+    assertEquals(
+        3,
+        testBigQueryWrite
+            .getAppendRequests()
+            .get(0)
+            .getProtoRows()
+            .getRows()
+            .getSerializedRowsCount());
+    assertEquals(
+        true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
+    assertEquals(
+        1,
+        testBigQueryWrite
+            .getAppendRequests()
+            .get(1)
+            .getProtoRows()
+            .getRows()
+            .getSerializedRowsCount());
+    assertEquals(
+        false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema());
+    writer.shutdown();
+  }
+
+  @Test
+  public void testFlowControlBehaviorBlock() throws Exception {
+    StreamWriter writer =
+        getTestStreamWriterBuilder()
+            .setBatchingSettings(
+                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+                    .toBuilder()
+                    .setElementCountThreshold(1L)
+                    .setFlowControlSettings(
+                        StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
+                            .toBuilder()
+                            .setMaxOutstandingRequestBytes(40L)
+                            .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
+                            .build())
+                    .build())
+            .build();
+
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(2L).build());
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(3L).build());
+    testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10));
+
+    ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+    final StreamWriter writer1 = writer;
+    Runnable runnable =
+        new Runnable() {
+          @Override
+          public void run() {
+            ApiFuture<AppendRowsResponse> appendFuture2 =
+                sendTestMessage(writer1, new String[] {"B"});
+          }
+        };
+    Thread t = new Thread(runnable);
+    t.start();
+    assertEquals(true, t.isAlive());
+    assertEquals(false, appendFuture1.isDone());
+    // The wait is necessary for the response to be scheduled before the timer is advanced.
+    Thread.sleep(5000L);
+    fakeExecutor.advanceTime(Duration.ofSeconds(10));
+    // The first request gets back while the second one is still blocked.
+    assertEquals(2L, appendFuture1.get().getOffset());
+    // Again, the wait is necessary for the response to be scheduled before the timer is advanced.
+    Thread.sleep(5000L);
+    fakeExecutor.advanceTime(Duration.ofSeconds(10));
+    t.join();
+    writer.shutdown();
+  }
+
+  @Test
+  public void testFlowControlBehaviorException() throws Exception {
+    StreamWriter writer =
+        getTestStreamWriterBuilder()
+            .setBatchingSettings(
+                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+                    .toBuilder()
+                    .setElementCountThreshold(1L)
+                    .setFlowControlSettings(
+                        StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS
+                            .toBuilder()
+                            .setMaxOutstandingElementCount(1L)
+                            .setLimitExceededBehavior(
+                                FlowController.LimitExceededBehavior.ThrowException)
+                            .build())
+                    .build())
+            .build();
+
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(1L).build());
+    ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+    try {
+      ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
+      Assert.fail("This should fail");
+    } catch (IllegalStateException e) {
+      assertEquals("FlowControl limit exceeded: Element count", e.getMessage());
+    }
+    assertEquals(1L, appendFuture1.get().getOffset());
+    writer.shutdown();
+  }
+
+  @Test
+  public void testStreamReconnection() throws Exception {
+    StreamWriter writer =
+        getTestStreamWriterBuilder()
+            .setBatchingSettings(
+                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+                    .toBuilder()
+                    .setDelayThreshold(Duration.ofSeconds(100000))
+                    .setElementCountThreshold(1L)
+                    .build())
+            .build();
+
+    // Case 1: The request succeeds after a retry, since the error is transient.
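+    // UNAVAILABLE is treated as retryable by the writer; INVALID_ARGUMENT (case 2 below) is a
+    // permanent error that fails the append without retry.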
+    StatusRuntimeException transientError = new StatusRuntimeException(Status.UNAVAILABLE);
+    testBigQueryWrite.addException(transientError);
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(0).build());
+    ApiFuture<AppendRowsResponse> future1 = sendTestMessage(writer, new String[] {"m1"});
+    assertEquals(false, future1.isDone());
+    // The retry is scheduled to run 5 seconds later.
+    assertEquals(0L, future1.get().getOffset());
+
+    LOG.info("======CASE II");
+    // Case 2: The request fails, since the error is not transient.
+    StatusRuntimeException permanentError = new StatusRuntimeException(Status.INVALID_ARGUMENT);
+    testBigQueryWrite.addException(permanentError);
+    ApiFuture<AppendRowsResponse> future2 = sendTestMessage(writer, new String[] {"m2"});
+    try {
+      future2.get();
+      Assert.fail("This should fail.");
+    } catch (ExecutionException e) {
+      assertEquals(permanentError.toString(), e.getCause().getCause().toString());
+    }
+
+    LOG.info("======CASE III");
+    // The writer needs to be recreated, since the previous error is not recoverable.
+    writer =
+        getTestStreamWriterBuilder()
+            .setBatchingSettings(
+                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+                    .toBuilder()
+                    .setDelayThreshold(Duration.ofSeconds(100000))
+                    .setElementCountThreshold(1L)
+                    .build())
+            .build();
+    // Case 3: The request fails after exhausting the maximum number of retries.
+    testBigQueryWrite.addException(transientError);
+    testBigQueryWrite.addException(transientError);
+    testBigQueryWrite.addException(transientError);
+    testBigQueryWrite.addException(transientError);
+    ApiFuture<AppendRowsResponse> future3 = sendTestMessage(writer, new String[] {"m3"});
+    try {
+      future3.get();
+      Assert.fail("This should fail.");
+    } catch (ExecutionException e) {
+      assertEquals(transientError.toString(), e.getCause().getCause().toString());
+    }
+  }
+
+  @Test
+  public void testOffset() throws Exception {
+    StreamWriter writer =
+        getTestStreamWriterBuilder()
+            .setBatchingSettings(
+                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+                    .toBuilder()
+                    .setElementCountThreshold(2L)
+                    .build())
+            .build();
+
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(10L).build());
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(13L).build());
+    AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L);
+    ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(request1);
+    AppendRowsRequest request2 = createAppendRequest(new String[] {"B", "C"}, 11L);
+    ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(request2);
+    AppendRowsRequest request3 = createAppendRequest(new String[] {"E", "F"}, 13L);
+    ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(request3);
+    AppendRowsRequest request4 = createAppendRequest(new String[] {"G"}, 15L);
+    ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(request4);
+    assertEquals(10L, appendFuture1.get().getOffset());
+    assertEquals(11L, appendFuture2.get().getOffset());
+    assertEquals(13L, appendFuture3.get().getOffset());
+    assertEquals(15L, appendFuture4.get().getOffset());
+    writer.shutdown();
+  }
+
+  @Test
+  public void testOffsetMismatch() throws Exception {
+    StreamWriter writer =
+        getTestStreamWriterBuilder()
+            .setBatchingSettings(
+                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+                    .toBuilder()
+                    .setElementCountThreshold(1L)
+                    .build())
+            .build();
+
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setOffset(11L).build());
+    AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L);
+    ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(request1);
+    try {
+      appendFuture1.get();
+      fail("Should throw exception");
+    } catch (Exception e) {
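+      // The writer surfaces the offset mismatch as an IllegalStateException on the append future.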
+      assertEquals(
+          "java.lang.IllegalStateException: The append result offset 11 does not match the expected offset 10.",
+          e.getCause().toString());
+    }
+    writer.shutdown();
+  }
+
+  @Test
+  public void testErrorPropagation() throws Exception {
+    StreamWriter writer =
+        getTestStreamWriterBuilder()
+            .setExecutorProvider(SINGLE_THREAD_EXECUTOR)
+            .setBatchingSettings(
+                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+                    .toBuilder()
+                    .setElementCountThreshold(1L)
+                    .setDelayThreshold(Duration.ofSeconds(5))
+                    .build())
+            .build();
+    testBigQueryWrite.addException(Status.DATA_LOSS.asException());
+    try {
+      sendTestMessage(writer, new String[] {"A"}).get();
+      fail("should throw exception");
+    } catch (ExecutionException e) {
+      assertThat(e.getCause()).isInstanceOf(DataLossException.class);
+    }
+  }
+
+  @Test
+  public void testWriterGetters() throws Exception {
+    StreamWriter.Builder builder = StreamWriter.newBuilder(TEST_STREAM);
+    builder.setChannelProvider(channelProvider);
+    builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR);
+    builder.setBatchingSettings(
+        BatchingSettings.newBuilder()
+            .setRequestByteThreshold(10L)
+            .setDelayThreshold(Duration.ofMillis(11))
+            .setElementCountThreshold(12L)
+            .setFlowControlSettings(
+                FlowControlSettings.newBuilder()
+                    .setMaxOutstandingElementCount(100L)
+                    .setMaxOutstandingRequestBytes(1000L)
+                    .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
+                    .build())
+            .build());
+    builder.setCredentialsProvider(NoCredentialsProvider.create());
+    StreamWriter writer = builder.build();
+
+    assertEquals(TEST_STREAM, writer.getStreamNameString());
+    assertEquals(10, (long) writer.getBatchingSettings().getRequestByteThreshold());
+    assertEquals(Duration.ofMillis(11), writer.getBatchingSettings().getDelayThreshold());
+    assertEquals(12, (long) writer.getBatchingSettings().getElementCountThreshold());
+    assertEquals(
+        FlowController.LimitExceededBehavior.Block,
+        writer.getBatchingSettings().getFlowControlSettings().getLimitExceededBehavior());
+    assertEquals(
+        100L,
+        writer
+            .getBatchingSettings()
+            .getFlowControlSettings()
+            .getMaxOutstandingElementCount()
+            .longValue());
+    assertEquals(
+        1000L,
+        writer
+            .getBatchingSettings()
+            .getFlowControlSettings()
+            .getMaxOutstandingRequestBytes()
+            .longValue());
+    writer.shutdown();
+  }
+
+  @Test
+  public void testBuilderParametersAndDefaults() {
+    StreamWriter.Builder builder = StreamWriter.newBuilder(TEST_STREAM);
+    assertEquals(StreamWriter.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider);
+    assertEquals(
+        StreamWriter.Builder.DEFAULT_REQUEST_BYTES_THRESHOLD,
+        builder.batchingSettings.getRequestByteThreshold().longValue());
+    assertEquals(
+        StreamWriter.Builder.DEFAULT_DELAY_THRESHOLD, builder.batchingSettings.getDelayThreshold());
+    assertEquals(
+        StreamWriter.Builder.DEFAULT_ELEMENT_COUNT_THRESHOLD,
+        builder.batchingSettings.getElementCountThreshold().longValue());
+    assertEquals(StreamWriter.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings);
+    assertEquals(Duration.ofMillis(100), builder.retrySettings.getInitialRetryDelay());
+    assertEquals(3, builder.retrySettings.getMaxAttempts());
+  }
+
+  @Test
+  public void testBuilderInvalidArguments() {
+    StreamWriter.Builder builder = StreamWriter.newBuilder(TEST_STREAM);
+
+    try {
+      builder.setChannelProvider(null);
+      fail("Should have thrown a NullPointerException");
+    } catch (NullPointerException expected) {
+      // Expected
+    }
+
+    try {
+      builder.setExecutorProvider(null);
+      fail("Should have thrown a NullPointerException");
+    } catch (NullPointerException expected) {
+      // Expected
+    }
+    try {
+      builder.setBatchingSettings(
+          StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+              .toBuilder()
+              .setRequestByteThreshold(null)
+              .build());
+      fail("Should have thrown a NullPointerException");
+    } catch (NullPointerException expected) {
+      // Expected
+    }
+    try {
+      builder.setBatchingSettings(
+          StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+              .toBuilder()
+              .setRequestByteThreshold(0L)
+              .build());
+      fail("Should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException expected) {
+      // Expected
+    }
+    try {
+      builder.setBatchingSettings(
+          StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+              .toBuilder()
+              .setRequestByteThreshold(-1L)
+              .build());
+      fail("Should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException expected) {
+      // Expected
+    }
+
+    builder.setBatchingSettings(
+        StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+            .toBuilder()
+            .setDelayThreshold(Duration.ofMillis(1))
+            .build());
+    try {
+      builder.setBatchingSettings(
+          StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+              .toBuilder()
+              .setDelayThreshold(null)
+              .build());
+      fail("Should have thrown a NullPointerException");
+    } catch (NullPointerException expected) {
+      // Expected
+    }
+    try {
+      builder.setBatchingSettings(
+          StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+              .toBuilder()
+              .setDelayThreshold(Duration.ofMillis(-1))
+              .build());
+      fail("Should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException expected) {
+      // Expected
+    }
+
+    builder.setBatchingSettings(
+        StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+            .toBuilder()
+            .setElementCountThreshold(1L)
+            .build());
+    try {
+      builder.setBatchingSettings(
+          StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+              .toBuilder()
+              .setElementCountThreshold(null)
+              .build());
+      fail("Should have thrown a NullPointerException");
+    } catch (NullPointerException expected) {
+      // Expected
+    }
+    try {
+      builder.setBatchingSettings(
+          StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+              .toBuilder()
+              .setElementCountThreshold(0L)
+              .build());
+      fail("Should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException expected) {
+      // Expected
+    }
+    try {
+      builder.setBatchingSettings(
+          StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+              .toBuilder()
+              .setElementCountThreshold(-1L)
+              .build());
+      fail("Should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException expected) {
+      // Expected
+    }
+
+    try {
+      FlowControlSettings flowControlSettings =
+          FlowControlSettings.newBuilder().setMaxOutstandingElementCount(-1L).build();
+      builder.setBatchingSettings(
+          StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+              .toBuilder()
+              .setFlowControlSettings(flowControlSettings)
+              .build());
+      fail("Should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException expected) {
+      // Expected
+    }
+
+    try {
+      FlowControlSettings flowControlSettings =
+          FlowControlSettings.newBuilder().setMaxOutstandingRequestBytes(-1L).build();
+      builder.setBatchingSettings(
+          StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+              .toBuilder()
+              .setFlowControlSettings(flowControlSettings)
+              .build());
+      fail("Should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException expected) {
+      // Expected
+    }
+
+    try {
+      FlowControlSettings flowControlSettings =
+          FlowControlSettings.newBuilder()
+              .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore)
+              .build();
+      builder.setBatchingSettings(
+          StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+              .toBuilder()
+              .setFlowControlSettings(flowControlSettings)
+              .build());
+      fail("Should have thrown an IllegalArgumentException");
+    } catch (IllegalArgumentException expected) {
+      // Expected
+    }
+
+    try {
+      FlowControlSettings flowControlSettings =
+          FlowControlSettings.newBuilder().setLimitExceededBehavior(null).build();
+      builder.setBatchingSettings(
+          StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+              .toBuilder()
+              .setFlowControlSettings(flowControlSettings)
+              .build());
+      fail("Should have thrown a NullPointerException");
+    } catch (NullPointerException expected) {
+      // Expected
+    }
+  }
+
+  @Test
+  public void testAwaitTermination() throws Exception {
+    StreamWriter writer =
+        getTestStreamWriterBuilder().setExecutorProvider(SINGLE_THREAD_EXECUTOR).build();
+    testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
+    ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+    writer.shutdown();
+    // TODO: for some reason, await always returns false.
+    // assertTrue(writer.awaitTermination(1, TimeUnit.MINUTES));
+  }
+
+  @Test
+  public void testClose() throws Exception {
+    StreamWriter writer = getTestStreamWriterBuilder().build();
+    writer.close();
+    try {
+      writer.shutdown();
+      fail("Should throw");
+    } catch (IllegalStateException e) {
+      LOG.info(e.toString());
+      assertEquals("Cannot shut down a writer already shut-down.", e.getMessage());
+    }
+  }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
new file mode 100644
index 0000000000..4b5976f45b
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
@@ -0,0 +1,351 @@
+/*
+ * Copyright 2019 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package com.google.cloud.bigquery.storage.v1alpha2.it; + +import static org.junit.Assert.assertEquals; + +import com.google.api.core.ApiFuture; +import com.google.cloud.ServiceOptions; +import com.google.cloud.bigquery.*; +import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.storage.test.Test.*; +import com.google.cloud.bigquery.storage.v1alpha2.BigQueryWriteClient; +import com.google.cloud.bigquery.storage.v1alpha2.ProtoBufProto; +import com.google.cloud.bigquery.storage.v1alpha2.ProtoSchemaConverter; +import com.google.cloud.bigquery.storage.v1alpha2.Storage.*; +import com.google.cloud.bigquery.storage.v1alpha2.Stream.WriteStream; +import com.google.cloud.bigquery.storage.v1alpha2.StreamWriter; +import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; +import com.google.protobuf.Int64Value; +import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.ExecutionException; +import java.util.logging.Logger; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.threeten.bp.Duration; + +/** Integration tests for BigQuery Storage API. */ +public class ITBigQueryWriteManualClientTest { + private static final Logger LOG = + Logger.getLogger(ITBigQueryWriteManualClientTest.class.getName()); + private static final String DATASET = RemoteBigQueryHelper.generateDatasetName(); + private static final String TABLE = "testtable"; + private static final String TABLE2 = "complicatedtable"; + private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset"; + + private static BigQueryWriteClient client; + private static TableInfo tableInfo; + private static TableInfo tableInfo2; + private static String tableId; + private static String tableId2; + private static BigQuery bigquery; + + @BeforeClass + public static void beforeClass() throws IOException { + client = BigQueryWriteClient.create(); + + RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create(); + bigquery = bigqueryHelper.getOptions().getService(); + DatasetInfo datasetInfo = + DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build(); + bigquery.create(datasetInfo); + LOG.info("Created test dataset: " + DATASET); + tableInfo = + TableInfo.newBuilder( + TableId.of(DATASET, TABLE), + StandardTableDefinition.of( + Schema.of( + com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING) + .build()))) + .build(); + com.google.cloud.bigquery.Field.Builder innerTypeFieldBuilder = + com.google.cloud.bigquery.Field.newBuilder( + "inner_type", + LegacySQLTypeName.RECORD, + com.google.cloud.bigquery.Field.newBuilder("value", LegacySQLTypeName.STRING) + .setMode(Field.Mode.REPEATED) + .build()); + + tableInfo2 = + TableInfo.newBuilder( + TableId.of(DATASET, TABLE2), + StandardTableDefinition.of( + Schema.of( + Field.newBuilder( + "nested_repeated_type", + LegacySQLTypeName.RECORD, + innerTypeFieldBuilder.setMode(Field.Mode.REPEATED).build()) + .setMode(Field.Mode.REPEATED) + .build(), + innerTypeFieldBuilder.setMode(Field.Mode.NULLABLE).build()))) + .build(); + bigquery.create(tableInfo); + bigquery.create(tableInfo2); + tableId = + String.format( + "projects/%s/datasets/%s/tables/%s", + ServiceOptions.getDefaultProjectId(), DATASET, TABLE); + tableId2 = + String.format( + "projects/%s/datasets/%s/tables/%s", + ServiceOptions.getDefaultProjectId(), DATASET, TABLE2); + } + + @AfterClass + public static void afterClass() { + if (client != null) { + 
+      client.close();
+    }
+
+    if (bigquery != null) {
+      RemoteBigQueryHelper.forceDelete(bigquery, DATASET);
+      LOG.info("Deleted test dataset: " + DATASET);
+    }
+  }
+
+  private AppendRowsRequest.Builder createAppendRequest(String streamName, String[] messages) {
+    AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
+
+    AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
+    dataBuilder.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor()));
+
+    ProtoBufProto.ProtoRows.Builder rows = ProtoBufProto.ProtoRows.newBuilder();
+    for (String message : messages) {
+      FooType foo = FooType.newBuilder().setFoo(message).build();
+      rows.addSerializedRows(foo.toByteString());
+    }
+    dataBuilder.setRows(rows.build());
+    return requestBuilder.setProtoRows(dataBuilder.build()).setWriteStream(streamName);
+  }
+
+  private AppendRowsRequest.Builder createAppendRequestComplicateType(
+      String streamName, String[] messages) {
+    AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
+
+    AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
+    dataBuilder.setWriterSchema(ProtoSchemaConverter.convert(ComplicateType.getDescriptor()));
+
+    ProtoBufProto.ProtoRows.Builder rows = ProtoBufProto.ProtoRows.newBuilder();
+    for (String message : messages) {
+      ComplicateType foo =
+          ComplicateType.newBuilder()
+              .setInnerType(InnerType.newBuilder().addValue(message).addValue(message).build())
+              .build();
+      rows.addSerializedRows(foo.toByteString());
+    }
+    dataBuilder.setRows(rows.build());
+    return requestBuilder.setProtoRows(dataBuilder.build()).setWriteStream(streamName);
+  }
+
+  @Test
+  public void testBatchWriteWithCommittedStream()
+      throws IOException, InterruptedException, ExecutionException {
+    WriteStream writeStream =
+        client.createWriteStream(
+            CreateWriteStreamRequest.newBuilder()
+                .setParent(tableId)
+                .setWriteStream(
+                    WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
+                .build());
+    try (StreamWriter streamWriter =
+        StreamWriter.newBuilder(writeStream.getName())
+            .setBatchingSettings(
+                StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
+                    .toBuilder()
+                    .setRequestByteThreshold(1024 * 1024L) // 1 MB
+                    .setElementCountThreshold(2L)
+                    .setDelayThreshold(Duration.ofSeconds(2))
+                    .build())
+            .build()) {
+      LOG.info("Sending one message");
+      ApiFuture<AppendRowsResponse> response =
+          streamWriter.append(
+              createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build());
+      assertEquals(0, response.get().getOffset());
+
+      LOG.info("Sending two more messages");
+      ApiFuture<AppendRowsResponse> response1 =
+          streamWriter.append(
+              createAppendRequest(writeStream.getName(), new String[] {"bbb", "ccc"}).build());
+      ApiFuture<AppendRowsResponse> response2 =
+          streamWriter.append(
+              createAppendRequest(writeStream.getName(), new String[] {"ddd"}).build());
+      assertEquals(1, response1.get().getOffset());
+      assertEquals(3, response2.get().getOffset());
+    }
+
+    TableResult result =
+        bigquery.listTableData(tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
+    Iterator<FieldValueList> iter = result.getValues().iterator();
+    assertEquals("aaa", iter.next().get(0).getStringValue());
+    assertEquals("bbb", iter.next().get(0).getStringValue());
+    assertEquals("ccc", iter.next().get(0).getStringValue());
+    assertEquals("ddd", iter.next().get(0).getStringValue());
+    assertEquals(false, iter.hasNext());
+
+    LOG.info("Waiting for termination");
+    // The awaitTermination always returns false.
+    // assertEquals(true, streamWriter.awaitTermination(10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testComplicateSchemaWithPendingStream()
+      throws IOException, InterruptedException, ExecutionException {
+    WriteStream writeStream =
+        client.createWriteStream(
+            CreateWriteStreamRequest.newBuilder()
+                .setParent(tableId2)
+                .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
+                .build());
+    try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
+      LOG.info("Sending two messages");
+      ApiFuture<AppendRowsResponse> response =
+          streamWriter.append(
+              createAppendRequestComplicateType(writeStream.getName(), new String[] {"aaa"})
+                  .setOffset(Int64Value.of(0L))
+                  .build());
+      assertEquals(0, response.get().getOffset());
+
+      ApiFuture<AppendRowsResponse> response2 =
+          streamWriter.append(
+              createAppendRequestComplicateType(writeStream.getName(), new String[] {"bbb"})
+                  .setOffset(Int64Value.of(1L))
+                  .build());
+      assertEquals(1, response2.get().getOffset());
+    } finally {
+    }
+
+    // Nothing shows up yet, since the rows are not committed.
+    TableResult result =
+        bigquery.listTableData(
+            tableInfo2.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
+    Iterator<FieldValueList> iter = result.getValues().iterator();
+    assertEquals(false, iter.hasNext());
+
+    FinalizeWriteStreamResponse finalizeResponse =
+        client.finalizeWriteStream(
+            FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
+    // The finalize row count is not populated.
+    // assertEquals(1, finalizeResponse.getRowCount());
+    BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
+        client.batchCommitWriteStreams(
+            BatchCommitWriteStreamsRequest.newBuilder()
+                .setParent(tableId2)
+                .addWriteStreams(writeStream.getName())
+                .build());
+    assertEquals(true, batchCommitWriteStreamsResponse.hasCommitTime());
+
+    LOG.info("Waiting for termination");
+    // The awaitTermination always returns false.
+    // assertEquals(true, streamWriter.awaitTermination(10, TimeUnit.SECONDS));
+
+    // The data now shows up.
+    result =
+        bigquery.listTableData(
+            tableInfo2.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
+    iter = result.getValues().iterator();
+    assertEquals(
+        "[FieldValue{attribute=REPEATED, value=[FieldValue{attribute=PRIMITIVE, value=aaa}, FieldValue{attribute=PRIMITIVE, value=aaa}]}]",
+        iter.next().get(1).getRepeatedValue().toString());
+    assertEquals(
+        "[FieldValue{attribute=REPEATED, value=[FieldValue{attribute=PRIMITIVE, value=bbb}, FieldValue{attribute=PRIMITIVE, value=bbb}]}]",
+        iter.next().get(1).getRepeatedValue().toString());
+    assertEquals(false, iter.hasNext());
+  }
+
+  @Test
+  public void testStreamError() throws IOException, InterruptedException, ExecutionException {
+    WriteStream writeStream =
+        client.createWriteStream(
+            CreateWriteStreamRequest.newBuilder()
+                .setParent(tableId)
+                .setWriteStream(
+                    WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
+                .build());
+    try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
+
+      AppendRowsRequest request =
+          createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build();
+      // Note: the rebuilt request below (with the writer schema cleared) is not used afterwards.
+      request
+          .toBuilder()
+          .setProtoRows(request.getProtoRows().toBuilder().clearWriterSchema().build())
+          .build();
+      ApiFuture<AppendRowsResponse> response =
+          streamWriter.append(
+              createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build());
+      assertEquals(0L, response.get().getOffset());
+      // Sending a bogus stream name should cause a connection error.
+      ApiFuture<AppendRowsResponse> response2 =
+          streamWriter.append(
+              createAppendRequest(writeStream.getName(), new String[] {"aaa"})
+                  .setWriteStream("blah")
+                  .build());
+      try {
+        response2.get().getOffset();
+        Assert.fail("Should fail");
+      } catch (ExecutionException e) {
+        assertEquals(
+            true,
+            e.getCause()
+                .getMessage()
+                .startsWith(
+                    "INVALID_ARGUMENT: Stream name `blah` in the request doesn't match the one already specified"));
+      }
+      // We can keep sending requests on the same stream.
+      ApiFuture<AppendRowsResponse> response3 =
+          streamWriter.append(
+              createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build());
+      assertEquals(1L, response3.get().getOffset());
+    } finally {
+    }
+  }
+
+  @Test
+  public void testStreamReconnect() throws IOException, InterruptedException, ExecutionException {
+    WriteStream writeStream =
+        client.createWriteStream(
+            CreateWriteStreamRequest.newBuilder()
+                .setParent(tableId)
+                .setWriteStream(
+                    WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
+                .build());
+    try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
+      ApiFuture<AppendRowsResponse> response =
+          streamWriter.append(
+              createAppendRequest(writeStream.getName(), new String[] {"aaa"})
+                  .setOffset(Int64Value.of(0L))
+                  .build());
+      assertEquals(0L, response.get().getOffset());
+    } finally {
+    }
+
+    try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) {
+      // Currently there is a bug where reconnection must wait 5 seconds to get the real row count.
+      Thread.sleep(5000L);
+      ApiFuture<AppendRowsResponse> response =
+          streamWriter.append(
+              createAppendRequest(writeStream.getName(), new String[] {"bbb"})
+                  .setOffset(Int64Value.of(1L))
+                  .build());
+      assertEquals(1L, response.get().getOffset());
+    } finally {
+    }
+  }
+}
diff --git a/google-cloud-bigquerystorage/src/test/proto/test.proto b/google-cloud-bigquerystorage/src/test/proto/test.proto
index 2b1a988ea6..34ef52b265 100644
--- a/google-cloud-bigquerystorage/src/test/proto/test.proto
+++ b/google-cloud-bigquerystorage/src/test/proto/test.proto
@@ -17,8 +17,6 @@
 syntax = "proto2";
 package com.google.cloud.bigquery.storage.test;
 
-import "google/protobuf/descriptor.proto";
-
 enum TestEnum {
   TestEnum0 = 0;
   TestEnum1 = 1;
@@ -39,6 +37,7 @@ message AllSupportedTypes {
 message InnerType {
   repeated string value = 1;
 }
+
 message NestedType {
   repeated InnerType inner_type = 1;
 }
@@ -51,6 +50,11 @@ message ComplicateType {
 message ContainsRecursive {
   optional RecursiveType field = 1;
 }
+
 message RecursiveType {
   optional ContainsRecursive field = 2;
 }
+
+message FooType {
+  optional string foo = 1;
+}
diff --git a/pom.xml b/pom.xml
index 1873d12934..2b23ae8ddb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,6 +77,7 @@
     <threeten.version>1.4.2</threeten.version>
     <javax.annotations.version>1.3.2</javax.annotations.version>
    <animal-sniffer.version>1.18</animal-sniffer.version>
+    <commons-lang3.version>3.5</commons-lang3.version>
  </properties>
@@ -208,6 +209,11 @@
        <artifactId>animal-sniffer-annotations</artifactId>
        <version>${animal-sniffer.version}</version>
      </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-lang3</artifactId>
+        <version>${commons-lang3.version}</version>
+      </dependency>

      <dependency>
        <groupId>junit</groupId>
@@ -321,4 +327,4 @@
-</project>
\ No newline at end of file
+</project>