From bc2d8cc82adeeddb21aeb9845e0883d369101513 Mon Sep 17 00:00:00 2001 From: allenc3 <31832826+allenc3@users.noreply.github.com> Date: Thu, 18 Jun 2020 11:38:07 -0500 Subject: [PATCH] feat: Added schema compatibility check functionality (SchemaCompact.java) (#339) * Protobuf and annotations working; first test that checks if user schema types are supported by BQ. * Halfway through basic type checking of nonsupported types; wrote tests on scalar types and oneof types * Almost finished detecting cycles in protos. * Finished checking for nonsupported types, added zetasql package as dependency in pom. * Reformatted to pass checkstyle. * Added all BQ compatibility checks and started working on main checking method * Removed all exact match code; realized parameter exact match is a backend object. * Added field mode checks * Refactored field option checking code. * All current tests work. * Finished BQInteger testing; need to add testing for all types and annotations. * Removing zetasql related code * Removed all zetasql related code * Finished basic type checking and added tests, need to add more in-depth tests to account for BQRecord * feat: Added SchemaCompact class that checks compatibility between proto schemas and BQ schemas. * Changed some files to make compilation/testing work * Added to clirr-ignored-differences to ignore differences in check method signature * Changed clirr ignored method to be the original method signature * Fixed code for pull request * Fixed linting errors * Fixed all PR suggestions * Merged isSupported and isCompatible, fixed minor issues according to PR suggestions. * Removed duplicate commits * Fix linting * Added a testcase for field name casing. * Added Integration tests, also added a field mode for footype in tableInfo as SchemaCheck does not allow BQ fields with no field mode.
* Fixed linting * Fixed testing errors * Fixing test pt 2 * Reverted clirr-ignored-differences.xml --- .../storage/v1alpha2/SchemaCompact.java | 472 ++++++++- .../storage/v1alpha2/SchemaCompactTest.java | 949 +++++++++++++++++- .../it/ITBigQueryWriteManualClientTest.java | 43 + .../src/test/proto/schemaTest.proto | 259 +++++ 4 files changed, 1674 insertions(+), 49 deletions(-) create mode 100644 google-cloud-bigquerystorage/src/test/proto/schemaTest.proto diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java index 3f17f44951..00c370c800 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java @@ -16,19 +16,27 @@ package com.google.cloud.bigquery.storage.v1alpha2; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Field; +import com.google.cloud.bigquery.LegacySQLTypeName; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.testing.RemoteBigQueryHelper; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.Descriptors; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; /** - * A class that checks the schema compatibility between user schema in proto descriptor and Bigquery - * table schema. If this check is passed, then user can write to BigQuery table using the user - * schema, otherwise the write will fail. + * A class that checks the schema compatibility between the proto schema in the proto descriptor + * and the BigQuery table schema. If this check passes, the user can write to the BigQuery table + * using the proto schema; otherwise the write will fail. * *

The implementation as of now is not complete, which means, even if this check passes, there is * still a possibility that the write will fail. @@ -38,8 +46,32 @@ public class SchemaCompact { private static SchemaCompact compact; private static String tablePatternString = "projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)"; private static Pattern tablePattern = Pattern.compile(tablePatternString); + private static final int NestingLimit = 15; + + private static Set<Descriptors.FieldDescriptor.Type> SupportedTypes = + Collections.unmodifiableSet( + new HashSet<>( + Arrays.asList( + Descriptors.FieldDescriptor.Type.INT32, + Descriptors.FieldDescriptor.Type.INT64, + Descriptors.FieldDescriptor.Type.UINT32, + Descriptors.FieldDescriptor.Type.UINT64, + Descriptors.FieldDescriptor.Type.FIXED32, + Descriptors.FieldDescriptor.Type.FIXED64, + Descriptors.FieldDescriptor.Type.SFIXED32, + Descriptors.FieldDescriptor.Type.SFIXED64, + Descriptors.FieldDescriptor.Type.FLOAT, + Descriptors.FieldDescriptor.Type.DOUBLE, + Descriptors.FieldDescriptor.Type.BOOL, + Descriptors.FieldDescriptor.Type.BYTES, + Descriptors.FieldDescriptor.Type.STRING, + Descriptors.FieldDescriptor.Type.MESSAGE, + Descriptors.FieldDescriptor.Type.GROUP, + Descriptors.FieldDescriptor.Type.ENUM))); private SchemaCompact(BigQuery bigquery) { + // TODO: Add functionality that allows SchemaCompact to build schemas. this.bigquery = bigquery; } @@ -76,24 +108,430 @@ private TableId getTableId(String tableName) { } /** - * Checks if the userSchema is compatible with the table's current schema for writing. The current - * implementatoin is not complete. If the check failed, the write couldn't succeed. + * Checks if the proto field type is supported by the BQ schema. + * + * @param field the proto field descriptor to check + * @return true if the field's type is supported by the BQ schema + */ + public static boolean isSupportedType(Descriptors.FieldDescriptor field) { + Descriptors.FieldDescriptor.Type fieldType = field.getType(); + if (!SupportedTypes.contains(fieldType)) { + return false; + } + return true; + } + + private static boolean isCompatibleWithBQBool(Descriptors.FieldDescriptor.Type field) { + if (field == Descriptors.FieldDescriptor.Type.BOOL + || field == Descriptors.FieldDescriptor.Type.INT32 + || field == Descriptors.FieldDescriptor.Type.INT64 + || field == Descriptors.FieldDescriptor.Type.UINT32 + || field == Descriptors.FieldDescriptor.Type.UINT64 + || field == Descriptors.FieldDescriptor.Type.FIXED32 + || field == Descriptors.FieldDescriptor.Type.FIXED64 + || field == Descriptors.FieldDescriptor.Type.SFIXED32 + || field == Descriptors.FieldDescriptor.Type.SFIXED64) { + return true; + } + return false; + } + + private static boolean isCompatibleWithBQBytes(Descriptors.FieldDescriptor.Type field) { + if (field == Descriptors.FieldDescriptor.Type.BYTES) { + return true; + } + return false; + } + + private static boolean isCompatibleWithBQDate(Descriptors.FieldDescriptor.Type field) { + if (field == Descriptors.FieldDescriptor.Type.INT32 + || field == Descriptors.FieldDescriptor.Type.INT64 + || field == Descriptors.FieldDescriptor.Type.SFIXED32 + || field == Descriptors.FieldDescriptor.Type.SFIXED64) { + + return true; + } + return false; + } + + private static boolean isCompatibleWithBQDatetime(Descriptors.FieldDescriptor.Type field) { + if (field == Descriptors.FieldDescriptor.Type.INT64 + || field == Descriptors.FieldDescriptor.Type.SFIXED64) { + return true; + } + return false; + } + + private static boolean isCompatibleWithBQFloat(Descriptors.FieldDescriptor.Type field) { + if (field == Descriptors.FieldDescriptor.Type.FLOAT) { +
return true; + } + if (field == Descriptors.FieldDescriptor.Type.DOUBLE) { + return true; + } + return false; + } + + private static boolean isCompatibleWithBQGeography(Descriptors.FieldDescriptor.Type field) { + if (field == Descriptors.FieldDescriptor.Type.BYTES) { + return true; + } + return false; + } + + private static boolean isCompatibleWithBQInteger(Descriptors.FieldDescriptor.Type field) { + if (field == Descriptors.FieldDescriptor.Type.INT64 + || field == Descriptors.FieldDescriptor.Type.SFIXED64 + || field == Descriptors.FieldDescriptor.Type.INT32 + || field == Descriptors.FieldDescriptor.Type.UINT32 + || field == Descriptors.FieldDescriptor.Type.FIXED32 + || field == Descriptors.FieldDescriptor.Type.SFIXED32 + || field == Descriptors.FieldDescriptor.Type.ENUM) { + return true; + } + return false; + } + + private static boolean isCompatibleWithBQNumeric(Descriptors.FieldDescriptor.Type field) { + if (field == Descriptors.FieldDescriptor.Type.INT32 + || field == Descriptors.FieldDescriptor.Type.INT64 + || field == Descriptors.FieldDescriptor.Type.UINT32 + || field == Descriptors.FieldDescriptor.Type.UINT64 + || field == Descriptors.FieldDescriptor.Type.FIXED32 + || field == Descriptors.FieldDescriptor.Type.FIXED64 + || field == Descriptors.FieldDescriptor.Type.SFIXED32 + || field == Descriptors.FieldDescriptor.Type.SFIXED64) { + return true; + } + + if (field == Descriptors.FieldDescriptor.Type.BYTES) { + return true; + } + + return false; + } + + private static boolean isCompatibleWithBQRecord(Descriptors.FieldDescriptor.Type field) { + if (field == Descriptors.FieldDescriptor.Type.MESSAGE + || field == Descriptors.FieldDescriptor.Type.GROUP) { + return true; + } + return false; + } + + private static boolean isCompatibleWithBQString(Descriptors.FieldDescriptor.Type field) { + if (field == Descriptors.FieldDescriptor.Type.STRING + || field == Descriptors.FieldDescriptor.Type.ENUM) { + return true; + } + return false; + } + + private static boolean isCompatibleWithBQTime(Descriptors.FieldDescriptor.Type field) { + if (field == Descriptors.FieldDescriptor.Type.INT64 + || field == Descriptors.FieldDescriptor.Type.SFIXED64) { + + return true; + } + return false; + } + + private static boolean isCompatibleWithBQTimestamp(Descriptors.FieldDescriptor.Type field) { + if (isCompatibleWithBQInteger(field)) { + return true; + } + return false; + } + + /** + * Checks if the proto field mode is compatible with the BQ field mode. + * + * @param protoField the proto field descriptor + * @param BQField the BigQuery field + * @param protoScope used in error messages to locate the field when messages are nested + * @param BQScope used in error messages to locate the field when messages are nested + * @throws IllegalArgumentException if the proto field mode is incompatible with the BQ field mode.
+ */ + private void protoFieldModeIsCompatibleWithBQFieldMode( + Descriptors.FieldDescriptor protoField, Field BQField, String protoScope, String BQScope) + throws IllegalArgumentException { + if (BQField.getMode() == null) { + throw new IllegalArgumentException( + "Big query schema contains invalid field option for " + BQScope + "."); + } + switch (BQField.getMode()) { + case REPEATED: + if (!protoField.isRepeated()) { + throw new IllegalArgumentException( + "Given proto field " + + protoScope + + " is not repeated but Big Query field " + + BQScope + + " is."); + } + break; + case REQUIRED: + if (!protoField.isRequired()) { + throw new IllegalArgumentException( + "Given proto field " + + protoScope + + " is not required but Big Query field " + + BQScope + + " is."); + } + break; + case NULLABLE: + if (protoField.isRepeated()) { + throw new IllegalArgumentException( + "Given proto field " + + protoScope + + " is repeated but Big Query field " + + BQScope + + " is optional."); + } + break; + } + } + /** + * Checks if the proto field type is compatible with the BQ field type. * - * @param tableName The name of the table to write to. - * @param userSchema The schema user uses to append data. - * @throws IllegalArgumentException the check failed. + * @param protoField the proto field descriptor + * @param BQField the BigQuery field + * @param allowUnknownFields flag indicating the proto may have fields unknown to the BQ schema + * @param protoScope used in error messages to locate the field when messages are nested + * @param BQScope used in error messages to locate the field when messages are nested + * @param allMessageTypes keeps track of all message types on the current path to detect recursively nested protos + * @param rootProtoName used in the error message when nesting exceeds 15 levels + * @throws IllegalArgumentException if proto field type is incompatible with BQ field type. */ - public void check(String tableName, Descriptors.Descriptor userSchema) + private void protoFieldTypeIsCompatibleWithBQFieldType( + Descriptors.FieldDescriptor protoField, + Field BQField, + boolean allowUnknownFields, + String protoScope, + String BQScope, + HashSet<Descriptors.Descriptor> allMessageTypes, + String rootProtoName) throws IllegalArgumentException { - Table table = bigquery.getTable(getTableId(tableName)); - Schema schema = table.getDefinition().getSchema(); - // TODO: We only have very limited check here. More checks to be added.
- if (schema.getFields().size() != userSchema.getFields().size()) { + + LegacySQLTypeName BQType = BQField.getType(); + Descriptors.FieldDescriptor.Type protoType = protoField.getType(); + boolean match = false; + switch (BQType.toString()) { + case "BOOLEAN": + match = isCompatibleWithBQBool(protoType); + break; + case "BYTES": + match = isCompatibleWithBQBytes(protoType); + break; + case "DATE": + match = isCompatibleWithBQDate(protoType); + break; + case "DATETIME": + match = isCompatibleWithBQDatetime(protoType); + break; + case "FLOAT": + match = isCompatibleWithBQFloat(protoType); + break; + case "GEOGRAPHY": + match = isCompatibleWithBQGeography(protoType); + break; + case "INTEGER": + match = isCompatibleWithBQInteger(protoType); + break; + case "NUMERIC": + match = isCompatibleWithBQNumeric(protoType); + break; + case "RECORD": + if (allMessageTypes.size() > NestingLimit) { + throw new IllegalArgumentException( + "Proto schema " + + rootProtoName + + " is not supported: contains nested messages of more than 15 levels."); + } + match = isCompatibleWithBQRecord(protoType); + if (!match) { + break; + } + Descriptors.Descriptor message = protoField.getMessageType(); + if (allMessageTypes.contains(message)) { + throw new IllegalArgumentException( + "Proto schema " + protoScope + " is not supported: is a recursively nested message."); + } + allMessageTypes.add(message); + isProtoCompatibleWithBQ( + protoField.getMessageType(), + Schema.of(BQField.getSubFields()), + allowUnknownFields, + protoScope, + BQScope, + false, + allMessageTypes, + rootProtoName); + allMessageTypes.remove(message); + break; + case "STRING": + match = isCompatibleWithBQString(protoType); + break; + case "TIME": + match = isCompatibleWithBQTime(protoType); + break; + case "TIMESTAMP": + match = isCompatibleWithBQTimestamp(protoType); + break; + } + if (!match) { throw new IllegalArgumentException( - "User schema doesn't have expected field number with BigQuery table schema, expected: " - + schema.getFields().size() - + " actual: " - + userSchema.getFields().size()); + "The proto field " + + protoScope + + " does not have a matching type with the big query field " + + BQScope + + "."); } } + + /** + * Checks if the proto schema is compatible with the BQ schema. + * + * @param protoSchema the proto descriptor to check + * @param BQSchema the BigQuery table schema + * @param allowUnknownFields flag indicating the proto may have fields unknown to the BQ schema + * @param protoScope used in error messages to locate the field when messages are nested + * @param BQScope used in error messages to locate the field when messages are nested + * @param topLevel true if this is the root level of the proto (in terms of nested messages) + * @param allMessageTypes keeps track of all message types on the current path to detect recursively nested protos + * @param rootProtoName used in the error message when nesting exceeds 15 levels + * @throws IllegalArgumentException if the proto schema is incompatible with the BQ schema.
+ */ + private void isProtoCompatibleWithBQ( + Descriptors.Descriptor protoSchema, + Schema BQSchema, + boolean allowUnknownFields, + String protoScope, + String BQScope, + boolean topLevel, + HashSet<Descriptors.Descriptor> allMessageTypes, + String rootProtoName) + throws IllegalArgumentException { + + int matchedFields = 0; + HashMap<String, Descriptors.FieldDescriptor> protoFieldMap = new HashMap<>(); + List<Descriptors.FieldDescriptor> protoFields = protoSchema.getFields(); + List<Field> BQFields = BQSchema.getFields(); + + if (protoFields.size() > BQFields.size()) { + if (!allowUnknownFields) { + throw new IllegalArgumentException( + "Proto schema " + + protoScope + + " has " + + protoFields.size() + + " fields, while BQ schema " + + BQScope + + " has " + + BQFields.size() + + " fields."); + } + } + // Use hashmap to map from lowercased name to appropriate field to account for casing difference + for (Descriptors.FieldDescriptor field : protoFields) { + protoFieldMap.put(field.getName().toLowerCase(), field); + } + + for (Field BQField : BQFields) { + String fieldName = BQField.getName().toLowerCase(); + Descriptors.FieldDescriptor protoField = null; + if (protoFieldMap.containsKey(fieldName)) { + protoField = protoFieldMap.get(fieldName); + } + + String currentBQScope = BQScope + "." + BQField.getName(); + if (protoField == null && BQField.getMode() == Field.Mode.REQUIRED) { + throw new IllegalArgumentException( + "The required Big Query field " + + currentBQScope + + " is missing in the proto schema " + + protoScope + + "."); + } + if (protoField == null) { + continue; + } + String currentProtoScope = protoScope + "." + protoField.getName(); + if (!isSupportedType(protoField)) { + throw new IllegalArgumentException( + "Proto schema " + + currentProtoScope + + " is not supported: contains " + + protoField.getType() + + " field type."); + } + if (protoField.isMapField()) { + throw new IllegalArgumentException( + "Proto schema " + currentProtoScope + " is not supported: is a map field."); + } + protoFieldModeIsCompatibleWithBQFieldMode( + protoField, BQField, currentProtoScope, currentBQScope); + protoFieldTypeIsCompatibleWithBQFieldType( + protoField, + BQField, + allowUnknownFields, + currentProtoScope, + currentBQScope, + allMessageTypes, + rootProtoName); + matchedFields++; + } + + if (matchedFields == 0 && topLevel) { + throw new IllegalArgumentException( + "There are no matching fields found for the proto schema " + + protoScope + + " and the BQ table schema " + + BQScope + + "."); + } + } + + /** + * Checks if proto schema is compatible with BQ schema after retrieving BQ schema by BQTableName. + * + * @param BQTableName Must include project_id, dataset_id, and table_id in the form that matches + * the regex "projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)" + * @param protoSchema the proto descriptor to check against the table schema + * @param allowUnknownFields Flag indicating proto can have unknown fields. + * @throws IllegalArgumentException if the proto schema is incompatible with the BQ schema.
+ */ + public void check( + String BQTableName, Descriptors.Descriptor protoSchema, boolean allowUnknownFields) + throws IllegalArgumentException { + TableId tableId = getTableId(BQTableName); + Table table = bigquery.getTable(tableId); + Schema BQSchema = table.getDefinition().getSchema(); + String protoSchemaName = protoSchema.getName(); + HashSet<Descriptors.Descriptor> allMessageTypes = new HashSet<>(); + allMessageTypes.add(protoSchema); + isProtoCompatibleWithBQ( + protoSchema, + BQSchema, + allowUnknownFields, + protoSchemaName, + tableId.getTable(), + true, + allMessageTypes, + protoSchemaName); + } + + /** + * Checks if proto schema is compatible with BQ schema after retrieving BQ schema by BQTableName. + * Assumes allowUnknownFields is false. + * + * @param BQTableName Must include project_id, dataset_id, and table_id in the form that matches + * the regex "projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)" + * @param protoSchema the proto descriptor to check against the table schema + * @throws IllegalArgumentException if the proto schema is incompatible with the BQ schema. + */ + public void check(String BQTableName, Descriptors.Descriptor protoSchema) + throws IllegalArgumentException { + + check(BQTableName, protoSchema, false); + } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java index 205d814968..259bc59da0 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java @@ -23,8 +23,12 @@ import com.google.cloud.bigquery.LegacySQLTypeName; import com.google.cloud.bigquery.Schema; import com.google.cloud.bigquery.Table; +import com.google.cloud.bigquery.storage.test.SchemaTest.*; import com.google.cloud.bigquery.storage.test.Test.FooType; +import com.google.protobuf.Descriptors; import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; import javax.annotation.Nullable; import org.junit.After; import org.junit.Before; @@ -38,6 +42,24 @@ public class SchemaCompactTest { @Mock private BigQuery mockBigquery; @Mock private Table mockBigqueryTable; + Descriptors.Descriptor[] type_descriptors = { + Int32Type.getDescriptor(), + Int64Type.getDescriptor(), + UInt32Type.getDescriptor(), + UInt64Type.getDescriptor(), + Fixed32Type.getDescriptor(), + Fixed64Type.getDescriptor(), + SFixed32Type.getDescriptor(), + SFixed64Type.getDescriptor(), + FloatType.getDescriptor(), + DoubleType.getDescriptor(), + BoolType.getDescriptor(), + BytesType.getDescriptor(), + StringType.getDescriptor(), + EnumType.getDescriptor(), + MessageType.getDescriptor(), + GroupType.getDescriptor() + }; @Before public void setUp() throws IOException { @@ -51,8 +73,7 @@ public void tearDown() { verifyNoMoreInteractions(mockBigqueryTable); } - @Test - public void testSuccess() throws Exception { + public void customizeSchema(final Schema schema) { TableDefinition definition = new TableDefinition() { @Override @@ -63,7 +84,7 @@ public Type getType() { @Nullable @Override public Schema getSchema() { - return Schema.of(Field.of("Foo", LegacySQLTypeName.STRING)); + return schema; } @Override @@ -72,42 +93,60 @@ public Builder toBuilder() { } }; when(mockBigqueryTable.getDefinition()).thenReturn(definition); + } + + @Test + public void testSuccess() throws Exception { + customizeSchema( + Schema.of( + Field.newBuilder("Foo",
LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build())); SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); - compact.check("projects/p/datasets/d/tables/t", FooType.getDescriptor()); + compact.check("projects/p/datasets/d/tables/t", FooType.getDescriptor(), false); verify(mockBigquery, times(1)).getTable(any(TableId.class)); verify(mockBigqueryTable, times(1)).getDefinition(); } @Test - public void testFailed() throws Exception { - TableDefinition definition = - new TableDefinition() { - @Override - public Type getType() { - return null; - } + public void testBadTableName() throws Exception { + try { + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + compact.check("blah", FooType.getDescriptor(), false); + fail("should fail"); + } catch (IllegalArgumentException expected) { + assertEquals("Invalid table name: blah", expected.getMessage()); + } + } - @Nullable - @Override - public Schema getSchema() { - return Schema.of( - Field.of("Foo", LegacySQLTypeName.STRING), - Field.of("Bar", LegacySQLTypeName.STRING)); - } + @Test + public void testSupportedTypes() { + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + for (Descriptors.FieldDescriptor field : SupportedTypes.getDescriptor().getFields()) { + assertTrue(compact.isSupportedType(field)); + } - @Override - public Builder toBuilder() { - return null; - } - }; - when(mockBigqueryTable.getDefinition()).thenReturn(definition); + for (Descriptors.FieldDescriptor field : NonSupportedTypes.getDescriptor().getFields()) { + assertFalse(compact.isSupportedType(field)); + } + } + + @Test + public void testMap() { + customizeSchema( + Schema.of( + Field.newBuilder("map_value", LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.NULLABLE) + .build())); SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + Descriptors.Descriptor testMap = NonSupportedMap.getDescriptor(); + String protoName = testMap.getName() + ".map_value"; try { - compact.check("projects/p/datasets/d/tables/t", FooType.getDescriptor()); - fail("should fail"); + compact.check("projects/p/datasets/d/tables/t", testMap, false); + fail("Should not be supported: field contains map"); } catch (IllegalArgumentException expected) { assertEquals( - "User schema doesn't have expected field number with BigQuery table schema, expected: 2 actual: 1", + "Proto schema " + protoName + " is not supported: is a map field.", expected.getMessage()); } verify(mockBigquery, times(1)).getTable(any(TableId.class)); @@ -115,13 +154,859 @@ public Builder toBuilder() { } @Test - public void testBadTableName() throws Exception { + public void testNestingSupportedSimple() { + Field BQSupportedNestingLvl2 = + Field.newBuilder("int_value", LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.NULLABLE) + .build(); + customizeSchema( + Schema.of( + Field.newBuilder("int_value", LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.NULLABLE) + .build(), + Field.newBuilder("nesting_value", LegacySQLTypeName.RECORD, BQSupportedNestingLvl2) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + Descriptors.Descriptor testNesting = SupportedNestingLvl1.getDescriptor(); try { - SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); - compact.check("blah", FooType.getDescriptor()); - fail("should fail"); + compact.check("projects/p/datasets/d/tables/t", testNesting, false); + } catch (Exception e) { + fail(e.getMessage()); + } + verify(mockBigquery, times(1)).getTable(any(TableId.class)); + 
verify(mockBigqueryTable, times(1)).getDefinition(); + } + + @Test + public void testNestingSupportedStacked() { + Field BQSupportedNestingLvl2 = + Field.newBuilder("int_value", LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.NULLABLE) + .build(); + customizeSchema( + Schema.of( + Field.newBuilder("int_value", LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.NULLABLE) + .build(), + Field.newBuilder("nesting_value1", LegacySQLTypeName.RECORD, BQSupportedNestingLvl2) + .setMode(Field.Mode.NULLABLE) + .build(), + Field.newBuilder("nesting_value2", LegacySQLTypeName.RECORD, BQSupportedNestingLvl2) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + Descriptors.Descriptor testNesting = SupportedNestingStacked.getDescriptor(); + try { + compact.check("projects/p/datasets/d/tables/t", testNesting, false); + } catch (Exception e) { + fail(e.getMessage()); + } + verify(mockBigquery, times(1)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(1)).getDefinition(); + } + + /* + * This is not the "exact" test, as BigQuery fields cannot be recursive. Instead, this test uses + * two DIFFERENT records with the same name to simulate recursive protos (protos can't have the + * same name anyway unless they are the same proto). + */ + @Test + public void testNestingContainsRecursive() { + Field BQNonSupportedNestingRecursive = + Field.newBuilder( + "nesting_value", + LegacySQLTypeName.RECORD, + Field.newBuilder("int_value", LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.NULLABLE) + .build()) + .setMode(Field.Mode.NULLABLE) + .build(); + + customizeSchema( + Schema.of( + Field.newBuilder("int_value", LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.NULLABLE) + .build(), + Field.newBuilder( + "nesting_value", LegacySQLTypeName.RECORD, BQNonSupportedNestingRecursive) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + Descriptors.Descriptor testNesting = NonSupportedNestingContainsRecursive.getDescriptor(); + try { + compact.check("projects/p/datasets/d/tables/t", testNesting, false); + fail("Should not be supported: is a recursively nested message."); } catch (IllegalArgumentException expected) { - assertEquals("Invalid table name: blah", expected.getMessage()); + assertEquals( + "Proto schema " + + testNesting.getName() + + ".nesting_value.nesting_value is not supported: is a recursively nested message.", + expected.getMessage()); } + verify(mockBigquery, times(1)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(1)).getDefinition(); + } + + @Test + public void testNestingRecursiveLimit() { + Field NonSupportedNestingLvl16 = + Field.newBuilder("test1", LegacySQLTypeName.INTEGER).setMode(Field.Mode.NULLABLE).build(); + Field NonSupportedNestingLvl15 = + Field.newBuilder("test1", LegacySQLTypeName.RECORD, NonSupportedNestingLvl16) + .setMode(Field.Mode.NULLABLE) + .build(); + Field NonSupportedNestingLvl14 = + Field.newBuilder("test1", LegacySQLTypeName.RECORD, NonSupportedNestingLvl15) + .setMode(Field.Mode.NULLABLE) + .build(); + Field NonSupportedNestingLvl13 = + Field.newBuilder("test1", LegacySQLTypeName.RECORD, NonSupportedNestingLvl14) + .setMode(Field.Mode.NULLABLE) + .build(); + Field NonSupportedNestingLvl12 = + Field.newBuilder("test1", LegacySQLTypeName.RECORD, NonSupportedNestingLvl13) + .setMode(Field.Mode.NULLABLE) + .build(); + Field NonSupportedNestingLvl11 = + Field.newBuilder("test1", LegacySQLTypeName.RECORD,
NonSupportedNestingLvl12) + .setMode(Field.Mode.NULLABLE) + .build(); + Field NonSupportedNestingLvl10 = + Field.newBuilder("test1", LegacySQLTypeName.RECORD, NonSupportedNestingLvl11) + .setMode(Field.Mode.NULLABLE) + .build(); + Field NonSupportedNestingLvl9 = + Field.newBuilder("test1", LegacySQLTypeName.RECORD, NonSupportedNestingLvl10) + .setMode(Field.Mode.NULLABLE) + .build(); + Field NonSupportedNestingLvl8 = + Field.newBuilder("test1", LegacySQLTypeName.RECORD, NonSupportedNestingLvl9) + .setMode(Field.Mode.NULLABLE) + .build(); + Field NonSupportedNestingLvl7 = + Field.newBuilder("test1", LegacySQLTypeName.RECORD, NonSupportedNestingLvl8) + .setMode(Field.Mode.NULLABLE) + .build(); + Field NonSupportedNestingLvl6 = + Field.newBuilder("test1", LegacySQLTypeName.RECORD, NonSupportedNestingLvl7) + .setMode(Field.Mode.NULLABLE) + .build(); + Field NonSupportedNestingLvl5 = + Field.newBuilder("test1", LegacySQLTypeName.RECORD, NonSupportedNestingLvl6) + .setMode(Field.Mode.NULLABLE) + .build(); + Field NonSupportedNestingLvl4 = + Field.newBuilder("test1", LegacySQLTypeName.RECORD, NonSupportedNestingLvl5) + .setMode(Field.Mode.NULLABLE) + .build(); + Field NonSupportedNestingLvl3 = + Field.newBuilder("test1", LegacySQLTypeName.RECORD, NonSupportedNestingLvl4) + .setMode(Field.Mode.NULLABLE) + .build(); + Field NonSupportedNestingLvl2 = + Field.newBuilder("test1", LegacySQLTypeName.RECORD, NonSupportedNestingLvl3) + .setMode(Field.Mode.NULLABLE) + .build(); + Field NonSupportedNestingLvl1 = + Field.newBuilder("test1", LegacySQLTypeName.RECORD, NonSupportedNestingLvl2) + .setMode(Field.Mode.NULLABLE) + .build(); + customizeSchema( + Schema.of( + Field.newBuilder("test1", LegacySQLTypeName.RECORD, NonSupportedNestingLvl1) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + Descriptors.Descriptor testNesting = NonSupportedNestingLvl0.getDescriptor(); + try { + compact.check("projects/p/datasets/d/tables/t", testNesting, false); + fail("Should not be supported: contains nested messages of more than 15 levels."); + } catch (IllegalArgumentException expected) { + assertEquals( + "Proto schema " + + testNesting.getName() + + " is not supported: contains nested messages of more than 15 levels.", + expected.getMessage()); + } + verify(mockBigquery, times(1)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(1)).getDefinition(); + } + + @Test + public void testProtoMoreFields() { + Schema customSchema = Schema.of(Field.of("int32_value", LegacySQLTypeName.INTEGER)); + customizeSchema(customSchema); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + + try { + compact.check("projects/p/datasets/d/tables/t", SupportedTypes.getDescriptor(), false); + fail("Should fail: proto has more fields and allowUnknownFields flag is false."); + } catch (IllegalArgumentException expected) { + assertEquals( + "Proto schema " + + SupportedTypes.getDescriptor().getName() + + " has " + + SupportedTypes.getDescriptor().getFields().size() + + " fields, while BQ schema t has " + + 1 + + " fields.", + expected.getMessage()); + } + verify(mockBigquery, times(1)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(1)).getDefinition(); + } + + @Test + public void testBQRepeated() { + customizeSchema( + Schema.of( + Field.newBuilder("repeated_mode", LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.REPEATED) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + 
compact.check("projects/p/datasets/d/tables/t", ProtoRepeatedBQRepeated.getDescriptor(), false); + try { + compact.check( + "projects/p/datasets/d/tables/t", ProtoOptionalBQRepeated.getDescriptor(), false); + fail("Should fail: BQ schema is repeated, but proto is optional."); + } catch (IllegalArgumentException expected) { + assertEquals( + "Given proto field " + + ProtoOptionalBQRepeated.getDescriptor().getName() + + ".repeated_mode" + + " is not repeated but Big Query field t.repeated_mode is.", + expected.getMessage()); + } + + try { + compact.check( + "projects/p/datasets/d/tables/t", ProtoRequiredBQRepeated.getDescriptor(), false); + fail("Should fail: BQ schema is repeated, but proto is required."); + } catch (IllegalArgumentException expected) { + assertEquals( + "Given proto field " + + ProtoRequiredBQRepeated.getDescriptor().getName() + + ".repeated_mode" + + " is not repeated but Big Query field t.repeated_mode is.", + expected.getMessage()); + } + verify(mockBigquery, times(3)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(3)).getDefinition(); + } + + @Test + public void testBQRequired() { + customizeSchema( + Schema.of( + Field.newBuilder("required_mode", LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.REQUIRED) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + compact.check("projects/p/datasets/d/tables/t", ProtoRequiredBQRequired.getDescriptor(), false); + + try { + compact.check("projects/p/datasets/d/tables/t", ProtoNoneBQRequired.getDescriptor(), false); + fail("Should fail: BQ schema is required, but proto does not have this field."); + } catch (IllegalArgumentException expected) { + assertEquals( + "The required Big Query field t.required_mode is missing in the proto schema " + + ProtoNoneBQRequired.getDescriptor().getName() + + ".", + expected.getMessage()); + } + + try { + compact.check( + "projects/p/datasets/d/tables/t", ProtoOptionalBQRequired.getDescriptor(), false); + fail("Should fail: BQ schema is required, but proto is optional."); + } catch (IllegalArgumentException expected) { + assertEquals( + "Given proto field " + + ProtoOptionalBQRequired.getDescriptor().getName() + + ".required_mode is not required but Big Query field t.required_mode is.", + expected.getMessage()); + } + + try { + compact.check( + "projects/p/datasets/d/tables/t", ProtoRepeatedBQRequired.getDescriptor(), false); + fail("Should fail: BQ schema is required, but proto is repeated."); + } catch (IllegalArgumentException expected) { + assertEquals( + "Given proto field " + + ProtoRepeatedBQRequired.getDescriptor().getName() + + ".required_mode is not required but Big Query field t.required_mode is.", + expected.getMessage()); + } + verify(mockBigquery, times(4)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(4)).getDefinition(); + } + + @Test + public void testBQOptional() { + customizeSchema( + Schema.of( + Field.newBuilder("optional_mode", LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + compact.check("projects/p/datasets/d/tables/t", ProtoOptionalBQOptional.getDescriptor(), false); + compact.check("projects/p/datasets/d/tables/t", ProtoRequiredBQOptional.getDescriptor(), false); + + try { + compact.check( + "projects/p/datasets/d/tables/t", ProtoRepeatedBQOptional.getDescriptor(), false); + fail("Should fail: BQ schema is nullable, but proto field is repeated."); + } catch (IllegalArgumentException expected) { + assertEquals( + 
"Given proto field " + + ProtoRepeatedBQOptional.getDescriptor().getName() + + ".optional_mode is repeated but Big Query field t.optional_mode is optional.", + expected.getMessage()); + } + + verify(mockBigquery, times(3)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(3)).getDefinition(); + } + + @Test + public void testBQBool() { + customizeSchema( + Schema.of( + Field.newBuilder("test_field_type", LegacySQLTypeName.BOOLEAN) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + HashSet compatible = + new HashSet<>( + Arrays.asList( + Int32Type.getDescriptor(), + Int64Type.getDescriptor(), + UInt32Type.getDescriptor(), + UInt64Type.getDescriptor(), + Fixed32Type.getDescriptor(), + Fixed64Type.getDescriptor(), + SFixed32Type.getDescriptor(), + SFixed64Type.getDescriptor(), + BoolType.getDescriptor())); + + for (Descriptors.Descriptor descriptor : type_descriptors) { + if (compatible.contains(descriptor)) { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + } else { + try { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + fail("Should fail: Proto schema type should not match BQ Boolean."); + } catch (IllegalArgumentException expected) { + assertEquals( + "The proto field " + + descriptor.getName() + + ".test_field_type does not have a matching type with the big query field t.test_field_type.", + expected.getMessage()); + } + } + } + verify(mockBigquery, times(16)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(16)).getDefinition(); + } + + @Test + public void testBQBytes() { + customizeSchema( + Schema.of( + Field.newBuilder("test_field_type", LegacySQLTypeName.BYTES) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + HashSet compatible = + new HashSet<>(Arrays.asList(BytesType.getDescriptor())); + + for (Descriptors.Descriptor descriptor : type_descriptors) { + if (compatible.contains(descriptor)) { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + } else { + try { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + fail("Should fail: Proto schema type should not match BQ Bytes."); + } catch (IllegalArgumentException expected) { + assertEquals( + "The proto field " + + descriptor.getName() + + ".test_field_type does not have a matching type with the big query field t.test_field_type.", + expected.getMessage()); + } + } + } + verify(mockBigquery, times(16)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(16)).getDefinition(); + } + + @Test + public void testBQDate() { + customizeSchema( + Schema.of( + Field.newBuilder("test_field_type", LegacySQLTypeName.DATE) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + HashSet compatible = + new HashSet<>( + Arrays.asList( + Int32Type.getDescriptor(), + Int64Type.getDescriptor(), + SFixed32Type.getDescriptor(), + SFixed64Type.getDescriptor())); + + for (Descriptors.Descriptor descriptor : type_descriptors) { + if (compatible.contains(descriptor)) { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + } else { + try { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + fail("Should fail: Proto schema type should not match BQ Date."); + } catch (IllegalArgumentException expected) { + assertEquals( + "The proto field " + + descriptor.getName() + + ".test_field_type does not have a matching type 
with the big query field t.test_field_type.", + expected.getMessage()); + } + } + } + verify(mockBigquery, times(16)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(16)).getDefinition(); + } + + @Test + public void testBQDatetime() { + customizeSchema( + Schema.of( + Field.newBuilder("test_field_type", LegacySQLTypeName.DATETIME) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + HashSet compatible = + new HashSet<>(Arrays.asList(Int64Type.getDescriptor(), SFixed64Type.getDescriptor())); + + for (Descriptors.Descriptor descriptor : type_descriptors) { + if (compatible.contains(descriptor)) { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + } else { + try { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + fail("Should fail: Proto schema type should not match BQ Datetime."); + } catch (IllegalArgumentException expected) { + assertEquals( + "The proto field " + + descriptor.getName() + + ".test_field_type does not have a matching type with the big query field t.test_field_type.", + expected.getMessage()); + } + } + } + verify(mockBigquery, times(16)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(16)).getDefinition(); + } + + @Test + public void testBQFloat() { + customizeSchema( + Schema.of( + Field.newBuilder("test_field_type", LegacySQLTypeName.FLOAT) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + HashSet compatible = + new HashSet<>(Arrays.asList(FloatType.getDescriptor(), DoubleType.getDescriptor())); + + for (Descriptors.Descriptor descriptor : type_descriptors) { + if (compatible.contains(descriptor)) { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + } else { + try { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + fail("Should fail: Proto schema type should not match BQ Float."); + } catch (IllegalArgumentException expected) { + assertEquals( + "The proto field " + + descriptor.getName() + + ".test_field_type does not have a matching type with the big query field t.test_field_type.", + expected.getMessage()); + } + } + } + verify(mockBigquery, times(16)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(16)).getDefinition(); + } + + @Test + public void testBQGeography() { + customizeSchema( + Schema.of( + Field.newBuilder("test_field_type", LegacySQLTypeName.GEOGRAPHY) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + HashSet compatible = + new HashSet<>(Arrays.asList(BytesType.getDescriptor())); + + for (Descriptors.Descriptor descriptor : type_descriptors) { + if (compatible.contains(descriptor)) { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + } else { + try { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + fail("Should fail: Proto schema type should not match BQ Geography."); + } catch (IllegalArgumentException expected) { + assertEquals( + "The proto field " + + descriptor.getName() + + ".test_field_type does not have a matching type with the big query field t.test_field_type.", + expected.getMessage()); + } + } + } + verify(mockBigquery, times(16)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(16)).getDefinition(); + } + + @Test + public void testBQInteger() { + customizeSchema( + Schema.of( + Field.newBuilder("test_field_type", LegacySQLTypeName.INTEGER) + 
.setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + HashSet<Descriptors.Descriptor> compatible = + new HashSet<>( + Arrays.asList( + Int32Type.getDescriptor(), + Int64Type.getDescriptor(), + UInt32Type.getDescriptor(), + Fixed32Type.getDescriptor(), + SFixed32Type.getDescriptor(), + SFixed64Type.getDescriptor(), + EnumType.getDescriptor())); + + for (Descriptors.Descriptor descriptor : type_descriptors) { + if (compatible.contains(descriptor)) { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + } else { + try { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + fail("Should fail: Proto schema type should not match BQ Integer."); + } catch (IllegalArgumentException expected) { + assertEquals( + "The proto field " + + descriptor.getName() + + ".test_field_type does not have a matching type with the big query field t.test_field_type.", + expected.getMessage()); + } + } + } + verify(mockBigquery, times(16)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(16)).getDefinition(); + } + + @Test + public void testBQNumeric() { + customizeSchema( + Schema.of( + Field.newBuilder("test_field_type", LegacySQLTypeName.NUMERIC) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + HashSet<Descriptors.Descriptor> compatible = + new HashSet<>( + Arrays.asList( + Int32Type.getDescriptor(), + Int64Type.getDescriptor(), + UInt32Type.getDescriptor(), + UInt64Type.getDescriptor(), + Fixed32Type.getDescriptor(), + Fixed64Type.getDescriptor(), + SFixed32Type.getDescriptor(), + SFixed64Type.getDescriptor(), + BytesType.getDescriptor())); + + for (Descriptors.Descriptor descriptor : type_descriptors) { + if (compatible.contains(descriptor)) { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + } else { + try { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + fail("Should fail: Proto schema type should not match BQ Numeric."); + } catch (IllegalArgumentException expected) { + assertEquals( + "The proto field " + + descriptor.getName() + + ".test_field_type does not have a matching type with the big query field t.test_field_type.", + expected.getMessage()); + } + } + } + verify(mockBigquery, times(16)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(16)).getDefinition(); + } + + @Test + public void testBQRecord() { + Field nestedMessage = + Field.newBuilder("test_field_type", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build(); + customizeSchema( + Schema.of( + Field.newBuilder("test_field_type", LegacySQLTypeName.RECORD, nestedMessage) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + HashSet<Descriptors.Descriptor> compatible = + new HashSet<>(Arrays.asList(MessageType.getDescriptor(), GroupType.getDescriptor())); + + for (Descriptors.Descriptor descriptor : type_descriptors) { + if (compatible.contains(descriptor)) { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + } else { + try { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + fail("Should fail: Proto schema type should not match BQ Record."); + } catch (IllegalArgumentException expected) { + assertEquals( + "The proto field " + + descriptor.getName() + + ".test_field_type does not have a matching type with the big query field t.test_field_type.", + expected.getMessage()); + } + } + } + verify(mockBigquery, times(16)).getTable(any(TableId.class)); +
verify(mockBigqueryTable, times(16)).getDefinition(); + } + + @Test + public void testBQRecordMismatch() { + Field nestedMessage1 = + Field.newBuilder("test_field_type", LegacySQLTypeName.INTEGER) + .setMode(Field.Mode.NULLABLE) + .build(); + Field nestedMessage0 = + Field.newBuilder("mismatchlvl1", LegacySQLTypeName.RECORD, nestedMessage1) + .setMode(Field.Mode.NULLABLE) + .build(); + customizeSchema( + Schema.of( + Field.newBuilder("mismatchlvl0", LegacySQLTypeName.RECORD, nestedMessage0) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + try { + compact.check("projects/p/datasets/d/tables/t", MessageTypeMismatch.getDescriptor(), false); + fail("Should fail: nested proto field type should not match the nested BQ field type."); + } catch (IllegalArgumentException expected) { + assertEquals( + "The proto field " + + MessageTypeMismatch.getDescriptor().getName() + + ".mismatchlvl0.mismatchlvl1.test_field_type does not have a matching type with the big query field t.mismatchlvl0.mismatchlvl1.test_field_type.", + expected.getMessage()); + } + verify(mockBigquery, times(1)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(1)).getDefinition(); + } + + @Test + public void testBQRecordMatch() { + Field nestedMessage1 = + Field.newBuilder("test_field_type", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build(); + Field nestedMessage0 = + Field.newBuilder("mismatchlvl1", LegacySQLTypeName.RECORD, nestedMessage1) + .setMode(Field.Mode.NULLABLE) + .build(); + customizeSchema( + Schema.of( + Field.newBuilder("mismatchlvl0", LegacySQLTypeName.RECORD, nestedMessage0) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + compact.check("projects/p/datasets/d/tables/t", MessageTypeMismatch.getDescriptor(), false); + verify(mockBigquery, times(1)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(1)).getDefinition(); + } + + @Test + public void testBQString() { + customizeSchema( + Schema.of( + Field.newBuilder("test_field_type", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + HashSet<Descriptors.Descriptor> compatible = + new HashSet<>(Arrays.asList(StringType.getDescriptor(), EnumType.getDescriptor())); + + for (Descriptors.Descriptor descriptor : type_descriptors) { + if (compatible.contains(descriptor)) { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + } else { + try { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + fail("Should fail: Proto schema type should not match BQ String."); + } catch (IllegalArgumentException expected) { + assertEquals( + "The proto field " + + descriptor.getName() + + ".test_field_type does not have a matching type with the big query field t.test_field_type.", + expected.getMessage()); + } + } + } + verify(mockBigquery, times(16)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(16)).getDefinition(); + } + + @Test + public void testBQTime() { + customizeSchema( + Schema.of( + Field.newBuilder("test_field_type", LegacySQLTypeName.TIME) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + HashSet<Descriptors.Descriptor> compatible = + new HashSet<>(Arrays.asList(Int64Type.getDescriptor(), SFixed64Type.getDescriptor())); + + for (Descriptors.Descriptor descriptor : type_descriptors) { + if (compatible.contains(descriptor)) { +
compact.check("projects/p/datasets/d/tables/t", descriptor, false); + } else { + try { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + fail("Should fail: Proto schema type should not match BQ Time."); + } catch (IllegalArgumentException expected) { + assertEquals( + "The proto field " + + descriptor.getName() + + ".test_field_type does not have a matching type with the big query field t.test_field_type.", + expected.getMessage()); + } + } + } + verify(mockBigquery, times(16)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(16)).getDefinition(); + } + + @Test + public void testBQTimestamp() { + customizeSchema( + Schema.of( + Field.newBuilder("test_field_type", LegacySQLTypeName.TIMESTAMP) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + HashSet compatible = + new HashSet<>( + Arrays.asList( + Int32Type.getDescriptor(), + Int64Type.getDescriptor(), + UInt32Type.getDescriptor(), + Fixed32Type.getDescriptor(), + SFixed32Type.getDescriptor(), + SFixed64Type.getDescriptor(), + EnumType.getDescriptor())); + + for (Descriptors.Descriptor descriptor : type_descriptors) { + if (compatible.contains(descriptor)) { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + } else { + try { + compact.check("projects/p/datasets/d/tables/t", descriptor, false); + fail("Should fail: Proto schema type should not match BQ Timestamp."); + } catch (IllegalArgumentException expected) { + assertEquals( + "The proto field " + + descriptor.getName() + + ".test_field_type does not have a matching type with the big query field t.test_field_type.", + expected.getMessage()); + } + } + } + verify(mockBigquery, times(16)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(16)).getDefinition(); + } + + /* + * Tests if having no matching fields in the top level causes an error. + */ + @Test + public void testBQTopLevelMismatch() { + customizeSchema( + Schema.of( + Field.newBuilder("test_toplevel_mismatch", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + try { + compact.check("projects/p/datasets/d/tables/t", StringType.getDescriptor(), false); + } catch (IllegalArgumentException expected) { + assertEquals( + "There is no matching fields found for the proto schema " + + StringType.getDescriptor().getName() + + " and the BQ table schema t.", + expected.getMessage()); + } + verify(mockBigquery, times(1)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(1)).getDefinition(); + } + + /* + * Tests if there is at least 1 matching field in the top level. 
+ */ + @Test + public void testBQTopLevelMatch() { + Field nestedMessage0 = + Field.newBuilder("mismatch", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build(); + customizeSchema( + Schema.of( + Field.newBuilder("mismatch", LegacySQLTypeName.RECORD, nestedMessage0) + .setMode(Field.Mode.NULLABLE) + .build(), + Field.newBuilder("match", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + compact.check("projects/p/datasets/d/tables/t", TopLevelMatch.getDescriptor(), false); + verify(mockBigquery, times(1)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(1)).getDefinition(); + } + + @Test + public void testAllowUnknownUnsupportedFields() { + customizeSchema( + Schema.of( + Field.newBuilder("string_value", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + compact.check( + "projects/p/datasets/d/tables/t", AllowUnknownUnsupportedFields.getDescriptor(), true); + verify(mockBigquery, times(1)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(1)).getDefinition(); + } + + @Test + public void testLowerCase() { + customizeSchema( + Schema.of( + Field.newBuilder("tEsT_fIeLd_TyPe", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) + .build())); + SchemaCompact compact = SchemaCompact.getInstance(mockBigquery); + compact.check("projects/p/datasets/d/tables/t", StringType.getDescriptor(), true); + verify(mockBigquery, times(1)).getTable(any(TableId.class)); + verify(mockBigqueryTable, times(1)).getDefinition(); } } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java index c75d6859e6..c64d55f53f 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java @@ -25,6 +25,7 @@ import com.google.cloud.ServiceOptions; import com.google.cloud.bigquery.*; import com.google.cloud.bigquery.Schema; +import com.google.cloud.bigquery.storage.test.SchemaTest.FakeFooType; import com.google.cloud.bigquery.storage.test.Test.*; import com.google.cloud.bigquery.storage.v1alpha2.*; import com.google.cloud.bigquery.storage.v1alpha2.Storage.*; @@ -74,6 +75,7 @@ public static void beforeClass() throws IOException { StandardTableDefinition.of( Schema.of( com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING) + .setMode(Field.Mode.NULLABLE) .build()))) .build(); com.google.cloud.bigquery.Field.Builder innerTypeFieldBuilder = @@ -400,6 +402,47 @@ public Long call() throws IOException, InterruptedException, ExecutionException DirectWriter.clearCache(); } + @Test + public void testDirectWriteFail() throws IOException, InterruptedException, ExecutionException { + final FakeFooType fa = FakeFooType.newBuilder().setFoo(100).build(); + Set<Long> expectedOffset = new HashSet<>(); + for (int i = 0; i < 10; i++) { + expectedOffset.add(Long.valueOf(i)); + } + ExecutorService executor = Executors.newFixedThreadPool(10); + List<Future<Long>> responses = new ArrayList<>(); + Callable<Long> callable = + new Callable<Long>() { + @Override + public Long call() + throws IOException, InterruptedException,
ExecutionException, + IllegalArgumentException { + ApiFuture<Long> result = DirectWriter.append(tableId, Arrays.asList(fa)); + return result.get(); + } + }; + + for (int i = 0; i < 10; i++) { + responses.add(executor.submit(callable)); + } + for (Future<Long> response : responses) { + try { + response.get(); + fail("Should fail: proto schema is incompatible with the table schema."); + } catch (ExecutionException e) { + assertEquals( + "The proto field FakeFooType.foo does not have a matching type with the big query field testtable.foo.", + e.getCause().getMessage()); + } + } + executor.shutdown(); + try { + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + LOG.info(e.toString()); + } + DirectWriter.clearCache(); + } + @Test public void testFlushRows() throws IOException, InterruptedException, ExecutionException { String tableName = "BufferTable"; diff --git a/google-cloud-bigquerystorage/src/test/proto/schemaTest.proto b/google-cloud-bigquerystorage/src/test/proto/schemaTest.proto new file mode 100644 index 0000000000..3ec935d6a6 --- /dev/null +++ b/google-cloud-bigquerystorage/src/test/proto/schemaTest.proto @@ -0,0 +1,259 @@ +syntax = "proto2"; + +package com.google.cloud.bigquery.storage.test; + +message SupportedTypes { + optional int32 int32_value = 1; + optional int64 int64_value = 2; + optional uint32 uint32_value = 3; + optional uint64 uint64_value = 4; + optional fixed32 fixed32_value = 7; + optional fixed64 fixed64_value = 8; + optional sfixed32 sfixed32_value = 9; + optional sfixed64 sfixed64_value = 10; + optional float float_value = 11; + optional double double_value = 12; + optional bool bool_value = 13; + optional bytes bytes_value = 14; + optional string string_value = 15; +} + +message NonSupportedTypes { + optional sint32 sint32_value = 5; + optional sint64 sint64_value = 6; +} + +message Int32Type { + optional int32 test_field_type = 1; +} + +message Int64Type { + optional int64 test_field_type = 1; +} + +message UInt32Type { + optional uint32 test_field_type = 1; +} + +message UInt64Type { + optional uint64 test_field_type = 1; +} + +message Fixed32Type { + optional fixed32 test_field_type = 1; +} + +message Fixed64Type { + optional fixed64 test_field_type = 1; +} + +message SFixed32Type { + optional sfixed32 test_field_type = 1; +} + +message SFixed64Type { + optional sfixed64 test_field_type = 1; +} + +message FloatType { + optional float test_field_type = 1; +} + +message DoubleType { + optional double test_field_type = 1; +} + +message BoolType { + optional bool test_field_type = 1; +} + +message BytesType { + optional bytes test_field_type = 1; +} + +message StringType { + optional string test_field_type = 1; +} + +message EnumType { + enum EnumTest { + test1 = 0; + } + optional EnumTest test_field_type = 1; +} + +message MessageType { + optional StringType test_field_type = 1; +} + +message GroupType { + optional group Test_field_type = 1 { + optional string test_field_type = 2; + } +} + +message MessageTypeMismatch { + optional MismatchLvl0 mismatchlvl0 = 1; +} + +message MismatchLvl0 { + optional MismatchLvl1 mismatchlvl1 = 1; +} + +message MismatchLvl1 { + optional string test_field_type = 1; +} + +message TopLevelMatch { + optional string match = 1; + optional MismatchLvl1 mismatch = 2; +} + +message ProtoRepeatedBQRepeated { + repeated int32 repeated_mode = 1; +} + +message ProtoOptionalBQRepeated { + optional int32 repeated_mode = 1; +} + +message ProtoRequiredBQRepeated { + required int32 repeated_mode = 1; +} + +message ProtoRequiredBQRequired { + required int32 required_mode = 1; +} +
+message ProtoNoneBQRequired { + optional int32 no_required_mode = 1; +} + +message ProtoOptionalBQRequired { + optional int32 required_mode = 1; +} + +message ProtoRepeatedBQRequired { + repeated int32 required_mode = 1; +} + +message ProtoOptionalBQOptional { + optional int32 optional_mode = 1; +} + +message ProtoRequiredBQOptional { + required int32 optional_mode = 1; +} + +message ProtoRepeatedBQOptional { + repeated int32 optional_mode = 1; +} + +message ProtoCompatibleWithBQInt { + optional int32 optional_mode = 1; +} + +message SupportedNestingLvl1 { + optional int32 int_value = 1; + optional SupportedNestingLvl2 nesting_value = 2; +} + +message SupportedNestingLvl2 { + optional int32 int_value = 1; +} + +message SupportedNestingStacked { + optional int32 test_int = 1; + optional SupportedNestingLvl2 nesting_value1 = 2; + optional SupportedNestingLvl2 nesting_value2 = 3; +} + +message NonSupportedMap { + map<int32, int32> map_value = 1; +} + +message NonSupportedNestingRecursive { + optional NonSupportedNestingRecursive nesting_value = 2; +} + +message NonSupportedNestingContainsRecursive { + optional int32 int_value = 1; + optional NonSupportedNestingRecursive nesting_value = 2; +} + +message NonSupportedNestingLvl0 { + optional NonSupportedNestingLvl1 test1 = 1; +} + +message NonSupportedNestingLvl1 { + optional NonSupportedNestingLvl2 test1 = 1; +} + +message NonSupportedNestingLvl2 { + optional NonSupportedNestingLvl3 test1 = 1; +} + +message NonSupportedNestingLvl3 { + optional NonSupportedNestingLvl4 test1 = 1; +} + +message NonSupportedNestingLvl4 { + optional NonSupportedNestingLvl5 test1 = 1; +} + +message NonSupportedNestingLvl5 { + optional NonSupportedNestingLvl6 test1 = 1; +} + +message NonSupportedNestingLvl6 { + optional NonSupportedNestingLvl7 test1 = 1; +} + +message NonSupportedNestingLvl7 { + optional NonSupportedNestingLvl8 test1 = 1; +} + +message NonSupportedNestingLvl8 { + optional NonSupportedNestingLvl9 test1 = 1; +} + +message NonSupportedNestingLvl9 { + optional NonSupportedNestingLvl10 test1 = 1; +} + +message NonSupportedNestingLvl10 { + optional NonSupportedNestingLvl11 test1 = 1; +} + +message NonSupportedNestingLvl11 { + optional NonSupportedNestingLvl12 test1 = 1; +} + +message NonSupportedNestingLvl12 { + optional NonSupportedNestingLvl13 test1 = 1; +} + +message NonSupportedNestingLvl13 { + optional NonSupportedNestingLvl14 test1 = 1; +} + +message NonSupportedNestingLvl14 { + optional NonSupportedNestingLvl15 test1 = 1; +} + +message NonSupportedNestingLvl15 { + optional NonSupportedNestingLvl16 test1 = 1; +} + +message NonSupportedNestingLvl16 { + optional int32 test1 = 1; +} + +message AllowUnknownUnsupportedFields { + optional NonSupportedMap map_value = 1; + optional string string_value = 2; +} + +message FakeFooType { + optional int32 foo = 1; +}
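
For reference, the following is a minimal usage sketch of the API this patch adds; it is not part of the diff. The project/dataset/table IDs are placeholders, and the test-only FooType message stands in for a user's compiled proto. Note that the patch only exercises getInstance(BigQuery) from tests, so treat the construction below as illustrative rather than the documented entry point.

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.storage.test.Test.FooType;
    import com.google.cloud.bigquery.storage.v1alpha2.SchemaCompact;
    import com.google.protobuf.Descriptors;

    public class SchemaCompactUsageSketch {
      public static void main(String[] args) {
        // Placeholder path; must match "projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)".
        String tableName = "projects/my-project/datasets/my_dataset/tables/my_table";

        // Stand-in descriptor; in practice this would be the user's compiled proto message.
        Descriptors.Descriptor userProto = FooType.getDescriptor();

        BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
        SchemaCompact compact = SchemaCompact.getInstance(bigquery);
        try {
          // Throws IllegalArgumentException when the proto schema cannot be written to the
          // table's schema; allowUnknownFields=false also rejects extra proto fields.
          compact.check(tableName, userProto, false);
          System.out.println("Proto schema is compatible with the table schema.");
        } catch (IllegalArgumentException e) {
          System.err.println("Schema check failed: " + e.getMessage());
        }
      }
    }

The two-argument overload check(tableName, protoSchema) defaults allowUnknownFields to false, preserving the original method signature tracked in clirr-ignored-differences.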