Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsublite): change pubsub.Message.ID to an encoded publish.Metadata #3662

Merged
merged 3 commits into from Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 4 additions & 3 deletions pubsublite/internal/wire/subscriber.go
Expand Up @@ -37,8 +37,9 @@ var (
// ReceivedMessage stores a received Pub/Sub message and AckConsumer for
// acknowledging the message.
type ReceivedMessage struct {
Msg *pb.SequencedMessage
Ack AckConsumer
Msg *pb.SequencedMessage
Ack AckConsumer
Partition int
}

// MessageReceiverFunc receives a Pub/Sub message from a topic partition.
Expand Down Expand Up @@ -277,7 +278,7 @@ func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse)

for _, msg := range response.Messages {
ack := newAckConsumer(msg.GetCursor().GetOffset(), msg.GetSizeBytes(), s.onAck)
s.messageQueue.Add(&ReceivedMessage{Msg: msg, Ack: ack})
s.messageQueue.Add(&ReceivedMessage{Msg: msg, Ack: ack, Partition: s.subscription.Partition})
}
return nil
}
Expand Down
41 changes: 29 additions & 12 deletions pubsublite/internal/wire/subscriber_test.go
Expand Up @@ -21,6 +21,7 @@ import (

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand All @@ -41,6 +42,22 @@ func initFlowControlReq() *pb.SubscribeRequest {
return flowControlSubReq(flowControlTokens{Bytes: 1000, Messages: 10})
}

func partitionMsgs(partition int, msgs ...*pb.SequencedMessage) []*ReceivedMessage {
var received []*ReceivedMessage
for _, msg := range msgs {
received = append(received, &ReceivedMessage{Msg: msg, Partition: partition})
}
return received
}

func join(args ...[]*ReceivedMessage) []*ReceivedMessage {
var received []*ReceivedMessage
for _, msgs := range args {
received = append(received, msgs...)
}
return received
}

type testMessageReceiver struct {
t *testing.T
received chan *ReceivedMessage
Expand Down Expand Up @@ -70,29 +87,29 @@ func (tr *testMessageReceiver) ValidateMsg(want *pb.SequencedMessage) AckConsume
}
}

type ByMsgOffset []*pb.SequencedMessage
type ByMsgOffset []*ReceivedMessage

func (m ByMsgOffset) Len() int { return len(m) }
func (m ByMsgOffset) Swap(i, j int) { m[i], m[j] = m[j], m[i] }
func (m ByMsgOffset) Less(i, j int) bool {
return m[i].GetCursor().GetOffset() < m[j].GetCursor().GetOffset()
return m[i].Msg.GetCursor().GetOffset() < m[j].Msg.GetCursor().GetOffset()
}

