Skip to content

Commit

Permalink
fix(pubsublite): change pubsub.Message.ID to an encoded publish.Metad…
Browse files Browse the repository at this point in the history
…ata (#3662)

Changed pubsub.Message.ID to encode the partition and offset for a message.
  • Loading branch information
tmdiep committed Feb 4, 2021
1 parent 691d713 commit 6b2807f
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 103 deletions.
7 changes: 4 additions & 3 deletions pubsublite/internal/wire/subscriber.go
Expand Up @@ -37,8 +37,9 @@ var (
// ReceivedMessage stores a received Pub/Sub message and AckConsumer for
// acknowledging the message.
type ReceivedMessage struct {
Msg *pb.SequencedMessage
Ack AckConsumer
Msg *pb.SequencedMessage
Ack AckConsumer
Partition int
}

// MessageReceiverFunc receives a Pub/Sub message from a topic partition.
Expand Down Expand Up @@ -277,7 +278,7 @@ func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse)

for _, msg := range response.Messages {
ack := newAckConsumer(msg.GetCursor().GetOffset(), msg.GetSizeBytes(), s.onAck)
s.messageQueue.Add(&ReceivedMessage{Msg: msg, Ack: ack})
s.messageQueue.Add(&ReceivedMessage{Msg: msg, Ack: ack, Partition: s.subscription.Partition})
}
return nil
}
Expand Down
41 changes: 29 additions & 12 deletions pubsublite/internal/wire/subscriber_test.go
Expand Up @@ -21,6 +21,7 @@ import (

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand All @@ -41,6 +42,22 @@ func initFlowControlReq() *pb.SubscribeRequest {
return flowControlSubReq(flowControlTokens{Bytes: 1000, Messages: 10})
}

func partitionMsgs(partition int, msgs ...*pb.SequencedMessage) []*ReceivedMessage {
var received []*ReceivedMessage
for _, msg := range msgs {
received = append(received, &ReceivedMessage{Msg: msg, Partition: partition})
}
return received
}

func join(args ...[]*ReceivedMessage) []*ReceivedMessage {
var received []*ReceivedMessage
for _, msgs := range args {
received = append(received, msgs...)
}
return received
}

type testMessageReceiver struct {
t *testing.T
received chan *ReceivedMessage
Expand Down Expand Up @@ -70,29 +87,29 @@ func (tr *testMessageReceiver) ValidateMsg(want *pb.SequencedMessage) AckConsume
}
}

type ByMsgOffset []*pb.SequencedMessage
type ByMsgOffset []*ReceivedMessage

func (m ByMsgOffset) Len() int { return len(m) }
func (m ByMsgOffset) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m ByMsgOffset) Less(i, j int) bool {
return m[i].GetCursor().GetOffset() < m[j].GetCursor().GetOffset()
return m[i].Msg.GetCursor().GetOffset() < m[j].Msg.GetCursor().GetOffset()
}

