diff --git a/pubsublite/internal/wire/assigner.go b/pubsublite/internal/wire/assigner.go index e6e722bb1d7..14e71f6085e 100644 --- a/pubsublite/internal/wire/assigner.go +++ b/pubsublite/internal/wire/assigner.go @@ -65,6 +65,7 @@ type assigner struct { assignmentClient *vkit.PartitionAssignmentClient initialReq *pb.PartitionAssignmentRequest receiveAssignment partitionAssignmentReceiver + metadata pubsubMetadata // Fields below must be guarded with mu. stream *retryableStream @@ -89,8 +90,10 @@ func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignment }, }, receiveAssignment: receiver, + metadata: newPubsubMetadata(), } a.stream = newRetryableStream(ctx, a, settings.Timeout, reflect.TypeOf(pb.PartitionAssignment{})) + a.metadata.AddClientInfo(settings.Framework) return a, nil } @@ -109,7 +112,7 @@ func (a *assigner) Stop() { } func (a *assigner) newStream(ctx context.Context) (grpc.ClientStream, error) { - return a.assignmentClient.AssignPartitions(ctx) + return a.assignmentClient.AssignPartitions(a.metadata.AddToContext(ctx)) } func (a *assigner) initialRequest() (interface{}, initialResponseRequired) { diff --git a/pubsublite/internal/wire/committer.go b/pubsublite/internal/wire/committer.go index 0e32b95a346..c830f8730b9 100644 --- a/pubsublite/internal/wire/committer.go +++ b/pubsublite/internal/wire/committer.go @@ -43,6 +43,7 @@ type committer struct { // Immutable after creation. cursorClient *vkit.CursorClient initialReq *pb.StreamingCommitCursorRequest + metadata pubsubMetadata // Fields below must be guarded with mutex. stream *retryableStream @@ -66,10 +67,12 @@ func newCommitter(ctx context.Context, cursor *vkit.CursorClient, settings Recei }, }, }, + metadata: newPubsubMetadata(), acks: acks, cursorTracker: newCommitCursorTracker(acks), } c.stream = newRetryableStream(ctx, c, settings.Timeout, reflect.TypeOf(pb.StreamingCommitCursorResponse{})) + c.metadata.AddClientInfo(settings.Framework) backgroundTask := c.commitOffsetToStream if disableTasks { @@ -109,7 +112,7 @@ func (c *committer) Terminate() { } func (c *committer) newStream(ctx context.Context) (grpc.ClientStream, error) { - return c.cursorClient.StreamingCommitCursor(ctx) + return c.cursorClient.StreamingCommitCursor(c.metadata.AddToContext(ctx)) } func (c *committer) initialRequest() (interface{}, initialResponseRequired) { diff --git a/pubsublite/internal/wire/publisher.go b/pubsublite/internal/wire/publisher.go index 0c876b70c1e..6fcf8f0cf26 100644 --- a/pubsublite/internal/wire/publisher.go +++ b/pubsublite/internal/wire/publisher.go @@ -54,6 +54,7 @@ type singlePartitionPublisher struct { pubClient *vkit.PublisherClient topic topicPartition initialReq *pb.PublishRequest + metadata pubsubMetadata // Fields below must be guarded with mu. stream *retryableStream @@ -84,9 +85,12 @@ func (f *singlePartitionPublisherFactory) New(partition int) *singlePartitionPub }, }, }, + metadata: newPubsubMetadata(), } pp.batcher = newPublishMessageBatcher(&f.settings, partition, pp.onNewBatch) pp.stream = newRetryableStream(f.ctx, pp, f.settings.Timeout, reflect.TypeOf(pb.PublishResponse{})) + pp.metadata.AddTopicRoutingMetadata(pp.topic) + pp.metadata.AddClientInfo(f.settings.Framework) return pp } @@ -139,7 +143,7 @@ func (pp *singlePartitionPublisher) Publish(msg *pb.PubSubMessage, onResult Publ } func (pp *singlePartitionPublisher) newStream(ctx context.Context) (grpc.ClientStream, error) { - return pp.pubClient.Publish(addTopicRoutingMetadata(ctx, pp.topic)) + return pp.pubClient.Publish(pp.metadata.AddToContext(ctx)) } func (pp *singlePartitionPublisher) initialRequest() (interface{}, initialResponseRequired) { diff --git a/pubsublite/internal/wire/rpc.go b/pubsublite/internal/wire/rpc.go index 17ad9d05bdc..c0bd1a96ab6 100644 --- a/pubsublite/internal/wire/rpc.go +++ b/pubsublite/internal/wire/rpc.go @@ -15,6 +15,7 @@ package wire import ( "context" + "encoding/base64" "fmt" "net/url" "time" @@ -24,6 +25,8 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" vkit "cloud.google.com/go/pubsublite/apiv1" gax "github.com/googleapis/gax-go/v2" @@ -127,10 +130,7 @@ func retryableReadOnlyCallOption() gax.CallOption { }) } -const ( - pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443" - routingMetadataHeader = "x-goog-request-params" -) +const pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443" func defaultClientOptions(region string) []option.ClientOption { return []option.ClientOption{ @@ -164,18 +164,64 @@ func newPartitionAssignmentClient(ctx context.Context, region string, opts ...op return vkit.NewPartitionAssignmentClient(ctx, options...) } -func addTopicRoutingMetadata(ctx context.Context, topic topicPartition) context.Context { - md, _ := metadata.FromOutgoingContext(ctx) - md = md.Copy() - val := fmt.Sprintf("partition=%d&topic=%s", topic.Partition, url.QueryEscape(topic.Path)) - md[routingMetadataHeader] = append(md[routingMetadataHeader], val) - return metadata.NewOutgoingContext(ctx, md) +const ( + routingMetadataHeader = "x-goog-request-params" + clientInfoMetadataHeader = "x-goog-pubsub-context" + + languageKey = "language" + languageValue = "GOLANG" + frameworkKey = "framework" + majorVersionKey = "major_version" + minorVersionKey = "minor_version" +) + +func stringValue(str string) *structpb.Value { + return &structpb.Value{ + Kind: &structpb.Value_StringValue{StringValue: str}, + } +} + +// pubsubMetadata stores key/value pairs that should be added to gRPC metadata. +type pubsubMetadata map[string]string + +func newPubsubMetadata() pubsubMetadata { + return make(map[string]string) } -func addSubscriptionRoutingMetadata(ctx context.Context, subscription subscriptionPartition) context.Context { +func (pm pubsubMetadata) AddTopicRoutingMetadata(topic topicPartition) { + pm[routingMetadataHeader] = fmt.Sprintf("partition=%d&topic=%s", topic.Partition, url.QueryEscape(topic.Path)) +} + +func (pm pubsubMetadata) AddSubscriptionRoutingMetadata(subscription subscriptionPartition) { + pm[routingMetadataHeader] = fmt.Sprintf("partition=%d&subscription=%s", subscription.Partition, url.QueryEscape(subscription.Path)) +} + +func (pm pubsubMetadata) AddClientInfo(framework FrameworkType) { + pm.doAddClientInfo(framework, libraryVersion) +} + +func (pm pubsubMetadata) doAddClientInfo(framework FrameworkType, getVersion func() (version, bool)) { + s := &structpb.Struct{ + Fields: make(map[string]*structpb.Value), + } + s.Fields[languageKey] = stringValue(languageValue) + if len(framework) > 0 { + s.Fields[frameworkKey] = stringValue(string(framework)) + } + if version, ok := getVersion(); ok { + s.Fields[majorVersionKey] = stringValue(version.Major) + s.Fields[minorVersionKey] = stringValue(version.Minor) + } + if bytes, err := proto.Marshal(s); err == nil { + pm[clientInfoMetadataHeader] = base64.StdEncoding.EncodeToString(bytes) + } +} + +func (pm pubsubMetadata) AddToContext(ctx context.Context) context.Context { md, _ := metadata.FromOutgoingContext(ctx) md = md.Copy() - val := fmt.Sprintf("partition=%d&subscription=%s", subscription.Partition, url.QueryEscape(subscription.Path)) - md[routingMetadataHeader] = append(md[routingMetadataHeader], val) + for key, val := range pm { + md[key] = append(md[key], val) + } return metadata.NewOutgoingContext(ctx, md) } diff --git a/pubsublite/internal/wire/rpc_test.go b/pubsublite/internal/wire/rpc_test.go new file mode 100644 index 00000000000..8fe0f5b88a7 --- /dev/null +++ b/pubsublite/internal/wire/rpc_test.go @@ -0,0 +1,91 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package wire + +import ( + "encoding/base64" + "testing" + + "cloud.google.com/go/internal/testutil" + "github.com/golang/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" +) + +func TestPubsubMetadataAddClientInfo(t *testing.T) { + for _, tc := range []struct { + desc string + framework FrameworkType + libraryVersion func() (version, bool) + wantClientInfo *structpb.Struct + }{ + { + desc: "minimal", + libraryVersion: func() (version, bool) { + return version{}, false + }, + wantClientInfo: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "language": stringValue("GOLANG"), + }, + }, + }, + { + desc: "cps shim", + framework: FrameworkCloudPubSubShim, + libraryVersion: func() (version, bool) { + return version{}, false + }, + wantClientInfo: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "language": stringValue("GOLANG"), + "framework": stringValue("CLOUD_PUBSUB_SHIM"), + }, + }, + }, + { + desc: "version valid", + framework: FrameworkCloudPubSubShim, + libraryVersion: func() (version, bool) { + return version{Major: "1", Minor: "2"}, true + }, + wantClientInfo: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "language": stringValue("GOLANG"), + "framework": stringValue("CLOUD_PUBSUB_SHIM"), + "major_version": stringValue("1"), + "minor_version": stringValue("2"), + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + metadata := newPubsubMetadata() + metadata.doAddClientInfo(tc.framework, tc.libraryVersion) + + b, err := base64.StdEncoding.DecodeString(metadata["x-goog-pubsub-context"]) + if err != nil { + t.Errorf("Failed to decode base64 pubsub context header: %v", err) + return + } + gotClientInfo := new(structpb.Struct) + if err := proto.Unmarshal(b, gotClientInfo); err != nil { + t.Errorf("Failed to unmarshal pubsub context structpb: %v", err) + return + } + if diff := testutil.Diff(gotClientInfo, tc.wantClientInfo); diff != "" { + t.Errorf("Pubsub context structpb: got: -, want: +\n%s", diff) + } + }) + } +} diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go index b7800d08d11..00c427629dd 100644 --- a/pubsublite/internal/wire/subscriber.go +++ b/pubsublite/internal/wire/subscriber.go @@ -130,6 +130,7 @@ type subscribeStream struct { subscription subscriptionPartition initialReq *pb.SubscribeRequest messageQueue *messageDeliveryQueue + metadata pubsubMetadata // Fields below must be guarded with mu. stream *retryableStream @@ -157,8 +158,11 @@ func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, s }, }, messageQueue: newMessageDeliveryQueue(acks, receiver, settings.MaxOutstandingMessages), + metadata: newPubsubMetadata(), } s.stream = newRetryableStream(ctx, s, settings.Timeout, reflect.TypeOf(pb.SubscribeResponse{})) + s.metadata.AddSubscriptionRoutingMetadata(s.subscription) + s.metadata.AddClientInfo(settings.Framework) backgroundTask := s.sendBatchFlowControl if disableTasks { @@ -194,7 +198,7 @@ func (s *subscribeStream) Stop() { } func (s *subscribeStream) newStream(ctx context.Context) (grpc.ClientStream, error) { - return s.subClient.Subscribe(addSubscriptionRoutingMetadata(ctx, s.subscription)) + return s.subClient.Subscribe(s.metadata.AddToContext(ctx)) } func (s *subscribeStream) initialRequest() (interface{}, initialResponseRequired) { diff --git a/pubsublite/internal/wire/version.go b/pubsublite/internal/wire/version.go new file mode 100644 index 00000000000..c3f92783518 --- /dev/null +++ b/pubsublite/internal/wire/version.go @@ -0,0 +1,66 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +// +build go1.12 + +package wire + +import ( + "runtime/debug" + "strconv" + "strings" +) + +const pubsubLiteModulePath = "cloud.google.com/go/pubsublite" + +type version struct { + Major string + Minor string +} + +// libraryVersion attempts to determine the pubsublite module version. +func libraryVersion() (version, bool) { + if buildInfo, ok := debug.ReadBuildInfo(); ok { + return pubsubliteModuleVersion(buildInfo) + } + return version{}, false +} + +// pubsubliteModuleVersion extracts the module version from BuildInfo embedded +// in the binary. Only applies to binaries built with module support. +func pubsubliteModuleVersion(buildInfo *debug.BuildInfo) (version, bool) { + for _, dep := range buildInfo.Deps { + if dep.Path == pubsubLiteModulePath { + return parseModuleVersion(dep.Version) + } + } + return version{}, false +} + +func parseModuleVersion(value string) (v version, ok bool) { + if strings.HasPrefix(value, "v") { + value = value[1:] + } + components := strings.Split(value, ".") + if len(components) >= 2 { + if _, err := strconv.ParseInt(components[0], 10, 32); err != nil { + return + } + if _, err := strconv.ParseInt(components[1], 10, 32); err != nil { + return + } + v = version{Major: components[0], Minor: components[1]} + ok = true + } + return +} diff --git a/pubsublite/internal/wire/version_not112.go b/pubsublite/internal/wire/version_not112.go new file mode 100644 index 00000000000..28ccfbfce4d --- /dev/null +++ b/pubsublite/internal/wire/version_not112.go @@ -0,0 +1,26 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +// TODO: Remove file when the minimum supported version is Go1.12. +// +build !go1.12 + +package wire + +type version struct { + Major string + Minor string +} + +func libraryVersion() (version, bool) { + return version{}, false +} diff --git a/pubsublite/internal/wire/version_test.go b/pubsublite/internal/wire/version_test.go new file mode 100644 index 00000000000..a84c84c696f --- /dev/null +++ b/pubsublite/internal/wire/version_test.go @@ -0,0 +1,96 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +// +build go1.12 + +package wire + +import ( + "runtime/debug" + "testing" + + "cloud.google.com/go/internal/testutil" +) + +func TestPubsubliteModuleVersion(t *testing.T) { + for _, tc := range []struct { + desc string + buildInfo *debug.BuildInfo + wantVersion version + wantOk bool + }{ + { + desc: "version valid", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsublite", Version: "v1.2.2"}, + {Path: "cloud.google.com/go/pubsub", Version: "v1.8.3"}, + }, + }, + wantVersion: version{Major: "1", Minor: "2"}, + wantOk: true, + }, + { + desc: "version corner case", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsublite", Version: "2.3"}, + }, + }, + wantVersion: version{Major: "2", Minor: "3"}, + wantOk: true, + }, + { + desc: "version missing", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsub", Version: "v1.8.3"}, + }, + }, + wantOk: false, + }, + { + desc: "minor version invalid", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsublite", Version: "v1.a.2"}, + }, + }, + wantOk: false, + }, + { + desc: "major version invalid", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsublite", Version: "vb.1.2"}, + }, + }, + wantOk: false, + }, + { + desc: "minor version missing", + buildInfo: &debug.BuildInfo{ + Deps: []*debug.Module{ + {Path: "cloud.google.com/go/pubsublite", Version: "v4"}, + }, + }, + wantOk: false, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + if gotVersion, gotOk := pubsubliteModuleVersion(tc.buildInfo); !testutil.Equal(gotVersion, tc.wantVersion) || gotOk != tc.wantOk { + t.Errorf("pubsubliteModuleVersion(): got (%v, %v), want (%v, %v)", gotVersion, gotOk, tc.wantVersion, tc.wantOk) + } + }) + } +}