func (tr *testMessageReceiver) ValidateMsgs(want []*pb.SequencedMessage) {
var got []*pb.SequencedMessage
func (tr *testMessageReceiver) ValidateMsgs(want []*ReceivedMessage) {
var got []*ReceivedMessage
for count := 0; count < len(want); count++ {
select {
case <-time.After(serviceTestWaitTimeout):
tr.t.Errorf("Received messages count: got %d, want %d", count, len(want))
case received := <-tr.received:
received.Ack.Ack()
got = append(got, received.Msg)
got = append(got, received)
}
}

sort.Sort(ByMsgOffset(want))
sort.Sort(ByMsgOffset(got))
if !testutil.Equal(got, want) {
if !testutil.Equal(got, want, cmpopts.IgnoreFields(ReceivedMessage{}, "Ack")) {
tr.t.Errorf("Received messages: got: %v\nwant: %v", got, want)
}
}
Expand Down Expand Up @@ -698,7 +715,7 @@ func TestMultiPartitionSubscriberMultipleMessages(t *testing.T) {
if gotErr := sub.WaitStarted(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
receiver.ValidateMsgs([]*pb.SequencedMessage{msg1, msg2, msg3, msg4})
receiver.ValidateMsgs(join(partitionMsgs(1, msg1, msg2), partitionMsgs(2, msg3, msg4)))
sub.Stop()
if gotErr := sub.WaitStopped(); gotErr != nil {
t.Errorf("Stop() got err: (%v)", gotErr)
Expand Down Expand Up @@ -746,7 +763,7 @@ func TestMultiPartitionSubscriberPermanentError(t *testing.T) {
if gotErr := sub.WaitStarted(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
receiver.ValidateMsgs([]*pb.SequencedMessage{msg1, msg3})
receiver.ValidateMsgs(join(partitionMsgs(1, msg1), partitionMsgs(2, msg3)))
errorBarrier.Release() // Release server error now to ensure test is deterministic
if gotErr := sub.WaitStopped(); !test.ErrorEqual(gotErr, serverErr) {
t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr)
Expand Down Expand Up @@ -875,14 +892,14 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
}

// Partition assignments are initially {3, 6}.
receiver.ValidateMsgs([]*pb.SequencedMessage{msg1, msg3})
receiver.ValidateMsgs(join(partitionMsgs(3, msg1), partitionMsgs(6, msg3)))
if got, want := sub.Partitions(), []int{3, 6}; !testutil.Equal(got, want) {
t.Errorf("subscriber partitions: got %d, want %d", got, want)
}

// Partition assignments will now be {3, 8}.
assignmentBarrier.Release()
receiver.ValidateMsgs([]*pb.SequencedMessage{msg5})
receiver.ValidateMsgs(partitionMsgs(8, msg5))
if got, want := sub.Partitions(), []int{3, 8}; !testutil.Equal(got, want) {
t.Errorf("subscriber partitions: got %d, want %d", got, want)
}
Expand All @@ -892,7 +909,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) {
sub.FlushCommits()
msg2Barrier.Release()
msg4Barrier.Release()
receiver.ValidateMsgs([]*pb.SequencedMessage{msg2})
receiver.ValidateMsgs(partitionMsgs(3, msg2))

// Stop should flush all commit cursors.
sub.Stop()
Expand Down Expand Up @@ -945,7 +962,7 @@ func TestAssigningSubscriberPermanentError(t *testing.T) {
if gotErr := sub.WaitStarted(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
receiver.ValidateMsgs([]*pb.SequencedMessage{msg1, msg2})
receiver.ValidateMsgs(join(partitionMsgs(1, msg1), partitionMsgs(2, msg2)))

// Permanent assignment stream error should terminate subscriber. Commits are
// still flushed.
Expand Down
81 changes: 45 additions & 36 deletions pubsublite/pscompat/integration_test.go
Expand Up @@ -28,6 +28,7 @@ import (
"cloud.google.com/go/pubsublite"
"cloud.google.com/go/pubsublite/internal/test"
"cloud.google.com/go/pubsublite/internal/wire"
"cloud.google.com/go/pubsublite/publish"
"github.com/google/go-cmp/cmp/cmpopts"
"golang.org/x/sync/errgroup"
"google.golang.org/api/option"
Expand Down Expand Up @@ -177,30 +178,38 @@ func publishMessages(t *testing.T, settings PublishSettings, topic wire.TopicPat
waitForPublishResults(t, pubResults)
}

func publishPrefixedMessages(t *testing.T, settings PublishSettings, topic wire.TopicPath, msgPrefix string, messageCount int) []string {
func publishPrefixedMessages(t *testing.T, settings PublishSettings, topic wire.TopicPath, msgPrefix string, msgCount, msgSize int) []string {
ctx := context.Background()
publisher := publisherClient(ctx, t, settings, topic)
defer publisher.Stop()

orderingSender := test.NewOrderingSender()
var pubResults []*pubsub.PublishResult
var msgs []string
for i := 0; i < messageCount; i++ {
var msgData []string
for i := 0; i < msgCount; i++ {
data := orderingSender.Next(msgPrefix)
msgs = append(msgs, data)
pubResults = append(pubResults, publisher.Publish(ctx, &pubsub.Message{Data: []byte(data)}))
msgData = append(msgData, data)
msg := &pubsub.Message{Data: []byte(data)}
if msgSize > 0 {
// Add padding to achieve desired message size.
msg.Attributes = map[string]string{"attr": strings.Repeat("*", msgSize-len(data))}
}
pubResults = append(pubResults, publisher.Publish(ctx, msg))
}
waitForPublishResults(t, pubResults)
return msgs
return msgData
}

func waitForPublishResults(t *testing.T, pubResults []*pubsub.PublishResult) {
cctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
for i, result := range pubResults {
_, err := result.Get(cctx)
id, err := result.Get(cctx)
if err != nil {
t.Errorf("Publish(%d) got err: %v", i, err)
}
if _, err := publish.ParseMetadata(id); err != nil {
t.Error(err)
}
}
t.Logf("Published %d messages", len(pubResults))
cancel()
Expand All @@ -219,9 +228,7 @@ func messageDiff(got, want *pubsub.Message) string {
return testutil.Diff(got, want, cmpopts.IgnoreUnexported(pubsub.Message{}), cmpopts.IgnoreFields(pubsub.Message{}, "ID", "PublishTime"), cmpopts.EquateEmpty())
}

type checkOrdering bool

func receiveAllMessages(t *testing.T, msgTracker *test.MsgTracker, settings ReceiveSettings, subscription wire.SubscriptionPath, checkOrder checkOrdering) {
func receiveAllMessages(t *testing.T, msgTracker *test.MsgTracker, settings ReceiveSettings, subscription wire.SubscriptionPath) {
cctx, stopSubscriber := context.WithTimeout(context.Background(), defaultTestTimeout)
orderingValidator := test.NewOrderingReceiver()

Expand All @@ -233,13 +240,21 @@ func receiveAllMessages(t *testing.T, msgTracker *test.MsgTracker, settings Rece
t.Fatalf("Received unexpected message: %q", truncateMsg(data))
return
}
if checkOrder {
if err := orderingValidator.Receive(data, msg.OrderingKey); err != nil {
t.Errorf("Received unordered message: %q", truncateMsg(data))

// Check message ordering.
metadata, err := publish.ParseMetadata(msg.ID)
if err != nil {
t.Error(err)
} else {
orderingKey := fmt.Sprintf("%d", metadata.Partition)
if err := orderingValidator.Receive(data, orderingKey); err != nil {
t.Errorf("Received unordered message with id %s: %q", msg.ID, truncateMsg(data))
}
}

// Stop the subscriber when all messages have been received.
if msgTracker.Empty() {
stopSubscriber() // Stop the subscriber when all messages have been received
stopSubscriber()
}
}

Expand Down Expand Up @@ -331,7 +346,6 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) {
// Swaps data and key.
msg.Data = wireMsg.GetMessage().GetKey()
msg.OrderingKey = string(wireMsg.GetMessage().GetData())
msg.ID = "FAKE_ID"
msg.PublishTime = time.Now()
return nil
}
Expand Down Expand Up @@ -474,12 +488,12 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) {
pubSettings := DefaultPublishSettings
pubSettings.CountThreshold = publishBatchSize
pubSettings.DelayThreshold = 100 * time.Millisecond
msgs := publishPrefixedMessages(t, pubSettings, topicPath, "ordering", messageCount)
msgs := publishPrefixedMessages(t, pubSettings, topicPath, "ordering", messageCount, 0)

// Receive messages.
msgTracker := test.NewMsgTracker()
msgTracker.Add(msgs...)
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath, checkOrdering(true))
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath)
})

// Checks that subscriber flow control works.
Expand All @@ -488,33 +502,28 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) {
const maxOutstandingMessages = 2 // Receive small batches of messages

// Publish messages.
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "subscriber_flow_control", messageCount)
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "subscriber_flow_control", messageCount, 0)

// Receive messages.
msgTracker := test.NewMsgTracker()
msgTracker.Add(msgs...)
customSettings := recvSettings
customSettings.MaxOutstandingMessages = maxOutstandingMessages
receiveAllMessages(t, msgTracker, customSettings, subscriptionPath, checkOrdering(true))
receiveAllMessages(t, msgTracker, customSettings, subscriptionPath)
})

// Verifies that large messages can be sent and received.
t.Run("LargeMessages", func(t *testing.T) {
const messageCount = 3
const messageLen = MaxPublishRequestBytes - 50
const messageCount = 5
const messageSize = MaxPublishRequestBytes - 50

// Publish messages.
msgTracker := test.NewMsgTracker()
var msgs []*pubsub.Message
for i := 0; i < messageCount; i++ {
data := strings.Repeat(fmt.Sprintf("%d", i), messageLen)
msgTracker.Add(data)
msgs = append(msgs, &pubsub.Message{Data: []byte(data)})
}
publishMessages(t, DefaultPublishSettings, topicPath, msgs...)
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "large_messages", messageCount, messageSize)

// Receive messages.
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath, checkOrdering(false))
msgTracker := test.NewMsgTracker()
msgTracker.Add(msgs...)
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath)
})
}

Expand Down Expand Up @@ -542,13 +551,13 @@ func TestIntegration_PublishSubscribeMultiPartition(t *testing.T) {
const messageCount = 50 * partitionCount

// Publish messages.
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "routing_no_key", messageCount)
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "routing_no_key", messageCount, 0)

// Receive messages, not checking for ordering since they do not have a key.
// However, they would still be ordered within their partition.
msgTracker := test.NewMsgTracker()
msgTracker.Add(msgs...)
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath, checkOrdering(false))
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath)
})

