Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsublite): Move internal implementation details to internal/wire subpackage #3123

Merged
merged 7 commits into from Nov 6, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions pubsublite/admin.go
Expand Up @@ -16,6 +16,7 @@ package pubsublite
import (
"context"

"cloud.google.com/go/pubsublite/internal/wire"
"google.golang.org/api/option"

vkit "cloud.google.com/go/pubsublite/apiv1"
Expand All @@ -34,10 +35,10 @@ type AdminClient struct {
// See https://cloud.google.com/pubsub/lite/docs/locations for the list of
// regions and zones where Cloud Pub/Sub Lite is available.
func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error) {
if err := validateRegion(region); err != nil {
if err := wire.ValidateRegion(region); err != nil {
return nil, err
}
admin, err := newAdminClient(ctx, region, opts...)
admin, err := wire.NewAdminClient(ctx, region, opts...)
if err != nil {
return nil, err
}
Expand Down
17 changes: 17 additions & 0 deletions pubsublite/internal/wire/README.md
@@ -0,0 +1,17 @@
# Wire

This directory contains internal implementation details for Cloud Pub/Sub Lite.
Its exported interface can change at any time.

## Conventions

The following are general conventions used in this package:

* Capitalized methods and fields of a struct denotes its public interface. They
are safe to call from outside the struct (e.g. accesses immutable fields or
guarded by a mutex). All other methods are considered internal implementation
details that should not be called from outside the struct.
* unsafeFoo() methods indicate that the caller is expected to have already
acquired the struct's mutex. Since Go does not support re-entrant locks, they
do not acquire the mutex. These are typically common util methods that need
to be atomic with other operations.
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite
package wire

import "errors"

Expand Down
Expand Up @@ -11,66 +11,14 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite
package wire

import (
"crypto/sha256"
"fmt"
"math/big"
"math/rand"
"time"

"github.com/golang/protobuf/ptypes"

pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

// AttributeValues is a slice of strings.
type AttributeValues [][]byte

// Message represents a Pub/Sub message.
type Message struct {
// Data is the actual data in the message.
Data []byte

// Attributes can be used to label the message. A key may have multiple
// values.
Attributes map[string]AttributeValues

// EventTime is an optional, user-specified event time for this message.
EventTime time.Time

// OrderingKey identifies related messages for which publish order should
// be respected. Messages with the same ordering key are published to the
// same topic partition and subscribers will receive the messages in order.
// If the ordering key is empty, the message will be sent to an arbitrary
// partition.
OrderingKey []byte
}

func (m *Message) toProto() (*pb.PubSubMessage, error) {
msgpb := &pb.PubSubMessage{
Data: m.Data,
Key: m.OrderingKey,
}

if len(m.Attributes) > 0 {
msgpb.Attributes = make(map[string]*pb.AttributeValues)
for key, values := range m.Attributes {
msgpb.Attributes[key] = &pb.AttributeValues{Values: values}
}
}

if !m.EventTime.IsZero() {
ts, err := ptypes.TimestampProto(m.EventTime)
if err != nil {
return nil, fmt.Errorf("pubsublite: error converting message timestamp: %v", err)
}
msgpb.EventTime = ts
}
return msgpb, nil
}

// messageRouter outputs a partition number, given an ordering key. Results are
// undefined when:
// - setPartitionCount() is called with count <= 0.
Expand Down
Expand Up @@ -11,27 +11,16 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite
package wire

import (
"fmt"
"math/rand"
"testing"
"time"

"github.com/golang/protobuf/proto"

tspb "github.com/golang/protobuf/ptypes/timestamp"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
"cloud.google.com/go/pubsublite/internal/test"
)

type fakeSource struct {
ret int64
}

func (f *fakeSource) Int63() int64 { return f.ret }
func (f *fakeSource) Seed(seed int64) {}

type fakeMsgRouter struct {
multiplier int
partitionCount int
Expand All @@ -45,67 +34,10 @@ func (f *fakeMsgRouter) Route(orderingKey []byte) int {
return f.partitionCount * f.multiplier
}

func TestMessageToProto(t *testing.T) {
for _, tc := range []struct {
desc string
msg *Message
want *pb.PubSubMessage
}{
{
desc: "valid: minimal",
msg: &Message{
Data: []byte("Hello world"),
},
want: &pb.PubSubMessage{
Data: []byte("Hello world"),
},
},
{
desc: "valid: filled",
msg: &Message{
Data: []byte("foo"),
Attributes: map[string]AttributeValues{
"attr1": [][]byte{
[]byte("val1"),
[]byte("val2"),
},
},
EventTime: time.Unix(1555593697, 154358*1000),
OrderingKey: []byte("order"),
},
want: &pb.PubSubMessage{
Data: []byte("foo"),
Attributes: map[string]*pb.AttributeValues{
"attr1": {
Values: [][]byte{
[]byte("val1"),
[]byte("val2"),
},
},
},
EventTime: &tspb.Timestamp{
Seconds: 1555593697,
Nanos: 154358 * 1000,
},
Key: []byte("order"),
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
got, err := tc.msg.toProto()
if err != nil {
t.Errorf("toProto() err = %v", err)
} else if !proto.Equal(got, tc.want) {
t.Errorf("toProto() got = %v\nwant = %v", got, tc.want)
}
})
}
}

func TestRoundRobinMsgRouter(t *testing.T) {
// Using the same msgRouter for each test run ensures that it reinitializes
// when the partition count changes.
source := &fakeSource{}
source := &test.FakeSource{}
msgRouter := &roundRobinMsgRouter{rng: rand.New(source)}

for _, tc := range []struct {
Expand All @@ -125,7 +57,7 @@ func TestRoundRobinMsgRouter(t *testing.T) {
},
} {
t.Run(fmt.Sprintf("partitionCount=%d", tc.partitionCount), func(t *testing.T) {
source.ret = tc.source
source.Ret = tc.source
msgRouter.SetPartitionCount(tc.partitionCount)
for i, want := range tc.want {
got := msgRouter.Route([]byte("IGNORED"))
Expand Down
51 changes: 51 additions & 0 deletions pubsublite/internal/wire/resources.go
@@ -0,0 +1,51 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"fmt"
"strings"
)

// ValidateZone verifies that the `input` string has the format of a valid
// Google Cloud zone. An example zone is "europe-west1-b".
// See https://cloud.google.com/compute/docs/regions-zones for more information.
func ValidateZone(input string) error {
parts := strings.Split(input, "-")
if len(parts) != 3 {
return fmt.Errorf("pubsublite: invalid zone %q", input)
}
return nil
}

// ValidateRegion verifies that the `input` string has the format of a valid
// Google Cloud region. An example region is "europe-west1".
// See https://cloud.google.com/compute/docs/regions-zones for more information.
func ValidateRegion(input string) error {
parts := strings.Split(input, "-")
if len(parts) != 2 {
return fmt.Errorf("pubsublite: invalid region %q", input)
}
return nil
}

type topicPartition struct {
Path string
Partition int
}

type subscriptionPartition struct {
Path string
Partition int
}
78 changes: 78 additions & 0 deletions pubsublite/internal/wire/resources_test.go
@@ -0,0 +1,78 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import "testing"

func TestValidateZone(t *testing.T) {
for _, tc := range []struct {
desc string
input string
wantErr bool
}{
{
desc: "valid",
input: "us-central1-a",
wantErr: false,
},
{
desc: "invalid: insufficient dashes",
input: "us-central1",
wantErr: true,
},
{
desc: "invalid: excess dashes",
input: "us-central1-a-b",
wantErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
err := ValidateZone(tc.input)
if (err != nil) != tc.wantErr {
t.Errorf("ValidateZone(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
}
})
}
}

func TestValidateRegion(t *testing.T) {
for _, tc := range []struct {
desc string
input string
wantErr bool
}{
{
desc: "valid",
input: "europe-west1",
wantErr: false,
},
{
desc: "invalid: insufficient dashes",
input: "europewest1",
wantErr: true,
},
{
desc: "invalid: excess dashes",
input: "europe-west1-b",
wantErr: true,
},
} {
t.Run(tc.desc, func(t *testing.T) {
err := ValidateRegion(tc.input)
if (err != nil) != tc.wantErr {
t.Errorf("ValidateRegion(%q) = %v, want err=%v", tc.input, err, tc.wantErr)
}
})
}
}