Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery/storage/managedwriter): improve protobuf support #4589

Merged
merged 14 commits into from Aug 16, 2021
Merged
63 changes: 42 additions & 21 deletions bigquery/storage/managedwriter/adapt/protoconversion.go
Expand Up @@ -27,15 +27,25 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"
)

// bqModeToFieldLabelMap holds mapping from field schema mode to proto label.
// proto3 no longer allows use of REQUIRED labels, so we solve that elsewhere
// and simply use optional.
var bqModeToFieldLabelMap = map[storagepb.TableFieldSchema_Mode]descriptorpb.FieldDescriptorProto_Label{
var bqModeToFieldLabelMapProto2 = map[storagepb.TableFieldSchema_Mode]descriptorpb.FieldDescriptorProto_Label{
storagepb.TableFieldSchema_NULLABLE: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL,
storagepb.TableFieldSchema_REPEATED: descriptorpb.FieldDescriptorProto_LABEL_REPEATED,
storagepb.TableFieldSchema_REQUIRED: descriptorpb.FieldDescriptorProto_LABEL_REQUIRED,
}

var bqModeToFieldLabelMapProto3 = map[storagepb.TableFieldSchema_Mode]descriptorpb.FieldDescriptorProto_Label{
storagepb.TableFieldSchema_NULLABLE: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL,
storagepb.TableFieldSchema_REPEATED: descriptorpb.FieldDescriptorProto_LABEL_REPEATED,
storagepb.TableFieldSchema_REQUIRED: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL,
}

func convertModeToLabel(mode storagepb.TableFieldSchema_Mode, useProto3 bool) *descriptorpb.FieldDescriptorProto_Label {
if useProto3 {
return bqModeToFieldLabelMapProto3[mode].Enum()
}
return bqModeToFieldLabelMapProto2[mode].Enum()
}

// Allows conversion between BQ schema type and FieldDescriptorProto's type.
var bqTypeToFieldTypeMap = map[storagepb.TableFieldSchema_Type]descriptorpb.FieldDescriptorProto_Type{
storagepb.TableFieldSchema_BIGNUMERIC: descriptorpb.FieldDescriptorProto_TYPE_BYTES,
Expand Down Expand Up @@ -106,15 +116,24 @@ func (dm dependencyCache) add(schema *storagepb.TableSchema, descriptor protoref
return nil
}

// StorageSchemaToDescriptor builds a protoreflect.Descriptor for a given table schema.
func StorageSchemaToDescriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
// StorageSchemaToProto2Descriptor builds a protoreflect.Descriptor for a given table schema using proto2 syntax.
func StorageSchemaToProto2Descriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
dc := make(dependencyCache)
// TODO: b/193064992 tracks support for wrapper types. In the interim, disable wrapper usage.
return storageSchemaToDescriptorInternal(inSchema, scope, &dc, false)
}

// StorageSchemaToProto3Descriptor builds a protoreflect.Descriptor for a given table schema using proto3 syntax.
//
// NOTE: Currently the write API doesn't yet support proto3 behaviors (default value, wrapper types, etc), but this is provided for
// completeness.
func StorageSchemaToProto3Descriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
dc := make(dependencyCache)
return storageSchemaToDescriptorInternal(inSchema, scope, &dc, true)
}

// internal implementation of the conversion code.
func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope string, cache *dependencyCache, allowWrapperTypes bool) (protoreflect.Descriptor, error) {
func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope string, cache *dependencyCache, useProto3 bool) (protoreflect.Descriptor, error) {
if inSchema == nil {
return nil, newConversionError(scope, fmt.Errorf("no input schema was provided"))
}
Expand Down Expand Up @@ -145,7 +164,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
deps = append(deps, foundDesc.ParentFile())
}
// construct field descriptor for the message
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, string(foundDesc.FullName()), allowWrapperTypes)
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, string(foundDesc.FullName()), useProto3)
if err != nil {
return nil, newConversionError(scope, fmt.Errorf("couldn't convert field to FieldDescriptorProto: %v", err))
}
Expand All @@ -155,7 +174,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
ts := &storagepb.TableSchema{
Fields: f.GetFields(),
}
desc, err := storageSchemaToDescriptorInternal(ts, currentScope, cache, allowWrapperTypes)
desc, err := storageSchemaToDescriptorInternal(ts, currentScope, cache, useProto3)
if err != nil {
return nil, newConversionError(currentScope, fmt.Errorf("couldn't convert message: %v", err))
}
Expand All @@ -166,14 +185,14 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
if err != nil {
return nil, newConversionError(currentScope, fmt.Errorf("failed to add descriptor to dependency cache: %v", err))
}
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, allowWrapperTypes)
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
if err != nil {
return nil, newConversionError(currentScope, fmt.Errorf("couldn't compute field schema : %v", err))
}
fields = append(fields, fdp)
}
} else {
fd, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, allowWrapperTypes)
fd, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
if err != nil {
return nil, newConversionError(currentScope, err)
}
Expand Down Expand Up @@ -201,6 +220,9 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
Syntax: proto.String("proto3"),
Dependency: depNames,
}
if !useProto3 {
fdp.Syntax = proto.String("proto2")
}

