Skip to content

Commit

Permalink
Address additional review comments in googleapis#3026
Browse files Browse the repository at this point in the history
  • Loading branch information
tmdiep committed Oct 20, 2020
1 parent 343b0c3 commit a71385e
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 43 deletions.
32 changes: 16 additions & 16 deletions pubsublite/config.go
Expand Up @@ -33,15 +33,15 @@ type TopicConfig struct {

// The number of partitions in the topic. Must be at least 1. Cannot be
// changed after creation.
PartitionCount int64
PartitionCount int

// Publish throughput capacity per partition in MiB/s.
// Must be >= 4 and <= 16.
PublishCapacityMiBPerSec int32
PublishCapacityMiBPerSec int

// Subscribe throughput capacity per partition in MiB/s.
// Must be >= 4 and <= 32.
SubscribeCapacityMiBPerSec int32
SubscribeCapacityMiBPerSec int

// The provisioned storage, in bytes, per partition. If the number of bytes
// stored in any of the topic's partitions grows beyond this value, older
Expand All @@ -59,11 +59,11 @@ func (tc *TopicConfig) toProto() *pb.Topic {
topicpb := &pb.Topic{
Name: tc.Name.String(),
PartitionConfig: &pb.Topic_PartitionConfig{
Count: tc.PartitionCount,
Count: int64(tc.PartitionCount),
Dimension: &pb.Topic_PartitionConfig_Capacity_{
Capacity: &pb.Topic_PartitionConfig_Capacity{
PublishMibPerSec: tc.PublishCapacityMiBPerSec,
SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec,
PublishMibPerSec: int32(tc.PublishCapacityMiBPerSec),
SubscribeMibPerSec: int32(tc.SubscribeCapacityMiBPerSec),
},
},
},
Expand All @@ -81,7 +81,7 @@ func (tc *TopicConfig) toProto() *pb.Topic {
}

func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) {
name, err := ParseTopicPath(t.GetName())
name, err := parseTopicPath(t.GetName())
if err != nil {
return nil, fmt.Errorf("pubsublite: invalid topic name %q in topic config", t.GetName())
}
Expand All @@ -90,9 +90,9 @@ func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) {
retentionCfg := t.GetRetentionConfig()
topic := &TopicConfig{
Name: name,
PartitionCount: partitionCfg.GetCount(),
PublishCapacityMiBPerSec: partitionCfg.GetCapacity().GetPublishMibPerSec(),
SubscribeCapacityMiBPerSec: partitionCfg.GetCapacity().GetSubscribeMibPerSec(),
PartitionCount: int(partitionCfg.GetCount()),
PublishCapacityMiBPerSec: int(partitionCfg.GetCapacity().GetPublishMibPerSec()),
SubscribeCapacityMiBPerSec: int(partitionCfg.GetCapacity().GetSubscribeMibPerSec()),
PerPartitionBytes: retentionCfg.GetPerPartitionBytes(),
}
// An unset retention period proto denotes "infinite retention".
Expand All @@ -117,10 +117,10 @@ type TopicConfigToUpdate struct {
Name TopicPath

// If non-zero, will update the publish throughput capacity per partition.
PublishCapacityMiBPerSec int32
PublishCapacityMiBPerSec int

// If non-zero, will update the subscribe throughput capacity per partition.
SubscribeCapacityMiBPerSec int32
SubscribeCapacityMiBPerSec int

// If non-zero, will update the provisioned storage per partition.
PerPartitionBytes int64
Expand All @@ -137,8 +137,8 @@ func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest {
PartitionConfig: &pb.Topic_PartitionConfig{
Dimension: &pb.Topic_PartitionConfig_Capacity_{
Capacity: &pb.Topic_PartitionConfig_Capacity{
PublishMibPerSec: tc.PublishCapacityMiBPerSec,
SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec,
PublishMibPerSec: int32(tc.PublishCapacityMiBPerSec),
SubscribeMibPerSec: int32(tc.SubscribeCapacityMiBPerSec),
},
},
},
Expand Down Expand Up @@ -219,11 +219,11 @@ func (sc *SubscriptionConfig) toProto() *pb.Subscription {
}

func protoToSubscriptionConfig(s *pb.Subscription) (*SubscriptionConfig, error) {
name, err := ParseSubscriptionPath(s.GetName())
name, err := parseSubscriptionPath(s.GetName())
if err != nil {
return nil, fmt.Errorf("pubsublite: invalid subscription name %q in subscription config", s.GetName())
}
topic, err := ParseTopicPath(s.GetTopic())
topic, err := parseTopicPath(s.GetTopic())
if err != nil {
return nil, fmt.Errorf("pubsublite: invalid topic name %q in subscription config", s.GetTopic())
}
Expand Down
12 changes: 6 additions & 6 deletions pubsublite/pubsublite.go
Expand Up @@ -35,7 +35,7 @@ type Client struct {
// See https://cloud.google.com/pubsub/lite/docs/locations for the list of
// regions and zones where Google Pub/Sub Lite is available.
func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (c *Client, err error) {
if err := ValidateRegion(region); err != nil {
if err := validateRegion(region); err != nil {
return nil, err
}
options := []option.ClientOption{option.WithEndpoint(region + "-pubsublite.googleapis.com:443")}
Expand All @@ -49,7 +49,7 @@ func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOpt
// CreateTopic creates a new topic from the given config.
func (c *Client) CreateTopic(ctx context.Context, config *TopicConfig) (*TopicConfig, error) {
req := &pb.CreateTopicRequest{
Parent: config.Name.Location().String(),
Parent: config.Name.location().String(),
Topic: config.toProto(),
TopicId: config.Name.TopicID,
}
Expand Down Expand Up @@ -85,12 +85,12 @@ func (c *Client) Topic(ctx context.Context, topic TopicPath) (*TopicConfig, erro
}

// TopicPartitions returns the number of partitions for a topic.
func (c *Client) TopicPartitions(ctx context.Context, topic TopicPath) (int64, error) {
func (c *Client) TopicPartitions(ctx context.Context, topic TopicPath) (int, error) {
partitions, err := c.admin.GetTopicPartitions(ctx, &pb.GetTopicPartitionsRequest{Name: topic.String()})
if err != nil {
return 0, err
}
return partitions.GetPartitionCount(), nil
return int(partitions.GetPartitionCount()), nil
}

// TopicSubscriptions retrieves the list of subscription paths for a topic.
Expand All @@ -109,7 +109,7 @@ func (c *Client) Topics(ctx context.Context, location LocationPath) *TopicIterat
// CreateSubscription creates a new subscription from the given config.
func (c *Client) CreateSubscription(ctx context.Context, config *SubscriptionConfig) (*SubscriptionConfig, error) {
req := &pb.CreateSubscriptionRequest{
Parent: config.Name.Location().String(),
Parent: config.Name.location().String(),
Subscription: config.toProto(),
SubscriptionId: config.Name.SubscriptionID,
}
Expand Down Expand Up @@ -203,5 +203,5 @@ func (sp *SubscriptionPathIterator) Next() (SubscriptionPath, error) {
if err != nil {
return SubscriptionPath{}, err
}
return ParseSubscriptionPath(subsPath)
return parseSubscriptionPath(subsPath)
}
28 changes: 15 additions & 13 deletions pubsublite/types.go
Expand Up @@ -48,23 +48,24 @@ type TopicPath struct {
Zone string

// The ID of the Google Pub/Sub Lite topic, for example "my-topic-name".
// See https://cloud.google.com/pubsub/docs/admin#resource_names for more
// information.
TopicID string
}

func (t TopicPath) String() string {
return fmt.Sprintf("projects/%s/locations/%s/topics/%s", t.Project, t.Zone, t.TopicID)
}

// Location returns the path of the parent location.
func (t TopicPath) Location() LocationPath {
func (t TopicPath) location() LocationPath {
return LocationPath{Project: t.Project, Zone: t.Zone}
}

var topicPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/topics/([^/]+)$`)

// ParseTopicPath parses the full path of a Google Pub/Sub Lite topic, which
// parseTopicPath parses the full path of a Google Pub/Sub Lite topic, which
// should have the format: `projects/{project}/locations/{zone}/topics/{id}`.
func ParseTopicPath(input string) (TopicPath, error) {
func parseTopicPath(input string) (TopicPath, error) {
parts := topicPathRE.FindStringSubmatch(input)
if len(parts) < 4 {
return TopicPath{}, fmt.Errorf("pubsublite: invalid topic path %q", input)
Expand All @@ -87,46 +88,47 @@ type SubscriptionPath struct {

// The ID of the Google Pub/Sub Lite subscription, for example
// "my-subscription-name".
// See https://cloud.google.com/pubsub/docs/admin#resource_names for more
// information.
SubscriptionID string
}

func (s SubscriptionPath) String() string {
return fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", s.Project, s.Zone, s.SubscriptionID)
}

// Location returns the path of the parent location.
func (s SubscriptionPath) Location() LocationPath {
func (s SubscriptionPath) location() LocationPath {
return LocationPath{Project: s.Project, Zone: s.Zone}
}

var subsPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/subscriptions/([^/]+)$`)

// ParseSubscriptionPath parses the full path of a Google Pub/Sub Lite
// parseSubscriptionPath parses the full path of a Google Pub/Sub Lite
// subscription, which should have the format:
// `projects/{project}/locations/{zone}/subscriptions/{id}`.
func ParseSubscriptionPath(input string) (SubscriptionPath, error) {
func parseSubscriptionPath(input string) (SubscriptionPath, error) {
parts := subsPathRE.FindStringSubmatch(input)
if len(parts) < 4 {
return SubscriptionPath{}, fmt.Errorf("pubsublite: invalid subscription path %q", input)
}
return SubscriptionPath{Project: parts[1], Zone: parts[2], SubscriptionID: parts[3]}, nil
}

// ValidateZone verifies that the `input` string has the format of a valid
// validateZone verifies that the `input` string has the format of a valid
// Google Cloud zone. An example zone is "europe-west1-b".
// See https://cloud.google.com/compute/docs/regions-zones for more information.
func ValidateZone(input string) error {
func validateZone(input string) error {
parts := strings.Split(input, "-")
if len(parts) != 3 {
return fmt.Errorf("pubsublite: invalid zone %q", input)
}
return nil
}

// ValidateRegion verifies that the `input` string has the format of a valid
// validateRegion verifies that the `input` string has the format of a valid
// Google Cloud region. An example region is "europe-west1".
// See https://cloud.google.com/compute/docs/regions-zones for more information.
func ValidateRegion(input string) error {
func validateRegion(input string) error {
parts := strings.Split(input, "-")
if len(parts) != 2 {
return fmt.Errorf("pubsublite: invalid region %q", input)
Expand All @@ -136,7 +138,7 @@ func ValidateRegion(input string) error {

// ZoneToRegion returns the region that the given zone is in.
func ZoneToRegion(zone string) (string, error) {
if err := ValidateZone(zone); err != nil {
if err := validateZone(zone); err != nil {
return "", err
}
return zone[0:strings.LastIndex(zone, "-")], nil
Expand Down
16 changes: 8 additions & 8 deletions pubsublite/types_test.go
Expand Up @@ -64,9 +64,9 @@ func TestParseTopicPath(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
gotPath, gotErr := ParseTopicPath(tc.input)
gotPath, gotErr := parseTopicPath(tc.input)
if gotPath != tc.wantPath || (gotErr != nil) != tc.wantErr {
t.Errorf("ParseTopicPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr)
t.Errorf("parseTopicPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr)
}
})
}
Expand Down Expand Up @@ -121,9 +121,9 @@ func TestParseSubscriptionPath(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
gotPath, gotErr := ParseSubscriptionPath(tc.input)
gotPath, gotErr := parseSubscriptionPath(tc.input)
if gotPath != tc.wantPath || (gotErr != nil) != tc.wantErr {
t.Errorf("ParseSubscriptionPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr)
t.Errorf("parseSubscriptionPath(%q) = (%v, %v), want (%v, err=%v)", tc.input, gotPath, gotErr, tc.wantPath, tc.wantErr)
}
})
}
Expand Down Expand Up @@ -152,9 +152,9 @@ func TestValidateZone(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
err := ValidateZone(tc.input)
err := validateZone(tc.input)
if (err != nil) != tc.wantErr {
t.Errorf("ValidateZone(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
t.Errorf("validateZone(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
}
})
}
Expand Down Expand Up @@ -183,9 +183,9 @@ func TestValidateRegion(t *testing.T) {
},
} {
t.Run(tc.desc, func(t *testing.T) {
err := ValidateRegion(tc.input)
err := validateRegion(tc.input)
if (err != nil) != tc.wantErr {
t.Errorf("ValidateRegion(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
t.Errorf("validateRegion(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
}
})
}
Expand Down

0 comments on commit a71385e

Please sign in to comment.