Skip to content

Commit

Permalink
feat(pubsublite): add client library metadata to headers (#3458)
Browse files Browse the repository at this point in the history
Adds client library version and other properties to gRPC stream metadata for debugging purposes
  • Loading branch information
tmdiep committed Jan 7, 2021
1 parent fed8979 commit 8226811
Show file tree
Hide file tree
Showing 9 changed files with 356 additions and 17 deletions.
5 changes: 4 additions & 1 deletion pubsublite/internal/wire/assigner.go
Expand Up @@ -65,6 +65,7 @@ type assigner struct {
assignmentClient *vkit.PartitionAssignmentClient
initialReq *pb.PartitionAssignmentRequest
receiveAssignment partitionAssignmentReceiver
metadata pubsubMetadata

// Fields below must be guarded with mu.
stream *retryableStream
Expand All @@ -89,8 +90,10 @@ func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignment
},
},
receiveAssignment: receiver,
metadata: newPubsubMetadata(),
}
a.stream = newRetryableStream(ctx, a, settings.Timeout, reflect.TypeOf(pb.PartitionAssignment{}))
a.metadata.AddClientInfo(settings.Framework)
return a, nil
}

Expand All @@ -109,7 +112,7 @@ func (a *assigner) Stop() {
}

func (a *assigner) newStream(ctx context.Context) (grpc.ClientStream, error) {
return a.assignmentClient.AssignPartitions(ctx)
return a.assignmentClient.AssignPartitions(a.metadata.AddToContext(ctx))
}

func (a *assigner) initialRequest() (interface{}, initialResponseRequired) {
Expand Down
5 changes: 4 additions & 1 deletion pubsublite/internal/wire/committer.go
Expand Up @@ -43,6 +43,7 @@ type committer struct {
// Immutable after creation.
cursorClient *vkit.CursorClient
initialReq *pb.StreamingCommitCursorRequest
metadata pubsubMetadata

// Fields below must be guarded with mutex.
stream *retryableStream
Expand All @@ -66,10 +67,12 @@ func newCommitter(ctx context.Context, cursor *vkit.CursorClient, settings Recei
},
},
},
metadata: newPubsubMetadata(),
acks: acks,
cursorTracker: newCommitCursorTracker(acks),
}
c.stream = newRetryableStream(ctx, c, settings.Timeout, reflect.TypeOf(pb.StreamingCommitCursorResponse{}))
c.metadata.AddClientInfo(settings.Framework)

backgroundTask := c.commitOffsetToStream
if disableTasks {
Expand Down Expand Up @@ -109,7 +112,7 @@ func (c *committer) Terminate() {
}

func (c *committer) newStream(ctx context.Context) (grpc.ClientStream, error) {
return c.cursorClient.StreamingCommitCursor(ctx)
return c.cursorClient.StreamingCommitCursor(c.metadata.AddToContext(ctx))
}

func (c *committer) initialRequest() (interface{}, initialResponseRequired) {
Expand Down
6 changes: 5 additions & 1 deletion pubsublite/internal/wire/publisher.go
Expand Up @@ -54,6 +54,7 @@ type singlePartitionPublisher struct {
pubClient *vkit.PublisherClient
topic topicPartition
initialReq *pb.PublishRequest
metadata pubsubMetadata

// Fields below must be guarded with mu.
stream *retryableStream
Expand Down Expand Up @@ -84,9 +85,12 @@ func (f *singlePartitionPublisherFactory) New(partition int) *singlePartitionPub
},
},
},
metadata: newPubsubMetadata(),
}
pp.batcher = newPublishMessageBatcher(&f.settings, partition, pp.onNewBatch)
pp.stream = newRetryableStream(f.ctx, pp, f.settings.Timeout, reflect.TypeOf(pb.PublishResponse{}))
pp.metadata.AddTopicRoutingMetadata(pp.topic)
pp.metadata.AddClientInfo(f.settings.Framework)
return pp
}

Expand Down Expand Up @@ -139,7 +143,7 @@ func (pp *singlePartitionPublisher) Publish(msg *pb.PubSubMessage, onResult Publ
}

func (pp *singlePartitionPublisher) newStream(ctx context.Context) (grpc.ClientStream, error) {
return pp.pubClient.Publish(addTopicRoutingMetadata(ctx, pp.topic))
return pp.pubClient.Publish(pp.metadata.AddToContext(ctx))
}

