Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): Added Pub/Sub Lite clients and routing headers #3105

Merged
merged 2 commits into from Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 3 additions & 6 deletions pubsublite/admin.go
Expand Up @@ -17,13 +17,12 @@ import (
"context"

"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"

vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

// AdminClient provides admin operations for Google Pub/Sub Lite resources
// AdminClient provides admin operations for Cloud Pub/Sub Lite resources
// within a Google Cloud region. An AdminClient may be shared by multiple
// goroutines.
type AdminClient struct {
Expand All @@ -33,14 +32,12 @@ type AdminClient struct {
// NewAdminClient creates a new Cloud Pub/Sub Lite client to perform admin
// operations for resources within a given region.
// See https://cloud.google.com/pubsub/lite/docs/locations for the list of
// regions and zones where Google Pub/Sub Lite is available.
// regions and zones where Cloud Pub/Sub Lite is available.
func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := []option.ClientOption{internaloption.WithDefaultEndpoint(region + "-pubsublite.googleapis.com:443")}
options = append(options, opts...)
admin, err := vkit.NewAdminClient(ctx, options...)
admin, err := newAdminClient(ctx, region, opts...)
if err != nil {
return nil, err
}
Expand Down
74 changes: 74 additions & 0 deletions pubsublite/rpc.go
Expand Up @@ -14,11 +14,18 @@
package pubsublite

import (
"context"
"fmt"
"net/url"
"time"

"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

vkit "cloud.google.com/go/pubsublite/apiv1"
gax "github.com/googleapis/gax-go/v2"
)

Expand Down Expand Up @@ -98,3 +105,70 @@ func isRetryableStreamError(err error, isEligible func(codes.Code) bool) bool {
}
return isEligible(s.Code())
}

const (
pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443"
routingMetadataHeader = "x-goog-request-params"
)

func defaultClientOptions(region string) []option.ClientOption {
return []option.ClientOption{
internaloption.WithDefaultEndpoint(region + pubsubLiteDefaultEndpoint),
}
}

func newAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewAdminClient(ctx, options...)
}

func newPublisherClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PublisherClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewPublisherClient(ctx, options...)
}

func newSubscriberClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.SubscriberClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewSubscriberClient(ctx, options...)
}

func newCursorClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.CursorClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewCursorClient(ctx, options...)
}

func newPartitionAssignmentClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PartitionAssignmentClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewPartitionAssignmentClient(ctx, options...)
}

func addTopicRoutingMetadata(ctx context.Context, topic TopicPath, partition int) context.Context {
md, _ := metadata.FromOutgoingContext(ctx)
md = md.Copy()
val := fmt.Sprintf("partition=%d&topic=%s", partition, url.QueryEscape(topic.String()))
md[routingMetadataHeader] = append(md[routingMetadataHeader], val)
return metadata.NewOutgoingContext(ctx, md)
}

func addSubscriptionRoutingMetadata(ctx context.Context, subs SubscriptionPath, partition int) context.Context {
md, _ := metadata.FromOutgoingContext(ctx)
md = md.Copy()
val := fmt.Sprintf("partition=%d&subscription=%s", partition, url.QueryEscape(subs.String()))
md[routingMetadataHeader] = append(md[routingMetadataHeader], val)
return metadata.NewOutgoingContext(ctx, md)
}