Skip to content

Commit

Permalink
feat(pubsublite): Added Pub/Sub Lite clients and routing headers (#3105)
Browse files Browse the repository at this point in the history
Added functions to create all the required gapic clients. Updated the AdminClient to use these.
Added utils for attaching headers for routing to the correct backend server handling a particular topic or subscription.
  • Loading branch information
tmdiep committed Oct 30, 2020
1 parent b66727a commit 98668fa
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 6 deletions.
9 changes: 3 additions & 6 deletions pubsublite/admin.go
Expand Up @@ -17,13 +17,12 @@ import (
"context"

"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"

vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

// AdminClient provides admin operations for Google Pub/Sub Lite resources
// AdminClient provides admin operations for Cloud Pub/Sub Lite resources
// within a Google Cloud region. An AdminClient may be shared by multiple
// goroutines.
type AdminClient struct {
Expand All @@ -33,14 +32,12 @@ type AdminClient struct {
// NewAdminClient creates a new Cloud Pub/Sub Lite client to perform admin
// operations for resources within a given region.
// See https://cloud.google.com/pubsub/lite/docs/locations for the list of
// regions and zones where Google Pub/Sub Lite is available.
// regions and zones where Cloud Pub/Sub Lite is available.
func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := []option.ClientOption{internaloption.WithDefaultEndpoint(region + "-pubsublite.googleapis.com:443")}
options = append(options, opts...)
admin, err := vkit.NewAdminClient(ctx, options...)
admin, err := newAdminClient(ctx, region, opts...)
if err != nil {
return nil, err
}
Expand Down
74 changes: 74 additions & 0 deletions pubsublite/rpc.go
Expand Up @@ -14,11 +14,18 @@
package pubsublite

import (
"context"
"fmt"
"net/url"
"time"

"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

vkit "cloud.google.com/go/pubsublite/apiv1"
gax "github.com/googleapis/gax-go/v2"
)

Expand Down Expand Up @@ -98,3 +105,70 @@ func isRetryableStreamError(err error, isEligible func(codes.Code) bool) bool {
}
return isEligible(s.Code())
}

const (
pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443"
routingMetadataHeader = "x-goog-request-params"
)

func defaultClientOptions(region string) []option.ClientOption {
return []option.ClientOption{
internaloption.WithDefaultEndpoint(region + pubsubLiteDefaultEndpoint),
}
}

func newAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewAdminClient(ctx, options...)
}

func newPublisherClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PublisherClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewPublisherClient(ctx, options...)
}

func newSubscriberClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.SubscriberClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewSubscriberClient(ctx, options...)
}

func newCursorClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.CursorClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewCursorClient(ctx, options...)
}

func newPartitionAssignmentClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PartitionAssignmentClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewPartitionAssignmentClient(ctx, options...)
}

func addTopicRoutingMetadata(ctx context.Context, topic TopicPath, partition int) context.Context {
md, _ := metadata.FromOutgoingContext(ctx)
md = md.Copy()
val := fmt.Sprintf("partition=%d&topic=%s", partition, url.QueryEscape(topic.String()))
md[routingMetadataHeader] = append(md[routingMetadataHeader], val)
return metadata.NewOutgoingContext(ctx, md)
}

func addSubscriptionRoutingMetadata(ctx context.Context, subs SubscriptionPath, partition int) context.Context {
md, _ := metadata.FromOutgoingContext(ctx)
md = md.Copy()
val := fmt.Sprintf("partition=%d&subscription=%s", partition, url.QueryEscape(subs.String()))
md[routingMetadataHeader] = append(md[routingMetadataHeader], val)
return metadata.NewOutgoingContext(ctx, md)
}

0 comments on commit 98668fa

Please sign in to comment.