diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index 69c879fc0c2..58c37d36ada 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -426,6 +426,14 @@ func (s *GServer) DeleteTopic(_ context.Context, req *pb.DeleteTopicRequest) (*e if t == nil { return nil, status.Errorf(codes.NotFound, "topic %q", req.Topic) } + for _, sub := range s.subs { + if sub.deadLetterTopic == nil { + continue + } + if req.Topic == sub.deadLetterTopic.proto.Name { + return nil, status.Errorf(codes.FailedPrecondition, "topic %q used as deadLetter for %s", req.Topic, sub.proto.Name) + } + } t.stop() delete(s.topics, req.Topic) return &emptypb.Empty{}, nil @@ -464,8 +472,16 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p if ps.PushConfig == nil { ps.PushConfig = &pb.PushConfig{} } + var deadLetterTopic *topic + if ps.DeadLetterPolicy != nil { + dlTopic, ok := s.topics[ps.DeadLetterPolicy.DeadLetterTopic] + if !ok { + return nil, status.Errorf(codes.NotFound, "deadLetter topic %q", ps.DeadLetterPolicy.DeadLetterTopic) + } + deadLetterTopic = dlTopic + } - sub := newSubscription(top, &s.mu, s.timeNowFunc, ps) + sub := newSubscription(top, &s.mu, s.timeNowFunc, deadLetterTopic, ps) top.subs[ps.Name] = sub s.subs[ps.Name] = sub sub.start(&s.wg) @@ -575,6 +591,13 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti case "dead_letter_policy": sub.proto.DeadLetterPolicy = req.Subscription.DeadLetterPolicy + if sub.proto.DeadLetterPolicy != nil { + dlTopic, ok := s.topics[sub.proto.DeadLetterPolicy.DeadLetterTopic] + if !ok { + return nil, status.Errorf(codes.NotFound, "topic %q", sub.proto.DeadLetterPolicy.DeadLetterTopic) + } + sub.deadLetterTopic = dlTopic + } case "retry_policy": sub.proto.RetryPolicy = req.Subscription.RetryPolicy @@ -734,29 +757,31 @@ func (t *topic) publish(pm *pb.PubsubMessage, m *Message) { } type subscription struct { - topic *topic - mu *sync.Mutex // the server mutex, here for convenience - proto *pb.Subscription - ackTimeout time.Duration - msgs map[string]*message // unacked messages by message ID - streams []*stream - done chan struct{} - timeNowFunc func() time.Time -} - -func newSubscription(t *topic, mu *sync.Mutex, timeNowFunc func() time.Time, ps *pb.Subscription) *subscription { + topic *topic + deadLetterTopic *topic + mu *sync.Mutex // the server mutex, here for convenience + proto *pb.Subscription + ackTimeout time.Duration + msgs map[string]*message // unacked messages by message ID + streams []*stream + done chan struct{} + timeNowFunc func() time.Time +} + +func newSubscription(t *topic, mu *sync.Mutex, timeNowFunc func() time.Time, deadLetterTopic *topic, ps *pb.Subscription) *subscription { at := time.Duration(ps.AckDeadlineSeconds) * time.Second if at == 0 { at = 10 * time.Second } return &subscription{ - topic: t, - mu: mu, - proto: ps, - ackTimeout: at, - msgs: map[string]*message{}, - done: make(chan struct{}), - timeNowFunc: timeNowFunc, + topic: t, + deadLetterTopic: deadLetterTopic, + mu: mu, + proto: ps, + ackTimeout: at, + msgs: map[string]*message{}, + done: make(chan struct{}), + timeNowFunc: timeNowFunc, } } @@ -954,10 +979,18 @@ func (s *subscription) pull(max int) []*pb.ReceivedMessage { now := s.timeNowFunc() s.maintainMessages(now) var msgs []*pb.ReceivedMessage - for _, m := range s.msgs { + for id, m := range s.msgs { if m.outstanding() { continue } + if s.deadLetterCandidate(m) { + s.ack(id) + s.publishToDeadLetter(m) + continue + } + if s.proto.DeadLetterPolicy != nil { + m.proto.DeliveryAttempt = int32(*m.deliveries) + } (*m.deliveries)++ m.ackDeadline = now.Add(s.ackTimeout) msgs = append(msgs, m.proto) @@ -976,10 +1009,15 @@ func (s *subscription) deliver() { s.maintainMessages(now) // Try to deliver each remaining message. curIndex := 0 - for _, m := range s.msgs { + for id, m := range s.msgs { if m.outstanding() { continue } + if s.deadLetterCandidate(m) { + s.ack(id) + s.publishToDeadLetter(m) + continue + } // If the message was never delivered before, start with the stream at // curIndex. If it was delivered before, start with the stream after the one // that owned it. @@ -1041,6 +1079,7 @@ func (s *subscription) maintainMessages(now time.Time) { pubTime := m.proto.Message.PublishTime.AsTime() // Remove messages that have been undelivered for a long time. if !m.outstanding() && now.Sub(pubTime) > retentionDuration { + s.publishToDeadLetter(m) delete(s.msgs, id) } } @@ -1074,6 +1113,33 @@ func (s *subscription) deleteStream(st *stream) { s.streams = deleteStreamAt(s.streams, i) } } + +func (s *subscription) deadLetterCandidate(m *message) bool { + if s.proto.DeadLetterPolicy == nil { + return false + } + if m.retriesDone(s.proto.DeadLetterPolicy.MaxDeliveryAttempts) { + return true + } + return false +} + +func (s *subscription) publishToDeadLetter(m *message) { + acks := 0 + if m.acks != nil { + acks = *m.acks + } + deliveries := 0 + if m.deliveries != nil { + deliveries = *m.deliveries + } + s.deadLetterTopic.publish(m.proto.Message, &Message{ + PublishTime: m.publishTime, + Acks: acks, + Deliveries: deliveries, + }) +} + func deleteStreamAt(s []*stream, i int) []*stream { // Preserve order for round-robin delivery. return append(s[:i], s[i+1:]...) @@ -1093,6 +1159,11 @@ func (m *message) outstanding() bool { return !m.ackDeadline.IsZero() } +// A message is outstanding if it is owned by some stream. +func (m *message) retriesDone(maxRetries int32) bool { + return m.deliveries != nil && int32(*m.deliveries) >= maxRetries +} + func (m *message) makeAvailable() { m.ackDeadline = time.Time{} } diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index 13ede4de927..2433da6d2d8 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -15,9 +15,11 @@ package pstest import ( + "bytes" "context" "fmt" "io" + "math/rand" "net" "reflect" "strings" @@ -61,7 +63,7 @@ func TestNewServerWithPort(t *testing.T) { } func TestTopics(t *testing.T) { - pclient, _, server, cleanup := newFake(context.TODO(), t) + pclient, sclient, server, cleanup := newFake(context.TODO(), t) defer cleanup() ctx := context.Background() @@ -101,6 +103,39 @@ func TestTopics(t *testing.T) { if got, want := len(server.GServer.topics), 0; got != want { t.Fatalf("got %d topics, want %d", got, want) } + + t.Run(`Given a topic that is used by a subscription as deadLetter, + When topic deleted, + Then error raised`, func(t *testing.T) { + var topics []*pb.Topic + for i := 1; i < 3; i++ { + topics = append(topics, mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{ + Name: fmt.Sprintf("projects/P/topics/T%d", i), + Labels: map[string]string{"num": fmt.Sprintf("%d", i)}, + })) + } + + if got, want := len(server.GServer.topics), len(topics); got != want { + t.Fatalf("got %d topics, want %d", got, want) + } + + s := mustCreateSubscription(ctx, t, sclient, &pb.Subscription{ + Name: fmt.Sprintf("project/P/subscriptions/sub_with_deadLetter"), + Topic: topics[0].Name, + AckDeadlineSeconds: 10, + DeadLetterPolicy: &pb.DeadLetterPolicy{ + DeadLetterTopic: topics[1].Name, + }, + }) + + _, err := pclient.DeleteTopic(ctx, &pb.DeleteTopicRequest{ + Topic: topics[1].Name, + }) + expectedErr := status.Errorf(codes.FailedPrecondition, "topic %q used as deadLetter for %s", topics[1].Name, s.Name) + if err == nil || err.Error() != expectedErr.Error() { + t.Fatalf("returned a different error than the expected one. \nReceived '%s'; \nExpected: '%s'", err, expectedErr) + } + }) } func TestSubscriptions(t *testing.T) { @@ -169,6 +204,30 @@ func TestSubscriptions(t *testing.T) { if got, want := len(server.GServer.subs), 0; got != want { t.Fatalf("got %d subscriptions, want %d", got, want) } + + t.Run(`Given a subscription creation, + When called with a deadLetter topic that does not exist, + Then error returned`, func(t *testing.T) { + topic := mustCreateTopic(ctx, t, pclient, &pb.Topic{Name: "projects/P/topics/test"}) + _, err := server.GServer.CreateSubscription(ctx, &pb.Subscription{ + Name: "projects/P/subscriptions/test", + Topic: topic.Name, + AckDeadlineSeconds: 10, + DeadLetterPolicy: &pb.DeadLetterPolicy{ + DeadLetterTopic: "projects/P/topics/nonexisting", + }, + }) + expectedErr := status.Errorf(codes.NotFound, "deadLetter topic \"projects/P/topics/nonexisting\"") + if err == nil || err.Error() != expectedErr.Error() { + t.Fatalf("expected subscription creation to fail with a specific err but it didn't. \nError: %s \nExepcted err: %s", err, expectedErr) + } + _, err = server.GServer.DeleteTopic(ctx, &pb.DeleteTopicRequest{ + Topic: topic.Name, + }) + if err != nil { + t.Fatalf("unexpected error during deleting topic") + } + }) } func TestSubscriptionErrors(t *testing.T) { @@ -219,6 +278,145 @@ func TestSubscriptionErrors(t *testing.T) { checkCode(err, codes.NotFound) } +func TestSubscriptionDeadLetter(t *testing.T) { + _, _, server, cleanup := newFake(context.TODO(), t) + defer cleanup() + + ctx := context.Background() + + topic, err := server.GServer.CreateTopic(ctx, &pb.Topic{ + Name: "projects/P/topics/in", + }) + if err != nil { + t.Fatalf("failed to create in topic") + } + deadLetterTopic, err := server.GServer.CreateTopic(ctx, &pb.Topic{ + Name: "projects/P/topics/deadLetter", + }) + if err != nil { + t.Fatalf("failed to create deadLetter topic") + } + retries := 3 + sub, err := server.GServer.CreateSubscription(ctx, &pb.Subscription{ + Name: "projects/P/subscriptions/S", + Topic: topic.Name, + AckDeadlineSeconds: 10, + DeadLetterPolicy: &pb.DeadLetterPolicy{ + DeadLetterTopic: deadLetterTopic.Name, + MaxDeliveryAttempts: int32(retries), + }, + }) + if err != nil { + t.Fatalf("failed to create subscription") + } + dlSub, err := server.GServer.CreateSubscription(ctx, &pb.Subscription{ + Name: "projects/P/subscriptions/SD", + Topic: deadLetterTopic.Name, + AckDeadlineSeconds: 10, + }) + if err != nil { + t.Fatalf("failed to create subscription") + } + + messageData := []byte("message data") + _, err = server.GServer.Publish(ctx, &pb.PublishRequest{ + Topic: topic.Name, + Messages: []*pb.PubsubMessage{ + { + Data: messageData, + }, + }, + }) + if err != nil { + t.Fatalf("failed to publish message") + } + rand.Seed(time.Now().UTC().UnixNano()) + maxAttempts := rand.Intn(5) + retries + for i := 0; i < maxAttempts; i++ { + pull, err := server.GServer.Pull(ctx, &pb.PullRequest{ + Subscription: sub.Name, + MaxMessages: 10, + }) + if err != nil { + t.Fatalf("failed during pull") + } + if i < retries { + if len(pull.ReceivedMessages) != 1 { + t.Fatalf("expected 1 message received a different number %d", len(pull.ReceivedMessages)) + + } + for _, m := range pull.ReceivedMessages { + if int32(i) != m.DeliveryAttempt { + t.Fatalf("message delivery attempt not the expected one. expected: %d, actual: %d", i, m.DeliveryAttempt) + } + _, err := server.GServer.ModifyAckDeadline(ctx, &pb.ModifyAckDeadlineRequest{ + Subscription: sub.Name, + AckIds: []string{m.AckId}, + AckDeadlineSeconds: 0, + }) + if err != nil { + t.Fatalf("failed to modify ack deadline") + } + } + } else { + if len(pull.ReceivedMessages) > 0 { + t.Fatalf("received a non empty list of messages %d", len(pull.ReceivedMessages)) + } + } + } + + dlPull, err := server.GServer.Pull(ctx, &pb.PullRequest{ + Subscription: dlSub.Name, + MaxMessages: 10, + }) + if err != nil { + t.Fatalf("failed during pulling from deadLetter sub") + } + if len(dlPull.ReceivedMessages) != 1 { + t.Fatalf("expected 1 message received a different number %d", len(dlPull.ReceivedMessages)) + } + receivedMessage := dlPull.ReceivedMessages[0] + if bytes.Compare(receivedMessage.Message.Data, messageData) != 0 { + t.Fatalf("unexpected message received from deadLetter") + } + if receivedMessage.DeliveryAttempt > 0 { + t.Fatalf("message sent to deadLetter should not have the deliveryAttempt value from the original subscription message") + } + _, err = server.GServer.Acknowledge(ctx, &pb.AcknowledgeRequest{ + Subscription: dlSub.Name, + AckIds: []string{receivedMessage.GetAckId()}, + }) + if err != nil { + t.Fatalf("failed to acknowledge message from deadLetter") + } + + for _, s := range []string{"projects/P/subscriptions/S", "projects/P/subscriptions/SD"} { + _, err = server.GServer.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{ + Subscription: s, + }) + if err != nil { + t.Fatalf("failed to delete subscription %s; error: %s", s, err) + } + } + + for _, delTopic := range []string{"projects/P/topics/in", "projects/P/topics/deadLetter"} { + _, err = server.GServer.DeleteTopic(ctx, &pb.DeleteTopicRequest{ + Topic: delTopic, + }) + if err != nil { + t.Fatalf("failed to delete topic %s; error: %s", delTopic, err) + } + } + + if got, want := len(server.GServer.subs), 0; got != want { + t.Fatalf("got %d subscriptions, want %d", got, want) + } + + if got, want := len(server.GServer.topics), 0; got != want { + t.Fatalf("got %d topics, want %d", got, want) + } +} + func TestPublish(t *testing.T) { s := NewServer() defer s.Close() @@ -684,7 +882,7 @@ func TestTryDeliverMessage(t *testing.T) { {availStreamIdx: 3, expectedOutIdx: 2}, // s0, s1 (deleted), s2, s3 becomes s0, s2, s3. So we expect outIdx=2. } { top := newTopic(&pb.Topic{Name: "some-topic"}) - sub := newSubscription(top, &sync.Mutex{}, time.Now, &pb.Subscription{Name: "some-sub", Topic: "some-topic"}) + sub := newSubscription(top, &sync.Mutex{}, time.Now, nil, &pb.Subscription{Name: "some-sub", Topic: "some-topic"}) done := make(chan struct{}, 1) done <- struct{}{}