diff --git a/pubsublite/internal/wire/subscriber.go b/pubsublite/internal/wire/subscriber.go index d75b02df5c2..054747a362c 100644 --- a/pubsublite/internal/wire/subscriber.go +++ b/pubsublite/internal/wire/subscriber.go @@ -37,8 +37,9 @@ var ( // ReceivedMessage stores a received Pub/Sub message and AckConsumer for // acknowledging the message. type ReceivedMessage struct { - Msg *pb.SequencedMessage - Ack AckConsumer + Msg *pb.SequencedMessage + Ack AckConsumer + Partition int } // MessageReceiverFunc receives a Pub/Sub message from a topic partition. @@ -277,7 +278,7 @@ func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse) for _, msg := range response.Messages { ack := newAckConsumer(msg.GetCursor().GetOffset(), msg.GetSizeBytes(), s.onAck) - s.messageQueue.Add(&ReceivedMessage{Msg: msg, Ack: ack}) + s.messageQueue.Add(&ReceivedMessage{Msg: msg, Ack: ack, Partition: s.subscription.Partition}) } return nil } diff --git a/pubsublite/internal/wire/subscriber_test.go b/pubsublite/internal/wire/subscriber_test.go index cc4f6cbba34..135cacd1ac2 100644 --- a/pubsublite/internal/wire/subscriber_test.go +++ b/pubsublite/internal/wire/subscriber_test.go @@ -21,6 +21,7 @@ import ( "cloud.google.com/go/internal/testutil" "cloud.google.com/go/pubsublite/internal/test" + "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" @@ -41,6 +42,22 @@ func initFlowControlReq() *pb.SubscribeRequest { return flowControlSubReq(flowControlTokens{Bytes: 1000, Messages: 10}) } +func partitionMsgs(partition int, msgs ...*pb.SequencedMessage) []*ReceivedMessage { + var received []*ReceivedMessage + for _, msg := range msgs { + received = append(received, &ReceivedMessage{Msg: msg, Partition: partition}) + } + return received +} + +func join(args ...[]*ReceivedMessage) []*ReceivedMessage { + var received []*ReceivedMessage + for _, msgs := range args { + received = append(received, msgs...) + } + return received +} + type testMessageReceiver struct { t *testing.T received chan *ReceivedMessage @@ -70,29 +87,29 @@ func (tr *testMessageReceiver) ValidateMsg(want *pb.SequencedMessage) AckConsume } } -type ByMsgOffset []*pb.SequencedMessage +type ByMsgOffset []*ReceivedMessage func (m ByMsgOffset) Len() int { return len(m) } func (m ByMsgOffset) Swap(i, j int) { m[i], m[j] = m[j], m[i] } func (m ByMsgOffset) Less(i, j int) bool { - return m[i].GetCursor().GetOffset() < m[j].GetCursor().GetOffset() + return m[i].Msg.GetCursor().GetOffset() < m[j].Msg.GetCursor().GetOffset() } -func (tr *testMessageReceiver) ValidateMsgs(want []*pb.SequencedMessage) { - var got []*pb.SequencedMessage +func (tr *testMessageReceiver) ValidateMsgs(want []*ReceivedMessage) { + var got []*ReceivedMessage for count := 0; count < len(want); count++ { select { case <-time.After(serviceTestWaitTimeout): tr.t.Errorf("Received messages count: got %d, want %d", count, len(want)) case received := <-tr.received: received.Ack.Ack() - got = append(got, received.Msg) + got = append(got, received) } } sort.Sort(ByMsgOffset(want)) sort.Sort(ByMsgOffset(got)) - if !testutil.Equal(got, want) { + if !testutil.Equal(got, want, cmpopts.IgnoreFields(ReceivedMessage{}, "Ack")) { tr.t.Errorf("Received messages: got: %v\nwant: %v", got, want) } } @@ -698,7 +715,7 @@ func TestMultiPartitionSubscriberMultipleMessages(t *testing.T) { if gotErr := sub.WaitStarted(); gotErr != nil { t.Errorf("Start() got err: (%v)", gotErr) } - receiver.ValidateMsgs([]*pb.SequencedMessage{msg1, msg2, msg3, msg4}) + receiver.ValidateMsgs(join(partitionMsgs(1, msg1, msg2), partitionMsgs(2, msg3, msg4))) sub.Stop() if gotErr := sub.WaitStopped(); gotErr != nil { t.Errorf("Stop() got err: (%v)", gotErr) @@ -746,7 +763,7 @@ func TestMultiPartitionSubscriberPermanentError(t *testing.T) { if gotErr := sub.WaitStarted(); gotErr != nil { t.Errorf("Start() got err: (%v)", gotErr) } - receiver.ValidateMsgs([]*pb.SequencedMessage{msg1, msg3}) + receiver.ValidateMsgs(join(partitionMsgs(1, msg1), partitionMsgs(2, msg3))) errorBarrier.Release() // Release server error now to ensure test is deterministic if gotErr := sub.WaitStopped(); !test.ErrorEqual(gotErr, serverErr) { t.Errorf("Final error got: (%v), want: (%v)", gotErr, serverErr) @@ -875,14 +892,14 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) { } // Partition assignments are initially {3, 6}. - receiver.ValidateMsgs([]*pb.SequencedMessage{msg1, msg3}) + receiver.ValidateMsgs(join(partitionMsgs(3, msg1), partitionMsgs(6, msg3))) if got, want := sub.Partitions(), []int{3, 6}; !testutil.Equal(got, want) { t.Errorf("subscriber partitions: got %d, want %d", got, want) } // Partition assignments will now be {3, 8}. assignmentBarrier.Release() - receiver.ValidateMsgs([]*pb.SequencedMessage{msg5}) + receiver.ValidateMsgs(partitionMsgs(8, msg5)) if got, want := sub.Partitions(), []int{3, 8}; !testutil.Equal(got, want) { t.Errorf("subscriber partitions: got %d, want %d", got, want) } @@ -892,7 +909,7 @@ func TestAssigningSubscriberAddRemovePartitions(t *testing.T) { sub.FlushCommits() msg2Barrier.Release() msg4Barrier.Release() - receiver.ValidateMsgs([]*pb.SequencedMessage{msg2}) + receiver.ValidateMsgs(partitionMsgs(3, msg2)) // Stop should flush all commit cursors. sub.Stop() @@ -945,7 +962,7 @@ func TestAssigningSubscriberPermanentError(t *testing.T) { if gotErr := sub.WaitStarted(); gotErr != nil { t.Errorf("Start() got err: (%v)", gotErr) } - receiver.ValidateMsgs([]*pb.SequencedMessage{msg1, msg2}) + receiver.ValidateMsgs(join(partitionMsgs(1, msg1), partitionMsgs(2, msg2))) // Permanent assignment stream error should terminate subscriber. Commits are // still flushed. diff --git a/pubsublite/pscompat/integration_test.go b/pubsublite/pscompat/integration_test.go index eefae8a6209..2f539cf34c9 100644 --- a/pubsublite/pscompat/integration_test.go +++ b/pubsublite/pscompat/integration_test.go @@ -28,6 +28,7 @@ import ( "cloud.google.com/go/pubsublite" "cloud.google.com/go/pubsublite/internal/test" "cloud.google.com/go/pubsublite/internal/wire" + "cloud.google.com/go/pubsublite/publish" "github.com/google/go-cmp/cmp/cmpopts" "golang.org/x/sync/errgroup" "google.golang.org/api/option" @@ -177,30 +178,38 @@ func publishMessages(t *testing.T, settings PublishSettings, topic wire.TopicPat waitForPublishResults(t, pubResults) } -func publishPrefixedMessages(t *testing.T, settings PublishSettings, topic wire.TopicPath, msgPrefix string, messageCount int) []string { +func publishPrefixedMessages(t *testing.T, settings PublishSettings, topic wire.TopicPath, msgPrefix string, msgCount, msgSize int) []string { ctx := context.Background() publisher := publisherClient(ctx, t, settings, topic) defer publisher.Stop() orderingSender := test.NewOrderingSender() var pubResults []*pubsub.PublishResult - var msgs []string - for i := 0; i < messageCount; i++ { + var msgData []string + for i := 0; i < msgCount; i++ { data := orderingSender.Next(msgPrefix) - msgs = append(msgs, data) - pubResults = append(pubResults, publisher.Publish(ctx, &pubsub.Message{Data: []byte(data)})) + msgData = append(msgData, data) + msg := &pubsub.Message{Data: []byte(data)} + if msgSize > 0 { + // Add padding to achieve desired message size. + msg.Attributes = map[string]string{"attr": strings.Repeat("*", msgSize-len(data))} + } + pubResults = append(pubResults, publisher.Publish(ctx, msg)) } waitForPublishResults(t, pubResults) - return msgs + return msgData } func waitForPublishResults(t *testing.T, pubResults []*pubsub.PublishResult) { cctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) for i, result := range pubResults { - _, err := result.Get(cctx) + id, err := result.Get(cctx) if err != nil { t.Errorf("Publish(%d) got err: %v", i, err) } + if _, err := publish.ParseMetadata(id); err != nil { + t.Error(err) + } } t.Logf("Published %d messages", len(pubResults)) cancel() @@ -219,9 +228,7 @@ func messageDiff(got, want *pubsub.Message) string { return testutil.Diff(got, want, cmpopts.IgnoreUnexported(pubsub.Message{}), cmpopts.IgnoreFields(pubsub.Message{}, "ID", "PublishTime"), cmpopts.EquateEmpty()) } -type checkOrdering bool - -func receiveAllMessages(t *testing.T, msgTracker *test.MsgTracker, settings ReceiveSettings, subscription wire.SubscriptionPath, checkOrder checkOrdering) { +func receiveAllMessages(t *testing.T, msgTracker *test.MsgTracker, settings ReceiveSettings, subscription wire.SubscriptionPath) { cctx, stopSubscriber := context.WithTimeout(context.Background(), defaultTestTimeout) orderingValidator := test.NewOrderingReceiver() @@ -233,13 +240,21 @@ func receiveAllMessages(t *testing.T, msgTracker *test.MsgTracker, settings Rece t.Fatalf("Received unexpected message: %q", truncateMsg(data)) return } - if checkOrder { - if err := orderingValidator.Receive(data, msg.OrderingKey); err != nil { - t.Errorf("Received unordered message: %q", truncateMsg(data)) + + // Check message ordering. + metadata, err := publish.ParseMetadata(msg.ID) + if err != nil { + t.Error(err) + } else { + orderingKey := fmt.Sprintf("%d", metadata.Partition) + if err := orderingValidator.Receive(data, orderingKey); err != nil { + t.Errorf("Received unordered message with id %s: %q", msg.ID, truncateMsg(data)) } } + + // Stop the subscriber when all messages have been received. if msgTracker.Empty() { - stopSubscriber() // Stop the subscriber when all messages have been received + stopSubscriber() } } @@ -331,7 +346,6 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) { // Swaps data and key. msg.Data = wireMsg.GetMessage().GetKey() msg.OrderingKey = string(wireMsg.GetMessage().GetData()) - msg.ID = "FAKE_ID" msg.PublishTime = time.Now() return nil } @@ -474,12 +488,12 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) { pubSettings := DefaultPublishSettings pubSettings.CountThreshold = publishBatchSize pubSettings.DelayThreshold = 100 * time.Millisecond - msgs := publishPrefixedMessages(t, pubSettings, topicPath, "ordering", messageCount) + msgs := publishPrefixedMessages(t, pubSettings, topicPath, "ordering", messageCount, 0) // Receive messages. msgTracker := test.NewMsgTracker() msgTracker.Add(msgs...) - receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath, checkOrdering(true)) + receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath) }) // Checks that subscriber flow control works. @@ -488,33 +502,28 @@ func TestIntegration_PublishSubscribeSinglePartition(t *testing.T) { const maxOutstandingMessages = 2 // Receive small batches of messages // Publish messages. - msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "subscriber_flow_control", messageCount) + msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "subscriber_flow_control", messageCount, 0) // Receive messages. msgTracker := test.NewMsgTracker() msgTracker.Add(msgs...) customSettings := recvSettings customSettings.MaxOutstandingMessages = maxOutstandingMessages - receiveAllMessages(t, msgTracker, customSettings, subscriptionPath, checkOrdering(true)) + receiveAllMessages(t, msgTracker, customSettings, subscriptionPath) }) // Verifies that large messages can be sent and received. t.Run("LargeMessages", func(t *testing.T) { - const messageCount = 3 - const messageLen = MaxPublishRequestBytes - 50 + const messageCount = 5 + const messageSize = MaxPublishRequestBytes - 50 // Publish messages. - msgTracker := test.NewMsgTracker() - var msgs []*pubsub.Message - for i := 0; i < messageCount; i++ { - data := strings.Repeat(fmt.Sprintf("%d", i), messageLen) - msgTracker.Add(data) - msgs = append(msgs, &pubsub.Message{Data: []byte(data)}) - } - publishMessages(t, DefaultPublishSettings, topicPath, msgs...) + msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "large_messages", messageCount, messageSize) // Receive messages. - receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath, checkOrdering(false)) + msgTracker := test.NewMsgTracker() + msgTracker.Add(msgs...) + receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath) }) } @@ -542,13 +551,13 @@ func TestIntegration_PublishSubscribeMultiPartition(t *testing.T) { const messageCount = 50 * partitionCount // Publish messages. - msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "routing_no_key", messageCount) + msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "routing_no_key", messageCount, 0) // Receive messages, not checking for ordering since they do not have a key. // However, they would still be ordered within their partition. msgTracker := test.NewMsgTracker() msgTracker.Add(msgs...) - receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath, checkOrdering(false)) + receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath) }) // Tests messages published with ordering key. @@ -577,7 +586,7 @@ func TestIntegration_PublishSubscribeMultiPartition(t *testing.T) { publishMessages(t, pubSettings, topicPath, msgs...) // Receive messages. - receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath, checkOrdering(true)) + receiveAllMessages(t, msgTracker, recvSettings, subscriptionPath) }) // Verifies usage of the partition assignment service. @@ -586,7 +595,7 @@ func TestIntegration_PublishSubscribeMultiPartition(t *testing.T) { const subscriberCount = 2 // Should be between [2, partitionCount] // Publish messages. - msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "partition_assignment", messageCount) + msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "partition_assignment", messageCount, 0) // Start multiple subscribers that use partition assignment. msgTracker := test.NewMsgTracker() @@ -649,7 +658,7 @@ func TestIntegration_SubscribeFanOut(t *testing.T) { } // Publish messages. - msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "fan_out", messageCount) + msgs := publishPrefixedMessages(t, DefaultPublishSettings, topicPath, "fan_out", messageCount, 0) // Receive messages from multiple subscriptions. recvSettings := DefaultReceiveSettings @@ -658,6 +667,6 @@ func TestIntegration_SubscribeFanOut(t *testing.T) { for _, subscription := range subscriptionPaths { msgTracker := test.NewMsgTracker() msgTracker.Add(msgs...) - receiveAllMessages(t, msgTracker, recvSettings, subscription, checkOrdering(partitionCount == 1)) + receiveAllMessages(t, msgTracker, recvSettings, subscription) } } diff --git a/pubsublite/pscompat/message.go b/pubsublite/pscompat/message.go index 69f764340c7..1a6f65dc16a 100644 --- a/pubsublite/pscompat/message.go +++ b/pubsublite/pscompat/message.go @@ -104,9 +104,6 @@ func transformReceivedMessage(from *pb.SequencedMessage, to *pubsub.Message) err return fmt.Errorf("%s: %s", errInvalidMessage.Error(), err) } } - if from.GetCursor() != nil { - to.ID = fmt.Sprintf("%d", from.GetCursor().GetOffset()) - } if len(msg.GetKey()) > 0 { to.OrderingKey = string(msg.GetKey()) } diff --git a/pubsublite/pscompat/message_test.go b/pubsublite/pscompat/message_test.go index 5a44f7e886d..e196ee2a00e 100644 --- a/pubsublite/pscompat/message_test.go +++ b/pubsublite/pscompat/message_test.go @@ -66,7 +66,6 @@ func TestMessageTransforms(t *testing.T) { }, }, wantMsg: &pubsub.Message{ - ID: "10", PublishTime: time.Unix(1577836800, 900800700), Data: []byte("foo"), OrderingKey: "bar", diff --git a/pubsublite/pscompat/settings.go b/pubsublite/pscompat/settings.go index 175ef3275f7..726c89804d1 100644 --- a/pubsublite/pscompat/settings.go +++ b/pubsublite/pscompat/settings.go @@ -146,8 +146,10 @@ func (s *PublishSettings) toWireSettings() wire.PublishSettings { type NackHandler func(*pubsub.Message) error // ReceiveMessageTransformerFunc transforms a Pub/Sub Lite SequencedMessage API -// proto to a pubsub.Message. If this returns an error, the SubscriberClient -// will consider this a fatal error and terminate. +// proto to a pubsub.Message. The implementation must not set pubsub.Message.ID. +// +// If this returns an error, the SubscriberClient will consider this a fatal +// error and terminate. type ReceiveMessageTransformerFunc func(*pb.SequencedMessage, *pubsub.Message) error // ReceiveSettings configure the SubscriberClient. Flow control settings diff --git a/pubsublite/pscompat/subscriber.go b/pubsublite/pscompat/subscriber.go index 80f6100be4a..709fddcc566 100644 --- a/pubsublite/pscompat/subscriber.go +++ b/pubsublite/pscompat/subscriber.go @@ -20,6 +20,7 @@ import ( "cloud.google.com/go/pubsub" "cloud.google.com/go/pubsublite/internal/wire" + "cloud.google.com/go/pubsublite/publish" "google.golang.org/api/option" ipubsub "cloud.google.com/go/internal/pubsub" @@ -28,6 +29,7 @@ import ( var ( errNackCalled = errors.New("pubsublite: subscriber client does not support nack. See NackHandler for how to customize nack handling") errDuplicateReceive = errors.New("pubsublite: receive is already in progress for this subscriber client") + errMessageIDSet = errors.New("pubsublite: pubsub.Message.ID must not be set") ) // handleNack is the default NackHandler implementation. @@ -127,6 +129,18 @@ func newSubscriberInstance(ctx context.Context, factory wireSubscriberFactory, s return subInstance, nil } +func (si *subscriberInstance) transformMessage(in *wire.ReceivedMessage, out *pubsub.Message) error { + if err := si.settings.MessageTransformer(in.Msg, out); err != nil { + return err + } + if len(out.ID) > 0 { + return errMessageIDSet + } + metadata := &publish.Metadata{Partition: in.Partition, Offset: in.Msg.GetCursor().GetOffset()} + out.ID = metadata.String() + return nil +} + func (si *subscriberInstance) onMessage(msg *wire.ReceivedMessage) { pslAckh := &pslAckHandler{ ackh: msg.Ack, @@ -135,7 +149,7 @@ func (si *subscriberInstance) onMessage(msg *wire.ReceivedMessage) { } psMsg := ipubsub.NewMessage(pslAckh) pslAckh.msg = psMsg - if err := si.settings.MessageTransformer(msg.Msg, psMsg); err != nil { + if err := si.transformMessage(msg, psMsg); err != nil { si.Terminate(err) return } diff --git a/pubsublite/pscompat/subscriber_test.go b/pubsublite/pscompat/subscriber_test.go index 36abe36736b..a614a744cda 100644 --- a/pubsublite/pscompat/subscriber_test.go +++ b/pubsublite/pscompat/subscriber_test.go @@ -127,6 +127,7 @@ func newTestSubscriberInstance(ctx context.Context, settings ReceiveSettings, re } func TestSubscriberInstanceTransformMessage(t *testing.T) { + const partition = 3 ctx := context.Background() input := &pb.SequencedMessage{ Message: &pb.PubSubMessage{ @@ -156,7 +157,7 @@ func TestSubscriberInstanceTransformMessage(t *testing.T) { Data: []byte("data"), OrderingKey: "key", Attributes: map[string]string{"attr": "value"}, - ID: "123", + ID: "3:123", PublishTime: time.Unix(1577836800, 900800700), }, }, @@ -173,6 +174,7 @@ func TestSubscriberInstanceTransformMessage(t *testing.T) { want: &pubsub.Message{ Data: []byte("key"), OrderingKey: "data", + ID: "3:123", }, }, } { @@ -181,7 +183,7 @@ func TestSubscriberInstanceTransformMessage(t *testing.T) { tc.mutateSettings(&settings) ack := &mockAckConsumer{} - msg := &wire.ReceivedMessage{Msg: input, Ack: ack} + msg := &wire.ReceivedMessage{Msg: input, Ack: ack, Partition: partition} cctx, stopSubscriber := context.WithTimeout(ctx, defaultSubscriberTestTimeout) messageReceiver := func(ctx context.Context, got *pubsub.Message) { @@ -212,41 +214,63 @@ func TestSubscriberInstanceTransformMessage(t *testing.T) { } func TestSubscriberInstanceTransformMessageError(t *testing.T) { - wantErr := errors.New("message could not be converted") + transformErr := errors.New("message could not be converted") - settings := DefaultReceiveSettings - settings.MessageTransformer = func(_ *pb.SequencedMessage, _ *pubsub.Message) error { - return wantErr - } - - ctx := context.Background() - ack := &mockAckConsumer{} - msg := &wire.ReceivedMessage{ - Ack: ack, - Msg: &pb.SequencedMessage{ - Message: &pb.PubSubMessage{Data: []byte("data")}, + for _, tc := range []struct { + desc string + transformer ReceiveMessageTransformerFunc + wantErr error + }{ + { + desc: "returns error", + transformer: func(_ *pb.SequencedMessage, _ *pubsub.Message) error { + return transformErr + }, + wantErr: transformErr, }, - } + { + desc: "sets message id", + transformer: func(_ *pb.SequencedMessage, out *pubsub.Message) error { + out.ID = "should_not_be_set" + return nil + }, + wantErr: errMessageIDSet, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + settings := DefaultReceiveSettings + settings.MessageTransformer = tc.transformer - cctx, _ := context.WithTimeout(ctx, defaultSubscriberTestTimeout) - messageReceiver := func(ctx context.Context, got *pubsub.Message) { - t.Errorf("Received unexpected message: %v", got) - got.Nack() - } - subInstance := newTestSubscriberInstance(cctx, settings, messageReceiver) - subInstance.wireSub.(*mockWireSubscriber).DeliverMessages(msg) + ctx := context.Background() + ack := &mockAckConsumer{} + msg := &wire.ReceivedMessage{ + Ack: ack, + Msg: &pb.SequencedMessage{ + Message: &pb.PubSubMessage{Data: []byte("data")}, + }, + } - if gotErr := subInstance.Wait(cctx); !test.ErrorEqual(gotErr, wantErr) { - t.Errorf("subscriberInstance.Wait() got err: (%v), want: (%v)", gotErr, wantErr) - } - if got, want := ack.AckCount, 0; got != want { - t.Errorf("mockAckConsumer.AckCount: got %d, want %d", got, want) - } - if got, want := subInstance.recvCtx.Err(), context.Canceled; !test.ErrorEqual(got, want) { - t.Errorf("subscriberInstance.recvCtx.Err(): got (%v), want (%v)", got, want) - } - if got, want := subInstance.wireSub.(*mockWireSubscriber).Terminated, true; got != want { - t.Errorf("mockWireSubscriber.Terminated: got %v, want %v", got, want) + cctx, _ := context.WithTimeout(ctx, defaultSubscriberTestTimeout) + messageReceiver := func(ctx context.Context, got *pubsub.Message) { + t.Errorf("Received unexpected message: %v", got) + got.Nack() + } + subInstance := newTestSubscriberInstance(cctx, settings, messageReceiver) + subInstance.wireSub.(*mockWireSubscriber).DeliverMessages(msg) + + if gotErr := subInstance.Wait(cctx); !test.ErrorEqual(gotErr, tc.wantErr) { + t.Errorf("subscriberInstance.Wait() got err: (%v), want: (%v)", gotErr, tc.wantErr) + } + if got, want := ack.AckCount, 0; got != want { + t.Errorf("mockAckConsumer.AckCount: got %d, want %d", got, want) + } + if got, want := subInstance.recvCtx.Err(), context.Canceled; !test.ErrorEqual(got, want) { + t.Errorf("subscriberInstance.recvCtx.Err(): got (%v), want (%v)", got, want) + } + if got, want := subInstance.wireSub.(*mockWireSubscriber).Terminated, true; got != want { + t.Errorf("mockWireSubscriber.Terminated: got %v, want %v", got, want) + } + }) } } diff --git a/pubsublite/publish/example_test.go b/pubsublite/publish/example_test.go new file mode 100644 index 00000000000..5227083aeca --- /dev/null +++ b/pubsublite/publish/example_test.go @@ -0,0 +1,65 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and + +package publish_test + +import ( + "context" + "fmt" + + "cloud.google.com/go/pubsub" + "cloud.google.com/go/pubsublite/pscompat" + "cloud.google.com/go/pubsublite/publish" +) + +func ExampleParseMetadata_publisher() { + ctx := context.Background() + const topic = "projects/my-project/locations/zone/topics/my-topic" + publisher, err := pscompat.NewPublisherClient(ctx, topic) + if err != nil { + // TODO: Handle error. + } + defer publisher.Stop() + + result := publisher.Publish(ctx, &pubsub.Message{Data: []byte("payload")}) + id, err := result.Get(ctx) + if err != nil { + // TODO: Handle error. + } + metadata, err := publish.ParseMetadata(id) + if err != nil { + // TODO: Handle error. + } + fmt.Printf("Published message to partition %d with offset %d\n", metadata.Partition, metadata.Offset) +} + +func ExampleParseMetadata_subscriber() { + ctx := context.Background() + const subscription = "projects/my-project/locations/zone/subscriptions/my-subscription" + subscriber, err := pscompat.NewSubscriberClient(ctx, subscription) + if err != nil { + // TODO: Handle error. + } + err = subscriber.Receive(ctx, func(ctx context.Context, m *pubsub.Message) { + // TODO: Handle message. + m.Ack() + metadata, err := publish.ParseMetadata(m.ID) + if err != nil { + // TODO: Handle error. + } + fmt.Printf("Received message from partition %d with offset %d\n", metadata.Partition, metadata.Offset) + }) + if err != nil { + // TODO: Handle error. + } +} diff --git a/pubsublite/publish/metadata.go b/pubsublite/publish/metadata.go index ed16fac09ca..1db88f6e040 100644 --- a/pubsublite/publish/metadata.go +++ b/pubsublite/publish/metadata.go @@ -34,18 +34,9 @@ func (m *Metadata) String() string { return fmt.Sprintf("%d:%d", m.Partition, m.Offset) } -// ParseMetadata converts the ID string of a pubsub.PublishResult to Metadata. -// -// Example: -// result := publisher.Publish(ctx, &pubsub.Message{Data: []byte("payload")}) -// id, err := result.Get(ctx) -// if err != nil { -// // TODO: Handle error. -// } -// metadata, err := publish.ParseMetadata(id) -// if err != nil { -// // TODO: Handle error. -// } +// ParseMetadata creates Metadata from the ID string of a pubsub.PublishResult +// returned by pscompat.PublisherClient or pubsub.Message.ID received from +// pscompat.SubscriberClient. func ParseMetadata(id string) (*Metadata, error) { parts := strings.Split(id, ":") if len(parts) != 2 {