Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery/storage/managedwriter/adapt): add NormalizeDescriptor #4681

Merged
merged 15 commits into from Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
133 changes: 133 additions & 0 deletions bigquery/storage/managedwriter/adapt/protoconversion.go
Expand Up @@ -279,3 +279,136 @@ func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, i
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
}, nil
}

// NormalizeDescriptor builds a self-contained DescriptorProto suitable for communicating schema
// information with the BigQuery Storage write API. It's primarily used for cases where users are
// interested in sending data using a predefined protocol buffer message.
//
// The storage API accepts a single DescriptorProto for decoding message data. However, the nature
// of protocol buffers allows one to define complex messages using independent messages, which normally would
// then require passing of either multiple DescriptorProto messages, or something like a FileDescriptorProto.
//
// DescriptorProto allows one to communicate the structure of a single message, but does contain a mechanism
// for encoding inner nested messages. This function leverages that to encode the DescriptorProto by transforming
// external messages to nested messages, as well as handling other tasks like fully qualifying name references.
//
// Other details of note:
// * Well known types are not normalized.
// * Enums are encapsulated in an outer struct.
func NormalizeDescriptor(in protoreflect.MessageDescriptor) (*descriptorpb.DescriptorProto, error) {
return normalizeDescriptorInternal(in, newStringSet(), newStringSet(), newStringSet(), nil)
}

func normalizeDescriptorInternal(in protoreflect.MessageDescriptor, visitedTypes, enumTypes, structTypes *stringSet, root *descriptorpb.DescriptorProto) (*descriptorpb.DescriptorProto, error) {
shollyman marked this conversation as resolved.
Show resolved Hide resolved
if in == nil {
return nil, fmt.Errorf("no messagedescriptor provided")
}
resultDP := &descriptorpb.DescriptorProto{}
if root == nil {
root = resultDP
}
fullProtoName := string(in.FullName())
normalizedProtoName := normalizeName(fullProtoName)
resultDP.Name = proto.String(normalizedProtoName)
visitedTypes.add(fullProtoName)
for i := 0; i < in.Fields().Len(); i++ {
inField := in.Fields().Get(i)
resultFDP := protodesc.ToFieldDescriptorProto(inField)
if inField.Kind() == protoreflect.MessageKind || inField.Kind() == protoreflect.GroupKind {
// Handle fields that reference messages.
// Groups are a proto2-ism which predated nested messages.
msgFullName := string(inField.Message().FullName())
if !skipNormalization(msgFullName) {
// for everything but well known types, normalize.
normName := normalizeName(string(msgFullName))
if structTypes.contains(msgFullName) {
resultFDP.TypeName = proto.String(normName)
} else {
if visitedTypes.contains(msgFullName) {
return nil, fmt.Errorf("recursize type not supported: %s", inField.FullName())
}
visitedTypes.add(msgFullName)
dp, err := normalizeDescriptorInternal(inField.Message(), visitedTypes, enumTypes, structTypes, root)
if err != nil {
return nil, fmt.Errorf("error converting message %s: %v", inField.FullName(), err)
}
root.NestedType = append(root.NestedType, dp)
visitedTypes.delete(msgFullName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we delete this? I guess because "visited" is more about detecting recursive types? I wonder if there's a more descriptive name we could use? (Ancestor types?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose the names to keep it aligned with the other implementations, but its definitely something I think we could consider. Currently the internal C++ impl is the baseline, so we'd want to start there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. IMO the C++ impl could have better names, then. :-)

lastNested := root.GetNestedType()[len(root.GetNestedType())-1].GetName()
resultFDP.TypeName = proto.String(lastNested)
}
}
}
if inField.Kind() == protoreflect.EnumKind {
// Deal with enum types. BigQuery doesn't support enum types directly.
enumFullName := string(inField.Enum().FullName())
enclosingTypeName := normalizeName(enumFullName) + "_E"
enumName := string(inField.Enum().Name())
actualName := fmt.Sprintf("%s.%s", enclosingTypeName, enumName)
if enumTypes.contains(enumFullName) {
resultFDP.TypeName = proto.String(actualName)
} else {
enumDP := protodesc.ToEnumDescriptorProto(inField.Enum())
enumDP.Name = proto.String(enumName)
resultDP.NestedType = append(resultDP.NestedType, &descriptorpb.DescriptorProto{
Name: proto.String(enclosingTypeName),
EnumType: []*descriptorpb.EnumDescriptorProto{enumDP},
})
resultFDP.TypeName = proto.String(actualName)
enumTypes.add(enumFullName)
}
}
resultDP.Field = append(resultDP.Field, resultFDP)
}
structTypes.add(fullProtoName)
return resultDP, nil
}

