diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java
index 3f17f44951..00c370c800 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompact.java
@@ -16,19 +16,27 @@
package com.google.cloud.bigquery.storage.v1alpha2;
import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
- * A class that checks the schema compatibility between user schema in proto descriptor and Bigquery
- * table schema. If this check is passed, then user can write to BigQuery table using the user
- * schema, otherwise the write will fail.
+ * A class that checks the schema compatibility between Proto schema in proto descriptor and
+ * Bigquery table schema. If this check is passed, then user can write to BigQuery table using the
+ * user schema, otherwise the write will fail.
*
*
- *     The implementation as of now is not complete, which measn, if this check passed, there is
- * still a possbility of writing will fail.
+ *     The implementation as of now is not complete, which means, if this check passes, there is
+ * still a possibility that the write will fail.
@@ -38,8 +46,32 @@ public class SchemaCompact {
private static SchemaCompact compact;
private static String tablePatternString = "projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)";
private static Pattern tablePattern = Pattern.compile(tablePatternString);
+  // Maximum depth of nested RECORD/message types accepted by the compatibility check.
+  private static final int NestingLimit = 15;
+
+  // Proto wire types that can be mapped onto some BigQuery column type. Immutable.
+  private static final Set<Descriptors.FieldDescriptor.Type> SupportedTypes =
+      Collections.unmodifiableSet(
+          new HashSet<>(
+              Arrays.asList(
+                  Descriptors.FieldDescriptor.Type.INT32,
+                  Descriptors.FieldDescriptor.Type.INT64,
+                  Descriptors.FieldDescriptor.Type.UINT32,
+                  Descriptors.FieldDescriptor.Type.UINT64,
+                  Descriptors.FieldDescriptor.Type.FIXED32,
+                  Descriptors.FieldDescriptor.Type.FIXED64,
+                  Descriptors.FieldDescriptor.Type.SFIXED32,
+                  Descriptors.FieldDescriptor.Type.SFIXED64,
+                  Descriptors.FieldDescriptor.Type.FLOAT,
+                  Descriptors.FieldDescriptor.Type.DOUBLE,
+                  Descriptors.FieldDescriptor.Type.BOOL,
+                  Descriptors.FieldDescriptor.Type.BYTES,
+                  Descriptors.FieldDescriptor.Type.STRING,
+                  Descriptors.FieldDescriptor.Type.MESSAGE,
+                  Descriptors.FieldDescriptor.Type.GROUP,
+                  Descriptors.FieldDescriptor.Type.ENUM)));
   private SchemaCompact(BigQuery bigquery) {
+    // TODO: Add functionality that allows SchemaCompact to build schemas.
     this.bigquery = bigquery;
   }
@@ -76,24 +108,430 @@ private TableId getTableId(String tableName) {
}
/**
- * Checks if the userSchema is compatible with the table's current schema for writing. The current
- * implementatoin is not complete. If the check failed, the write couldn't succeed.
+ * @param field
+ * @return True if fieldtype is supported by BQ Schema
+ */
+ public static boolean isSupportedType(Descriptors.FieldDescriptor field) {
+ Descriptors.FieldDescriptor.Type fieldType = field.getType();
+ if (!SupportedTypes.contains(fieldType)) {
+ return false;
+ }
+ return true;
+ }
+
+  // Each isCompatibleWithBQ<Type> helper reports whether a given proto wire type may be
+  // written into a BigQuery column of that legacy SQL type. Pure truth tables, no side effects.
+
+  private static boolean isCompatibleWithBQBool(Descriptors.FieldDescriptor.Type field) {
+    return field == Descriptors.FieldDescriptor.Type.BOOL
+        || field == Descriptors.FieldDescriptor.Type.INT32
+        || field == Descriptors.FieldDescriptor.Type.INT64
+        || field == Descriptors.FieldDescriptor.Type.UINT32
+        || field == Descriptors.FieldDescriptor.Type.UINT64
+        || field == Descriptors.FieldDescriptor.Type.FIXED32
+        || field == Descriptors.FieldDescriptor.Type.FIXED64
+        || field == Descriptors.FieldDescriptor.Type.SFIXED32
+        || field == Descriptors.FieldDescriptor.Type.SFIXED64;
+  }
+
+  private static boolean isCompatibleWithBQBytes(Descriptors.FieldDescriptor.Type field) {
+    return field == Descriptors.FieldDescriptor.Type.BYTES;
+  }
+
+  private static boolean isCompatibleWithBQDate(Descriptors.FieldDescriptor.Type field) {
+    return field == Descriptors.FieldDescriptor.Type.INT32
+        || field == Descriptors.FieldDescriptor.Type.INT64
+        || field == Descriptors.FieldDescriptor.Type.SFIXED32
+        || field == Descriptors.FieldDescriptor.Type.SFIXED64;
+  }
+
+  private static boolean isCompatibleWithBQDatetime(Descriptors.FieldDescriptor.Type field) {
+    return field == Descriptors.FieldDescriptor.Type.INT64
+        || field == Descriptors.FieldDescriptor.Type.SFIXED64;
+  }
+
+  private static boolean isCompatibleWithBQFloat(Descriptors.FieldDescriptor.Type field) {
+    return field == Descriptors.FieldDescriptor.Type.FLOAT
+        || field == Descriptors.FieldDescriptor.Type.DOUBLE;
+  }
+
+  private static boolean isCompatibleWithBQGeography(Descriptors.FieldDescriptor.Type field) {
+    return field == Descriptors.FieldDescriptor.Type.BYTES;
+  }
+
+  private static boolean isCompatibleWithBQInteger(Descriptors.FieldDescriptor.Type field) {
+    return field == Descriptors.FieldDescriptor.Type.INT64
+        || field == Descriptors.FieldDescriptor.Type.SFIXED64
+        || field == Descriptors.FieldDescriptor.Type.INT32
+        || field == Descriptors.FieldDescriptor.Type.UINT32
+        || field == Descriptors.FieldDescriptor.Type.FIXED32
+        || field == Descriptors.FieldDescriptor.Type.SFIXED32
+        || field == Descriptors.FieldDescriptor.Type.ENUM;
+  }
+
+  private static boolean isCompatibleWithBQNumeric(Descriptors.FieldDescriptor.Type field) {
+    // All integral widths plus BYTES (serialized decimal) are accepted.
+    return field == Descriptors.FieldDescriptor.Type.INT32
+        || field == Descriptors.FieldDescriptor.Type.INT64
+        || field == Descriptors.FieldDescriptor.Type.UINT32
+        || field == Descriptors.FieldDescriptor.Type.UINT64
+        || field == Descriptors.FieldDescriptor.Type.FIXED32
+        || field == Descriptors.FieldDescriptor.Type.FIXED64
+        || field == Descriptors.FieldDescriptor.Type.SFIXED32
+        || field == Descriptors.FieldDescriptor.Type.SFIXED64
+        || field == Descriptors.FieldDescriptor.Type.BYTES;
+  }
+
+  private static boolean isCompatibleWithBQRecord(Descriptors.FieldDescriptor.Type field) {
+    return field == Descriptors.FieldDescriptor.Type.MESSAGE
+        || field == Descriptors.FieldDescriptor.Type.GROUP;
+  }
+
+  private static boolean isCompatibleWithBQString(Descriptors.FieldDescriptor.Type field) {
+    return field == Descriptors.FieldDescriptor.Type.STRING
+        || field == Descriptors.FieldDescriptor.Type.ENUM;
+  }
+
+  private static boolean isCompatibleWithBQTime(Descriptors.FieldDescriptor.Type field) {
+    return field == Descriptors.FieldDescriptor.Type.INT64
+        || field == Descriptors.FieldDescriptor.Type.SFIXED64;
+  }
+
+  private static boolean isCompatibleWithBQTimestamp(Descriptors.FieldDescriptor.Type field) {
+    // TIMESTAMP accepts exactly the proto types an INTEGER column accepts.
+    return isCompatibleWithBQInteger(field);
+  }
+
+  /**
+   * Checks if proto field option is compatible with BQ field mode.
+   *
+   * @param protoField proto field whose label (optional/required/repeated) is validated
+   * @param BQField BQ field whose mode (NULLABLE/REQUIRED/REPEATED) drives the requirement
+   * @param protoScope Debugging purposes to show error if messages are nested.
+   * @param BQScope Debugging purposes to show error if messages are nested.
+   * @throws IllegalArgumentException if proto field mode is incompatible with BQ field mode.
+   */
+  private void protoFieldModeIsCompatibleWithBQFieldMode(
+      Descriptors.FieldDescriptor protoField, Field BQField, String protoScope, String BQScope)
+      throws IllegalArgumentException {
+    // Every well-formed BQ field carries a mode; null indicates a malformed table schema.
+    if (BQField.getMode() == null) {
+      throw new IllegalArgumentException(
+          "Big query schema contains invalid field option for " + BQScope + ".");
+    }
+    switch (BQField.getMode()) {
+      case REPEATED:
+        // A repeated BQ column can only be fed from a repeated proto field.
+        if (!protoField.isRepeated()) {
+          throw new IllegalArgumentException(
+              "Given proto field "
+                  + protoScope
+                  + " is not repeated but Big Query field "
+                  + BQScope
+                  + " is.");
+        }
+        break;
+      case REQUIRED:
+        // A required BQ column needs a proto field guaranteed to be present.
+        if (!protoField.isRequired()) {
+          throw new IllegalArgumentException(
+              "Given proto field "
+                  + protoScope
+                  + " is not required but Big Query field "
+                  + BQScope
+                  + " is.");
+        }
+        break;
+      case NULLABLE:
+        // A nullable BQ column accepts optional or required proto fields, but never repeated ones.
+        if (protoField.isRepeated()) {
+          throw new IllegalArgumentException(
+              "Given proto field "
+                  + protoScope
+                  + " is repeated but Big Query field "
+                  + BQScope
+                  + " is optional.");
+        }
+        break;
+    }
+  }
+  /**
+   * Checks if proto field type is compatible with BQ field type.
 *
-   * @param tableName The name of the table to write to.
-   * @param userSchema The schema user uses to append data.
-   * @throws IllegalArgumentException the check failed.
+   * @param protoField proto field to validate
+   * @param BQField BQ field the proto field is matched against
+   * @param allowUnknownFields forwarded to the recursive RECORD check
+   * @param protoScope Debugging purposes to show error if messages are nested.
+   * @param BQScope Debugging purposes to show error if messages are nested.
+   * @param allMessageTypes Keeps track of all current protos to avoid recursively nested protos.
+   * @param rootProtoName Debugging purposes for nested level > 15.
+   * @throws IllegalArgumentException if proto field type is incompatible with BQ field type.
 */
-  public void check(String tableName, Descriptors.Descriptor userSchema)
+  private void protoFieldTypeIsCompatibleWithBQFieldType(
+      Descriptors.FieldDescriptor protoField,
+      Field BQField,
+      boolean allowUnknownFields,
+      String protoScope,
+      String BQScope,
+      HashSet<Descriptors.Descriptor> allMessageTypes,
+      String rootProtoName)
       throws IllegalArgumentException {
-    Table table = bigquery.getTable(getTableId(tableName));
-    Schema schema = table.getDefinition().getSchema();
-    // TODO: We only have very limited check here. More checks to be added.
-    if (schema.getFields().size() != userSchema.getFields().size()) {
+
+    LegacySQLTypeName BQType = BQField.getType();
+    Descriptors.FieldDescriptor.Type protoType = protoField.getType();
+    boolean match = false;
+    // LegacySQLTypeName is not an enum, so dispatch on its canonical string form.
+    switch (BQType.toString()) {
+      case "BOOLEAN":
+        match = isCompatibleWithBQBool(protoType);
+        break;
+      case "BYTES":
+        match = isCompatibleWithBQBytes(protoType);
+        break;
+      case "DATE":
+        match = isCompatibleWithBQDate(protoType);
+        break;
+      case "DATETIME":
+        match = isCompatibleWithBQDatetime(protoType);
+        break;
+      case "FLOAT":
+        match = isCompatibleWithBQFloat(protoType);
+        break;
+      case "GEOGRAPHY":
+        match = isCompatibleWithBQGeography(protoType);
+        break;
+      case "INTEGER":
+        match = isCompatibleWithBQInteger(protoType);
+        break;
+      case "NUMERIC":
+        match = isCompatibleWithBQNumeric(protoType);
+        break;
+      case "RECORD":
+        // allMessageTypes holds the message chain from the root, so its size is the nesting depth.
+        if (allMessageTypes.size() > NestingLimit) {
+          throw new IllegalArgumentException(
+              "Proto schema "
+                  + rootProtoName
+                  + " is not supported: contains nested messages of more than 15 levels.");
+        }
+        match = isCompatibleWithBQRecord(protoType);
+        if (!match) {
+          break;
+        }
+        Descriptors.Descriptor message = protoField.getMessageType();
+        // Revisiting a message already on the current chain means the proto recurses.
+        if (allMessageTypes.contains(message)) {
+          throw new IllegalArgumentException(
+              "Proto schema " + protoScope + " is not supported: is a recursively nested message.");
+        }
+        allMessageTypes.add(message);
+        // Recurse into the nested message with the RECORD's sub-fields as the BQ schema.
+        isProtoCompatibleWithBQ(
+            protoField.getMessageType(),
+            Schema.of(BQField.getSubFields()),
+            allowUnknownFields,
+            protoScope,
+            BQScope,
+            false,
+            allMessageTypes,
+            rootProtoName);
+        allMessageTypes.remove(message);
+        break;
+      case "STRING":
+        match = isCompatibleWithBQString(protoType);
+        break;
+      case "TIME":
+        match = isCompatibleWithBQTime(protoType);
+        break;
+      case "TIMESTAMP":
+        match = isCompatibleWithBQTimestamp(protoType);
+        break;
+    }
+    if (!match) {
       throw new IllegalArgumentException(
-          "User schema doesn't have expected field number with BigQuery table schema, expected: "
-              + schema.getFields().size()
-              + " actual: "
-              + userSchema.getFields().size());
+          "The proto field "
+              + protoScope
+              + " does not have a matching type with the big query field "
+              + BQScope
+              + ".");
     }
   }
+
+  /**
+   * Checks if a proto message schema is compatible with a BQ schema.
+   *
+   * @param protoSchema proto descriptor to validate
+   * @param BQSchema BQ table schema to validate against
+   * @param allowUnknownFields if true, the proto may carry fields absent from the BQ schema
+   * @param protoScope Debugging purposes to show error if messages are nested.
+   * @param BQScope Debugging purposes to show error if messages are nested.
+   * @param topLevel True if this is the root level of proto (in terms of nested messages)
+   * @param allMessageTypes Keeps track of all current protos to avoid recursively nested protos.
+   * @param rootProtoName Debugging purposes for nested level > 15.
+   * @throws IllegalArgumentException if proto field type is incompatible with BQ field type.
+   */
+  private void isProtoCompatibleWithBQ(
+      Descriptors.Descriptor protoSchema,
+      Schema BQSchema,
+      boolean allowUnknownFields,
+      String protoScope,
+      String BQScope,
+      boolean topLevel,
+      HashSet<Descriptors.Descriptor> allMessageTypes,
+      String rootProtoName)
+      throws IllegalArgumentException {
+
+    int matchedFields = 0;
+    // Parameterized collections: the enhanced-for loops below do not compile over raw Lists.
+    HashMap<String, Descriptors.FieldDescriptor> protoFieldMap = new HashMap<>();
+    List<Descriptors.FieldDescriptor> protoFields = protoSchema.getFields();
+    List<Field> BQFields = BQSchema.getFields();
+
+    if (protoFields.size() > BQFields.size()) {
+      if (!allowUnknownFields) {
+        throw new IllegalArgumentException(
+            "Proto schema "
+                + protoScope
+                + " has "
+                + protoFields.size()
+                + " fields, while BQ schema "
+                + BQScope
+                + " has "
+                + BQFields.size()
+                + " fields.");
+      }
+    }
+    // Use hashmap to map from lowercased name to appropriate field to account for casing difference
+    for (Descriptors.FieldDescriptor field : protoFields) {
+      protoFieldMap.put(field.getName().toLowerCase(), field);
+    }
+
+    for (Field BQField : BQFields) {
+      String fieldName = BQField.getName().toLowerCase();
+      // Single get() (null when absent) replaces the containsKey()/get() double lookup.
+      Descriptors.FieldDescriptor protoField = protoFieldMap.get(fieldName);
+
+      String currentBQScope = BQScope + "." + BQField.getName();
+      if (protoField == null && BQField.getMode() == Field.Mode.REQUIRED) {
+        throw new IllegalArgumentException(
+            "The required Big Query field "
+                + currentBQScope
+                + " is missing in the proto schema "
+                + protoScope
+                + ".");
+      }
+      if (protoField == null) {
+        // Non-required BQ columns without a proto counterpart are simply skipped.
+        continue;
+      }
+      String currentProtoScope = protoScope + "." + protoField.getName();
+      if (!isSupportedType(protoField)) {
+        throw new IllegalArgumentException(
+            "Proto schema "
+                + currentProtoScope
+                + " is not supported: contains "
+                + protoField.getType()
+                + " field type.");
+      }
+      if (protoField.isMapField()) {
+        throw new IllegalArgumentException(
+            "Proto schema " + currentProtoScope + " is not supported: is a map field.");
+      }
+      protoFieldModeIsCompatibleWithBQFieldMode(
+          protoField, BQField, currentProtoScope, currentBQScope);
+      protoFieldTypeIsCompatibleWithBQFieldType(
+          protoField,
+          BQField,
+          allowUnknownFields,
+          currentProtoScope,
+          currentBQScope,
+          allMessageTypes,
+          rootProtoName);
+      matchedFields++;
+    }
+
+    // Only the root level insists on at least one match; nested levels may legitimately match none.
+    if (matchedFields == 0 && topLevel) {
+      throw new IllegalArgumentException(
+          "There is no matching fields found for the proto schema "
+              + protoScope
+              + " and the BQ table schema "
+              + BQScope
+              + ".");
+    }
+  }
+
+  /**
+   * Checks if proto schema is compatible with BQ schema after retrieving BQ schema by BQTableName.
+   *
+   * @param BQTableName Must include project_id, dataset_id, and table_id in the form that matches
+   *     the regex "projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)"
+   * @param protoSchema proto descriptor of the rows the caller intends to write
+   * @param allowUnknownFields Flag indicating proto can have unknown fields.
+   * @throws IllegalArgumentException if the proto schema is incompatible with the BQ table schema.
+   */
+  public void check(
+      String BQTableName, Descriptors.Descriptor protoSchema, boolean allowUnknownFields)
+      throws IllegalArgumentException {
+    TableId tableId = getTableId(BQTableName);
+    Table table = bigquery.getTable(tableId);
+    Schema BQSchema = table.getDefinition().getSchema();
+    String protoSchemaName = protoSchema.getName();
+    // Seed the visited set with the root message so direct self-recursion is caught immediately.
+    HashSet<Descriptors.Descriptor> allMessageTypes = new HashSet<>();
+    allMessageTypes.add(protoSchema);
+    isProtoCompatibleWithBQ(
+        protoSchema,
+        BQSchema,
+        allowUnknownFields,
+        protoSchemaName,
+        tableId.getTable(),
+        true,
+        allMessageTypes,
+        protoSchemaName);
+  }
+
+  /**
+   * Checks if proto schema is compatible with BQ schema after retrieving BQ schema by BQTableName.
+   * Assumes allowUnknownFields is false.
+   *
+   * @param BQTableName Must include project_id, dataset_id, and table_id in the form that matches
+   *     the regex "projects/([^/]+)/datasets/([^/]+)/tables/([^/]+)"
+   * @param protoSchema proto descriptor of the rows the caller intends to write
+   * @throws IllegalArgumentException if the proto schema is incompatible with the BQ table schema.
+   */
+  public void check(String BQTableName, Descriptors.Descriptor protoSchema)
+      throws IllegalArgumentException {
+    check(BQTableName, protoSchema, false);
+  }
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java
index 205d814968..259bc59da0 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/SchemaCompactTest.java
@@ -23,8 +23,12 @@
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.storage.test.SchemaTest.*;
import com.google.cloud.bigquery.storage.test.Test.FooType;
+import com.google.protobuf.Descriptors;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
import javax.annotation.Nullable;
import org.junit.After;
import org.junit.Before;
@@ -38,6 +42,24 @@
public class SchemaCompactTest {
@Mock private BigQuery mockBigquery;
@Mock private Table mockBigqueryTable;
+  // One descriptor per proto wire type, for exercising type-compatibility checks.
+  // NOTE(review): `type_descriptors` diverges from Java camelCase convention — consider renaming;
+  // usage may lie outside this diff, so left unchanged here.
+  Descriptors.Descriptor[] type_descriptors = {
+    Int32Type.getDescriptor(),
+    Int64Type.getDescriptor(),
+    UInt32Type.getDescriptor(),
+    UInt64Type.getDescriptor(),
+    Fixed32Type.getDescriptor(),
+    Fixed64Type.getDescriptor(),
+    SFixed32Type.getDescriptor(),
+    SFixed64Type.getDescriptor(),
+    FloatType.getDescriptor(),
+    DoubleType.getDescriptor(),
+    BoolType.getDescriptor(),
+    BytesType.getDescriptor(),
+    StringType.getDescriptor(),
+    EnumType.getDescriptor(),
+    MessageType.getDescriptor(),
+    GroupType.getDescriptor()
+  };
@Before
public void setUp() throws IOException {
@@ -51,8 +73,7 @@ public void tearDown() {
     verifyNoMoreInteractions(mockBigqueryTable);
   }
-  @Test
-  public void testSuccess() throws Exception {
+  // Test helper: stubs the mocked table so its definition reports exactly the supplied schema.
+  public void customizeSchema(final Schema schema) {
     TableDefinition definition =
         new TableDefinition() {
           @Override
@@ -63,7 +84,7 @@ public Type getType() {
           @Nullable
           @Override
           public Schema getSchema() {
-            return Schema.of(Field.of("Foo", LegacySQLTypeName.STRING));
+            return schema;
           }
           @Override
@@ -72,42 +93,60 @@ public Builder toBuilder() {
         }
       };
     when(mockBigqueryTable.getDefinition()).thenReturn(definition);
+  }
+
+  // Happy path: a single nullable STRING column is compatible with FooType.
+  @Test
+  public void testSuccess() throws Exception {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("Foo", LegacySQLTypeName.STRING)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
     SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
-    compact.check("projects/p/datasets/d/tables/t", FooType.getDescriptor());
+    compact.check("projects/p/datasets/d/tables/t", FooType.getDescriptor(), false);
     verify(mockBigquery, times(1)).getTable(any(TableId.class));
     verify(mockBigqueryTable, times(1)).getDefinition();
   }
   @Test
-  public void testFailed() throws Exception {
-    TableDefinition definition =
-        new TableDefinition() {
-          @Override
-          public Type getType() {
-            return null;
-          }
+  // A table name not matching projects/.../datasets/.../tables/... must be rejected up front.
+  public void testBadTableName() throws Exception {
+    try {
+      SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+      compact.check("blah", FooType.getDescriptor(), false);
+      fail("should fail");
+    } catch (IllegalArgumentException expected) {
+      assertEquals("Invalid table name: blah", expected.getMessage());
+    }
+  }
-          @Nullable
-          @Override
-          public Schema getSchema() {
-            return Schema.of(
-                Field.of("Foo", LegacySQLTypeName.STRING),
-                Field.of("Bar", LegacySQLTypeName.STRING));
-          }
+  // isSupportedType must accept every field of SupportedTypes and reject every NonSupportedTypes field.
+  @Test
+  public void testSupportedTypes() {
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    for (Descriptors.FieldDescriptor field : SupportedTypes.getDescriptor().getFields()) {
+      assertTrue(compact.isSupportedType(field));
+    }
-          @Override
-          public Builder toBuilder() {
-            return null;
-          }
-        };
-    when(mockBigqueryTable.getDefinition()).thenReturn(definition);
+    for (Descriptors.FieldDescriptor field : NonSupportedTypes.getDescriptor().getFields()) {
+      assertFalse(compact.isSupportedType(field));
+    }
+  }
+
+  // Proto map fields have no BQ equivalent and must be rejected with a scoped error message.
+  @Test
+  public void testMap() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("map_value", LegacySQLTypeName.INTEGER)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
     SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    Descriptors.Descriptor testMap = NonSupportedMap.getDescriptor();
+    String protoName = testMap.getName() + ".map_value";
     try {
-      compact.check("projects/p/datasets/d/tables/t", FooType.getDescriptor());
-      fail("should fail");
+      compact.check("projects/p/datasets/d/tables/t", testMap, false);
+      fail("Should not be supported: field contains map");
     } catch (IllegalArgumentException expected) {
       assertEquals(
-          "User schema doesn't have expected field number with BigQuery table schema, expected: 2 actual: 1",
+          "Proto schema " + protoName + " is not supported: is a map field.",
           expected.getMessage());
     }
     verify(mockBigquery, times(1)).getTable(any(TableId.class));
@@ -115,13 +154,859 @@ public Builder toBuilder() {
}
@Test
- public void testBadTableName() throws Exception {
+ public void testNestingSupportedSimple() {
+ Field BQSupportedNestingLvl2 =
+ Field.newBuilder("int_value", LegacySQLTypeName.INTEGER)
+ .setMode(Field.Mode.NULLABLE)
+ .build();
+ customizeSchema(
+ Schema.of(
+ Field.newBuilder("int_value", LegacySQLTypeName.INTEGER)
+ .setMode(Field.Mode.NULLABLE)
+ .build(),
+ Field.newBuilder("nesting_value", LegacySQLTypeName.RECORD, BQSupportedNestingLvl2)
+ .setMode(Field.Mode.NULLABLE)
+ .build()));
+ SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+ Descriptors.Descriptor testNesting = SupportedNestingLvl1.getDescriptor();
try {
- SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
- compact.check("blah", FooType.getDescriptor());
- fail("should fail");
+ compact.check("projects/p/datasets/d/tables/t", testNesting, false);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ verify(mockBigquery, times(1)).getTable(any(TableId.class));
+ verify(mockBigqueryTable, times(1)).getDefinition();
+ }
+
+ @Test
+ public void testNestingSupportedStacked() {
+ Field BQSupportedNestingLvl2 =
+ Field.newBuilder("int_value", LegacySQLTypeName.INTEGER)
+ .setMode(Field.Mode.NULLABLE)
+ .build();
+ customizeSchema(
+ Schema.of(
+ Field.newBuilder("int_value", LegacySQLTypeName.INTEGER)
+ .setMode(Field.Mode.NULLABLE)
+ .build(),
+ Field.newBuilder("nesting_value1", LegacySQLTypeName.RECORD, BQSupportedNestingLvl2)
+ .setMode(Field.Mode.NULLABLE)
+ .build(),
+ Field.newBuilder("nesting_value2", LegacySQLTypeName.RECORD, BQSupportedNestingLvl2)
+ .setMode(Field.Mode.NULLABLE)
+ .build()));
+ SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+ Descriptors.Descriptor testNesting = SupportedNestingStacked.getDescriptor();
+ try {
+ compact.check("projects/p/datasets/d/tables/t", testNesting, false);
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ verify(mockBigquery, times(1)).getTable(any(TableId.class));
+ verify(mockBigqueryTable, times(1)).getDefinition();
+ }
+
+ /*
+ * This is not the "exact" test, as BigQuery fields cannot be recursive. Instead, this test uses
+ * two DIFFERENT records with the same name to simulate recursive protos (protos can't have the
+ * same name anyways unless they are the same proto).
+ */
+ @Test
+ public void testNestingContainsRecursive() {
+ Field BQNonSupportedNestingRecursive =
+ Field.newBuilder(
+ "nesting_value",
+ LegacySQLTypeName.RECORD,
+ Field.newBuilder("int_value", LegacySQLTypeName.INTEGER)
+ .setMode(Field.Mode.NULLABLE)
+ .build())
+ .setMode(Field.Mode.NULLABLE)
+ .build();
+
+ customizeSchema(
+ Schema.of(
+ Field.newBuilder("int_value", LegacySQLTypeName.INTEGER)
+ .setMode(Field.Mode.NULLABLE)
+ .build(),
+ Field.newBuilder(
+ "nesting_value", LegacySQLTypeName.RECORD, BQNonSupportedNestingRecursive)
+ .setMode(Field.Mode.NULLABLE)
+ .build()));
+ SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+ Descriptors.Descriptor testNesting = NonSupportedNestingContainsRecursive.getDescriptor();
+ try {
+ compact.check("projects/p/datasets/d/tables/t", testNesting, false);
+ fail("Should not be supported: contains nested messages of more than 15 levels.");
} catch (IllegalArgumentException expected) {
- assertEquals("Invalid table name: blah", expected.getMessage());
+ assertEquals(
+ "Proto schema "
+ + testNesting.getName()
+ + ".nesting_value.nesting_value is not supported: is a recursively nested message.",
+ expected.getMessage());
}
+ verify(mockBigquery, times(1)).getTable(any(TableId.class));
+ verify(mockBigqueryTable, times(1)).getDefinition();
+ }
+
+  // A 16-level RECORD chain (one past the 15-level limit) must be rejected.
+  @Test
+  public void testNestingRecursiveLimit() {
+    // Build the same chain the hand-written Lvl0..Lvl16 fields produced: an INTEGER leaf
+    // wrapped in 16 nested RECORD fields named "test1" (15 wraps here + 1 in Schema.of below
+    // in the original; here all 16 wraps happen in the loop).
+    Field nested =
+        Field.newBuilder("test1", LegacySQLTypeName.INTEGER).setMode(Field.Mode.NULLABLE).build();
+    for (int level = 0; level < 16; level++) {
+      nested =
+          Field.newBuilder("test1", LegacySQLTypeName.RECORD, nested)
+              .setMode(Field.Mode.NULLABLE)
+              .build();
+    }
+    customizeSchema(Schema.of(nested));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    Descriptors.Descriptor testNesting = NonSupportedNestingLvl0.getDescriptor();
+    try {
+      compact.check("projects/p/datasets/d/tables/t", testNesting, false);
+      fail("Should not be supported: contains nested messages of more than 15 levels.");
+    } catch (IllegalArgumentException expected) {
+      assertEquals(
+          "Proto schema "
+              + testNesting.getName()
+              + " is not supported: contains nested messages of more than 15 levels.",
+          expected.getMessage());
+    }
+    verify(mockBigquery, times(1)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(1)).getDefinition();
+  }
+
+  // Proto with more fields than the BQ schema is rejected when allowUnknownFields is false.
+  @Test
+  public void testProtoMoreFields() {
+    Schema customSchema = Schema.of(Field.of("int32_value", LegacySQLTypeName.INTEGER));
+    customizeSchema(customSchema);
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+
+    try {
+      compact.check("projects/p/datasets/d/tables/t", SupportedTypes.getDescriptor(), false);
+      fail("Should fail: proto has more fields and allowUnknownFields flag is false.");
+    } catch (IllegalArgumentException expected) {
+      assertEquals(
+          "Proto schema "
+              + SupportedTypes.getDescriptor().getName()
+              + " has "
+              + SupportedTypes.getDescriptor().getFields().size()
+              + " fields, while BQ schema t has "
+              + 1
+              + " fields.",
+          expected.getMessage());
+    }
+    verify(mockBigquery, times(1)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(1)).getDefinition();
+  }
+
+  // REPEATED BQ column: accepts repeated proto fields, rejects optional and required ones.
+  @Test
+  public void testBQRepeated() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("repeated_mode", LegacySQLTypeName.INTEGER)
+                .setMode(Field.Mode.REPEATED)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    compact.check("projects/p/datasets/d/tables/t", ProtoRepeatedBQRepeated.getDescriptor(), false);
+    try {
+      compact.check(
+          "projects/p/datasets/d/tables/t", ProtoOptionalBQRepeated.getDescriptor(), false);
+      fail("Should fail: BQ schema is repeated, but proto is optional.");
+    } catch (IllegalArgumentException expected) {
+      assertEquals(
+          "Given proto field "
+              + ProtoOptionalBQRepeated.getDescriptor().getName()
+              + ".repeated_mode"
+              + " is not repeated but Big Query field t.repeated_mode is.",
+          expected.getMessage());
+    }
+
+    try {
+      compact.check(
+          "projects/p/datasets/d/tables/t", ProtoRequiredBQRepeated.getDescriptor(), false);
+      fail("Should fail: BQ schema is repeated, but proto is required.");
+    } catch (IllegalArgumentException expected) {
+      assertEquals(
+          "Given proto field "
+              + ProtoRequiredBQRepeated.getDescriptor().getName()
+              + ".repeated_mode"
+              + " is not repeated but Big Query field t.repeated_mode is.",
+          expected.getMessage());
+    }
+    // Three check() calls above -> three table lookups/definitions expected.
+    verify(mockBigquery, times(3)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(3)).getDefinition();
+  }
+
+  // REQUIRED BQ column: accepts required protos; rejects missing, optional, and repeated fields.
+  @Test
+  public void testBQRequired() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("required_mode", LegacySQLTypeName.INTEGER)
+                .setMode(Field.Mode.REQUIRED)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    compact.check("projects/p/datasets/d/tables/t", ProtoRequiredBQRequired.getDescriptor(), false);
+
+    try {
+      compact.check("projects/p/datasets/d/tables/t", ProtoNoneBQRequired.getDescriptor(), false);
+      fail("Should fail: BQ schema is required, but proto does not have this field.");
+    } catch (IllegalArgumentException expected) {
+      assertEquals(
+          "The required Big Query field t.required_mode is missing in the proto schema "
+              + ProtoNoneBQRequired.getDescriptor().getName()
+              + ".",
+          expected.getMessage());
+    }
+
+    try {
+      compact.check(
+          "projects/p/datasets/d/tables/t", ProtoOptionalBQRequired.getDescriptor(), false);
+      fail("Should fail: BQ schema is required, but proto is optional.");
+    } catch (IllegalArgumentException expected) {
+      assertEquals(
+          "Given proto field "
+              + ProtoOptionalBQRequired.getDescriptor().getName()
+              + ".required_mode is not required but Big Query field t.required_mode is.",
+          expected.getMessage());
+    }
+
+    try {
+      compact.check(
+          "projects/p/datasets/d/tables/t", ProtoRepeatedBQRequired.getDescriptor(), false);
+      fail("Should fail: BQ schema is required, but proto is repeated.");
+    } catch (IllegalArgumentException expected) {
+      assertEquals(
+          "Given proto field "
+              + ProtoRepeatedBQRequired.getDescriptor().getName()
+              + ".required_mode is not required but Big Query field t.required_mode is.",
+          expected.getMessage());
+    }
+    // Four check() calls above -> four table lookups/definitions expected.
+    verify(mockBigquery, times(4)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(4)).getDefinition();
+  }
+
+  /**
+   * BQ NULLABLE column: optional and required proto fields both pass; a repeated proto field is
+   * rejected.
+   */
+  @Test
+  public void testBQOptional() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("optional_mode", LegacySQLTypeName.INTEGER)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    compact.check("projects/p/datasets/d/tables/t", ProtoOptionalBQOptional.getDescriptor(), false);
+    // A required proto field is a strict subset of nullable, so it is accepted too.
+    compact.check("projects/p/datasets/d/tables/t", ProtoRequiredBQOptional.getDescriptor(), false);
+
+    try {
+      compact.check(
+          "projects/p/datasets/d/tables/t", ProtoRepeatedBQOptional.getDescriptor(), false);
+      fail("Should fail: BQ schema is nullable, but proto field is repeated.");
+    } catch (IllegalArgumentException expected) {
+      assertEquals(
+          "Given proto field "
+              + ProtoRepeatedBQOptional.getDescriptor().getName()
+              + ".optional_mode is repeated but Big Query field t.optional_mode is optional.",
+          expected.getMessage());
+    }
+
+    // One table fetch per check() call above (3 total).
+    verify(mockBigquery, times(3)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(3)).getDefinition();
+  }
+
+  /**
+   * Type matrix for a BQ BOOLEAN column: every proto integer type plus bool is compatible; all
+   * other descriptors in {@code type_descriptors} must be rejected with a type-mismatch error.
+   */
+  @Test
+  public void testBQBool() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("test_field_type", LegacySQLTypeName.BOOLEAN)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    // Parameterize the set (was a raw HashSet).
+    HashSet<Descriptors.Descriptor> compatible =
+        new HashSet<>(
+            Arrays.asList(
+                Int32Type.getDescriptor(),
+                Int64Type.getDescriptor(),
+                UInt32Type.getDescriptor(),
+                UInt64Type.getDescriptor(),
+                Fixed32Type.getDescriptor(),
+                Fixed64Type.getDescriptor(),
+                SFixed32Type.getDescriptor(),
+                SFixed64Type.getDescriptor(),
+                BoolType.getDescriptor()));
+
+    for (Descriptors.Descriptor descriptor : type_descriptors) {
+      if (compatible.contains(descriptor)) {
+        compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+      } else {
+        try {
+          compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+          fail("Should fail: Proto schema type should not match BQ Boolean.");
+        } catch (IllegalArgumentException expected) {
+          assertEquals(
+              "The proto field "
+                  + descriptor.getName()
+                  + ".test_field_type does not have a matching type with the big query field t.test_field_type.",
+              expected.getMessage());
+        }
+      }
+    }
+    // 16 descriptors in type_descriptors => 16 table fetches.
+    verify(mockBigquery, times(16)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(16)).getDefinition();
+  }
+
+  /**
+   * Type matrix for a BQ BYTES column: only the proto bytes type is compatible; every other
+   * descriptor in {@code type_descriptors} must be rejected with a type-mismatch error.
+   */
+  @Test
+  public void testBQBytes() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("test_field_type", LegacySQLTypeName.BYTES)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    // Parameterize the set (was a raw HashSet).
+    HashSet<Descriptors.Descriptor> compatible =
+        new HashSet<>(Arrays.asList(BytesType.getDescriptor()));
+
+    for (Descriptors.Descriptor descriptor : type_descriptors) {
+      if (compatible.contains(descriptor)) {
+        compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+      } else {
+        try {
+          compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+          fail("Should fail: Proto schema type should not match BQ Bytes.");
+        } catch (IllegalArgumentException expected) {
+          assertEquals(
+              "The proto field "
+                  + descriptor.getName()
+                  + ".test_field_type does not have a matching type with the big query field t.test_field_type.",
+              expected.getMessage());
+        }
+      }
+    }
+    // 16 descriptors in type_descriptors => 16 table fetches.
+    verify(mockBigquery, times(16)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(16)).getDefinition();
+  }
+
+  /**
+   * Type matrix for a BQ DATE column: signed 32/64-bit proto integers are compatible; every other
+   * descriptor in {@code type_descriptors} must be rejected with a type-mismatch error.
+   */
+  @Test
+  public void testBQDate() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("test_field_type", LegacySQLTypeName.DATE)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    // Parameterize the set (was a raw HashSet).
+    HashSet<Descriptors.Descriptor> compatible =
+        new HashSet<>(
+            Arrays.asList(
+                Int32Type.getDescriptor(),
+                Int64Type.getDescriptor(),
+                SFixed32Type.getDescriptor(),
+                SFixed64Type.getDescriptor()));
+
+    for (Descriptors.Descriptor descriptor : type_descriptors) {
+      if (compatible.contains(descriptor)) {
+        compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+      } else {
+        try {
+          compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+          fail("Should fail: Proto schema type should not match BQ Date.");
+        } catch (IllegalArgumentException expected) {
+          assertEquals(
+              "The proto field "
+                  + descriptor.getName()
+                  + ".test_field_type does not have a matching type with the big query field t.test_field_type.",
+              expected.getMessage());
+        }
+      }
+    }
+    // 16 descriptors in type_descriptors => 16 table fetches.
+    verify(mockBigquery, times(16)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(16)).getDefinition();
+  }
+
+  /**
+   * Type matrix for a BQ DATETIME column: only signed 64-bit proto integers are compatible; every
+   * other descriptor in {@code type_descriptors} must be rejected with a type-mismatch error.
+   */
+  @Test
+  public void testBQDatetime() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("test_field_type", LegacySQLTypeName.DATETIME)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    // Parameterize the set (was a raw HashSet).
+    HashSet<Descriptors.Descriptor> compatible =
+        new HashSet<>(Arrays.asList(Int64Type.getDescriptor(), SFixed64Type.getDescriptor()));
+
+    for (Descriptors.Descriptor descriptor : type_descriptors) {
+      if (compatible.contains(descriptor)) {
+        compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+      } else {
+        try {
+          compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+          fail("Should fail: Proto schema type should not match BQ Datetime.");
+        } catch (IllegalArgumentException expected) {
+          assertEquals(
+              "The proto field "
+                  + descriptor.getName()
+                  + ".test_field_type does not have a matching type with the big query field t.test_field_type.",
+              expected.getMessage());
+        }
+      }
+    }
+    // 16 descriptors in type_descriptors => 16 table fetches.
+    verify(mockBigquery, times(16)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(16)).getDefinition();
+  }
+
+  /**
+   * Type matrix for a BQ FLOAT column: proto float and double are compatible; every other
+   * descriptor in {@code type_descriptors} must be rejected with a type-mismatch error.
+   */
+  @Test
+  public void testBQFloat() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("test_field_type", LegacySQLTypeName.FLOAT)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    // Parameterize the set (was a raw HashSet).
+    HashSet<Descriptors.Descriptor> compatible =
+        new HashSet<>(Arrays.asList(FloatType.getDescriptor(), DoubleType.getDescriptor()));
+
+    for (Descriptors.Descriptor descriptor : type_descriptors) {
+      if (compatible.contains(descriptor)) {
+        compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+      } else {
+        try {
+          compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+          fail("Should fail: Proto schema type should not match BQ Float.");
+        } catch (IllegalArgumentException expected) {
+          assertEquals(
+              "The proto field "
+                  + descriptor.getName()
+                  + ".test_field_type does not have a matching type with the big query field t.test_field_type.",
+              expected.getMessage());
+        }
+      }
+    }
+    // 16 descriptors in type_descriptors => 16 table fetches.
+    verify(mockBigquery, times(16)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(16)).getDefinition();
+  }
+
+  /**
+   * Type matrix for a BQ GEOGRAPHY column: only proto bytes is compatible; every other descriptor
+   * in {@code type_descriptors} must be rejected with a type-mismatch error.
+   */
+  @Test
+  public void testBQGeography() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("test_field_type", LegacySQLTypeName.GEOGRAPHY)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    // Parameterize the set (was a raw HashSet).
+    HashSet<Descriptors.Descriptor> compatible =
+        new HashSet<>(Arrays.asList(BytesType.getDescriptor()));
+
+    for (Descriptors.Descriptor descriptor : type_descriptors) {
+      if (compatible.contains(descriptor)) {
+        compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+      } else {
+        try {
+          compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+          fail("Should fail: Proto schema type should not match BQ Geography.");
+        } catch (IllegalArgumentException expected) {
+          assertEquals(
+              "The proto field "
+                  + descriptor.getName()
+                  + ".test_field_type does not have a matching type with the big query field t.test_field_type.",
+              expected.getMessage());
+        }
+      }
+    }
+    // 16 descriptors in type_descriptors => 16 table fetches.
+    verify(mockBigquery, times(16)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(16)).getDefinition();
+  }
+
+  /**
+   * Type matrix for a BQ INTEGER column: proto types representable as a signed 64-bit integer
+   * (plus enum) are compatible; every other descriptor must be rejected with a type-mismatch
+   * error.
+   */
+  @Test
+  public void testBQInteger() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("test_field_type", LegacySQLTypeName.INTEGER)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    // Parameterize the set (was a raw HashSet).
+    HashSet<Descriptors.Descriptor> compatible =
+        new HashSet<>(
+            Arrays.asList(
+                Int32Type.getDescriptor(),
+                Int64Type.getDescriptor(),
+                UInt32Type.getDescriptor(),
+                Fixed32Type.getDescriptor(),
+                SFixed32Type.getDescriptor(),
+                SFixed64Type.getDescriptor(),
+                EnumType.getDescriptor()));
+
+    for (Descriptors.Descriptor descriptor : type_descriptors) {
+      if (compatible.contains(descriptor)) {
+        compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+      } else {
+        try {
+          compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+          fail("Should fail: Proto schema type should not match BQ Integer.");
+        } catch (IllegalArgumentException expected) {
+          assertEquals(
+              "The proto field "
+                  + descriptor.getName()
+                  + ".test_field_type does not have a matching type with the big query field t.test_field_type.",
+              expected.getMessage());
+        }
+      }
+    }
+    // 16 descriptors in type_descriptors => 16 table fetches.
+    verify(mockBigquery, times(16)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(16)).getDefinition();
+  }
+
+  /**
+   * Type matrix for a BQ NUMERIC column: all proto integer types and bytes are compatible; every
+   * other descriptor in {@code type_descriptors} must be rejected with a type-mismatch error.
+   */
+  @Test
+  public void testBQNumeric() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("test_field_type", LegacySQLTypeName.NUMERIC)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    // Parameterize the set (was a raw HashSet).
+    HashSet<Descriptors.Descriptor> compatible =
+        new HashSet<>(
+            Arrays.asList(
+                Int32Type.getDescriptor(),
+                Int64Type.getDescriptor(),
+                UInt32Type.getDescriptor(),
+                UInt64Type.getDescriptor(),
+                Fixed32Type.getDescriptor(),
+                Fixed64Type.getDescriptor(),
+                SFixed32Type.getDescriptor(),
+                SFixed64Type.getDescriptor(),
+                BytesType.getDescriptor()));
+
+    for (Descriptors.Descriptor descriptor : type_descriptors) {
+      if (compatible.contains(descriptor)) {
+        compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+      } else {
+        try {
+          compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+          fail("Should fail: Proto schema type should not match BQ Numeric.");
+        } catch (IllegalArgumentException expected) {
+          assertEquals(
+              "The proto field "
+                  + descriptor.getName()
+                  + ".test_field_type does not have a matching type with the big query field t.test_field_type.",
+              expected.getMessage());
+        }
+      }
+    }
+    // 16 descriptors in type_descriptors => 16 table fetches.
+    verify(mockBigquery, times(16)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(16)).getDefinition();
+  }
+
+  /**
+   * Type matrix for a BQ RECORD column: proto message and group types are compatible; every other
+   * descriptor in {@code type_descriptors} must be rejected with a type-mismatch error.
+   */
+  @Test
+  public void testBQRecord() {
+    Field nestedMessage =
+        Field.newBuilder("test_field_type", LegacySQLTypeName.STRING)
+            .setMode(Field.Mode.NULLABLE)
+            .build();
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("test_field_type", LegacySQLTypeName.RECORD, nestedMessage)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    // Parameterize the set (was a raw HashSet).
+    HashSet<Descriptors.Descriptor> compatible =
+        new HashSet<>(Arrays.asList(MessageType.getDescriptor(), GroupType.getDescriptor()));
+
+    for (Descriptors.Descriptor descriptor : type_descriptors) {
+      if (compatible.contains(descriptor)) {
+        compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+      } else {
+        try {
+          compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+          // Fixed copy-paste: this test targets RECORD, not String.
+          fail("Should fail: Proto schema type should not match BQ Record.");
+        } catch (IllegalArgumentException expected) {
+          assertEquals(
+              "The proto field "
+                  + descriptor.getName()
+                  + ".test_field_type does not have a matching type with the big query field t.test_field_type.",
+              expected.getMessage());
+        }
+      }
+    }
+    // 16 descriptors in type_descriptors => 16 table fetches.
+    verify(mockBigquery, times(16)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(16)).getDefinition();
+  }
+
+  /**
+   * Two-level nested RECORD whose leaf types disagree (BQ INTEGER vs proto string): the error
+   * message must report the full dotted path down to the mismatching leaf.
+   */
+  @Test
+  public void testBQRecordMismatch() {
+    Field nestedMessage1 =
+        Field.newBuilder("test_field_type", LegacySQLTypeName.INTEGER)
+            .setMode(Field.Mode.NULLABLE)
+            .build();
+    Field nestedMessage0 =
+        Field.newBuilder("mismatchlvl1", LegacySQLTypeName.RECORD, nestedMessage1)
+            .setMode(Field.Mode.NULLABLE)
+            .build();
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("mismatchlvl0", LegacySQLTypeName.RECORD, nestedMessage0)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    try {
+      compact.check("projects/p/datasets/d/tables/t", MessageTypeMismatch.getDescriptor(), false);
+      // Fixed copy-paste: this test targets a nested RECORD, not String.
+      fail("Should fail: nested proto leaf type should not match nested BQ Record leaf.");
+    } catch (IllegalArgumentException expected) {
+      assertEquals(
+          "The proto field "
+              + MessageTypeMismatch.getDescriptor().getName()
+              + ".mismatchlvl0.mismatchlvl1.test_field_type does not have a matching type with the big query field t.mismatchlvl0.mismatchlvl1.test_field_type.",
+          expected.getMessage());
+    }
+    verify(mockBigquery, times(1)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(1)).getDefinition();
+  }
+
+  /** Two-level nested RECORD whose leaf type (STRING) matches the proto leaf: check must pass. */
+  @Test
+  public void testBQRecordMatch() {
+    Field leaf =
+        Field.newBuilder("test_field_type", LegacySQLTypeName.STRING)
+            .setMode(Field.Mode.NULLABLE)
+            .build();
+    Field middle =
+        Field.newBuilder("mismatchlvl1", LegacySQLTypeName.RECORD, leaf)
+            .setMode(Field.Mode.NULLABLE)
+            .build();
+    Field root =
+        Field.newBuilder("mismatchlvl0", LegacySQLTypeName.RECORD, middle)
+            .setMode(Field.Mode.NULLABLE)
+            .build();
+    customizeSchema(Schema.of(root));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    compact.check("projects/p/datasets/d/tables/t", MessageTypeMismatch.getDescriptor(), false);
+    verify(mockBigquery, times(1)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(1)).getDefinition();
+  }
+
+  /**
+   * Type matrix for a BQ STRING column: proto string and enum are compatible; every other
+   * descriptor in {@code type_descriptors} must be rejected with a type-mismatch error.
+   */
+  @Test
+  public void testBQString() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("test_field_type", LegacySQLTypeName.STRING)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    // Parameterize the set (was a raw HashSet).
+    HashSet<Descriptors.Descriptor> compatible =
+        new HashSet<>(Arrays.asList(StringType.getDescriptor(), EnumType.getDescriptor()));
+
+    for (Descriptors.Descriptor descriptor : type_descriptors) {
+      if (compatible.contains(descriptor)) {
+        compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+      } else {
+        try {
+          compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+          fail("Should fail: Proto schema type should not match BQ String.");
+        } catch (IllegalArgumentException expected) {
+          assertEquals(
+              "The proto field "
+                  + descriptor.getName()
+                  + ".test_field_type does not have a matching type with the big query field t.test_field_type.",
+              expected.getMessage());
+        }
+      }
+    }
+    // 16 descriptors in type_descriptors => 16 table fetches.
+    verify(mockBigquery, times(16)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(16)).getDefinition();
+  }
+
+  /**
+   * Type matrix for a BQ TIME column: only signed 64-bit proto integers are compatible; every
+   * other descriptor in {@code type_descriptors} must be rejected with a type-mismatch error.
+   */
+  @Test
+  public void testBQTime() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("test_field_type", LegacySQLTypeName.TIME)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    // Parameterize the set (was a raw HashSet).
+    HashSet<Descriptors.Descriptor> compatible =
+        new HashSet<>(Arrays.asList(Int64Type.getDescriptor(), SFixed64Type.getDescriptor()));
+
+    for (Descriptors.Descriptor descriptor : type_descriptors) {
+      if (compatible.contains(descriptor)) {
+        compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+      } else {
+        try {
+          compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+          fail("Should fail: Proto schema type should not match BQ Time.");
+        } catch (IllegalArgumentException expected) {
+          assertEquals(
+              "The proto field "
+                  + descriptor.getName()
+                  + ".test_field_type does not have a matching type with the big query field t.test_field_type.",
+              expected.getMessage());
+        }
+      }
+    }
+    // 16 descriptors in type_descriptors => 16 table fetches.
+    verify(mockBigquery, times(16)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(16)).getDefinition();
+  }
+
+  /**
+   * Type matrix for a BQ TIMESTAMP column: proto types representable as a signed 64-bit integer
+   * (plus enum) are compatible; every other descriptor must be rejected with a type-mismatch
+   * error.
+   */
+  @Test
+  public void testBQTimestamp() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("test_field_type", LegacySQLTypeName.TIMESTAMP)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    // Parameterize the set (was a raw HashSet).
+    HashSet<Descriptors.Descriptor> compatible =
+        new HashSet<>(
+            Arrays.asList(
+                Int32Type.getDescriptor(),
+                Int64Type.getDescriptor(),
+                UInt32Type.getDescriptor(),
+                Fixed32Type.getDescriptor(),
+                SFixed32Type.getDescriptor(),
+                SFixed64Type.getDescriptor(),
+                EnumType.getDescriptor()));
+
+    for (Descriptors.Descriptor descriptor : type_descriptors) {
+      if (compatible.contains(descriptor)) {
+        compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+      } else {
+        try {
+          compact.check("projects/p/datasets/d/tables/t", descriptor, false);
+          fail("Should fail: Proto schema type should not match BQ Timestamp.");
+        } catch (IllegalArgumentException expected) {
+          assertEquals(
+              "The proto field "
+                  + descriptor.getName()
+                  + ".test_field_type does not have a matching type with the big query field t.test_field_type.",
+              expected.getMessage());
+        }
+      }
+    }
+    // 16 descriptors in type_descriptors => 16 table fetches.
+    verify(mockBigquery, times(16)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(16)).getDefinition();
+  }
+
+  /*
+   * A proto schema with no field matching any top-level BQ column must be rejected.
+   */
+  @Test
+  public void testBQTopLevelMismatch() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("test_toplevel_mismatch", LegacySQLTypeName.STRING)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    try {
+      compact.check("projects/p/datasets/d/tables/t", StringType.getDescriptor(), false);
+      // Bug fix: without this fail() the test silently passed when no exception was thrown.
+      fail("Should fail: proto schema has no matching field in the BQ table schema.");
+    } catch (IllegalArgumentException expected) {
+      assertEquals(
+          "There is no matching fields found for the proto schema "
+              + StringType.getDescriptor().getName()
+              + " and the BQ table schema t.",
+          expected.getMessage());
+    }
+    verify(mockBigquery, times(1)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(1)).getDefinition();
+  }
+
+  /*
+   * Tests that the check passes when at least one top-level field matches, even though another
+   * top-level field ("mismatch": BQ RECORD vs proto message with a non-matching shape) does not.
+   */
+  @Test
+  public void testBQTopLevelMatch() {
+    Field nestedMessage0 =
+        Field.newBuilder("mismatch", LegacySQLTypeName.STRING).setMode(Field.Mode.NULLABLE).build();
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("mismatch", LegacySQLTypeName.RECORD, nestedMessage0)
+                .setMode(Field.Mode.NULLABLE)
+                .build(),
+            Field.newBuilder("match", LegacySQLTypeName.STRING)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    compact.check("projects/p/datasets/d/tables/t", TopLevelMatch.getDescriptor(), false);
+    verify(mockBigquery, times(1)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(1)).getDefinition();
+  }
+
+  /**
+   * With allowUnknownFields=true, a proto containing an unsupported field (a map) alongside a
+   * matching field is accepted instead of rejected.
+   */
+  @Test
+  public void testAllowUnknownUnsupportedFields() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("string_value", LegacySQLTypeName.STRING)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    // Last argument = allowUnknownFields.
+    compact.check(
+        "projects/p/datasets/d/tables/t", AllowUnknownUnsupportedFields.getDescriptor(), true);
+    verify(mockBigquery, times(1)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(1)).getDefinition();
+  }
+
+  /** Field-name matching is case-insensitive: "tEsT_fIeLd_TyPe" matches proto test_field_type. */
+  @Test
+  public void testLowerCase() {
+    customizeSchema(
+        Schema.of(
+            Field.newBuilder("tEsT_fIeLd_TyPe", LegacySQLTypeName.STRING)
+                .setMode(Field.Mode.NULLABLE)
+                .build()));
+    SchemaCompact compact = SchemaCompact.getInstance(mockBigquery);
+    compact.check("projects/p/datasets/d/tables/t", StringType.getDescriptor(), true);
+    verify(mockBigquery, times(1)).getTable(any(TableId.class));
+    verify(mockBigqueryTable, times(1)).getDefinition();
}
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
index c75d6859e6..c64d55f53f 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1alpha2/it/ITBigQueryWriteManualClientTest.java
@@ -25,6 +25,7 @@
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.*;
import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.storage.test.SchemaTest.FakeFooType;
import com.google.cloud.bigquery.storage.test.Test.*;
import com.google.cloud.bigquery.storage.v1alpha2.*;
import com.google.cloud.bigquery.storage.v1alpha2.Storage.*;
@@ -74,6 +75,7 @@ public static void beforeClass() throws IOException {
StandardTableDefinition.of(
Schema.of(
com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING)
+ .setMode(Field.Mode.NULLABLE)
.build())))
.build();
com.google.cloud.bigquery.Field.Builder innerTypeFieldBuilder =
@@ -400,6 +402,47 @@ public Long call() throws IOException, InterruptedException, ExecutionException
DirectWriter.clearCache();
}
+  /**
+   * FakeFooType declares foo as int32 while the table declares foo as STRING, so every concurrent
+   * append must be rejected by the schema-compatibility check with a type-mismatch error.
+   */
+  @Test
+  public void testDirectWriteFail() throws IOException, InterruptedException, ExecutionException {
+    final FakeFooType fa = FakeFooType.newBuilder().setFoo(100).build();
+    ExecutorService executor = Executors.newFixedThreadPool(10);
+    // NOTE(review): generic parameters below were reconstructed from usage — confirm they match
+    // the repository copy.
+    List<Future<Long>> responses = new ArrayList<>();
+    Callable<Long> callable =
+        new Callable<Long>() {
+          @Override
+          public Long call()
+              throws IOException, InterruptedException, ExecutionException,
+                  IllegalArgumentException {
+            ApiFuture<Long> result = DirectWriter.append(tableId, Arrays.asList(fa));
+            return result.get();
+          }
+        };
+
+    for (int i = 0; i < 10; i++) {
+      responses.add(executor.submit(callable));
+    }
+    for (Future<Long> response : responses) {
+      try {
+        response.get();
+        // Bug fix: without this fail() a successful (i.e. wrongly accepted) append went unnoticed.
+        fail("Should fail: proto schema does not match the BQ table schema.");
+      } catch (ExecutionException e) {
+        assertEquals(
+            "The proto field FakeFooType.foo does not have a matching type with the big query field testtable.foo.",
+            e.getCause().getMessage());
+      }
+    }
+    executor.shutdown();
+    try {
+      executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag so callers can observe the interruption.
+      Thread.currentThread().interrupt();
+      LOG.info(e.toString());
+    }
+    DirectWriter.clearCache();
+  }
+
@Test
public void testFlushRows() throws IOException, InterruptedException, ExecutionException {
String tableName = "BufferTable";
diff --git a/google-cloud-bigquerystorage/src/test/proto/schemaTest.proto b/google-cloud-bigquerystorage/src/test/proto/schemaTest.proto
new file mode 100644
index 0000000000..3ec935d6a6
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/test/proto/schemaTest.proto
@@ -0,0 +1,259 @@
+syntax = "proto2";
+
+package com.google.cloud.bigquery.storage.test;
+
+// Every proto2 scalar type the SchemaCompact type check supports (sint*/groups excluded).
+message SupportedTypes {
+  optional int32 int32_value = 1;
+  optional int64 int64_value = 2;
+  optional uint32 uint32_value = 3;
+  optional uint64 uint64_value = 4;
+  optional fixed32 fixed32_value = 7;
+  optional fixed64 fixed64_value = 8;
+  optional sfixed32 sfixed32_value = 9;
+  optional sfixed64 sfixed64_value = 10;
+  optional float float_value = 11;
+  optional double double_value = 12;
+  optional bool bool_value = 13;
+  optional bytes bytes_value = 14;
+  optional string string_value = 15;
+}
+
+// Scalar types the check rejects (field numbers 5/6 continue SupportedTypes' numbering).
+message NonSupportedTypes {
+  optional sint32 sint32_value = 5;
+  optional sint64 sint64_value = 6;
+}
+
+// Single-field wrappers, one per proto type. All use the field name test_field_type so the
+// Java type-matrix tests can iterate the whole family against one BQ column.
+message Int32Type {
+  optional int32 test_field_type = 1;
+}
+
+message Int64Type {
+  optional int64 test_field_type = 1;
+}
+
+message UInt32Type {
+  optional uint32 test_field_type = 1;
+}
+
+message UInt64Type {
+  optional uint64 test_field_type = 1;
+}
+
+message Fixed32Type {
+  optional fixed32 test_field_type = 1;
+}
+
+message Fixed64Type {
+  optional fixed64 test_field_type = 1;
+}
+
+message SFixed32Type {
+  optional sfixed32 test_field_type = 1;
+}
+
+message SFixed64Type {
+  optional sfixed64 test_field_type = 1;
+}
+
+message FloatType {
+  optional float test_field_type = 1;
+}
+
+message DoubleType {
+  optional double test_field_type = 1;
+}
+
+message BoolType {
+  optional bool test_field_type = 1;
+}
+
+message BytesType {
+  optional bytes test_field_type = 1;
+}
+
+message StringType {
+  optional string test_field_type = 1;
+}
+
+message EnumType {
+  enum EnumTest {
+    test1 = 0;
+  }
+  optional EnumTest test_field_type = 1;
+}
+
+message MessageType {
+  optional StringType test_field_type = 1;
+}
+
+// proto2 group syntax; the group name is lowercased to test_field_type for matching.
+message GroupType {
+  optional group Test_field_type = 1 {
+    optional string test_field_type = 2;
+  }
+}
+
+// Two-level nesting chain: MessageTypeMismatch.mismatchlvl0.mismatchlvl1.test_field_type (string).
+// Used by testBQRecordMismatch (leaf vs BQ INTEGER) and testBQRecordMatch (leaf vs BQ STRING).
+message MessageTypeMismatch {
+  optional MismatchLvl0 mismatchlvl0 = 1;
+}
+
+message MismatchLvl0 {
+  optional MismatchLvl1 mismatchlvl1 = 1;
+}
+
+message MismatchLvl1 {
+  optional string test_field_type = 1;
+}
+
+// One matching top-level field ("match") plus one non-matching one, for testBQTopLevelMatch.
+message TopLevelMatch {
+  optional string match = 1;
+  optional MismatchLvl1 mismatch = 2;
+}
+
+// Mode-combination messages, named Proto<protoMode>BQ<bqMode>, for the label/mode tests.
+message ProtoRepeatedBQRepeated {
+  repeated int32 repeated_mode = 1;
+}
+
+message ProtoOptionalBQRepeated {
+  optional int32 repeated_mode = 1;
+}
+
+message ProtoRequiredBQRepeated {
+  required int32 repeated_mode = 1;
+}
+
+message ProtoRequiredBQRequired {
+  required int32 required_mode = 1;
+}
+
+// Deliberately has no field named required_mode at all.
+message ProtoNoneBQRequired {
+  optional int32 no_required_mode = 1;
+}
+
+message ProtoOptionalBQRequired {
+  optional int32 required_mode = 1;
+}
+
+message ProtoRepeatedBQRequired {
+  repeated int32 required_mode = 1;
+}
+
+message ProtoOptionalBQOptional {
+  optional int32 optional_mode = 1;
+}
+
+message ProtoRequiredBQOptional{
+  required int32 optional_mode = 1;
+}
+
+message ProtoRepeatedBQOptional {
+  repeated int32 optional_mode = 1;
+}
+
+message ProtoCompatibleWithBQInt {
+  optional int32 optional_mode = 1;
+}
+
+// Supported nesting shapes: one level deep, and two siblings of the same nested type.
+message SupportedNestingLvl1 {
+  optional int32 int_value = 1;
+  optional SupportedNestingLvl2 nesting_value = 2;
+}
+
+message SupportedNestingLvl2 {
+  optional int32 int_value = 1;
+}
+
+message SupportedNestingStacked {
+  optional int32 test_int = 1;
+  optional SupportedNestingLvl2 nesting_value1 = 2;
+  optional SupportedNestingLvl2 nesting_value2 = 3;
+}
+
+// Map fields are not supported by the schema check.
+// NOTE(review): the map's key/value types were garbled in transit; reconstructed as
+// map<int32, int32> — confirm against the repository copy.
+message NonSupportedMap {
+  map<int32, int32> map_value = 1;
+}
+
+// Recursive message types must be rejected, directly or when reachable through another message.
+message NonSupportedNestingRecursive {
+  optional NonSupportedNestingRecursive nesting_value = 2;
+}
+
+message NonSupportedNestingContainsRecursive {
+  optional int32 int_value = 1;
+  optional NonSupportedNestingRecursive nesting_value = 2;
+}
+
+// A 17-message chain (Lvl0 through Lvl16) used to exceed the nesting-depth limit.
+message NonSupportedNestingLvl0 {
+  optional NonSupportedNestingLvl1 test1 = 1;
+}
+
+message NonSupportedNestingLvl1 {
+  optional NonSupportedNestingLvl2 test1 = 1;
+}
+
+message NonSupportedNestingLvl2 {
+  optional NonSupportedNestingLvl3 test1 = 1;
+}
+
+message NonSupportedNestingLvl3 {
+  optional NonSupportedNestingLvl4 test1 = 1;
+}
+
+message NonSupportedNestingLvl4 {
+  optional NonSupportedNestingLvl5 test1 = 1;
+}
+
+message NonSupportedNestingLvl5 {
+  optional NonSupportedNestingLvl6 test1 = 1;
+}
+
+message NonSupportedNestingLvl6 {
+  optional NonSupportedNestingLvl7 test1 = 1;
+}
+
+message NonSupportedNestingLvl7 {
+  optional NonSupportedNestingLvl8 test1 = 1;
+}
+
+message NonSupportedNestingLvl8 {
+  optional NonSupportedNestingLvl9 test1 = 1;
+}
+
+message NonSupportedNestingLvl9 {
+  optional NonSupportedNestingLvl10 test1 = 1;
+}
+
+message NonSupportedNestingLvl10 {
+  optional NonSupportedNestingLvl11 test1 = 1;
+}
+
+message NonSupportedNestingLvl11 {
+  optional NonSupportedNestingLvl12 test1 = 1;
+}
+
+message NonSupportedNestingLvl12 {
+  optional NonSupportedNestingLvl13 test1 = 1;
+}
+
+message NonSupportedNestingLvl13 {
+  optional NonSupportedNestingLvl14 test1 = 1;
+}
+
+message NonSupportedNestingLvl14 {
+  optional NonSupportedNestingLvl15 test1 = 1;
+}
+
+message NonSupportedNestingLvl15 {
+  optional NonSupportedNestingLvl16 test1 = 1;
+}
+
+message NonSupportedNestingLvl16 {
+  optional int32 test1 = 1;
+}
+
+// An unsupported map field next to a matching string field; accepted when allowUnknownFields=true.
+message AllowUnknownUnsupportedFields {
+  optional NonSupportedMap map_value = 1;
+  optional string string_value = 2;
+}
+
+// int32 foo vs a STRING "foo" column: used by the integration test to force an append failure.
+message FakeFooType {
+  optional int32 foo = 1;
+}