diff --git a/pubsublite/admin.go b/pubsublite/admin.go index 4195c3f25f7..485f63513ef 100644 --- a/pubsublite/admin.go +++ b/pubsublite/admin.go @@ -17,13 +17,12 @@ import ( "context" "google.golang.org/api/option" - "google.golang.org/api/option/internaloption" vkit "cloud.google.com/go/pubsublite/apiv1" pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" ) -// AdminClient provides admin operations for Google Pub/Sub Lite resources +// AdminClient provides admin operations for Cloud Pub/Sub Lite resources // within a Google Cloud region. An AdminClient may be shared by multiple // goroutines. type AdminClient struct { @@ -33,14 +32,12 @@ type AdminClient struct { // NewAdminClient creates a new Cloud Pub/Sub Lite client to perform admin // operations for resources within a given region. // See https://cloud.google.com/pubsub/lite/docs/locations for the list of -// regions and zones where Google Pub/Sub Lite is available. +// regions and zones where Cloud Pub/Sub Lite is available. func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error) { if err := validateRegion(region); err != nil { return nil, err } - options := []option.ClientOption{internaloption.WithDefaultEndpoint(region + "-pubsublite.googleapis.com:443")} - options = append(options, opts...) - admin, err := vkit.NewAdminClient(ctx, options...) + admin, err := newAdminClient(ctx, region, opts...) if err != nil { return nil, err } diff --git a/pubsublite/rpc.go b/pubsublite/rpc.go index 94ac1d7dbf5..f3e051d8f96 100644 --- a/pubsublite/rpc.go +++ b/pubsublite/rpc.go @@ -14,11 +14,18 @@ package pubsublite import ( + "context" + "fmt" + "net/url" "time" + "google.golang.org/api/option" + "google.golang.org/api/option/internaloption" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + vkit "cloud.google.com/go/pubsublite/apiv1" gax "github.com/googleapis/gax-go/v2" ) @@ -98,3 +105,70 @@ func isRetryableStreamError(err error, isEligible func(codes.Code) bool) bool { } return isEligible(s.Code()) } + +const ( + pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443" + routingMetadataHeader = "x-goog-request-params" +) + +func defaultClientOptions(region string) []option.ClientOption { + return []option.ClientOption{ + internaloption.WithDefaultEndpoint(region + pubsubLiteDefaultEndpoint), + } +} + +func newAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) { + if err := validateRegion(region); err != nil { + return nil, err + } + options := append(defaultClientOptions(region), opts...) + return vkit.NewAdminClient(ctx, options...) +} + +func newPublisherClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PublisherClient, error) { + if err := validateRegion(region); err != nil { + return nil, err + } + options := append(defaultClientOptions(region), opts...) + return vkit.NewPublisherClient(ctx, options...) +} + +func newSubscriberClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.SubscriberClient, error) { + if err := validateRegion(region); err != nil { + return nil, err + } + options := append(defaultClientOptions(region), opts...) + return vkit.NewSubscriberClient(ctx, options...) +} + +func newCursorClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.CursorClient, error) { + if err := validateRegion(region); err != nil { + return nil, err + } + options := append(defaultClientOptions(region), opts...) + return vkit.NewCursorClient(ctx, options...) +} + +func newPartitionAssignmentClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PartitionAssignmentClient, error) { + if err := validateRegion(region); err != nil { + return nil, err + } + options := append(defaultClientOptions(region), opts...) + return vkit.NewPartitionAssignmentClient(ctx, options...) +} + +func addTopicRoutingMetadata(ctx context.Context, topic TopicPath, partition int) context.Context { + md, _ := metadata.FromOutgoingContext(ctx) + md = md.Copy() + val := fmt.Sprintf("partition=%d&topic=%s", partition, url.QueryEscape(topic.String())) + md[routingMetadataHeader] = append(md[routingMetadataHeader], val) + return metadata.NewOutgoingContext(ctx, md) +} + +func addSubscriptionRoutingMetadata(ctx context.Context, subs SubscriptionPath, partition int) context.Context { + md, _ := metadata.FromOutgoingContext(ctx) + md = md.Copy() + val := fmt.Sprintf("partition=%d&subscription=%s", partition, url.QueryEscape(subs.String())) + md[routingMetadataHeader] = append(md[routingMetadataHeader], val) + return metadata.NewOutgoingContext(ctx, md) +}