diff --git a/google-cloud-bigquerystorage/clirr-ignored-differences.xml b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
index ea19962285..edc5073600 100644
--- a/google-cloud-bigquerystorage/clirr-ignored-differences.xml
+++ b/google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -26,4 +26,9 @@
8001
com/google/cloud/bigquery/storage/v1alpha2/stub/GrpcBigQueryWriteStub
+
+ 7002
+ com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter$Builder
+ com.google.cloud.bigquery.storage.v1beta2.JsonStreamWriter$Builder createDefaultStream()
+
\ No newline at end of file
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java
index 19fc29f9cf..4cceeda16d 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter.java
@@ -16,7 +16,6 @@
package com.google.cloud.bigquery.storage.v1beta2;
import com.google.api.core.ApiFuture;
-import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
@@ -24,10 +23,10 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
-import com.google.protobuf.Int64Value;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.logging.Logger;
+import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.json.JSONArray;
@@ -46,8 +45,8 @@ public class JsonStreamWriter implements AutoCloseable {
private BigQueryWriteClient client;
private String streamName;
- private StreamWriter streamWriter;
- private StreamWriter.Builder streamWriterBuilder;
+ private StreamWriterV2 streamWriter;
+ private StreamWriterV2.Builder streamWriterBuilder;
private Descriptor descriptor;
private TableSchema tableSchema;
@@ -64,18 +63,18 @@ private JsonStreamWriter(Builder builder)
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(builder.tableSchema);
if (this.client == null) {
- streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName);
+ streamWriterBuilder = StreamWriterV2.newBuilder(builder.streamName);
} else {
- streamWriterBuilder = StreamWriter.newBuilder(builder.streamOrTableName, builder.client);
+ streamWriterBuilder = StreamWriterV2.newBuilder(builder.streamName, builder.client);
}
+ streamWriterBuilder.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor));
setStreamWriterSettings(
builder.channelProvider,
builder.credentialsProvider,
builder.endpoint,
- builder.flowControlSettings,
- builder.createDefaultStream);
+ builder.flowControlSettings);
this.streamWriter = streamWriterBuilder.build();
- this.streamName = this.streamWriter.getStreamNameString();
+ this.streamName = builder.streamName;
}
/**
@@ -109,17 +108,10 @@ public ApiFuture append(JSONArray jsonArr, long offset) {
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
}
- AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
// Need to make sure refreshAppendAndSetDescriptor finish first before this can run
synchronized (this) {
- data.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor));
- data.setRows(rowsBuilder.build());
- AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder().setProtoRows(data.build());
- if (offset >= 0) {
- request.setOffset(Int64Value.of(offset));
- }
final ApiFuture appendResponseFuture =
- this.streamWriter.append(request.build());
+ this.streamWriter.append(rowsBuilder.build(), offset);
return appendResponseFuture;
}
}
@@ -134,7 +126,7 @@ public ApiFuture append(JSONArray jsonArr, long offset) {
void refreshConnection()
throws IOException, InterruptedException, Descriptors.DescriptorValidationException {
synchronized (this) {
- this.streamWriter.shutdown();
+ this.streamWriter.close();
this.streamWriter = streamWriterBuilder.build();
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(this.tableSchema);
@@ -164,41 +156,28 @@ private void setStreamWriterSettings(
@Nullable TransportChannelProvider channelProvider,
@Nullable CredentialsProvider credentialsProvider,
@Nullable String endpoint,
- @Nullable FlowControlSettings flowControlSettings,
- Boolean createDefaultStream) {
+ @Nullable FlowControlSettings flowControlSettings) {
if (channelProvider != null) {
streamWriterBuilder.setChannelProvider(channelProvider);
}
if (credentialsProvider != null) {
streamWriterBuilder.setCredentialsProvider(credentialsProvider);
}
- BatchingSettings.Builder batchSettingBuilder =
- BatchingSettings.newBuilder()
- .setElementCountThreshold(1L)
- .setRequestByteThreshold(4 * 1024 * 1024L);
- if (flowControlSettings != null) {
- streamWriterBuilder.setBatchingSettings(
- batchSettingBuilder.setFlowControlSettings(flowControlSettings).build());
- } else {
- streamWriterBuilder.setBatchingSettings(batchSettingBuilder.build());
- }
if (endpoint != null) {
streamWriterBuilder.setEndpoint(endpoint);
}
- if (createDefaultStream) {
- streamWriterBuilder.createDefaultStream();
+ if (flowControlSettings != null) {
+ if (flowControlSettings.getMaxOutstandingRequestBytes() != null) {
+ streamWriterBuilder.setMaxInflightBytes(
+ flowControlSettings.getMaxOutstandingRequestBytes());
+ }
+ if (flowControlSettings.getMaxOutstandingElementCount() != null) {
+ streamWriterBuilder.setMaxInflightRequests(
+ flowControlSettings.getMaxOutstandingElementCount());
+ }
}
}
- /**
- * Setter for table schema. Used for schema updates.
- *
- * @param tableSchema
- */
- void setTableSchema(TableSchema tableSchema) {
- this.tableSchema = tableSchema;
- }
-
/**
* newBuilder that constructs a JsonStreamWriter builder with BigQuery client being initialized by
* StreamWriter by default.
@@ -259,7 +238,7 @@ public void close() {
}
public static final class Builder {
- private String streamOrTableName;
+ private String streamName;
private BigQueryWriteClient client;
private TableSchema tableSchema;
@@ -269,6 +248,13 @@ public static final class Builder {
private String endpoint;
private boolean createDefaultStream = false;
+ private static String streamPatternString =
+ "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
+ private static String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)";
+
+ private static Pattern streamPattern = Pattern.compile(streamPatternString);
+ private static Pattern tablePattern = Pattern.compile(tablePatternString);
+
/**
* Constructor for JsonStreamWriter's Builder
*
@@ -279,7 +265,17 @@ public static final class Builder {
* @param client
*/
private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWriteClient client) {
- this.streamOrTableName = streamOrTableName;
+ Matcher streamMatcher = streamPattern.matcher(streamOrTableName);
+ if (!streamMatcher.matches()) {
+ Matcher tableMatcher = tablePattern.matcher(streamOrTableName);
+ if (!tableMatcher.matches()) {
+ throw new IllegalArgumentException("Invalid name: " + streamOrTableName);
+ } else {
+ this.streamName = streamOrTableName + "/_default";
+ }
+ } else {
+ this.streamName = streamOrTableName;
+ }
this.tableSchema = tableSchema;
this.client = client;
}
@@ -322,13 +318,12 @@ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
}
/**
- * If it is writing to a default stream.
+ * Stream name on the builder.
*
* @return Builder
*/
- public Builder createDefaultStream() {
- this.createDefaultStream = true;
- return this;
+ public String getStreamName() {
+ return streamName;
}
/**
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java
index 7dae1cd5d3..0e888ef267 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriterTest.java
@@ -529,7 +529,6 @@ public void testCreateDefaultStream() throws Exception {
.build());
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(TEST_TABLE, v2Schema)
- .createDefaultStream()
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.build()) {
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryBigDecimalByteStringEncoderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryBigDecimalByteStringEncoderTest.java
index e61354363a..2738bc0db7 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryBigDecimalByteStringEncoderTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryBigDecimalByteStringEncoderTest.java
@@ -105,7 +105,6 @@ public void TestBigDecimalEncoding()
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, TABLE);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableInfo.getDefinition().getSchema())
- .createDefaultStream()
.build()) {
JSONObject row = new JSONObject();
row.put(
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryTimeEncoderTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryTimeEncoderTest.java
index 28647b2ccf..e367a2b29e 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryTimeEncoderTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryTimeEncoderTest.java
@@ -104,7 +104,6 @@ public void TestTimeEncoding()
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, TABLE);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableInfo.getDefinition().getSchema())
- .createDefaultStream()
.build()) {
JSONObject row = new JSONObject();
row.put("test_str", "Start of the day");
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java
index 4c89dd55e5..24132addc7 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryWriteManualClientTest.java
@@ -41,7 +41,6 @@
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.threeten.bp.Duration;
import org.threeten.bp.LocalDateTime;
/** Integration tests for BigQuery Write API. */
@@ -180,54 +179,6 @@ private AppendRowsRequest.Builder createAppendRequestComplicateType(
return requestBuilder.setProtoRows(dataBuilder.build()).setWriteStream(streamName);
}
- @Test
- public void testBatchWriteWithCommittedStream()
- throws IOException, InterruptedException, ExecutionException {
- WriteStream writeStream =
- client.createWriteStream(
- CreateWriteStreamRequest.newBuilder()
- .setParent(tableId)
- .setWriteStream(
- WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
- .build());
- try (StreamWriter streamWriter =
- StreamWriter.newBuilder(writeStream.getName())
- .setBatchingSettings(
- StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
- .toBuilder()
- .setRequestByteThreshold(1024 * 1024L) // 1 Mb
- .setElementCountThreshold(2L)
- .setDelayThreshold(Duration.ofSeconds(2))
- .build())
- .build()) {
- LOG.info("Sending one message");
- ApiFuture response =
- streamWriter.append(
- createAppendRequest(writeStream.getName(), new String[] {"aaa"}).build());
- assertEquals(0, response.get().getAppendResult().getOffset().getValue());
-
- LOG.info("Sending two more messages");
- ApiFuture response1 =
- streamWriter.append(
- createAppendRequest(writeStream.getName(), new String[] {"bbb", "ccc"}).build());
- ApiFuture response2 =
- streamWriter.append(
- createAppendRequest(writeStream.getName(), new String[] {"ddd"}).build());
- assertEquals(1, response1.get().getAppendResult().getOffset().getValue());
- assertEquals(3, response2.get().getAppendResult().getOffset().getValue());
-
- TableResult result =
- bigquery.listTableData(
- tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
- Iterator iter = result.getValues().iterator();
- assertEquals("aaa", iter.next().get(0).getStringValue());
- assertEquals("bbb", iter.next().get(0).getStringValue());
- assertEquals("ccc", iter.next().get(0).getStringValue());
- assertEquals("ddd", iter.next().get(0).getStringValue());
- assertEquals(false, iter.hasNext());
- }
- }
-
ProtoRows CreateProtoRows(String[] messages) {
ProtoRows.Builder rows = ProtoRows.newBuilder();
for (String message : messages) {
@@ -389,7 +340,6 @@ public void testJsonStreamWriterWithDefaultStream()
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableInfo.getDefinition().getSchema())
- .createDefaultStream()
.build()) {
LOG.info("Sending one message");
JSONObject row1 = new JSONObject();
diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java
index 182d4469a4..028e1c037c 100644
--- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java
+++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java
@@ -54,7 +54,7 @@ public static void writeToDefaultStream(String projectId, String datasetName, St
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1beta2/JstreamWriter.html
try (JsonStreamWriter writer =
- JsonStreamWriter.newBuilder(parentTable.toString(), schema).createDefaultStream().build()) {
+ JsonStreamWriter.newBuilder(parentTable.toString(), schema).build()) {
// Append 10 JSON objects to the stream.
for (int i = 0; i < 10; i++) {
// Create a JSON object that is compatible with the table schema.