Skip to content

Commit

Permalink
fix(pubsub): add deadletter and retries handling in the fake pubsub (#…
Browse files Browse the repository at this point in the history
…5320)

* fix(pubsub): add deadletter and retries handling in the fake pubsub server

* fix(pubsub): Sync delivery attempt field here before incrementing if dead letter policy is set

Co-authored-by: Alex Hong <9397363+hongalex@users.noreply.github.com>

* fix(pubsub): delivery attempts to proto as well; typos

Co-authored-by: Alex Hong <9397363+hongalex@users.noreply.github.com>
  • Loading branch information
yottta and hongalex committed Jan 18, 2022
1 parent a811e30 commit 116a610
Show file tree
Hide file tree
Showing 2 changed files with 292 additions and 23 deletions.
113 changes: 92 additions & 21 deletions pubsub/pstest/fake.go
Expand Up @@ -426,6 +426,14 @@ func (s *GServer) DeleteTopic(_ context.Context, req *pb.DeleteTopicRequest) (*e
if t == nil {
return nil, status.Errorf(codes.NotFound, "topic %q", req.Topic)
}
for _, sub := range s.subs {
if sub.deadLetterTopic == nil {
continue
}
if req.Topic == sub.deadLetterTopic.proto.Name {
return nil, status.Errorf(codes.FailedPrecondition, "topic %q used as deadLetter for %s", req.Topic, sub.proto.Name)
}
}
t.stop()
delete(s.topics, req.Topic)
return &emptypb.Empty{}, nil
Expand Down Expand Up @@ -464,8 +472,16 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
if ps.PushConfig == nil {
ps.PushConfig = &pb.PushConfig{}
}
var deadLetterTopic *topic
if ps.DeadLetterPolicy != nil {
dlTopic, ok := s.topics[ps.DeadLetterPolicy.DeadLetterTopic]
if !ok {
return nil, status.Errorf(codes.NotFound, "deadLetter topic %q", ps.DeadLetterPolicy.DeadLetterTopic)
}
deadLetterTopic = dlTopic
}

sub := newSubscription(top, &s.mu, s.timeNowFunc, ps)
sub := newSubscription(top, &s.mu, s.timeNowFunc, deadLetterTopic, ps)
top.subs[ps.Name] = sub
s.subs[ps.Name] = sub
sub.start(&s.wg)
Expand Down Expand Up @@ -575,6 +591,13 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti

case "dead_letter_policy":
sub.proto.DeadLetterPolicy = req.Subscription.DeadLetterPolicy
if sub.proto.DeadLetterPolicy != nil {
dlTopic, ok := s.topics[sub.proto.DeadLetterPolicy.DeadLetterTopic]
if !ok {
return nil, status.Errorf(codes.NotFound, "topic %q", sub.proto.DeadLetterPolicy.DeadLetterTopic)
}
sub.deadLetterTopic = dlTopic
}

case "retry_policy":
sub.proto.RetryPolicy = req.Subscription.RetryPolicy
Expand Down Expand Up @@ -734,29 +757,31 @@ func (t *topic) publish(pm *pb.PubsubMessage, m *Message) {
}

type subscription struct {
topic *topic
mu *sync.Mutex // the server mutex, here for convenience
proto *pb.Subscription
ackTimeout time.Duration
msgs map[string]*message // unacked messages by message ID
streams []*stream
done chan struct{}
timeNowFunc func() time.Time
}

func newSubscription(t *topic, mu *sync.Mutex, timeNowFunc func() time.Time, ps *pb.Subscription) *subscription {
topic *topic
deadLetterTopic *topic
mu *sync.Mutex // the server mutex, here for convenience
proto *pb.Subscription
ackTimeout time.Duration
msgs map[string]*message // unacked messages by message ID
streams []*stream
done chan struct{}
timeNowFunc func() time.Time
}

func newSubscription(t *topic, mu *sync.Mutex, timeNowFunc func() time.Time, deadLetterTopic *topic, ps *pb.Subscription) *subscription {
at := time.Duration(ps.AckDeadlineSeconds) * time.Second
if at == 0 {
at = 10 * time.Second
}
return &subscription{
topic: t,
mu: mu,
proto: ps,
ackTimeout: at,
msgs: map[string]*message{},
done: make(chan struct{}),
timeNowFunc: timeNowFunc,
topic: t,
deadLetterTopic: deadLetterTopic,
mu: mu,
proto: ps,
ackTimeout: at,
msgs: map[string]*message{},
done: make(chan struct{}),
timeNowFunc: timeNowFunc,
}
}

Expand Down Expand Up @@ -954,10 +979,18 @@ func (s *subscription) pull(max int) []*pb.ReceivedMessage {
now := s.timeNowFunc()
s.maintainMessages(now)
var msgs []*pb.ReceivedMessage
for _, m := range s.msgs {
for id, m := range s.msgs {
if m.outstanding() {
continue
}
if s.deadLetterCandidate(m) {
s.ack(id)
s.publishToDeadLetter(m)
continue
}
if s.proto.DeadLetterPolicy != nil {
m.proto.DeliveryAttempt = int32(*m.deliveries)
}
(*m.deliveries)++
m.ackDeadline = now.Add(s.ackTimeout)
msgs = append(msgs, m.proto)
Expand All @@ -976,10 +1009,15 @@ func (s *subscription) deliver() {
s.maintainMessages(now)
// Try to deliver each remaining message.
curIndex := 0
for _, m := range s.msgs {
for id, m := range s.msgs {
if m.outstanding() {
continue
}
if s.deadLetterCandidate(m) {
s.ack(id)
s.publishToDeadLetter(m)
continue
}
// If the message was never delivered before, start with the stream at
// curIndex. If it was delivered before, start with the stream after the one
// that owned it.
Expand Down Expand Up @@ -1041,6 +1079,7 @@ func (s *subscription) maintainMessages(now time.Time) {
pubTime := m.proto.Message.PublishTime.AsTime()
// Remove messages that have been undelivered for a long time.
if !m.outstanding() && now.Sub(pubTime) > retentionDuration {
s.publishToDeadLetter(m)
delete(s.msgs, id)
}
}
Expand Down Expand Up @@ -1074,6 +1113,33 @@ func (s *subscription) deleteStream(st *stream) {
s.streams = deleteStreamAt(s.streams, i)
}
}

func (s *subscription) deadLetterCandidate(m *message) bool {
if s.proto.DeadLetterPolicy == nil {
return false
}
if m.retriesDone(s.proto.DeadLetterPolicy.MaxDeliveryAttempts) {
return true
}
return false
}

func (s *subscription) publishToDeadLetter(m *message) {
acks := 0
if m.acks != nil {
acks = *m.acks
}
deliveries := 0
if m.deliveries != nil {
deliveries = *m.deliveries
}
s.deadLetterTopic.publish(m.proto.Message, &Message{
PublishTime: m.publishTime,
Acks: acks,
Deliveries: deliveries,
})
}

func deleteStreamAt(s []*stream, i int) []*stream {
// Preserve order for round-robin delivery.
return append(s[:i], s[i+1:]...)
Expand All @@ -1093,6 +1159,11 @@ func (m *message) outstanding() bool {
return !m.ackDeadline.IsZero()
}

// A message is outstanding if it is owned by some stream.
func (m *message) retriesDone(maxRetries int32) bool {
return m.deliveries != nil && int32(*m.deliveries) >= maxRetries
}

func (m *message) makeAvailable() {
m.ackDeadline = time.Time{}
}
Expand Down

0 comments on commit 116a610

Please sign in to comment.