func (tr *testMessageReceiver) ValidateMsgs(want []*pb.SequencedMessage) {
var got []*pb.SequencedMessage
func (tr *testMessageReceiver) ValidateMsgs(want []*ReceivedMessage) {
var got []*ReceivedMessage
for count := 0; count < len(want); count++ {
select {
case <-time.After(serviceTestWaitTimeout):
tr.t.Errorf("Received messages count: got %d, want %d", count, len(want))
case received := <-tr.received:
received.Ack.Ack()
got = append(got, received.Msg)
got = append(got, received)
}
}

sort.Sort(ByMsgOffset(want))
sort.Sort(ByMsgOffset(got))
if !testutil.Equal(got, want) {
if !testutil.Equal(got, want, cmpopts.IgnoreFields(ReceivedMessage{}, "Ack")) {
tr.t.Errorf("Received messages: got: %v\nwant: %v", got, want)
}
}
Expand Down Expand Up @@ -698,7 +715,7 @@ func TestMultiPartitionSubscriberMultipleMessages(t *testing.T) {
if gotErr := sub.WaitStarted(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
receiver.ValidateMsgs([]*pb.SequencedMessage{msg1, msg2, msg3, msg4})
receiver.ValidateMsgs(join(partitionMsgs(1, msg1, msg2), partitionMsgs(2, msg3, msg4)))
sub.Stop()
if gotErr := sub.WaitStopped(); gotErr != nil {
t.Errorf("Stop() got err: (%v)", gotErr)
Expand Down Expand Up @@ -746,7 +763,7 @@ func TestMultiPartitionSubscriberPermanentError(t *testing.T) {
if gotErr := sub.WaitStarted(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
receiver.ValidateMsgs([]*pb.SequencedMessage{msg1, msg3})
receiver.ValidateMsgs(join(partitionMsgs(1, msg1), partitionMsgs(2, msg3)))
errorBarrier.Release() // Release server error now to ensure test is deterministic
if gotErr := sub.WaitStopped(); !test.ErrorEqual(gotErr, serverErr) {
t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr)
Expand Down Expand Up @@ -875,14 +892,14 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
}

// Partition assignments are initially {3, 6}.
receiver.ValidateMsgs([]*pb.SequencedMessage{msg1, msg3})
receiver.ValidateMsgs(join(partitionMsgs(3, msg1), partitionMsgs(6, msg3)))
if got, want := sub.Partitions(), []int{3, 6}; !testutil.Equal(got, want) {
t.Errorf("subscriber partitions: got %d, want %d", got, want)
}

// Partition assignments will now be {3, 8}.
assignmentBarrier.Release()
receiver.ValidateMsgs([]*pb.SequencedMessage{msg5})
receiver.ValidateMsgs(partitionMsgs(8, msg5))
if got, want := sub.Partitions(), []int{3, 8}; !testutil.Equal(got, want) {
t.Errorf("subscriber partitions: got %d, want %d", got, want)
}
Expand All @@ -892,7 +909,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
sub.FlushCommits()
msg2Barrier.Release()
msg4Barrier.Release()
receiver.ValidateMsgs([]*pb.SequencedMessage{msg2})
receiver.ValidateMsgs(partitionMsgs(3, msg2))

// Stop should flush all commit cursors.
sub.Stop()
Expand Down Expand Up @@ -945,7 +962,7 @@ func TestAssigningSubscriberPermanentError(t *testing.T) {
if gotErr := sub.WaitStarted(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
receiver.ValidateMsgs([]*pb.SequencedMessage{msg1, msg2})
receiver.ValidateMsgs(join(partitionMsgs(1, msg1), partitionMsgs(2, msg2)))

// Permanent assignment stream error should terminate subscriber. Commits are
// still flushed.
Expand Down
81 changes: 45 additions & 36 deletions pubsublite/pscompat/integration_test.go
Expand Up @@ -28,6 +28,7 @@ import (
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/internal/test"
"cloud.google.com/go/pubsublite/internal/wire"
"cloud.google.com/go/pubsublite/publish"
"github.com/google/go-cmp/cmp/cmpopts"
"golang.org/x/sync/errgroup"
"google.golang.org/api/option"
Expand Down Expand Up @@ -177,30 +178,38 @@ func publishMessages(t *testing.T, settings PublishSettings, topic wire.TopicPat
waitForPublishResults(t, pubResults)
}

func publishPrefixedMessages(t *testing.T, settings PublishSettings, topic wire.TopicPath, msgPrefix string, messageCount int) []string {
func publishPrefixedMessages(t *testing.T, settings PublishSettings, topic wire.TopicPath, msgPrefix string, msgCount, msgSize int) []string {
ctx := context.Background()
publisher := publisherClient(ctx, t, settings, topic)
defer publisher.Stop()

orderingSender := test.NewOrderingSender()
var pubResults []*pubsub.PublishResult
var msgs []string
for i := 0; i < messageCount; i++ {
var msgData []string
for i := 0; i < msgCount; i++ {
data := orderingSender.Next(msgPrefix)
msgs = append(msgs, data)
pubResults = append(pubResults, publisher.Publish(ctx, &pubsub.Message{Data: []byte(data)}))
msgData = append(msgData, data)
msg := &pubsub.Message{Data: []byte(data)}
if msgSize > 0 {
// Add padding to achieve desired message size.
msg.Attributes = map[string]string{"attr": strings.Repeat("*", msgSize-len(data))}
}
pubResults = append(pubResults, publisher.Publish(ctx, msg))
}
waitForPublishResults(t, pubResults)
return msgs
return msgData
}

func waitForPublishResults(t *testing.T, pubResults []*pubsub.PublishResult) {
cctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
for i, result := range pubResults {
_, err := result.Get(cctx)
id, err := result.Get(cctx)
if err != nil {
t.Errorf("Publish(%d) got err: %v", i, err)
}
if _, err := publish.ParseMetadata(id); err != nil {
t.Error(err)
}
}
t.Logf("Published %d messages", len(pubResults))
cancel()
Expand All @@ -219,9 +228,7 @@ func messageDiff(got, want *pubsub.Message) string {
return testutil.Diff(got, want, cmpopts.IgnoreUnexported(pubsub.Message{}), cmpopts.IgnoreFields(pubsub.Message{}, "ID", "PublishTime"), cmpopts.EquateEmpty())
}

type checkOrdering bool

func receiveAllMessages(t *testing.T, msgTracker *test.MsgTracker, settings ReceiveSettings, subscription wire.SubscriptionPath, checkOrder checkOrdering) {
func receiveAllMessages(t *testing.T, msgTracker *test.MsgTracker, settings ReceiveSettings, subscription wire.SubscriptionPath) {
cctx, stopSubscriber := context.WithTimeout(context.Background(), defaultTestTimeout)
orderingValidator := test.NewOrderingReceiver()

Expand All @@ -233,13 +240,21 @@ func receiveAllMessages(t *testing.T, msgTracker *test.MsgTracker, settings Rece
t.Fatalf("Received unexpected message: %q", truncateMsg(data))
return
}
if checkOrder {
if err := orderingValidator.Receive(data, msg.OrderingKey); err != nil {
t.Errorf("Received unordered message: %q", truncateMsg(data))

// Check message ordering.
metadata, err := publish.ParseMetadata(msg.ID)
if err != nil {
t.Error(err)
} else {
orderingKey := fmt.Sprintf("%d", metadata.Partition)
if err := orderingValidator.Receive(data, orderingKey); err != nil {
t.Errorf("Received unordered message with id %s: %q", msg.ID, truncateMsg(data))
}
}

// Stop the subscriber when all messages have been received.
if msgTracker.Empty() {
stopSubscriber() // Stop the subscriber when all messages have been received
stopSubscriber()
}
}

Expand Down Expand Up @@ -331,7 +346,6 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) {
// Swaps data and key.
msg.Data = wireMsg.GetMessage().GetKey()
msg.OrderingKey = string(wireMsg.GetMessage().GetData())
msg.ID = "FAKE_ID"
msg.PublishTime = time.Now()
return nil
}
Expand Down Expand Up @@ -474,12 +488,12 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) {
pubSettings := DefaultPublishSettings
pubSettings.CountThreshold = publishBatchSize
pubSettings.DelayThreshold = 100 * time.Millisecond
msgs := publishPrefixedMessages(t, pubSettings, topicPath, "ordering", messageCount)
msgs := publishPrefixedMessages(t, pubSettings, topicPath, "ordering", messageCount, 0)

// Receive messages.
msgTracker := test.NewMsgTracker()
msgTracker.Add(msgs...)
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath, checkOrdering(true))
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath)
})

// Checks that subscriber flow control works.
Expand All @@ -488,33 +502,28 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) {
const maxOutstandingMessages = 2 // Receive small batches of messages

// Publish messages.
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "subscriber_flow_control", messageCount)
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "subscriber_flow_control", messageCount, 0)

// Receive messages.
msgTracker := test.NewMsgTracker()
msgTracker.Add(msgs...)
customSettings := recvSettings
customSettings.MaxOutstandingMessages = maxOutstandingMessages
receiveAllMessages(t, msgTracker, customSettings, subscriptionPath, checkOrdering(true))
receiveAllMessages(t, msgTracker, customSettings, subscriptionPath)
})

// Verifies that large messages can be sent and received.
t.Run("LargeMessages", func(t *testing.T) {
const messageCount = 3
const messageLen = MaxPublishRequestBytes - 50
const messageCount = 5
const messageSize = MaxPublishRequestBytes - 50

// Publish messages.
msgTracker := test.NewMsgTracker()
var msgs []*pubsub.Message
for i := 0; i < messageCount; i++ {
data := strings.Repeat(fmt.Sprintf("%d", i), messageLen)
msgTracker.Add(data)
msgs = append(msgs, &pubsub.Message{Data: []byte(data)})
}
publishMessages(t, DefaultPublishSettings, topicPath, msgs...)
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "large_messages", messageCount, messageSize)

// Receive messages.
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath, checkOrdering(false))
msgTracker := test.NewMsgTracker()
msgTracker.Add(msgs...)
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath)
})
}

Expand Down Expand Up @@ -542,13 +551,13 @@ func TestIntegration_PublishSubscribeMultiPartition(t *testing.T) {
const messageCount = 50 * partitionCount

// Publish messages.
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "routing_no_key", messageCount)
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "routing_no_key", messageCount, 0)

// Receive messages, not checking for ordering since they do not have a key.
// However, they would still be ordered within their partition.
msgTracker := test.NewMsgTracker()
msgTracker.Add(msgs...)
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath, checkOrdering(false))
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath)
})