type stringSet struct {
m map[string]struct{}
}

func (s *stringSet) contains(k string) bool {
_, ok := s.m[k]
return ok
}

func (s *stringSet) add(k string) {
s.m[k] = struct{}{}
}

func (s *stringSet) delete(k string) {
delete(s.m, k)
}

func newStringSet() *stringSet {
return &stringSet{
m: make(map[string]struct{}),
}
}

func normalizeName(in string) string {
return strings.Replace(in, ".", "_", -1)
}

// these types don't get normalized into the fully-contained structure.
var normalizationSkipList = []string{
".google.protobuf.DoubleValue",
".google.protobuf.FloatValue",
".google.protobuf.Int64Value",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we support well known types now? Or is this to make sure they're supported when the backend adds support?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They're well known, so the backend shouldn't have issues resolving them without repacking. Getting them to behave as expected for the proto3 case is still the ongoing discussion. I could do some more integration testing on this front, but I wanted to get some other changes in to the main client before adding them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If these are skipped, what is happening? Maybe it is good to have a e2e test to show the behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added integration testing, turns out the backend won't do the resolution:

integration_test.go:617: error in response: rpc error: code = InvalidArgument desc = Invalid proto schema: BqMessage.proto: testdata_WithWellKnownTypes.wrapped_int64: ".google.protobuf.Int64Value" is not defined.
        BqMessage.proto: testdata_WithWellKnownTypes.wrapped_string: ".google.protobuf.StringValue" is not defined. Entity: projects/shollyman-demo-test/datasets/managedwriter_test_dataset_20210827_73498187205915_0001/tables/table_20210827_73498187245922_0002/_default

Yiru, you want a bug for this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what you want is a direct support on wrappers, which we currently don't have. So it should be a dup to the same bug. Since Pavan suggested a way of not needing an annotation, we could make the decision based on the BQ table schema (which means I can fix it sooner). But again, such behavior needs to be consistent across client libs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want direct support on wrappers, but that's tied up in the other proto3 topics. I did expect that the well known types would just work out of the box, even if the api treated them as record types with a single value field.

".google.protobuf.UInt64Value",
".google.protobuf.Int32Value",
".google.protobuf.Uint32Value",
".google.protobuf.BoolValue",
".google.protobuf.StringValue",
".google.protobuf.BytesValue",
}

func skipNormalization(fullName string) bool {
for _, v := range normalizationSkipList {
if v == fullName {
return true
}
}
return false
}
187 changes: 187 additions & 0 deletions bigquery/storage/managedwriter/adapt/protoconversion_test.go
Expand Up @@ -19,6 +19,7 @@ import (
"reflect"
"testing"

"cloud.google.com/go/bigquery/storage/managedwriter/testdata"
"github.com/google/go-cmp/cmp"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
"google.golang.org/protobuf/encoding/protojson"
Expand Down Expand Up @@ -350,3 +351,189 @@ func TestProtoJSONSerialization(t *testing.T) {
}

}

