Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ignoreUnknownField support in JsonWriter #1455

Merged
merged 5 commits into from Dec 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -52,6 +52,7 @@ public class JsonStreamWriter implements AutoCloseable {
private StreamWriter.Builder streamWriterBuilder;
private Descriptor descriptor;
private TableSchema tableSchema;
private boolean ignoreUnknownFields = false;

/**
* Constructs the JsonStreamWriter
Expand Down Expand Up @@ -80,6 +81,7 @@ private JsonStreamWriter(Builder builder)
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
this.ignoreUnknownFields = builder.ignoreUnknownFields;
}

/**
Expand Down Expand Up @@ -135,7 +137,8 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, this.tableSchema, json);
JsonToProtoMessage.convertJsonToProtoMessage(
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
}
// Need to make sure refreshAppendAndSetDescriptor finish first before this can run
Expand Down Expand Up @@ -249,6 +252,7 @@ public static final class Builder {
private String endpoint;
private boolean createDefaultStream = false;
private String traceId;
private boolean ignoreUnknownFields = false;

private static String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
Expand Down Expand Up @@ -350,6 +354,18 @@ public Builder setTraceId(String traceId) {
return this;
}

/**
* Setter for a ignoreUnkownFields, if true, unknown Json fields to BigQuery will be ignored
* instead of error out.
*
* @param ignoreUnknownFields
* @return Builder
*/
public Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) {
this.ignoreUnknownFields = ignoreUnknownFields;
return this;
}

/**
* Builds JsonStreamWriter
*
Expand Down
Expand Up @@ -64,7 +64,8 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");

return convertJsonToProtoMessageImpl(protoSchema, null, json, "root", /*topLevel=*/ true);
return convertJsonToProtoMessageImpl(
protoSchema, null, json, "root", /*topLevel=*/ true, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed offline, topLevel is not used in convertJsonToProtoMessageImpl. Hence, we should remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack

}

/**
Expand All @@ -85,7 +86,39 @@ public static DynamicMessage convertJsonToProtoMessage(
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");

return convertJsonToProtoMessageImpl(
protoSchema, tableSchema.getFieldsList(), json, "root", /*topLevel=*/ true);
protoSchema,
tableSchema.getFieldsList(),
json,
"root",
/*topLevel=*/ true,
/*ignoreUnknownFields*/ false);
}

/**
* Converts Json data to protocol buffer messages given the protocol buffer descriptor.
*
* @param protoSchema
* @param tableSchema bigquery table schema is needed for type conversion of DATETIME, TIME,
* NUMERIC, BIGNUMERIC
* @param json
* @param ignoreUnknownFields allows unknown fields in JSON input to be ignored.
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
public static DynamicMessage convertJsonToProtoMessage(
Descriptor protoSchema, TableSchema tableSchema, JSONObject json, boolean ignoreUnknownFields)
throws IllegalArgumentException {
Preconditions.checkNotNull(json, "JSONObject is null.");
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");

return convertJsonToProtoMessageImpl(
protoSchema,
tableSchema.getFieldsList(),
json,
"root",
/*topLevel=*/ true,
ignoreUnknownFields);
}

