
feat: add schema update support in JsonStreamWriter #1447

Merged · 12 commits · Dec 29, 2021
README.md: 6 changes (3 additions, 3 deletions)
@@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:24.0.0')
implementation platform('com.google.cloud:libraries-bom:24.1.0')

implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.6.3'
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.7.0'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.6.3"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.7.0"
```

## Authentication
@@ -22,6 +22,7 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.logging.Logger;
@@ -34,7 +35,10 @@
/**
* A StreamWriter that can write JSON data (JSONObjects) to BigQuery tables. The JsonStreamWriter is
* built on top of a StreamWriter, and it simply converts all JSON data to protobuf messages then
* calls StreamWriter's append() method to write to BigQuery tables.
* calls StreamWriter's append() method to write to BigQuery tables. It retains all StreamWriter
* functionality and adds schema update support: if the BigQuery table schema is updated, users
* can ingest data using the new schema after a short delay (on the order of minutes).
*/
public class JsonStreamWriter implements AutoCloseable {
private static String streamPatternString =
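The schema-update flow described in the Javadoc above is transparent to callers. Below is a minimal usage sketch, not part of this diff; the project, dataset, table, and stream names are hypothetical, and error handling is elided:

```java
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import org.json.JSONArray;
import org.json.JSONObject;

public class JsonWriterSchemaUpdateSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical committed stream; in practice this comes from CreateWriteStream.
    String streamName =
        "projects/my-project/datasets/my_dataset/tables/my_table/streams/my_stream";
    // The table's schema at writer-creation time: a single STRING column.
    TableSchema schema =
        TableSchema.newBuilder()
            .addFields(
                TableFieldSchema.newBuilder()
                    .setName("col1")
                    .setType(TableFieldSchema.Type.STRING)
                    .setMode(TableFieldSchema.Mode.NULLABLE))
            .build();

    try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(streamName, schema).build()) {
      JSONArray rows = new JSONArray().put(new JSONObject().put("col1", "aaa"));
      ApiFuture<AppendRowsResponse> response = writer.append(rows);
      response.get(); // block until the append is acknowledged
      // If the table gains a column server-side, a later append() call transparently
      // rebuilds the underlying StreamWriter and accepts the new field once the
      // backend reports the update (typically within minutes).
    }
  }
}
```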
@@ -80,26 +84,51 @@ private JsonStreamWriter(Builder builder)

/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data.
* data to protobuf messages, then using StreamWriter's append() to write the data. If the backend
* has reported a schema update, the current StreamWriter is closed and a new one is created with
* the updated TableSchema.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
* ApiFuture
*/
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr) {
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr)
throws IOException, DescriptorValidationException {
return append(jsonArr, -1);
}

/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data.
* data to protobuf messages, then using StreamWriter's append() to write the data. If the backend
* has reported a schema update, the current StreamWriter is closed and a new one is created with
* the updated TableSchema.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @param offset Offset for deduplication
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
* ApiFuture
*/
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
throws IOException, DescriptorValidationException {
// Handle schema updates
if (this.streamWriter.getUpdatedSchema() != null) {
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
// Close the StreamWriter
this.streamWriter.close();
// Update JsonStreamWriter's TableSchema and Descriptor
this.tableSchema = updatedSchema;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema);
// Create a new underlying StreamWriter with the updated TableSchema and Descriptor
this.streamWriter =
streamWriterBuilder
.setUpdatedTableSchema(this.tableSchema)
.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor))
.build();
// Clear the updatedSchema field on the new underlying StreamWriter
this.streamWriter.clearUpdateSchema();
}

ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
// Any error in convertJsonToProtoMessage will throw an
// IllegalArgumentException/IllegalStateException/NullPointerException and will halt processing
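Both append() overloads now declare checked exceptions: IOException when the underlying StreamWriter is rebuilt, and DescriptorValidationException when the proto descriptor is regenerated from the updated TableSchema. Existing call sites therefore need updating; one possible adaptation (the wrapper below is hypothetical, not part of this change):

```java
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import org.json.JSONArray;

class AppendAdapter {
  // Hypothetical wrapper that converts the new checked exceptions into an
  // unchecked one; callers that prefer retries can catch and re-append instead.
  static ApiFuture<AppendRowsResponse> appendOnce(JsonStreamWriter writer, JSONArray rows) {
    try {
      return writer.append(rows);
    } catch (IOException | DescriptorValidationException e) {
      // Thrown only on the schema-update path, while the current StreamWriter
      // is closed and a new one is built against the updated TableSchema.
      throw new IllegalStateException("append failed during schema update", e);
    }
  }
}
```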
@@ -153,9 +182,9 @@ private void setStreamWriterSettings(
streamWriterBuilder.setEndpoint(endpoint);
}
if (traceId != null) {
streamWriterBuilder.setTraceId("JsonWriterBeta_" + traceId);
streamWriterBuilder.setTraceId("JsonWriter_" + traceId);
} else {
streamWriterBuilder.setTraceId("JsonWriterBeta:null");
streamWriterBuilder.setTraceId("JsonWriter:null");
}
if (flowControlSettings != null) {
if (flowControlSettings.getMaxOutstandingRequestBytes() != null) {
@@ -43,8 +43,6 @@
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
*
* <p>TODO: Support batching.
*
* <p>TODO: Support schema change.
*/
public class StreamWriter implements AutoCloseable {
private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
@@ -114,6 +112,12 @@ public class StreamWriter implements AutoCloseable {
@GuardedBy("lock")
private final Deque<AppendRequestAndResponse> inflightRequestQueue;

/*
 * The updated TableSchema reported by the backend, or null if none has been received.
 */
@GuardedBy("lock")
private TableSchema updatedSchema;

/*
* A client used to interact with BigQuery.
*/
@@ -454,6 +458,9 @@ private void cleanupInflightRequests() {
private void requestCallback(AppendRowsResponse response) {
AppendRequestAndResponse requestWrapper;
this.lock.lock();
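// Record any schema update reported by the backend; callers read it via getUpdatedSchema().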
if (response.hasUpdatedSchema()) {
this.updatedSchema = response.getUpdatedSchema();
}
try {
requestWrapper = pollInflightRequestQueue();
} finally {
@@ -517,7 +524,15 @@ public static StreamWriter.Builder newBuilder(String streamName) {
return new StreamWriter.Builder(streamName);
}

/** A builder of {@link StreamWriterV2}s. */
/** Returns the updated schema reported by the backend, or null if none has been received. */
public TableSchema getUpdatedSchema() {
  return this.updatedSchema;
}

/** Clears the stored schema update once the caller has acted on it. */
public void clearUpdateSchema() {
  this.updatedSchema = null;
}

/** A builder of {@link StreamWriter}s. */
public static final class Builder {

private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;
@@ -544,6 +559,8 @@ public static final class Builder {

private String traceId = null;

private TableSchema updatedTableSchema = null;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
@@ -610,6 +627,16 @@ public Builder setTraceId(String traceId) {
return this;
}

/**
* Setter for the updated table schema, used when the writer is rebuilt after a schema update.
*
* @param tableSchema the updated TableSchema of the destination table
*/
public Builder setUpdatedTableSchema(TableSchema tableSchema) {
this.updatedTableSchema = tableSchema;
return this;
}

/** Builds the {@code StreamWriter}. */
public StreamWriter build() throws IOException {
return new StreamWriter(this);
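Together, getUpdatedSchema(), clearUpdateSchema(), and the new setUpdatedTableSchema() builder option let a user of the raw StreamWriter replicate what JsonStreamWriter.append() now does internally. A hedged sketch of that rebuild sequence (the helper is hypothetical; settings such as endpoint or flow control would also need to be carried over):

```java
import com.google.cloud.bigquery.storage.v1.BQTableSchemaToProtoDescriptor;
import com.google.cloud.bigquery.storage.v1.ProtoSchemaConverter;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;

class StreamWriterSchemaRefresh {
  // Hypothetical helper: if the backend reported a schema update, close the
  // current writer and rebuild it against the new TableSchema.
  static StreamWriter refreshIfUpdated(StreamWriter writer, String streamName)
      throws IOException, DescriptorValidationException {
    TableSchema updated = writer.getUpdatedSchema();
    if (updated == null) {
      return writer; // no schema change observed yet
    }
    writer.close();
    Descriptor descriptor =
        BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updated);
    StreamWriter rebuilt =
        StreamWriter.newBuilder(streamName)
            .setUpdatedTableSchema(updated)
            .setWriterSchema(ProtoSchemaConverter.convert(descriptor))
            .build();
    // Drop the stored update so the next poll starts clean.
    rebuilt.clearUpdateSchema();
    return rebuilt;
  }
}
```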
@@ -193,7 +193,7 @@ public void testSingleAppendSimpleJson() throws Exception {
.getSerializedRows(0),
expectedProto.toByteString());
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriterBeta_test:empty");
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter_test:empty");
}
}

@@ -284,8 +284,7 @@ public void testSingleAppendMultipleSimpleJson() throws Exception {
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriterBeta:null");
assertEquals(testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter:null");
for (int i = 0; i < 4; i++) {
assertEquals(
testBigQueryWrite
@@ -28,7 +28,9 @@
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.cloud.bigquery.storage.v1.*;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
@@ -406,6 +408,92 @@ public void testJsonStreamWriterWithDefaultStream()
}
}

@Test
public void testJsonStreamWriterSchemaUpdate()
throws DescriptorValidationException, IOException, InterruptedException, ExecutionException {
String tableName = "SchemaUpdateTestTable";
TableId tableId = TableId.of(DATASET, tableName);
Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
Schema originalSchema = Schema.of(col1);
TableInfo tableInfo =
TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build();
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
// write the 1st row
JSONObject foo = new JSONObject();
foo.put("col1", "aaa");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
ApiFuture<AppendRowsResponse> response = jsonStreamWriter.append(jsonArr, 0);
assertEquals(0, response.get().getAppendResult().getOffset().getValue());

// update schema with a new column
Field col2 = Field.newBuilder("col2", StandardSQLTypeName.STRING).build();
Schema updatedSchema = Schema.of(ImmutableList.of(col1, col2));
TableInfo updatedTableInfo =
TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build();
Table updatedTable = bigquery.update(updatedTableInfo);
assertEquals(updatedSchema, updatedTable.getDefinition().getSchema());

// continue writing rows until backend acknowledges schema update
JSONObject foo2 = new JSONObject();
foo2.put("col1", "bbb");
JSONArray jsonArr2 = new JSONArray();
jsonArr2.put(foo2);

int next = 0;
for (int i = 1; i < 100; i++) {
ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, i);
assertEquals(i, response2.get().getAppendResult().getOffset().getValue());
if (response2.get().hasUpdatedSchema()) {
next = i;
break;
} else {
Thread.sleep(1000);
}
}

// write rows with updated schema.
JSONObject updatedFoo = new JSONObject();
updatedFoo.put("col1", "ccc");
updatedFoo.put("col2", "ddd");
JSONArray updatedJsonArr = new JSONArray();
updatedJsonArr.put(updatedFoo);
for (int i = 0; i < 10; i++) {
ApiFuture<AppendRowsResponse> response3 =
jsonStreamWriter.append(updatedJsonArr, next + 1 + i);
assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue());
}

// verify table data correctness
Iterator<FieldValueList> rowsIter = bigquery.listTableData(tableId).getValues().iterator();
// 1 row of aaa
assertEquals("aaa", rowsIter.next().get(0).getStringValue());
// a few rows of bbb
for (int j = 1; j <= next; j++) {
assertEquals("bbb", rowsIter.next().get(0).getStringValue());
}
// 10 rows of ccc, ddd
for (int j = next + 1; j < next + 1 + 10; j++) {
FieldValueList temp = rowsIter.next();
assertEquals("ccc", temp.get(0).getStringValue());
assertEquals("ddd", temp.get(1).getStringValue());
}
assertEquals(false, rowsIter.hasNext());
}
bigquery.delete(tableId);
}

@Test
public void testComplicateSchemaWithPendingStream()
throws IOException, InterruptedException, ExecutionException {
Expand Down