[CooperativeSticky] Best practice to use manual commits & avoid commits during rebalance #2206

Open
RonAmihai opened this issue Apr 10, 2024 · 0 comments

Description

As described in the librdkafka issue confluentinc/librdkafka#4059, performing manual commits during a cooperative rebalance is currently prohibited and may trigger additional follow-up rebalances.

Some Kafka libraries (example) prevent manual commits during a rebalance as a solution. In the linked case, the library raises RebalanceInProgressException, but that depends on whether the rebalance state was fetched before the commit was performed. Otherwise, some commit-during-rebalance scenarios may raise general commit errors.

However, confluent-kafka-dotnet does not seem to prevent that.

Attempting a manual commit during a rebalance throws the following exception:

Confluent.Kafka.KafkaException: Broker: Specified group generation ID is not valid
   at Confluent.Kafka.Impl.SafeKafkaHandle.Commit(IEnumerable`1 offsets)
   at Confluent.Kafka.Consumer`2.Commit(IEnumerable`1 offsets)

The only way I've come up with so far to handle this situation (with the current implementation) is:

  1. Query the group state (either after each consume call or in the background)
  2. Check the state before each commit
  3. If the state is rebalance-related (either PreparingRebalance or CompletingRebalance), skip the manual commit
  4. Perform the manual commit after one or more subsequent consumes, depending on the state

var groupDescriptions = await _adminClient.DescribeConsumerGroupsAsync(["group_id"]);
var groupState = groupDescriptions.ConsumerGroupDescriptions.FirstOrDefault(g => g.GroupId == "group_id")?.State;
var isDuringRebalance = groupState is ConsumerGroupState.PreparingRebalance or ConsumerGroupState.CompletingRebalance;
// Check isDuringRebalance before each manual commit and skip the commit when it is true

This strategy reduces such scenarios but cannot prevent them entirely, since the group state can still change between the check and the commit.
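As a possible complement to the state check, the commit call itself could be wrapped so that the error shown above is treated as a signal to defer rather than as a failure. The following is only a minimal sketch: `CommitHelper` and `TryCommit` are hypothetical names, and the assumption that `ErrorCode.IllegalGeneration` and `ErrorCode.RebalanceInProgress` are the relevant codes is based on the exception message above, not on confirmed library behavior. Per-partition errors are not handled here.

```csharp
using System.Collections.Generic;
using Confluent.Kafka;

public static class CommitHelper
{
    // Hypothetical helper: returns true if the commit succeeded, false if it
    // was rejected with an error code that is assumed to indicate a rebalance.
    public static bool TryCommit<TKey, TValue>(
        IConsumer<TKey, TValue> consumer,
        IEnumerable<TopicPartitionOffset> offsets)
    {
        try
        {
            consumer.Commit(offsets);
            return true;
        }
        catch (KafkaException ex) when (
            ex.Error.Code is ErrorCode.IllegalGeneration
                          or ErrorCode.RebalanceInProgress)
        {
            // Assumption: the group is rebalancing. The caller keeps the
            // offsets and retries after a later Consume call.
            return false;
        }
    }
}
```

When `TryCommit` returns false, the caller would hold on to the pending offsets and retry after a subsequent consume. This does not avoid the failed commit request itself, so it does not address the follow-up rebalances described in the librdkafka issue.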

Is there a better strategy for handling these situations?
Are there other considerations one should take into account regarding commits during cooperative rebalances?

How to reproduce

  1. Create a consumer with partition.assignment.strategy=cooperative-sticky
  2. Trigger a rebalance that affects some of the partitions
  3. Consume messages from the non-affected partitions
  4. Perform a manual commit of those messages' offsets
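
The steps above can be sketched roughly as follows. This is a minimal sketch, not the exact reproduction code: the broker address `localhost:9092` and the topic name `my-topic` are placeholders, and the rebalance from step 2 has to be triggered separately (for example by starting or stopping a second consumer in the same group while this one is running).

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092", // placeholder address
    GroupId = "group_id",
    EnableAutoCommit = false,            // commits are performed manually below
    PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("my-topic");          // placeholder topic

while (true)
{
    var result = consumer.Consume(TimeSpan.FromSeconds(1));
    if (result == null) continue;        // no message within the timeout

    // Manual commit of the next offset to read; this is the call that is
    // expected to throw when it lands during a rebalance.
    consumer.Commit(new[]
    {
        new TopicPartitionOffset(result.TopicPartition, result.Offset + 1),
    });
}
```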