Skip to content

Commit

Permalink
feat: Added BQSchemaToProtoSchema functionality along with test cases…
Browse files Browse the repository at this point in the history
…; checked for linting
  • Loading branch information
allenc3 committed Jul 1, 2020
1 parent 4d3bce6 commit 941b6e2
Show file tree
Hide file tree
Showing 3 changed files with 261 additions and 0 deletions.
6 changes: 6 additions & 0 deletions google-cloud-bigquerystorage/pom.xml
Expand Up @@ -108,6 +108,12 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20200518</version>
</dependency>


<!-- Test dependencies -->
<dependency>
Expand Down
@@ -0,0 +1,150 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1alpha2;

import com.google.common.collect.ImmutableMap;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.DescriptorProtos.FileDescriptorProto;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FileDescriptor;
import java.util.ArrayList;
import java.util.List;

/**
* A class that checks the schema compatibility between user schema in proto descriptor and Bigquery
* table schema. If this check is passed, then user can write to BigQuery table using the user
* schema, otherwise the write will fail.
*
* <p>The implementation as of now is not complete, which measn, if this check passed, there is
* still a possbility of writing will fail.
*/
public class JsonToProtoConverter {
private static ImmutableMap<Table.TableFieldSchema.Mode, FieldDescriptorProto.Label>
BQTableSchemaModeMap =
ImmutableMap.of(
Table.TableFieldSchema.Mode.NULLABLE, FieldDescriptorProto.Label.LABEL_OPTIONAL,
Table.TableFieldSchema.Mode.REPEATED, FieldDescriptorProto.Label.LABEL_REPEATED,
Table.TableFieldSchema.Mode.REQUIRED, FieldDescriptorProto.Label.LABEL_REQUIRED);

private static ImmutableMap<Table.TableFieldSchema.Type, FieldDescriptorProto.Type>
BQTableSchemaTypeMap =
new ImmutableMap.Builder<Table.TableFieldSchema.Type, FieldDescriptorProto.Type>()
.put(Table.TableFieldSchema.Type.BOOL, FieldDescriptorProto.Type.TYPE_BOOL)
.put(Table.TableFieldSchema.Type.BYTES, FieldDescriptorProto.Type.TYPE_BYTES)
.put(Table.TableFieldSchema.Type.DATE, FieldDescriptorProto.Type.TYPE_INT64)
.put(Table.TableFieldSchema.Type.DATETIME, FieldDescriptorProto.Type.TYPE_INT64)
.put(Table.TableFieldSchema.Type.DOUBLE, FieldDescriptorProto.Type.TYPE_DOUBLE)
.put(Table.TableFieldSchema.Type.GEOGRAPHY, FieldDescriptorProto.Type.TYPE_BYTES)
.put(Table.TableFieldSchema.Type.INT64, FieldDescriptorProto.Type.TYPE_INT64)
.put(Table.TableFieldSchema.Type.NUMERIC, FieldDescriptorProto.Type.TYPE_DOUBLE)
.put(Table.TableFieldSchema.Type.STRING, FieldDescriptorProto.Type.TYPE_STRING)
.put(Table.TableFieldSchema.Type.STRUCT, FieldDescriptorProto.Type.TYPE_MESSAGE)
.put(Table.TableFieldSchema.Type.TIME, FieldDescriptorProto.Type.TYPE_INT64)
.put(Table.TableFieldSchema.Type.TIMESTAMP, FieldDescriptorProto.Type.TYPE_INT64)
.build();

/**
* Converts Table.TableSchema to a Descriptors.Descriptor object.
*
* @param BQTableSchema
* @throws Descriptors.DescriptorValidationException
*/
public static Descriptor BQTableSchemaToProtoSchema(Table.TableSchema BQTableSchema)
throws Descriptors.DescriptorValidationException {
Descriptor descriptor = BQTableSchemaToProtoSchemaImpl(BQTableSchema, "root");
return descriptor;
}

/**
* Implementation that converts a Table.TableSchema to a Descriptors.Descriptor object.
*
* @param BQTableSchema
* @param scope Keeps track of current scope to prevent repeated naming while constructing
* descriptor.
* @throws Descriptors.DescriptorValidationException
*/
private static Descriptor BQTableSchemaToProtoSchemaImpl(
Table.TableSchema BQTableSchema, String scope)
throws Descriptors.DescriptorValidationException {
List<FileDescriptor> dependenciesList = new ArrayList<FileDescriptor>();
List<FieldDescriptorProto> fields = new ArrayList<FieldDescriptorProto>();
int index = 1;
for (Table.TableFieldSchema BQTableField : BQTableSchema.getFieldsList()) {
if (BQTableField.getType() == Table.TableFieldSchema.Type.STRUCT) {
String currentScope = scope + BQTableField.getName();
dependenciesList.add(
BQTableSchemaToProtoSchemaImpl(
Table.TableSchema.newBuilder()
.addAllFields(BQTableField.getFieldsList())
.build(),
currentScope)
.getFile());
fields.add(BQStructToProtoMessage(BQTableField, index++, currentScope));
} else {
fields.add(BQTableFieldToProtoField(BQTableField, index++));
}
}
FileDescriptor[] dependenciesArray = new FileDescriptor[dependenciesList.size()];
dependenciesArray = dependenciesList.toArray(dependenciesArray);
DescriptorProto descriptorProto =
DescriptorProto.newBuilder().setName(scope).addAllField(fields).build();
FileDescriptorProto fileDescriptorProto =
FileDescriptorProto.newBuilder().addMessageType(descriptorProto).build();
FileDescriptor fileDescriptor =
FileDescriptor.buildFrom(fileDescriptorProto, dependenciesArray);
Descriptor descriptor = fileDescriptor.findMessageTypeByName(scope);
return descriptor;
}

/**
* Constructs a FieldDescriptorProto for non-struct BQ fields.
*
* @param BQTableField BQ Field used to construct a FieldDescriptorProto
* @param index Index for protobuf fields.
*/
private static FieldDescriptorProto BQTableFieldToProtoField(
Table.TableFieldSchema BQTableField, int index) {
String fieldName = BQTableField.getName();
Table.TableFieldSchema.Mode mode = BQTableField.getMode();
return FieldDescriptorProto.newBuilder()
.setName(fieldName)
.setType((FieldDescriptorProto.Type) BQTableSchemaTypeMap.get(BQTableField.getType()))
.setLabel((FieldDescriptorProto.Label) BQTableSchemaModeMap.get(mode))
.setNumber(index)
.build();
}

/**
* Constructs a FieldDescriptorProto for a Struct type BQ field.
*
* @param BQTableField BQ Field used to construct a FieldDescriptorProto
* @param index Index for protobuf fields.
* @param scope Need scope to prevent naming issues
*/
private static FieldDescriptorProto BQStructToProtoMessage(
Table.TableFieldSchema BQTableField, int index, String scope) {
String fieldName = BQTableField.getName();
Table.TableFieldSchema.Mode mode = BQTableField.getMode();
return FieldDescriptorProto.newBuilder()
.setName(fieldName)
.setTypeName(scope)
.setLabel((FieldDescriptorProto.Label) BQTableSchemaModeMap.get(mode))
.setNumber(index)
.build();
}
}
@@ -0,0 +1,105 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1alpha2;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import com.google.cloud.bigquery.storage.test.SchemaTest.*;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.FieldDescriptor;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class JsonToProtoConverterTest {
private static ImmutableMap<Table.TableFieldSchema.Type, Descriptor>
BQTableTypeToProtoDescriptor =
new ImmutableMap.Builder<Table.TableFieldSchema.Type, Descriptor>()
.put(Table.TableFieldSchema.Type.BOOL, BoolType.getDescriptor())
.put(Table.TableFieldSchema.Type.BYTES, BytesType.getDescriptor())
.put(Table.TableFieldSchema.Type.DATE, Int64Type.getDescriptor())
.put(Table.TableFieldSchema.Type.DATETIME, Int64Type.getDescriptor())
.put(Table.TableFieldSchema.Type.DOUBLE, DoubleType.getDescriptor())
.put(Table.TableFieldSchema.Type.GEOGRAPHY, BytesType.getDescriptor())
.put(Table.TableFieldSchema.Type.INT64, Int64Type.getDescriptor())
.put(Table.TableFieldSchema.Type.NUMERIC, DoubleType.getDescriptor())
.put(Table.TableFieldSchema.Type.STRING, StringType.getDescriptor())
.put(Table.TableFieldSchema.Type.TIME, Int64Type.getDescriptor())
.put(Table.TableFieldSchema.Type.TIMESTAMP, Int64Type.getDescriptor())
.build();

private boolean isDescriptorEqual(Descriptor convertedProto, Descriptor originalProto) {
for (FieldDescriptor convertedField : convertedProto.getFields()) {
FieldDescriptor originalField = originalProto.findFieldByName(convertedField.getName());
if (originalField == null) {
return false;
}
FieldDescriptor.Type convertedType = convertedField.getType();
FieldDescriptor.Type originalType = originalField.getType();
if (convertedType != originalType) {
return false;
}
if (convertedType == FieldDescriptor.Type.MESSAGE) {
if (!isDescriptorEqual(convertedField.getMessageType(), originalField.getMessageType())) {
return false;
}
}
}
return true;
}

@Test
public void testBQTableSchemaToProtoDescriptorSimpleTypes() throws Exception {
for (Map.Entry<Table.TableFieldSchema.Type, Descriptor> entry :
BQTableTypeToProtoDescriptor.entrySet()) {
Table.TableFieldSchema tableFieldSchema =
Table.TableFieldSchema.newBuilder()
.setType(entry.getKey())
.setMode(Table.TableFieldSchema.Mode.NULLABLE)
.setName("test_field_type")
.build();
Table.TableSchema tableSchema =
Table.TableSchema.newBuilder().addFields(0, tableFieldSchema).build();
Descriptor descriptor = JsonToProtoConverter.BQTableSchemaToProtoSchema(tableSchema);
assertTrue(isDescriptorEqual(descriptor, entry.getValue()));
}
}

@Test
public void testBQTableSchemaToProtoDescriptorComplex() throws Exception {
Table.TableFieldSchema StringType =
Table.TableFieldSchema.newBuilder()
.setType(Table.TableFieldSchema.Type.STRING)
.setMode(Table.TableFieldSchema.Mode.NULLABLE)
.setName("test_field_type")
.build();
Table.TableFieldSchema tableFieldSchema =
Table.TableFieldSchema.newBuilder()
.setType(Table.TableFieldSchema.Type.STRUCT)
.setMode(Table.TableFieldSchema.Mode.NULLABLE)
.setName("test_field_type")
.addFields(0, StringType)
.build();
Table.TableSchema tableSchema =
Table.TableSchema.newBuilder().addFields(0, tableFieldSchema).build();
Descriptor descriptor = JsonToProtoConverter.BQTableSchemaToProtoSchema(tableSchema);
assertTrue(isDescriptorEqual(descriptor, MessageType.getDescriptor()));
}
}

0 comments on commit 941b6e2

Please sign in to comment.