Skip to content

Commit

Permalink
feat: Support building a BigQueryWriteClient within the StreamWriterV2 (
Browse files Browse the repository at this point in the history
#876)

* Support building a BigQueryWriteClient within the StreamWriterV2.

* feat: Support building a BigQueryWriteClient within the StreamWriterV2
  • Loading branch information
yayi-google committed Feb 24, 2021
1 parent 854c81e commit 237c827
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 7 deletions.
Expand Up @@ -17,13 +17,16 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback;
import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
Expand All @@ -36,8 +39,6 @@
/**
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
*
* <p>TODO: Add credential support.
*
* <p>TODO: Attach schema.
*
* <p>TODO: Attach traceId.
Expand Down Expand Up @@ -104,6 +105,16 @@ public class StreamWriterV2 implements AutoCloseable {
@GuardedBy("lock")
private final Deque<AppendRequestAndResponse> inflightRequestQueue;

/*
* A client used to interact with BigQuery.
*/
private BigQueryWriteClient client;

/*
* If true, the client above is created by this writer and should be closed.
*/
private boolean ownsBigQueryWriteClient = false;

/*
* Wraps the underlying bi-directional stream connection with server.
*/
Expand All @@ -119,7 +130,7 @@ public static long getApiMaxRequestBytes() {
return 8L * 1000L * 1000L; // 8 megabytes (https://en.wikipedia.org/wiki/Megabyte)
}

private StreamWriterV2(Builder builder) {
private StreamWriterV2(Builder builder) throws IOException {
this.lock = new ReentrantLock();
this.hasMessageInWaitingQueue = lock.newCondition();
this.inflightReduced = lock.newCondition();
Expand All @@ -128,9 +139,22 @@ private StreamWriterV2(Builder builder) {
this.maxInflightBytes = builder.maxInflightBytes;
this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
if (builder.client == null) {
BigQueryWriteSettings stubSettings =
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setTransportChannelProvider(builder.channelProvider)
.setEndpoint(builder.endpoint)
.build();
this.client = BigQueryWriteClient.create(stubSettings);
this.ownsBigQueryWriteClient = true;
} else {
this.client = builder.client;
this.ownsBigQueryWriteClient = false;
}
this.streamConnection =
new StreamConnection(
builder.client,
this.client,
new RequestCallback() {
@Override
public void run(AppendRowsResponse response) {
Expand Down Expand Up @@ -261,6 +285,9 @@ public void close() {
log.warning(
"Append handler join is interrupted. Stream: " + streamName + " Error: " + e.toString());
}
if (this.ownsBigQueryWriteClient) {
this.client.close();
}
}

/*
Expand Down Expand Up @@ -405,6 +432,11 @@ public static StreamWriterV2.Builder newBuilder(String streamName, BigQueryWrite
return new StreamWriterV2.Builder(streamName, client);
}

/** Constructs a new {@link StreamWriterV2.Builder} using the given stream. */
public static StreamWriterV2.Builder newBuilder(String streamName) {
return new StreamWriterV2.Builder(streamName);
}

/** A builder of {@link StreamWriterV2}s. */
public static final class Builder {

Expand All @@ -420,6 +452,19 @@ public static final class Builder {

private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES;

private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();

private TransportChannelProvider channelProvider =
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();

private CredentialsProvider credentialsProvider =
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
}

private Builder(String streamName, BigQueryWriteClient client) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = Preconditions.checkNotNull(client);
Expand All @@ -435,8 +480,34 @@ public Builder setMaxInflightBytes(long value) {
return this;
}

/** Gives the ability to override the gRPC endpoint. */
public Builder setEndpoint(String endpoint) {
this.endpoint = Preconditions.checkNotNull(endpoint, "Endpoint is null.");
return this;
}

/**
* {@code ChannelProvider} to use to create Channels, which must point at Cloud BigQuery Storage
* API endpoint.
*
* <p>For performance, this client benefits from having multiple underlying connections. See
* {@link com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder#setPoolSize(int)}.
*/
public Builder setChannelProvider(TransportChannelProvider channelProvider) {
this.channelProvider =
Preconditions.checkNotNull(channelProvider, "ChannelProvider is null.");
return this;
}

/** {@code CredentialsProvider} to use to create Credentials to authenticate calls. */
public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
this.credentialsProvider =
Preconditions.checkNotNull(credentialsProvider, "CredentialsProvider is null.");
return this;
}

/** Builds the {@code StreamWriterV2}. */
public StreamWriterV2 build() {
public StreamWriterV2 build() throws IOException {
return new StreamWriterV2(this);
}
}
Expand Down
Expand Up @@ -31,6 +31,7 @@
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -81,7 +82,7 @@ public void tearDown() throws Exception {
serviceHelper.stop();
}

private StreamWriterV2 getTestStreamWriterV2() {
private StreamWriterV2 getTestStreamWriterV2() throws IOException {
return StreamWriterV2.newBuilder(TEST_STREAM, client).build();
}

Expand Down Expand Up @@ -158,6 +159,20 @@ public void run() {
appendThread.interrupt();
}

@Test
public void testBuildBigQueryWriteClientInWriter() throws Exception {
StreamWriterV2 writer =
StreamWriterV2.newBuilder(TEST_STREAM)
.setCredentialsProvider(NoCredentialsProvider.create())
.setChannelProvider(serviceHelper.createChannelProvider())
.build();

testBigQueryWrite.addResponse(createAppendResponse(0));
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
writer.close();
}

@Test
public void testAppendSuccess() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();
Expand Down Expand Up @@ -371,7 +386,7 @@ public void testAppendsWithTinyMaxInflightBytes() throws Exception {
}

@Test
public void testMessageTooLarge() {
public void testMessageTooLarge() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();

String oversized = Strings.repeat("a", (int) (StreamWriterV2.getApiMaxRequestBytes() + 1));
Expand Down

0 comments on commit 237c827

Please sign in to comment.