Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add schema update support in JsonStreamWriter #1447

Merged
Merged 12 commits on Dec 29, 2021
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.logging.Logger;
Expand All @@ -34,7 +35,10 @@
/**
* A StreamWriter that can write JSON data (JSONObjects) to BigQuery tables. The JsonStreamWriter is
* built on top of a StreamWriter, and it simply converts all JSON data to protobuf messages then
* calls StreamWriter's append() method to write to BigQuery tables.
* calls StreamWriter's append() method to write to BigQuery tables. It maintains all StreamWriter
* functions, but also provides an additional feature: schema update support, where if the BigQuery
* table schema is updated, users will be able to ingest data on the new schema after some time (in
* order of minutes).
*/
public class JsonStreamWriter implements AutoCloseable {
private static String streamPatternString =
Expand Down Expand Up @@ -81,27 +85,49 @@ private JsonStreamWriter(Builder builder)
/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data at current end
* of stream.
* of stream. If there is a schema update, the current StreamWriter is closed. A new StreamWriter
* is created with the updated TableSchema.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
* ApiFuture
*/
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr) {
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr)
throws IOException, DescriptorValidationException {
return append(jsonArr, -1);
}

  /**
   * Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
   * data to protobuf messages, then using StreamWriter's append() to write the data at the
   * specified offset. If there is a schema update, the current StreamWriter is closed. A new
   * StreamWriter is created with the updated TableSchema.
   *
   * @param jsonArr The JSON array that contains JSONObjects to be written
   * @param offset Offset for deduplication
   * @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
   *     ApiFuture
   */
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
throws IOException, DescriptorValidationException {
// Handle schema updates in a Thread-safe way by locking down the operation
synchronized (this) {
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
if (updatedSchema != null) {
// Close the StreamWriter
this.streamWriter.close();
// Update JsonStreamWriter's TableSchema and Descriptor
this.tableSchema = updatedSchema;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema);
// Create a new underlying StreamWriter with the updated TableSchema and Descriptor
this.streamWriter =
streamWriterBuilder
.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor))
                // NOTE(review): Reviewer asked whether anything else (stream id, existing trace
                // id, etc.) needs to be propagated to the new writer. Author confirmed those
                // settings are already populated on streamWriterBuilder, so the rebuilt
                // StreamWriter inherits them.
