Skip to content

Commit

Permalink
refactor(pubsublite): allow publish/subscribe for regional resources (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Sep 15, 2021
1 parent 8ffed36 commit aa01955
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 85 deletions.
4 changes: 2 additions & 2 deletions pubsublite/admin.go
Expand Up @@ -61,7 +61,7 @@ func (ac *AdminClient) CreateTopic(ctx context.Context, config TopicConfig) (*To
return nil, err
}
req := &pb.CreateTopicRequest{
Parent: topicPath.Location().String(),
Parent: topicPath.LocationPath().String(),
Topic: config.toProto(),
TopicId: topicPath.TopicID,
}
Expand Down Expand Up @@ -190,7 +190,7 @@ func (ac *AdminClient) CreateSubscription(ctx context.Context, config Subscripti
return nil, err
}
req := &pb.CreateSubscriptionRequest{
Parent: subsPath.Location().String(),
Parent: subsPath.LocationPath().String(),
Subscription: config.toProto(),
SubscriptionId: subsPath.SubscriptionID,
SkipBacklog: settings.backlogLocation != Beginning,
Expand Down
6 changes: 3 additions & 3 deletions pubsublite/integration_test.go
Expand Up @@ -180,12 +180,12 @@ func TestIntegration_ResourceAdminOperations(t *testing.T) {
ctx := context.Background()
proj := testutil.ProjID()
zone := test.RandomLiteZone()
region, _ := wire.ZoneToRegion(zone)
region, _ := wire.LocationToRegion(zone)
resourceID := resourceIDs.New()

locationPath := wire.LocationPath{Project: proj, Location: zone}.String()
topicPath := wire.TopicPath{Project: proj, Zone: zone, TopicID: resourceID}.String()
subscriptionPath := wire.SubscriptionPath{Project: proj, Zone: zone, SubscriptionID: resourceID}.String()
topicPath := wire.TopicPath{Project: proj, Location: zone, TopicID: resourceID}.String()
subscriptionPath := wire.SubscriptionPath{Project: proj, Location: zone, SubscriptionID: resourceID}.String()
reservationPath := wire.ReservationPath{Project: proj, Region: region, ReservationID: resourceID}.String()
t.Logf("Topic path: %s", topicPath)

Expand Down
59 changes: 26 additions & 33 deletions pubsublite/internal/wire/resources.go
Expand Up @@ -19,17 +19,6 @@ import (
"strings"
)

// ValidateZone verifies that the `input` string has the format of a valid
// Google Cloud zone. An example zone is "europe-west1-b".
// See https://cloud.google.com/compute/docs/regions-zones for more information.
func ValidateZone(input string) error {
parts := strings.Split(input, "-")
if len(parts) != 3 {
return fmt.Errorf("pubsublite: invalid zone %q", input)
}
return nil
}

// ValidateRegion verifies that the `input` string has the format of a valid
// Google Cloud region. An example region is "europe-west1".
// See https://cloud.google.com/compute/docs/regions-zones for more information.
Expand All @@ -41,12 +30,16 @@ func ValidateRegion(input string) error {
return nil
}

// ZoneToRegion returns the region that the given zone is in.
func ZoneToRegion(zone string) (string, error) {
if err := ValidateZone(zone); err != nil {
return "", err
// LocationToRegion returns the region that the given location is in.
func LocationToRegion(location string) (string, error) {
parts := strings.Split(location, "-")
if len(parts) == 3 {
return strings.Join(parts[0:len(parts)-1], "-"), nil
}
if len(parts) == 2 {
return location, nil
}
return zone[0:strings.LastIndex(zone, "-")], nil
return "", fmt.Errorf("pubsublite: location %q is not a valid zone or region", location)
}

// LocationPath stores a path consisting of a project and zone/region.
Expand All @@ -55,7 +48,7 @@ type LocationPath struct {
// number (e.g. "987654321") can be provided.
Project string

// A Google Cloud zone or region, for example "us-central1-a", "us-central1".
// A Google Cloud zone (e.g. "us-central1-a") or region (e.g. "us-central1").
Location string
}

Expand All @@ -81,20 +74,20 @@ type TopicPath struct {
// number (e.g. "987654321") can be provided.
Project string

// A Google Cloud zone, for example "us-central1-a".
Zone string
// A Google Cloud zone (e.g. "us-central1-a") or region (e.g. "us-central1").
Location string

// The ID of the Pub/Sub Lite topic, for example "my-topic-name".
TopicID string
}

func (t TopicPath) String() string {
return fmt.Sprintf("projects/%s/locations/%s/topics/%s", t.Project, t.Zone, t.TopicID)
return fmt.Sprintf("projects/%s/locations/%s/topics/%s", t.Project, t.Location, t.TopicID)
}

// Location returns the topic's location path.
func (t TopicPath) Location() LocationPath {
return LocationPath{Project: t.Project, Location: t.Zone}
// LocationPath returns the topic's location path.
func (t TopicPath) LocationPath() LocationPath {
return LocationPath{Project: t.Project, Location: t.Location}
}

var topicPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/topics/([^/]+)$`)
Expand All @@ -104,9 +97,9 @@ func ParseTopicPath(input string) (TopicPath, error) {
parts := topicPathRE.FindStringSubmatch(input)
if len(parts) < 4 {
return TopicPath{}, fmt.Errorf("pubsublite: invalid topic path %q. valid format is %q",
input, "projects/PROJECT_ID/locations/ZONE/topics/TOPIC_ID")
input, "projects/PROJECT_ID/locations/ZONE_OR_REGION/topics/TOPIC_ID")
}
return TopicPath{Project: parts[1], Zone: parts[2], TopicID: parts[3]}, nil
return TopicPath{Project: parts[1], Location: parts[2], TopicID: parts[3]}, nil
}

// SubscriptionPath stores the full path of a Pub/Sub Lite subscription.
Expand All @@ -115,21 +108,21 @@ type SubscriptionPath struct {
// number (e.g. "987654321") can be provided.
Project string

// A Google Cloud zone. An example zone is "us-central1-a".
Zone string
// A Google Cloud zone (e.g. "us-central1-a") or region (e.g. "us-central1").
Location string

// The ID of the Pub/Sub Lite subscription, for example
// "my-subscription-name".
SubscriptionID string
}

func (s SubscriptionPath) String() string {
return fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", s.Project, s.Zone, s.SubscriptionID)
return fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", s.Project, s.Location, s.SubscriptionID)
}

// Location returns the subscription's location path.
func (s SubscriptionPath) Location() LocationPath {
return LocationPath{Project: s.Project, Location: s.Zone}
// LocationPath returns the subscription's location path.
func (s SubscriptionPath) LocationPath() LocationPath {
return LocationPath{Project: s.Project, Location: s.Location}
}

var subsPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/subscriptions/([^/]+)$`)
Expand All @@ -139,9 +132,9 @@ func ParseSubscriptionPath(input string) (SubscriptionPath, error) {
parts := subsPathRE.FindStringSubmatch(input)
if len(parts) < 4 {
return SubscriptionPath{}, fmt.Errorf("pubsublite: invalid subscription path %q. valid format is %q",
input, "projects/PROJECT_ID/locations/ZONE/subscriptions/SUBSCRIPTION_ID")
input, "projects/PROJECT_ID/locations/ZONE_OR_REGION/subscriptions/SUBSCRIPTION_ID")
}
return SubscriptionPath{Project: parts[1], Zone: parts[2], SubscriptionID: parts[3]}, nil
return SubscriptionPath{Project: parts[1], Location: parts[2], SubscriptionID: parts[3]}, nil
}

// ReservationPath stores the full path of a Pub/Sub Lite reservation.
Expand Down
74 changes: 32 additions & 42 deletions pubsublite/internal/wire/resources_test.go
Expand Up @@ -15,37 +15,6 @@ package wire

import "testing"

func TestValidateZone(t *testing.T) {
for _, tc := range []struct {
desc string
input string
wantErr bool
}{
{
desc: "valid",
input: "us-central1-a",
wantErr: false,
},
{
desc: "invalid: insufficient dashes",
input: "us-central1",
wantErr: true,
},
{
desc: "invalid: excess dashes",
input: "us-central1-a-b",
wantErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
err := ValidateZone(tc.input)
if (err != nil) != tc.wantErr {
t.Errorf("ValidateZone(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
}
})
}
}

func TestValidateRegion(t *testing.T) {
for _, tc := range []struct {
desc string
Expand Down Expand Up @@ -77,22 +46,28 @@ func TestValidateRegion(t *testing.T) {
}
}

func TestZoneToRegion(t *testing.T) {
func TestLocationToRegion(t *testing.T) {
for _, tc := range []struct {
desc string
zone string
wantRegion string
wantErr bool
}{
{
desc: "valid",
desc: "valid zone",
zone: "europe-west1-d",
wantRegion: "europe-west1",
wantErr: false,
},
{
desc: "invalid: insufficient dashes",
zone: "europe-west1",
desc: "valid region",
zone: "europe-west1",
wantRegion: "europe-west1",
wantErr: false,
},
{
desc: "invalid: too many dashes",
zone: "europe-west1-b-d",
wantErr: true,
},
{
Expand All @@ -102,9 +77,9 @@ func TestZoneToRegion(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
gotRegion, gotErr := ZoneToRegion(tc.zone)
gotRegion, gotErr := LocationToRegion(tc.zone)
if gotRegion != tc.wantRegion || (gotErr != nil) != tc.wantErr {
t.Errorf("ZoneToRegion(%q) = (%v, %v), want (%v, err=%v)", tc.zone, gotRegion, gotErr, tc.wantRegion, tc.wantErr)
t.Errorf("LocationToRegion(%q) = (%v, %v), want (%v, err=%v)", tc.zone, gotRegion, gotErr, tc.wantRegion, tc.wantErr)
}
})
}
Expand All @@ -118,10 +93,15 @@ func TestParseLocationPath(t *testing.T) {
wantErr bool
}{
{
desc: "valid: location path",
desc: "valid: zone path",
input: "projects/987654321/locations/europe-west1-d",
wantPath: LocationPath{Project: "987654321", Location: "europe-west1-d"},
},
{
desc: "valid: region path",
input: "projects/987654321/locations/europe-west1",
wantPath: LocationPath{Project: "987654321", Location: "europe-west1"},
},
{
desc: "invalid: zone",
input: "europe-west1-d",
Expand Down Expand Up @@ -165,9 +145,14 @@ func TestParseTopicPath(t *testing.T) {
wantErr bool
}{
{
desc: "valid: topic path",
desc: "valid: topic in zone",
input: "projects/987654321/locations/europe-west1-d/topics/my-topic",
wantPath: TopicPath{Project: "987654321", Zone: "europe-west1-d", TopicID: "my-topic"},
wantPath: TopicPath{Project: "987654321", Location: "europe-west1-d", TopicID: "my-topic"},
},
{
desc: "valid: topic in region",
input: "projects/987654321/locations/europe-west1/topics/my-topic",
wantPath: TopicPath{Project: "987654321", Location: "europe-west1", TopicID: "my-topic"},
},
{
desc: "invalid: zone",
Expand Down Expand Up @@ -222,9 +207,14 @@ func TestParseSubscriptionPath(t *testing.T) {
wantErr bool
}{
{
desc: "valid: subscription path",
desc: "valid: subscription in zone",
input: "projects/987654321/locations/europe-west1-d/subscriptions/my-subs",
wantPath: SubscriptionPath{Project: "987654321", Zone: "europe-west1-d", SubscriptionID: "my-subs"},
wantPath: SubscriptionPath{Project: "987654321", Location: "europe-west1-d", SubscriptionID: "my-subs"},
},
{
desc: "valid: subscription in region",
input: "projects/987654321/locations/europe-west1/subscriptions/my-subs",
wantPath: SubscriptionPath{Project: "987654321", Location: "europe-west1", SubscriptionID: "my-subs"},
},
{
desc: "invalid: zone",
Expand Down
6 changes: 3 additions & 3 deletions pubsublite/pscompat/integration_test.go
Expand Up @@ -105,11 +105,11 @@ func initResourcePaths(t *testing.T) (string, wire.TopicPath, wire.SubscriptionP

proj := testutil.ProjID()
zone := test.RandomLiteZone()
region, _ := wire.ZoneToRegion(zone)
region, _ := wire.LocationToRegion(zone)
resourceID := resourceIDs.New()

topicPath := wire.TopicPath{Project: proj, Zone: zone, TopicID: resourceID}
subscriptionPath := wire.SubscriptionPath{Project: proj, Zone: zone, SubscriptionID: resourceID}
topicPath := wire.TopicPath{Project: proj, Location: zone, TopicID: resourceID}
subscriptionPath := wire.SubscriptionPath{Project: proj, Location: zone, SubscriptionID: resourceID}
return region, topicPath, subscriptionPath
}

Expand Down
2 changes: 1 addition & 1 deletion pubsublite/pscompat/publisher.go
Expand Up @@ -77,7 +77,7 @@ func NewPublisherClientWithSettings(ctx context.Context, topic string, settings
if err != nil {
return nil, err
}
region, err := wire.ZoneToRegion(topicPath.Zone)
region, err := wire.LocationToRegion(topicPath.Location)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pubsublite/pscompat/subscriber.go
Expand Up @@ -256,7 +256,7 @@ func NewSubscriberClientWithSettings(ctx context.Context, subscription string, s
if err != nil {
return nil, err
}
region, err := wire.ZoneToRegion(subscriptionPath.Zone)
region, err := wire.LocationToRegion(subscriptionPath.Location)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit aa01955

Please sign in to comment.