From 308ea73ba7857b3d0fa76d80dc274eb4d62eba40 Mon Sep 17 00:00:00 2001 From: yayi Date: Tue, 23 Feb 2021 14:39:10 -0800 Subject: [PATCH 1/3] Support building a BigQueryWriteClient within the StreamWriterV2. --- .../storage/v1beta2/StreamWriterV2.java | 81 +++++++++++++++++-- .../storage/v1beta2/StreamWriterV2Test.java | 19 ++++- 2 files changed, 93 insertions(+), 7 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 8debea15f9..0aff575d96 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -17,6 +17,8 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback; import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback; import com.google.common.base.Preconditions; @@ -24,6 +26,7 @@ import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; +import java.io.IOException; import java.util.Deque; import java.util.LinkedList; import java.util.concurrent.TimeUnit; @@ -36,8 +39,6 @@ /** * A BigQuery Stream Writer that can be used to write data into BigQuery Table. * - *

<p>TODO: Add credential support. - * *

<p>TODO: Attach schema. * *

TODO: Add inflight control. @@ -83,6 +84,16 @@ public class StreamWriterV2 implements AutoCloseable { @GuardedBy("lock") private final Deque inflightRequestQueue; + /* + * A client used to interact with BigQuery. + */ + private BigQueryWriteClient client; + + /* + * If true, the client above is created by this writer and should be closed. + */ + private boolean ownsBigQueryWriteClient = false; + /* * Wraps the underlying bi-directional stream connection with server. */ @@ -98,15 +109,28 @@ public static long getApiMaxRequestBytes() { return 8L * 1000L * 1000L; // 8 megabytes (https://en.wikipedia.org/wiki/Megabyte) } - private StreamWriterV2(Builder builder) { + private StreamWriterV2(Builder builder) throws IOException { this.lock = new ReentrantLock(); this.hasMessageInWaitingQueue = lock.newCondition(); this.streamName = builder.streamName; this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); + if (builder.client == null) { + BigQueryWriteSettings stubSettings = + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(builder.credentialsProvider) + .setTransportChannelProvider(builder.channelProvider) + .setEndpoint(builder.endpoint) + .build(); + this.client = BigQueryWriteClient.create(stubSettings); + this.ownsBigQueryWriteClient = true; + } else { + this.client = builder.client; + this.ownsBigQueryWriteClient = false; + } this.streamConnection = new StreamConnection( - builder.client, + this.client, new RequestCallback() { @Override public void run(AppendRowsResponse response) { @@ -213,6 +237,9 @@ public void close() { log.warning( "Append handler join is interrupted. 
Stream: " + streamName + " Error: " + e.toString()); } + if (this.ownsBigQueryWriteClient) { + this.client.close(); + } } /* @@ -348,6 +375,11 @@ public static StreamWriterV2.Builder newBuilder(String streamName, BigQueryWrite return new StreamWriterV2.Builder(streamName, client); } + /** Constructs a new {@link StreamWriterV2.Builder} using the given stream. */ + public static StreamWriterV2.Builder newBuilder(String streamName) { + return new StreamWriterV2.Builder(streamName); + } + /** A builder of {@link StreamWriterV2}s. */ public static final class Builder { @@ -355,13 +387,52 @@ public static final class Builder { private BigQueryWriteClient client; + private String endpoint = BigQueryWriteSettings.getDefaultEndpoint(); + + private TransportChannelProvider channelProvider = + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build(); + + private CredentialsProvider credentialsProvider = + BigQueryWriteSettings.defaultCredentialsProviderBuilder().build(); + + private Builder(String streamName) { + this.streamName = Preconditions.checkNotNull(streamName); + this.client = null; + } + private Builder(String streamName, BigQueryWriteClient client) { this.streamName = Preconditions.checkNotNull(streamName); this.client = Preconditions.checkNotNull(client); } + /** Gives the ability to override the gRPC endpoint. */ + public Builder setEndpoint(String endpoint) { + this.endpoint = Preconditions.checkNotNull(endpoint, "Endpoint is null."); + return this; + } + + /** + * {@code ChannelProvider} to use to create Channels, which must point at Cloud BigQuery Storage + * API endpoint. + * + *

For performance, this client benefits from having multiple underlying connections. See + * {@link com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder#setPoolSize(int)}. + */ + public Builder setChannelProvider(TransportChannelProvider channelProvider) { + this.channelProvider = + Preconditions.checkNotNull(channelProvider, "ChannelProvider is null."); + return this; + } + + /** {@code CredentialsProvider} to use to create Credentials to authenticate calls. */ + public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { + this.credentialsProvider = + Preconditions.checkNotNull(credentialsProvider, "CredentialsProvider is null."); + return this; + } + /** Builds the {@code StreamWriterV2}. */ - public StreamWriterV2 build() { + public StreamWriterV2 build() throws IOException { return new StreamWriterV2(this); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index 4d6fba9dcd..b0f1b4abf3 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -31,6 +31,7 @@ import com.google.protobuf.Int64Value; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -81,7 +82,7 @@ public void tearDown() throws Exception { serviceHelper.stop(); } - private StreamWriterV2 getTestStreamWriterV2() { + private StreamWriterV2 getTestStreamWriterV2() throws IOException { return StreamWriterV2.newBuilder(TEST_STREAM, client).build(); } @@ -142,6 +143,20 @@ public void run() throws Throwable { }); } + @Test + public void testBuildBigQueryWriteClientInWriter() throws Exception { + 
StreamWriterV2 writer = + StreamWriterV2.newBuilder(TEST_STREAM) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setChannelProvider(serviceHelper.createChannelProvider()) + .build(); + + testBigQueryWrite.addResponse(createAppendResponse(0)); + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + writer.close(); + } + @Test public void testAppendSuccess() throws Exception { StreamWriterV2 writer = getTestStreamWriterV2(); @@ -292,7 +307,7 @@ public void serverCloseWhileRequestsInflight() throws Exception { } @Test - public void testMessageTooLarge() { + public void testMessageTooLarge() throws Exception { StreamWriterV2 writer = getTestStreamWriterV2(); String oversized = Strings.repeat("a", (int) (StreamWriterV2.getApiMaxRequestBytes() + 1)); From e25c55004ec73b62efdc3ecdd36187ce59ef66cf Mon Sep 17 00:00:00 2001 From: yayi Date: Tue, 23 Feb 2021 14:39:10 -0800 Subject: [PATCH 2/3] feat: Support building a BigQueryWriteClient within the StreamWriterV2 --- .../storage/v1beta2/StreamWriterV2.java | 81 +++++++++++++++++-- .../storage/v1beta2/StreamWriterV2Test.java | 19 ++++- 2 files changed, 93 insertions(+), 7 deletions(-) diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java index 8debea15f9..0aff575d96 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java @@ -17,6 +17,8 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.core.CredentialsProvider; +import com.google.api.gax.rpc.TransportChannelProvider; import 
com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback; import com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback; import com.google.common.base.Preconditions; @@ -24,6 +26,7 @@ import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; +import java.io.IOException; import java.util.Deque; import java.util.LinkedList; import java.util.concurrent.TimeUnit; @@ -36,8 +39,6 @@ /** * A BigQuery Stream Writer that can be used to write data into BigQuery Table. * - *

<p>TODO: Add credential support. - * *

<p>TODO: Attach schema. * *

TODO: Add inflight control. @@ -83,6 +84,16 @@ public class StreamWriterV2 implements AutoCloseable { @GuardedBy("lock") private final Deque inflightRequestQueue; + /* + * A client used to interact with BigQuery. + */ + private BigQueryWriteClient client; + + /* + * If true, the client above is created by this writer and should be closed. + */ + private boolean ownsBigQueryWriteClient = false; + /* * Wraps the underlying bi-directional stream connection with server. */ @@ -98,15 +109,28 @@ public static long getApiMaxRequestBytes() { return 8L * 1000L * 1000L; // 8 megabytes (https://en.wikipedia.org/wiki/Megabyte) } - private StreamWriterV2(Builder builder) { + private StreamWriterV2(Builder builder) throws IOException { this.lock = new ReentrantLock(); this.hasMessageInWaitingQueue = lock.newCondition(); this.streamName = builder.streamName; this.waitingRequestQueue = new LinkedList(); this.inflightRequestQueue = new LinkedList(); + if (builder.client == null) { + BigQueryWriteSettings stubSettings = + BigQueryWriteSettings.newBuilder() + .setCredentialsProvider(builder.credentialsProvider) + .setTransportChannelProvider(builder.channelProvider) + .setEndpoint(builder.endpoint) + .build(); + this.client = BigQueryWriteClient.create(stubSettings); + this.ownsBigQueryWriteClient = true; + } else { + this.client = builder.client; + this.ownsBigQueryWriteClient = false; + } this.streamConnection = new StreamConnection( - builder.client, + this.client, new RequestCallback() { @Override public void run(AppendRowsResponse response) { @@ -213,6 +237,9 @@ public void close() { log.warning( "Append handler join is interrupted. 
Stream: " + streamName + " Error: " + e.toString()); } + if (this.ownsBigQueryWriteClient) { + this.client.close(); + } } /* @@ -348,6 +375,11 @@ public static StreamWriterV2.Builder newBuilder(String streamName, BigQueryWrite return new StreamWriterV2.Builder(streamName, client); } + /** Constructs a new {@link StreamWriterV2.Builder} using the given stream. */ + public static StreamWriterV2.Builder newBuilder(String streamName) { + return new StreamWriterV2.Builder(streamName); + } + /** A builder of {@link StreamWriterV2}s. */ public static final class Builder { @@ -355,13 +387,52 @@ public static final class Builder { private BigQueryWriteClient client; + private String endpoint = BigQueryWriteSettings.getDefaultEndpoint(); + + private TransportChannelProvider channelProvider = + BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build(); + + private CredentialsProvider credentialsProvider = + BigQueryWriteSettings.defaultCredentialsProviderBuilder().build(); + + private Builder(String streamName) { + this.streamName = Preconditions.checkNotNull(streamName); + this.client = null; + } + private Builder(String streamName, BigQueryWriteClient client) { this.streamName = Preconditions.checkNotNull(streamName); this.client = Preconditions.checkNotNull(client); } + /** Gives the ability to override the gRPC endpoint. */ + public Builder setEndpoint(String endpoint) { + this.endpoint = Preconditions.checkNotNull(endpoint, "Endpoint is null."); + return this; + } + + /** + * {@code ChannelProvider} to use to create Channels, which must point at Cloud BigQuery Storage + * API endpoint. + * + *

For performance, this client benefits from having multiple underlying connections. See + * {@link com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.Builder#setPoolSize(int)}. + */ + public Builder setChannelProvider(TransportChannelProvider channelProvider) { + this.channelProvider = + Preconditions.checkNotNull(channelProvider, "ChannelProvider is null."); + return this; + } + + /** {@code CredentialsProvider} to use to create Credentials to authenticate calls. */ + public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) { + this.credentialsProvider = + Preconditions.checkNotNull(credentialsProvider, "CredentialsProvider is null."); + return this; + } + /** Builds the {@code StreamWriterV2}. */ - public StreamWriterV2 build() { + public StreamWriterV2 build() throws IOException { return new StreamWriterV2(this); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java index 4d6fba9dcd..b0f1b4abf3 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java @@ -31,6 +31,7 @@ import com.google.protobuf.Int64Value; import io.grpc.Status; import io.grpc.StatusRuntimeException; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -81,7 +82,7 @@ public void tearDown() throws Exception { serviceHelper.stop(); } - private StreamWriterV2 getTestStreamWriterV2() { + private StreamWriterV2 getTestStreamWriterV2() throws IOException { return StreamWriterV2.newBuilder(TEST_STREAM, client).build(); } @@ -142,6 +143,20 @@ public void run() throws Throwable { }); } + @Test + public void testBuildBigQueryWriteClientInWriter() throws Exception { + 
StreamWriterV2 writer = + StreamWriterV2.newBuilder(TEST_STREAM) + .setCredentialsProvider(NoCredentialsProvider.create()) + .setChannelProvider(serviceHelper.createChannelProvider()) + .build(); + + testBigQueryWrite.addResponse(createAppendResponse(0)); + ApiFuture appendFuture1 = sendTestMessage(writer, new String[] {"A"}); + assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue()); + writer.close(); + } + @Test public void testAppendSuccess() throws Exception { StreamWriterV2 writer = getTestStreamWriterV2(); @@ -292,7 +307,7 @@ public void serverCloseWhileRequestsInflight() throws Exception { } @Test - public void testMessageTooLarge() { + public void testMessageTooLarge() throws Exception { StreamWriterV2 writer = getTestStreamWriterV2(); String oversized = Strings.repeat("a", (int) (StreamWriterV2.getApiMaxRequestBytes() + 1)); From 8a7da6da4e1f8b89d3de4347566af249cab40e16 Mon Sep 17 00:00:00 2001 From: yayi Date: Wed, 24 Feb 2021 14:53:20 -0800 Subject: [PATCH 3/3] docs(sample): Update parallel append sample to use StreamWriterV2 --- .../ParallelWriteCommittedStream.java | 45 +++++++++++++------ 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java index 71155a38c7..0de3302063 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/ParallelWriteCommittedStream.java @@ -20,21 +20,29 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; +import com.google.cloud.bigquery.storage.v1beta2.AppendRowsRequest; import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1beta2.BQTableSchemaToProtoDescriptor; import 
com.google.cloud.bigquery.storage.v1beta2.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1beta2.CreateWriteStreamRequest; -import com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter; +import com.google.cloud.bigquery.storage.v1beta2.JsonToProtoMessage; +import com.google.cloud.bigquery.storage.v1beta2.ProtoRows; +import com.google.cloud.bigquery.storage.v1beta2.ProtoSchema; +import com.google.cloud.bigquery.storage.v1beta2.ProtoSchemaConverter; +import com.google.cloud.bigquery.storage.v1beta2.StreamWriterV2; import com.google.cloud.bigquery.storage.v1beta2.TableName; import com.google.cloud.bigquery.storage.v1beta2.WriteStream; import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.DescriptorValidationException; +import com.google.protobuf.Int64Value; +import com.google.protobuf.Message; import java.io.IOException; import java.time.Duration; import java.util.concurrent.ThreadLocalRandom; import java.util.logging.Logger; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; -import org.json.JSONArray; import org.json.JSONObject; public class ParallelWriteCommittedStream { @@ -151,12 +159,11 @@ private void writeToStream( lastMetricsSuccessCount = 0; lastMetricsFailureCount = 0; } - // Use the JSON stream writer to send records in JSON format. 
- // For more information about JsonStreamWriter, see: - // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html - try (JsonStreamWriter writer = - JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client) - .build()) { + Descriptor descriptor = + BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor( + writeStream.getTableSchema()); + ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor); + try (StreamWriterV2 writer = StreamWriterV2.newBuilder(writeStream.getName()).build()) { while (System.currentTimeMillis() < deadlineMillis) { synchronized (this) { if (error != null) { @@ -164,7 +171,8 @@ private void writeToStream( throw error; } } - ApiFuture future = writer.append(createPayload(), -1); + ApiFuture future = + writer.append(createAppendRequest(writeStream.getName(), descriptor, protoSchema, -1)); synchronized (this) { inflightCount++; } @@ -189,17 +197,26 @@ private void waitForInflightToReachZero(Duration timeout) { throw new RuntimeException("Timeout waiting for inflight count to reach 0"); } - private JSONArray createPayload() { - // Create a JSON object that is compatible with the table schema. 
- JSONArray jsonArr = new JSONArray(); + private AppendRowsRequest createAppendRequest( + String streamName, Descriptor descriptor, ProtoSchema protoSchema, long offset) { + ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder(); for (int i = 0; i < BATCH_SIZE; i++) { byte[] payload = new byte[ROW_SIZE]; ThreadLocalRandom.current().nextBytes(payload); JSONObject record = new JSONObject(); record.put("col1", new String(payload)); - jsonArr.put(record); + Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(descriptor, record); + rowsBuilder.addSerializedRows(protoMessage.toByteString()); } - return jsonArr; + AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder(); + data.setWriterSchema(protoSchema); + data.setRows(rowsBuilder.build()); + AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build()); + request.setWriteStream(streamName); + if (offset >= 0) { + request.setOffset(Int64Value.of(offset)); + } + return request.build(); } private void sleepIgnoringInterruption(Duration duration) {