// Tests messages published with ordering key.
Expand Down Expand Up @@ -577,7 +586,7 @@ func TestIntegration_PublishSubscribeMultiPartition(t *testing.T) {
publishMessages(t, pubSettings, topicPath, msgs...)

// Receive messages.
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath, checkOrdering(true))
receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath)
})

// Verifies usage of the partition assignment service.
Expand All @@ -586,7 +595,7 @@ func TestIntegration_PublishSubscribeMultiPartition(t *testing.T) {
const subscriberCount = 2 // Should be between [2, partitionCount]

// Publish messages.
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "partition_assignment", messageCount)
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "partition_assignment", messageCount, 0)

// Start multiple subscribers that use partition assignment.
msgTracker := test.NewMsgTracker()
Expand Down Expand Up @@ -649,7 +658,7 @@ func TestIntegration_SubscribeFanOut(t *testing.T) {
}

// Publish messages.
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "fan_out", messageCount)
msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "fan_out", messageCount, 0)

// Receive messages from multiple subscriptions.
recvSettings := DefaultReceiveSettings
Expand All @@ -658,6 +667,6 @@ func TestIntegration_SubscribeFanOut(t *testing.T) {
for _, subscription := range subscriptionPaths {
msgTracker := test.NewMsgTracker()
msgTracker.Add(msgs...)
receiveAllMessages(t, msgTracker, recvSettings, subscription, checkOrdering(partitionCount == 1))
receiveAllMessages(t, msgTracker, recvSettings, subscription)
}
}
3 changes: 0 additions & 3 deletions pubsublite/pscompat/message.go
Expand Up @@ -104,9 +104,6 @@ func transformReceivedMessage(from *pb.SequencedMessage, to *pubsub.Message) err
return fmt.Errorf("%s: %s", errInvalidMessage.Error(), err)
}
}
if from.GetCursor() != nil {
to.ID = fmt.Sprintf("%d", from.GetCursor().GetOffset())
}
if len(msg.GetKey()) > 0 {
to.OrderingKey = string(msg.GetKey())
}
Expand Down
1 change: 0 additions & 1 deletion pubsublite/pscompat/message_test.go
Expand Up @@ -66,7 +66,6 @@ func TestMessageTransforms(t *testing.T) {
},
},
wantMsg: &pubsub.Message{
ID: "10",
PublishTime: time.Unix(1577836800, 900800700),
Data: []byte("foo"),
OrderingKey: "bar",
Expand Down
6 changes: 4 additions & 2 deletions pubsublite/pscompat/settings.go
Expand Up @@ -146,8 +146,10 @@ func (s *PublishSettings) toWireSettings() wire.PublishSettings {
type NackHandler func(*pubsub.Message) error

// ReceiveMessageTransformerFunc transforms a Pub/Sub Lite SequencedMessage API
// proto to a pubsub.Message. If this returns an error, the SubscriberClient
// will consider this a fatal error and terminate.
// proto to a pubsub.Message. The implementation must not set pubsub.Message.ID.
//
// If this returns an error, the SubscriberClient will consider this a fatal
// error and terminate.
type ReceiveMessageTransformerFunc func(*pb.SequencedMessage, *pubsub.Message) error

// ReceiveSettings configure the SubscriberClient. Flow control settings
Expand Down