feat: Support building a BigQueryWriteClient within the StreamWriterV2 #876

Merged: 4 commits, Feb 24, 2021
@@ -17,13 +17,16 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback;
import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
@@ -36,8 +39,6 @@
/**
* A BigQuery Stream Writer that can be used to write data into a BigQuery table.
*
* <p>TODO: Add credential support.
*
* <p>TODO: Attach schema.
*
* <p>TODO: Attach traceId.
@@ -104,6 +105,16 @@ public class StreamWriterV2 implements AutoCloseable {
@GuardedBy("lock")
private final Deque<AppendRequestAndResponse> inflightRequestQueue;

/*
* A client used to interact with BigQuery.
*/
private BigQueryWriteClient client;

/*
* If true, the client above was created by this writer and should be closed when the writer is closed.
*/
private boolean ownsBigQueryWriteClient = false;

/*
* Wraps the underlying bi-directional stream connection with server.
*/
@@ -119,7 +130,7 @@ public static long getApiMaxRequestBytes() {
return 8L * 1000L * 1000L; // 8 megabytes (https://en.wikipedia.org/wiki/Megabyte)
}

private StreamWriterV2(Builder builder) {
private StreamWriterV2(Builder builder) throws IOException {
this.lock = new ReentrantLock();
this.hasMessageInWaitingQueue = lock.newCondition();
this.inflightReduced = lock.newCondition();
@@ -128,9 +139,22 @@ private StreamWriterV2(Builder builder) {
this.maxInflightBytes = builder.maxInflightBytes;
this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();
if (builder.client == null) {
BigQueryWriteSettings stubSettings =
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setTransportChannelProvider(builder.channelProvider)
.setEndpoint(builder.endpoint)
.build();
this.client = BigQueryWriteClient.create(stubSettings);
this.ownsBigQueryWriteClient = true;
} else {
this.client = builder.client;
this.ownsBigQueryWriteClient = false;
}
this.streamConnection =
new StreamConnection(
builder.client,
this.client,
new RequestCallback() {
@Override
public void run(AppendRowsResponse response) {
@@ -261,6 +285,9 @@ public void close() {
log.warning(
"Append handler join is interrupted. Stream: " + streamName + " Error: " + e.toString());
}
if (this.ownsBigQueryWriteClient) {
this.client.close();
}
}
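
For context, a minimal usage sketch (not part of this diff) of the ownership rule added above: when the writer created the BigQueryWriteClient itself, closing the writer also closes that client, while a caller-supplied client is left open. The stream name below is a placeholder.

  // Sketch only; assumes streamName is a valid write-stream resource name.
  void appendAndClose(String streamName) throws IOException {
    // try-with-resources works because StreamWriterV2 implements AutoCloseable;
    // close() also releases the BigQueryWriteClient the writer created for itself.
    try (StreamWriterV2 writer = StreamWriterV2.newBuilder(streamName).build()) {
      // append rows here
    }
  }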

/*
@@ -405,6 +432,11 @@ public static StreamWriterV2.Builder newBuilder(String streamName, BigQueryWrite
return new StreamWriterV2.Builder(streamName, client);
}

/** Constructs a new {@link StreamWriterV2.Builder} using the given stream. */
public static StreamWriterV2.Builder newBuilder(String streamName) {
return new StreamWriterV2.Builder(streamName);
}
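
For context (not part of this diff), the two builder entry points now available side by side; streamName and sharedClient are placeholders provided by the caller, and both build() calls declare IOException after this change.

  // New path: the writer creates and owns its BigQueryWriteClient.
  StreamWriterV2 writerWithOwnedClient =
      StreamWriterV2.newBuilder(streamName).build();

  // Existing path: the writer reuses a client that the caller creates and later closes.
  StreamWriterV2 writerWithSharedClient =
      StreamWriterV2.newBuilder(streamName, sharedClient).build();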

/** A builder of {@link StreamWriterV2}s. */
public static final class Builder {

@@ -420,6 +452,19 @@ public static final class Builder {

private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES;

private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();

private TransportChannelProvider channelProvider =
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();

private CredentialsProvider credentialsProvider =
BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
}

private Builder(String streamName, BigQueryWriteClient client) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = Preconditions.checkNotNull(client);
@@ -435,8 +480,34 @@ public Builder setMaxInflightBytes(long value) {
return this;
}

/** Gives the ability to override the gRPC endpoint. */
public Builder setEndpoint(String endpoint) {
this.endpoint = Preconditions.checkNotNull(endpoint, "Endpoint is null.");
return this;
}

/**
* {@code ChannelProvider} to use to create Channels, which must point at the Cloud BigQuery
* Storage API endpoint.
*
* <p>For performance, this client benefits from having multiple underlying connections. See
* {@link com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder#setPoolSize(int)}.
*/
public Builder setChannelProvider(TransportChannelProvider channelProvider) {
this.channelProvider =
Preconditions.checkNotNull(channelProvider, "ChannelProvider is null.");
return this;
}

/** {@code CredentialsProvider} to use to create Credentials to authenticate calls. */
public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
this.credentialsProvider =
Preconditions.checkNotNull(credentialsProvider, "CredentialsProvider is null.");
return this;
}
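
A sketch (not part of this diff) of how these overrides could be combined to point a writer-owned client at a non-default server, similar to what the new unit test does with an in-process server; localhost:8080 is a placeholder address and the helper method name is hypothetical.

  // Sketch only; assumed imports: com.google.api.gax.core.NoCredentialsProvider,
  // com.google.api.gax.grpc.GrpcTransportChannel,
  // com.google.api.gax.rpc.FixedTransportChannelProvider, io.grpc.ManagedChannelBuilder.
  static StreamWriterV2 newLocalWriter(String streamName) throws IOException {
    TransportChannelProvider localChannelProvider =
        FixedTransportChannelProvider.create(
            GrpcTransportChannel.create(
                ManagedChannelBuilder.forTarget("localhost:8080").usePlaintext().build()));
    return StreamWriterV2.newBuilder(streamName)
        .setCredentialsProvider(NoCredentialsProvider.create())
        .setChannelProvider(localChannelProvider)
        .build();
  }

  // setEndpoint(...) is an alternative when the default channel provider is kept and only the
  // target address needs to change.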

/** Builds the {@code StreamWriterV2}. */
public StreamWriterV2 build() {
public StreamWriterV2 build() throws IOException {
return new StreamWriterV2(this);
}
}
@@ -31,6 +31,7 @@
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -81,7 +82,7 @@ public void tearDown() throws Exception {
serviceHelper.stop();
}

private StreamWriterV2 getTestStreamWriterV2() {
private StreamWriterV2 getTestStreamWriterV2() throws IOException {
return StreamWriterV2.newBuilder(TEST_STREAM, client).build();
}

@@ -158,6 +159,20 @@ public void run() {
appendThread.interrupt();
}

@Test
public void testBuildBigQueryWriteClientInWriter() throws Exception {
StreamWriterV2 writer =
StreamWriterV2.newBuilder(TEST_STREAM)
.setCredentialsProvider(NoCredentialsProvider.create())
.setChannelProvider(serviceHelper.createChannelProvider())
.build();

testBigQueryWrite.addResponse(createAppendResponse(0));
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
writer.close();
}

@Test
public void testAppendSuccess() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();
@@ -371,7 +386,7 @@ public void testAppendsWithTinyMaxInflightBytes() throws Exception {
}

@Test
public void testMessageTooLarge() {
public void testMessageTooLarge() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();

String oversized = Strings.repeat("a", (int) (StreamWriterV2.getApiMaxRequestBytes() + 1));