diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml index ddf344fda3..21802fc88b 100644 --- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -16,4 +16,16 @@ com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter$Builder newBuilder(java.lang.String, com.google.cloud.bigquery.Schema) + + 8001 + com/google/cloud/bigquery/storage/v1beta2/StreamWriter + + + 8001 + com/google/cloud/bigquery/storage/v1beta2/StreamWriter$Builder + + + 8001 + com/google/cloud/bigquery/storage/v1beta2/OnSchemaUpdateRunnable + diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java index 91b8216ffc..64687f8732 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java @@ -116,23 +116,6 @@ public ApiFuture append(JSONArray jsonArr, long offset) { } } - /** - * Refreshes connection for a JsonStreamWriter by first flushing all remaining rows, then - * recreates stream writer, and finally setting the descriptor. All of these actions need to be - * performed atomically to avoid having synchronization issues with append(). Flushing all rows - * first is necessary since if there are rows remaining when the connection refreshes, it will - * send out the old writer schema instead of the new one. - */ - void refreshConnection() - throws IOException, InterruptedException, Descriptors.DescriptorValidationException { - synchronized (this) { - this.streamWriter.close(); - this.streamWriter = streamWriterBuilder.build(); - this.descriptor = - BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(this.tableSchema); - } - } - /** * Gets streamName * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/OnSchemaUpdateRunnable.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/OnSchemaUpdateRunnable.java deleted file mode 100644 index dc2f855d0c..0000000000 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/OnSchemaUpdateRunnable.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigquery.storage.v1beta2; - -/** - * A abstract class that implements the Runnable interface and provides access to the current - * StreamWriter and updatedSchema. This runnable will only be called when a updated schema has been - * passed back through the AppendRowsResponse. Users should only implement the run() function. - * - * @deprecated - */ -public abstract class OnSchemaUpdateRunnable implements Runnable { - private StreamWriter streamWriter; - private TableSchema updatedSchema; - - /** - * Setter for the updatedSchema - * - * @param updatedSchema - */ - void setUpdatedSchema(TableSchema updatedSchema) { - this.updatedSchema = updatedSchema; - } - - /** - * Setter for the streamWriter - * - * @param streamWriter - */ - void setStreamWriter(StreamWriter streamWriter) { - this.streamWriter = streamWriter; - } - - /** Getter for the updatedSchema */ - TableSchema getUpdatedSchema() { - return this.updatedSchema; - } - - /** Getter for the streamWriter */ - StreamWriter getStreamWriter() { - return this.streamWriter; - } -} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java deleted file mode 100644 index 4a937b6be5..0000000000 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java +++ /dev/null @@ -1,1019 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.bigquery.storage.v1beta2; - -import com.google.api.core.ApiFuture; -import com.google.api.core.SettableApiFuture; -import com.google.api.gax.batching.BatchingSettings; -import com.google.api.gax.batching.FlowControlSettings; -import com.google.api.gax.batching.FlowController; -import com.google.api.gax.core.BackgroundResource; -import com.google.api.gax.core.BackgroundResourceAggregation; -import com.google.api.gax.core.CredentialsProvider; -import com.google.api.gax.core.ExecutorAsBackgroundResource; -import com.google.api.gax.core.ExecutorProvider; -import com.google.api.gax.core.InstantiatingExecutorProvider; -import com.google.api.gax.grpc.GrpcStatusCode; -import com.google.api.gax.retrying.RetrySettings; -import com.google.api.gax.rpc.AbortedException; -import com.google.api.gax.rpc.BidiStreamingCallable; -import com.google.api.gax.rpc.ClientStream; -import com.google.api.gax.rpc.ResponseObserver; -import com.google.api.gax.rpc.StreamController; -import com.google.api.gax.rpc.TransportChannelProvider; -import com.google.api.gax.rpc.UnimplementedException; -import com.google.auth.oauth2.GoogleCredentials; -import com.google.common.base.Preconditions; -import com.google.protobuf.Int64Value; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; -import java.util.logging.Level; -import java.util.logging.Logger; -import java.util.regex.Matcher; -import java.util.regex.Pattern; -import javax.annotation.concurrent.GuardedBy; -import org.threeten.bp.Duration; - -/** - * This is to be used to managed streaming write when you are working with PENDING streams or want - * to explicitly manage offset. In that most common cases when writing with COMMITTED stream without - * offset, please use a simpler writer {@code DirectWriter}. - * - *

A {@link StreamWrier} provides built-in capabilities to: handle batching of messages; - * controlling memory utilization (through flow control) and request cleanup (only keeps write - * schema on first request in the stream). - * - *

With customizable options that control: - * - *

- * - *

{@link StreamWriter} will use the credentials set on the channel, which uses application - * default credentials through {@link GoogleCredentials#getApplicationDefault} by default. - * - * @deprecated use {@link #StreamWriterV2()} instead. - */ -@Deprecated -public class StreamWriter implements AutoCloseable { - private static final Logger LOG = Logger.getLogger(StreamWriter.class.getName()); - - private static String streamPatternString = - "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+"; - private static String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)"; - - private static Pattern streamPattern = Pattern.compile(streamPatternString); - private static Pattern tablePattern = Pattern.compile(tablePatternString); - - private final String streamName; - private final String tableName; - - private final String traceId; - - private final BatchingSettings batchingSettings; - private final RetrySettings retrySettings; - private BigQueryWriteSettings stubSettings; - - private final Lock messagesBatchLock; - private final Lock appendAndRefreshAppendLock; - - @GuardedBy("appendAndRefreshAppendLock") - private final MessagesBatch messagesBatch; - - // Indicates if a stream has some non recoverable exception happened. - private AtomicReference streamException; - - private BackgroundResource backgroundResources; - private List backgroundResourceList; - - private BigQueryWriteClient stub; - BidiStreamingCallable bidiStreamingCallable; - - @GuardedBy("appendAndRefreshAppendLock") - ClientStream clientStream; - - private final AppendResponseObserver responseObserver; - - private final ScheduledExecutorService executor; - - @GuardedBy("appendAndRefreshAppendLock") - private boolean shutdown; - - private final Waiter messagesWaiter; - - @GuardedBy("appendAndRefreshAppendLock") - private boolean activeAlarm; - - private ScheduledFuture currentAlarmFuture; - - private Integer currentRetries = 0; - - // Used for schema updates - private OnSchemaUpdateRunnable onSchemaUpdateRunnable; - - /** The maximum size of one request. Defined by the API. */ - public static long getApiMaxRequestBytes() { - return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) - } - - /** The maximum size of in flight requests. Defined by the API. */ - public static long getApiMaxInflightRequests() { - return 5000L; - } - - private StreamWriter(Builder builder) - throws IllegalArgumentException, IOException, InterruptedException { - if (builder.createDefaultStream) { - Matcher matcher = tablePattern.matcher(builder.streamOrTableName); - if (!matcher.matches()) { - throw new IllegalArgumentException("Invalid table name: " + builder.streamOrTableName); - } - streamName = builder.streamOrTableName + "/_default"; - tableName = builder.streamOrTableName; - } else { - Matcher matcher = streamPattern.matcher(builder.streamOrTableName); - if (!matcher.matches()) { - throw new IllegalArgumentException("Invalid stream name: " + builder.streamOrTableName); - } - streamName = builder.streamOrTableName; - tableName = matcher.group(1); - } - - this.traceId = builder.traceId; - this.batchingSettings = builder.batchingSettings; - this.retrySettings = builder.retrySettings; - this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName, this); - messagesBatchLock = new ReentrantLock(); - appendAndRefreshAppendLock = new ReentrantLock(); - activeAlarm = false; - this.streamException = new AtomicReference(null); - - executor = builder.executorProvider.getExecutor(); - backgroundResourceList = new ArrayList<>(); - if (builder.executorProvider.shouldAutoClose()) { - backgroundResourceList.add(new ExecutorAsBackgroundResource(executor)); - } - messagesWaiter = new Waiter(this.batchingSettings.getFlowControlSettings()); - responseObserver = new AppendResponseObserver(this); - - if (builder.client == null) { - stubSettings = - BigQueryWriteSettings.newBuilder() - .setCredentialsProvider(builder.credentialsProvider) - .setTransportChannelProvider(builder.channelProvider) - .setEndpoint(builder.endpoint) - .build(); - stub = BigQueryWriteClient.create(stubSettings); - backgroundResourceList.add(stub); - } else { - stub = builder.client; - } - backgroundResources = new BackgroundResourceAggregation(backgroundResourceList); - shutdown = false; - if (builder.onSchemaUpdateRunnable != null) { - this.onSchemaUpdateRunnable = builder.onSchemaUpdateRunnable; - this.onSchemaUpdateRunnable.setStreamWriter(this); - } - - bidiStreamingCallable = stub.appendRowsCallable(); - clientStream = bidiStreamingCallable.splitCall(responseObserver); - try { - while (!clientStream.isSendReady()) { - Thread.sleep(10); - } - } catch (InterruptedException e) { - } - } - - /** Stream name we are writing to. */ - public String getStreamNameString() { - return streamName; - } - - /** Table name we are writing to. */ - public String getTableNameString() { - return tableName; - } - - /** OnSchemaUpdateRunnable for this streamWriter. */ - OnSchemaUpdateRunnable getOnSchemaUpdateRunnable() { - return this.onSchemaUpdateRunnable; - } - - /** - * Schedules the writing of a message. The write of the message may occur immediately or be - * delayed based on the writer batching options. - * - *

