Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat(bigquery/storage/managedwriter): improve protobuf support (#4589)
* feat(bigquery/storage/managedwriter): improve protobuf support

After some ongoing discussions with the Storage API team, this PR
improves support for proto2/proto3 syntax in protocol buffer code.

* Updates testdata so that we have a proto2 and proto3 form of
  our SimpleMessage data.

* Add reference schemas to testdata.

* Updates proto conversion code in the adapt package so it's creating
  proto2 messages by default.  The code paths for doing proto3
  conversions are present, but not exported yet as the storage API
  doesn't properly handle proto3 expectations for conversion. Namely,
  conversion doesn't properly account for default values and use of
  wrapper types.

* Adds benchmarks for dynamic schema generation and static
  serialization, to aid in some internal discussions.
  • Loading branch information
shollyman committed Aug 16, 2021
1 parent 84f86a6 commit a455082
Show file tree
Hide file tree
Showing 11 changed files with 1,607 additions and 262 deletions.
219 changes: 219 additions & 0 deletions bigquery/storage/managedwriter/adapt/protobenchmarks_test.go
@@ -0,0 +1,219 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package adapt_test

import (
"fmt"
"testing"
"time"

"cloud.google.com/go/bigquery"
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
"cloud.google.com/go/bigquery/storage/managedwriter/testdata"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/known/wrapperspb"
)

var benchDescriptor protoreflect.Descriptor

func BenchmarkStorageSchemaToDescriptor(b *testing.B) {
syntaxLabels := []string{"proto2", "proto3"}
for _, bm := range []struct {
name string
in bigquery.Schema
}{
{
name: "SingleField",
in: bigquery.Schema{
{Name: "field", Type: bigquery.StringFieldType},
},
},
{
name: "NestedRecord",
in: bigquery.Schema{
{Name: "field1", Type: bigquery.StringFieldType},
{Name: "field2", Type: bigquery.IntegerFieldType},
{Name: "field3", Type: bigquery.BooleanFieldType},
{
Name: "field4",
Type: bigquery.RecordFieldType,
Schema: bigquery.Schema{
{Name: "recordfield1", Type: bigquery.GeographyFieldType},
{Name: "recordfield2", Type: bigquery.TimestampFieldType},
},
},
},
},
{
name: "SimpleMessage",
in: testdata.SimpleMessageSchema,
},
{
name: "GithubArchiveSchema",
in: testdata.GithubArchiveSchema,
},
} {
for _, s := range syntaxLabels {
b.Run(fmt.Sprintf("%s-%s", bm.name, s), func(b *testing.B) {
convSchema, err := adapt.BQSchemaToStorageTableSchema(bm.in)
if err != nil {
b.Errorf("%q: schema conversion fail: %v", bm.name, err)
}
for n := 0; n < b.N; n++ {
if s == "proto3" {
benchDescriptor, err = adapt.StorageSchemaToProto3Descriptor(convSchema, "root")
} else {
benchDescriptor, err = adapt.StorageSchemaToProto2Descriptor(convSchema, "root")
}
if err != nil {
b.Errorf("failed to convert %q: %v", bm.name, err)
}
}
})
}
}
}

var staticBytes []byte

func BenchmarkStaticProtoSerialization(b *testing.B) {
for _, bm := range []struct {
name string
in bigquery.Schema
syntax string
setterF func() protoreflect.ProtoMessage
}{
{
name: "SimpleMessageProto2",
setterF: func() protoreflect.ProtoMessage {
return &testdata.SimpleMessageProto2{
Name: proto.String(fmt.Sprintf("test-%d", time.Now().UnixNano())),
Value: proto.Int64(time.Now().UnixNano()),
}
},
},
{
name: "SimpleMessageProto3",
setterF: func() protoreflect.ProtoMessage {
return &testdata.SimpleMessageProto3{
Name: fmt.Sprintf("test-%d", time.Now().UnixNano()),
Value: &wrapperspb.Int64Value{Value: time.Now().UnixNano()},
}
},
},
{
name: "GithubArchiveProto2",
setterF: func() protoreflect.ProtoMessage {
nowNano := time.Now().UnixNano()
return &testdata.GithubArchiveMessageProto2{
Type: proto.String("SomeEvent"),
Public: proto.Bool(nowNano%2 == 0),
Payload: proto.String(fmt.Sprintf("stuff %d", nowNano)),
Repo: &testdata.GithubArchiveRepoProto2{
Id: proto.Int64(nowNano),
Name: proto.String("staticname"),
Url: proto.String(fmt.Sprintf("foo.com/%d", nowNano)),
},
Actor: &testdata.GithubArchiveEntityProto2{
Id: proto.Int64(nowNano % 1000),
Login: proto.String(fmt.Sprintf("login-%d", nowNano%1000)),
GravatarId: proto.String(fmt.Sprintf("grav-%d", nowNano%1000000)),
AvatarUrl: proto.String(fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)),
Url: proto.String(fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)),
},
Org: &testdata.GithubArchiveEntityProto2{
Id: proto.Int64(nowNano % 1000),
Login: proto.String(fmt.Sprintf("login-%d", nowNano%1000)),
GravatarId: proto.String(fmt.Sprintf("grav-%d", nowNano%1000000)),
AvatarUrl: proto.String(fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)),
Url: proto.String(fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)),
},
CreatedAt: proto.Int64(nowNano),
Id: proto.String(fmt.Sprintf("id%d", nowNano)),
Other: proto.String("other"),
}
},
},
{
// Only set a single top-level field in a larger message.
name: "GithubArchiveProto2_Sparse",
setterF: func() protoreflect.ProtoMessage {
nowNano := time.Now().UnixNano()
return &testdata.GithubArchiveMessageProto2{
Id: proto.String(fmt.Sprintf("id%d", nowNano)),
}
},
},
{
name: "GithubArchiveProto3",
setterF: func() protoreflect.ProtoMessage {
nowNano := time.Now().UnixNano()
return &testdata.GithubArchiveMessageProto3{
Type: &wrapperspb.StringValue{Value: "SomeEvent"},
Public: &wrapperspb.BoolValue{Value: nowNano%2 == 0},
Payload: &wrapperspb.StringValue{Value: fmt.Sprintf("stuff %d", nowNano)},
Repo: &testdata.GithubArchiveRepoProto3{
Id: &wrapperspb.Int64Value{Value: nowNano},
Name: &wrapperspb.StringValue{Value: "staticname"},
Url: &wrapperspb.StringValue{Value: fmt.Sprintf("foo.com/%d", nowNano)},
},
Actor: &testdata.GithubArchiveEntityProto3{
Id: &wrapperspb.Int64Value{Value: nowNano % 1000},
Login: &wrapperspb.StringValue{Value: fmt.Sprintf("login-%d", nowNano%1000)},
GravatarId: &wrapperspb.StringValue{Value: fmt.Sprintf("grav-%d", nowNano%1000000)},
AvatarUrl: &wrapperspb.StringValue{Value: fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)},
Url: &wrapperspb.StringValue{Value: fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)},
},
Org: &testdata.GithubArchiveEntityProto3{
Id: &wrapperspb.Int64Value{Value: nowNano % 1000},
Login: &wrapperspb.StringValue{Value: fmt.Sprintf("login-%d", nowNano%1000)},
GravatarId: &wrapperspb.StringValue{Value: fmt.Sprintf("grav-%d", nowNano%1000000)},
AvatarUrl: &wrapperspb.StringValue{Value: fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)},
Url: &wrapperspb.StringValue{Value: fmt.Sprintf("https://something.com/img/%d", nowNano%10000000)},
},
CreatedAt: &wrapperspb.Int64Value{Value: nowNano},
Id: &wrapperspb.StringValue{Value: fmt.Sprintf("id%d", nowNano)},
Other: &wrapperspb.StringValue{Value: "other"},
}
},
},
{
// Only set a single field in a larger message.
name: "GithubArchiveProto3_Sparse",
setterF: func() protoreflect.ProtoMessage {
nowNano := time.Now().UnixNano()
return &testdata.GithubArchiveMessageProto3{
Id: &wrapperspb.StringValue{Value: fmt.Sprintf("id%d", nowNano)},
}
},
},
} {
b.Run(bm.name, func(b *testing.B) {
var totalBytes int64
for n := 0; n < b.N; n++ {
m := bm.setterF()
out, err := proto.Marshal(m)
if err != nil {
b.Errorf("%q %q: Marshal: %v", bm.name, bm.syntax, err)
}
totalBytes = totalBytes + int64(len(out))
staticBytes = out
}
b.Logf("N=%d, avg bytes/message: %d", b.N, totalBytes/int64(b.N))
})
}
}
63 changes: 42 additions & 21 deletions bigquery/storage/managedwriter/adapt/protoconversion.go
Expand Up @@ -27,15 +27,25 @@ import (
"google.golang.org/protobuf/types/known/wrapperspb"
)

