Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: migrate json writer to use StreamWriterV2 #1058

Merged
merged 11 commits into from May 11, 2021
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 3 additions & 22 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
Expand Up @@ -3,27 +3,8 @@
<differences>
<!-- Allow below protobuf changes as non-breaking-->
<difference>
<differenceType>8001</differenceType>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please don't remove any of these existing <difference>s since they are needed to allow protobuf changes to be non-breaking.

<className>com/google/cloud/bigquery/storage/v1alpha2/*</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigquery/storage/v1alpha2/stub/BigQueryWriteStub</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigquery/storage/v1alpha2/stub/BigQueryWriteStubSettings</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigquery/storage/v1alpha2/stub/BigQueryWriteStubSettings$Builder</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigquery/storage/v1alpha2/stub/GrpcBigQueryWriteCallableFactory</className>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/bigquery/storage/v1alpha2/stub/GrpcBigQueryWriteStub</className>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder</className>
<method>com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter$Builder createDefaultStream()</method>
</difference>
</differences>
Expand Up @@ -16,18 +16,17 @@
package com.google.cloud.bigquery.storage.v1beta2;

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.Schema;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.json.JSONArray;
Expand All @@ -46,8 +45,8 @@ public class JsonStreamWriter implements AutoCloseable {

private BigQueryWriteClient client;
private String streamName;
private StreamWriter streamWriter;
private StreamWriter.Builder streamWriterBuilder;
private StreamWriterV2 streamWriter;
private StreamWriterV2.Builder streamWriterBuilder;
private Descriptor descriptor;
private TableSchema tableSchema;

Expand All @@ -64,18 +63,18 @@ private JsonStreamWriter(Builder builder)
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);

if (this.client == null) {
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName);
streamWriterBuilder = StreamWriterV2.newBuilder(builder.streamName);
} else {
streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName, builder.client);
streamWriterBuilder = StreamWriterV2.newBuilder(builder.streamName, builder.client);
}
streamWriterBuilder.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor));
setStreamWriterSettings(
builder.channelProvider,
builder.credentialsProvider,
builder.endpoint,
builder.flowControlSettings,
builder.createDefaultStream);
builder.flowControlSettings);
this.streamWriter = streamWriterBuilder.build();
this.streamName = this.streamWriter.getStreamNameString();
this.streamName = builder.streamName;
}

/**
Expand Down Expand Up @@ -109,17 +108,10 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
}
AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
// Need to make sure refreshAppendAndSetDescriptor finish first before this can run
synchronized (this) {
data.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor));
data.setRows(rowsBuilder.build());
AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build());
if (offset >= 0) {
request.setOffset(Int64Value.of(offset));
}
final ApiFuture<AppendRowsResponse> appendResponseFuture =
this.streamWriter.append(request.build());
this.streamWriter.append(rowsBuilder.build(), offset);
return appendResponseFuture;
}
}
Expand All @@ -134,7 +126,7 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
void refreshConnection()
throws IOException, InterruptedException, Descriptors.DescriptorValidationException {
synchronized (this) {
this.streamWriter.shutdown();
this.streamWriter.close();
this.streamWriter = streamWriterBuilder.build();
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(this.tableSchema);
Expand Down Expand Up @@ -164,41 +156,28 @@ private void setStreamWriterSettings(
@Nullable TransportChannelProvider channelProvider,
@Nullable CredentialsProvider credentialsProvider,
@Nullable String endpoint,
@Nullable FlowControlSettings flowControlSettings,
Boolean createDefaultStream) {
@Nullable FlowControlSettings flowControlSettings) {
if (channelProvider != null) {
streamWriterBuilder.setChannelProvider(channelProvider);
}
if (credentialsProvider != null) {
streamWriterBuilder.setCredentialsProvider(credentialsProvider);
}
BatchingSettings.Builder batchSettingBuilder =
BatchingSettings.newBuilder()
.setElementCountThreshold(1L)
.setRequestByteThreshold(4 * 1024 * 1024L);
if (flowControlSettings != null) {
streamWriterBuilder.setBatchingSettings(
batchSettingBuilder.setFlowControlSettings(flowControlSettings).build());
} else {
streamWriterBuilder.setBatchingSettings(batchSettingBuilder.build());
}
if (endpoint != null) {
streamWriterBuilder.setEndpoint(endpoint);
}
if (createDefaultStream) {
streamWriterBuilder.createDefaultStream();
if (flowControlSettings != null) {
if (flowControlSettings.getMaxOutstandingRequestBytes() != null) {
streamWriterBuilder.setMaxInflightBytes(
flowControlSettings.getMaxOutstandingRequestBytes());
}
if (flowControlSettings.getMaxOutstandingElementCount() != null) {
streamWriterBuilder.setMaxInflightRequests(
flowControlSettings.getMaxOutstandingElementCount());
}
}
}

/**
* Setter for table schema. Used for schema updates.
*
* <p>Package-private: intended for internal use when the destination table's schema changes,
* so that subsequent JSON-to-proto conversion uses the updated schema.
*
* @param tableSchema the updated BigQuery table schema to use for later appends
*/
void setTableSchema(TableSchema tableSchema) {
this.tableSchema = tableSchema;
}