.build();
}
}

ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
// Any error in convertJsonToProtoMessage will throw an
// IllegalArgumentException/IllegalStateException/NullPointerException and will halt processing
Expand Down Expand Up @@ -155,9 +181,9 @@ private void setStreamWriterSettings(
streamWriterBuilder.setEndpoint(endpoint);
}
if (traceId != null) {
streamWriterBuilder.setTraceId("JsonWriterBeta_" + traceId);
streamWriterBuilder.setTraceId("JsonWriter_" + traceId);
} else {
streamWriterBuilder.setTraceId("JsonWriterBeta:null");
streamWriterBuilder.setTraceId("JsonWriter:null");
}
if (flowControlSettings != null) {
if (flowControlSettings.getMaxOutstandingRequestBytes() != null) {
Expand Down
Expand Up @@ -44,8 +44,6 @@
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
*
* <p>TODO: Support batching.
*
* <p>TODO: Support schema change.
*/
public class StreamWriter implements AutoCloseable {
private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
Expand Down Expand Up @@ -135,6 +133,12 @@ public class StreamWriter implements AutoCloseable {
@GuardedBy("lock")
private final Deque<AppendRequestAndResponse> inflightRequestQueue;

/*
* Contains the updated TableSchema.
*/
@GuardedBy("lock")
private TableSchema updatedSchema;

/*
* A client used to interact with BigQuery.
*/
Expand Down Expand Up @@ -526,6 +530,9 @@ private void cleanupInflightRequests() {
private void requestCallback(AppendRowsResponse response) {
AppendRequestAndResponse requestWrapper;
this.lock.lock();
if (response.hasUpdatedSchema()) {
this.updatedSchema = response.getUpdatedSchema();
}
try {
// Had a successful connection with at least one result, reset retries.
// conectionRetryCountWithoutCallback is reset so that only multiple retries, without
Expand Down Expand Up @@ -622,7 +629,12 @@ public static StreamWriter.Builder newBuilder(String streamName) {
return new StreamWriter.Builder(streamName);
}

/** A builder of {@link StreamWriterV2}s. */
/** Thread-safe getter of updated TableSchema */
public synchronized TableSchema getUpdatedSchema() {
stephaniewang526 marked this conversation as resolved.
Show resolved Hide resolved
return this.updatedSchema;
}

/** A builder of {@link StreamWriter}s. */
public static final class Builder {

private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;
Expand All @@ -649,6 +661,8 @@ public static final class Builder {

private String traceId = null;

private TableSchema updatedTableSchema = null;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
Expand Down
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigquery.storage.v1;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.ExecutorProvider;
Expand All @@ -26,6 +27,7 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.JsonTest;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Timestamp;
Expand Down Expand Up @@ -193,7 +195,7 @@ public void testSingleAppendSimpleJson() throws Exception {
.getSerializedRows(0),
expectedProto.toByteString());
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriterBeta_test:empty");
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter_test:empty");
}
}

Expand Down Expand Up @@ -284,8 +286,7 @@ public void testSingleAppendMultipleSimpleJson() throws Exception {
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriterBeta:null");
assertEquals(testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter:null");
for (int i = 0; i < 4; i++) {
assertEquals(
testBigQueryWrite
Expand Down Expand Up @@ -388,4 +389,111 @@ public void testCreateDefaultStream() throws Exception {
assertEquals("projects/p/datasets/d/tables/t/_default", writer.getStreamName());
}
}

  @Test
  public void testSimpleSchemaUpdate() throws Exception {
    // Verifies that JsonStreamWriter picks up an updated TableSchema returned on an
    // AppendRowsResponse and transparently recreates its underlying StreamWriter so that
    // subsequent appends can carry the new field ("bar").
    try (JsonStreamWriter writer =
        getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
      // First mocked response carries the updated schema; the writer should notice it on the
      // NEXT append call (schema check happens at the start of append()).
      testBigQueryWrite.addResponse(
          AppendRowsResponse.newBuilder()
              .setAppendResult(
                  AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
              .setUpdatedSchema(UPDATED_TABLE_SCHEMA)
              .build());
      // Three plain responses for the remaining appends (offsets 1-3).
      testBigQueryWrite.addResponse(
          AppendRowsResponse.newBuilder()
              .setAppendResult(
                  AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
              .build());
      testBigQueryWrite.addResponse(
          AppendRowsResponse.newBuilder()
              .setAppendResult(
                  AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build())
              .build());
      testBigQueryWrite.addResponse(
          AppendRowsResponse.newBuilder()
              .setAppendResult(
                  AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build())
              .build());
      // First append: rows use the original single-field schema (FooType).
      JSONObject foo = new JSONObject();
      foo.put("foo", "aaa");
      JSONArray jsonArr = new JSONArray();
      jsonArr.put(foo);

      // Three appends with the original schema; the schema update arrives with response 0 but
      // only takes effect once a later append observes it.
      ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(jsonArr);
      ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(jsonArr);
      ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(jsonArr);

      assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
      assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
      assertEquals(
          1,
          testBigQueryWrite
              .getAppendRequests()
              .get(0)
              .getProtoRows()
              .getRows()
              .getSerializedRowsCount());
      // Request 0 serialized rows must match the original FooType proto encoding.
      assertEquals(
          testBigQueryWrite
              .getAppendRequests()
              .get(0)
              .getProtoRows()
              .getRows()
              .getSerializedRows(0),
          FooType.newBuilder().setFoo("aaa").build().toByteString());

      assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue());
      assertEquals(
          1,
          testBigQueryWrite
              .getAppendRequests()
              .get(1)
              .getProtoRows()
              .getRows()
              .getSerializedRowsCount());
      assertEquals(
          testBigQueryWrite
              .getAppendRequests()
              .get(1)
              .getProtoRows()
              .getRows()
              .getSerializedRows(0),
          FooType.newBuilder().setFoo("aaa").build().toByteString());

      // Second append with updated schema.
      // This row includes the new "bar" field, which only serializes correctly if the writer
      // rebuilt its descriptor from UPDATED_TABLE_SCHEMA.
      JSONObject updatedFoo = new JSONObject();
      updatedFoo.put("foo", "aaa");
      updatedFoo.put("bar", "bbb");
      JSONArray updatedJsonArr = new JSONArray();
      updatedJsonArr.put(updatedFoo);

      ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(updatedJsonArr);

      assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue());
      assertEquals(4, testBigQueryWrite.getAppendRequests().size());
      assertEquals(
          1,
          testBigQueryWrite
              .getAppendRequests()
              .get(3)
              .getProtoRows()
              .getRows()
              .getSerializedRowsCount());
      // Request 3 must be encoded with the two-field UpdatedFooType message.
      assertEquals(
          testBigQueryWrite
              .getAppendRequests()
              .get(3)
              .getProtoRows()
              .getRows()
              .getSerializedRows(0),
          UpdatedFooType.newBuilder().setFoo("aaa").setBar("bbb").build().toByteString());

      // The first request of a connection carries the writer schema; after the writer is
      // recreated, a new first request (2 or 3, depending on timing) must carry it again.
      assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
      assertTrue(
          testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()
              || testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema());
    }
  }
}
}