Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: JsonWriter accepts string input for DATETIME, TIME, NUMERIC, BIGNUMERIC field #1339

Merged
merged 18 commits into from Oct 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -56,13 +56,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
If you are using Gradle without BOM, add this to your dependencies

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.3.1'
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.3.2'
```

If you are using SBT, add this to your dependencies

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.3.1"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.3.2"
```

## Authentication
Expand Down
Expand Up @@ -75,6 +75,7 @@ private JsonStreamWriter(Builder builder)
builder.traceId);
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
}

/**
Expand Down Expand Up @@ -105,7 +106,8 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
// of JSON data.
for (int i = 0; i < jsonArr.length(); i++) {
JSONObject json = jsonArr.getJSONObject(i);
Message protoMessage = JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, json);
Message protoMessage =
JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, this.tableSchema, json);
rowsBuilder.addSerializedRows(protoMessage.toByteString());
}
// Need to make sure refreshAppendAndSetDescriptor finish first before this can run
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigquery.storage.v1beta2;

import com.google.api.pathtemplate.ValidationException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
Expand All @@ -23,10 +24,14 @@
import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import com.google.protobuf.UninitializedMessageException;
import java.math.BigDecimal;
import java.util.List;
import java.util.logging.Logger;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.threeten.bp.LocalDateTime;
import org.threeten.bp.LocalTime;

/**
* Converts Json data to protocol buffer messages given the protocol buffer descriptor. The protobuf
Expand Down Expand Up @@ -58,7 +63,28 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");

return convertJsonToProtoMessageImpl(protoSchema, json, "root", /*topLevel=*/ true);
return convertJsonToProtoMessageImpl(protoSchema, null, json, "root", /*topLevel=*/ true);
}

/**
* Converts Json data to protocol buffer messages given the protocol buffer descriptor.
*
* @param protoSchema
* @param tableSchema bigquery table schema is needed for type conversion of DATETIME, TIME,
* NUMERIC, BIGNUMERIC
* @param json
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
public static DynamicMessage convertJsonToProtoMessage(
Descriptor protoSchema, TableSchema tableSchema, JSONObject json)
throws IllegalArgumentException {
Preconditions.checkNotNull(json, "JSONObject is null.");
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");

return convertJsonToProtoMessageImpl(
protoSchema, tableSchema.getFieldsList(), json, "root", /*topLevel=*/ true);
}

/**
Expand All @@ -71,9 +97,12 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
*/
private static DynamicMessage convertJsonToProtoMessageImpl(
Descriptor protoSchema, JSONObject json, String jsonScope, boolean topLevel)
Descriptor protoSchema,
List<TableFieldSchema> tableSchema,
JSONObject json,
String jsonScope,
boolean topLevel)
throws IllegalArgumentException {

DynamicMessage.Builder protoMsg = DynamicMessage.newBuilder(protoSchema);
String[] jsonNames = JSONObject.getNames(json);
if (jsonNames == null) {
Expand All @@ -90,10 +119,25 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
throw new IllegalArgumentException(
String.format("JSONObject has fields unknown to BigQuery: %s.", currentScope));
}
TableFieldSchema fieldSchema = null;
if (tableSchema != null) {
// protoSchema is generated from tableSchema so their field ordering should match.
fieldSchema = tableSchema.get(field.getIndex());
if (!fieldSchema.getName().equals(field.getName())) {
throw new ValidationException(
"Field at index "
+ field.getIndex()
+ " has mismatch names ("
+ fieldSchema.getName()
+ ") ("
+ field.getName()
+ ")");
}
}
if (!field.isRepeated()) {
fillField(protoMsg, field, json, jsonName, currentScope);
fillField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
} else {
fillRepeatedField(protoMsg, field, json, jsonName, currentScope);
fillRepeatedField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
}
}

