Duplicate consumption occurs when a new consumer joins and CommitInterval is configured #1259

Open
dalianzhu opened this issue Jan 25, 2024 · 2 comments

dalianzhu commented Jan 25, 2024

In the following code, duplicate consumption is detected whenever a new consumer joins the group.

Setting CommitInterval to zero reduces the number of duplicate messages. I suspect the commits do not take effect, even though CommitMessages does not return an error.

A similar consumer built with sarama does not show this problem.

Perhaps during a rebalance it is necessary to flush commits that have not yet been submitted, because when CommitInterval is configured, calling CommitMessages does not take effect immediately. (See the sketches after the code below.)

package main

import (
	"context"
	"errors"
	"io"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/sirupsen/logrus"
)

func ReadKafkaWithKafkago(ctx context.Context, task string) {
	topic := "testkafka"
	address := "127.0.0.1:9092"
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:        []string{address},
		Topic:          topic,
		CommitInterval: time.Second,
		GroupID:        "test1",
		QueueCapacity:  50,
	})
	defer reader.Close()

loop:
	for {
		// When a SIGTERM is received, the caller cancels ctx.
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			if errors.Is(err, io.EOF) {
				logrus.Infof("kafka get eof")
				break loop
			}
			if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
				logrus.Infof("kafka get context canceled:%v", err)
				break loop
			}
			logrus.Errorf("kafka get msg error:%v", err)
			continue // don't process or commit the zero-value msg on fetch errors
		}

		checkDuplicates(msg) // each msg has a unique id, used to detect duplicates
		err = reader.CommitMessages(context.Background(), msg)
		if err != nil {
			logrus.Errorf("commit error:%v", err)
		}
	}
}
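
checkDuplicates is not shown in the report; here is a minimal sketch of what it might look like, assuming the unique id travels in the message key (the id location, the in-memory set, and the logging are all assumptions, and it needs the sync package in addition to the imports above):

// Hypothetical helper: tracks the ids seen so far in an in-memory set.
var (
	seen   = make(map[string]struct{})
	seenMu sync.Mutex
)

func checkDuplicates(msg kafka.Message) {
	id := string(msg.Key) // assumption: the unique id is the message key
	seenMu.Lock()
	defer seenMu.Unlock()
	if _, ok := seen[id]; ok {
		logrus.Warnf("duplicate detected: id=%s partition=%d offset=%d",
			id, msg.Partition, msg.Offset)
		return
	}
	seen[id] = struct{}{}
}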
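
On the CommitInterval point: the kafka-go ReaderConfig documentation says a CommitInterval of 0 (the default) makes commits synchronous, which matches the observation that zero reduces duplicates. A minimal sketch of that configuration, as a workaround rather than a fix:

reader := kafka.NewReader(kafka.ReaderConfig{
	Brokers: []string{"127.0.0.1:9092"},
	Topic:   "testkafka",
	GroupID: "test1",
	// CommitInterval left at its zero value: CommitMessages then returns
	// only after the offset has reached the broker, so nothing is left
	// pending when a rebalance starts.
	QueueCapacity: 50,
})

The trade-off is one commit round trip per message, which is exactly what the asynchronous interval is meant to avoid.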

Kafka Version

  • What version(s) of Kafka are you testing against?
    kafka_2.13-2.7.1
  • What version of kafka-go are you using?
    v0.4.47

To Reproduce

  1. Use the above code to start a new consumer.
  2. Duplicate messages appear on some of the existing consumers or on the new consumer.
dalianzhu added the bug label Jan 25, 2024
@jakedavis

We're hitting this as well. We're trying to use a 5s commit interval (synchronous commits won't work for us for performance reasons), and regardless of other settings we consistently see 2x the number of messages with kafka-go compared to a variety of other consumers of the same data. I can provide more info if desired, but mostly I'm just confirming this behavior.
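
One middle ground, sketched under the assumption that re-reading a bounded number of messages is acceptable: leave CommitInterval unset (synchronous commits) but commit only every N messages, using the variadic form of CommitMessages to amortize the round trip. process and batchSize are placeholders, and error handling is elided:

const batchSize = 100

batch := make([]kafka.Message, 0, batchSize)
for {
	msg, err := reader.FetchMessage(ctx)
	if err != nil {
		break // error handling elided for the sketch
	}
	process(msg) // placeholder for application logic
	batch = append(batch, msg)
	if len(batch) == batchSize {
		// One synchronous commit per batch: at most batchSize messages
		// are re-read after a crash or rebalance.
		if err := reader.CommitMessages(ctx, batch...); err != nil {
			logrus.Errorf("commit error: %v", err)
		}
		batch = batch[:0]
	}
}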

@ahmedwahba47

I confirm the issue still happens: even without setting CommitInterval, duplicates appear when a new consumer connects.