// bqModeToFieldLabelMap holds mapping from field schema mode to proto label.
// proto3 no longer allows use of REQUIRED labels, so we solve that elsewhere
// and simply use optional.
var bqModeToFieldLabelMap = map[storagepb.TableFieldSchema_Mode]descriptorpb.FieldDescriptorProto_Label{
var bqModeToFieldLabelMapProto2 = map[storagepb.TableFieldSchema_Mode]descriptorpb.FieldDescriptorProto_Label{
storagepb.TableFieldSchema_NULLABLE: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL,
storagepb.TableFieldSchema_REPEATED: descriptorpb.FieldDescriptorProto_LABEL_REPEATED,
storagepb.TableFieldSchema_REQUIRED: descriptorpb.FieldDescriptorProto_LABEL_REQUIRED,
}

var bqModeToFieldLabelMapProto3 = map[storagepb.TableFieldSchema_Mode]descriptorpb.FieldDescriptorProto_Label{
storagepb.TableFieldSchema_NULLABLE: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL,
storagepb.TableFieldSchema_REPEATED: descriptorpb.FieldDescriptorProto_LABEL_REPEATED,
storagepb.TableFieldSchema_REQUIRED: descriptorpb.FieldDescriptorProto_LABEL_OPTIONAL,
}

func convertModeToLabel(mode storagepb.TableFieldSchema_Mode, useProto3 bool) *descriptorpb.FieldDescriptorProto_Label {
if useProto3 {
return bqModeToFieldLabelMapProto3[mode].Enum()
}
return bqModeToFieldLabelMapProto2[mode].Enum()
}

// Allows conversion between BQ schema type and FieldDescriptorProto's type.
var bqTypeToFieldTypeMap = map[storagepb.TableFieldSchema_Type]descriptorpb.FieldDescriptorProto_Type{
storagepb.TableFieldSchema_BIGNUMERIC: descriptorpb.FieldDescriptorProto_TYPE_BYTES,
Expand Down Expand Up @@ -106,15 +116,24 @@ func (dm dependencyCache) add(schema *storagepb.TableSchema, descriptor protoref
return nil
}

// StorageSchemaToDescriptor builds a protoreflect.Descriptor for a given table schema.
func StorageSchemaToDescriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
// StorageSchemaToProto2Descriptor builds a protoreflect.Descriptor for a given table schema using proto2 syntax.
func StorageSchemaToProto2Descriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
dc := make(dependencyCache)
// TODO: b/193064992 tracks support for wrapper types. In the interim, disable wrapper usage.
return storageSchemaToDescriptorInternal(inSchema, scope, &dc, false)
}

// StorageSchemaToProto3Descriptor builds a protoreflect.Descriptor for a given table schema using proto3 syntax.
//
// NOTE: Currently the write API doesn't yet support proto3 behaviors (default value, wrapper types, etc), but this is provided for
// completeness.
func StorageSchemaToProto3Descriptor(inSchema *storagepb.TableSchema, scope string) (protoreflect.Descriptor, error) {
dc := make(dependencyCache)
return storageSchemaToDescriptorInternal(inSchema, scope, &dc, true)
}

// internal implementation of the conversion code.
func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope string, cache *dependencyCache, allowWrapperTypes bool) (protoreflect.Descriptor, error) {
func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope string, cache *dependencyCache, useProto3 bool) (protoreflect.Descriptor, error) {
if inSchema == nil {
return nil, newConversionError(scope, fmt.Errorf("no input schema was provided"))
}
Expand Down Expand Up @@ -145,7 +164,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
deps = append(deps, foundDesc.ParentFile())
}
// construct field descriptor for the message
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, string(foundDesc.FullName()), allowWrapperTypes)
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, string(foundDesc.FullName()), useProto3)
if err != nil {
return nil, newConversionError(scope, fmt.Errorf("couldn't convert field to FieldDescriptorProto: %v", err))
}
Expand All @@ -155,7 +174,7 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
ts := &storagepb.TableSchema{
Fields: f.GetFields(),
}
desc, err := storageSchemaToDescriptorInternal(ts, currentScope, cache, allowWrapperTypes)
desc, err := storageSchemaToDescriptorInternal(ts, currentScope, cache, useProto3)
if err != nil {
return nil, newConversionError(currentScope, fmt.Errorf("couldn't convert message: %v", err))
}
Expand All @@ -166,14 +185,14 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
if err != nil {
return nil, newConversionError(currentScope, fmt.Errorf("failed to add descriptor to dependency cache: %v", err))
}
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, allowWrapperTypes)
fdp, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
if err != nil {
return nil, newConversionError(currentScope, fmt.Errorf("couldn't compute field schema : %v", err))
}
fields = append(fields, fdp)
}
} else {
fd, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, allowWrapperTypes)
fd, err := tableFieldSchemaToFieldDescriptorProto(f, fNumber, currentScope, useProto3)
if err != nil {
return nil, newConversionError(currentScope, err)
}
Expand Down Expand Up @@ -201,6 +220,9 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
Syntax: proto.String("proto3"),
Dependency: depNames,
}
if !useProto3 {
fdp.Syntax = proto.String("proto2")
}

