Skip to content

Commit

Permalink
feat(bigquery/storage/managedwriter/adapt): add NormalizeDescriptor (#…
Browse files Browse the repository at this point in the history
…4681)

* feat(bigquery/storage/managedwriter/adapt): add NormalizeDescriptor

This functionality supports the "bring your own proto" case for writing
data.

Towards: #4366
  • Loading branch information
shollyman committed Aug 27, 2021
1 parent b9226eb commit c54aa74
Show file tree
Hide file tree
Showing 6 changed files with 1,424 additions and 1 deletion.
134 changes: 134 additions & 0 deletions bigquery/storage/managedwriter/adapt/protoconversion.go
Expand Up @@ -279,3 +279,137 @@ func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, i
Label: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL.Enum(),
}, nil
}

// NormalizeDescriptor builds a self-contained DescriptorProto suitable for communicating schema
// information with the BigQuery Storage write API. It's primarily used for cases where users are
// interested in sending data using a predefined protocol buffer message.
//
// The storage API accepts a single DescriptorProto for decoding message data. In many cases, a message
// is comprised of multiple independent messages, from the same .proto file or from multiple sources. Rather
// than being forced to communicate all these messages independently, what this method does is rewrite the
// DescriptorProto to inline all messages as nested submessages. As the backend only cares about the types
// and not the namespaces when decoding, this is sufficient for the needs of the API's representation.
//
// In addition to nesting messages, this method also handles some encapsulation of enum types to avoid possible
// conflicts due to ambiguities.
func NormalizeDescriptor(in protoreflect.MessageDescriptor) (*descriptorpb.DescriptorProto, error) {
return normalizeDescriptorInternal(in, newStringSet(), newStringSet(), newStringSet(), nil)
}

func normalizeDescriptorInternal(in protoreflect.MessageDescriptor, visitedTypes, enumTypes, structTypes *stringSet, root *descriptorpb.DescriptorProto) (*descriptorpb.DescriptorProto, error) {
if in == nil {
return nil, fmt.Errorf("no messagedescriptor provided")
}
resultDP := &descriptorpb.DescriptorProto{}
if root == nil {
root = resultDP
}
fullProtoName := string(in.FullName())
resultDP.Name = proto.String(normalizeName(fullProtoName))
visitedTypes.add(fullProtoName)
for i := 0; i < in.Fields().Len(); i++ {
inField := in.Fields().Get(i)
resultFDP := protodesc.ToFieldDescriptorProto(inField)
if inField.Kind() == protoreflect.MessageKind || inField.Kind() == protoreflect.GroupKind {
// Handle fields that reference messages.
// Groups are a proto2-ism which predated nested messages.
msgFullName := string(inField.Message().FullName())
if !skipNormalization(msgFullName) {
// for everything but well known types, normalize.
normName := normalizeName(string(msgFullName))
if structTypes.contains(msgFullName) {
resultFDP.TypeName = proto.String(normName)
} else {
if visitedTypes.contains(msgFullName) {
return nil, fmt.Errorf("recursize type not supported: %s", inField.FullName())
}
visitedTypes.add(msgFullName)
dp, err := normalizeDescriptorInternal(inField.Message(), visitedTypes, enumTypes, structTypes, root)
if err != nil {
return nil, fmt.Errorf("error converting message %s: %v", inField.FullName(), err)
}
root.NestedType = append(root.NestedType, dp)
visitedTypes.delete(msgFullName)
lastNested := root.GetNestedType()[len(root.GetNestedType())-1].GetName()
resultFDP.TypeName = proto.String(lastNested)
}
}
}
if inField.Kind() == protoreflect.EnumKind {
// For enums, in order to avoid value conflict, we will always define
// a enclosing struct called enum_full_name_E that includes the actual
// enum.
enumFullName := string(inField.Enum().FullName())
enclosingTypeName := normalizeName(enumFullName) + "_E"
enumName := string(inField.Enum().Name())
actualFullName := fmt.Sprintf("%s.%s", enclosingTypeName, enumName)
if enumTypes.contains(enumFullName) {
resultFDP.TypeName = proto.String(actualFullName)
} else {
enumDP := protodesc.ToEnumDescriptorProto(inField.Enum())
enumDP.Name = proto.String(enumName)
resultDP.NestedType = append(resultDP.NestedType, &descriptorpb.DescriptorProto{
Name: proto.String(enclosingTypeName),
EnumType: []*descriptorpb.EnumDescriptorProto{enumDP},
})
resultFDP.TypeName = proto.String(actualFullName)
enumTypes.add(enumFullName)
}
}
resultDP.Field = append(resultDP.Field, resultFDP)
}
structTypes.add(fullProtoName)
return resultDP, nil
}

type stringSet struct {
m map[string]struct{}
}

func (s *stringSet) contains(k string) bool {
_, ok := s.m[k]
return ok
}

func (s *stringSet) add(k string) {
s.m[k] = struct{}{}
}

func (s *stringSet) delete(k string) {
delete(s.m, k)
}

func newStringSet() *stringSet {
return &stringSet{
m: make(map[string]struct{}),
}
}

func normalizeName(in string) string {
return strings.Replace(in, ".", "_", -1)
}

// these types don't get normalized into the fully-contained structure.
var normalizationSkipList = []string{
/*
TODO: when backend supports resolving well known types, this list should be enabled.
"google.protobuf.DoubleValue",
"google.protobuf.FloatValue",
"google.protobuf.Int64Value",
"google.protobuf.UInt64Value",
"google.protobuf.Int32Value",
"google.protobuf.Uint32Value",
"google.protobuf.BoolValue",
"google.protobuf.StringValue",
"google.protobuf.BytesValue",
*/
}

func skipNormalization(fullName string) bool {
for _, v := range normalizationSkipList {
if v == fullName {
return true
}
}
return false
}

0 comments on commit c54aa74

Please sign in to comment.