/**
Expand All @@ -102,7 +135,8 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
List<TableFieldSchema> tableSchema,
JSONObject json,
String jsonScope,
boolean topLevel)
boolean topLevel,
boolean ignoreUnknownFields)
throws IllegalArgumentException {

DynamicMessage.Builder protoMsg = DynamicMessage.newBuilder(protoSchema);
Expand All @@ -117,9 +151,11 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
String jsonLowercaseName = jsonName.toLowerCase();
String currentScope = jsonScope + "." + jsonName;
FieldDescriptor field = protoSchema.findFieldByName(jsonLowercaseName);
if (field == null) {
if (field == null && !ignoreUnknownFields) {
throw new IllegalArgumentException(
String.format("JSONObject has fields unknown to BigQuery: %s.", currentScope));
} else if (field == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check seems redundant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is null, it means we cannot proceed on further processing. Otherwise, null ptr will be thrown if we continue.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay makes sense -- this allows us to skip the unknown field in the middle and proceed onto other fields.

continue;
}
TableFieldSchema fieldSchema = null;
if (tableSchema != null) {
Expand All @@ -137,9 +173,10 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
}
}
if (!field.isRepeated()) {
fillField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
fillField(protoMsg, field, fieldSchema, json, jsonName, currentScope, ignoreUnknownFields);
} else {
fillRepeatedField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
fillRepeatedField(
protoMsg, field, fieldSchema, json, jsonName, currentScope, ignoreUnknownFields);
}
}

Expand Down Expand Up @@ -174,7 +211,8 @@ private static void fillField(
TableFieldSchema fieldSchema,
JSONObject json,
String exactJsonKeyName,
String currentScope)
String currentScope,
boolean ignoreUnknownFields)
throws IllegalArgumentException {

java.lang.Object val = json.get(exactJsonKeyName);
Expand Down Expand Up @@ -303,7 +341,8 @@ private static void fillField(
fieldSchema == null ? null : fieldSchema.getFieldsList(),
json.getJSONObject(exactJsonKeyName),
currentScope,
/*topLevel =*/ false));
/*topLevel =*/ false,
ignoreUnknownFields));
return;
}
break;
Expand Down Expand Up @@ -331,7 +370,8 @@ private static void fillRepeatedField(
TableFieldSchema fieldSchema,
JSONObject json,
String exactJsonKeyName,
String currentScope)
String currentScope,
boolean ignoreUnknownFields)
throws IllegalArgumentException {

JSONArray jsonArray;
Expand Down Expand Up @@ -478,7 +518,8 @@ private static void fillRepeatedField(
fieldSchema == null ? null : fieldSchema.getFieldsList(),
jsonArray.getJSONObject(i),
currentScope,
/*topLevel =*/ false));
/*topLevel =*/ false,
ignoreUnknownFields));
} else {
fail = true;
}
Expand Down
Expand Up @@ -519,11 +519,7 @@ private void cleanupInflightRequests() {
} finally {
this.lock.unlock();
}
log.fine(
"Cleaning "
+ localQueue.size()
+ " inflight requests with error: "
+ finalStatus.toString());
log.fine("Cleaning " + localQueue.size() + " inflight requests with error: " + finalStatus);
while (!localQueue.isEmpty()) {
localQueue.pollFirst().appendResult.setException(finalStatus);
}
Expand Down
Expand Up @@ -496,4 +496,48 @@ public void testSimpleSchemaUpdate() throws Exception {
|| testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema());
}
}

@Test
public void testWithoutIgnoreUnknownFields() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
JSONObject foo = new JSONObject();
foo.put("test_int", 10);
JSONObject bar = new JSONObject();
bar.put("test_unknown", 10);
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
jsonArr.put(bar);
try {
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
Assert.fail("expected ExecutionException");
} catch (Exception ex) {
assertEquals(
ex.getMessage(), "JSONObject has fields unknown to BigQuery: root.test_unknown.");
}
}
}

@Test
public void testWithIgnoreUnknownFields() throws Exception {
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema)
.setChannelProvider(channelProvider)
.setIgnoreUnknownFields(true)
.setCredentialsProvider(NoCredentialsProvider.create())
.build()) {
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
JSONObject foo = new JSONObject();
foo.put("test_int", 10);
JSONObject bar = new JSONObject();
bar.put("test_unknown", 10);
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
jsonArr.put(bar);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
appendFuture.get();
}
}
}
Expand Up @@ -352,7 +352,9 @@ public void testJsonStreamWriterWithDefaultStream()
bigquery.create(tableInfo);
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableSchema).build()) {
JsonStreamWriter.newBuilder(parent.toString(), tableSchema)
.setIgnoreUnknownFields(true)
.build()) {
LOG.info("Sending one message");
JSONObject row1 = new JSONObject();
row1.put("test_str", "aaa");
Expand All @@ -365,6 +367,7 @@ public void testJsonStreamWriterWithDefaultStream()
BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("-9000000"))
.toByteArray()
}));
row1.put("unknown_field", "a");
row1.put(
"test_datetime",
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.of(2020, 10, 1, 12, 0)));
Expand Down