feat: add schema update support in JsonStreamWriter #1447

Merged · 12 commits · Dec 29, 2021
@@ -22,6 +22,7 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.logging.Logger;
@@ -34,7 +35,10 @@
/**
* A StreamWriter that can write JSON data (JSONObjects) to BigQuery tables. The JsonStreamWriter is
* built on top of a StreamWriter, and it simply converts all JSON data to protobuf messages then
* calls StreamWriter's append() method to write to BigQuery tables.
* calls StreamWriter's append() method to write to BigQuery tables. It maintains all StreamWriter
* functions and adds schema update support: if the BigQuery table schema is updated, the writer
* picks up the new schema automatically, and data for the new columns can be ingested after a
* short delay (on the order of minutes).
*/
public class JsonStreamWriter implements AutoCloseable {
private static String streamPatternString =
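Before the diff continues, a minimal caller-side sketch of the feature being added. The stream name, schema, and column name are placeholders; only APIs visible in this diff (JsonStreamWriter.newBuilder, append, AppendRowsResponse.hasUpdatedSchema) are assumed:

```java
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import org.json.JSONArray;
import org.json.JSONObject;

public class SchemaUpdateUsageSketch {
  // streamName and tableSchema would come from an existing COMMITTED write stream.
  static void writeRows(String streamName, TableSchema tableSchema) throws Exception {
    try (JsonStreamWriter writer =
        JsonStreamWriter.newBuilder(streamName, tableSchema).build()) {
      JSONArray rows = new JSONArray();
      rows.put(new JSONObject().put("col1", "value"));
      // append() now declares IOException and DescriptorValidationException,
      // thrown while the writer rebuilds itself after a schema change.
      ApiFuture<AppendRowsResponse> future = writer.append(rows);
      AppendRowsResponse response = future.get();
      if (response.hasUpdatedSchema()) {
        // The backend acknowledged a table schema change; the writer will be
        // rebuilt on a subsequent append, which may then include new columns.
      }
    }
  }
}
```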
@@ -81,27 +85,49 @@ private JsonStreamWriter(Builder builder)
/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data at current end
* of stream.
* of stream. If there is a schema update, the current StreamWriter is closed. A new StreamWriter
* is created with the updated TableSchema.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
* ApiFuture
*/
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr) {
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr)
throws IOException, DescriptorValidationException {
return append(jsonArr, -1);
}

/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data at the
* specified offset.
* specified offset. If there is a schema update, the current StreamWriter is closed. A new
* StreamWriter is created with the updated TableSchema.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @param offset Offset for deduplication
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
* ApiFuture
*/
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
throws IOException, DescriptorValidationException {
// Handle schema updates in a thread-safe way by locking down the operation
synchronized (this) {
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
if (updatedSchema != null) {
// Close the StreamWriter
this.streamWriter.close();
// Update JsonStreamWriter's TableSchema and Descriptor
this.tableSchema = updatedSchema;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema);
// Create a new underlying StreamWriter with the updated TableSchema and Descriptor
this.streamWriter =
streamWriterBuilder
.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor))
Reviewer (Contributor): Do you need to propagate anything else to the new writer like stream id, existing trace id, etc or does it all come from properties stored on the JsonStreamWriter?

Author: Yes, those things are needed and are already being populated in the streamWriterBuilder.

.build();
}
}

ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
// Any error in convertJsonToProtoMessage will throw an
// IllegalArgumentException/IllegalStateException/NullPointerException and will halt processing
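Two notes on the synchronized block above. First, per the review thread, the rebuilt writer inherits stream name, trace ID, endpoint, and flow-control settings because they were already populated on streamWriterBuilder (see setStreamWriterSettings below); only the writer schema has to be refreshed. Second, the synchronization is what makes the swap safe: the pending schema is drained, the old StreamWriter is closed, and a replacement is built before the append proceeds, so concurrent append() calls never observe a half-initialized writer. A self-contained sketch of that check-and-swap pattern, with all types hypothetical rather than taken from this library:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical stand-ins for StreamWriter/TableSchema; the point is the
// locking discipline, not the BigQuery API.
class SwappingWriter {
  interface Delegate {
    void append(String row);

    void close();
  }

  // Set from a response callback thread when the backend reports a new schema.
  private final AtomicReference<String> pendingSchema = new AtomicReference<>();
  private final Function<String, Delegate> factory;
  private Delegate delegate; // guarded by "this"

  SwappingWriter(Delegate initial, Function<String, Delegate> factory) {
    this.delegate = initial;
    this.factory = factory;
  }

  void onUpdatedSchema(String schema) {
    pendingSchema.set(schema);
  }

  synchronized void append(String row) {
    String updated = pendingSchema.getAndSet(null);
    if (updated != null) {
      delegate.close(); // retire the writer bound to the old schema
      delegate = factory.apply(updated); // rebuild against the new schema
    }
    delegate.append(row);
  }
}
```

The integration test testJsonStreamWriterSchemaUpdateConcurrent later in this diff exercises the real version of this pattern from five threads.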
@@ -155,9 +181,9 @@ private void setStreamWriterSettings(
streamWriterBuilder.setEndpoint(endpoint);
}
if (traceId != null) {
streamWriterBuilder.setTraceId("JsonWriterBeta_" + traceId);
streamWriterBuilder.setTraceId("JsonWriter_" + traceId);
} else {
streamWriterBuilder.setTraceId("JsonWriterBeta:null");
streamWriterBuilder.setTraceId("JsonWriter:null");
}
if (flowControlSettings != null) {
if (flowControlSettings.getMaxOutstandingRequestBytes() != null) {
@@ -44,8 +44,6 @@
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
*
* <p>TODO: Support batching.
*
* <p>TODO: Support schema change.
*/
public class StreamWriter implements AutoCloseable {
private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
@@ -135,6 +133,12 @@ public class StreamWriter implements AutoCloseable {
@GuardedBy("lock")
private final Deque<AppendRequestAndResponse> inflightRequestQueue;

/*
* Contains the updated TableSchema.
*/
@GuardedBy("lock")
private TableSchema updatedSchema;

/*
* A client used to interact with BigQuery.
*/
@@ -526,6 +530,9 @@ private void cleanupInflightRequests() {
private void requestCallback(AppendRowsResponse response) {
AppendRequestAndResponse requestWrapper;
this.lock.lock();
try {
  if (response.hasUpdatedSchema()) {
    this.updatedSchema = response.getUpdatedSchema();
  }
// Had a successful connection with at least one result, reset retries.
// conectionRetryCountWithoutCallback is reset so that only multiple retries, without
@@ -622,7 +629,13 @@ public static StreamWriter.Builder newBuilder(String streamName) {
return new StreamWriter.Builder(streamName);
}

/** A builder of {@link StreamWriterV2}s. */
/** Thread-safe getter of the updated TableSchema. */
public TableSchema getUpdatedSchema() {
  this.lock.lock();
  try {
    return this.updatedSchema;
  } finally {
    this.lock.unlock();
  }
}
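Callers that use StreamWriter directly (without JsonStreamWriter) could poll this getter after appends complete. A short hedged sketch; the helper name is hypothetical and the descriptor rebuild is only indicated in comments:

```java
// Hypothetical helper, not part of this PR: check whether the backend has
// reported a schema change for an open StreamWriter.
static boolean schemaChanged(StreamWriter writer) {
  TableSchema updated = writer.getUpdatedSchema();
  if (updated != null) {
    // Rebuild the serialization descriptor here, e.g. the way JsonStreamWriter
    // does with BQTableSchemaToProtoDescriptor and ProtoSchemaConverter above.
    return true;
  }
  return false;
}
```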

/** A builder of {@link StreamWriter}s. */
public static final class Builder {

private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;
@@ -649,6 +662,8 @@

private String traceId = null;

private TableSchema updatedTableSchema = null;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
@@ -193,7 +193,7 @@ public void testSingleAppendSimpleJson() throws Exception {
.getSerializedRows(0),
expectedProto.toByteString());
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriterBeta_test:empty");
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter_test:empty");
}
}

@@ -284,8 +284,7 @@ public void testSingleAppendMultipleSimpleJson() throws Exception {
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriterBeta:null");
assertEquals(testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter:null");
for (int i = 0; i < 4; i++) {
assertEquals(
testBigQueryWrite
@@ -28,11 +28,14 @@
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.cloud.bigquery.storage.v1.*;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
@@ -406,6 +409,215 @@ public void testJsonStreamWriterWithDefaultStream()
}
}

@Test
public void testJsonStreamWriterSchemaUpdate()
throws DescriptorValidationException, IOException, InterruptedException, ExecutionException {
String tableName = "SchemaUpdateTestTable";
TableId tableId = TableId.of(DATASET, tableName);
Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
Schema originalSchema = Schema.of(col1);
TableInfo tableInfo =
TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build();
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
// write the 1st row
JSONObject foo = new JSONObject();
foo.put("col1", "aaa");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
ApiFuture<AppendRowsResponse> response = jsonStreamWriter.append(jsonArr, 0);
assertEquals(0, response.get().getAppendResult().getOffset().getValue());

// update schema with a new column
Field col2 = Field.newBuilder("col2", StandardSQLTypeName.STRING).build();
Schema updatedSchema = Schema.of(ImmutableList.of(col1, col2));
TableInfo updatedTableInfo =
TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build();
Table updatedTable = bigquery.update(updatedTableInfo);
assertEquals(updatedSchema, updatedTable.getDefinition().getSchema());

// continue writing rows until backend acknowledges schema update
JSONObject foo2 = new JSONObject();
foo2.put("col1", "bbb");
JSONArray jsonArr2 = new JSONArray();
jsonArr2.put(foo2);

int next = 0;
for (int i = 1; i < 100; i++) {
ApiFuture<AppendRowsResponse> response2 = jsonStreamWriter.append(jsonArr2, i);
assertEquals(i, response2.get().getAppendResult().getOffset().getValue());
if (response2.get().hasUpdatedSchema()) {
next = i;
break;
} else {
Thread.sleep(1000);
}
}

// write rows with updated schema.
JSONObject updatedFoo = new JSONObject();
updatedFoo.put("col1", "ccc");
updatedFoo.put("col2", "ddd");
JSONArray updatedJsonArr = new JSONArray();
updatedJsonArr.put(updatedFoo);
for (int i = 0; i < 10; i++) {
ApiFuture<AppendRowsResponse> response3 =
jsonStreamWriter.append(updatedJsonArr, next + 1 + i);
assertEquals(next + 1 + i, response3.get().getAppendResult().getOffset().getValue());
}

// verify table data correctness
Iterator<FieldValueList> rowsIter = bigquery.listTableData(tableId).getValues().iterator();
// 1 row of aaa
assertEquals("aaa", rowsIter.next().get(0).getStringValue());
// a few rows of bbb
for (int j = 1; j <= next; j++) {
assertEquals("bbb", rowsIter.next().get(0).getStringValue());
}
// 10 rows of ccc, ddd
for (int j = next + 1; j < next + 1 + 10; j++) {
FieldValueList temp = rowsIter.next();
assertEquals("ccc", temp.get(0).getStringValue());
assertEquals("ddd", temp.get(1).getStringValue());
}
assertFalse(rowsIter.hasNext());
}
}

@Test
public void testJsonStreamWriterSchemaUpdateConcurrent()
throws DescriptorValidationException, IOException, InterruptedException {
// Create test table and test stream
String tableName = "ConcurrentSchemaUpdateTestTable";
TableId tableId = TableId.of(DATASET, tableName);
Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
Schema originalSchema = Schema.of(col1);
TableInfo tableInfo =
TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build();
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());

// Create test JSON objects
JSONObject foo = new JSONObject();
foo.put("col1", "aaa");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);

JSONObject foo2 = new JSONObject();
foo2.put("col1", "bbb");
JSONArray jsonArr2 = new JSONArray();
jsonArr2.put(foo2);

JSONObject updatedFoo = new JSONObject();
updatedFoo.put("col1", "ccc");
updatedFoo.put("col2", "ddd");
JSONArray updatedJsonArr = new JSONArray();
updatedJsonArr.put(updatedFoo);

// Prepare updated schema
Field col2 = Field.newBuilder("col2", StandardSQLTypeName.STRING).build();
Schema updatedSchema = Schema.of(ImmutableList.of(col1, col2));
TableInfo updatedTableInfo =
TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build();

// Start writing using the JsonWriter
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
int numberOfThreads = 5;
ExecutorService streamTaskExecutor = Executors.newFixedThreadPool(5);
CountDownLatch latch = new CountDownLatch(numberOfThreads);
// Used to verify data correctness
AtomicInteger next = new AtomicInteger();

// update TableSchema async
Runnable updateTableSchemaTask =
() -> {
Table updatedTable = bigquery.update(updatedTableInfo);
assertEquals(updatedSchema, updatedTable.getDefinition().getSchema());
};
streamTaskExecutor.execute(updateTableSchemaTask);

// stream data async
for (int i = 0; i < numberOfThreads; i++) {
streamTaskExecutor.submit(
() -> {
// write 2 rows of aaa on each Thread
for (int j = 0; j < 2; j++) {
try {
jsonStreamWriter.append(jsonArr);
next.getAndIncrement();
} catch (IOException | DescriptorValidationException e) {
e.printStackTrace();
}
}

// write filler rows bbb until backend acknowledges schema update due to possible
// delay
for (int w = 0; w < 15; w++) {
ApiFuture<AppendRowsResponse> response2 = null;
try {
response2 = jsonStreamWriter.append(jsonArr2);
next.getAndIncrement();
} catch (IOException | DescriptorValidationException e) {
Reviewer (Contributor): Hopefully with ignore unknown fields, we could keep writing ccc and ddd to the table and expect ddd eventually to show up in the table. This is good for now.

Author: Ack. Will keep this in mind.

LOG.severe("Issue with append " + e.getMessage());
}
try {
assert response2 != null;
if (response2.get().hasUpdatedSchema()) {
break;
} else {
Thread.sleep(1000);
}
} catch (InterruptedException | ExecutionException e) {
LOG.severe("Issue with append " + e.getMessage());
}
}

// write 5 rows of ccc,ddd on each Thread
for (int m = 0; m < 5; m++) {
try {
jsonStreamWriter.append(updatedJsonArr);
next.getAndIncrement();
} catch (IOException | DescriptorValidationException e) {
LOG.severe("Issue with append " + e.getMessage());
}
}
latch.countDown();
});
}
latch.await();

// verify that the last 5 rows streamed are ccc,ddd
Iterator<FieldValueList> rowsIter = bigquery.listTableData(tableId).getValues().iterator();

int position = 0;
while (rowsIter.hasNext()) {
FieldValueList row = rowsIter.next();
position++;
if (position > next.get() - 5) {
assertEquals("ccc", row.get(0).getStringValue());
assertEquals("ddd", row.get(1).getStringValue());
}
}
}
}

@Test
public void testComplicateSchemaWithPendingStream()
throws IOException, InterruptedException, ExecutionException {
Expand Down