From 586777f289f3dc84d2d6237463f128d278d2465f Mon Sep 17 00:00:00 2001 From: Yiru Tang Date: Mon, 10 May 2021 18:20:32 -0700 Subject: [PATCH] fix: migrate json writer to use StreamWriterV2 (#1058) * . * . * fix: move JsonWriter to use StreamWriterV2. * . * . * . * . * . * . * . * . --- .../clirr-ignored-differences.xml | 5 ++ .../storage/v1beta2/JsonStreamWriter.java | 89 +++++++++---------- .../storage/v1beta2/JsonStreamWriterTest.java | 1 - ...gQueryBigDecimalByteStringEncoderTest.java | 1 - .../v1beta2/it/ITBigQueryTimeEncoderTest.java | 1 - .../it/ITBigQueryWriteManualClientTest.java | 50 ----------- .../bigquerystorage/WriteToDefaultStream.java | 2 +- 7 files changed, 48 insertions(+), 101 deletions(-) diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml index ea19962285..edc5073600 100644 --- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml +++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml @@ -26,4 +26,9 @@ 8001 com/google/cloud/bigquery/storage/v1alpha2/stub/GrpcBigQueryWriteStub + + 7002 + com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder + com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter$Builder createDefaultStream() + \ No newline at end of file diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java index 19fc29f9cf..4cceeda16d 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java @@ -16,7 +16,6 @@ package com.google.cloud.bigquery.storage.v1beta2; import com.google.api.core.ApiFuture; -import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.rpc.TransportChannelProvider; @@ -24,10 +23,10 @@ import com.google.common.base.Preconditions; import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors.Descriptor; -import com.google.protobuf.Int64Value; import com.google.protobuf.Message; import java.io.IOException; import java.util.logging.Logger; +import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; import org.json.JSONArray; @@ -46,8 +45,8 @@ public class JsonStreamWriter implements AutoCloseable { private BigQueryWriteClient client; private String streamName; - private StreamWriter streamWriter; - private StreamWriter.Builder streamWriterBuilder; + private StreamWriterV2 streamWriter; + private StreamWriterV2.Builder streamWriterBuilder; private Descriptor descriptor; private TableSchema tableSchema; @@ -64,18 +63,18 @@ private JsonStreamWriter(Builder builder) BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema); if (this.client == null) { - streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName); + streamWriterBuilder = StreamWriterV2.newBuilder(builder.streamName); } else { - streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName, builder.client); + streamWriterBuilder = StreamWriterV2.newBuilder(builder.streamName, builder.client); } + streamWriterBuilder.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor)); setStreamWriterSettings( builder.channelProvider, builder.credentialsProvider, builder.endpoint, - builder.flowControlSettings, - builder.createDefaultStream); + builder.flowControlSettings); this.streamWriter = streamWriterBuilder.build(); - this.streamName = this.streamWriter.getStreamNameString(); + this.streamName = builder.streamName; } /** @@ -109,17 +108,10 @@ public ApiFuture append(JSONArray jsonArr, long offset) { Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json); rowsBuilder.addSerializedRows(protoMessage.toByteString()); } - AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder(); // Need to make sure refreshAppendAndSetDescriptor finish first before this can run synchronized (this) { - data.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor)); - data.setRows(rowsBuilder.build()); - AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build()); - if (offset >= 0) { - request.setOffset(Int64Value.of(offset)); - } final ApiFuture appendResponseFuture = - this.streamWriter.append(request.build()); + this.streamWriter.append(rowsBuilder.build(), offset); return appendResponseFuture; } } @@ -134,7 +126,7 @@ public ApiFuture append(JSONArray jsonArr, long offset) { void refreshConnection() throws IOException, InterruptedException, Descriptors.DescriptorValidationException { synchronized (this) { - this.streamWriter.shutdown(); + this.streamWriter.close(); this.streamWriter = streamWriterBuilder.build(); this.descriptor = BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(this.tableSchema); @@ -164,41 +156,28 @@ private void setStreamWriterSettings( @Nullable TransportChannelProvider channelProvider, @Nullable CredentialsProvider credentialsProvider, @Nullable String endpoint, - @Nullable FlowControlSettings flowControlSettings, - Boolean createDefaultStream) { + @Nullable FlowControlSettings flowControlSettings) { if (channelProvider != null) { streamWriterBuilder.setChannelProvider(channelProvider); } if (credentialsProvider != null) { streamWriterBuilder.setCredentialsProvider(credentialsProvider); } - BatchingSettings.Builder batchSettingBuilder = - BatchingSettings.newBuilder() - .setElementCountThreshold(1L) - .setRequestByteThreshold(4 * 1024 * 1024L); - if (flowControlSettings != null) { - streamWriterBuilder.setBatchingSettings( - batchSettingBuilder.setFlowControlSettings(flowControlSettings).build()); - } else { - streamWriterBuilder.setBatchingSettings(batchSettingBuilder.build()); - } if (endpoint != null) { streamWriterBuilder.setEndpoint(endpoint); } - if (createDefaultStream) { - streamWriterBuilder.createDefaultStream(); + if (flowControlSettings != null) { + if (flowControlSettings.getMaxOutstandingRequestBytes() != null) { + streamWriterBuilder.setMaxInflightBytes( + flowControlSettings.getMaxOutstandingRequestBytes()); + } + if (flowControlSettings.getMaxOutstandingElementCount() != null) { + streamWriterBuilder.setMaxInflightRequests( + flowControlSettings.getMaxOutstandingElementCount()); + } } } - /** - * Setter for table schema. Used for schema updates. - * - * @param tableSchema - */ - void setTableSchema(TableSchema tableSchema) { - this.tableSchema = tableSchema; - } - /** * newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by * StreamWriter by default. @@ -259,7 +238,7 @@ public void close() { } public static final class Builder { - private String streamOrTableName; + private String streamName; private BigQueryWriteClient client; private TableSchema tableSchema; @@ -269,6 +248,13 @@ public static final class Builder { private String endpoint; private boolean createDefaultStream = false; + private static String streamPatternString = + "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+"; + private static String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)"; + + private static Pattern streamPattern = Pattern.compile(streamPatternString); + private static Pattern tablePattern = Pattern.compile(tablePatternString); + /** * Constructor for JsonStreamWriter's Builder * @@ -279,7 +265,17 @@ public static final class Builder { * @param client */ private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) { - this.streamOrTableName = streamOrTableName; + Matcher streamMatcher = streamPattern.matcher(streamOrTableName); + if (!streamMatcher.matches()) { + Matcher tableMatcher = tablePattern.matcher(streamOrTableName); + if (!tableMatcher.matches()) { + throw new IllegalArgumentException("Invalid name: " + streamOrTableName); + } else { + this.streamName = streamOrTableName + "/_default"; + } + } else { + this.streamName = streamOrTableName; + } this.tableSchema = tableSchema; this.client = client; } @@ -322,13 +318,12 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) { } /** - * If it is writing to a default stream. + * Stream name on the builder. * * @return Builder */ - public Builder createDefaultStream() { - this.createDefaultStream = true; - return this; + public String getStreamName() { + return streamName; } /** diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java index 7dae1cd5d3..0e888ef267 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java @@ -529,7 +529,6 @@ public void testCreateDefaultStream() throws Exception { .build()); try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(TEST_TABLE, v2Schema) - .createDefaultStream() .setChannelProvider(channelProvider) .setCredentialsProvider(NoCredentialsProvider.create()) .build()) { diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryBigDecimalByteStringEncoderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryBigDecimalByteStringEncoderTest.java index e61354363a..2738bc0db7 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryBigDecimalByteStringEncoderTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryBigDecimalByteStringEncoderTest.java @@ -105,7 +105,6 @@ public void TestBigDecimalEncoding() TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, TABLE); try (JsonStreamWriter jsonStreamWriter = JsonStreamWriter.newBuilder(parent.toString(), tableInfo.getDefinition().getSchema()) - .createDefaultStream() .build()) { JSONObject row = new JSONObject(); row.put( diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryTimeEncoderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryTimeEncoderTest.java index 28647b2ccf..e367a2b29e 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryTimeEncoderTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryTimeEncoderTest.java @@ -104,7 +104,6 @@ public void TestTimeEncoding() TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, TABLE); try (JsonStreamWriter jsonStreamWriter = JsonStreamWriter.newBuilder(parent.toString(), tableInfo.getDefinition().getSchema()) - .createDefaultStream() .build()) { JSONObject row = new JSONObject(); row.put("test_str", "Start of the day"); diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java index 4c89dd55e5..24132addc7 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java @@ -41,7 +41,6 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.threeten.bp.Duration; import org.threeten.bp.LocalDateTime; /** Integration tests for BigQuery Write API. */ @@ -180,54 +179,6 @@ private AppendRowsRequest.Builder createAppendRequestComplicateType( return requestBuilder.setProtoRows(dataBuilder.build()).setWriteStream(streamName); } - @Test - public void testBatchWriteWithCommittedStream() - throws IOException, InterruptedException, ExecutionException { - WriteStream writeStream = - client.createWriteStream( - CreateWriteStreamRequest.newBuilder() - .setParent(tableId) - .setWriteStream( - WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build()) - .build()); - try (StreamWriter streamWriter = - StreamWriter.newBuilder(writeStream.getName()) - .setBatchingSettings( - StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS - .toBuilder() - .setRequestByteThreshold(1024 * 1024L) // 1 Mb - .setElementCountThreshold(2L) - .setDelayThreshold(Duration.ofSeconds(2)) - .build()) - .build()) { - LOG.info("Sending one message"); - ApiFuture response = - streamWriter.append( - createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build()); - assertEquals(0, response.get().getAppendResult().getOffset().getValue()); - - LOG.info("Sending two more messages"); - ApiFuture response1 = - streamWriter.append( - createAppendRequest(writeStream.getName(), new String[] {"bbb", "ccc"}).build()); - ApiFuture response2 = - streamWriter.append( - createAppendRequest(writeStream.getName(), new String[] {"ddd"}).build()); - assertEquals(1, response1.get().getAppendResult().getOffset().getValue()); - assertEquals(3, response2.get().getAppendResult().getOffset().getValue()); - - TableResult result = - bigquery.listTableData( - tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L)); - Iterator iter = result.getValues().iterator(); - assertEquals("aaa", iter.next().get(0).getStringValue()); - assertEquals("bbb", iter.next().get(0).getStringValue()); - assertEquals("ccc", iter.next().get(0).getStringValue()); - assertEquals("ddd", iter.next().get(0).getStringValue()); - assertEquals(false, iter.hasNext()); - } - } - ProtoRows CreateProtoRows(String[] messages) { ProtoRows.Builder rows = ProtoRows.newBuilder(); for (String message : messages) { @@ -389,7 +340,6 @@ public void testJsonStreamWriterWithDefaultStream() TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); try (JsonStreamWriter jsonStreamWriter = JsonStreamWriter.newBuilder(parent.toString(), tableInfo.getDefinition().getSchema()) - .createDefaultStream() .build()) { LOG.info("Sending one message"); JSONObject row1 = new JSONObject(); diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java index 182d4469a4..028e1c037c 100644 --- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java +++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java @@ -54,7 +54,7 @@ public static void writeToDefaultStream(String projectId, String datasetName, St // For more information about JsonStreamWriter, see: // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JstreamWriter.html try (JsonStreamWriter writer = - JsonStreamWriter.newBuilder(parentTable.toString(), schema).createDefaultStream().build()) { + JsonStreamWriter.newBuilder(parentTable.toString(), schema).build()) { // Append 10 JSON objects to the stream. for (int i = 0; i < 10; i++) { // Create a JSON object that is compatible with the table schema.