diff --git a/pubsublite/admin.go b/pubsublite/admin.go index 485f63513ef..0b7fd836445 100644 --- a/pubsublite/admin.go +++ b/pubsublite/admin.go @@ -16,6 +16,7 @@ package pubsublite import ( "context" + "cloud.google.com/go/pubsublite/internal/wire" "google.golang.org/api/option" vkit "cloud.google.com/go/pubsublite/apiv1" @@ -34,10 +35,10 @@ type AdminClient struct { // See https://cloud.google.com/pubsub/lite/docs/locations for the list of // regions and zones where Cloud Pub/Sub Lite is available. func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error) { - if err := validateRegion(region); err != nil { + if err := wire.ValidateRegion(region); err != nil { return nil, err } - admin, err := newAdminClient(ctx, region, opts...) + admin, err := wire.NewAdminClient(ctx, region, opts...) if err != nil { return nil, err } diff --git a/pubsublite/internal/wire/README.md b/pubsublite/internal/wire/README.md new file mode 100644 index 00000000000..133e1fed5b3 --- /dev/null +++ b/pubsublite/internal/wire/README.md @@ -0,0 +1,17 @@ +# Wire + +This directory contains internal implementation details for Cloud Pub/Sub Lite. +Its exported interface can change at any time. + +## Conventions + +The following are general conventions used in this package: + +* Capitalized methods and fields of a struct denotes its public interface. They + are safe to call from outside the struct (e.g. accesses immutable fields or + guarded by a mutex). All other methods are considered internal implementation + details that should not be called from outside the struct. +* unsafeFoo() methods indicate that the caller is expected to have already + acquired the struct's mutex. Since Go does not support re-entrant locks, they + do not acquire the mutex. These are typically common util methods that need + to be atomic with other operations. diff --git a/pubsublite/errors.go b/pubsublite/internal/wire/errors.go similarity index 97% rename from pubsublite/errors.go rename to pubsublite/internal/wire/errors.go index 1101188e5a9..881624cfeb2 100644 --- a/pubsublite/errors.go +++ b/pubsublite/internal/wire/errors.go @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and -package pubsublite +package wire import "errors" diff --git a/pubsublite/message.go b/pubsublite/internal/wire/message_router.go similarity index 68% rename from pubsublite/message.go rename to pubsublite/internal/wire/message_router.go index 79e6a71c4f4..23edebb9af8 100644 --- a/pubsublite/message.go +++ b/pubsublite/internal/wire/message_router.go @@ -11,66 +11,14 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and -package pubsublite +package wire import ( "crypto/sha256" - "fmt" "math/big" "math/rand" - "time" - - "github.com/golang/protobuf/ptypes" - - pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" ) -// AttributeValues is a slice of strings. -type AttributeValues [][]byte - -// Message represents a Pub/Sub message. -type Message struct { - // Data is the actual data in the message. - Data []byte - - // Attributes can be used to label the message. A key may have multiple - // values. - Attributes map[string]AttributeValues - - // EventTime is an optional, user-specified event time for this message. - EventTime time.Time - - // OrderingKey identifies related messages for which publish order should - // be respected. Messages with the same ordering key are published to the - // same topic partition and subscribers will receive the messages in order. - // If the ordering key is empty, the message will be sent to an arbitrary - // partition. - OrderingKey []byte -} - -func (m *Message) toProto() (*pb.PubSubMessage, error) { - msgpb := &pb.PubSubMessage{ - Data: m.Data, - Key: m.OrderingKey, - } - - if len(m.Attributes) > 0 { - msgpb.Attributes = make(map[string]*pb.AttributeValues) - for key, values := range m.Attributes { - msgpb.Attributes[key] = &pb.AttributeValues{Values: values} - } - } - - if !m.EventTime.IsZero() { - ts, err := ptypes.TimestampProto(m.EventTime) - if err != nil { - return nil, fmt.Errorf("pubsublite: error converting message timestamp: %v", err) - } - msgpb.EventTime = ts - } - return msgpb, nil -} - // messageRouter outputs a partition number, given an ordering key. Results are // undefined when: // - setPartitionCount() is called with count <= 0. diff --git a/pubsublite/message_test.go b/pubsublite/internal/wire/message_router_test.go similarity index 69% rename from pubsublite/message_test.go rename to pubsublite/internal/wire/message_router_test.go index 1d5b3a97004..0f847074afa 100644 --- a/pubsublite/message_test.go +++ b/pubsublite/internal/wire/message_router_test.go @@ -11,27 +11,16 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and -package pubsublite +package wire import ( "fmt" "math/rand" "testing" - "time" - "github.com/golang/protobuf/proto" - - tspb "github.com/golang/protobuf/ptypes/timestamp" - pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" + "cloud.google.com/go/pubsublite/internal/test" ) -type fakeSource struct { - ret int64 -} - -func (f *fakeSource) Int63() int64 { return f.ret } -func (f *fakeSource) Seed(seed int64) {} - type fakeMsgRouter struct { multiplier int partitionCount int @@ -45,67 +34,10 @@ func (f *fakeMsgRouter) Route(orderingKey []byte) int { return f.partitionCount * f.multiplier } -func TestMessageToProto(t *testing.T) { - for _, tc := range []struct { - desc string - msg *Message - want *pb.PubSubMessage - }{ - { - desc: "valid: minimal", - msg: &Message{ - Data: []byte("Hello world"), - }, - want: &pb.PubSubMessage{ - Data: []byte("Hello world"), - }, - }, - { - desc: "valid: filled", - msg: &Message{ - Data: []byte("foo"), - Attributes: map[string]AttributeValues{ - "attr1": [][]byte{ - []byte("val1"), - []byte("val2"), - }, - }, - EventTime: time.Unix(1555593697, 154358*1000), - OrderingKey: []byte("order"), - }, - want: &pb.PubSubMessage{ - Data: []byte("foo"), - Attributes: map[string]*pb.AttributeValues{ - "attr1": { - Values: [][]byte{ - []byte("val1"), - []byte("val2"), - }, - }, - }, - EventTime: &tspb.Timestamp{ - Seconds: 1555593697, - Nanos: 154358 * 1000, - }, - Key: []byte("order"), - }, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - got, err := tc.msg.toProto() - if err != nil { - t.Errorf("toProto() err = %v", err) - } else if !proto.Equal(got, tc.want) { - t.Errorf("toProto() got = %v\nwant = %v", got, tc.want) - } - }) - } -} - func TestRoundRobinMsgRouter(t *testing.T) { // Using the same msgRouter for each test run ensures that it reinitializes // when the partition count changes. - source := &fakeSource{} + source := &test.FakeSource{} msgRouter := &roundRobinMsgRouter{rng: rand.New(source)} for _, tc := range []struct { @@ -125,7 +57,7 @@ func TestRoundRobinMsgRouter(t *testing.T) { }, } { t.Run(fmt.Sprintf("partitionCount=%d", tc.partitionCount), func(t *testing.T) { - source.ret = tc.source + source.Ret = tc.source msgRouter.SetPartitionCount(tc.partitionCount) for i, want := range tc.want { got := msgRouter.Route([]byte("IGNORED")) diff --git a/pubsublite/internal/wire/resources.go b/pubsublite/internal/wire/resources.go new file mode 100644 index 00000000000..b31b4dd0ff6 --- /dev/null +++ b/pubsublite/internal/wire/resources.go @@ -0,0 +1,51 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package wire + +import ( + "fmt" + "strings" +) + +// ValidateZone verifies that the `input` string has the format of a valid +// Google Cloud zone. An example zone is "europe-west1-b". +// See https://cloud.google.com/compute/docs/regions-zones for more information. +func ValidateZone(input string) error { + parts := strings.Split(input, "-") + if len(parts) != 3 { + return fmt.Errorf("pubsublite: invalid zone %q", input) + } + return nil +} + +// ValidateRegion verifies that the `input` string has the format of a valid +// Google Cloud region. An example region is "europe-west1". +// See https://cloud.google.com/compute/docs/regions-zones for more information. +func ValidateRegion(input string) error { + parts := strings.Split(input, "-") + if len(parts) != 2 { + return fmt.Errorf("pubsublite: invalid region %q", input) + } + return nil +} + +type topicPartition struct { + Path string + Partition int +} + +type subscriptionPartition struct { + Path string + Partition int +} diff --git a/pubsublite/internal/wire/resources_test.go b/pubsublite/internal/wire/resources_test.go new file mode 100644 index 00000000000..8191234a191 --- /dev/null +++ b/pubsublite/internal/wire/resources_test.go @@ -0,0 +1,78 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package wire + +import "testing" + +func TestValidateZone(t *testing.T) { + for _, tc := range []struct { + desc string + input string + wantErr bool + }{ + { + desc: "valid", + input: "us-central1-a", + wantErr: false, + }, + { + desc: "invalid: insufficient dashes", + input: "us-central1", + wantErr: true, + }, + { + desc: "invalid: excess dashes", + input: "us-central1-a-b", + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + err := ValidateZone(tc.input) + if (err != nil) != tc.wantErr { + t.Errorf("ValidateZone(%q) = %v, want err=%v", tc.input, err, tc.wantErr) + } + }) + } +} + +func TestValidateRegion(t *testing.T) { + for _, tc := range []struct { + desc string + input string + wantErr bool + }{ + { + desc: "valid", + input: "europe-west1", + wantErr: false, + }, + { + desc: "invalid: insufficient dashes", + input: "europewest1", + wantErr: true, + }, + { + desc: "invalid: excess dashes", + input: "europe-west1-b", + wantErr: true, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + err := ValidateRegion(tc.input) + if (err != nil) != tc.wantErr { + t.Errorf("ValidateRegion(%q) = %v, want err=%v", tc.input, err, tc.wantErr) + } + }) + } +} diff --git a/pubsublite/rpc.go b/pubsublite/internal/wire/rpc.go similarity index 84% rename from pubsublite/rpc.go rename to pubsublite/internal/wire/rpc.go index f3e051d8f96..2dad85cb537 100644 --- a/pubsublite/rpc.go +++ b/pubsublite/internal/wire/rpc.go @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and -package pubsublite +package wire import ( "context" @@ -117,58 +117,44 @@ func defaultClientOptions(region string) []option.ClientOption { } } -func newAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) { - if err := validateRegion(region); err != nil { - return nil, err - } +// NewAdminClient creates a new gapic AdminClient for a region. +func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) { options := append(defaultClientOptions(region), opts...) return vkit.NewAdminClient(ctx, options...) } func newPublisherClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PublisherClient, error) { - if err := validateRegion(region); err != nil { - return nil, err - } options := append(defaultClientOptions(region), opts...) return vkit.NewPublisherClient(ctx, options...) } func newSubscriberClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.SubscriberClient, error) { - if err := validateRegion(region); err != nil { - return nil, err - } options := append(defaultClientOptions(region), opts...) return vkit.NewSubscriberClient(ctx, options...) } func newCursorClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.CursorClient, error) { - if err := validateRegion(region); err != nil { - return nil, err - } options := append(defaultClientOptions(region), opts...) return vkit.NewCursorClient(ctx, options...) } func newPartitionAssignmentClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PartitionAssignmentClient, error) { - if err := validateRegion(region); err != nil { - return nil, err - } options := append(defaultClientOptions(region), opts...) return vkit.NewPartitionAssignmentClient(ctx, options...) } -func addTopicRoutingMetadata(ctx context.Context, topic TopicPath, partition int) context.Context { +func addTopicRoutingMetadata(ctx context.Context, topic topicPartition) context.Context { md, _ := metadata.FromOutgoingContext(ctx) md = md.Copy() - val := fmt.Sprintf("partition=%d&topic=%s", partition, url.QueryEscape(topic.String())) + val := fmt.Sprintf("partition=%d&topic=%s", topic.Partition, url.QueryEscape(topic.Path)) md[routingMetadataHeader] = append(md[routingMetadataHeader], val) return metadata.NewOutgoingContext(ctx, md) } -func addSubscriptionRoutingMetadata(ctx context.Context, subs SubscriptionPath, partition int) context.Context { +func addSubscriptionRoutingMetadata(ctx context.Context, subscription subscriptionPartition) context.Context { md, _ := metadata.FromOutgoingContext(ctx) md = md.Copy() - val := fmt.Sprintf("partition=%d&subscription=%s", partition, url.QueryEscape(subs.String())) + val := fmt.Sprintf("partition=%d&subscription=%s", subscription.Partition, url.QueryEscape(subscription.Path)) md[routingMetadataHeader] = append(md[routingMetadataHeader], val) return metadata.NewOutgoingContext(ctx, md) } diff --git a/pubsublite/settings.go b/pubsublite/internal/wire/settings.go similarity index 99% rename from pubsublite/settings.go rename to pubsublite/internal/wire/settings.go index b1db10fac5f..64a6665a183 100644 --- a/pubsublite/settings.go +++ b/pubsublite/internal/wire/settings.go @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and -package pubsublite +package wire import ( "errors" diff --git a/pubsublite/settings_test.go b/pubsublite/internal/wire/settings_test.go similarity index 99% rename from pubsublite/settings_test.go rename to pubsublite/internal/wire/settings_test.go index 6c49880ed60..175ca6d3aa7 100644 --- a/pubsublite/settings_test.go +++ b/pubsublite/internal/wire/settings_test.go @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and -package pubsublite +package wire import ( "testing" diff --git a/pubsublite/streams.go b/pubsublite/internal/wire/streams.go similarity index 97% rename from pubsublite/streams.go rename to pubsublite/internal/wire/streams.go index 2e1cb321a41..1df7ed32db3 100644 --- a/pubsublite/streams.go +++ b/pubsublite/internal/wire/streams.go @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and -package pubsublite +package wire import ( "context" @@ -172,8 +172,8 @@ func (rs *retryableStream) currentStream() grpc.ClientStream { return rs.stream } -// clearStream must be called with the retryableStream.mu locked. -func (rs *retryableStream) clearStream() { +// unsafeClearStream must be called with the retryableStream.mu locked. +func (rs *retryableStream) unsafeClearStream() { if rs.cancelStream != nil { // If the stream did not already abort due to error, this will abort it. rs.cancelStream() @@ -210,7 +210,7 @@ func (rs *retryableStream) reconnect() { return false } rs.status = streamReconnecting - rs.clearStream() + rs.unsafeClearStream() return true } if !canReconnect() { @@ -229,7 +229,7 @@ func (rs *retryableStream) reconnect() { defer rs.mu.Unlock() if rs.status == streamTerminated { - rs.clearStream() + rs.unsafeClearStream() return false } rs.status = streamConnected @@ -330,7 +330,7 @@ func (rs *retryableStream) terminate(err error) { } rs.status = streamTerminated rs.finalErr = err - rs.clearStream() + rs.unsafeClearStream() // terminate can be called from within a streamHandler method with a lock // held. So notify from a goroutine to prevent deadlock. diff --git a/pubsublite/types.go b/pubsublite/types.go index 40104d2d3c7..dfb2b56438d 100644 --- a/pubsublite/types.go +++ b/pubsublite/types.go @@ -17,6 +17,8 @@ import ( "fmt" "regexp" "strings" + + "cloud.google.com/go/pubsublite/internal/wire" ) // LocationPath stores a path consisting of a project and zone. @@ -27,7 +29,7 @@ type LocationPath struct { // A Google Cloud zone, for example "us-central1-a". // See https://cloud.google.com/pubsub/lite/docs/locations for the list of - // zones where Google Pub/Sub Lite is available. + // zones where Cloud Pub/Sub Lite is available. Zone string } @@ -35,7 +37,7 @@ func (l LocationPath) String() string { return fmt.Sprintf("projects/%s/locations/%s", l.Project, l.Zone) } -// TopicPath stores the full path of a Google Pub/Sub Lite topic. +// TopicPath stores the full path of a Cloud Pub/Sub Lite topic. // See https://cloud.google.com/pubsub/lite/docs/topics for more information. type TopicPath struct { // A Google Cloud project. The project ID (e.g. "my-project") or the project @@ -44,10 +46,10 @@ type TopicPath struct { // A Google Cloud zone, for example "us-central1-a". // See https://cloud.google.com/pubsub/lite/docs/locations for the list of - // zones where Google Pub/Sub Lite is available. + // zones where Cloud Pub/Sub Lite is available. Zone string - // The ID of the Google Pub/Sub Lite topic, for example "my-topic-name". + // The ID of the Cloud Pub/Sub Lite topic, for example "my-topic-name". // See https://cloud.google.com/pubsub/docs/admin#resource_names for more // information. TopicID string @@ -63,7 +65,7 @@ func (t TopicPath) location() LocationPath { var topicPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/topics/([^/]+)$`) -// parseTopicPath parses the full path of a Google Pub/Sub Lite topic, which +// parseTopicPath parses the full path of a Cloud Pub/Sub Lite topic, which // should have the format: `projects/{project}/locations/{zone}/topics/{id}`. func parseTopicPath(input string) (TopicPath, error) { parts := topicPathRE.FindStringSubmatch(input) @@ -73,7 +75,7 @@ func parseTopicPath(input string) (TopicPath, error) { return TopicPath{Project: parts[1], Zone: parts[2], TopicID: parts[3]}, nil } -// SubscriptionPath stores the full path of a Google Pub/Sub Lite subscription. +// SubscriptionPath stores the full path of a Cloud Pub/Sub Lite subscription. // See https://cloud.google.com/pubsub/lite/docs/subscriptions for more // information. type SubscriptionPath struct { @@ -83,10 +85,10 @@ type SubscriptionPath struct { // A Google Cloud zone. An example zone is "us-central1-a". // See https://cloud.google.com/pubsub/lite/docs/locations for the list of - // zones where Google Pub/Sub Lite is available. + // zones where Cloud Pub/Sub Lite is available. Zone string - // The ID of the Google Pub/Sub Lite subscription, for example + // The ID of the Cloud Pub/Sub Lite subscription, for example // "my-subscription-name". // See https://cloud.google.com/pubsub/docs/admin#resource_names for more // information. @@ -103,7 +105,7 @@ func (s SubscriptionPath) location() LocationPath { var subsPathRE = regexp.MustCompile(`^projects/([^/]+)/locations/([^/]+)/subscriptions/([^/]+)$`) -// parseSubscriptionPath parses the full path of a Google Pub/Sub Lite +// parseSubscriptionPath parses the full path of a Cloud Pub/Sub Lite // subscription, which should have the format: // `projects/{project}/locations/{zone}/subscriptions/{id}`. func parseSubscriptionPath(input string) (SubscriptionPath, error) { @@ -114,31 +116,9 @@ func parseSubscriptionPath(input string) (SubscriptionPath, error) { return SubscriptionPath{Project: parts[1], Zone: parts[2], SubscriptionID: parts[3]}, nil } -// validateZone verifies that the `input` string has the format of a valid -// Google Cloud zone. An example zone is "europe-west1-b". -// See https://cloud.google.com/compute/docs/regions-zones for more information. -func validateZone(input string) error { - parts := strings.Split(input, "-") - if len(parts) != 3 { - return fmt.Errorf("pubsublite: invalid zone %q", input) - } - return nil -} - -// validateRegion verifies that the `input` string has the format of a valid -// Google Cloud region. An example region is "europe-west1". -// See https://cloud.google.com/compute/docs/regions-zones for more information. -func validateRegion(input string) error { - parts := strings.Split(input, "-") - if len(parts) != 2 { - return fmt.Errorf("pubsublite: invalid region %q", input) - } - return nil -} - // ZoneToRegion returns the region that the given zone is in. func ZoneToRegion(zone string) (string, error) { - if err := validateZone(zone); err != nil { + if err := wire.ValidateZone(zone); err != nil { return "", err } return zone[0:strings.LastIndex(zone, "-")], nil diff --git a/pubsublite/types_test.go b/pubsublite/types_test.go index 18f02b96ac6..fcd89af79c4 100644 --- a/pubsublite/types_test.go +++ b/pubsublite/types_test.go @@ -129,68 +129,6 @@ func TestParseSubscriptionPath(t *testing.T) { } } -func TestValidateZone(t *testing.T) { - for _, tc := range []struct { - desc string - input string - wantErr bool - }{ - { - desc: "valid", - input: "us-central1-a", - wantErr: false, - }, - { - desc: "invalid: insufficient dashes", - input: "us-central1", - wantErr: true, - }, - { - desc: "invalid: excess dashes", - input: "us-central1-a-b", - wantErr: true, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - err := validateZone(tc.input) - if (err != nil) != tc.wantErr { - t.Errorf("validateZone(%q) = %v, want err=%v", tc.input, err, tc.wantErr) - } - }) - } -} - -func TestValidateRegion(t *testing.T) { - for _, tc := range []struct { - desc string - input string - wantErr bool - }{ - { - desc: "valid", - input: "europe-west1", - wantErr: false, - }, - { - desc: "invalid: insufficient dashes", - input: "europewest1", - wantErr: true, - }, - { - desc: "invalid: excess dashes", - input: "europe-west1-b", - wantErr: true, - }, - } { - t.Run(tc.desc, func(t *testing.T) { - err := validateRegion(tc.input) - if (err != nil) != tc.wantErr { - t.Errorf("validateRegion(%q) = %v, want err=%v", tc.input, err, tc.wantErr) - } - }) - } -} - func TestZoneToRegion(t *testing.T) { for _, tc := range []struct { desc string