Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

fix: add string to DATETIME, TIME, NUMERIC, BIGNUMERIC support in JsonStreamWriter v1 #1345

Merged
merged 10 commits into from Oct 4, 2021
Expand Up @@ -75,6 +75,7 @@ private JsonStreamWriter(Builder builder)
builder.traceId);
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
}

/**
Expand Down Expand Up @@ -105,7 +106,8 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
// of JSON data.
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json);
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, this.tableSchema, json);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
}
// Need to make sure refreshAppendAndSetDescriptor finish first before this can run
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigquery.storage.v1;

import com.google.api.pathtemplate.ValidationException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
Expand All @@ -23,10 +24,14 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import com.google.protobuf.UninitializedMessageException;
import java.math.BigDecimal;
import java.util.List;
import java.util.logging.Logger;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.threeten.bp.LocalDateTime;
import org.threeten.bp.LocalTime;

/**
* Converts Json data to protocol buffer messages given the protocol buffer descriptor. The protobuf
Expand Down Expand Up @@ -58,7 +63,28 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");

return convertJsonToProtoMessageImpl(protoSchema, json, "root", /*topLevel=*/ true);
return convertJsonToProtoMessageImpl(protoSchema, null, json, "root", /*topLevel=*/ true);
}

/**
 * Builds a protocol buffer message from a JSON object, using the BigQuery table schema alongside
 * the protocol buffer descriptor.
 *
 * @param protoSchema descriptor of the protocol buffer message to build
 * @param tableSchema BigQuery table schema; its field list is handed to the conversion because it
 *     is needed for type conversion of DATETIME, TIME, NUMERIC, BIGNUMERIC
 * @param json the JSON object to convert
 * @return the message produced by the conversion, rooted at scope "root"
 * @throws NullPointerException when {@code json}, {@code protoSchema} or {@code tableSchema} is
 *     null
 * @throws IllegalStateException when {@code json} has no entries
 * @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
 */
public static DynamicMessage convertJsonToProtoMessage(
    Descriptor protoSchema, TableSchema tableSchema, JSONObject json)
    throws IllegalArgumentException {
  // Argument checks run in this fixed order so the first failing one decides the message.
  Preconditions.checkNotNull(json, "JSONObject is null.");
  Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
  Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
  final boolean hasEntries = json.length() != 0;
  Preconditions.checkState(hasEntries, "JSONObject is empty.");

  // Only the top-level field list is needed; nested lists are resolved during the conversion.
  final List<TableFieldSchema> topLevelFields = tableSchema.getFieldsList();
  return convertJsonToProtoMessageImpl(
      protoSchema, topLevelFields, json, "root", /*topLevel=*/ true);
}

