Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove IgnoreUnknownFields support on JsonStreamWriter #757

Merged
merged 12 commits into from Dec 30, 2020
26 changes: 26 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -0,0 +1,26 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- TODO(stephwang): To be removed after the release -->
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter</className>
<differenceType>7005</differenceType>
<method>com.google.api.core.ApiFuture append(org.json.JSONArray, boolean)</method>
<from>com.google.api.core.ApiFuture append(org.json.JSONArray, boolean)</from>
<to>com.google.api.core.ApiFuture append(org.json.JSONArray, long)</to>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/JsonStreamWriter</className>
<differenceType>7004</differenceType>
<method>com.google.api.core.ApiFuture append(org.json.JSONArray, long, boolean)</method>
<from>com.google.api.core.ApiFuture append(org.json.JSONArray, long, boolean)</from>
<to>com.google.api.core.ApiFuture append(org.json.JSONArray)</to>
</difference>
<difference>
<className>com/google/cloud/bigquery/storage/v1beta2/JsonToProtoMessage</className>
<differenceType>7004</differenceType>
<method>com.google.protobuf.DynamicMessage convertJsonToProtoMessage(com.google.protobuf.Descriptors$Descriptor, org.json.JSONObject, boolean)</method>
<from>com.google.protobuf.DynamicMessage convertJsonToProtoMessage(com.google.protobuf.Descriptors$Descriptor, org.json.JSONObject, boolean)</from>
<to>com.google.protobuf.DynamicMessage convertJsonToProtoMessage(com.google.protobuf.Descriptors$Descriptor, org.json.JSONObject)</to>
</difference>
</differences>
Expand Up @@ -94,12 +94,11 @@ private JsonStreamWriter(Builder builder)
* schema update, the OnSchemaUpdateRunnable will be used to determine what actions to perform.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @param allowUnknownFields if true, json data can have fields unknown to the BigQuery table.
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
* ApiFuture
*/
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, boolean allowUnknownFields) {
return append(jsonArr, -1, allowUnknownFields);
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr) {
return append(jsonArr, -1);
}

/**
Expand All @@ -109,20 +108,17 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, boolean allowUnkn
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @param offset Offset for deduplication
* @param allowUnknownFields if true, json data can have fields unknown to the BigQuery table.
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
* ApiFuture
*/
public ApiFuture<AppendRowsResponse> append(
JSONArray jsonArr, long offset, boolean allowUnknownFields) {
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
// Any error in convertJsonToProtoMessage will throw an
// IllegalArgumentException/IllegalStateException/NullPointerException and will halt processing
// of JSON data.
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json, allowUnknownFields);
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
}
AppendRowsRequest.ProtoData.Builder data = AppendRowsRequest.ProtoData.newBuilder();
Expand Down
Expand Up @@ -47,18 +47,15 @@ public class JsonToProtoMessage {
*
* @param protoSchema
* @param json
* @param allowUnknownFields Ignores unknown JSON fields.
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
public static DynamicMessage convertJsonToProtoMessage(
Descriptor protoSchema, JSONObject json, boolean allowUnknownFields)
public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, JSONObject json)
throws IllegalArgumentException {
Preconditions.checkNotNull(json, "JSONObject is null.");
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");

return convertJsonToProtoMessageImpl(
protoSchema, json, "root", /*topLevel=*/ true, allowUnknownFields);
return convertJsonToProtoMessageImpl(protoSchema, json, "root", /*topLevel=*/ true);
}