// We'll need a FileDescriptorSet as we have a FileDescriptorProto for the current
// descriptor we're building, but we need to include all the referenced dependencies.
Expand All @@ -223,33 +245,32 @@ func storageSchemaToDescriptorInternal(inSchema *storagepb.TableSchema, scope st
}

// tableFieldSchemaToFieldDescriptorProto builds individual field descriptors for a proto message.
// We're using proto3 syntax, but BigQuery supports the notion of NULLs which conflicts with proto3 default value
// behavior. To enable it, we look for nullable fields in the schema that should be scalars, and use the
// well-known wrapper types.
//
// For proto3, in cases where the mode is nullable we use the well known wrapper types.
// For proto2, we propagate the mode->label annotation as expected.
//
// Messages are always nullable, and repeated fields are as well.
func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, idx int32, scope string, allowWrapperTypes bool) (*descriptorpb.FieldDescriptorProto, error) {
func tableFieldSchemaToFieldDescriptorProto(field *storagepb.TableFieldSchema, idx int32, scope string, useProto3 bool) (*descriptorpb.FieldDescriptorProto, error) {
name := strings.ToLower(field.GetName())
if field.GetType() == storagepb.TableFieldSchema_STRUCT {
return &descriptorpb.FieldDescriptorProto{
Name: proto.String(name),
Number: proto.Int32(idx),
TypeName: proto.String(scope),
Label: bqModeToFieldLabelMap[field.GetMode()].Enum(),
Label: convertModeToLabel(field.GetMode(), useProto3),
}, nil
}

// For (REQUIRED||REPEATED) fields, we use the expected scalar types, but the proto is
// still marked OPTIONAL (proto3 semantics).
if field.GetMode() != storagepb.TableFieldSchema_NULLABLE || !allowWrapperTypes {
// For (REQUIRED||REPEATED) fields for proto3, or all cases for proto2, we can use the expected scalar types.
if field.GetMode() != storagepb.TableFieldSchema_NULLABLE || !useProto3 {
return &descriptorpb.FieldDescriptorProto{
Name: proto.String(name),
Number: proto.Int32(idx),
Type: bqTypeToFieldTypeMap[field.GetType()].Enum(),
Label: bqModeToFieldLabelMap[field.GetMode()].Enum(),
Label: convertModeToLabel(field.GetMode(), useProto3),
}, nil
}
// For NULLABLE, optionally use wrapper types.
// For NULLABLE proto3 fields, use a wrapper type.
return &descriptorpb.FieldDescriptorProto{
Name: proto.String(name),
Number: proto.Int32(idx),
Expand Down

0 comments on commit a455082

Please sign in to comment.