Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer group not reconsuming messages if message is not marked #2854

Open
AhmedNSidd opened this issue Apr 5, 2024 · 1 comment
Open

Comments

@AhmedNSidd
Copy link

Description

Using a go test, I'm producing a message for a kafka consumer group I created. In this consumer group, I'm logging the kafka message, but I'm not doing session.MarkMessage or session.Commit on the message. I would expect that this would allow me to reconsume the message, and it would relog the message in the for loop, but it does not reconsume the message.

Example code for handler

func (cgh *ConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
		//session.MarkMessage(message, "")
		//session.Commit()
	}
	return nil
}
Versions
Sarama Kafka Go
v1.43.0 3.6 1.21
Configuration
	saramaConfig.Consumer.Return.Errors = true
	saramaConfig.Consumer.Offsets.AutoCommit.Enable = false
Logs
logs: CLICK ME


Additional Context
@fujistick
Copy link

I hit this exact same issue and figured it out...

It's because a new consumer group defaults to an initial offset at the end of the topic. This is set with samaraConfig.Consumer.Offsets.Initial. If you set samConfig.Consumer.Offsets.Initial = sarama.OffsetOldest you'll find it will replay messages from the start of the topic.

In my case I has assumed that having samConfig.Consumer.Offsets.Initial = sarama.OffsetNewest (-1, the end of the topic) and then starting a consumer for a groupID and committing offsets (either manually or automatically) without marking any messages processed, would actually commit concrete offset values - basically a snapshot of the newest offsets at the time the consumer started.

It doesn't appear that is the case, so when you start up again with the same groupID offsets are still -1. Once a message has been marked for each partition and offsets committed, this will produce concrete offsets and everything works as expected from there.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants