From 7c01f45ded1c516c23954b88103ca80b2132ecfc Mon Sep 17 00:00:00 2001
From: yayi-google <75696801+yayi-google@users.noreply.github.com>
Date: Tue, 23 Feb 2021 07:56:21 -0800
Subject: [PATCH] feat: Create Stream writer v2 - starting with core logics
(#867)
* Add a new StreamWriterV2.
Compared to existing StreamWriter, its locking mechanism is much
simpler.
* Stop using Java8 features as we still need to support Java7
* Do not hold lock while sending requests, and some minor refactoring.
---
.../storage/v1beta2/StreamConnection.java | 104 +++++
.../storage/v1beta2/StreamWriterV2.java | 364 ++++++++++++++++++
.../storage/v1beta2/FakeBigQueryWrite.java | 4 +
.../v1beta2/FakeBigQueryWriteImpl.java | 14 +
.../storage/v1beta2/StreamWriterV2Test.java | 293 ++++++++++++++
5 files changed, 779 insertions(+)
create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamConnection.java
create mode 100644 google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java
create mode 100644 google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamConnection.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamConnection.java
new file mode 100644
index 0000000000..c04f43c944
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamConnection.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1beta2;
+
+import com.google.api.gax.rpc.BidiStreamingCallable;
+import com.google.api.gax.rpc.ClientStream;
+import com.google.api.gax.rpc.ResponseObserver;
+import com.google.api.gax.rpc.StreamController;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import io.grpc.StatusRuntimeException;
+
+/**
+ * StreamConnection is responsible for writing requests to a GRPC bidirecional connection.
+ *
+ *
StreamWriter creates a connection. Two callback functions are necessary: request_callback and
+ * done_callback. Request callback is used for every request, and done callback is used to notify
+ * the user that the connection is closed and no more callbacks will be received from this
+ * connection.
+ *
+ *
The stream writer will accept all the requests without flow control, and makes the callbacks
+ * in receiving order.
+ *
+ *
It's user's responsibility to do the flow control and maintain the lifetime of the requests.
+ */
+public class StreamConnection {
+ private BidiStreamingCallable bidiStreamingCallable;
+ private ClientStream clientStream;
+
+ private RequestCallback requestCallback;
+ private DoneCallback doneCallback;
+
+ public StreamConnection(
+ BigQueryWriteClient client, RequestCallback requestCallback, DoneCallback doneCallback) {
+ this.requestCallback = requestCallback;
+ this.doneCallback = doneCallback;
+
+ bidiStreamingCallable = client.appendRowsCallable();
+ clientStream =
+ bidiStreamingCallable.splitCall(
+ new ResponseObserver() {
+
+ @Override
+ public void onStart(StreamController controller) {
+ // no-op
+ }
+
+ @Override
+ public void onResponse(AppendRowsResponse response) {
+ StreamConnection.this.requestCallback.run(response);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ StreamConnection.this.doneCallback.run(t);
+ }
+
+ @Override
+ public void onComplete() {
+ StreamConnection.this.doneCallback.run(
+ new StatusRuntimeException(
+ Status.fromCode(Code.CANCELLED)
+ .withDescription("Stream is closed by user.")));
+ }
+ });
+ }
+
+ /**
+ * Sends a request to the bi-directional stream connection.
+ *
+ * @param request request to send.
+ */
+ public void send(AppendRowsRequest request) {
+ clientStream.send(request);
+ }
+
+ /** Close the bi-directional stream connection. */
+ public void close() {
+ clientStream.closeSend();
+ }
+
+ /** Invoked when a response is received from the server. */
+ public static interface RequestCallback {
+ public void run(AppendRowsResponse response);
+ }
+
+ /** Invoked when server closes the connection. */
+ public static interface DoneCallback {
+ public void run(Throwable finalStatus);
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java
new file mode 100644
index 0000000000..10ed99d137
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java
@@ -0,0 +1,364 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1beta2;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.SettableApiFuture;
+import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback;
+import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Logger;
+import javax.annotation.concurrent.GuardedBy;
+
+/**
+ * A BigQuery Stream Writer that can be used to write data into BigQuery Table.
+ *
+ * TODO: Add credential support.
+ *
+ *
TODO: Attach schema.
+ *
+ *
TODO: Add max size check.
+ *
+ *
TODO: Add inflight control.
+ *
+ *
TODO: Attach traceId.
+ *
+ *
TODO: Support batching.
+ *
+ *
TODO: Support schema change.
+ */
+public class StreamWriterV2 implements AutoCloseable {
+ private static final Logger log = Logger.getLogger(StreamWriterV2.class.getName());
+
+ private Lock lock;
+ private Condition hasMessageInWaitingQueue;
+
+ /*
+ * The identifier of stream to write to.
+ */
+ private final String streamName;
+
+ /*
+ * Indicates whether user has called Close() or not.
+ */
+ @GuardedBy("lock")
+ private boolean userClosed = false;
+
+ /*
+ * The final status of connection. Set to nonnull when connection is permanently closed.
+ */
+ @GuardedBy("lock")
+ private Throwable connectionFinalStatus = null;
+
+ /*
+ * Contains requests buffered in the client and not yet sent to server.
+ */
+ @GuardedBy("lock")
+ private final Deque waitingRequestQueue;
+
+ /*
+ * Contains sent append requests waiting for response from server.
+ */
+ @GuardedBy("lock")
+ private final Deque inflightRequestQueue;
+
+ /*
+ * Wraps the underlying bi-directional stream connection with server.
+ */
+ private StreamConnection streamConnection;
+
+ /*
+ * A separate thread to handle actual communication with server.
+ */
+ private Thread appendThread;
+
+ private StreamWriterV2(Builder builder) {
+ this.lock = new ReentrantLock();
+ this.hasMessageInWaitingQueue = lock.newCondition();
+ this.streamName = builder.streamName;
+ this.waitingRequestQueue = new LinkedList();
+ this.inflightRequestQueue = new LinkedList();
+ this.streamConnection =
+ new StreamConnection(
+ builder.client,
+ new RequestCallback() {
+ @Override
+ public void run(AppendRowsResponse response) {
+ requestCallback(response);
+ }
+ },
+ new DoneCallback() {
+ @Override
+ public void run(Throwable finalStatus) {
+ doneCallback(finalStatus);
+ }
+ });
+ this.appendThread =
+ new Thread(
+ new Runnable() {
+ @Override
+ public void run() {
+ appendLoop();
+ }
+ });
+ this.appendThread.start();
+ }
+
+ /**
+ * Schedules the writing of a message.
+ *
+ * Example of writing a message.
+ *
+ *
{@code
+ * AppendRowsRequest message;
+ * ApiFuture messageIdFuture = writer.append(message);
+ * ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback() {
+ * public void onSuccess(AppendRowsResponse response) {
+ * if (response.hasOffset()) {
+ * System.out.println("written with offset: " + response.getOffset());
+ * } else {
+ * System.out.println("received an in stream error: " + response.error().toString());
+ * }
+ * }
+ *
+ * public void onFailure(Throwable t) {
+ * System.out.println("failed to write: " + t);
+ * }
+ * }, MoreExecutors.directExecutor());
+ * }
+ *
+ * @param message the message in serialized format to write to BigQuery.
+ * @return the message ID wrapped in a future.
+ */
+ public ApiFuture append(AppendRowsRequest message) {
+ AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message);
+ this.lock.lock();
+ try {
+ if (userClosed) {
+ requestWrapper.appendResult.setException(
+ new StatusRuntimeException(
+ Status.fromCode(Status.Code.FAILED_PRECONDITION)
+ .withDescription("Stream is already closed")));
+ return requestWrapper.appendResult;
+ }
+ if (connectionFinalStatus != null) {
+ requestWrapper.appendResult.setException(
+ new StatusRuntimeException(
+ Status.fromCode(Status.Code.FAILED_PRECONDITION)
+ .withDescription(
+ "Stream is closed due to " + connectionFinalStatus.toString())));
+ return requestWrapper.appendResult;
+ }
+ waitingRequestQueue.addLast(requestWrapper);
+ hasMessageInWaitingQueue.signal();
+ return requestWrapper.appendResult;
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ /** Close the stream writer. Shut down all resources. */
+ @Override
+ public void close() {
+ log.info("User closing stream: " + streamName);
+ this.lock.lock();
+ try {
+ this.userClosed = true;
+ } finally {
+ this.lock.unlock();
+ }
+ log.info("Waiting for append thread to finish. Stream: " + streamName);
+ try {
+ appendThread.join();
+ log.info("User close complete. Stream: " + streamName);
+ } catch (InterruptedException e) {
+ // Unexpected. Just swallow the exception with logging.
+ log.warning(
+ "Append handler join is interrupted. Stream: " + streamName + " Error: " + e.toString());
+ }
+ }
+
+ /*
+ * This loop is executed in a separate thread.
+ *
+ * It takes requests from waiting queue and sends them to server.
+ */
+ private void appendLoop() {
+ Deque localQueue = new LinkedList();
+ while (!waitingQueueDrained()) {
+ this.lock.lock();
+ try {
+ hasMessageInWaitingQueue.await(100, TimeUnit.MILLISECONDS);
+ while (!this.waitingRequestQueue.isEmpty()) {
+ AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
+ this.inflightRequestQueue.addLast(requestWrapper);
+ localQueue.addLast(requestWrapper);
+ }
+ } catch (InterruptedException e) {
+ log.warning(
+ "Interrupted while waiting for message. Stream: "
+ + streamName
+ + " Error: "
+ + e.toString());
+ } finally {
+ this.lock.unlock();
+ }
+
+ if (localQueue.isEmpty()) {
+ continue;
+ }
+
+ // TODO: Add reconnection here.
+ while (!localQueue.isEmpty()) {
+ this.streamConnection.send(localQueue.pollFirst().message);
+ }
+ }
+
+ log.info("Cleanup starts. Stream: " + streamName);
+ // At this point, the waiting queue is drained, so no more requests.
+ // We can close the stream connection and handle the remaining inflight requests.
+ this.streamConnection.close();
+ waitForDoneCallback();
+
+ // At this point, there cannot be more callback. It is safe to clean up all inflight requests.
+ log.info(
+ "Stream connection is fully closed. Cleaning up inflight requests. Stream: " + streamName);
+ cleanupInflightRequests();
+ log.info("Append thread is done. Stream: " + streamName);
+ }
+
+ /*
+ * Returns true if waiting queue is drain, a.k.a. no more requests in the waiting queue.
+ *
+ * It serves as a signal to append thread that there cannot be any more requests in the waiting
+ * queue and it can prepare to stop.
+ */
+ private boolean waitingQueueDrained() {
+ this.lock.lock();
+ try {
+ return (this.userClosed || this.connectionFinalStatus != null)
+ && this.waitingRequestQueue.isEmpty();
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ private void waitForDoneCallback() {
+ log.info("Waiting for done callback from stream connection. Stream: " + streamName);
+ while (true) {
+ this.lock.lock();
+ try {
+ if (connectionFinalStatus != null) {
+ // Done callback is received, return.
+ return;
+ }
+ } finally {
+ this.lock.unlock();
+ }
+ Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private void cleanupInflightRequests() {
+ Throwable finalStatus;
+ Deque localQueue = new LinkedList();
+ this.lock.lock();
+ try {
+ finalStatus = this.connectionFinalStatus;
+ while (!this.inflightRequestQueue.isEmpty()) {
+ localQueue.addLast(this.inflightRequestQueue.pollFirst());
+ }
+ } finally {
+ this.lock.unlock();
+ }
+ log.info(
+ "Cleaning "
+ + localQueue.size()
+ + " inflight requests with error: "
+ + finalStatus.toString());
+ while (!localQueue.isEmpty()) {
+ localQueue.pollFirst().appendResult.setException(finalStatus);
+ }
+ }
+
+ private void requestCallback(AppendRowsResponse response) {
+ AppendRequestAndResponse requestWrapper;
+ this.lock.lock();
+ try {
+ requestWrapper = this.inflightRequestQueue.pollFirst();
+ } finally {
+ this.lock.unlock();
+ }
+ requestWrapper.appendResult.set(response);
+ }
+
+ private void doneCallback(Throwable finalStatus) {
+ log.info(
+ "Received done callback. Stream: "
+ + streamName
+ + " Final status: "
+ + finalStatus.toString());
+ this.lock.lock();
+ try {
+ this.connectionFinalStatus = finalStatus;
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ /** Constructs a new {@link StreamWriterV2.Builder} using the given stream and client. */
+ public static StreamWriterV2.Builder newBuilder(String streamName, BigQueryWriteClient client) {
+ return new StreamWriterV2.Builder(streamName, client);
+ }
+
+ /** A builder of {@link StreamWriterV2}s. */
+ public static final class Builder {
+
+ private String streamName;
+
+ private BigQueryWriteClient client;
+
+ private Builder(String streamName, BigQueryWriteClient client) {
+ this.streamName = Preconditions.checkNotNull(streamName);
+ this.client = Preconditions.checkNotNull(client);
+ }
+
+ /** Builds the {@code StreamWriterV2}. */
+ public StreamWriterV2 build() {
+ return new StreamWriterV2(this);
+ }
+ }
+
+ // Class that wraps AppendRowsRequest and its corresponding Response future.
+ private static final class AppendRequestAndResponse {
+ final SettableApiFuture appendResult;
+ final AppendRowsRequest message;
+
+ AppendRequestAndResponse(AppendRowsRequest message) {
+ this.appendResult = SettableApiFuture.create();
+ this.message = message;
+ }
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java
index 618366cfdc..a333260529 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java
@@ -79,6 +79,10 @@ public void setResponseDelay(Duration delay) {
serviceImpl.setResponseDelay(delay);
}
+ public void setResponseSleep(Duration sleep) {
+ serviceImpl.setResponseSleep(sleep);
+ }
+
public void setExecutor(ScheduledExecutorService executor) {
serviceImpl.setExecutor(executor);
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java
index 7cef4f7483..b99dab99bd 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWriteImpl.java
@@ -16,6 +16,7 @@
package com.google.cloud.bigquery.storage.v1beta2;
import com.google.common.base.Optional;
+import com.google.common.util.concurrent.Uninterruptibles;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
@@ -44,6 +45,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
private boolean autoPublishResponse;
private ScheduledExecutorService executor = null;
private Duration responseDelay = Duration.ZERO;
+ private Duration responseSleep = Duration.ZERO;
/** Class used to save the state of a possible response. */
private static class Response {
@@ -121,10 +123,16 @@ public void onNext(AppendRowsRequest value) {
LOG.info("Get request:" + value.toString());
final Response response = responses.remove();
requests.add(value);
+ if (responseSleep.compareTo(Duration.ZERO) > 0) {
+ LOG.info("Sleeping before response for " + responseSleep.toString());
+ Uninterruptibles.sleepUninterruptibly(
+ responseSleep.toMillis(), TimeUnit.MILLISECONDS);
+ }
if (responseDelay == Duration.ZERO) {
sendResponse(response, responseObserver);
} else {
final Response responseToSend = response;
+ // TODO(yirutang): This is very wrong because it messes up response/complete ordering.
LOG.info("Schedule a response to be sent at delay");
executor.schedule(
new Runnable() {
@@ -173,6 +181,12 @@ public FakeBigQueryWriteImpl setResponseDelay(Duration responseDelay) {
return this;
}
+ /** Set an amount of time by which to sleep before publishing responses. */
+ public FakeBigQueryWriteImpl setResponseSleep(Duration responseSleep) {
+ this.responseSleep = responseSleep;
+ return this;
+ }
+
public FakeBigQueryWriteImpl addResponse(AppendRowsResponse appendRowsResponse) {
responses.add(new Response(appendRowsResponse));
return this;
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java
new file mode 100644
index 0000000000..c50e5abb70
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java
@@ -0,0 +1,293 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1beta2;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.testing.MockGrpcService;
+import com.google.api.gax.grpc.testing.MockServiceHelper;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode.Code;
+import com.google.cloud.bigquery.storage.test.Test.FooType;
+import com.google.protobuf.DescriptorProtos;
+import com.google.protobuf.Int64Value;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Logger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.function.ThrowingRunnable;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.threeten.bp.Duration;
+
+@RunWith(JUnit4.class)
+public class StreamWriterV2Test {
+ private static final Logger log = Logger.getLogger(StreamWriterV2Test.class.getName());
+ private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/s";
+ private FakeScheduledExecutorService fakeExecutor;
+ private FakeBigQueryWrite testBigQueryWrite;
+ private static MockServiceHelper serviceHelper;
+ private BigQueryWriteClient client;
+
+ @Before
+ public void setUp() throws Exception {
+ testBigQueryWrite = new FakeBigQueryWrite();
+ serviceHelper =
+ new MockServiceHelper(
+ UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite));
+ serviceHelper.start();
+ fakeExecutor = new FakeScheduledExecutorService();
+ testBigQueryWrite.setExecutor(fakeExecutor);
+ client =
+ BigQueryWriteClient.create(
+ BigQueryWriteSettings.newBuilder()
+ .setCredentialsProvider(NoCredentialsProvider.create())
+ .setTransportChannelProvider(serviceHelper.createChannelProvider())
+ .build());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ log.info("tearDown called");
+ client.close();
+ serviceHelper.stop();
+ }
+
+ private StreamWriterV2 getTestStreamWriterV2() {
+ return StreamWriterV2.newBuilder(TEST_STREAM, client).build();
+ }
+
+ private AppendRowsRequest createAppendRequest(String[] messages, long offset) {
+ AppendRowsRequest.Builder requestBuilder = AppendRowsRequest.newBuilder();
+ AppendRowsRequest.ProtoData.Builder dataBuilder = AppendRowsRequest.ProtoData.newBuilder();
+ dataBuilder.setWriterSchema(
+ ProtoSchema.newBuilder()
+ .setProtoDescriptor(
+ DescriptorProtos.DescriptorProto.newBuilder()
+ .setName("Message")
+ .addField(
+ DescriptorProtos.FieldDescriptorProto.newBuilder()
+ .setName("foo")
+ .setType(DescriptorProtos.FieldDescriptorProto.Type.TYPE_STRING)
+ .setNumber(1)
+ .build())
+ .build()));
+ ProtoRows.Builder rows = ProtoRows.newBuilder();
+ for (String message : messages) {
+ FooType foo = FooType.newBuilder().setFoo(message).build();
+ rows.addSerializedRows(foo.toByteString());
+ }
+ if (offset > 0) {
+ requestBuilder.setOffset(Int64Value.of(offset));
+ }
+ return requestBuilder
+ .setProtoRows(dataBuilder.setRows(rows.build()).build())
+ .setWriteStream(TEST_STREAM)
+ .build();
+ }
+
+ private AppendRowsResponse createAppendResponse(long offset) {
+ return AppendRowsResponse.newBuilder()
+ .setAppendResult(
+ AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(offset)).build())
+ .build();
+ }
+
+ private ApiFuture sendTestMessage(StreamWriterV2 writer, String[] messages) {
+ return writer.append(createAppendRequest(messages, -1));
+ }
+
+ private static T assertFutureException(
+ Class expectedThrowable, final Future> future) {
+ return assertThrows(
+ expectedThrowable,
+ new ThrowingRunnable() {
+ @Override
+ public void run() throws Throwable {
+ try {
+ future.get();
+ } catch (ExecutionException ex) {
+ // Future wraps exception with ExecutionException. So unwrapper it here.
+ throw ex.getCause();
+ }
+ }
+ });
+ }
+
+ @Test
+ public void testAppendSuccess() throws Exception {
+ StreamWriterV2 writer = getTestStreamWriterV2();
+
+ long appendCount = 1000;
+ for (int i = 0; i < appendCount; i++) {
+ testBigQueryWrite.addResponse(createAppendResponse(i));
+ }
+
+ List> futures = new ArrayList<>();
+ for (int i = 0; i < appendCount; i++) {
+ futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)}));
+ }
+
+ for (int i = 0; i < appendCount; i++) {
+ assertEquals(i, futures.get(i).get().getAppendResult().getOffset().getValue());
+ }
+ assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size());
+
+ writer.close();
+ }
+
+ @Test
+ public void testAppendSuccessAndError() throws Exception {
+ StreamWriterV2 writer = getTestStreamWriterV2();
+ testBigQueryWrite.addResponse(createAppendResponse(0));
+ testBigQueryWrite.addException(Status.INTERNAL.asException());
+
+ ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+ ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"});
+
+ assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
+ ApiException actualError = assertFutureException(ApiException.class, appendFuture2);
+ assertEquals(Code.INTERNAL, actualError.getStatusCode().getCode());
+
+ writer.close();
+ }
+
+ @Test
+ public void longIdleBetweenAppends() throws Exception {
+ StreamWriterV2 writer = getTestStreamWriterV2();
+ testBigQueryWrite.addResponse(createAppendResponse(0));
+ testBigQueryWrite.addResponse(createAppendResponse(1));
+
+ ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+ assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
+
+ // Sleep to create a long idle between appends.
+ TimeUnit.SECONDS.sleep(3);
+
+ ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"});
+ assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue());
+
+ writer.close();
+ }
+
+ @Test
+ public void testAppendAfterUserClose() throws Exception {
+ StreamWriterV2 writer = getTestStreamWriterV2();
+ testBigQueryWrite.addResponse(createAppendResponse(0));
+
+ ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+ writer.close();
+ ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"});
+
+ assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
+ assertTrue(appendFuture2.isDone());
+ StatusRuntimeException actualError =
+ assertFutureException(StatusRuntimeException.class, appendFuture2);
+ assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode());
+ }
+
+ @Test
+ public void testAppendAfterServerClose() throws Exception {
+ StreamWriterV2 writer = getTestStreamWriterV2();
+ testBigQueryWrite.addException(Status.INTERNAL.asException());
+
+ ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+ ApiException error1 = assertFutureException(ApiException.class, appendFuture1);
+ assertEquals(Code.INTERNAL, error1.getStatusCode().getCode());
+
+ ApiFuture appendFuture2 = sendTestMessage(writer, new String[] {"B"});
+ assertTrue(appendFuture2.isDone());
+ StatusRuntimeException error2 =
+ assertFutureException(StatusRuntimeException.class, appendFuture2);
+ assertEquals(Status.Code.FAILED_PRECONDITION, error2.getStatus().getCode());
+
+ writer.close();
+ }
+
+ @Test
+ public void userCloseWhileRequestInflight() throws Exception {
+ final StreamWriterV2 writer = getTestStreamWriterV2();
+ // Server will sleep 2 seconds before sending back the response.
+ testBigQueryWrite.setResponseSleep(Duration.ofSeconds(2));
+ testBigQueryWrite.addResponse(createAppendResponse(0));
+
+ // Send a request and close the stream in separate thread while the request is inflight.
+ final ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"});
+ Thread closeThread =
+ new Thread(
+ new Runnable() {
+ @Override
+ public void run() {
+ writer.close();
+ }
+ });
+ closeThread.start();
+
+ // Due to the sleep on server, the append won't finish within 1 second even though stream
+ // is being closed.
+ assertThrows(
+ TimeoutException.class,
+ new ThrowingRunnable() {
+ @Override
+ public void run() throws Throwable {
+ appendFuture1.get(1, TimeUnit.SECONDS);
+ }
+ });
+
+ // Within 2 seconds, the request should be done and stream should be closed.
+ closeThread.join(2000);
+ assertTrue(appendFuture1.isDone());
+ assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
+ }
+
+ @Test
+ public void serverCloseWhileRequestsInflight() throws Exception {
+ StreamWriterV2 writer = getTestStreamWriterV2();
+ // Server will sleep 2 seconds before closing the connection.
+ testBigQueryWrite.setResponseSleep(Duration.ofSeconds(2));
+ testBigQueryWrite.addException(Status.INTERNAL.asException());
+
+ // Send 10 requests, so that there are 10 inflight requests.
+ int appendCount = 10;
+ List> futures = new ArrayList<>();
+ for (int i = 0; i < appendCount; i++) {
+ futures.add(sendTestMessage(writer, new String[] {String.valueOf(i)}));
+ }
+
+ // Server close should properly handle all inflight requests.
+ for (int i = 0; i < appendCount; i++) {
+ ApiException actualError = assertFutureException(ApiException.class, futures.get(i));
+ assertEquals(Code.INTERNAL, actualError.getStatusCode().getCode());
+ }
+
+ writer.close();
+ ;
+ }
+}