/**
* newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by
* StreamWriter by default.
Expand Down Expand Up @@ -259,7 +238,7 @@ public void close() {
}

public static final class Builder {
private String streamOrTableName;
private String streamName;
private BigQueryWriteClient client;
private TableSchema tableSchema;

Expand All @@ -269,6 +248,13 @@ public static final class Builder {
private String endpoint;
private boolean createDefaultStream = false;

// Resource-name formats accepted by the builder: either a full write-stream name or a bare
// table name. Declared final: these are constants and Pattern instances are immutable and
// thread-safe, so they are compiled once and shared.
private static final String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
private static final String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)";

private static final Pattern streamPattern = Pattern.compile(streamPatternString);
private static final Pattern tablePattern = Pattern.compile(tablePatternString);

/**
* Constructor for JsonStreamWriter's Builder
*
Expand All @@ -279,7 +265,17 @@ public static final class Builder {
* @param client
*/
// Accepts either a fully-qualified write-stream name or a bare table name; a table name is
// routed to that table's default stream by appending the "/_default" suffix.
private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) {
  if (streamPattern.matcher(streamOrTableName).matches()) {
    // Already a full stream name; use it verbatim.
    this.streamName = streamOrTableName;
  } else if (tablePattern.matcher(streamOrTableName).matches()) {
    // Bare table name: target the table's default write stream.
    this.streamName = streamOrTableName + "/_default";
  } else {
    throw new IllegalArgumentException("Invalid name: " + streamOrTableName);
  }
  this.tableSchema = tableSchema;
  this.client = client;
}
Expand Down Expand Up @@ -322,13 +318,12 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
}

/**
* If it is writing to a default stream.
* Stream name on the builder.
*
* @return String the stream name
*/
public Builder createDefaultStream() {
this.createDefaultStream = true;
return this;
/** Returns the fully-qualified name of the write stream this builder is configured to use. */
public String getStreamName() {
return streamName;
}

/**
Expand Down
Expand Up @@ -529,7 +529,6 @@ public void testCreateDefaultStream() throws Exception {
.build());
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(TEST_TABLE, v2Schema)
.createDefaultStream()
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.build()) {
Expand Down
Expand Up @@ -105,7 +105,6 @@ public void TestBigDecimalEncoding()
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, TABLE);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableInfo.getDefinition().getSchema())
.createDefaultStream()
.build()) {
JSONObject row = new JSONObject();
row.put(
Expand Down
Expand Up @@ -104,7 +104,6 @@ public void TestTimeEncoding()
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, TABLE);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableInfo.getDefinition().getSchema())
.createDefaultStream()
.build()) {
JSONObject row = new JSONObject();
row.put("test_str", "Start of the day");
Expand Down
Expand Up @@ -41,7 +41,6 @@
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.threeten.bp.Duration;
import org.threeten.bp.LocalDateTime;

/** Integration tests for BigQuery Write API. */
Expand Down Expand Up @@ -180,54 +179,6 @@ private AppendRowsRequest.Builder createAppendRequestComplicateType(
return requestBuilder.setProtoRows(dataBuilder.build()).setWriteStream(streamName);
}

// Integration test: appends batches to a COMMITTED-type write stream and verifies both the
// returned offsets and the rows visible via the BigQuery read path.
@Test
public void testBatchWriteWithCommittedStream()
throws IOException, InterruptedException, ExecutionException {
// Create a committed write stream on the test table; rows become visible as soon as the
// append is acknowledged.
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId)
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
// Batching thresholds: flush after 2 elements, 1 MB, or 2 seconds — whichever comes first.
try (StreamWriter streamWriter =
StreamWriter.newBuilder(writeStream.getName())
.setBatchingSettings(
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setRequestByteThreshold(1024 * 1024L) // 1 Mb
.setElementCountThreshold(2L)
.setDelayThreshold(Duration.ofSeconds(2))
.build())
.build()) {
LOG.info("Sending one message");
ApiFuture<AppendRowsResponse> response =
streamWriter.append(
createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build());
// First row lands at offset 0.
assertEquals(0, response.get().getAppendResult().getOffset().getValue());

LOG.info("Sending two more messages");
ApiFuture<AppendRowsResponse> response1 =
streamWriter.append(
createAppendRequest(writeStream.getName(), new String[] {"bbb", "ccc"}).build());
ApiFuture<AppendRowsResponse> response2 =
streamWriter.append(
createAppendRequest(writeStream.getName(), new String[] {"ddd"}).build());
// Offsets are per-row: the two-row batch starts at 1, so the next append starts at 3.
assertEquals(1, response1.get().getAppendResult().getOffset().getValue());
assertEquals(3, response2.get().getAppendResult().getOffset().getValue());

// Read the table back and confirm all four rows arrived in append order.
TableResult result =
bigquery.listTableData(
tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
Iterator<FieldValueList> iter = result.getValues().iterator();
assertEquals("aaa", iter.next().get(0).getStringValue());
assertEquals("bbb", iter.next().get(0).getStringValue());
assertEquals("ccc", iter.next().get(0).getStringValue());
assertEquals("ddd", iter.next().get(0).getStringValue());
// No extra rows beyond the four appended above.
assertEquals(false, iter.hasNext());
}
}

ProtoRows CreateProtoRows(String[] messages) {
ProtoRows.Builder rows = ProtoRows.newBuilder();
for (String message : messages) {
Expand Down Expand Up @@ -389,7 +340,6 @@ public void testJsonStreamWriterWithDefaultStream()
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableInfo.getDefinition().getSchema())
.createDefaultStream()
.build()) {
LOG.info("Sending one message");
JSONObject row1 = new JSONObject();
Expand Down
Expand Up @@ -54,7 +54,7 @@ public static void writeToDefaultStream(String projectId, String datasetName, St
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.html
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(parentTable.toString(), schema).createDefaultStream().build()) {
JsonStreamWriter.newBuilder(parentTable.toString(), schema).build()) {
// Append 10 JSON objects to the stream.
for (int i = 0; i < 10; i++) {
// Create a JSON object that is compatible with the table schema.
Expand Down