Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery/storage/managedwriter/adapt): add NormalizeDescriptor #4681

Merged
merged 15 commits into from Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
134 changes: 134 additions & 0 deletions bigquery/storage/managedwriter/adapt/protoconversion.go
Expand Up @@ -279,3 +279,137 @@ func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, i
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
}, nil
}

// NormalizeDescriptor builds a self-contained DescriptorProto suitable for communicating schema
// information with the BigQuery Storage write API. It's primarily used for cases where users are
// interested in sending data using a predefined protocol buffer message.
//
// The storage API accepts a single DescriptorProto for decoding message data. In many cases, a message
// is comprised of multiple independent messages, from the same .proto file or from multiple sources. Rather
// than being forced to communicate all these messages independently, what this method does is rewrite the
// DescriptorProto to inline all messages as nested submessages. As the backend only cares about the types
// and not the namespaces when decoding, this is sufficient for the needs of the API's representation.
//
// In addition to nesting messages, this method also handles some encapsulation of enum types to avoid possible
// conflicts due to ambiguities.
func NormalizeDescriptor(in protoreflect.MessageDescriptor) (*descriptorpb.DescriptorProto, error) {
return normalizeDescriptorInternal(in, newStringSet(), newStringSet(), newStringSet(), nil)
}

func normalizeDescriptorInternal(in protoreflect.MessageDescriptor, visitedTypes, enumTypes, structTypes *stringSet, root *descriptorpb.DescriptorProto) (*descriptorpb.DescriptorProto, error) {
shollyman marked this conversation as resolved.
Show resolved Hide resolved
if in == nil {
return nil, fmt.Errorf("no messagedescriptor provided")
}
resultDP := &descriptorpb.DescriptorProto{}
if root == nil {
root = resultDP
}
fullProtoName := string(in.FullName())
resultDP.Name = proto.String(normalizeName(fullProtoName))
visitedTypes.add(fullProtoName)
for i := 0; i < in.Fields().Len(); i++ {
inField := in.Fields().Get(i)
resultFDP := protodesc.ToFieldDescriptorProto(inField)
if inField.Kind() == protoreflect.MessageKind || inField.Kind() == protoreflect.GroupKind {
// Handle fields that reference messages.
// Groups are a proto2-ism which predated nested messages.
msgFullName := string(inField.Message().FullName())
if !skipNormalization(msgFullName) {
// for everything but well known types, normalize.
normName := normalizeName(string(msgFullName))
if structTypes.contains(msgFullName) {
resultFDP.TypeName = proto.String(normName)
} else {
if visitedTypes.contains(msgFullName) {
return nil, fmt.Errorf("recursize type not supported: %s", inField.FullName())
}
visitedTypes.add(msgFullName)
dp, err := normalizeDescriptorInternal(inField.Message(), visitedTypes, enumTypes, structTypes, root)
if err != nil {
return nil, fmt.Errorf("error converting message %s: %v", inField.FullName(), err)
}
root.NestedType = append(root.NestedType, dp)
visitedTypes.delete(msgFullName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we delete this? I guess because "visited" is more about detecting recursive types? I wonder if there's a more descriptive name we could use? (Ancestor types?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chose the names to keep it aligned with the other implementations, but its definitely something I think we could consider. Currently the internal C++ impl is the baseline, so we'd want to start there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. IMO the C++ impl could have better names, then. :-)

lastNested := root.GetNestedType()[len(root.GetNestedType())-1].GetName()
resultFDP.TypeName = proto.String(lastNested)
}
}
}
if inField.Kind() == protoreflect.EnumKind {
// For enums, in order to avoid value conflict, we will always define
// a enclosing struct called enum_full_name_E that includes the actual
// enum.
enumFullName := string(inField.Enum().FullName())
enclosingTypeName := normalizeName(enumFullName) + "_E"
enumName := string(inField.Enum().Name())
actualFullName := fmt.Sprintf("%s.%s", enclosingTypeName, enumName)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it just enums that get the dots? Why is that? Oh, is it because we actually generated the enclosing struct so we know it's properly nested?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, gets deep into enum internals.

From the proto guide:

You can also use an enum type declared in one message as the type of a field in a different message, using the syntax MessageType.EnumType.

It's basically ensuring the enums don't collide by being explicit about the struct wrapping.

if enumTypes.contains(enumFullName) {
resultFDP.TypeName = proto.String(actualFullName)
} else {
enumDP := protodesc.ToEnumDescriptorProto(inField.Enum())
enumDP.Name = proto.String(enumName)
resultDP.NestedType = append(resultDP.NestedType, &descriptorpb.DescriptorProto{
Name: proto.String(enclosingTypeName),
EnumType: []*descriptorpb.EnumDescriptorProto{enumDP},
})
resultFDP.TypeName = proto.String(actualFullName)
enumTypes.add(enumFullName)
}
}
resultDP.Field = append(resultDP.Field, resultFDP)
}
structTypes.add(fullProtoName)
return resultDP, nil
}

type stringSet struct {
m map[string]struct{}
}

func (s *stringSet) contains(k string) bool {
_, ok := s.m[k]
return ok
}

func (s *stringSet) add(k string) {
s.m[k] = struct{}{}
}

func (s *stringSet) delete(k string) {
delete(s.m, k)
}

func newStringSet() *stringSet {
return &stringSet{
m: make(map[string]struct{}),
}
}

func normalizeName(in string) string {
return strings.Replace(in, ".", "_", -1)
}

// these types don't get normalized into the fully-contained structure.
var normalizationSkipList = []string{
/*
TODO: when backend supports resolving well known types, this list should be enabled.
"google.protobuf.DoubleValue",
"google.protobuf.FloatValue",
"google.protobuf.Int64Value",
"google.protobuf.UInt64Value",
"google.protobuf.Int32Value",
"google.protobuf.Uint32Value",
"google.protobuf.BoolValue",
"google.protobuf.StringValue",
"google.protobuf.BytesValue",
*/
}

func skipNormalization(fullName string) bool {
for _, v := range normalizationSkipList {
if v == fullName {
return true
}
}
return false
}