diff --git a/pubsublite/message.go b/pubsublite/message.go new file mode 100644 index 00000000000..79e6a71c4f4 --- /dev/null +++ b/pubsublite/message.go @@ -0,0 +1,153 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package pubsublite + +import ( + "crypto/sha256" + "fmt" + "math/big" + "math/rand" + "time" + + "github.com/golang/protobuf/ptypes" + + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +// AttributeValues is a slice of strings. +type AttributeValues [][]byte + +// Message represents a Pub/Sub message. +type Message struct { + // Data is the actual data in the message. + Data []byte + + // Attributes can be used to label the message. A key may have multiple + // values. + Attributes map[string]AttributeValues + + // EventTime is an optional, user-specified event time for this message. + EventTime time.Time + + // OrderingKey identifies related messages for which publish order should + // be respected. Messages with the same ordering key are published to the + // same topic partition and subscribers will receive the messages in order. + // If the ordering key is empty, the message will be sent to an arbitrary + // partition. + OrderingKey []byte +} + +func (m *Message) toProto() (*pb.PubSubMessage, error) { + msgpb := &pb.PubSubMessage{ + Data: m.Data, + Key: m.OrderingKey, + } + + if len(m.Attributes) > 0 { + msgpb.Attributes = make(map[string]*pb.AttributeValues) + for key, values := range m.Attributes { + msgpb.Attributes[key] = &pb.AttributeValues{Values: values} + } + } + + if !m.EventTime.IsZero() { + ts, err := ptypes.TimestampProto(m.EventTime) + if err != nil { + return nil, fmt.Errorf("pubsublite: error converting message timestamp: %v", err) + } + msgpb.EventTime = ts + } + return msgpb, nil +} + +// messageRouter outputs a partition number, given an ordering key. Results are +// undefined when: +// - setPartitionCount() is called with count <= 0. +// - route() is called before setPartitionCount() to initialize the router. +// +// Message routers need to accommodate topic partition resizing. +type messageRouter interface { + SetPartitionCount(count int) + Route(orderingKey []byte) int +} + +// roundRobinMsgRouter sequentially cycles through partition numbers, starting +// from a random partition. +type roundRobinMsgRouter struct { + rng *rand.Rand + partitionCount int + nextPartition int +} + +func (r *roundRobinMsgRouter) SetPartitionCount(count int) { + r.partitionCount = count + r.nextPartition = int(r.rng.Int63n(int64(count))) +} + +func (r *roundRobinMsgRouter) Route(orderingKey []byte) (partition int) { + partition = r.nextPartition + r.nextPartition = (partition + 1) % r.partitionCount + return +} + +// hashingMsgRouter hashes an ordering key using SHA256 to obtain a partition +// number. It should only be used for messages with an ordering key. +// +// Matches implementation at: +// https://github.com/googleapis/java-pubsublite/blob/master/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/DefaultRoutingPolicy.java +type hashingMsgRouter struct { + partitionCount *big.Int +} + +func (r *hashingMsgRouter) SetPartitionCount(count int) { + r.partitionCount = big.NewInt(int64(count)) +} + +func (r *hashingMsgRouter) Route(orderingKey []byte) int { + if len(orderingKey) == 0 { + return -1 + } + h := sha256.Sum256(orderingKey) + num := new(big.Int).SetBytes(h[:]) + partition := new(big.Int).Mod(num, r.partitionCount) + return int(partition.Int64()) +} + +// compositeMsgRouter delegates to different message routers for messages +// with/without ordering keys. +type compositeMsgRouter struct { + keyedRouter messageRouter + keylessRouter messageRouter +} + +func (r *compositeMsgRouter) SetPartitionCount(count int) { + r.keyedRouter.SetPartitionCount(count) + r.keylessRouter.SetPartitionCount(count) +} + +func (r *compositeMsgRouter) Route(orderingKey []byte) int { + if len(orderingKey) > 0 { + return r.keyedRouter.Route(orderingKey) + } + return r.keylessRouter.Route(orderingKey) +} + +// defaultMessageRouter returns a compositeMsgRouter that uses hashingMsgRouter +// for messages with ordering key and roundRobinMsgRouter for messages without. +func newDefaultMessageRouter(rng *rand.Rand) messageRouter { + return &compositeMsgRouter{ + keyedRouter: &hashingMsgRouter{}, + keylessRouter: &roundRobinMsgRouter{rng: rng}, + } +} diff --git a/pubsublite/message_test.go b/pubsublite/message_test.go new file mode 100644 index 00000000000..1d5b3a97004 --- /dev/null +++ b/pubsublite/message_test.go @@ -0,0 +1,215 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package pubsublite + +import ( + "fmt" + "math/rand" + "testing" + "time" + + "github.com/golang/protobuf/proto" + + tspb "github.com/golang/protobuf/ptypes/timestamp" + pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1" +) + +type fakeSource struct { + ret int64 +} + +func (f *fakeSource) Int63() int64 { return f.ret } +func (f *fakeSource) Seed(seed int64) {} + +type fakeMsgRouter struct { + multiplier int + partitionCount int +} + +func (f *fakeMsgRouter) SetPartitionCount(count int) { + f.partitionCount = count +} + +func (f *fakeMsgRouter) Route(orderingKey []byte) int { + return f.partitionCount * f.multiplier +} + +func TestMessageToProto(t *testing.T) { + for _, tc := range []struct { + desc string + msg *Message + want *pb.PubSubMessage + }{ + { + desc: "valid: minimal", + msg: &Message{ + Data: []byte("Hello world"), + }, + want: &pb.PubSubMessage{ + Data: []byte("Hello world"), + }, + }, + { + desc: "valid: filled", + msg: &Message{ + Data: []byte("foo"), + Attributes: map[string]AttributeValues{ + "attr1": [][]byte{ + []byte("val1"), + []byte("val2"), + }, + }, + EventTime: time.Unix(1555593697, 154358*1000), + OrderingKey: []byte("order"), + }, + want: &pb.PubSubMessage{ + Data: []byte("foo"), + Attributes: map[string]*pb.AttributeValues{ + "attr1": { + Values: [][]byte{ + []byte("val1"), + []byte("val2"), + }, + }, + }, + EventTime: &tspb.Timestamp{ + Seconds: 1555593697, + Nanos: 154358 * 1000, + }, + Key: []byte("order"), + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + got, err := tc.msg.toProto() + if err != nil { + t.Errorf("toProto() err = %v", err) + } else if !proto.Equal(got, tc.want) { + t.Errorf("toProto() got = %v\nwant = %v", got, tc.want) + } + }) + } +} + +func TestRoundRobinMsgRouter(t *testing.T) { + // Using the same msgRouter for each test run ensures that it reinitializes + // when the partition count changes. + source := &fakeSource{} + msgRouter := &roundRobinMsgRouter{rng: rand.New(source)} + + for _, tc := range []struct { + partitionCount int + source int64 + want []int + }{ + { + partitionCount: 8, + source: 9, + want: []int{1, 2, 3, 4, 5, 6, 7, 0, 1}, + }, + { + partitionCount: 5, + source: 2, + want: []int{2, 3, 4, 0, 1, 2}, + }, + } { + t.Run(fmt.Sprintf("partitionCount=%d", tc.partitionCount), func(t *testing.T) { + source.ret = tc.source + msgRouter.SetPartitionCount(tc.partitionCount) + for i, want := range tc.want { + got := msgRouter.Route([]byte("IGNORED")) + if got != want { + t.Errorf("i=%d: Route() = %d, want = %d", i, got, want) + } + } + }) + } +} + +func TestHashingMsgRouter(t *testing.T) { + // Using the same msgRouter for each test run ensures that it reinitializes + // when the partition count changes. + msgRouter := &hashingMsgRouter{} + + keys := [][]byte{ + []byte("foo1"), + []byte("foo2"), + []byte("foo3"), + []byte("foo4"), + []byte("foo5"), + } + + for _, tc := range []struct { + partitionCount int + }{ + {partitionCount: 10}, + {partitionCount: 5}, + } { + t.Run(fmt.Sprintf("partitionCount=%d", tc.partitionCount), func(t *testing.T) { + msgRouter.SetPartitionCount(tc.partitionCount) + for _, key := range keys { + p1 := msgRouter.Route(key) + p2 := msgRouter.Route(key) + if p1 != p2 { + t.Errorf("Route() returned different partitions for same key %v", key) + } + if p1 < 0 || p1 >= tc.partitionCount { + t.Errorf("Route() returned partition out of range: %v", p1) + } + } + }) + } +} + +func TestCompositeMsgRouter(t *testing.T) { + keyedRouter := &fakeMsgRouter{multiplier: 10} + keylessRouter := &fakeMsgRouter{multiplier: 100} + msgRouter := &compositeMsgRouter{ + keyedRouter: keyedRouter, + keylessRouter: keylessRouter, + } + + for _, tc := range []struct { + desc string + partitionCount int + key []byte + want int + }{ + { + desc: "key", + partitionCount: 2, + key: []byte("foo"), + want: 20, + }, + { + desc: "nil key", + partitionCount: 8, + key: nil, + want: 800, + }, + { + desc: "empty key", + partitionCount: 5, + key: []byte{}, + want: 500, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + msgRouter.SetPartitionCount(tc.partitionCount) + if got := msgRouter.Route(tc.key); got != tc.want { + t.Errorf("Route() = %d, want = %d", got, tc.want) + } + }) + } +}