Example of writing a message. - * - *

{@code
-   * AppendRowsRequest message;
-   * ApiFuture messageIdFuture = writer.append(message);
-   * ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback() {
-   *   public void onSuccess(AppendRowsResponse response) {
-   *     if (response.hasOffset()) {
-   *       System.out.println("written with offset: " + response.getOffset());
-   *     } else {
-   *       System.out.println("received an in stream error: " + response.error().toString());
-   *     }
-   *   }
-   *
-   *   public void onFailure(Throwable t) {
-   *     System.out.println("failed to write: " + t);
-   *   }
-   * }, MoreExecutors.directExecutor());
-   * }
- * - * @param message the message in serialized format to write to BigQuery. - * @return the message ID wrapped in a future. - */ - public ApiFuture append(AppendRowsRequest message) { - appendAndRefreshAppendLock.lock(); - - try { - Preconditions.checkState(!shutdown, "Cannot append on a shut-down writer."); - Preconditions.checkNotNull(message, "Message is null."); - Preconditions.checkState(streamException.get() == null, "Stream already failed."); - final AppendRequestAndFutureResponse outstandingAppend = - new AppendRequestAndFutureResponse(message); - List batchesToSend; - batchesToSend = messagesBatch.add(outstandingAppend); - // Setup the next duration based delivery alarm if there are messages batched. - if (batchingSettings.getDelayThreshold() != null) { - setupAlarm(); - } - if (!batchesToSend.isEmpty()) { - for (final InflightBatch batch : batchesToSend) { - LOG.fine("Scheduling a batch for immediate sending"); - writeBatch(batch); - } - } - return outstandingAppend.appendResult; - } finally { - appendAndRefreshAppendLock.unlock(); - } - } - - /** - * Re-establishes a stream connection. - * - * @throws InterruptedException - */ - public void refreshAppend() throws InterruptedException { - throw new UnimplementedException(null, GrpcStatusCode.of(Status.Code.UNIMPLEMENTED), false); - } - - @GuardedBy("appendAndRefreshAppendLock") - private void setupAlarm() { - if (!messagesBatch.isEmpty()) { - if (!activeAlarm) { - long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis(); - LOG.log(Level.FINE, "Setting up alarm for the next {0} ms.", delayThresholdMs); - currentAlarmFuture = - executor.schedule( - new Runnable() { - @Override - public void run() { - LOG.fine("Sending messages based on schedule"); - appendAndRefreshAppendLock.lock(); - activeAlarm = false; - try { - writeBatch(messagesBatch.popBatch()); - } finally { - appendAndRefreshAppendLock.unlock(); - } - } - }, - delayThresholdMs, - TimeUnit.MILLISECONDS); - } - } else if (currentAlarmFuture != null) { - LOG.log(Level.FINER, "Cancelling alarm, no more messages"); - currentAlarmFuture.cancel(false); - activeAlarm = false; - } - } - - /** - * Write any outstanding batches if non-empty. This method sends buffered messages, but does not - * wait for the send operations to complete. To wait for messages to send, call {@code get} on the - * futures returned from {@code append}. - */ - @GuardedBy("appendAndRefreshAppendLock") - public void writeAllOutstanding() { - InflightBatch unorderedOutstandingBatch = null; - if (!messagesBatch.isEmpty()) { - writeBatch(messagesBatch.popBatch()); - } - messagesBatch.reset(); - } - - @GuardedBy("appendAndRefreshAppendLock") - private void writeBatch(final InflightBatch inflightBatch) { - if (inflightBatch != null) { - AppendRowsRequest request = inflightBatch.getMergedRequest(); - try { - appendAndRefreshAppendLock.unlock(); - messagesWaiter.acquire(inflightBatch.getByteSize()); - appendAndRefreshAppendLock.lock(); - if (shutdown || streamException.get() != null) { - appendAndRefreshAppendLock.unlock(); - messagesWaiter.release(inflightBatch.getByteSize()); - appendAndRefreshAppendLock.lock(); - inflightBatch.onFailure( - new AbortedException( - shutdown - ? "Stream closed, abort append." - : "Stream has previous errors, abort append.", - null, - GrpcStatusCode.of(Status.Code.ABORTED), - true)); - return; - } - responseObserver.addInflightBatch(inflightBatch); - clientStream.send(request); - } catch (FlowController.FlowControlException ex) { - appendAndRefreshAppendLock.lock(); - inflightBatch.onFailure(ex); - } - } - } - - /** Close the stream writer. Shut down all resources. */ - @Override - public void close() { - LOG.info("Closing stream writer:" + streamName); - shutdown(); - try { - awaitTermination(1, TimeUnit.MINUTES); - } catch (InterruptedException ignored) { - } - } - - // The batch of messages that is being sent/processed. - private static final class InflightBatch { - // List of requests that is going to be batched. - final List inflightRequests; - // A list tracks expected offset for each AppendRequest. Used to reconstruct the Response - // future. - private final ArrayList offsetList; - private final long creationTime; - private int attempt; - private long batchSizeBytes; - private long expectedOffset; - private Boolean attachSchema; - private String streamName; - private final AtomicBoolean failed; - private final StreamWriter streamWriter; - - InflightBatch( - List inflightRequests, - long batchSizeBytes, - String streamName, - Boolean attachSchema, - StreamWriter streamWriter) { - this.inflightRequests = inflightRequests; - this.offsetList = new ArrayList(inflightRequests.size()); - for (AppendRequestAndFutureResponse request : inflightRequests) { - if (request.message.hasOffset()) { - offsetList.add(new Long(request.message.getOffset().getValue())); - } else { - offsetList.add(new Long(-1)); - } - } - this.expectedOffset = offsetList.get(0).longValue(); - attempt = 1; - creationTime = System.currentTimeMillis(); - this.batchSizeBytes = batchSizeBytes; - this.attachSchema = attachSchema; - this.streamName = streamName; - this.failed = new AtomicBoolean(false); - this.streamWriter = streamWriter; - } - - int count() { - return inflightRequests.size(); - } - - long getByteSize() { - return this.batchSizeBytes; - } - - long getExpectedOffset() { - return expectedOffset; - } - - private AppendRowsRequest getMergedRequest() throws IllegalStateException { - if (inflightRequests.size() == 0) { - throw new IllegalStateException("Unexpected empty message batch"); - } - ProtoRows.Builder rowsBuilder = - inflightRequests.get(0).message.getProtoRows().getRows().toBuilder(); - for (int i = 1; i < inflightRequests.size(); i++) { - rowsBuilder.addAllSerializedRows( - inflightRequests.get(i).message.getProtoRows().getRows().getSerializedRowsList()); - } - AppendRowsRequest.ProtoData.Builder data = - inflightRequests.get(0).message.getProtoRows().toBuilder().setRows(rowsBuilder.build()); - AppendRowsRequest.Builder requestBuilder = inflightRequests.get(0).message.toBuilder(); - if (!attachSchema) { - data.clearWriterSchema(); - requestBuilder.clearWriteStream(); - } else { - if (!data.hasWriterSchema()) { - throw new IllegalStateException( - "The first message on the connection must have writer schema set"); - } - requestBuilder.setWriteStream(streamName); - if (!inflightRequests.get(0).message.getTraceId().isEmpty()) { - requestBuilder.setTraceId(inflightRequests.get(0).message.getTraceId()); - } else if (streamWriter.traceId != null) { - requestBuilder.setTraceId(streamWriter.traceId); - } - } - return requestBuilder.setProtoRows(data.build()).build(); - } - - private void onFailure(Throwable t) { - if (failed.getAndSet(true)) { - // Error has been set already. - LOG.warning("Ignore " + t.toString() + " since error has already been set"); - return; - } - - for (AppendRequestAndFutureResponse request : inflightRequests) { - request.appendResult.setException(t); - } - } - - // Disassemble the batched response and sets the furture on individual request. - private void onSuccess(AppendRowsResponse response) { - for (int i = 0; i < inflightRequests.size(); i++) { - AppendRowsResponse.Builder singleResponse = response.toBuilder(); - if (response.getAppendResult().hasOffset()) { - long actualOffset = response.getAppendResult().getOffset().getValue(); - for (int j = 0; j < i; j++) { - actualOffset += - inflightRequests.get(j).message.getProtoRows().getRows().getSerializedRowsCount(); - } - singleResponse.setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(actualOffset))); - } - inflightRequests.get(i).appendResult.set(singleResponse.build()); - } - } - } - - // Class that wraps AppendRowsRequest and its cooresponding Response future. - private static final class AppendRequestAndFutureResponse { - final SettableApiFuture appendResult; - final AppendRowsRequest message; - final int messageSize; - - AppendRequestAndFutureResponse(AppendRowsRequest message) { - this.appendResult = SettableApiFuture.create(); - this.message = message; - this.messageSize = message.getProtoRows().getSerializedSize(); - if (this.messageSize > getApiMaxRequestBytes()) { - throw new StatusRuntimeException( - Status.fromCode(Status.Code.FAILED_PRECONDITION) - .withDescription("Message exceeded max size limit: " + getApiMaxRequestBytes())); - } - } - } - - /** The batching settings configured on this {@code StreamWriter}. */ - public BatchingSettings getBatchingSettings() { - return batchingSettings; - } - - /** The retry settings configured on this {@code StreamWriter}. */ - public RetrySettings getRetrySettings() { - return retrySettings; - } - - /** - * Schedules immediate flush of any outstanding messages and waits until all are processed. - * - *

Sends remaining outstanding messages and prevents future calls to publish. This method - * should be invoked prior to deleting the {@link WriteStream} object in order to ensure that no - * pending messages are lost. - */ - protected void shutdown() { - appendAndRefreshAppendLock.lock(); - try { - if (shutdown) { - LOG.fine("Already shutdown."); - return; - } - shutdown = true; - LOG.info("Shutdown called on writer: " + streamName); - if (currentAlarmFuture != null && activeAlarm) { - currentAlarmFuture.cancel(false); - activeAlarm = false; - } - // Wait for current inflight to drain. - try { - appendAndRefreshAppendLock.unlock(); - messagesWaiter.waitComplete(0); - } catch (InterruptedException e) { - LOG.warning("Failed to wait for messages to return " + e.toString()); - } - appendAndRefreshAppendLock.lock(); - // Try to send out what's left in batch. - if (!messagesBatch.isEmpty()) { - InflightBatch inflightBatch = messagesBatch.popBatch(); - AppendRowsRequest request = inflightBatch.getMergedRequest(); - if (streamException.get() != null) { - inflightBatch.onFailure( - new AbortedException( - shutdown - ? "Stream closed, abort append." - : "Stream has previous errors, abort append.", - null, - GrpcStatusCode.of(Status.Code.ABORTED), - true)); - } else { - try { - appendAndRefreshAppendLock.unlock(); - messagesWaiter.acquire(inflightBatch.getByteSize()); - appendAndRefreshAppendLock.lock(); - responseObserver.addInflightBatch(inflightBatch); - clientStream.send(request); - } catch (FlowController.FlowControlException ex) { - appendAndRefreshAppendLock.lock(); - LOG.warning( - "Unexpected flow control exception when sending batch leftover: " + ex.toString()); - } - } - } - // Close the stream. - try { - appendAndRefreshAppendLock.unlock(); - messagesWaiter.waitComplete(0); - } catch (InterruptedException e) { - LOG.warning("Failed to wait for messages to return " + e.toString()); - } - appendAndRefreshAppendLock.lock(); - if (clientStream.isSendReady()) { - clientStream.closeSend(); - } - backgroundResources.shutdown(); - } finally { - appendAndRefreshAppendLock.unlock(); - } - } - - /** - * Wait for all work has completed execution after a {@link #shutdown()} request, or the timeout - * occurs, or the current thread is interrupted. - * - *

Call this method to make sure all resources are freed properly. - */ - protected boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException { - return backgroundResources.awaitTermination(duration, unit); - } - - /** - * Constructs a new {@link Builder} using the given stream. If builder has createDefaultStream set - * to true, then user should pass in a table name here. - * - *

Example of creating a {@code WriteStream}. - * - *

{@code
-   * String table = "projects/my_project/datasets/my_dataset/tables/my_table";
-   * String stream;
-   * try (BigQueryWriteClient bigqueryWriteClient = BigQueryWriteClient.create()) {
-   *     CreateWriteStreamRequest request = CreateWriteStreamRequest.newBuilder().setParent(table).build();
-   *     WriteStream response = bigQueryWriteClient.createWriteStream(request);
-   *     stream = response.getName();
-   * }
-   * try (WriteStream writer = WriteStream.newBuilder(stream).build()) {
-   *   //...
-   * }
-   * }
- * - *

Example of creating a default {@code WriteStream}, which is COMMIT only and doesn't support - * offset. But it will support higher thoughput per stream and not subject to CreateWriteStream - * quotas. - * - *

{@code
-   * String table = "projects/my_project/datasets/my_dataset/tables/my_table";
-   * try (WriteStream writer = WriteStream.newBuilder(table).createDefaultStream().build()) {
-   *   //...
-   * }
-   * }
- */ - public static Builder newBuilder(String streamOrTableName) { - Preconditions.checkNotNull(streamOrTableName, "streamOrTableName is null."); - return new Builder(streamOrTableName, null); - } - - /** - * Constructs a new {@link Builder} using the given stream and an existing BigQueryWriteClient. - */ - public static Builder newBuilder(String streamOrTableName, BigQueryWriteClient client) { - Preconditions.checkNotNull(streamOrTableName, "streamOrTableName is null."); - Preconditions.checkNotNull(client, "Client is null."); - return new Builder(streamOrTableName, client); - } - - /** A builder of {@link StreamWriter}s. */ - public static final class Builder { - static final Duration MIN_TOTAL_TIMEOUT = Duration.ofSeconds(10); - static final Duration MIN_RPC_TIMEOUT = Duration.ofMillis(10); - - // Meaningful defaults. - static final FlowControlSettings DEFAULT_FLOW_CONTROL_SETTINGS = - FlowControlSettings.newBuilder() - .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) - .setMaxOutstandingElementCount(1000L) - .setMaxOutstandingRequestBytes(100 * 1024 * 1024L) // 100 Mb - .build(); - public static final BatchingSettings DEFAULT_BATCHING_SETTINGS = - BatchingSettings.newBuilder() - .setDelayThreshold(Duration.ofMillis(10)) - .setRequestByteThreshold(100 * 1024L) // 100 kb - .setElementCountThreshold(100L) - .setFlowControlSettings(DEFAULT_FLOW_CONTROL_SETTINGS) - .build(); - public static final RetrySettings DEFAULT_RETRY_SETTINGS = - RetrySettings.newBuilder() - .setMaxRetryDelay(Duration.ofSeconds(60)) - .setInitialRetryDelay(Duration.ofMillis(100)) - .setMaxAttempts(3) - .build(); - private static final int THREADS_PER_CPU = 5; - static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER = - InstantiatingExecutorProvider.newBuilder() - .setExecutorThreadCount(THREADS_PER_CPU * Runtime.getRuntime().availableProcessors()) - .build(); - - private String streamOrTableName; - private String endpoint = BigQueryWriteSettings.getDefaultEndpoint(); - - private String traceId; - - private BigQueryWriteClient client = null; - - // Batching options - BatchingSettings batchingSettings = DEFAULT_BATCHING_SETTINGS; - - RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS; - - private TransportChannelProvider channelProvider = - BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build(); - - ExecutorProvider executorProvider = DEFAULT_EXECUTOR_PROVIDER; - private CredentialsProvider credentialsProvider = - BigQueryWriteSettings.defaultCredentialsProviderBuilder().build(); - - private OnSchemaUpdateRunnable onSchemaUpdateRunnable; - - private boolean createDefaultStream = false; - - private Builder(String streamOrTableName, BigQueryWriteClient client) { - this.streamOrTableName = Preconditions.checkNotNull(streamOrTableName); - this.client = client; - } - - /** - * {@code ChannelProvider} to use to create Channels, which must point at Cloud BigQuery Storage - * API endpoint. - * - *

For performance, this client benefits from having multiple underlying connections. See - * {@link com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder#setPoolSize(int)}. - */ - public Builder setChannelProvider(TransportChannelProvider channelProvider) { - this.channelProvider = - Preconditions.checkNotNull(channelProvider, "ChannelProvider is null."); - return this; - } - - /** {@code CredentialsProvider} to use to create Credentials to authenticate calls. */ - public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { - this.credentialsProvider = - Preconditions.checkNotNull(credentialsProvider, "CredentialsProvider is null."); - return this; - } - - /** - * Sets the {@code BatchSettings} on the writer. - * - * @param batchingSettings - * @return - */ - public Builder setBatchingSettings(BatchingSettings batchingSettings) { - Preconditions.checkNotNull(batchingSettings, "BatchingSettings is null."); - - BatchingSettings.Builder builder = batchingSettings.toBuilder(); - Preconditions.checkNotNull(batchingSettings.getElementCountThreshold()); - Preconditions.checkArgument(batchingSettings.getElementCountThreshold() > 0); - Preconditions.checkNotNull(batchingSettings.getRequestByteThreshold()); - Preconditions.checkArgument(batchingSettings.getRequestByteThreshold() > 0); - if (batchingSettings.getRequestByteThreshold() > getApiMaxRequestBytes()) { - builder.setRequestByteThreshold(getApiMaxRequestBytes()); - } - LOG.info("here" + batchingSettings.getFlowControlSettings()); - if (batchingSettings.getFlowControlSettings() == null) { - builder.setFlowControlSettings(DEFAULT_FLOW_CONTROL_SETTINGS); - } else { - Long elementCount = - batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount(); - if (elementCount == null || elementCount > getApiMaxInflightRequests()) { - elementCount = DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingElementCount(); - } - Long elementSize = - batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes(); - if (elementSize == null || elementSize < 0) { - elementSize = DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingRequestBytes(); - } - FlowController.LimitExceededBehavior behavior = - batchingSettings.getFlowControlSettings().getLimitExceededBehavior(); - if (behavior == null || behavior == FlowController.LimitExceededBehavior.Ignore) { - behavior = DEFAULT_FLOW_CONTROL_SETTINGS.getLimitExceededBehavior(); - } - builder.setFlowControlSettings( - FlowControlSettings.newBuilder() - .setMaxOutstandingElementCount(elementCount) - .setMaxOutstandingRequestBytes(elementSize) - .setLimitExceededBehavior(behavior) - .build()); - } - this.batchingSettings = builder.build(); - return this; - } - - /** - * Sets the {@code RetrySettings} on the writer. - * - * @param retrySettings - * @return - */ - public Builder setRetrySettings(RetrySettings retrySettings) { - this.retrySettings = Preconditions.checkNotNull(retrySettings, "RetrySettings is null."); - return this; - } - - /** Gives the ability to set a custom executor to be used by the library. */ - public Builder setExecutorProvider(ExecutorProvider executorProvider) { - this.executorProvider = - Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null."); - return this; - } - - /** Gives the ability to override the gRPC endpoint. */ - public Builder setEndpoint(String endpoint) { - this.endpoint = Preconditions.checkNotNull(endpoint, "Endpoint is null."); - return this; - } - - /** Gives the ability to set action on schema update. */ - public Builder setOnSchemaUpdateRunnable(OnSchemaUpdateRunnable onSchemaUpdateRunnable) { - this.onSchemaUpdateRunnable = - Preconditions.checkNotNull(onSchemaUpdateRunnable, "onSchemaUpdateRunnable is null."); - return this; - } - - /** If the stream is a default stream. */ - public Builder createDefaultStream() { - this.createDefaultStream = true; - return this; - } - - /** Mark the request as coming from Dataflow. */ - public Builder setDataflowTraceId() { - this.traceId = "Dataflow"; - return this; - } - - /** Builds the {@code StreamWriter}. */ - public StreamWriter build() throws IllegalArgumentException, IOException, InterruptedException { - return new StreamWriter(this); - } - } - - private static final class AppendResponseObserver - implements ResponseObserver { - private Queue inflightBatches = new LinkedList(); - private StreamWriter streamWriter; - - public void addInflightBatch(InflightBatch batch) { - synchronized (this.inflightBatches) { - this.inflightBatches.add(batch); - } - } - - public AppendResponseObserver(StreamWriter streamWriter) { - this.streamWriter = streamWriter; - } - - private boolean isRecoverableError(Throwable t) { - Status status = Status.fromThrowable(t); - return status.getCode() == Status.Code.UNAVAILABLE; - } - - @Override - public void onStart(StreamController controller) { - // no-op - } - - private void abortInflightRequests(Throwable t) { - LOG.fine("Aborting all inflight requests"); - synchronized (this.inflightBatches) { - boolean first_error = true; - while (!this.inflightBatches.isEmpty()) { - InflightBatch inflightBatch = this.inflightBatches.poll(); - if (first_error || t.getCause().getClass() == AbortedException.class) { - inflightBatch.onFailure(t); - first_error = false; - } else { - inflightBatch.onFailure( - new AbortedException( - "Request aborted due to previous failures", - t, - GrpcStatusCode.of(Status.Code.ABORTED), - true)); - } - streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); - } - } - } - - @Override - public void onResponse(AppendRowsResponse response) { - InflightBatch inflightBatch = null; - synchronized (this.inflightBatches) { - inflightBatch = this.inflightBatches.poll(); - } - try { - streamWriter.currentRetries = 0; - if (response == null) { - inflightBatch.onFailure(new IllegalStateException("Response is null")); - } - if (response.hasUpdatedSchema()) { - if (streamWriter.getOnSchemaUpdateRunnable() != null) { - streamWriter.getOnSchemaUpdateRunnable().setUpdatedSchema(response.getUpdatedSchema()); - streamWriter.executor.schedule( - streamWriter.getOnSchemaUpdateRunnable(), 0L, TimeUnit.MILLISECONDS); - } - } - // Currently there is nothing retryable. If the error is already exists, then ignore it. - if (response.hasError()) { - StatusRuntimeException exception = - new StatusRuntimeException( - Status.fromCodeValue(response.getError().getCode()) - .withDescription(response.getError().getMessage())); - inflightBatch.onFailure(exception); - } else { - if (inflightBatch.getExpectedOffset() > 0 - && (response.getAppendResult().hasOffset() - && response.getAppendResult().getOffset().getValue() - != inflightBatch.getExpectedOffset())) { - IllegalStateException exception = - new IllegalStateException( - String.format( - "The append result offset %s does not match the expected offset %s.", - response.getAppendResult().getOffset().getValue(), - inflightBatch.getExpectedOffset())); - inflightBatch.onFailure(exception); - abortInflightRequests( - new AbortedException( - "Request aborted due to previous failures", - exception, - GrpcStatusCode.of(Status.Code.ABORTED), - true)); - } else { - inflightBatch.onSuccess(response); - } - } - } finally { - streamWriter.messagesWaiter.release(inflightBatch.getByteSize()); - } - } - - @Override - public void onComplete() { - LOG.info("OnComplete called"); - } - - @Override - public void onError(Throwable t) { - LOG.info("OnError called: " + t.toString()); - streamWriter.streamException.set(t); - abortInflightRequests(t); - } - }; - - // This class controls how many messages are going to be sent out in a batch. - private static class MessagesBatch { - private List messages; - private long batchedBytes; - private final BatchingSettings batchingSettings; - private Boolean attachSchema = true; - private final String streamName; - private final StreamWriter streamWriter; - - private MessagesBatch( - BatchingSettings batchingSettings, String streamName, StreamWriter streamWriter) { - this.batchingSettings = batchingSettings; - this.streamName = streamName; - this.streamWriter = streamWriter; - reset(); - } - - // Get all the messages out in a batch. - @GuardedBy("appendAndRefreshAppendLock") - private InflightBatch popBatch() { - InflightBatch batch = - new InflightBatch( - messages, batchedBytes, this.streamName, this.attachSchema, this.streamWriter); - this.attachSchema = false; - reset(); - return batch; - } - - private void reset() { - messages = new LinkedList<>(); - batchedBytes = 0; - } - - private void resetAttachSchema() { - attachSchema = true; - } - - private boolean isEmpty() { - return messages.isEmpty(); - } - - private long getBatchedBytes() { - return batchedBytes; - } - - private int getMessagesCount() { - return messages.size(); - } - - private boolean hasBatchingBytes() { - return getMaxBatchBytes() > 0; - } - - private long getMaxBatchBytes() { - return batchingSettings.getRequestByteThreshold(); - } - - // The message batch returned could contain the previous batch of messages plus the current - // message. - // if the message is too large. - @GuardedBy("appendAndRefreshAppendLock") - private List add(AppendRequestAndFutureResponse outstandingAppend) { - List batchesToSend = new ArrayList<>(); - // Check if the next message makes the current batch exceed the max batch byte size. - if (!isEmpty() - && hasBatchingBytes() - && getBatchedBytes() + outstandingAppend.messageSize >= getMaxBatchBytes()) { - batchesToSend.add(popBatch()); - } - - messages.add(outstandingAppend); - batchedBytes += outstandingAppend.messageSize; - - // Border case: If the message to send is greater or equals to the max batch size then send it - // immediately. - // Alternatively if after adding the message we have reached the batch max messages then we - // have a batch to send. - if ((hasBatchingBytes() && outstandingAppend.messageSize >= getMaxBatchBytes()) - || getMessagesCount() == batchingSettings.getElementCountThreshold()) { - batchesToSend.add(popBatch()); - } - return batchesToSend; - } - } -} diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index ad104c497b..1d83af111e 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -172,6 +172,7 @@ private StreamWriterV2(Builder builder) throws IOException { this.client = builder.client; this.ownsBigQueryWriteClient = false; } + this.streamConnection = new StreamConnection( this.client, @@ -492,7 +493,11 @@ private AppendRequestAndResponse pollInflightRequestQueue() { return requestWrapper; } - /** Constructs a new {@link StreamWriterV2.Builder} using the given stream and client. */ + /** + * Constructs a new {@link StreamWriterV2.Builder} using the given stream and client. AppendRows + * needs special headers to be added to client, so a passed in client will not work. This should + * be used by test only. + */ public static StreamWriterV2.Builder newBuilder(String streamName, BigQueryWriteClient client) { return new StreamWriterV2.Builder(streamName, client); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java deleted file mode 100644 index 7821cbae2f..0000000000 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java +++ /dev/null @@ -1,1398 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.bigquery.storage.v1beta2; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import com.google.api.core.ApiFuture; -import com.google.api.gax.batching.BatchingSettings; -import com.google.api.gax.batching.FlowControlSettings; -import com.google.api.gax.batching.FlowController; -import com.google.api.gax.core.ExecutorProvider; -import com.google.api.gax.core.FixedExecutorProvider; -import com.google.api.gax.core.InstantiatingExecutorProvider; -import com.google.api.gax.core.NoCredentialsProvider; -import com.google.api.gax.grpc.testing.LocalChannelProvider; -import com.google.api.gax.grpc.testing.MockGrpcService; -import com.google.api.gax.grpc.testing.MockServiceHelper; -import com.google.api.gax.rpc.AbortedException; -import com.google.api.gax.rpc.DataLossException; -import com.google.api.gax.rpc.UnknownException; -import com.google.cloud.bigquery.storage.test.Test.FooType; -import com.google.common.base.Strings; -import com.google.protobuf.DescriptorProtos; -import com.google.protobuf.Int64Value; -import com.google.protobuf.Timestamp; -import io.grpc.Status; -import io.grpc.StatusRuntimeException; -import java.util.Arrays; -import java.util.LinkedList; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.logging.Logger; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.threeten.bp.Duration; -import org.threeten.bp.Instant; - -@RunWith(JUnit4.class) -public class StreamWriterTest { - private static final Logger LOG = Logger.getLogger(StreamWriterTest.class.getName()); - private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s"; - private static final String TEST_TABLE = "projects/p/datasets/d/tables/t"; - private static final ExecutorProvider SINGLE_THREAD_EXECUTOR = - InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build(); - private static LocalChannelProvider channelProvider; - private FakeScheduledExecutorService fakeExecutor; - private FakeBigQueryWrite testBigQueryWrite; - private static MockServiceHelper serviceHelper; - - @Before - public void setUp() throws Exception { - testBigQueryWrite = new FakeBigQueryWrite(); - serviceHelper = - new MockServiceHelper( - UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite)); - serviceHelper.start(); - channelProvider = serviceHelper.createChannelProvider(); - fakeExecutor = new FakeScheduledExecutorService(); - testBigQueryWrite.setExecutor(fakeExecutor); - Instant time = Instant.now(); - Timestamp timestamp = - Timestamp.newBuilder().setSeconds(time.getEpochSecond()).setNanos(time.getNano()).build(); - // Add enough GetWriteStream response. - for (int i = 0; i < 4; i++) { - testBigQueryWrite.addResponse( - WriteStream.newBuilder().setName(TEST_STREAM).setCreateTime(timestamp).build()); - } - } - - @After - public void tearDown() throws Exception { - LOG.info("tearDown called"); - serviceHelper.stop(); - } - - private StreamWriter.Builder getTestStreamWriterBuilder(String testStream) { - return StreamWriter.newBuilder(testStream) - .setChannelProvider(channelProvider) - .setExecutorProvider(SINGLE_THREAD_EXECUTOR) - .setCredentialsProvider(NoCredentialsProvider.create()); - } - - private StreamWriter.Builder getTestStreamWriterBuilder() { - return getTestStreamWriterBuilder(TEST_STREAM); - } - - private AppendRowsRequest createAppendRequest(String[] messages, long offset) { - AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); - AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder(); - dataBuilder.setWriterSchema( - ProtoSchema.newBuilder() - .setProtoDescriptor( - DescriptorProtos.DescriptorProto.newBuilder() - .setName("Message") - .addField( - DescriptorProtos.FieldDescriptorProto.newBuilder() - .setName("foo") - .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING) - .setNumber(1) - .build()) - .build())); - ProtoRows.Builder rows = ProtoRows.newBuilder(); - for (String message : messages) { - FooType foo = FooType.newBuilder().setFoo(message).build(); - rows.addSerializedRows(foo.toByteString()); - } - if (offset > 0) { - requestBuilder.setOffset(Int64Value.of(offset)); - } - return requestBuilder - .setProtoRows(dataBuilder.setRows(rows.build()).build()) - .setWriteStream(TEST_STREAM) - .build(); - } - - private ApiFuture sendTestMessage( - StreamWriter writer, String[] messages, int offset) { - return writer.append(createAppendRequest(messages, offset)); - } - - private ApiFuture sendTestMessage(StreamWriter writer, String[] messages) { - return writer.append(createAppendRequest(messages, -1)); - } - - @Test - public void testTableName() throws Exception { - try (StreamWriter writer = getTestStreamWriterBuilder().build()) { - assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString()); - } - } - - @Test - public void testDefaultStream() throws Exception { - try (StreamWriter writer = - StreamWriter.newBuilder(TEST_TABLE) - .createDefaultStream() - .setChannelProvider(channelProvider) - .setExecutorProvider(SINGLE_THREAD_EXECUTOR) - .setCredentialsProvider(NoCredentialsProvider.create()) - .build()) { - assertEquals("projects/p/datasets/d/tables/t", writer.getTableNameString()); - assertEquals("projects/p/datasets/d/tables/t/_default", writer.getStreamNameString()); - } - } - - @Test - public void testAppendByDuration() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setDelayThreshold(Duration.ofSeconds(5)) - .setElementCountThreshold(10L) - .build()) - .setExecutorProvider(FixedExecutorProvider.create(fakeExecutor)) - .build(); - - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) - .build()); - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); - - assertFalse(appendFuture1.isDone()); - assertFalse(appendFuture2.isDone()); - fakeExecutor.advanceTime(Duration.ofSeconds(10)); - - assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); - assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); - appendFuture1.get(); - appendFuture2.get(); - assertEquals(1, testBigQueryWrite.getAppendRequests().size()); - - assertEquals( - 2, - testBigQueryWrite - .getAppendRequests() - .get(0) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); - writer.close(); - } - - @Test - public void testAppendByNumBatchedMessages() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(100)) - .build()) - .build(); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) - .build()); - - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); - ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); - - assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); - assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); - - assertFalse(appendFuture3.isDone()); - - ApiFuture appendFuture4 = sendTestMessage(writer, new String[] {"D"}); - - assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); - assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue()); - - assertEquals(2, testBigQueryWrite.getAppendRequests().size()); - assertEquals( - 2, - testBigQueryWrite - .getAppendRequests() - .get(0) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); - assertEquals( - 2, - testBigQueryWrite - .getAppendRequests() - .get(1) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); - writer.close(); - } - - @Test - public void testAppendByNumBytes() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - // Each message is 32 bytes, setting batch size to 70 bytes allows 2 messages. - .setRequestByteThreshold(70L) - .setDelayThreshold(Duration.ofSeconds(100000)) - .build()) - .build(); - - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) - .build()); - - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); - ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); - - assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); - assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); - - assertFalse(appendFuture3.isDone()); - - // This message is big enough to trigger send on the previous message and itself. - ApiFuture appendFuture4 = - sendTestMessage(writer, new String[] {Strings.repeat("A", 100)}); - assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue()); - assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue()); - - assertEquals(3, testBigQueryWrite.getAppendRequests().size()); - - writer.close(); - } - - @Test - public void testShutdownFlushBatch() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setDelayThreshold(Duration.ofSeconds(100)) - .setElementCountThreshold(10L) - .build()) - .build(); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) - .build()); - - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); - - // Note we are not advancing time or reaching the count threshold but messages should - // still get written by call to shutdown - - writer.close(); - - // Verify the appends completed - assertTrue(appendFuture1.isDone()); - assertTrue(appendFuture2.isDone()); - assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); - assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); - } - - @Test - public void testWriteMixedSizeAndDuration() throws Exception { - try (StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(1)) - .build()) - .build()) { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) - .build()); - - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - assertFalse(appendFuture1.isDone()); - - ApiFuture appendFuture2 = - sendTestMessage(writer, new String[] {"B", "C"}); - - // Write triggered by batch size - assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); - assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue()); - - ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"D"}); - assertFalse(appendFuture3.isDone()); - // Eventually will be triggered by time elapsed. - assertEquals(3L, appendFuture3.get().getAppendResult().getOffset().getValue()); - - assertEquals( - 3, - testBigQueryWrite - .getAppendRequests() - .get(0) - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - true, testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema()); - assertEquals( - 1, - testBigQueryWrite - .getAppendRequests() - .get(1) // this gives IndexOutOfBounds error at the moment - .getProtoRows() - .getRows() - .getSerializedRowsCount()); - assertEquals( - false, testBigQueryWrite.getAppendRequests().get(1).getProtoRows().hasWriterSchema()); - Thread.sleep(1005); - assertTrue(appendFuture3.isDone()); - } - } - - @Test - public void testBatchIsFlushed() throws Exception { - try (StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(1)) - .build()) - .build()) { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build()) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) - .build()); - - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - assertFalse(appendFuture1.isDone()); - writer.shutdown(); - // Write triggered by shutdown. - assertTrue(appendFuture1.isDone()); - assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue()); - } - } - - @Test - public void testBatchIsFlushedWithError() throws Exception { - try (StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(1)) - .build()) - .build()) { - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) - .build()); - - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - try { - ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); - ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}); - try { - appendFuture2.get(); - } catch (ExecutionException ex) { - assertEquals(DataLossException.class, ex.getCause().getClass()); - } - assertFalse(appendFuture3.isDone()); - writer.shutdown(); - try { - appendFuture3.get(); - } catch (ExecutionException ex) { - assertEquals(AbortedException.class, ex.getCause().getClass()); - } - } catch (IllegalStateException ex) { - assertEquals("Stream already failed.", ex.getMessage()); - } - } - } - - @Test - public void testFlowControlBehaviorBlock() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(1L) - .setFlowControlSettings( - StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS - .toBuilder() - .setMaxOutstandingElementCount(2L) - .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) - .build()) - .build()) - .build(); - - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build()) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(4)).build()) - .build()); - // Response will have a 10 second delay before server sends them back. - testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); - - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 2); - final StreamWriter writer1 = writer; - ExecutorService executor = Executors.newFixedThreadPool(2); - Callable callable = - new Callable() { - @Override - public Throwable call() { - try { - ApiFuture appendFuture2 = - sendTestMessage(writer1, new String[] {"B"}, 3); - ApiFuture appendFuture3 = - sendTestMessage(writer1, new String[] {"C"}, 4); - // This request will be send out immediately because there is space in inflight queue. - // The time advance in the main thread will cause it to be sent back. - if (3 != appendFuture2.get().getAppendResult().getOffset().getValue()) { - return new Exception( - "expected 3 but got " - + appendFuture2.get().getAppendResult().getOffset().getValue()); - } - testBigQueryWrite.waitForResponseScheduled(); - // Wait a while so that the close is called before we release the last response on the - // ohter thread. - Thread.sleep(500); - // This triggers the last response to come back. - fakeExecutor.advanceTime(Duration.ofSeconds(10)); - // This request will be waiting for previous response to come back. - if (4 != appendFuture3.get().getAppendResult().getOffset().getValue()) { - return new Exception( - "expected 4 but got " - + appendFuture3.get().getAppendResult().getOffset().getValue()); - } - return null; - } catch (IllegalStateException ex) { - // Sometimes the close will race before these calls. - return null; - } catch (Exception e) { - return e; - } - } - }; - Future future = executor.submit(callable); - assertEquals(false, appendFuture1.isDone()); - testBigQueryWrite.waitForResponseScheduled(); - testBigQueryWrite.waitForResponseScheduled(); - // This will trigger the previous two responses to come back. - fakeExecutor.advanceTime(Duration.ofSeconds(10)); - assertEquals(2L, appendFuture1.get().getAppendResult().getOffset().getValue()); - Thread.sleep(500); - // When close is called, there should be one inflight request waiting. - writer.close(); - if (future.get() != null) { - future.get().printStackTrace(); - fail("Call got exception: " + future.get().toString()); - } - // Everything should come back. - executor.shutdown(); - } - - @Test - public void testFlowControlBehaviorBlockWithError() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(1L) - .setFlowControlSettings( - StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS - .toBuilder() - .setMaxOutstandingElementCount(2L) - .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) - .build()) - .build()) - .build(); - - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) - .build()); - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 2); - final StreamWriter writer1 = writer; - ExecutorService executor = Executors.newFixedThreadPool(2); - Callable callable = - new Callable() { - @Override - public Throwable call() { - try { - ApiFuture appendFuture2 = - sendTestMessage(writer1, new String[] {"B"}, 3); - ApiFuture appendFuture3 = - sendTestMessage(writer1, new String[] {"C"}, 4); - try { - // This request will be send out immediately because there is space in inflight - // queue. - assertEquals(3L, appendFuture2.get().getAppendResult().getOffset().getValue()); - return new Exception("Should have failure on future2"); - } catch (ExecutionException e) { - if (e.getCause().getClass() != DataLossException.class) { - return e; - } - } - try { - // This request will be waiting for previous response to come back. - assertEquals(4L, appendFuture3.get().getAppendResult().getOffset().getValue()); - fail("Should be aborted future3"); - } catch (ExecutionException e) { - if (e.getCause().getClass() != AbortedException.class) { - return e; - } - } - return null; - } catch (IllegalStateException ex) { - // Sometimes the append will happen after the stream is shutdown. - ex.printStackTrace(); - return null; - } catch (Exception e) { - return e; - } - } - }; - Future future = executor.submit(callable); - // Wait is necessary for response to be scheduled before timer is advanced. - testBigQueryWrite.waitForResponseScheduled(); - testBigQueryWrite.waitForResponseScheduled(); - // This will trigger the previous two responses to come back. - fakeExecutor.advanceTime(Duration.ofSeconds(10)); - // The first requests gets back while the second one is blocked. - assertEquals(2L, appendFuture1.get().getAppendResult().getOffset().getValue()); - Thread.sleep(500); - // When close is called, there should be one inflight request waiting. - writer.close(); - if (future.get() != null) { - future.get().printStackTrace(); - fail("Call got exception: " + future.get().toString()); - } - // Everything should come back. - executor.shutdown(); - } - - @Test - public void testAppendWhileShutdownSuccess() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - // When shutdown, we should have something in batch. - .setElementCountThreshold(3L) - .setDelayThreshold(Duration.ofSeconds(1000)) - .setFlowControlSettings( - StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS - .toBuilder() - // When shutdown, we should have something in flight. - .setMaxOutstandingElementCount(5L) - .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) - .build()) - .build()) - .build(); - - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build()) - .build()); - for (int i = 1; i < 13; i++) { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder() - .setOffset(Int64Value.of(i * 3 + 2)) - .build()) - .build()); - } - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 2); - final StreamWriter writer1 = writer; - ExecutorService executor = Executors.newFixedThreadPool(2); - Callable callable = - new Callable() { - @Override - public Throwable call() { - try { - LinkedList> responses = - new LinkedList>(); - int last_count = 0; - for (int i = 0; i < 20; i++) { - try { - responses.add(sendTestMessage(writer1, new String[] {"B"}, i + 3)); - } catch (IllegalStateException ex) { - LOG.info("Stopped at " + i + " responses:" + responses.size()); - last_count = i; - if ("Cannot append on a shut-down writer." != ex.getMessage()) { - return new Exception("Got unexpected message:" + ex.getMessage()); - } - break; - } catch (AbortedException ex) { - LOG.info("Stopped at " + i + " responses:" + responses.size()); - last_count = i; - if ("Stream closed, abort append." != ex.getMessage()) { - return new Exception("Got unexpected message:" + ex.getMessage()); - } - break; - } - } - // For all the requests that are sent in, we should get a finished callback. - for (int i = 0; i < last_count; i++) { - if (i + 3 != responses.get(i).get().getAppendResult().getOffset().getValue()) { - return new Exception( - "Got unexpected offset expect:" - + i - + " actual:" - + responses.get(i - 3).get().getAppendResult().getOffset().getValue()); - } - } - return null; - } catch (ExecutionException ex) { - // Some wiredness in test presubmit runs, it seems this thread is always started after - // the main thread. - if (ex.getCause().getClass() == AbortedException.class) { - return null; - } else { - return ex; - } - } catch (Exception e) { - return e; - } - } - }; - Future future = executor.submit(callable); - assertEquals(false, appendFuture1.isDone()); - // The first requests gets back while the second one is blocked. - assertEquals(2L, appendFuture1.get().getAppendResult().getOffset().getValue()); - // When close is called, there should be one inflight request waiting. - writer.close(); - if (future.get() != null) { - future.get().printStackTrace(); - fail("Call got exception: " + future.get().toString()); - } - // Everything should come back. - executor.shutdown(); - } - - @Test - public void testAppendWhileShutdownFailed() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - // When shutdown, we should have something in batch. - .setElementCountThreshold(3L) - .setDelayThreshold(Duration.ofSeconds(10)) - .setFlowControlSettings( - StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS - .toBuilder() - // When shutdown, we should have something in flight. - .setMaxOutstandingElementCount(5L) - .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) - .build()) - .build()) - .build(); - - // The responses are for every 3 messages. - for (int i = 0; i < 2; i++) { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder() - .setOffset(Int64Value.of(i * 3)) - .build()) - .build()); - } - for (int i = 2; i < 6; i++) { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setError( - com.google.rpc.Status.newBuilder().setCode(3).setMessage("error " + i).build()) - .build()); - } - // Stream failed at 7th request. - for (int i = 6; i < 10; i++) { - testBigQueryWrite.addException(new UnsupportedOperationException("Strange exception")); - } - final StreamWriter writer1 = writer; - ExecutorService executor = Executors.newFixedThreadPool(2); - Callable callable = - new Callable() { - @Override - public Throwable call() { - try { - LinkedList> responses = - new LinkedList>(); - int last_count = 30; - LOG.info( - "Send 30 messages, will be batched into 10 messages, start fail at 7th message"); - for (int i = 0; i < 30; i++) { - try { - responses.add(sendTestMessage(writer1, new String[] {"B"}, i)); - Thread.sleep(500); - } catch (IllegalStateException ex) { - LOG.info("Stopped at sending request no." + i + " ex: " + ex.toString()); - last_count = i; - if ("Stream already failed." != ex.getMessage() - && "Cannot append on a shut-down writer." != ex.getMessage()) { - return new Exception("Got unexpected message:" + ex.getMessage()); - } - break; - } - } - // Verify sent responses. - // For all the requests that are sent in, we should get a finished callback. - for (int i = 0; i < 2 * 3; i++) { - try { - if (i != responses.get(i).get().getAppendResult().getOffset().getValue()) { - return new Exception( - "Got unexpected offset expect:" - + i - + " actual:" - + responses.get(i).get().getAppendResult().getOffset().getValue()); - } - } catch (Exception e) { - return e; - } - } - // For all the requests that are sent in, we should get a finished callback. - for (int i = 2 * 3; i < 6 * 3; i++) { - try { - responses.get(i).get(); - return new Exception( - "Expect response return an error after a in-stream exception"); - } catch (Exception e) { - if (e.getCause().getClass() != StatusRuntimeException.class) { - return new Exception( - "Expect first error of stream exception to be the original exception but got" - + e.getCause().toString()); - } - } - } - LOG.info("Last count is:" + last_count); - for (int i = 6 * 3; i < last_count; i++) { - try { - responses.get(i).get(); - return new Exception("Expect response return an error after a stream exception"); - } catch (Exception e) { - if (e.getCause().getClass() != UnknownException.class - && e.getCause().getClass() != AbortedException.class) { - return new Exception("Unexpected stream exception:" + e.toString()); - } - } - } - return null; - } catch (Exception e) { - return e; - } - } - }; - Future future = executor.submit(callable); - // Wait for at least 7 request (after batch) to reach server. - for (int i = 0; i < 7; i++) { - LOG.info("Wait for " + i + " response scheduled"); - testBigQueryWrite.waitForResponseScheduled(); - } - Thread.sleep(500); - writer.close(); - if (future.get() != null) { - future.get().printStackTrace(); - fail("Callback got exception" + future.get().toString()); - } - // Everything should come back. - executor.shutdown(); - } - - @Test - public void testFlowControlBehaviorException() throws Exception { - try (StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(1L) - .setFlowControlSettings( - StreamWriter.Builder.DEFAULT_FLOW_CONTROL_SETTINGS - .toBuilder() - .setMaxOutstandingElementCount(1L) - .setLimitExceededBehavior( - FlowController.LimitExceededBehavior.ThrowException) - .build()) - .build()) - .build()) { - assertEquals( - 1L, - writer - .getBatchingSettings() - .getFlowControlSettings() - .getMaxOutstandingElementCount() - .longValue()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) - .build()); - testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); - // Wait is necessary for response to be scheduled before timer is advanced. - testBigQueryWrite.waitForResponseScheduled(); - fakeExecutor.advanceTime(Duration.ofSeconds(10)); - try { - appendFuture2.get(); - Assert.fail("This should fail"); - } catch (Exception e) { - assertEquals( - "java.util.concurrent.ExecutionException: The maximum number of batch elements: 1 have been reached.", - e.toString()); - } - assertEquals(1L, appendFuture1.get().getAppendResult().getOffset().getValue()); - } - } - - @Test - public void testStreamReconnectionPermanant() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setDelayThreshold(Duration.ofSeconds(100000)) - .setElementCountThreshold(1L) - .build()) - .build(); - StatusRuntimeException permanentError = new StatusRuntimeException(Status.INVALID_ARGUMENT); - testBigQueryWrite.addException(permanentError); - ApiFuture future2 = sendTestMessage(writer, new String[] {"m2"}); - try { - future2.get(); - Assert.fail("This should fail."); - } catch (ExecutionException e) { - assertEquals(permanentError.toString(), e.getCause().getCause().toString()); - } - writer.close(); - } - - @Test - public void testOffset() throws Exception { - try (StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(1000)) - .build()) - .build()) { - - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(10)).build()) - .build()); - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(13)).build()) - .build()); - - AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L); - ApiFuture appendFuture1 = writer.append(request1); - AppendRowsRequest request2 = createAppendRequest(new String[] {"B", "C"}, 11L); - ApiFuture appendFuture2 = writer.append(request2); - AppendRowsRequest request3 = createAppendRequest(new String[] {"E", "F"}, 13L); - ApiFuture appendFuture3 = writer.append(request3); - AppendRowsRequest request4 = createAppendRequest(new String[] {"G"}, 15L); - ApiFuture appendFuture4 = writer.append(request4); - assertEquals(10L, appendFuture1.get().getAppendResult().getOffset().getValue()); - assertEquals(11L, appendFuture2.get().getAppendResult().getOffset().getValue()); - assertEquals(13L, appendFuture3.get().getAppendResult().getOffset().getValue()); - assertEquals(15L, appendFuture4.get().getAppendResult().getOffset().getValue()); - } - } - - @Test - public void testOffsetMismatch() throws Exception { - try (StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(1L) - .build()) - .build()) { - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(11)).build()) - .build()); - AppendRowsRequest request1 = createAppendRequest(new String[] {"A"}, 10L); - ApiFuture appendFuture1 = writer.append(request1); - - appendFuture1.get(); - fail("Should throw exception"); - } catch (Exception e) { - assertEquals( - "java.lang.IllegalStateException: The append result offset 11 does not match the expected offset 10.", - e.getCause().toString()); - } - } - - @Test - public void testStreamAppendDirectException() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(1L) - .setDelayThreshold(Duration.ofSeconds(5)) - .build()) - .build(); - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - ApiFuture future1 = sendTestMessage(writer, new String[] {"A"}); - try { - future1.get(); - fail("Expected furture1 to fail"); - } catch (ExecutionException ex) { - assertEquals(DataLossException.class, ex.getCause().getClass()); - } - try { - sendTestMessage(writer, new String[] {"B"}); - fail("Expected furture2 to fail"); - } catch (IllegalStateException ex) { - assertEquals("Stream already failed.", ex.getMessage()); - } - writer.shutdown(); - try { - sendTestMessage(writer, new String[] {"C"}); - fail("Expected furture3 to fail"); - } catch (IllegalStateException ex) { - assertEquals("Cannot append on a shut-down writer.", ex.getMessage()); - } - } - - @Test - public void testErrorPropagation() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setExecutorProvider(SINGLE_THREAD_EXECUTOR) - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(1L) - .setDelayThreshold(Duration.ofSeconds(5)) - .build()) - .build(); - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - ApiFuture future1 = sendTestMessage(writer, new String[] {"A"}); - ApiFuture future2 = sendTestMessage(writer, new String[] {"B"}); - try { - future1.get(); - fail("should throw exception"); - } catch (ExecutionException e) { - assertThat(e.getCause()).isInstanceOf(DataLossException.class); - } - try { - future2.get(); - fail("should throw exception"); - } catch (ExecutionException e) { - assertThat(e.getCause()).isInstanceOf(AbortedException.class); - } - } - - @Test - public void testWriterGetters() throws Exception { - StreamWriter.Builder builder = StreamWriter.newBuilder(TEST_STREAM); - builder.setChannelProvider(channelProvider); - builder.setExecutorProvider(SINGLE_THREAD_EXECUTOR); - builder.setBatchingSettings( - BatchingSettings.newBuilder() - .setRequestByteThreshold(10L) - .setDelayThreshold(Duration.ofMillis(11)) - .setElementCountThreshold(12L) - .setFlowControlSettings( - FlowControlSettings.newBuilder() - .setMaxOutstandingElementCount(100L) - .setMaxOutstandingRequestBytes(1000L) - .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) - .build()) - .build()); - builder.setCredentialsProvider(NoCredentialsProvider.create()); - StreamWriter writer = builder.build(); - - assertEquals(TEST_STREAM, writer.getStreamNameString()); - assertEquals(10, (long) writer.getBatchingSettings().getRequestByteThreshold()); - assertEquals(Duration.ofMillis(11), writer.getBatchingSettings().getDelayThreshold()); - assertEquals(12, (long) writer.getBatchingSettings().getElementCountThreshold()); - assertEquals( - FlowController.LimitExceededBehavior.Block, - writer.getBatchingSettings().getFlowControlSettings().getLimitExceededBehavior()); - assertEquals( - 100L, - writer - .getBatchingSettings() - .getFlowControlSettings() - .getMaxOutstandingElementCount() - .longValue()); - assertEquals( - 1000L, - writer - .getBatchingSettings() - .getFlowControlSettings() - .getMaxOutstandingRequestBytes() - .longValue()); - writer.close(); - } - - @Test - public void testBuilderParametersAndDefaults() { - StreamWriter.Builder builder = StreamWriter.newBuilder(TEST_STREAM); - assertEquals(StreamWriter.Builder.DEFAULT_EXECUTOR_PROVIDER, builder.executorProvider); - assertEquals(100 * 1024L, builder.batchingSettings.getRequestByteThreshold().longValue()); - assertEquals(Duration.ofMillis(10), builder.batchingSettings.getDelayThreshold()); - assertEquals(100L, builder.batchingSettings.getElementCountThreshold().longValue()); - assertEquals(StreamWriter.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings); - assertEquals(Duration.ofMillis(100), builder.retrySettings.getInitialRetryDelay()); - assertEquals(3, builder.retrySettings.getMaxAttempts()); - } - - @Test - public void testBuilderInvalidArguments() { - StreamWriter.Builder builder = StreamWriter.newBuilder(TEST_STREAM); - - try { - builder.setChannelProvider(null); - fail("Should have thrown an NullPointerException"); - } catch (NullPointerException expected) { - // Expected - } - - try { - builder.setExecutorProvider(null); - fail("Should have thrown an NullPointerException"); - } catch (NullPointerException expected) { - // Expected - } - try { - builder.setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setRequestByteThreshold(null) - .build()); - fail("Should have thrown an NullPointerException"); - } catch (NullPointerException expected) { - // Expected - } - try { - builder.setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setRequestByteThreshold(0L) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - try { - builder.setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setRequestByteThreshold(-1L) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - - builder.setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setDelayThreshold(Duration.ofMillis(1)) - .build()); - try { - builder.setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setDelayThreshold(Duration.ofMillis(-1)) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - - builder.setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(1L) - .build()); - try { - builder.setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(null) - .build()); - fail("Should have thrown an NullPointerException"); - } catch (NullPointerException expected) { - // Expected - } - try { - builder.setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(0L) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - try { - builder.setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(-1L) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - - try { - FlowControlSettings flowControlSettings = - FlowControlSettings.newBuilder().setMaxOutstandingElementCount(-1L).build(); - builder.setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setFlowControlSettings(flowControlSettings) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - - try { - FlowControlSettings flowControlSettings = - FlowControlSettings.newBuilder().setMaxOutstandingRequestBytes(-1L).build(); - builder.setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setFlowControlSettings(flowControlSettings) - .build()); - fail("Should have thrown an IllegalArgumentException"); - } catch (IllegalArgumentException expected) { - // Expected - } - { - FlowControlSettings flowControlSettings = - FlowControlSettings.newBuilder() - .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore) - .build(); - builder.setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setFlowControlSettings(flowControlSettings) - .build()); - } - try { - FlowControlSettings flowControlSettings = - FlowControlSettings.newBuilder().setLimitExceededBehavior(null).build(); - builder.setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setFlowControlSettings(flowControlSettings) - .build()); - fail("Should have thrown an NullPointerException"); - } catch (NullPointerException expected) { - // Expected - } - } - - @Test - public void testExistingClient() throws Exception { - BigQueryWriteSettings settings = - BigQueryWriteSettings.newBuilder() - .setTransportChannelProvider(channelProvider) - .setCredentialsProvider(NoCredentialsProvider.create()) - .build(); - BigQueryWriteClient client = BigQueryWriteClient.create(settings); - StreamWriter writer = StreamWriter.newBuilder(TEST_STREAM, client).build(); - writer.close(); - assertFalse(client.isShutdown()); - client.shutdown(); - client.awaitTermination(1, TimeUnit.MINUTES); - } - - @Test - public void testDatasetTraceId() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(1L) - .build()) - .setDataflowTraceId() - .build(); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); - - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); - ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}); - appendFuture1.get(); - appendFuture2.get(); - assertEquals("Dataflow", testBigQueryWrite.getAppendRequests().get(0).getTraceId()); - assertEquals("", testBigQueryWrite.getAppendRequests().get(1).getTraceId()); - } - - @Test - public void testShutdownWithConnectionError() throws Exception { - StreamWriter writer = - getTestStreamWriterBuilder() - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setElementCountThreshold(1L) - .build()) - .build(); - // Three request will reach the server. - testBigQueryWrite.addResponse( - AppendRowsResponse.newBuilder() - .setAppendResult( - AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build()) - .build()); - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - testBigQueryWrite.addException(Status.DATA_LOSS.asException()); - testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10)); - - ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}, 1); - ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"}, 2); - ApiFuture appendFuture3 = sendTestMessage(writer, new String[] {"C"}, 3); - testBigQueryWrite.waitForResponseScheduled(); - testBigQueryWrite.waitForResponseScheduled(); - testBigQueryWrite.waitForResponseScheduled(); - fakeExecutor.advanceTime(Duration.ofSeconds(10)); - // This will will never be in inflight and aborted by previous failure, because its delay is set - // after timer advance. - Thread.sleep(500); - try { - ApiFuture appendFuture4 = sendTestMessage(writer, new String[] {"D"}, 4); - } catch (IllegalStateException ex) { - assertEquals("Stream already failed.", ex.getMessage()); - } - // Shutdown writer immediately and there will be some error happened when flushing the queue. - writer.shutdown(); - assertEquals(1, appendFuture1.get().getAppendResult().getOffset().getValue()); - try { - appendFuture2.get(); - fail("Should fail with exception future2"); - } catch (ExecutionException e) { - assertThat(e.getCause()).isInstanceOf(DataLossException.class); - } - try { - appendFuture3.get(); - fail("Should fail with exception future3"); - } catch (ExecutionException e) { - assertThat(e.getCause()).isInstanceOf(AbortedException.class); - } - } -} diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java index b93eeeaf34..3c1eeef8fd 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java @@ -29,7 +29,6 @@ import com.google.cloud.bigquery.storage.v1beta2.*; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.protobuf.Descriptors; -import com.google.protobuf.Int64Value; import java.io.IOException; import java.math.BigDecimal; import java.util.*; @@ -145,28 +144,16 @@ public static void afterClass() { } } - private AppendRowsRequest.Builder createAppendRequest(String streamName, String[] messages) { - AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); - - AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder(); - dataBuilder.setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor())); - + ProtoRows CreateProtoRows(String[] messages) { ProtoRows.Builder rows = ProtoRows.newBuilder(); for (String message : messages) { FooType foo = FooType.newBuilder().setFoo(message).build(); rows.addSerializedRows(foo.toByteString()); } - dataBuilder.setRows(rows.build()); - return requestBuilder.setProtoRows(dataBuilder.build()).setWriteStream(streamName); + return rows.build(); } - private AppendRowsRequest.Builder createAppendRequestComplicateType( - String streamName, String[] messages) { - AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder(); - - AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder(); - dataBuilder.setWriterSchema(ProtoSchemaConverter.convert(ComplicateType.getDescriptor())); - + ProtoRows CreateProtoRowsComplex(String[] messages) { ProtoRows.Builder rows = ProtoRows.newBuilder(); for (String message : messages) { ComplicateType foo = @@ -175,16 +162,6 @@ private AppendRowsRequest.Builder createAppendRequestComplicateType( .build(); rows.addSerializedRows(foo.toByteString()); } - dataBuilder.setRows(rows.build()); - return requestBuilder.setProtoRows(dataBuilder.build()).setWriteStream(streamName); - } - - ProtoRows CreateProtoRows(String[] messages) { - ProtoRows.Builder rows = ProtoRows.newBuilder(); - for (String message : messages) { - FooType foo = FooType.newBuilder().setFoo(message).build(); - rows.addSerializedRows(foo.toByteString()); - } return rows.build(); } @@ -431,20 +408,17 @@ public void testComplicateSchemaWithPendingStream() .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build()) .build()); FinalizeWriteStreamResponse finalizeResponse = FinalizeWriteStreamResponse.getDefaultInstance(); - try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) { + try (StreamWriterV2 streamWriter = + StreamWriterV2.newBuilder(writeStream.getName()) + .setWriterSchema(ProtoSchemaConverter.convert(ComplicateType.getDescriptor())) + .build()) { LOG.info("Sending two messages"); ApiFuture response = - streamWriter.append( - createAppendRequestComplicateType(writeStream.getName(), new String[] {"aaa"}) - .setOffset(Int64Value.of(0L)) - .build()); + streamWriter.append(CreateProtoRowsComplex(new String[] {"aaa"}), 0L); assertEquals(0, response.get().getAppendResult().getOffset().getValue()); ApiFuture response2 = - streamWriter.append( - createAppendRequestComplicateType(writeStream.getName(), new String[] {"bbb"}) - .setOffset(Int64Value.of(1L)) - .build()); + streamWriter.append(CreateProtoRowsComplex(new String[] {"bbb"}), 1L); assertEquals(1, response2.get().getAppendResult().getOffset().getValue()); // Nothing showed up since rows are not committed. @@ -460,10 +434,7 @@ public void testComplicateSchemaWithPendingStream() FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build()); ApiFuture response3 = - streamWriter.append( - createAppendRequestComplicateType(writeStream.getName(), new String[] {"ccc"}) - .setOffset(Int64Value.of(2L)) - .build()); + streamWriter.append(CreateProtoRows(new String[] {"ccc"}), 2L); try { response3.get(); Assert.fail("Append to finalized stream should fail."); @@ -503,23 +474,16 @@ public void testStreamError() throws IOException, InterruptedException, Executio .setWriteStream( WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) .build()); - try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) { - AppendRowsRequest request = - createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build(); - request - .toBuilder() - .setProtoRows(request.getProtoRows().toBuilder().clearWriterSchema().build()) - .build(); + try (StreamWriterV2 streamWriter = + StreamWriterV2.newBuilder(writeStream.getName()) + .setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor())) + .build()) { ApiFuture response = - streamWriter.append( - createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build()); + streamWriter.append(CreateProtoRows(new String[] {"aaa"}), -1L); assertEquals(0L, response.get().getAppendResult().getOffset().getValue()); // Send in a bogus stream name should cause in connection error. ApiFuture response2 = - streamWriter.append( - createAppendRequest(writeStream.getName(), new String[] {"aaa"}) - .setOffset(Int64Value.of(100L)) - .build()); + streamWriter.append(CreateProtoRows(new String[] {"aaa"}), 100L); try { response2.get(); Assert.fail("Should fail"); @@ -529,8 +493,7 @@ public void testStreamError() throws IOException, InterruptedException, Executio } // We can keep sending requests on the same stream. ApiFuture response3 = - streamWriter.append( - createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build()); + streamWriter.append(CreateProtoRows(new String[] {"aaa"}), -1L); assertEquals(1L, response3.get().getAppendResult().getOffset().getValue()); } finally { } @@ -545,23 +508,23 @@ public void testStreamReconnect() throws IOException, InterruptedException, Exec .setWriteStream( WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) .build()); - try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) { + try (StreamWriterV2 streamWriter = + StreamWriterV2.newBuilder(writeStream.getName()) + .setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor())) + .build()) { ApiFuture response = - streamWriter.append( - createAppendRequest(writeStream.getName(), new String[] {"aaa"}) - .setOffset(Int64Value.of(0L)) - .build()); + streamWriter.append(CreateProtoRows(new String[] {"aaa"}), 0L); assertEquals(0L, response.get().getAppendResult().getOffset().getValue()); } - try (StreamWriter streamWriter = StreamWriter.newBuilder(writeStream.getName()).build()) { + try (StreamWriterV2 streamWriter = + StreamWriterV2.newBuilder(writeStream.getName()) + .setWriterSchema(ProtoSchemaConverter.convert(FooType.getDescriptor())) + .build()) { // Currently there is a bug that reconnection must wait 5 seconds to get the real row count. Thread.sleep(5000L); ApiFuture response = - streamWriter.append( - createAppendRequest(writeStream.getName(), new String[] {"bbb"}) - .setOffset(Int64Value.of(1L)) - .build()); + streamWriter.append(CreateProtoRows(new String[] {"bbb"}), 1L); assertEquals(1L, response.get().getAppendResult().getOffset().getValue()); } }