// We'll need a FileDescriptorSet as we have a FileDescriptorProto for the current
// descriptor we're building, but we need to include all the referenced dependencies.
Expand All @@ -223,33 +245,32 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
}

// tableFieldSchemaToFieldDescriptorProto builds individual field descriptors for a proto message.
// We're using proto3 syntax, but BigQuery supports the notion of NULLs which conflicts with proto3 default value
// behavior. To enable it, we look for nullable fields in the schema that should be scalars, and use the
// well-known wrapper types.
//
// For proto3, in cases where the mode is nullable we use the well known wrapper types.
// For proto2, we propagate the mode->label annotation as expected.
//
// Messages are always nullable, and repeated fields are as well.
func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, idx int32, scope string, allowWrapperTypes bool) (*descriptorpb.FieldDescriptorProto, error) {
func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, idx int32, scope string, useProto3 bool) (*descriptorpb.FieldDescriptorProto, error) {
name := strings.ToLower(field.GetName())
if field.GetType() == storagepb.TableFieldSchema_STRUCT {
return &descriptorpb.FieldDescriptorProto{
Name: proto.String(name),
Number: proto.Int32(idx),
TypeName: proto.String(scope),
Label: bqModeToFieldLabelMap[field.GetMode()].Enum(),
Label: convertModeToLabel(field.GetMode(), useProto3),
}, nil
}

// For (REQUIRED||REPEATED) fields, we use the expected scalar types, but the proto is
// still marked OPTIONAL (proto3 semantics).
if field.GetMode() != storagepb.TableFieldSchema_NULLABLE || !allowWrapperTypes {
// For (REQUIRED||REPEATED) fields for proto3, or all cases for proto2, we can use the expected scalar types.
if field.GetMode() != storagepb.TableFieldSchema_NULLABLE || !useProto3 {
return &descriptorpb.FieldDescriptorProto{
Name: proto.String(name),
Number: proto.Int32(idx),
Type: bqTypeToFieldTypeMap[field.GetType()].Enum(),
Label: bqModeToFieldLabelMap[field.GetMode()].Enum(),
Label: convertModeToLabel(field.GetMode(), useProto3),
}, nil
}
// For NULLABLE, optionally use wrapper types.
// For NULLABLE proto3 fields, use a wrapper type.
return &descriptorpb.FieldDescriptorProto{
Name: proto.String(name),
Number: proto.Int32(idx),
Expand Down
97 changes: 85 additions & 12 deletions bigquery/storage/managedwriter/adapt/protoconversion_test.go
Expand Up @@ -34,7 +34,8 @@ func TestSchemaToProtoConversion(t *testing.T) {
testCases := []struct {
description string
bq *storagepb.TableSchema
want *descriptorpb.DescriptorProto
wantProto2 *descriptorpb.DescriptorProto
wantProto3 *descriptorpb.DescriptorProto
}{
{
description: "basic",
Expand All @@ -44,14 +45,27 @@ func TestSchemaToProtoConversion(t *testing.T) {
{Name: "bar", Type: storagepb.TableFieldSchema_INT64, Mode: storagepb.TableFieldSchema_REQUIRED},
{Name: "baz", Type: storagepb.TableFieldSchema_BYTES, Mode: storagepb.TableFieldSchema_REPEATED},
}},
want: &descriptorpb.DescriptorProto{
wantProto2: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("foo"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()},
{Name: proto.String("bar"), Number: proto.Int32(2), Type: descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(), Label: descriptorpb.FieldDescriptorProto_LABEL_REQUIRED.Enum()},
{Name: proto.String("baz"), Number: proto.Int32(3), Type: descriptorpb.FieldDescriptorProto_TYPE_BYTES.Enum(), Label: descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()},
},
},
wantProto3: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("foo"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String(".google.protobuf.StringValue"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()},
{Name: proto.String("bar"), Number: proto.Int32(2), Type: descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(), Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum()},
{Name: proto.String("baz"), Number: proto.Int32(3), Type: descriptorpb.FieldDescriptorProto_TYPE_BYTES.Enum(), Label: descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum()},
},
Expand All @@ -74,7 +88,7 @@ func TestSchemaToProtoConversion(t *testing.T) {
},
},
},
want: &descriptorpb.DescriptorProto{
wantProto2: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Expand All @@ -92,6 +106,25 @@ func TestSchemaToProtoConversion(t *testing.T) {
},
},
},
wantProto3: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("curdate"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String(".google.protobuf.Int32Value"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("rec"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String(".root__rec"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
},
},
{
// We expect to re-use the submessage twice, as the schema contains two identical structs.
Expand Down Expand Up @@ -119,7 +152,7 @@ func TestSchemaToProtoConversion(t *testing.T) {
},
},
},
want: &descriptorpb.DescriptorProto{
wantProto2: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Expand All @@ -144,24 +177,64 @@ func TestSchemaToProtoConversion(t *testing.T) {
},
},
},
wantProto3: &descriptorpb.DescriptorProto{
Name: proto.String("root"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("curdate"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String(".google.protobuf.Int32Value"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("rec1"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String(".root__rec1"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("rec2"),
Number: proto.Int32(3),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
TypeName: proto.String(".root__rec1"),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
},
},
}
for _, tc := range testCases {
d, err := StorageSchemaToDescriptor(tc.bq, "root")
p2d, err := StorageSchemaToProto2Descriptor(tc.bq, "root")
if err != nil {
t.Fatalf("case (%s) failed conversion: %v", tc.description, err)
t.Errorf("case (%s) failed proto2 conversion: %v", tc.description, err)
}

// convert it to DP form
mDesc, ok := d.(protoreflect.MessageDescriptor)
mDesc, ok := p2d.(protoreflect.MessageDescriptor)
if !ok {
t.Fatalf("%s: couldn't convert to messagedescriptor", tc.description)
t.Errorf("%s: couldn't convert proto2 to messagedescriptor", tc.description)
}
gotDP := protodesc.ToDescriptorProto(mDesc)
if diff := cmp.Diff(gotDP, tc.wantProto2, protocmp.Transform()); diff != "" {
t.Errorf("%s proto2: -got, +want:\n%s", tc.description, diff)
}

if diff := cmp.Diff(gotDP, tc.want, protocmp.Transform()); diff != "" {
t.Fatalf("%s: -got, +want:\n%s", tc.description, diff)
p3d, err := StorageSchemaToProto3Descriptor(tc.bq, "root")
if err != nil {
t.Fatalf("case (%s) failed proto3 conversion: %v", tc.description, err)
}

mDesc, ok = p3d.(protoreflect.MessageDescriptor)
if !ok {
t.Errorf("%s: couldn't convert proto3 to messagedescriptor", tc.description)
}
gotDP = protodesc.ToDescriptorProto(mDesc)
if diff := cmp.Diff(gotDP, tc.wantProto3, protocmp.Transform()); diff != "" {
t.Errorf("%s proto3: -got, +want:\n%s", tc.description, diff)
}

}
}

Expand All @@ -182,7 +255,7 @@ func TestProtoJSONSerialization(t *testing.T) {
},
}

descriptor, err := StorageSchemaToDescriptor(sourceSchema, "root")
descriptor, err := StorageSchemaToProto2Descriptor(sourceSchema, "root")
if err != nil {
t.Fatalf("failed to construct descriptor")
}
Expand Down Expand Up @@ -229,7 +302,7 @@ func TestProtoJSONSerialization(t *testing.T) {
Name: proto.String("key"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_REQUIRED.Enum(),
},
{
Name: proto.String("value"),
Expand Down