/**
Expand All @@ -71,7 +97,11 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
private static DynamicMessage convertJsonToProtoMessageImpl(
Descriptor protoSchema, JSONObject json, String jsonScope, boolean topLevel)
Descriptor protoSchema,
List<TableFieldSchema> tableSchema,
JSONObject json,
String jsonScope,
boolean topLevel)
throws IllegalArgumentException {

DynamicMessage.Builder protoMsg = DynamicMessage.newBuilder(protoSchema);
Expand All @@ -90,10 +120,25 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
throw new IllegalArgumentException(
String.format("JSONObject has fields unknown to BigQuery: %s.", currentScope));
}
TableFieldSchema fieldSchema = null;
if (tableSchema != null) {
// protoSchema is generated from tableSchema so their field ordering should match.
fieldSchema = tableSchema.get(field.getIndex());
if (!fieldSchema.getName().equals(field.getName())) {
throw new ValidationException(
"Field at index "
+ field.getIndex()
+ " has mismatch names ("
+ fieldSchema.getName()
+ ") ("
+ field.getName()
+ ")");
}
}
if (!field.isRepeated()) {
fillField(protoMsg, field, json, jsonName, currentScope);
fillField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
} else {
fillRepeatedField(protoMsg, field, json, jsonName, currentScope);
fillRepeatedField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
}
}

Expand All @@ -119,6 +164,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
*
* @param protoMsg The protocol buffer message being constructed
* @param fieldDescriptor
* @param fieldSchema
* @param json
* @param exactJsonKeyName Exact key name in JSONObject instead of lowercased version
* @param currentScope Debugging purposes
Expand All @@ -127,6 +173,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
private static void fillField(
DynamicMessage.Builder protoMsg,
FieldDescriptor fieldDescriptor,
TableFieldSchema fieldSchema,
JSONObject json,
String exactJsonKeyName,
String currentScope)
Expand All @@ -144,6 +191,25 @@ private static void fillField(
}
break;
case BYTES:
if (fieldSchema != null) {
if (fieldSchema.getType() == TableFieldSchema.Type.NUMERIC) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
return;
}
} else if (fieldSchema.getType() == TableFieldSchema.Type.BIGNUMERIC) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
return;
}
}
}
if (val instanceof ByteString) {
protoMsg.setField(fieldDescriptor, ((ByteString) val).toByteArray());
return;
Expand All @@ -170,6 +236,29 @@ private static void fillField(
}
break;
case INT64:
if (fieldSchema != null) {
if (fieldSchema.getType() == TableFieldSchema.Type.DATETIME) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) val)));
return;
} else if (val instanceof Long) {
protoMsg.setField(fieldDescriptor, (Long) val);
return;
}
} else if (fieldSchema.getType() == TableFieldSchema.Type.TIME) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) val)));
return;
} else if (val instanceof Long) {
protoMsg.setField(fieldDescriptor, (Long) val);
return;
}
}
}
if (val instanceof Integer) {
protoMsg.setField(fieldDescriptor, new Long((Integer) val));
return;
Expand Down Expand Up @@ -206,6 +295,7 @@ private static void fillField(
fieldDescriptor,
convertJsonToProtoMessageImpl(
fieldDescriptor.getMessageType(),
fieldSchema == null ? null : fieldSchema.getFieldsList(),
json.getJSONObject(exactJsonKeyName),
currentScope,
/*topLevel =*/ false));
Expand All @@ -224,6 +314,7 @@ private static void fillField(
*
* @param protoMsg The protocol buffer message being constructed
* @param fieldDescriptor
* @param fieldSchema
* @param json If root level has no matching fields, throws exception.
* @param exactJsonKeyName Exact key name in JSONObject instead of lowercased version
* @param currentScope Debugging purposes
Expand All @@ -232,6 +323,7 @@ private static void fillField(
private static void fillRepeatedField(
DynamicMessage.Builder protoMsg,
FieldDescriptor fieldDescriptor,
TableFieldSchema fieldSchema,
JSONObject json,
String exactJsonKeyName,
String currentScope)
Expand Down Expand Up @@ -259,40 +351,81 @@ private static void fillRepeatedField(
}
break;
case BYTES:
if (val instanceof JSONArray) {
try {
byte[] bytes = new byte[((JSONArray) val).length()];
for (int j = 0; j < ((JSONArray) val).length(); j++) {
bytes[j] = (byte) ((JSONArray) val).getInt(j);
if (bytes[j] != ((JSONArray) val).getInt(j)) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
Boolean added = false;
if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.NUMERIC) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
added = true;
}
} else if (fieldSchema != null
&& fieldSchema.getType() == TableFieldSchema.Type.BIGNUMERIC) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
added = true;
}
}
if (!added) {
if (val instanceof JSONArray) {
try {
byte[] bytes = new byte[((JSONArray) val).length()];
for (int j = 0; j < ((JSONArray) val).length(); j++) {
bytes[j] = (byte) ((JSONArray) val).getInt(j);
if (bytes[j] != ((JSONArray) val).getInt(j)) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
}
}
protoMsg.addRepeatedField(fieldDescriptor, bytes);
} catch (JSONException e) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
}
protoMsg.addRepeatedField(fieldDescriptor, bytes);
} catch (JSONException e) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
} else if (val instanceof ByteString) {
protoMsg.addRepeatedField(fieldDescriptor, ((ByteString) val).toByteArray());
return;
} else {
fail = true;
}
} else if (val instanceof ByteString) {
protoMsg.addRepeatedField(fieldDescriptor, ((ByteString) val).toByteArray());
return;
} else {
fail = true;
}
break;
case INT64:
if (val instanceof Integer) {
if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.DATETIME) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) val)));
} else if (val instanceof Long) {
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
} else {
fail = true;
}
} else if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.TIME) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) val)));
} else if (val instanceof Long) {
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
} else {
fail = true;
}
} else if (val instanceof Integer) {
protoMsg.addRepeatedField(fieldDescriptor, new Long((Integer) val));
} else if (val instanceof Long) {
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
Expand Down Expand Up @@ -330,6 +463,7 @@ private static void fillRepeatedField(
fieldDescriptor,
convertJsonToProtoMessageImpl(
fieldDescriptor.getMessageType(),
fieldSchema == null ? null : fieldSchema.getFieldsList(),
jsonArray.getJSONObject(i),
currentScope,
/*topLevel =*/ false));
Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.google.api.gax.grpc.testing.LocalChannelProvider;
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.JsonTest;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
Expand All @@ -42,6 +43,7 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.threeten.bp.Instant;
import org.threeten.bp.LocalTime;

@RunWith(JUnit4.class)
public class JsonStreamWriterTest {
Expand Down Expand Up @@ -195,6 +197,56 @@ public void testSingleAppendSimpleJson() throws Exception {
}
}

/**
 * Appends a single row whose repeated TIME field is given as a string and checks that the
 * serialized row sent to the mock service equals the proto built with the packed time encoding.
 */
@Test
public void testSpecialTypeAppend() throws Exception {
  TableFieldSchema field =
      TableFieldSchema.newBuilder()
          .setName("time")
          .setType(TableFieldSchema.Type.TIME)
          .setMode(TableFieldSchema.Mode.REPEATED)
          .build();
  TableSchema tableSchema = TableSchema.newBuilder().addFields(field).build();

  // Expected wire form: the same time value, packed via CivilTimeEncoder.
  JsonTest.TestTime expectedProto =
      JsonTest.TestTime.newBuilder()
          .addTime(CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.of(1, 0, 1)))
          .build();
  JSONObject foo = new JSONObject();
  foo.put("time", new JSONArray(new String[] {"01:00:01"}));
  JSONArray jsonArr = new JSONArray();
  jsonArr.put(foo);

  try (JsonStreamWriter writer =
      getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {

    testBigQueryWrite.addResponse(
        AppendRowsResponse.newBuilder()
            .setAppendResult(
                AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
            .build());

    ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
    // get() waits for the append to complete, so no separate wait is needed afterwards.
    assertEquals(0L, appendFuture.get().getAppendResult().getOffset().getValue());
    assertEquals(
        1,
        testBigQueryWrite
            .getAppendRequests()
            .get(0)
            .getProtoRows()
            .getRows()
            .getSerializedRowsCount());
    // Expected value goes first so a failure reports expected/actual the right way round.
    assertEquals(
        expectedProto.toByteString(),
        testBigQueryWrite
            .getAppendRequests()
            .get(0)
            .getProtoRows()
            .getRows()
            .getSerializedRows(0));
  }
}

@Test
public void testSingleAppendMultipleSimpleJson() throws Exception {
FooType expectedProto = FooType.newBuilder().setFoo("allen").build();
Expand Down