diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java index a16273db44..b43676ed18 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java @@ -52,6 +52,7 @@ public class JsonStreamWriter implements AutoCloseable { private StreamWriter.Builder streamWriterBuilder; private Descriptor descriptor; private TableSchema tableSchema; + private boolean ignoreUnknownFields = false; /** * Constructs the JsonStreamWriter @@ -80,6 +81,7 @@ private JsonStreamWriter(Builder builder) this.streamWriter = streamWriterBuilder.build(); this.streamName = builder.streamName; this.tableSchema = builder.tableSchema; + this.ignoreUnknownFields = builder.ignoreUnknownFields; } /** @@ -135,7 +137,8 @@ public ApiFuture append(JSONArray jsonArr, long offset) for (int i = 0; i < jsonArr.length(); i++) { JSONObject json = jsonArr.getJSONObject(i); Message protoMessage = - JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, this.tableSchema, json); + JsonToProtoMessage.convertJsonToProtoMessage( + this.descriptor, this.tableSchema, json, ignoreUnknownFields); rowsBuilder.addSerializedRows(protoMessage.toByteString()); } // Need to make sure refreshAppendAndSetDescriptor finish first before this can run @@ -249,6 +252,7 @@ public static final class Builder { private String endpoint; private boolean createDefaultStream = false; private String traceId; + private boolean ignoreUnknownFields = false; private static String streamPatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+"; @@ -350,6 +354,18 @@ public Builder setTraceId(String traceId) { return this; } + /** + * Setter for a ignoreUnkownFields, if true, unknown Json fields to BigQuery will be ignored + * instead of error out. + * + * @param ignoreUnknownFields + * @return Builder + */ + public Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) { + this.ignoreUnknownFields = ignoreUnknownFields; + return this; + } + /** * Builds JsonStreamWriter * diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java index 6f5c0cf10b..c9ec09fdab 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java @@ -64,7 +64,8 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null."); Preconditions.checkState(json.length() != 0, "JSONObject is empty."); - return convertJsonToProtoMessageImpl(protoSchema, null, json, "root", /*topLevel=*/ true); + return convertJsonToProtoMessageImpl( + protoSchema, null, json, "root", /*topLevel=*/ true, false); } /** @@ -85,7 +86,39 @@ public static DynamicMessage convertJsonToProtoMessage( Preconditions.checkState(json.length() != 0, "JSONObject is empty."); return convertJsonToProtoMessageImpl( - protoSchema, tableSchema.getFieldsList(), json, "root", /*topLevel=*/ true); + protoSchema, + tableSchema.getFieldsList(), + json, + "root", + /*topLevel=*/ true, + /*ignoreUnknownFields*/ false); + } + + /** + * Converts Json data to protocol buffer messages given the protocol buffer descriptor. + * + * @param protoSchema + * @param tableSchema bigquery table schema is needed for type conversion of DATETIME, TIME, + * NUMERIC, BIGNUMERIC + * @param json + * @param ignoreUnknownFields allows unknown fields in JSON input to be ignored. + * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor. + */ + public static DynamicMessage convertJsonToProtoMessage( + Descriptor protoSchema, TableSchema tableSchema, JSONObject json, boolean ignoreUnknownFields) + throws IllegalArgumentException { + Preconditions.checkNotNull(json, "JSONObject is null."); + Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null."); + Preconditions.checkNotNull(tableSchema, "TableSchema is null."); + Preconditions.checkState(json.length() != 0, "JSONObject is empty."); + + return convertJsonToProtoMessageImpl( + protoSchema, + tableSchema.getFieldsList(), + json, + "root", + /*topLevel=*/ true, + ignoreUnknownFields); } /** @@ -102,7 +135,8 @@ private static DynamicMessage convertJsonToProtoMessageImpl( List tableSchema, JSONObject json, String jsonScope, - boolean topLevel) + boolean topLevel, + boolean ignoreUnknownFields) throws IllegalArgumentException { DynamicMessage.Builder protoMsg = DynamicMessage.newBuilder(protoSchema); @@ -117,9 +151,11 @@ private static DynamicMessage convertJsonToProtoMessageImpl( String jsonLowercaseName = jsonName.toLowerCase(); String currentScope = jsonScope + "." + jsonName; FieldDescriptor field = protoSchema.findFieldByName(jsonLowercaseName); - if (field == null) { + if (field == null && !ignoreUnknownFields) { throw new IllegalArgumentException( String.format("JSONObject has fields unknown to BigQuery: %s.", currentScope)); + } else if (field == null) { + continue; } TableFieldSchema fieldSchema = null; if (tableSchema != null) { @@ -137,9 +173,10 @@ private static DynamicMessage convertJsonToProtoMessageImpl( } } if (!field.isRepeated()) { - fillField(protoMsg, field, fieldSchema, json, jsonName, currentScope); + fillField(protoMsg, field, fieldSchema, json, jsonName, currentScope, ignoreUnknownFields); } else { - fillRepeatedField(protoMsg, field, fieldSchema, json, jsonName, currentScope); + fillRepeatedField( + protoMsg, field, fieldSchema, json, jsonName, currentScope, ignoreUnknownFields); } } @@ -174,7 +211,8 @@ private static void fillField( TableFieldSchema fieldSchema, JSONObject json, String exactJsonKeyName, - String currentScope) + String currentScope, + boolean ignoreUnknownFields) throws IllegalArgumentException { java.lang.Object val = json.get(exactJsonKeyName); @@ -303,7 +341,8 @@ private static void fillField( fieldSchema == null ? null : fieldSchema.getFieldsList(), json.getJSONObject(exactJsonKeyName), currentScope, - /*topLevel =*/ false)); + /*topLevel =*/ false, + ignoreUnknownFields)); return; } break; @@ -331,7 +370,8 @@ private static void fillRepeatedField( TableFieldSchema fieldSchema, JSONObject json, String exactJsonKeyName, - String currentScope) + String currentScope, + boolean ignoreUnknownFields) throws IllegalArgumentException { JSONArray jsonArray; @@ -478,7 +518,8 @@ private static void fillRepeatedField( fieldSchema == null ? null : fieldSchema.getFieldsList(), jsonArray.getJSONObject(i), currentScope, - /*topLevel =*/ false)); + /*topLevel =*/ false, + ignoreUnknownFields)); } else { fail = true; } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index 3b8c47585d..43d18b6129 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -519,11 +519,7 @@ private void cleanupInflightRequests() { } finally { this.lock.unlock(); } - log.fine( - "Cleaning " - + localQueue.size() - + " inflight requests with error: " - + finalStatus.toString()); + log.fine("Cleaning " + localQueue.size() + " inflight requests with error: " + finalStatus); while (!localQueue.isEmpty()) { localQueue.pollFirst().appendResult.setException(finalStatus); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java index 24475c03e3..0e11aca77a 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java @@ -496,4 +496,48 @@ public void testSimpleSchemaUpdate() throws Exception { || testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema()); } } + + @Test + public void testWithoutIgnoreUnknownFields() throws Exception { + TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build(); + try (JsonStreamWriter writer = + getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) { + JSONObject foo = new JSONObject(); + foo.put("test_int", 10); + JSONObject bar = new JSONObject(); + bar.put("test_unknown", 10); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + jsonArr.put(bar); + try { + ApiFuture appendFuture = writer.append(jsonArr); + Assert.fail("expected ExecutionException"); + } catch (Exception ex) { + assertEquals( + ex.getMessage(), "JSONObject has fields unknown to BigQuery: root.test_unknown."); + } + } + } + + @Test + public void testWithIgnoreUnknownFields() throws Exception { + TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build(); + try (JsonStreamWriter writer = + JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema) + .setChannelProvider(channelProvider) + .setIgnoreUnknownFields(true) + .setCredentialsProvider(NoCredentialsProvider.create()) + .build()) { + testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build()); + JSONObject foo = new JSONObject(); + foo.put("test_int", 10); + JSONObject bar = new JSONObject(); + bar.put("test_unknown", 10); + JSONArray jsonArr = new JSONArray(); + jsonArr.put(foo); + jsonArr.put(bar); + ApiFuture appendFuture = writer.append(jsonArr); + appendFuture.get(); + } + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java index a6b386f2b2..764cfa7f53 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java @@ -352,7 +352,9 @@ public void testJsonStreamWriterWithDefaultStream() bigquery.create(tableInfo); TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName); try (JsonStreamWriter jsonStreamWriter = - JsonStreamWriter.newBuilder(parent.toString(), tableSchema).build()) { + JsonStreamWriter.newBuilder(parent.toString(), tableSchema) + .setIgnoreUnknownFields(true) + .build()) { LOG.info("Sending one message"); JSONObject row1 = new JSONObject(); row1.put("test_str", "aaa"); @@ -365,6 +367,7 @@ public void testJsonStreamWriterWithDefaultStream() BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("-9000000")) .toByteArray() })); + row1.put("unknown_field", "a"); row1.put( "test_datetime", CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.of(2020, 10, 1, 12, 0)));