/**
Expand All @@ -67,24 +64,18 @@ public static DynamicMessage convertJsonToProtoMessage(
* @param protoSchema
* @param json
* @param jsonScope Debugging purposes
* @param allowUnknownFields Ignores unknown JSON fields.
* @param topLevel checks if root level has any matching fields.
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
private static DynamicMessage convertJsonToProtoMessageImpl(
Descriptor protoSchema,
JSONObject json,
String jsonScope,
boolean topLevel,
boolean allowUnknownFields)
Descriptor protoSchema, JSONObject json, String jsonScope, boolean topLevel)
throws IllegalArgumentException {

DynamicMessage.Builder protoMsg = DynamicMessage.newBuilder(protoSchema);
String[] jsonNames = JSONObject.getNames(json);
if (jsonNames == null) {
return protoMsg.build();
}
int matchedFields = 0;
for (int i = 0; i < jsonNames.length; i++) {
String jsonName = jsonNames[i];
// We want lowercase here to support case-insensitive data writes.
Expand All @@ -93,27 +84,16 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
String currentScope = jsonScope + "." + jsonName;
FieldDescriptor field = protoSchema.findFieldByName(jsonLowercaseName);
if (field == null) {
if (!allowUnknownFields) {
throw new IllegalArgumentException(
String.format(
"JSONObject has fields unknown to BigQuery: %s. Set allowUnknownFields to True to allow unknown fields.",
currentScope));
} else {
continue;
}
throw new IllegalArgumentException(
String.format("JSONObject has fields unknown to BigQuery: %s.", currentScope));
}
matchedFields++;
if (!field.isRepeated()) {
fillField(protoMsg, field, json, jsonName, currentScope, allowUnknownFields);
fillField(protoMsg, field, json, jsonName, currentScope);
} else {
fillRepeatedField(protoMsg, field, json, jsonName, currentScope, allowUnknownFields);
fillRepeatedField(protoMsg, field, json, jsonName, currentScope);
}
}

if (matchedFields == 0 && topLevel) {
throw new IllegalArgumentException(
"There are no matching fields found for the JSONObject and the protocol buffer descriptor.");
}
DynamicMessage msg;
try {
msg = protoMsg.build();
Expand All @@ -139,16 +119,14 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
* @param json
* @param exactJsonKeyName Exact key name in JSONObject instead of lowercased version
* @param currentScope Debugging purposes
* @param allowUnknownFields Ignores unknown JSON fields.
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
private static void fillField(
DynamicMessage.Builder protoMsg,
FieldDescriptor fieldDescriptor,
JSONObject json,
String exactJsonKeyName,
String currentScope,
boolean allowUnknownFields)
String currentScope)
throws IllegalArgumentException {

java.lang.Object val = json.get(exactJsonKeyName);
Expand Down Expand Up @@ -204,8 +182,7 @@ private static void fillField(
fieldDescriptor.getMessageType(),
json.getJSONObject(exactJsonKeyName),
currentScope,
/*topLevel =*/ false,
allowUnknownFields));
/*topLevel =*/ false));
return;
}
break;
Expand All @@ -224,16 +201,14 @@ private static void fillField(
* @param json If root level has no matching fields, throws exception.
* @param exactJsonKeyName Exact key name in JSONObject instead of lowercased version
* @param currentScope Debugging purposes
* @param allowUnknownFields Ignores unknown JSON fields.
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
private static void fillRepeatedField(
DynamicMessage.Builder protoMsg,
FieldDescriptor fieldDescriptor,
JSONObject json,
String exactJsonKeyName,
String currentScope,
boolean allowUnknownFields)
String currentScope)
throws IllegalArgumentException {

JSONArray jsonArray;
Expand Down Expand Up @@ -305,8 +280,7 @@ private static void fillRepeatedField(
fieldDescriptor.getMessageType(),
jsonArray.getJSONObject(i),
currentScope,
/*topLevel =*/ false,
allowUnknownFields));
/*topLevel =*/ false));
} else {
fail = true;
}
Expand Down
Expand Up @@ -876,6 +876,8 @@ public void testFlushAllFailed() throws Exception {
.build();

testBigQueryWrite.addException(Status.DATA_LOSS.asException());
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
testBigQueryWrite.addException(Status.DATA_LOSS.asException());

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
Expand Down
Expand Up @@ -251,8 +251,7 @@ public void testSingleAppendSimpleJson() throws Exception {
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());

ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
appendFuture.get();
assertEquals(
Expand Down Expand Up @@ -299,8 +298,7 @@ public void testSingleAppendMultipleSimpleJson() throws Exception {
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());

ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);

assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
appendFuture.get();
Expand Down Expand Up @@ -357,7 +355,7 @@ public void testMultipleAppendSimpleJson() throws Exception {
.build());
ApiFuture<AppendRowsResponse> appendFuture;
for (int i = 0; i < 4; i++) {
appendFuture = writer.append(jsonArr, -1, /* allowUnknownFields */ false);
appendFuture = writer.append(jsonArr);
assertEquals((long) i, appendFuture.get().getAppendResult().getOffset().getValue());
appendFuture.get();
assertEquals(
Expand Down Expand Up @@ -443,8 +441,7 @@ public void testSingleAppendComplexJson() throws Exception {
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
.build());

ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);

assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
appendFuture.get();
Expand Down Expand Up @@ -495,8 +492,7 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);

ApiFuture<AppendRowsResponse> appendFuture1 =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(jsonArr);

int millis = 0;
while (millis <= 10000) {
Expand Down Expand Up @@ -532,8 +528,7 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
JSONArray updatedJsonArr = new JSONArray();
updatedJsonArr.put(updatedFoo);

ApiFuture<AppendRowsResponse> appendFuture2 =
writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(updatedJsonArr);

millis = 0;
while (millis <= 10000) {
Expand Down Expand Up @@ -570,8 +565,7 @@ public void testAppendMultipleSchemaUpdate() throws Exception {
JSONArray updatedJsonArr2 = new JSONArray();
updatedJsonArr2.put(updatedFoo2);

ApiFuture<AppendRowsResponse> appendFuture3 =
writer.append(updatedJsonArr2, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(updatedJsonArr2);

assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue());
assertEquals(
Expand Down Expand Up @@ -614,8 +608,7 @@ public void testAppendOutOfRangeException() throws Exception {
foo.put("foo", "allen");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
try {
appendFuture.get();
Assert.fail("expected ExecutionException");
Expand Down Expand Up @@ -644,8 +637,7 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception {
foo.put("foo", "allen");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
try {
appendFuture.get();
Assert.fail("expected ExecutionException");
Expand All @@ -668,8 +660,7 @@ public void testAppendOutOfRangeAndUpdateSchema() throws Exception {
JSONArray updatedJsonArr = new JSONArray();
updatedJsonArr.put(updatedFoo);

ApiFuture<AppendRowsResponse> appendFuture2 =
writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(updatedJsonArr);
assertEquals(0L, appendFuture2.get().getAppendResult().getOffset().getValue());
appendFuture2.get();
assertEquals(
Expand Down Expand Up @@ -727,12 +718,9 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception {
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);

ApiFuture<AppendRowsResponse> appendFuture1 =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture2 =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture3 =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(jsonArr);
ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(jsonArr);
ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(jsonArr);

assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
Expand Down Expand Up @@ -796,8 +784,7 @@ public void testSchemaUpdateWithNonemptyBatch() throws Exception {
JSONArray updatedJsonArr = new JSONArray();
updatedJsonArr.put(updatedFoo);

ApiFuture<AppendRowsResponse> appendFuture4 =
writer.append(updatedJsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(updatedJsonArr);

assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue());
assertEquals(
Expand Down Expand Up @@ -857,8 +844,7 @@ public void testMultiThreadAppendNoSchemaUpdate() throws Exception {
new Runnable() {
public void run() {
try {
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
AppendRowsResponse response = appendFuture.get();
offsetSets.remove(response.getAppendResult().getOffset().getValue());
} catch (Exception e) {
Expand Down Expand Up @@ -940,8 +926,7 @@ public void testMultiThreadAppendWithSchemaUpdate() throws Exception {
new Runnable() {
public void run() {
try {
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
AppendRowsResponse response = appendFuture.get();
offsetSets.remove(response.getAppendResult().getOffset().getValue());
} catch (Exception e) {
Expand Down Expand Up @@ -1004,8 +989,7 @@ public void run() {
new Runnable() {
public void run() {
try {
ApiFuture<AppendRowsResponse> appendFuture =
writer.append(jsonArr2, -1, /* allowUnknownFields */ false);
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr2);
AppendRowsResponse response = appendFuture.get();
offsetSets.remove(response.getAppendResult().getOffset().getValue());
} catch (Exception e) {
Expand Down