
Infinite loop: kminion doesn't expect empty consumer offset #107

mikekamornikov opened this issue Aug 2, 2021 · 1 comment
@mikekamornikov

We use the following settings for __consumer_offsets:

    cleanup.policy:    compact,delete
    compression.type:  producer
    retention.ms:      2592000000
    segment.ms:        3600000

As you can see, it's possible that __consumer_offsets had records in the past but has none right now. This means Low Water Mark = High Water Mark != 0.

offset_consumer.go checkIfConsumerLagIsCaughtUp internally does the following:

highMarksRes, err := offsetReq.RequestWith(ctx, s.kafkaSvc.Client)
...
consumedOffsets := s.storage.getConsumedOffsets()
...
			highWaterMark := partition.Offset - 1
			consumedOffset := consumedOffsets[partition.Partition]
			partitionLag := highWaterMark - consumedOffset
			if partitionLag < 0 {
				partitionLag = 0
			}

			if partitionLag > 0 {
				partitionsLagging++
				totalLag += partitionLag
				s.logger.Debug("consumer_offsets topic lag has not been caught up yet",
					zap.Int32("partition_id", partition.Partition),
					zap.Int64("high_water_mark", highWaterMark),
					zap.Int64("consumed_offset", consumedOffset),
					zap.Int64("partition_lag", partitionLag))
				isReady = false
				continue
			}

partitionLag in this case is always greater than 0, so isReady is always false.

It looks like there should be a check for the default value of consumedOffset (0) somewhere before the final condition.
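One possible shape for such a check (a hypothetical sketch, not a patch against kminion; the function name, parameters, and the availability of the low water mark here are assumptions): use the two-value map read to distinguish "no offset consumed yet" from a genuinely consumed offset of 0, and treat an empty partition (low water mark equal to the high water mark) as fully caught up:

```go
package main

import "fmt"

// lagFor computes one partition's lag. It distinguishes "no offset consumed
// yet" (missing map entry) from a consumed offset of 0: if the partition
// holds no retained records (low water mark >= high water mark), there is
// nothing left to catch up on, so the lag is 0.
func lagFor(consumed map[int32]int64, partition int32, lowWaterMark, highWaterMark int64) int64 {
	consumedOffset, ok := consumed[partition]
	if !ok && lowWaterMark >= highWaterMark {
		return 0 // empty partition, never consumed: nothing to do
	}
	lag := highWaterMark - consumedOffset
	if lag < 0 {
		lag = 0
	}
	return lag
}

func main() {
	consumed := map[int32]int64{0: 99}
	fmt.Println(lagFor(consumed, 0, 0, 100))   // consumed up to 99 of 100
	fmt.Println(lagFor(consumed, 1, 100, 100)) // empty partition, no entry
	fmt.Println(lagFor(consumed, 2, 0, 100))   // retained records, no entry: real lag
}
```

With this shape, the startup loop would stop counting emptied partitions as lagging, while partitions that still hold unconsumed records keep blocking readiness as before.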

@weeco
Contributor

weeco commented Feb 10, 2024

In case the __consumer_offsets topic is incomplete (due to the configured retention settings), you should not use this mode anyway, because it would yield wrong results. It has no chance of knowing the current group offset unless it used the Kafka API again. Such a check would unblock the start, but it would emit wrong metrics, or am I misunderstanding something?
