In the following code, when a new consumer joins the group, duplicate data consumption is detected.
Setting CommitInterval to zero reduces the number of duplicate messages. I suspect the commit did not take effect, even though calling CommitMessages did not return an error.
A similar consumer using sarama does not show this issue.
Perhaps during a rebalance it is necessary to flush commits that have not yet been submitted, because when CommitInterval is configured, calling CommitMessages does not take effect immediately.
func ReadKafkaWithKafkago(ctx context.Context, task string) {
	topic := "testkafka"
	address := "127.0.0.1:9092"
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        []string{address},
		Topic:          topic,
		CommitInterval: time.Second,
		GroupID:        "test1",
		QueueCapacity:  50,
	})
loop:
	for {
		// When the SIGTERM signal is received, ctx is canceled
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if errors.Is(err, io.EOF) {
				logrus.Infof("kafka get eof")
				break loop
			}
			if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
				logrus.Infof("kafka get context canceled:%v", err)
				break loop
			}
			logrus.Errorf("kafka get msg error:%v", err)
			continue
		}
		checkDuplicates(msg) // each msg has a unique id so I can use it to check for duplicates
		err = reader.CommitMessages(context.Background(), msg)
		if err != nil {
			logrus.Errorf("commit error:%v", err)
		}
	}
	reader.Close()
}
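For comparison, a minimal sketch of the synchronous-commit variant mentioned above: leaving CommitInterval at its zero value makes CommitMessages block until the broker acknowledges the offsets, which is why it reduces duplicates. The broker address, topic, and group ID are just the placeholders from the report, not a recommended configuration.

```go
// Sketch: synchronous commits with the same kafka-go reader.
// With CommitInterval unset (zero), CommitMessages is handled
// synchronously, so no in-flight commit can be dropped during
// a rebalance, at the cost of one round trip per commit.
reader := kafka.NewReader(kafka.ReaderConfig{
	Brokers: []string{"127.0.0.1:9092"},
	Topic:   "testkafka",
	GroupID: "test1",
	// CommitInterval omitted (zero) => synchronous commits.
	QueueCapacity: 50,
})
```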
Kafka Version
What version(s) of Kafka are you testing against?
kafka_2.13-2.7.1
What version of kafka-go are you using?
v0.4.47
To Reproduce
Use the above code to start a new consumer.
Duplicate messages appear for some existing consumers or the new consumer.
We're hitting this as well. We're trying to use a 5s commit interval (synchronous won't work for us for performance reasons), and regardless of other settings we consistently see 2x the number of messages when using kafka-go compared to a variety of other consumers of the same data. I can provide other info if desired, but mostly I'm just confirming this behavior.