From b4296def1aba7f2ef4073cd7fcb00b5f6a3ce753 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Sun, 13 Dec 2020 22:27:42 -0500 Subject: [PATCH 1/2] feat(pubsublite): add client library metadata to request headers --- pubsublite/internal/wire/assigner.go | 5 +- pubsublite/internal/wire/committer.go | 5 +- pubsublite/internal/wire/publisher.go | 6 +- pubsublite/internal/wire/rpc.go | 110 ++++++++++++++--- pubsublite/internal/wire/rpc_test.go | 156 +++++++++++++++++++++++++ pubsublite/internal/wire/settings.go | 12 ++ pubsublite/internal/wire/subscriber.go | 6 +- 7 files changed, 283 insertions(+), 17 deletions(-) create mode 100644 pubsublite/internal/wire/rpc_test.go diff --git a/pubsublite/internal/wire/assigner.go b/pubsublite/internal/wire/assigner.go index eeecfe9bdca..bb9d79e13fd 100644 --- a/pubsublite/internal/wire/assigner.go +++ b/pubsublite/internal/wire/assigner.go @@ -65,6 +65,7 @@ type assigner struct { assignmentClient *vkit.PartitionAssignmentClient initialReq *pb.PartitionAssignmentRequest receiveAssignment partitionAssignmentReceiver + metadata pubsubMetadata // Fields below must be guarded with mu. stream *retryableStream @@ -89,8 +90,10 @@ func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignment }, }, receiveAssignment: receiver, + metadata: newPubsubMetadata(), } a.stream = newRetryableStream(ctx, a, settings.Timeout, reflect.TypeOf(pb.PartitionAssignment{})) + a.metadata.AddClientInfo(settings.Framework) return a, nil } @@ -109,7 +112,7 @@ func (a *assigner) Stop() { } func (a *assigner) newStream(ctx context.Context) (grpc.ClientStream, error) { - return a.assignmentClient.AssignPartitions(ctx) + return a.assignmentClient.AssignPartitions(a.metadata.AddToContext(ctx)) } func (a *assigner) initialRequest() (interface{}, bool) { diff --git a/pubsublite/internal/wire/committer.go b/pubsublite/internal/wire/committer.go index 89eb2d9f997..eea28330a7b 100644 --- a/pubsublite/internal/wire/committer.go +++ b/pubsublite/internal/wire/committer.go @@ -43,6 +43,7 @@ type committer struct { // Immutable after creation. cursorClient *vkit.CursorClient initialReq *pb.StreamingCommitCursorRequest + metadata pubsubMetadata // Fields below must be guarded with mutex. stream *retryableStream @@ -66,10 +67,12 @@ func newCommitter(ctx context.Context, cursor *vkit.CursorClient, settings Recei }, }, }, + metadata: newPubsubMetadata(), acks: acks, cursorTracker: newCommitCursorTracker(acks), } c.stream = newRetryableStream(ctx, c, settings.Timeout, reflect.TypeOf(pb.StreamingCommitCursorResponse{})) + c.metadata.AddClientInfo(settings.Framework) backgroundTask := c.commitOffsetToStream if disableTasks { @@ -99,7 +102,7 @@ func (c *committer) Stop() { } func (c *committer) newStream(ctx context.Context) (grpc.ClientStream, error) { - return c.cursorClient.StreamingCommitCursor(ctx) + return c.cursorClient.StreamingCommitCursor(c.metadata.AddToContext(ctx)) } func (c *committer) initialRequest() (req interface{}, needsResp bool) { diff --git a/pubsublite/internal/wire/publisher.go b/pubsublite/internal/wire/publisher.go index e677b31c942..062e87373a5 100644 --- a/pubsublite/internal/wire/publisher.go +++ b/pubsublite/internal/wire/publisher.go @@ -54,6 +54,7 @@ type singlePartitionPublisher struct { pubClient *vkit.PublisherClient topic topicPartition initialReq *pb.PublishRequest + metadata pubsubMetadata // Fields below must be guarded with mu. stream *retryableStream @@ -84,9 +85,12 @@ func (f *singlePartitionPublisherFactory) New(partition int) *singlePartitionPub }, }, }, + metadata: newPubsubMetadata(), } pp.batcher = newPublishMessageBatcher(&f.settings, partition, pp.onNewBatch) pp.stream = newRetryableStream(f.ctx, pp, f.settings.Timeout, reflect.TypeOf(pb.PublishResponse{})) + pp.metadata.AddTopicRoutingMetadata(pp.topic) + pp.metadata.AddClientInfo(f.settings.Framework) return pp } @@ -139,7 +143,7 @@ func (pp *singlePartitionPublisher) Publish(msg *pb.PubSubMessage, onResult Publ } func (pp *singlePartitionPublisher) newStream(ctx context.Context) (grpc.ClientStream, error) { - return pp.pubClient.Publish(addTopicRoutingMetadata(ctx, pp.topic)) + return pp.pubClient.Publish(pp.metadata.AddToContext(ctx)) } func (pp *singlePartitionPublisher) initialRequest() (interface{}, bool) { diff --git a/pubsublite/internal/wire/rpc.go b/pubsublite/internal/wire/rpc.go index 17ad9d05bdc..9c2370d9920 100644 --- a/pubsublite/internal/wire/rpc.go +++ b/pubsublite/internal/wire/rpc.go @@ -15,8 +15,12 @@ package wire import ( "context" + "encoding/base64" "fmt" "net/url" + "runtime/debug" + "strconv" + "strings" "time" "google.golang.org/api/option" @@ -24,6 +28,8 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" vkit "cloud.google.com/go/pubsublite/apiv1" gax "github.com/googleapis/gax-go/v2" @@ -127,10 +133,7 @@ func retryableReadOnlyCallOption() gax.CallOption { }) } -const ( - pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443" - routingMetadataHeader = "x-goog-request-params" -) +const pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443" func defaultClientOptions(region string) []option.ClientOption { return []option.ClientOption{ @@ -164,18 +167,99 @@ func newPartitionAssignmentClient(ctx context.Context, region string, opts ...op return vkit.NewPartitionAssignmentClient(ctx, options...) } -func addTopicRoutingMetadata(ctx context.Context, topic topicPartition) context.Context { - md, _ := metadata.FromOutgoingContext(ctx) - md = md.Copy() - val := fmt.Sprintf("partition=%d&topic=%s", topic.Partition, url.QueryEscape(topic.Path)) - md[routingMetadataHeader] = append(md[routingMetadataHeader], val) - return metadata.NewOutgoingContext(ctx, md) +const ( + routingMetadataHeader = "x-goog-request-params" + clientInfoMetadataHeader = "x-goog-pubsub-context" + + languageKey = "language" + languageValue = "GOLANG" + frameworkKey = "framework" + majorVersionKey = "major_version" + minorVersionKey = "minor_version" + + pubsubLiteModulePath = "cloud.google.com/go/pubsublite" +) + +func parseModuleVersion(version string) (major string, minor string, ok bool) { + if strings.HasPrefix(version, "v") { + version = version[1:] + } + components := strings.Split(version, ".") + if len(components) >= 2 { + if _, err := strconv.ParseInt(components[0], 10, 32); err != nil { + return + } + if _, err := strconv.ParseInt(components[1], 10, 32); err != nil { + return + } + major = components[0] + minor = components[1] + ok = true + } + return +} + +// getModuleVersion extracts the module version from BuildInfo embedded in the +// binary. Only applies to binaries built with module support. +func getModuleVersion(buildInfo *debug.BuildInfo) (string, string, bool) { + for _, dep := range buildInfo.Deps { + if dep.Path == pubsubLiteModulePath { + return parseModuleVersion(dep.Version) + } + } + return "", "", false +} + +func stringValue(str string) *structpb.Value { + return &structpb.Value{ + Kind: &structpb.Value_StringValue{StringValue: str}, + } } -func addSubscriptionRoutingMetadata(ctx context.Context, subscription subscriptionPartition) context.Context { +// pubsubMetadata stores key/value pairs that should be added to gRPC metadata. +type pubsubMetadata map[string]string + +func newPubsubMetadata() pubsubMetadata { + return make(map[string]string) +} + +func (pm pubsubMetadata) AddTopicRoutingMetadata(topic topicPartition) { + pm[routingMetadataHeader] = fmt.Sprintf("partition=%d&topic=%s", topic.Partition, url.QueryEscape(topic.Path)) +} + +func (pm pubsubMetadata) AddSubscriptionRoutingMetadata(subscription subscriptionPartition) { + pm[routingMetadataHeader] = fmt.Sprintf("partition=%d&subscription=%s", subscription.Partition, url.QueryEscape(subscription.Path)) +} + +func (pm pubsubMetadata) AddClientInfo(framework FrameworkType) { + buildInfo, _ := debug.ReadBuildInfo() + pm.doAddClientInfo(framework, buildInfo) +} + +func (pm pubsubMetadata) doAddClientInfo(framework FrameworkType, buildInfo *debug.BuildInfo) { + s := &structpb.Struct{ + Fields: make(map[string]*structpb.Value), + } + s.Fields[languageKey] = stringValue(languageValue) + if len(framework) > 0 { + s.Fields[frameworkKey] = stringValue(string(framework)) + } + if buildInfo != nil { + if major, minor, ok := getModuleVersion(buildInfo); ok { + s.Fields[majorVersionKey] = stringValue(major) + s.Fields[minorVersionKey] = stringValue(minor) + } + } + if bytes, err := proto.Marshal(s); err == nil { + pm[clientInfoMetadataHeader] = base64.StdEncoding.EncodeToString(bytes) + } +} + +func (pm pubsubMetadata) AddToContext(ctx context.Context) context.Context { md, _ := metadata.FromOutgoingContext(ctx) md = md.Copy() - val := fmt.Sprintf("partition=%d&subscription=%s", subscription.Partition, url.QueryEscape(subscription.Path)) - md[routingMetadataHeader] = append(md[routingMetadataHeader], val) + for key, val := range pm { + md[key] = append(md[key], val) + } return metadata.NewOutgoingContext(ctx, md) } diff --git a/pubsublite/internal/wire/rpc_test.go b/pubsublite/internal/wire/rpc_test.go new file mode 100644 index 00000000000..bfbb1c526d0 --- /dev/null +++ b/pubsublite/internal/wire/rpc_test.go @@ -0,0 +1,156 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package wire + +import ( + "encoding/base64" + "runtime/debug" + "testing" + + "cloud.google.com/go/internal/testutil" + "github.com/golang/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" +) + +func TestPubsubMetadataAddClientInfo(t *testing.T) { + for _, tc := range []struct { + desc string + framework FrameworkType + buildInfo *debug.BuildInfo + wantClientInfo *structpb.Struct + }{ + { + desc: "minimal", + wantClientInfo: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "language": stringValue("GOLANG"), + }, + }, + }, + { + desc: "cps shim", + framework: FrameworkCloudPubSubShim, + wantClientInfo: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "language": stringValue("GOLANG"), + "framework": stringValue("CLOUD_PUBSUB_SHIM"), + }, + }, + }, + { + desc: "version valid", + framework: FrameworkCloudPubSubShim, + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsublite", Version: "v1.2.2"}, + {Path: "cloud.google.com/go/pubsub", Version: "v1.8.3"}, + }, + }, + wantClientInfo: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "language": stringValue("GOLANG"), + "framework": stringValue("CLOUD_PUBSUB_SHIM"), + "major_version": stringValue("1"), + "minor_version": stringValue("2"), + }, + }, + }, + { + desc: "version corner case", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsublite", Version: "2.3"}, + }, + }, + wantClientInfo: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "language": stringValue("GOLANG"), + "major_version": stringValue("2"), + "minor_version": stringValue("3"), + }, + }, + }, + { + desc: "version missing", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsub", Version: "v1.8.3"}, + }, + }, + wantClientInfo: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "language": stringValue("GOLANG"), + }, + }, + }, + { + desc: "minor version invalid", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsublite", Version: "v1.a.2"}, + }, + }, + wantClientInfo: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "language": stringValue("GOLANG"), + }, + }, + }, + { + desc: "major version invalid", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsublite", Version: "vb.1.2"}, + }, + }, + wantClientInfo: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "language": stringValue("GOLANG"), + }, + }, + }, + { + desc: "minor version missing", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsublite", Version: "v4"}, + }, + }, + wantClientInfo: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "language": stringValue("GOLANG"), + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + metadata := newPubsubMetadata() + metadata.doAddClientInfo(tc.framework, tc.buildInfo) + + b, err := base64.StdEncoding.DecodeString(metadata["x-goog-pubsub-context"]) + if err != nil { + t.Errorf("Failed to decode base64 pubsub context header: %v", err) + return + } + gotClientInfo := new(structpb.Struct) + if err := proto.Unmarshal(b, gotClientInfo); err != nil { + t.Errorf("Failed to unmarshal pubsub context structpb: %v", err) + return + } + if diff := testutil.Diff(gotClientInfo, tc.wantClientInfo); diff != "" { + t.Errorf("Pubsub context structpb: got: -, want: +\n%s", diff) + } + }) + } +} diff --git a/pubsublite/internal/wire/settings.go b/pubsublite/internal/wire/settings.go index e2cc5699746..aac915be839 100644 --- a/pubsublite/internal/wire/settings.go +++ b/pubsublite/internal/wire/settings.go @@ -34,6 +34,12 @@ const ( MaxPublishRequestBytes = 3500000 ) +// FrameworkType is the user-facing API for Cloud Pub/Sub Lite. +type FrameworkType string + +// FrameworkCloudPubSubShim is the API that emulates Cloud Pub/Sub. +const FrameworkCloudPubSubShim FrameworkType = "CLOUD_PUBSUB_SHIM" + // PublishSettings control the batching of published messages. These settings // apply per partition. type PublishSettings struct { @@ -70,6 +76,9 @@ type PublishSettings struct { // The polling interval to watch for topic partition count updates. Set to 0 // to disable polling if the number of partitions will never update. ConfigPollPeriod time.Duration + + // The user-facing API type. + Framework FrameworkType } // DefaultPublishSettings holds the default values for PublishSettings. @@ -132,6 +141,9 @@ type ReceiveSettings struct { // specified, the client will use the partition assignment service to // determine which partitions it should connect to. Partitions []int + + // The user-facing API type. + Framework FrameworkType } // DefaultReceiveSettings holds the default values for ReceiveSettings. diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go index 8b9c9894c28..e2f6ed15070 100644 --- a/pubsublite/internal/wire/subscriber.go +++ b/pubsublite/internal/wire/subscriber.go @@ -130,6 +130,7 @@ type subscribeStream struct { subscription subscriptionPartition initialReq *pb.SubscribeRequest messageQueue *messageDeliveryQueue + metadata pubsubMetadata // Fields below must be guarded with mu. stream *retryableStream @@ -157,8 +158,11 @@ func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, s }, }, messageQueue: newMessageDeliveryQueue(acks, receiver, settings.MaxOutstandingMessages), + metadata: newPubsubMetadata(), } s.stream = newRetryableStream(ctx, s, settings.Timeout, reflect.TypeOf(pb.SubscribeResponse{})) + s.metadata.AddSubscriptionRoutingMetadata(s.subscription) + s.metadata.AddClientInfo(settings.Framework) backgroundTask := s.sendBatchFlowControl if disableTasks { @@ -194,7 +198,7 @@ func (s *subscribeStream) Stop() { } func (s *subscribeStream) newStream(ctx context.Context) (grpc.ClientStream, error) { - return s.subClient.Subscribe(addSubscriptionRoutingMetadata(ctx, s.subscription)) + return s.subClient.Subscribe(s.metadata.AddToContext(ctx)) } func (s *subscribeStream) initialRequest() (interface{}, bool) { From e318dbee53cf5f296a4fe82a99317bfd1c058fd3 Mon Sep 17 00:00:00 2001 From: tmdiep Date: Mon, 14 Dec 2020 18:21:04 -0500 Subject: [PATCH 2/2] Handle debug.BuildInfo not being available before 1.12 --- pubsublite/internal/wire/rpc.go | 48 ++--------- pubsublite/internal/wire/rpc_test.go | 85 +++---------------- pubsublite/internal/wire/version.go | 66 +++++++++++++++ pubsublite/internal/wire/version_not112.go | 26 ++++++ pubsublite/internal/wire/version_test.go | 96 ++++++++++++++++++++++ 5 files changed, 203 insertions(+), 118 deletions(-) create mode 100644 pubsublite/internal/wire/version.go create mode 100644 pubsublite/internal/wire/version_not112.go create mode 100644 pubsublite/internal/wire/version_test.go diff --git a/pubsublite/internal/wire/rpc.go b/pubsublite/internal/wire/rpc.go index 9c2370d9920..c0bd1a96ab6 100644 --- a/pubsublite/internal/wire/rpc.go +++ b/pubsublite/internal/wire/rpc.go @@ -18,9 +18,6 @@ import ( "encoding/base64" "fmt" "net/url" - "runtime/debug" - "strconv" - "strings" "time" "google.golang.org/api/option" @@ -176,40 +173,8 @@ const ( frameworkKey = "framework" majorVersionKey = "major_version" minorVersionKey = "minor_version" - - pubsubLiteModulePath = "cloud.google.com/go/pubsublite" ) -func parseModuleVersion(version string) (major string, minor string, ok bool) { - if strings.HasPrefix(version, "v") { - version = version[1:] - } - components := strings.Split(version, ".") - if len(components) >= 2 { - if _, err := strconv.ParseInt(components[0], 10, 32); err != nil { - return - } - if _, err := strconv.ParseInt(components[1], 10, 32); err != nil { - return - } - major = components[0] - minor = components[1] - ok = true - } - return -} - -// getModuleVersion extracts the module version from BuildInfo embedded in the -// binary. Only applies to binaries built with module support. -func getModuleVersion(buildInfo *debug.BuildInfo) (string, string, bool) { - for _, dep := range buildInfo.Deps { - if dep.Path == pubsubLiteModulePath { - return parseModuleVersion(dep.Version) - } - } - return "", "", false -} - func stringValue(str string) *structpb.Value { return &structpb.Value{ Kind: &structpb.Value_StringValue{StringValue: str}, @@ -232,11 +197,10 @@ func (pm pubsubMetadata) AddSubscriptionRoutingMetadata(subscription subscriptio } func (pm pubsubMetadata) AddClientInfo(framework FrameworkType) { - buildInfo, _ := debug.ReadBuildInfo() - pm.doAddClientInfo(framework, buildInfo) + pm.doAddClientInfo(framework, libraryVersion) } -func (pm pubsubMetadata) doAddClientInfo(framework FrameworkType, buildInfo *debug.BuildInfo) { +func (pm pubsubMetadata) doAddClientInfo(framework FrameworkType, getVersion func() (version, bool)) { s := &structpb.Struct{ Fields: make(map[string]*structpb.Value), } @@ -244,11 +208,9 @@ func (pm pubsubMetadata) doAddClientInfo(framework FrameworkType, buildInfo *deb if len(framework) > 0 { s.Fields[frameworkKey] = stringValue(string(framework)) } - if buildInfo != nil { - if major, minor, ok := getModuleVersion(buildInfo); ok { - s.Fields[majorVersionKey] = stringValue(major) - s.Fields[minorVersionKey] = stringValue(minor) - } + if version, ok := getVersion(); ok { + s.Fields[majorVersionKey] = stringValue(version.Major) + s.Fields[minorVersionKey] = stringValue(version.Minor) } if bytes, err := proto.Marshal(s); err == nil { pm[clientInfoMetadataHeader] = base64.StdEncoding.EncodeToString(bytes) diff --git a/pubsublite/internal/wire/rpc_test.go b/pubsublite/internal/wire/rpc_test.go index bfbb1c526d0..8fe0f5b88a7 100644 --- a/pubsublite/internal/wire/rpc_test.go +++ b/pubsublite/internal/wire/rpc_test.go @@ -15,7 +15,6 @@ package wire import ( "encoding/base64" - "runtime/debug" "testing" "cloud.google.com/go/internal/testutil" @@ -27,11 +26,14 @@ func TestPubsubMetadataAddClientInfo(t *testing.T) { for _, tc := range []struct { desc string framework FrameworkType - buildInfo *debug.BuildInfo + libraryVersion func() (version, bool) wantClientInfo *structpb.Struct }{ { desc: "minimal", + libraryVersion: func() (version, bool) { + return version{}, false + }, wantClientInfo: &structpb.Struct{ Fields: map[string]*structpb.Value{ "language": stringValue("GOLANG"), @@ -41,6 +43,9 @@ func TestPubsubMetadataAddClientInfo(t *testing.T) { { desc: "cps shim", framework: FrameworkCloudPubSubShim, + libraryVersion: func() (version, bool) { + return version{}, false + }, wantClientInfo: &structpb.Struct{ Fields: map[string]*structpb.Value{ "language": stringValue("GOLANG"), @@ -51,11 +56,8 @@ func TestPubsubMetadataAddClientInfo(t *testing.T) { { desc: "version valid", framework: FrameworkCloudPubSubShim, - buildInfo: &debug.BuildInfo{ - Deps: []*debug.Module{ - {Path: "cloud.google.com/go/pubsublite", Version: "v1.2.2"}, - {Path: "cloud.google.com/go/pubsub", Version: "v1.8.3"}, - }, + libraryVersion: func() (version, bool) { + return version{Major: "1", Minor: "2"}, true }, wantClientInfo: &structpb.Struct{ Fields: map[string]*structpb.Value{ @@ -66,77 +68,10 @@ func TestPubsubMetadataAddClientInfo(t *testing.T) { }, }, }, - { - desc: "version corner case", - buildInfo: &debug.BuildInfo{ - Deps: []*debug.Module{ - {Path: "cloud.google.com/go/pubsublite", Version: "2.3"}, - }, - }, - wantClientInfo: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - "language": stringValue("GOLANG"), - "major_version": stringValue("2"), - "minor_version": stringValue("3"), - }, - }, - }, - { - desc: "version missing", - buildInfo: &debug.BuildInfo{ - Deps: []*debug.Module{ - {Path: "cloud.google.com/go/pubsub", Version: "v1.8.3"}, - }, - }, - wantClientInfo: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - "language": stringValue("GOLANG"), - }, - }, - }, - { - desc: "minor version invalid", - buildInfo: &debug.BuildInfo{ - Deps: []*debug.Module{ - {Path: "cloud.google.com/go/pubsublite", Version: "v1.a.2"}, - }, - }, - wantClientInfo: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - "language": stringValue("GOLANG"), - }, - }, - }, - { - desc: "major version invalid", - buildInfo: &debug.BuildInfo{ - Deps: []*debug.Module{ - {Path: "cloud.google.com/go/pubsublite", Version: "vb.1.2"}, - }, - }, - wantClientInfo: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - "language": stringValue("GOLANG"), - }, - }, - }, - { - desc: "minor version missing", - buildInfo: &debug.BuildInfo{ - Deps: []*debug.Module{ - {Path: "cloud.google.com/go/pubsublite", Version: "v4"}, - }, - }, - wantClientInfo: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - "language": stringValue("GOLANG"), - }, - }, - }, } { t.Run(tc.desc, func(t *testing.T) { metadata := newPubsubMetadata() - metadata.doAddClientInfo(tc.framework, tc.buildInfo) + metadata.doAddClientInfo(tc.framework, tc.libraryVersion) b, err := base64.StdEncoding.DecodeString(metadata["x-goog-pubsub-context"]) if err != nil { diff --git a/pubsublite/internal/wire/version.go b/pubsublite/internal/wire/version.go new file mode 100644 index 00000000000..c3f92783518 --- /dev/null +++ b/pubsublite/internal/wire/version.go @@ -0,0 +1,66 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +// +build go1.12 + +package wire + +import ( + "runtime/debug" + "strconv" + "strings" +) + +const pubsubLiteModulePath = "cloud.google.com/go/pubsublite" + +type version struct { + Major string + Minor string +} + +// libraryVersion attempts to determine the pubsublite module version. +func libraryVersion() (version, bool) { + if buildInfo, ok := debug.ReadBuildInfo(); ok { + return pubsubliteModuleVersion(buildInfo) + } + return version{}, false +} + +// pubsubliteModuleVersion extracts the module version from BuildInfo embedded +// in the binary. Only applies to binaries built with module support. +func pubsubliteModuleVersion(buildInfo *debug.BuildInfo) (version, bool) { + for _, dep := range buildInfo.Deps { + if dep.Path == pubsubLiteModulePath { + return parseModuleVersion(dep.Version) + } + } + return version{}, false +} + +func parseModuleVersion(value string) (v version, ok bool) { + if strings.HasPrefix(value, "v") { + value = value[1:] + } + components := strings.Split(value, ".") + if len(components) >= 2 { + if _, err := strconv.ParseInt(components[0], 10, 32); err != nil { + return + } + if _, err := strconv.ParseInt(components[1], 10, 32); err != nil { + return + } + v = version{Major: components[0], Minor: components[1]} + ok = true + } + return +} diff --git a/pubsublite/internal/wire/version_not112.go b/pubsublite/internal/wire/version_not112.go new file mode 100644 index 00000000000..28ccfbfce4d --- /dev/null +++ b/pubsublite/internal/wire/version_not112.go @@ -0,0 +1,26 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +// TODO: Remove file when the minimum supported version is Go1.12. +// +build !go1.12 + +package wire + +type version struct { + Major string + Minor string +} + +func libraryVersion() (version, bool) { + return version{}, false +} diff --git a/pubsublite/internal/wire/version_test.go b/pubsublite/internal/wire/version_test.go new file mode 100644 index 00000000000..a84c84c696f --- /dev/null +++ b/pubsublite/internal/wire/version_test.go @@ -0,0 +1,96 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +// +build go1.12 + +package wire + +import ( + "runtime/debug" + "testing" + + "cloud.google.com/go/internal/testutil" +) + +func TestPubsubliteModuleVersion(t *testing.T) { + for _, tc := range []struct { + desc string + buildInfo *debug.BuildInfo + wantVersion version + wantOk bool + }{ + { + desc: "version valid", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsublite", Version: "v1.2.2"}, + {Path: "cloud.google.com/go/pubsub", Version: "v1.8.3"}, + }, + }, + wantVersion: version{Major: "1", Minor: "2"}, + wantOk: true, + }, + { + desc: "version corner case", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsublite", Version: "2.3"}, + }, + }, + wantVersion: version{Major: "2", Minor: "3"}, + wantOk: true, + }, + { + desc: "version missing", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsub", Version: "v1.8.3"}, + }, + }, + wantOk: false, + }, + { + desc: "minor version invalid", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsublite", Version: "v1.a.2"}, + }, + }, + wantOk: false, + }, + { + desc: "major version invalid", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsublite", Version: "vb.1.2"}, + }, + }, + wantOk: false, + }, + { + desc: "minor version missing", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsublite", Version: "v4"}, + }, + }, + wantOk: false, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if gotVersion, gotOk := pubsubliteModuleVersion(tc.buildInfo); !testutil.Equal(gotVersion, tc.wantVersion) || gotOk != tc.wantOk { + t.Errorf("pubsubliteModuleVersion(): got (%v, %v), want (%v, %v)", gotVersion, gotOk, tc.wantVersion, tc.wantOk) + } + }) + } +}