Expand Down Expand Up @@ -127,6 +171,7 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
private static void fillField(
DynamicMessage.Builder protoMsg,
FieldDescriptor fieldDescriptor,
TableFieldSchema fieldSchema,
JSONObject json,
String exactJsonKeyName,
String currentScope)
Expand All @@ -144,6 +189,25 @@ private static void fillField(
}
break;
case BYTES:
if (fieldSchema != null) {
if (fieldSchema.getType() == TableFieldSchema.Type.NUMERIC) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
return;
}
} else if (fieldSchema.getType() == TableFieldSchema.Type.BIGNUMERIC) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
return;
}
}
}
if (val instanceof ByteString) {
protoMsg.setField(fieldDescriptor, ((ByteString) val).toByteArray());
return;
Expand All @@ -170,6 +234,29 @@ private static void fillField(
}
break;
case INT64:
if (fieldSchema != null) {
if (fieldSchema.getType() == TableFieldSchema.Type.DATETIME) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) val)));
return;
} else if (val instanceof Long) {
protoMsg.setField(fieldDescriptor, (Long) val);
return;
}
} else if (fieldSchema.getType() == TableFieldSchema.Type.TIME) {
if (val instanceof String) {
protoMsg.setField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) val)));
return;
} else if (val instanceof Long) {
protoMsg.setField(fieldDescriptor, (Long) val);
return;
}
}
}
if (val instanceof Integer) {
protoMsg.setField(fieldDescriptor, new Long((Integer) val));
return;
Expand Down Expand Up @@ -206,6 +293,7 @@ private static void fillField(
fieldDescriptor,
convertJsonToProtoMessageImpl(
fieldDescriptor.getMessageType(),
fieldSchema == null ? null : fieldSchema.getFieldsList(),
json.getJSONObject(exactJsonKeyName),
currentScope,
/*topLevel =*/ false));
Expand All @@ -232,6 +320,7 @@ private static void fillField(
private static void fillRepeatedField(
DynamicMessage.Builder protoMsg,
FieldDescriptor fieldDescriptor,
TableFieldSchema fieldSchema,
JSONObject json,
String exactJsonKeyName,
String currentScope)
Expand Down Expand Up @@ -259,40 +348,81 @@ private static void fillRepeatedField(
}
break;
case BYTES:
if (val instanceof JSONArray) {
try {
byte[] bytes = new byte[((JSONArray) val).length()];
for (int j = 0; j < ((JSONArray) val).length(); j++) {
bytes[j] = (byte) ((JSONArray) val).getInt(j);
if (bytes[j] != ((JSONArray) val).getInt(j)) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
Boolean added = false;
if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.NUMERIC) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
added = true;
}
} else if (fieldSchema != null
&& fieldSchema.getType() == TableFieldSchema.Type.BIGNUMERIC) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
BigDecimalByteStringEncoder.encodeToNumericByteString(
new BigDecimal((String) val)));
added = true;
}
}
if (!added) {
if (val instanceof JSONArray) {
try {
byte[] bytes = new byte[((JSONArray) val).length()];
for (int j = 0; j < ((JSONArray) val).length(); j++) {
bytes[j] = (byte) ((JSONArray) val).getInt(j);
if (bytes[j] != ((JSONArray) val).getInt(j)) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
}
}
protoMsg.addRepeatedField(fieldDescriptor, bytes);
} catch (JSONException e) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
}
protoMsg.addRepeatedField(fieldDescriptor, bytes);
} catch (JSONException e) {
throw new IllegalArgumentException(
String.format(
"Error: "
+ currentScope
+ "["
+ index
+ "] could not be converted to byte[]."));
} else if (val instanceof ByteString) {
protoMsg.addRepeatedField(fieldDescriptor, ((ByteString) val).toByteArray());
return;
} else {
fail = true;
}
} else if (val instanceof ByteString) {
protoMsg.addRepeatedField(fieldDescriptor, ((ByteString) val).toByteArray());
return;
} else {
fail = true;
}
break;
case INT64:
if (val instanceof Integer) {
if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.DATETIME) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.parse((String) val)));
} else if (val instanceof Long) {
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
} else {
fail = true;
}
} else if (fieldSchema != null && fieldSchema.getType() == TableFieldSchema.Type.TIME) {
if (val instanceof String) {
protoMsg.addRepeatedField(
fieldDescriptor,
CivilTimeEncoder.encodePacked64TimeMicros(LocalTime.parse((String) val)));
} else if (val instanceof Long) {
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
} else {
fail = true;
}
} else if (val instanceof Integer) {
protoMsg.addRepeatedField(fieldDescriptor, new Long((Integer) val));
} else if (val instanceof Long) {
protoMsg.addRepeatedField(fieldDescriptor, (Long) val);
Expand Down Expand Up @@ -330,6 +460,7 @@ private static void fillRepeatedField(
fieldDescriptor,
convertJsonToProtoMessageImpl(
fieldDescriptor.getMessageType(),
fieldSchema == null ? null : fieldSchema.getFieldsList(),
jsonArray.getJSONObject(i),
currentScope,
/*topLevel =*/ false));
Expand Down