func (pp *singlePartitionPublisher) initialRequest() (interface{}, initialResponseRequired) {
Expand Down
72 changes: 59 additions & 13 deletions pubsublite/internal/wire/rpc.go
Expand Up @@ -15,6 +15,7 @@ package wire

import (
"context"
"encoding/base64"
"fmt"
"net/url"
"time"
Expand All @@ -24,6 +25,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

vkit "cloud.google.com/go/pubsublite/apiv1"
gax "github.com/googleapis/gax-go/v2"
Expand Down Expand Up @@ -127,10 +130,7 @@ func retryableReadOnlyCallOption() gax.CallOption {
})
}

const (
pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443"
routingMetadataHeader = "x-goog-request-params"
)
const pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443"

func defaultClientOptions(region string) []option.ClientOption {
return []option.ClientOption{
Expand Down Expand Up @@ -164,18 +164,64 @@ func newPartitionAssignmentClient(ctx context.Context, region string, opts ...op
return vkit.NewPartitionAssignmentClient(ctx, options...)
}

func addTopicRoutingMetadata(ctx context.Context, topic topicPartition) context.Context {
md, _ := metadata.FromOutgoingContext(ctx)
md = md.Copy()
val := fmt.Sprintf("partition=%d&topic=%s", topic.Partition, url.QueryEscape(topic.Path))
md[routingMetadataHeader] = append(md[routingMetadataHeader], val)
return metadata.NewOutgoingContext(ctx, md)
const (
routingMetadataHeader = "x-goog-request-params"
clientInfoMetadataHeader = "x-goog-pubsub-context"

languageKey = "language"
languageValue = "GOLANG"
frameworkKey = "framework"
majorVersionKey = "major_version"
minorVersionKey = "minor_version"
)

func stringValue(str string) *structpb.Value {
return &structpb.Value{
Kind: &structpb.Value_StringValue{StringValue: str},
}
}

// pubsubMetadata stores key/value pairs that should be added to gRPC metadata.
type pubsubMetadata map[string]string

func newPubsubMetadata() pubsubMetadata {
return make(map[string]string)
}

func addSubscriptionRoutingMetadata(ctx context.Context, subscription subscriptionPartition) context.Context {
func (pm pubsubMetadata) AddTopicRoutingMetadata(topic topicPartition) {
pm[routingMetadataHeader] = fmt.Sprintf("partition=%d&topic=%s", topic.Partition, url.QueryEscape(topic.Path))
}

func (pm pubsubMetadata) AddSubscriptionRoutingMetadata(subscription subscriptionPartition) {
pm[routingMetadataHeader] = fmt.Sprintf("partition=%d&subscription=%s", subscription.Partition, url.QueryEscape(subscription.Path))
}

func (pm pubsubMetadata) AddClientInfo(framework FrameworkType) {
pm.doAddClientInfo(framework, libraryVersion)
}

func (pm pubsubMetadata) doAddClientInfo(framework FrameworkType, getVersion func() (version, bool)) {
s := &structpb.Struct{
Fields: make(map[string]*structpb.Value),
}
s.Fields[languageKey] = stringValue(languageValue)
if len(framework) > 0 {
s.Fields[frameworkKey] = stringValue(string(framework))
}
if version, ok := getVersion(); ok {
s.Fields[majorVersionKey] = stringValue(version.Major)
s.Fields[minorVersionKey] = stringValue(version.Minor)
}
if bytes, err := proto.Marshal(s); err == nil {
pm[clientInfoMetadataHeader] = base64.StdEncoding.EncodeToString(bytes)
}
}

func (pm pubsubMetadata) AddToContext(ctx context.Context) context.Context {
md, _ := metadata.FromOutgoingContext(ctx)
md = md.Copy()
val := fmt.Sprintf("partition=%d&subscription=%s", subscription.Partition, url.QueryEscape(subscription.Path))
md[routingMetadataHeader] = append(md[routingMetadataHeader], val)
for key, val := range pm {
md[key] = append(md[key], val)
}
return metadata.NewOutgoingContext(ctx, md)
}
91 changes: 91 additions & 0 deletions pubsublite/internal/wire/rpc_test.go
@@ -0,0 +1,91 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"encoding/base64"
"testing"

"cloud.google.com/go/internal/testutil"
"github.com/golang/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
)

func TestPubsubMetadataAddClientInfo(t *testing.T) {
for _, tc := range []struct {
desc string
framework FrameworkType
libraryVersion func() (version, bool)
wantClientInfo *structpb.Struct
}{
{
desc: "minimal",
libraryVersion: func() (version, bool) {
return version{}, false
},
wantClientInfo: &structpb.Struct{
Fields: map[string]*structpb.Value{
"language": stringValue("GOLANG"),
},
},
},
{
desc: "cps shim",
framework: FrameworkCloudPubSubShim,
libraryVersion: func() (version, bool) {
return version{}, false
},
wantClientInfo: &structpb.Struct{
Fields: map[string]*structpb.Value{
"language": stringValue("GOLANG"),
"framework": stringValue("CLOUD_PUBSUB_SHIM"),
},
},
},
{
desc: "version valid",
framework: FrameworkCloudPubSubShim,
libraryVersion: func() (version, bool) {
return version{Major: "1", Minor: "2"}, true
},
wantClientInfo: &structpb.Struct{
Fields: map[string]*structpb.Value{
"language": stringValue("GOLANG"),
"framework": stringValue("CLOUD_PUBSUB_SHIM"),
"major_version": stringValue("1"),
"minor_version": stringValue("2"),
},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
metadata := newPubsubMetadata()
metadata.doAddClientInfo(tc.framework, tc.libraryVersion)

b, err := base64.StdEncoding.DecodeString(metadata["x-goog-pubsub-context"])
if err != nil {
t.Errorf("Failed to decode base64 pubsub context header: %v", err)
return
}
gotClientInfo := new(structpb.Struct)
if err := proto.Unmarshal(b, gotClientInfo); err != nil {
t.Errorf("Failed to unmarshal pubsub context structpb: %v", err)
return
}
if diff := testutil.Diff(gotClientInfo, tc.wantClientInfo); diff != "" {
t.Errorf("Pubsub context structpb: got: -, want: +\n%s", diff)
}
})
}
}
6 changes: 5 additions & 1 deletion pubsublite/internal/wire/subscriber.go
Expand Up @@ -130,6 +130,7 @@ type subscribeStream struct {
subscription subscriptionPartition
initialReq *pb.SubscribeRequest
messageQueue *messageDeliveryQueue
metadata pubsubMetadata

// Fields below must be guarded with mu.
stream *retryableStream
Expand Down Expand Up @@ -157,8 +158,11 @@ func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, s
},
},
messageQueue: newMessageDeliveryQueue(acks, receiver, settings.MaxOutstandingMessages),
metadata: newPubsubMetadata(),
}
s.stream = newRetryableStream(ctx, s, settings.Timeout, reflect.TypeOf(pb.SubscribeResponse{}))
s.metadata.AddSubscriptionRoutingMetadata(s.subscription)
s.metadata.AddClientInfo(settings.Framework)

backgroundTask := s.sendBatchFlowControl
if disableTasks {
Expand Down Expand Up @@ -194,7 +198,7 @@ func (s *subscribeStream) Stop() {
}

func (s *subscribeStream) newStream(ctx context.Context) (grpc.ClientStream, error) {
return s.subClient.Subscribe(addSubscriptionRoutingMetadata(ctx, s.subscription))
return s.subClient.Subscribe(s.metadata.AddToContext(ctx))
}

func (s *subscribeStream) initialRequest() (interface{}, initialResponseRequired) {
Expand Down
66 changes: 66 additions & 0 deletions pubsublite/internal/wire/version.go
@@ -0,0 +1,66 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

// +build go1.12

package wire

import (
"runtime/debug"
"strconv"
"strings"
)

const pubsubLiteModulePath = "cloud.google.com/go/pubsublite"

type version struct {
Major string
Minor string
}

// libraryVersion attempts to determine the pubsublite module version.
func libraryVersion() (version, bool) {
if buildInfo, ok := debug.ReadBuildInfo(); ok {
return pubsubliteModuleVersion(buildInfo)
}
return version{}, false
}

// pubsubliteModuleVersion extracts the module version from BuildInfo embedded
// in the binary. Only applies to binaries built with module support.
func pubsubliteModuleVersion(buildInfo *debug.BuildInfo) (version, bool) {
for _, dep := range buildInfo.Deps {
if dep.Path == pubsubLiteModulePath {
return parseModuleVersion(dep.Version)
}
}
return version{}, false
}

func parseModuleVersion(value string) (v version, ok bool) {
if strings.HasPrefix(value, "v") {
value = value[1:]
}
components := strings.Split(value, ".")
if len(components) >= 2 {
if _, err := strconv.ParseInt(components[0], 10, 32); err != nil {
return
}
if _, err := strconv.ParseInt(components[1], 10, 32); err != nil {
return
}
v = version{Major: components[0], Minor: components[1]}
ok = true
}
return
}

0 comments on commit 8226811

Please sign in to comment.