func TestNormalizeDescriptor(t *testing.T) {
testCases := []struct {
description string
in protoreflect.MessageDescriptor
wantErr bool
want *descriptorpb.DescriptorProto
}{
{
description: "nil",
in: nil,
wantErr: true,
},
{
description: "AllSupportedTypes",
in: (&testdata.AllSupportedTypes{}).ProtoReflect().Descriptor(),
want: &descriptorpb.DescriptorProto{
Name: proto.String("testdata_AllSupportedTypes"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("int32_value"),
JsonName: proto.String("int32Value"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_INT32.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("int64_value"),
JsonName: proto.String("int64Value"),
Number: proto.Int32(2),
Type: descriptorpb.FieldDescriptorProto_TYPE_INT64.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("uint32_value"),
JsonName: proto.String("uint32Value"),
Number: proto.Int32(3),
Type: descriptorpb.FieldDescriptorProto_TYPE_UINT32.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("uint64_value"),
JsonName: proto.String("uint64Value"),
Number: proto.Int32(4),
Type: descriptorpb.FieldDescriptorProto_TYPE_UINT64.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("float_value"),
JsonName: proto.String("floatValue"),
Number: proto.Int32(5),
Type: descriptorpb.FieldDescriptorProto_TYPE_FLOAT.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("double_value"),
JsonName: proto.String("doubleValue"),
Number: proto.Int32(6),
Type: descriptorpb.FieldDescriptorProto_TYPE_DOUBLE.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("bool_value"),
JsonName: proto.String("boolValue"),
Number: proto.Int32(7),
Type: descriptorpb.FieldDescriptorProto_TYPE_BOOL.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("enum_value"),
JsonName: proto.String("enumValue"),
TypeName: proto.String("testdata_TestEnum_E.TestEnum"),
Number: proto.Int32(8),
Type: descriptorpb.FieldDescriptorProto_TYPE_ENUM.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
{
Name: proto.String("string_value"),
JsonName: proto.String("stringValue"),
Number: proto.Int32(9),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_REQUIRED.Enum(),
},
},
NestedType: []*descriptorpb.DescriptorProto{
{
Name: proto.String("testdata_TestEnum_E"),
EnumType: []*descriptorpb.EnumDescriptorProto{
{
Name: proto.String("TestEnum"),
Value: []*descriptorpb.EnumValueDescriptorProto{
{
Name: proto.String("TestEnum0"),
Number: proto.Int32(0),
},
{
Name: proto.String("TestEnum1"),
Number: proto.Int32(1),
},
},
},
},
},
},
},
},
{
description: "ContainsRecursive",
in: (&testdata.ContainsRecursive{}).ProtoReflect().Descriptor(),
wantErr: true,
},
{
description: "RecursiveTypeTopMessage",
in: (&testdata.RecursiveTypeTopMessage{}).ProtoReflect().Descriptor(),
wantErr: true,
},
{
description: "ComplexType",
in: (&testdata.ComplexType{}).ProtoReflect().Descriptor(),
want: &descriptorpb.DescriptorProto{
Name: proto.String("testdata_ComplexType"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("nested_repeated_type"),
JsonName: proto.String("nestedRepeatedType"),
Number: proto.Int32(1),
TypeName: proto.String("testdata_NestedType"),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum(),
},
{
Name: proto.String("inner_type"),
JsonName: proto.String("innerType"),
Number: proto.Int32(2),
TypeName: proto.String("testdata_InnerType"),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
},
},
NestedType: []*descriptorpb.DescriptorProto{
{
Name: proto.String("testdata_InnerType"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("value"),
JsonName: proto.String("value"),
Number: proto.Int32(1),
Type: descriptorpb.FieldDescriptorProto_TYPE_STRING.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum(),
},
},
},
{
Name: proto.String("testdata_NestedType"),
Field: []*descriptorpb.FieldDescriptorProto{
{
Name: proto.String("inner_type"),
JsonName: proto.String("innerType"),
Number: proto.Int32(1),
TypeName: proto.String("testdata_InnerType"),
Type: descriptorpb.FieldDescriptorProto_TYPE_MESSAGE.Enum(),
Label: descriptorpb.FieldDescriptorProto_LABEL_REPEATED.Enum(),
},
},
},
},
},
},
}

for _, tc := range testCases {
gotDP, err := NormalizeDescriptor(tc.in)

if tc.wantErr && err == nil {
t.Errorf("%s: wanted err but got success", tc.description)
continue
}
if !tc.wantErr && err != nil {
t.Errorf("%s: wanted success, got err: %v", tc.description, err)
continue
}
if diff := cmp.Diff(gotDP, tc.want, protocmp.Transform()); diff != "" {
t.Errorf("%s: -got, +want:\n%s", tc.description, diff)
}
}
}