Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat(pubsublite): Message type and message routers (#3077)
pubsublite.Message is similar to pubsub.Message, with the following differences:
- Attributes can have multiple values for the same key.
- Pub/Sub Lite uses []byte for data, attribute values and ordering keys.

Message routers select a partition to route a published message to. SHA256 hash is used for routing messages with ordering keys. Round robin is used for routing messages without ordering keys.
  • Loading branch information
tmdiep committed Oct 28, 2020
1 parent 9eb9fcb commit 179fc55
Show file tree
Hide file tree
Showing 2 changed files with 368 additions and 0 deletions.
153 changes: 153 additions & 0 deletions pubsublite/message.go
@@ -0,0 +1,153 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite

import (
"crypto/sha256"
"fmt"
"math/big"
"math/rand"
"time"

"github.com/golang/protobuf/ptypes"

pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

// AttributeValues is a slice of strings.
type AttributeValues [][]byte

// Message represents a Pub/Sub message.
type Message struct {
// Data is the actual data in the message.
Data []byte

// Attributes can be used to label the message. A key may have multiple
// values.
Attributes map[string]AttributeValues

// EventTime is an optional, user-specified event time for this message.
EventTime time.Time

// OrderingKey identifies related messages for which publish order should
// be respected. Messages with the same ordering key are published to the
// same topic partition and subscribers will receive the messages in order.
// If the ordering key is empty, the message will be sent to an arbitrary
// partition.
OrderingKey []byte
}

func (m *Message) toProto() (*pb.PubSubMessage, error) {
msgpb := &pb.PubSubMessage{
Data: m.Data,
Key: m.OrderingKey,
}

if len(m.Attributes) > 0 {
msgpb.Attributes = make(map[string]*pb.AttributeValues)
for key, values := range m.Attributes {
msgpb.Attributes[key] = &pb.AttributeValues{Values: values}
}
}

if !m.EventTime.IsZero() {
ts, err := ptypes.TimestampProto(m.EventTime)
if err != nil {
return nil, fmt.Errorf("pubsublite: error converting message timestamp: %v", err)
}
msgpb.EventTime = ts
}
return msgpb, nil
}

// messageRouter outputs a partition number, given an ordering key. Results are
// undefined when:
// - setPartitionCount() is called with count <= 0.
// - route() is called before setPartitionCount() to initialize the router.
//
// Message routers need to accommodate topic partition resizing.
type messageRouter interface {
SetPartitionCount(count int)
Route(orderingKey []byte) int
}

// roundRobinMsgRouter sequentially cycles through partition numbers, starting
// from a random partition.
type roundRobinMsgRouter struct {
rng *rand.Rand
partitionCount int
nextPartition int
}

func (r *roundRobinMsgRouter) SetPartitionCount(count int) {
r.partitionCount = count
r.nextPartition = int(r.rng.Int63n(int64(count)))
}

func (r *roundRobinMsgRouter) Route(orderingKey []byte) (partition int) {
partition = r.nextPartition
r.nextPartition = (partition + 1) % r.partitionCount
return
}

// hashingMsgRouter hashes an ordering key using SHA256 to obtain a partition
// number. It should only be used for messages with an ordering key.
//
// Matches implementation at:
// https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/DefaultRoutingPolicy.java
type hashingMsgRouter struct {
partitionCount *big.Int
}

func (r *hashingMsgRouter) SetPartitionCount(count int) {
r.partitionCount = big.NewInt(int64(count))
}

func (r *hashingMsgRouter) Route(orderingKey []byte) int {
if len(orderingKey) == 0 {
return -1
}
h := sha256.Sum256(orderingKey)
num := new(big.Int).SetBytes(h[:])
partition := new(big.Int).Mod(num, r.partitionCount)
return int(partition.Int64())
}

// compositeMsgRouter delegates to different message routers for messages
// with/without ordering keys.
type compositeMsgRouter struct {
keyedRouter messageRouter
keylessRouter messageRouter
}

func (r *compositeMsgRouter) SetPartitionCount(count int) {
r.keyedRouter.SetPartitionCount(count)
r.keylessRouter.SetPartitionCount(count)
}

func (r *compositeMsgRouter) Route(orderingKey []byte) int {
if len(orderingKey) > 0 {
return r.keyedRouter.Route(orderingKey)
}
return r.keylessRouter.Route(orderingKey)
}

// defaultMessageRouter returns a compositeMsgRouter that uses hashingMsgRouter
// for messages with ordering key and roundRobinMsgRouter for messages without.
func newDefaultMessageRouter(rng *rand.Rand) messageRouter {
return &compositeMsgRouter{
keyedRouter: &hashingMsgRouter{},
keylessRouter: &roundRobinMsgRouter{rng: rng},
}
}
215 changes: 215 additions & 0 deletions pubsublite/message_test.go
@@ -0,0 +1,215 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package pubsublite

import (
"fmt"
"math/rand"
"testing"
"time"

"github.com/golang/protobuf/proto"

tspb "github.com/golang/protobuf/ptypes/timestamp"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

type fakeSource struct {
ret int64
}

func (f *fakeSource) Int63() int64 { return f.ret }
func (f *fakeSource) Seed(seed int64) {}

type fakeMsgRouter struct {
multiplier int
partitionCount int
}

func (f *fakeMsgRouter) SetPartitionCount(count int) {
f.partitionCount = count
}

func (f *fakeMsgRouter) Route(orderingKey []byte) int {
return f.partitionCount * f.multiplier
}

func TestMessageToProto(t *testing.T) {
for _, tc := range []struct {
desc string
msg *Message
want *pb.PubSubMessage
}{
{
desc: "valid: minimal",
msg: &Message{
Data: []byte("Hello world"),
},
want: &pb.PubSubMessage{
Data: []byte("Hello world"),
},
},
{
desc: "valid: filled",
msg: &Message{
Data: []byte("foo"),
Attributes: map[string]AttributeValues{
"attr1": [][]byte{
[]byte("val1"),
[]byte("val2"),
},
},
EventTime: time.Unix(1555593697, 154358*1000),
OrderingKey: []byte("order"),
},
want: &pb.PubSubMessage{
Data: []byte("foo"),
Attributes: map[string]*pb.AttributeValues{
"attr1": {
Values: [][]byte{
[]byte("val1"),
[]byte("val2"),
},
},
},
EventTime: &tspb.Timestamp{
Seconds: 1555593697,
Nanos: 154358 * 1000,
},
Key: []byte("order"),
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
got, err := tc.msg.toProto()
if err != nil {
t.Errorf("toProto() err = %v", err)
} else if !proto.Equal(got, tc.want) {
t.Errorf("toProto() got = %v\nwant = %v", got, tc.want)
}
})
}
}

func TestRoundRobinMsgRouter(t *testing.T) {
// Using the same msgRouter for each test run ensures that it reinitializes
// when the partition count changes.
source := &fakeSource{}
msgRouter := &roundRobinMsgRouter{rng: rand.New(source)}

for _, tc := range []struct {
partitionCount int
source int64
want []int
}{
{
partitionCount: 8,
source: 9,
want: []int{1, 2, 3, 4, 5, 6, 7, 0, 1},
},
{
partitionCount: 5,
source: 2,
want: []int{2, 3, 4, 0, 1, 2},
},
} {
t.Run(fmt.Sprintf("partitionCount=%d", tc.partitionCount), func(t *testing.T) {
source.ret = tc.source
msgRouter.SetPartitionCount(tc.partitionCount)
for i, want := range tc.want {
got := msgRouter.Route([]byte("IGNORED"))
if got != want {
t.Errorf("i=%d: Route() = %d, want = %d", i, got, want)
}
}
})
}
}

func TestHashingMsgRouter(t *testing.T) {
// Using the same msgRouter for each test run ensures that it reinitializes
// when the partition count changes.
msgRouter := &hashingMsgRouter{}

keys := [][]byte{
[]byte("foo1"),
[]byte("foo2"),
[]byte("foo3"),
[]byte("foo4"),
[]byte("foo5"),
}

for _, tc := range []struct {
partitionCount int
}{
{partitionCount: 10},
{partitionCount: 5},
} {
t.Run(fmt.Sprintf("partitionCount=%d", tc.partitionCount), func(t *testing.T) {
msgRouter.SetPartitionCount(tc.partitionCount)
for _, key := range keys {
p1 := msgRouter.Route(key)
p2 := msgRouter.Route(key)
if p1 != p2 {
t.Errorf("Route() returned different partitions for same key %v", key)
}
if p1 < 0 || p1 >= tc.partitionCount {
t.Errorf("Route() returned partition out of range: %v", p1)
}
}
})
}
}

func TestCompositeMsgRouter(t *testing.T) {
keyedRouter := &fakeMsgRouter{multiplier: 10}
keylessRouter := &fakeMsgRouter{multiplier: 100}
msgRouter := &compositeMsgRouter{
keyedRouter: keyedRouter,
keylessRouter: keylessRouter,
}

for _, tc := range []struct {
desc string
partitionCount int
key []byte
want int
}{
{
desc: "key",
partitionCount: 2,
key: []byte("foo"),
want: 20,
},
{
desc: "nil key",
partitionCount: 8,
key: nil,
want: 800,
},
{
desc: "empty key",
partitionCount: 5,
key: []byte{},
want: 500,
},
} {
t.Run(tc.desc, func(t *testing.T) {
msgRouter.SetPartitionCount(tc.partitionCount)
if got := msgRouter.Route(tc.key); got != tc.want {
t.Errorf("Route() = %d, want = %d", got, tc.want)
}
})
}
}

0 comments on commit 179fc55

Please sign in to comment.