
Topic retention settings cause consumer to skip new messages that arrived after deleted messages #2898

Closed
bentcoder opened this issue May 14, 2024 · 5 comments


@bentcoder

bentcoder commented May 14, 2024

Description

Hi,

It's probably clearer if I list the steps in order:

  1. In the producer app, I set message retention to 10000 ms at topic creation and run it. The consumer app is not running yet.
  2. The producer app sends 3 messages to the topic.
  3. I let 5 minutes go by so that the messages are automatically removed from the topic. The topic now has 0 messages, which is good.
  4. I produce 3 more messages. The topic now has 3 messages.
  5. I start the consumer app straight afterwards, but it doesn't see or consume those pending messages, even though I can see them in the UI. Why?
  6. I produce 3 more messages. The topic now has 6 messages.
  7. Because the consumer is already running, it consumes these 3 new messages!

So what seems to be happening is that, when messages are deleted from the topic due to the retention settings, the consumer does not see new messages that were added after the deleted ones. This happens if I start the consumer after adding the new messages. If I start it in between and then send messages, it consumes them.

This issue doesn't exist if I don't set those two retention entries in ConfigEntries.
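A toy model may help make the offsets in the steps above concrete. This is a sketch under simplifying assumptions: a single partition, hypothetical names (`partitionLog`, `produce`, `expire`), and per-record deletion, whereas real brokers delete whole log segments on a periodic retention check. What it illustrates is that deleted offsets are never reused, so after step 4 the three retained messages sit at offsets 3 to 5 rather than 0 to 2, and the next message will get offset 6.

```go
package main

import "fmt"

// record is one message in a toy, single-partition log.
type record struct {
	offset int64
	ts     int64 // produce time in milliseconds (hypothetical clock)
}

// partitionLog is a simplified model of one partition. Real brokers delete
// whole segments on a periodic check, not individual records.
type partitionLog struct {
	records  []record
	logStart int64 // offset of the oldest retained record
	logEnd   int64 // offset the next produced record will get
}

// produce appends n records at time now; offsets only ever increase.
func (l *partitionLog) produce(n int, now int64) {
	for i := 0; i < n; i++ {
		l.records = append(l.records, record{offset: l.logEnd, ts: now})
		l.logEnd++
	}
}

// expire drops records older than retentionMs and advances logStart.
// The deleted offsets are gone for good; they are not handed out again.
func (l *partitionLog) expire(now, retentionMs int64) {
	kept := l.records[:0]
	for _, r := range l.records {
		if now-r.ts <= retentionMs {
			kept = append(kept, r)
		}
	}
	l.records = kept
	if len(l.records) > 0 {
		l.logStart = l.records[0].offset
	} else {
		l.logStart = l.logEnd
	}
}

func main() {
	var l partitionLog
	l.produce(3, 0)            // step 2: offsets 0, 1, 2
	l.expire(5*60*1000, 10000) // step 3: all three are past retention
	l.produce(3, 5*60*1000)    // step 4: offsets 3, 4, 5
	fmt.Println(l.logStart, l.logEnd, len(l.records)) // 3 6 3
}
```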

Versions
| Sarama | Kafka | Go |
|---|---|---|
| v1.43.2 | v3.3.2 | 1.22 |
Configuration

Producer app

Create topic.

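```go
// adm is a sarama.ClusterAdmin.
ms := "10000" // 10 seconds

err := adm.CreateTopic("any-day", &sarama.TopicDetail{
	NumPartitions:     1,
	ReplicationFactor: 1,
	ConfigEntries: map[string]*string{
		"retention.ms":        &ms,
		"delete.retention.ms": &ms, // only affects tombstones in compacted topics
	},
}, false)
```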

Send message.

```go
msg := sarama.ProducerMessage{
	Topic:     "any-day",
	Value:     sarama.StringEncoder(data),
	Key:       sarama.StringEncoder(item.ID),
	Timestamp: time.Now().UTC(),
}

// part and offs are the partition and offset the message was written to.
part, offs, err := syncProducer.SendMessage(&msg)
```

Consumer app

```go
import (
	"context"
	"fmt"

	"github.com/IBM/sarama"
)

type Kafka struct {
	addrs  []string
	config *sarama.Config
	// ...
}

func (k Kafka) CreateConsumerGroup(topic string) error {
	grp, err := sarama.NewConsumerGroup(k.addrs, topic+"-group", k.config)
	if err != nil {
		return err
	}

	// grp.Errors() only delivers errors when k.config.Consumer.Return.Errors is true.
	go func() {
		for err := range grp.Errors() {
			fmt.Println("ERROR", err)
		}
	}()

	cons := AnyDay{}

	// Consume runs one group session and returns on rebalance, so it is called in a loop.
	for {
		err := grp.Consume(context.Background(), []string{topic}, cons)
		if err != nil {
			fmt.Printf("error from consumer: %v\n", err)
		}
	}
}

type AnyDay struct{}

func (AnyDay) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (AnyDay) Cleanup(sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim prints each message of the claimed partition and marks it as
// consumed so its offset can be committed.
func (AnyDay) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	i := 1
	for msg := range claim.Messages() {
		par := msg.Partition
		off := msg.Offset
		top := msg.Topic
		key := msg.Key

		fmt.Printf("%d: PAR: %d OFF: %d TOP: %s KEY: %s > %s\n", i, par, off, top, key, string(msg.Value))

		sess.MarkMessage(msg, "")

		i++
	}

	return nil
}
```
@puellanivis
Contributor

This sort of looks like what I would expect if you had `Config.Consumer.Offsets.Initial == sarama.OffsetNewest`. When the consumer joins the group, it would then wait for newer messages rather than start from the beginning. The relevant comments on the field from the godoc:

```go
// The initial offset to use if no offset was previously committed.
// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
Initial int64
```
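A rough sketch of how that rule plays out with the offsets from the report (after step 4: offsets 3 to 5 retained, next offset 6). `startOffset` and the two constants are hypothetical stand-ins; the constants mirror the values of `sarama.OffsetNewest` (-1) and `sarama.OffsetOldest` (-2), and the model ignores what happens when a committed offset has already been deleted by retention.

```go
package main

import "fmt"

// Hypothetical stand-ins mirroring sarama.OffsetNewest and sarama.OffsetOldest.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// startOffset models where a group member starts reading a partition.
// A committed offset always wins; initial is only used when the group
// has never committed an offset for this partition.
func startOffset(committed int64, hasCommit bool, initial, logStart, logEnd int64) int64 {
	if hasCommit {
		return committed
	}
	if initial == offsetOldest {
		return logStart // everything still retained
	}
	return logEnd // offsetNewest: only messages produced after joining
}

func main() {
	// New group, no commits, state after step 4 of the report.
	fmt.Println(startOffset(0, false, offsetNewest, 3, 6)) // 6: offsets 3, 4, 5 are skipped
	fmt.Println(startOffset(0, false, offsetOldest, 3, 6)) // 3: offsets 3, 4, 5 are read
}
```

This also explains step 7: with `OffsetNewest` the consumer sits at offset 6, so the three messages produced in step 6 (offsets 6 to 8) are the first ones it sees.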

@bentcoder
Author

I actually changed it to `OffsetOldest` and it has been working fine since, but I wasn't sure whether that was in fact the solution, so I wanted to wait for a comment.
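For reference, a minimal sketch of that change, assuming the group is built from a `*sarama.Config` like `k.config` in the consumer app above. `newConsumerConfig` is a hypothetical helper; `Consumer.Return.Errors` is included only because the `grp.Errors()` loop in the consumer app receives nothing unless it is enabled.

```go
package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

// newConsumerConfig is a hypothetical helper returning a consumer-group config
// that starts from the oldest retained offset when the group has no commit yet.
func newConsumerConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest
	// Lets grp.Errors() deliver errors instead of only logging them.
	cfg.Consumer.Return.Errors = true
	return cfg
}

func main() {
	cfg := newConsumerConfig()
	fmt.Println(cfg.Consumer.Offsets.Initial == sarama.OffsetOldest)
}
```

`Initial` only applies when the group has no committed offset, so a group that has already committed keeps resuming from its committed position regardless of this setting.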

@puellanivis
Contributor

Great news. :) Good to get feedback that the first thing that came to mind was the issue. 👍

@dnwe
Collaborator

dnwe commented May 16, 2024

Yep, the described behaviour matches what you'd expect from a consumer that starts as a new group without any committed offset(s) and with `Consumer.Offsets.Initial` set to `sarama.OffsetNewest`.

@bentcoder
Author

Excellent, thank you!