// Tests messages published with ordering key.
Expand Down Expand Up @@ -577,7 +586,7 @@ func TestIntegration_PublishSubscribeMultiPartition(t *testing.T) {
publishMessages(t, pubSettings, topicPath, msgs...)

// Receive messages.
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath, checkOrdering(true))
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath)
})

// Verifies usage of the partition assignment service.
Expand All @@ -586,7 +595,7 @@ func TestIntegration_PublishSubscribeMultiPartition(t *testing.T) {
const subscriberCount = 2 // Should be between [2, partitionCount]

// Publish messages.
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "partition_assignment", messageCount)
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "partition_assignment", messageCount, 0)

// Start multiple subscribers that use partition assignment.
msgTracker := test.NewMsgTracker()
Expand Down Expand Up @@ -649,7 +658,7 @@ func TestIntegration_SubscribeFanOut(t *testing.T) {
}

// Publish messages.
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "fan_out", messageCount)
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "fan_out", messageCount, 0)

// Receive messages from multiple subscriptions.
recvSettings := DefaultReceiveSettings
Expand All @@ -658,6 +667,6 @@ func TestIntegration_SubscribeFanOut(t *testing.T) {
for _, subscription := range subscriptionPaths {
msgTracker := test.NewMsgTracker()
msgTracker.Add(msgs...)
receiveAllMessages(t, msgTracker, recvSettings, subscription, checkOrdering(partitionCount == 1))
receiveAllMessages(t, msgTracker, recvSettings, subscription)
}
}
3 changes: 0 additions & 3 deletions pubsublite/pscompat/message.go
Expand Up @@ -104,9 +104,6 @@ func transformReceivedMessage(from *pb.SequencedMessage, to *pubsub.Message) err
return fmt.Errorf("%s: %s", errInvalidMessage.Error(), err)
}
}
if from.GetCursor() != nil {
to.ID = fmt.Sprintf("%d", from.GetCursor().GetOffset())
}
if len(msg.GetKey()) > 0 {
to.OrderingKey = string(msg.GetKey())
}
Expand Down
1 change: 0 additions & 1 deletion pubsublite/pscompat/message_test.go
Expand Up @@ -66,7 +66,6 @@ func TestMessageTransforms(t *testing.T) {
},
},
wantMsg: &pubsub.Message{
ID: "10",
PublishTime: time.Unix(1577836800, 900800700),
Data: []byte("foo"),
OrderingKey: "bar",
Expand Down
6 changes: 4 additions & 2 deletions pubsublite/pscompat/settings.go
Expand Up @@ -146,8 +146,10 @@ func (s *PublishSettings) toWireSettings() wire.PublishSettings {
type NackHandler func(*pubsub.Message) error

// ReceiveMessageTransformerFunc transforms a Pub/Sub Lite SequencedMessage API
// proto to a pubsub.Message. If this returns an error, the SubscriberClient
// will consider this a fatal error and terminate.
// proto to a pubsub.Message. The implementation must not set pubsub.Message.ID.
//
// If this returns an error, the SubscriberClient will consider this a fatal
// error and terminate.
type ReceiveMessageTransformerFunc func(*pb.SequencedMessage, *pubsub.Message) error

// ReceiveSettings configure the SubscriberClient. Flow control settings
Expand Down

0 comments on commit 6b2807f

Please sign in to comment.