Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
fix: Add special type tests for Json writer and fix some type mapping…
… issues (#725)

* fix: a race condition in test

* .

* fix: offset_sets to offsetSets in jsonwritertest

* fix: add test coverage for special type handling in JSON object

* .

* .

* .

* .

* .

* .

* .

* .
  • Loading branch information
yirutang committed Dec 11, 2020
1 parent f4de72c commit ab6213c
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 30 deletions.
Expand Up @@ -47,14 +47,14 @@ public class BQTableSchemaToProtoDescriptor {
.put(Table.TableFieldSchema.Type.BOOL, FieldDescriptorProto.Type.TYPE_BOOL)
.put(Table.TableFieldSchema.Type.BYTES, FieldDescriptorProto.Type.TYPE_BYTES)
.put(Table.TableFieldSchema.Type.DATE, FieldDescriptorProto.Type.TYPE_INT32)
.put(Table.TableFieldSchema.Type.DATETIME, FieldDescriptorProto.Type.TYPE_INT64)
.put(Table.TableFieldSchema.Type.DATETIME, FieldDescriptorProto.Type.TYPE_STRING)
.put(Table.TableFieldSchema.Type.DOUBLE, FieldDescriptorProto.Type.TYPE_DOUBLE)
.put(Table.TableFieldSchema.Type.GEOGRAPHY, FieldDescriptorProto.Type.TYPE_STRING)
.put(Table.TableFieldSchema.Type.INT64, FieldDescriptorProto.Type.TYPE_INT64)
.put(Table.TableFieldSchema.Type.NUMERIC, FieldDescriptorProto.Type.TYPE_BYTES)
.put(Table.TableFieldSchema.Type.NUMERIC, FieldDescriptorProto.Type.TYPE_STRING)
.put(Table.TableFieldSchema.Type.STRING, FieldDescriptorProto.Type.TYPE_STRING)
.put(Table.TableFieldSchema.Type.STRUCT, FieldDescriptorProto.Type.TYPE_MESSAGE)
.put(Table.TableFieldSchema.Type.TIME, FieldDescriptorProto.Type.TYPE_INT64)
.put(Table.TableFieldSchema.Type.TIME, FieldDescriptorProto.Type.TYPE_STRING)
.put(Table.TableFieldSchema.Type.TIMESTAMP, FieldDescriptorProto.Type.TYPE_INT64)
.build();

Expand Down
Expand Up @@ -47,14 +47,14 @@ public class BQTableSchemaToProtoDescriptor {
.put(TableFieldSchema.Type.BOOL, FieldDescriptorProto.Type.TYPE_BOOL)
.put(TableFieldSchema.Type.BYTES, FieldDescriptorProto.Type.TYPE_BYTES)
.put(TableFieldSchema.Type.DATE, FieldDescriptorProto.Type.TYPE_INT32)
.put(TableFieldSchema.Type.DATETIME, FieldDescriptorProto.Type.TYPE_INT64)
.put(TableFieldSchema.Type.DATETIME, FieldDescriptorProto.Type.TYPE_STRING)
.put(TableFieldSchema.Type.DOUBLE, FieldDescriptorProto.Type.TYPE_DOUBLE)
.put(TableFieldSchema.Type.GEOGRAPHY, FieldDescriptorProto.Type.TYPE_STRING)
.put(TableFieldSchema.Type.INT64, FieldDescriptorProto.Type.TYPE_INT64)
.put(TableFieldSchema.Type.NUMERIC, FieldDescriptorProto.Type.TYPE_BYTES)
.put(TableFieldSchema.Type.NUMERIC, FieldDescriptorProto.Type.TYPE_STRING)
.put(TableFieldSchema.Type.STRING, FieldDescriptorProto.Type.TYPE_STRING)
.put(TableFieldSchema.Type.STRUCT, FieldDescriptorProto.Type.TYPE_MESSAGE)
.put(TableFieldSchema.Type.TIME, FieldDescriptorProto.Type.TYPE_INT64)
.put(TableFieldSchema.Type.TIME, FieldDescriptorProto.Type.TYPE_STRING)
.put(TableFieldSchema.Type.TIMESTAMP, FieldDescriptorProto.Type.TYPE_INT64)
.build();

Expand Down
Expand Up @@ -39,13 +39,13 @@ public class BQTableSchemaToProtoDescriptorTest {
.put(Table.TableFieldSchema.Type.BOOL, BoolType.getDescriptor())
.put(Table.TableFieldSchema.Type.BYTES, BytesType.getDescriptor())
.put(Table.TableFieldSchema.Type.DATE, Int32Type.getDescriptor())
.put(Table.TableFieldSchema.Type.DATETIME, Int64Type.getDescriptor())
.put(Table.TableFieldSchema.Type.DATETIME, StringType.getDescriptor())
.put(Table.TableFieldSchema.Type.DOUBLE, DoubleType.getDescriptor())
.put(Table.TableFieldSchema.Type.GEOGRAPHY, StringType.getDescriptor())
.put(Table.TableFieldSchema.Type.INT64, Int64Type.getDescriptor())
.put(Table.TableFieldSchema.Type.NUMERIC, BytesType.getDescriptor())
.put(Table.TableFieldSchema.Type.NUMERIC, StringType.getDescriptor())
.put(Table.TableFieldSchema.Type.STRING, StringType.getDescriptor())
.put(Table.TableFieldSchema.Type.TIME, Int64Type.getDescriptor())
.put(Table.TableFieldSchema.Type.TIME, StringType.getDescriptor())
.put(Table.TableFieldSchema.Type.TIMESTAMP, Int64Type.getDescriptor())
.build();

Expand Down Expand Up @@ -75,7 +75,7 @@ private void isDescriptorEqual(Descriptor convertedProto, Descriptor originalPro
// Check type
FieldDescriptor.Type convertedType = convertedField.getType();
FieldDescriptor.Type originalType = originalField.getType();
assertEquals(convertedType, originalType);
assertEquals(convertedField.getName(), convertedType, originalType);
// Check mode
assertTrue(
(originalField.isRepeated() == convertedField.isRepeated())
Expand Down Expand Up @@ -181,6 +181,30 @@ public void testStructComplex() throws Exception {
.addFields(1, ComplexLvl2)
.setName("complex_lvl1")
.build();
final Table.TableFieldSchema TEST_NUMERIC =
Table.TableFieldSchema.newBuilder()
.setType(Table.TableFieldSchema.Type.NUMERIC)
.setMode(Table.TableFieldSchema.Mode.NULLABLE)
.setName("test_numeric")
.build();
final Table.TableFieldSchema TEST_GEO =
Table.TableFieldSchema.newBuilder()
.setType(Table.TableFieldSchema.Type.GEOGRAPHY)
.setMode(Table.TableFieldSchema.Mode.NULLABLE)
.setName("test_geo")
.build();
final Table.TableFieldSchema TEST_TIMESTAMP =
Table.TableFieldSchema.newBuilder()
.setType(Table.TableFieldSchema.Type.TIMESTAMP)
.setMode(Table.TableFieldSchema.Mode.NULLABLE)
.setName("test_timestamp")
.build();
final Table.TableFieldSchema TEST_TIME =
Table.TableFieldSchema.newBuilder()
.setType(Table.TableFieldSchema.Type.TIME)
.setMode(Table.TableFieldSchema.Mode.NULLABLE)
.setName("test_time")
.build();
final Table.TableSchema tableSchema =
Table.TableSchema.newBuilder()
.addFields(0, test_int)
Expand All @@ -191,6 +215,10 @@ public void testStructComplex() throws Exception {
.addFields(5, test_date)
.addFields(6, ComplexLvl1)
.addFields(7, ComplexLvl2)
.addFields(8, TEST_NUMERIC)
.addFields(9, TEST_GEO)
.addFields(10, TEST_TIMESTAMP)
.addFields(11, TEST_TIME)
.build();
final Descriptor descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema);
Expand Down
Expand Up @@ -38,13 +38,13 @@ public class BQTableSchemaToProtoDescriptorTest {
.put(TableFieldSchema.Type.BOOL, BoolType.getDescriptor())
.put(TableFieldSchema.Type.BYTES, BytesType.getDescriptor())
.put(TableFieldSchema.Type.DATE, Int32Type.getDescriptor())
.put(TableFieldSchema.Type.DATETIME, Int64Type.getDescriptor())
.put(TableFieldSchema.Type.DATETIME, StringType.getDescriptor())
.put(TableFieldSchema.Type.DOUBLE, DoubleType.getDescriptor())
.put(TableFieldSchema.Type.GEOGRAPHY, StringType.getDescriptor())
.put(TableFieldSchema.Type.INT64, Int64Type.getDescriptor())
.put(TableFieldSchema.Type.NUMERIC, BytesType.getDescriptor())
.put(TableFieldSchema.Type.NUMERIC, StringType.getDescriptor())
.put(TableFieldSchema.Type.STRING, StringType.getDescriptor())
.put(TableFieldSchema.Type.TIME, Int64Type.getDescriptor())
.put(TableFieldSchema.Type.TIME, StringType.getDescriptor())
.put(TableFieldSchema.Type.TIMESTAMP, Int64Type.getDescriptor())
.build();

Expand Down Expand Up @@ -74,7 +74,7 @@ private void isDescriptorEqual(Descriptor convertedProto, Descriptor originalPro
// Check type
FieldDescriptor.Type convertedType = convertedField.getType();
FieldDescriptor.Type originalType = originalField.getType();
assertEquals(convertedType, originalType);
assertEquals(convertedField.getName(), convertedType, originalType);
// Check mode
assertTrue(
(originalField.isRepeated() == convertedField.isRepeated())
Expand Down Expand Up @@ -179,6 +179,30 @@ public void testStructComplex() throws Exception {
.addFields(1, ComplexLvl2)
.setName("complex_lvl1")
.build();
final TableFieldSchema TEST_NUMERIC =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.NUMERIC)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_numeric")
.build();
final TableFieldSchema TEST_GEO =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.GEOGRAPHY)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_geo")
.build();
final TableFieldSchema TEST_TIMESTAMP =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.TIMESTAMP)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_timestamp")
.build();
final TableFieldSchema TEST_TIME =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.TIME)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_time")
.build();
final TableSchema tableSchema =
TableSchema.newBuilder()
.addFields(0, test_int)
Expand All @@ -189,6 +213,10 @@ public void testStructComplex() throws Exception {
.addFields(5, test_date)
.addFields(6, ComplexLvl1)
.addFields(7, ComplexLvl2)
.addFields(8, TEST_NUMERIC)
.addFields(9, TEST_GEO)
.addFields(10, TEST_TIMESTAMP)
.addFields(11, TEST_TIME)
.build();
final Descriptor descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(tableSchema);
Expand Down
Expand Up @@ -133,6 +133,30 @@ public class JsonStreamWriterTest {
.addFields(1, COMPLEXLVL2)
.setName("complex_lvl1")
.build();
private final TableFieldSchema TEST_NUMERIC =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.NUMERIC)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_numeric")
.build();
private final TableFieldSchema TEST_GEO =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.GEOGRAPHY)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_geo")
.build();
private final TableFieldSchema TEST_TIMESTAMP =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.TIMESTAMP)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_timestamp")
.build();
private final TableFieldSchema TEST_TIME =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.TIME)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_time")
.build();
private final TableSchema COMPLEX_TABLE_SCHEMA =
TableSchema.newBuilder()
.addFields(0, TEST_INT)
Expand All @@ -143,6 +167,10 @@ public class JsonStreamWriterTest {
.addFields(5, TEST_DATE)
.addFields(6, COMPLEXLVL1)
.addFields(7, COMPLEXLVL2)
.addFields(8, TEST_NUMERIC)
.addFields(9, TEST_GEO)
.addFields(10, TEST_TIMESTAMP)
.addFields(11, TEST_TIME)
.build();

@Before
Expand Down Expand Up @@ -353,6 +381,10 @@ public void testSingleAppendComplexJson() throws Exception {
com.google.cloud.bigquery.storage.test.JsonTest.ComplexLvl2.newBuilder()
.setTestInt(3)
.build())
.setTestNumeric("1.23456")
.setTestGeo("POINT(1,1)")
.setTestTimestamp(12345678)
.setTestTime("01:00:01")
.build();
JSONObject complex_lvl2 = new JSONObject();
complex_lvl2.put("test_int", 3);
Expand All @@ -370,6 +402,10 @@ public void testSingleAppendComplexJson() throws Exception {
json.put("test_date", 1);
json.put("complex_lvl1", complex_lvl1);
json.put("complex_lvl2", complex_lvl2);
json.put("test_numeric", "1.23456");
json.put("test_geo", "POINT(1,1)");
json.put("test_timestamp", 12345678);
json.put("test_time", "01:00:01");
JSONArray jsonArr = new JSONArray();
jsonArr.put(json);

Expand Down
Expand Up @@ -216,7 +216,15 @@ public void testJsonStreamWriterBatchWriteWithCommittedStream()
TableId.of(DATASET, tableName),
StandardTableDefinition.of(
Schema.of(
com.google.cloud.bigquery.Field.newBuilder("foo", LegacySQLTypeName.STRING)
com.google.cloud.bigquery.Field.newBuilder(
"test_str", StandardSQLTypeName.STRING)
.build(),
com.google.cloud.bigquery.Field.newBuilder(
"test_numerics", StandardSQLTypeName.NUMERIC)
.setMode(Field.Mode.REPEATED)
.build(),
com.google.cloud.bigquery.Field.newBuilder(
"test_datetime", StandardSQLTypeName.DATETIME)
.build())))
.build();
bigquery.create(tableInfo);
Expand All @@ -239,28 +247,31 @@ public void testJsonStreamWriterBatchWriteWithCommittedStream()
.build())
.build()) {
LOG.info("Sending one message");
JSONObject foo = new JSONObject();
foo.put("foo", "aaa");
JSONArray jsonArr = new JSONArray();
jsonArr.put(foo);
JSONObject testStr = new JSONObject();
testStr.put("test_str", "aaa");
JSONObject testNumerics = new JSONObject();
testNumerics.put("test_numerics", new JSONArray(new String[] {"123.4", "-9000000"}));
JSONObject testDateTime = new JSONObject();
testDateTime.put("test_datetime", "2020-10-1 12:00:00");
JSONArray row = new JSONArray(new JSONObject[] {testStr, testNumerics, testDateTime});

ApiFuture<AppendRowsResponse> response =
jsonStreamWriter.append(jsonArr, -1, /* allowUnknownFields */ false);
jsonStreamWriter.append(row, -1, /* allowUnknownFields */ false);
assertEquals(0, response.get().getOffset());

LOG.info("Sending two more messages");
JSONObject foo1 = new JSONObject();
foo1.put("foo", "bbb");
JSONObject foo2 = new JSONObject();
foo2.put("foo", "ccc");
JSONObject row1 = new JSONObject();
row1.put("test_str", "bbb");
JSONObject row2 = new JSONObject();
row2.put("test_str", "ccc");
JSONArray jsonArr1 = new JSONArray();
jsonArr1.put(foo1);
jsonArr1.put(foo2);
jsonArr1.put(row1);
jsonArr1.put(row2);

JSONObject foo3 = new JSONObject();
foo3.put("foo", "ddd");
JSONObject row3 = new JSONObject();
row3.put("test_str", "ddd");
JSONArray jsonArr2 = new JSONArray();
jsonArr2.put(foo3);
jsonArr2.put(row3);

ApiFuture<AppendRowsResponse> response1 =
jsonStreamWriter.append(jsonArr1, -1, /* allowUnknownFields */ false);
Expand All @@ -276,11 +287,12 @@ public void testJsonStreamWriterBatchWriteWithCommittedStream()
tableInfo.getTableId(), BigQuery.TableDataListOption.startIndex(0L));
Iterator<FieldValueList> iter = result.getValues().iterator();
assertEquals("aaa", iter.next().get(0).getStringValue());
assertEquals("-9000000", iter.next().get(1).getRepeatedValue().get(1).getStringValue());
assertEquals("2020-10-01T12:00:00", iter.next().get(2).getStringValue());
assertEquals("bbb", iter.next().get(0).getStringValue());
assertEquals("ccc", iter.next().get(0).getStringValue());
assertEquals("ddd", iter.next().get(0).getStringValue());
assertEquals(false, iter.hasNext());
jsonStreamWriter.close();
}
}

Expand Down
4 changes: 4 additions & 0 deletions google-cloud-bigquerystorage/src/test/proto/jsonTest.proto
Expand Up @@ -11,6 +11,10 @@ message ComplexRoot {
required int32 test_date = 6;
required ComplexLvl1 complex_lvl1 = 7;
required ComplexLvl2 complex_lvl2 = 8;
optional string test_numeric = 9;
optional string test_geo = 10;
optional int64 test_timestamp = 11;
optional string test_time = 12;
}

message CasingComplex {
Expand Down

0 comments on commit ab6213c

Please sign in to comment.