Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsub): add deadletter and retries handling in the fake pubsub #5320

Merged
merged 6 commits into from Jan 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
113 changes: 92 additions & 21 deletions pubsub/pstest/fake.go
Expand Up @@ -426,6 +426,14 @@ func (s *GServer) DeleteTopic(_ context.Context, req *pb.DeleteTopicRequest) (*e
if t == nil {
return nil, status.Errorf(codes.NotFound, "topic %q", req.Topic)
}
for _, sub := range s.subs {
if sub.deadLetterTopic == nil {
continue
}
if req.Topic == sub.deadLetterTopic.proto.Name {
return nil, status.Errorf(codes.FailedPrecondition, "topic %q used as deadLetter for %s", req.Topic, sub.proto.Name)
}
}
t.stop()
delete(s.topics, req.Topic)
return &emptypb.Empty{}, nil
Expand Down Expand Up @@ -464,8 +472,16 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
if ps.PushConfig == nil {
ps.PushConfig = &pb.PushConfig{}
}
var deadLetterTopic *topic
if ps.DeadLetterPolicy != nil {
dlTopic, ok := s.topics[ps.DeadLetterPolicy.DeadLetterTopic]
if !ok {
return nil, status.Errorf(codes.NotFound, "deadLetter topic %q", ps.DeadLetterPolicy.DeadLetterTopic)
}
deadLetterTopic = dlTopic
}

sub := newSubscription(top, &s.mu, s.timeNowFunc, ps)
sub := newSubscription(top, &s.mu, s.timeNowFunc, deadLetterTopic, ps)
top.subs[ps.Name] = sub
s.subs[ps.Name] = sub
sub.start(&s.wg)
Expand Down Expand Up @@ -575,6 +591,13 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti

case "dead_letter_policy":
sub.proto.DeadLetterPolicy = req.Subscription.DeadLetterPolicy
if sub.proto.DeadLetterPolicy != nil {
dlTopic, ok := s.topics[sub.proto.DeadLetterPolicy.DeadLetterTopic]
if !ok {
return nil, status.Errorf(codes.NotFound, "topic %q", sub.proto.DeadLetterPolicy.DeadLetterTopic)
}
sub.deadLetterTopic = dlTopic
}

case "retry_policy":
sub.proto.RetryPolicy = req.Subscription.RetryPolicy
Expand Down Expand Up @@ -734,29 +757,31 @@ func (t *topic) publish(pm *pb.PubsubMessage, m *Message) {
}

type subscription struct {
topic *topic
mu *sync.Mutex // the server mutex, here for convenience
proto *pb.Subscription
ackTimeout time.Duration
msgs map[string]*message // unacked messages by message ID
streams []*stream
done chan struct{}
timeNowFunc func() time.Time
}

func newSubscription(t *topic, mu *sync.Mutex, timeNowFunc func() time.Time, ps *pb.Subscription) *subscription {
topic *topic
deadLetterTopic *topic
mu *sync.Mutex // the server mutex, here for convenience
proto *pb.Subscription
ackTimeout time.Duration
msgs map[string]*message // unacked messages by message ID
streams []*stream
done chan struct{}
timeNowFunc func() time.Time
}

func newSubscription(t *topic, mu *sync.Mutex, timeNowFunc func() time.Time, deadLetterTopic *topic, ps *pb.Subscription) *subscription {
at := time.Duration(ps.AckDeadlineSeconds) * time.Second
if at == 0 {
at = 10 * time.Second
}
return &subscription{
topic: t,
mu: mu,
proto: ps,
ackTimeout: at,
msgs: map[string]*message{},
done: make(chan struct{}),
timeNowFunc: timeNowFunc,
topic: t,
deadLetterTopic: deadLetterTopic,
mu: mu,
proto: ps,
ackTimeout: at,
msgs: map[string]*message{},
done: make(chan struct{}),
timeNowFunc: timeNowFunc,
}
}

Expand Down Expand Up @@ -954,10 +979,18 @@ func (s *subscription) pull(max int) []*pb.ReceivedMessage {
now := s.timeNowFunc()
s.maintainMessages(now)
var msgs []*pb.ReceivedMessage
for _, m := range s.msgs {
for id, m := range s.msgs {
if m.outstanding() {
continue
}
if s.deadLetterCandidate(m) {
s.ack(id)
s.publishToDeadLetter(m)
continue
}
yottta marked this conversation as resolved.
Show resolved Hide resolved
if s.proto.DeadLetterPolicy != nil {
m.proto.DeliveryAttempt = int32(*m.deliveries)
}
(*m.deliveries)++
m.ackDeadline = now.Add(s.ackTimeout)
msgs = append(msgs, m.proto)
Expand All @@ -976,10 +1009,15 @@ func (s *subscription) deliver() {
s.maintainMessages(now)
// Try to deliver each remaining message.
curIndex := 0
for _, m := range s.msgs {
for id, m := range s.msgs {
if m.outstanding() {
continue
}
if s.deadLetterCandidate(m) {
s.ack(id)
s.publishToDeadLetter(m)
continue
}
// If the message was never delivered before, start with the stream at
// curIndex. If it was delivered before, start with the stream after the one
// that owned it.
Expand Down Expand Up @@ -1041,6 +1079,7 @@ func (s *subscription) maintainMessages(now time.Time) {
pubTime := m.proto.Message.PublishTime.AsTime()
// Remove messages that have been undelivered for a long time.
if !m.outstanding() && now.Sub(pubTime) > retentionDuration {
s.publishToDeadLetter(m)
delete(s.msgs, id)
}
}
Expand Down Expand Up @@ -1074,6 +1113,33 @@ func (s *subscription) deleteStream(st *stream) {
s.streams = deleteStreamAt(s.streams, i)
}
}

func (s *subscription) deadLetterCandidate(m *message) bool {
if s.proto.DeadLetterPolicy == nil {
return false
}
if m.retriesDone(s.proto.DeadLetterPolicy.MaxDeliveryAttempts) {
return true
}
return false
}

func (s *subscription) publishToDeadLetter(m *message) {
acks := 0
if m.acks != nil {
acks = *m.acks
}
deliveries := 0
if m.deliveries != nil {
deliveries = *m.deliveries
}
s.deadLetterTopic.publish(m.proto.Message, &Message{
Copy link
Contributor Author

@yottta yottta Jan 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I doubt that I need to use the original message properties (deliveries and acks) since the publish method is not allowing the proto values as well for these properties.
This is about the changes that you suggested around m.proto.DeliveryAttempt.
Also you can check fake_test.go#382

PublishTime: m.publishTime,
Acks: acks,
Deliveries: deliveries,
})
}

func deleteStreamAt(s []*stream, i int) []*stream {
// Preserve order for round-robin delivery.
return append(s[:i], s[i+1:]...)
Expand All @@ -1093,6 +1159,11 @@ func (m *message) outstanding() bool {
return !m.ackDeadline.IsZero()
}

// A message is outstanding if it is owned by some stream.
func (m *message) retriesDone(maxRetries int32) bool {
return m.deliveries != nil && int32(*m.deliveries) >= maxRetries
}

func (m *message) makeAvailable() {
m.ackDeadline = time.Time{}
}
Expand Down