OnPartitionsAssigned called twice for the same partitions #705

Closed
iimos opened this issue Apr 11, 2024 · 6 comments · Fixed by #737
Labels: bug (Something isn't working), has pr

Comments

@iimos

iimos commented Apr 11, 2024

Hi!

I've encountered an issue where OnPartitionsAssigned is called twice with the same set of partitions.

Here are the franz-go logs, with my comments (read from bottom to top):

2024-04-11 11:28:29.115	{"how":0,"input":{"pb_metering_resource":{"0":{"At":9679,"Epoch":-1,"CurrentEpoch":0},"3":{"At":50126,"Epoch":-1,"CurrentEpoch":0}}},"level":"info","msg":"assigning partitions","time":"2024-04-11T08:28:29Z","why":"newly fetched offsets for group resource_tracker"}
2024-04-11 11:28:22.650	{"group":"resource_tracker","level":"info","msg":"beginning heartbeat loop","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.650	{"added":{"pb_metering_resource":[0,3]},"group":"resource_tracker","level":"info","lost":{},"msg":"new group session begun","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.650	{"assigned":{"pb_metering_resource":[0,3,4,7,10,11]},"group":"resource_tracker","level":"info","msg":"synced","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.648	{"group":"resource_tracker","level":"info","msg":"syncing","protocol":"cooperative-sticky","protocol_type":"consumer","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.647	{"group":"resource_tracker","level":"info","msg":"joining group","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.646	{"level":"info","msg":"immediate metadata update triggered","time":"2024-04-11T08:28:22Z","why":"waitmeta after heartbeat error"}
2024-04-11 11:28:22.646	{"group":"resource_tracker","level":"info","msg":"cooperative consumer calling onRevoke at the end of a session even though no partitions were lost","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:22.646	{"err":"REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.","group":"resource_tracker","level":"info","msg":"heartbeat errored","time":"2024-04-11T08:28:22Z"}
2024-04-11 11:28:07.711	{"how":0,"input":{"pb_metering_resource":{"10":{"At":34472,"Epoch":-1,"CurrentEpoch":0},"11":{"At":13108,"Epoch":-1,"CurrentEpoch":0},"4":{"At":83302,"Epoch":-1,"CurrentEpoch":0},"7":{"At":726686,"Epoch":-1,"CurrentEpoch":0}}},"level":"info","msg":"assigning partitions","time":"2024-04-11T08:28:07Z","why":"newly fetched offsets for group resource_tracker"}
2024-04-11 11:28:07.653	{"group":"resource_tracker","level":"info","msg":"beginning heartbeat loop","time":"2024-04-11T08:28:07Z"}

here OnPartitionsAssigned({"pb_metering_resource":[4,7,10,11]}) called again

2024-04-11 11:28:07.653	{"added":{},"group":"resource_tracker","level":"info","lost":{},"msg":"new group session begun","time":"2024-04-11T08:28:07Z"}
2024-04-11 11:28:07.653	{"assigned":{"pb_metering_resource":[4,7,10,11]},"group":"resource_tracker","level":"info","msg":"synced","time":"2024-04-11T08:28:07Z"}
2024-04-11 11:28:07.653	{"group":"resource_tracker","level":"info","msg":"syncing","protocol":"cooperative-sticky","protocol_type":"consumer","time":"2024-04-11T08:28:07Z"}
2024-04-11 11:28:03.828	{"group":"resource_tracker","level":"info","msg":"joining group","time":"2024-04-11T08:28:03Z"}
2024-04-11 11:28:03.828	{"group":"resource_tracker","level":"info","msg":"fetch offsets failed due to context cancelation","time":"2024-04-11T08:28:03Z"}

here OnPartitionsRevoked({}) called

2024-04-11 11:28:03.827	{"group":"resource_tracker","level":"info","msg":"cooperative consumer calling onRevoke at the end of a session even though no partitions were lost","time":"2024-04-11T08:28:03Z"}
2024-04-11 11:28:01.640	{"level":"info","msg":"immediate metadata update triggered","time":"2024-04-11T08:28:01Z","why":"waitmeta after heartbeat error"}
2024-04-11 11:28:01.640	{"err":"REBALANCE_IN_PROGRESS: The group is rebalancing, so a rejoin is needed.","group":"resource_tracker","level":"info","msg":"heartbeat errored","time":"2024-04-11T08:28:01Z"}
2024-04-11 11:27:52.639	{"group":"resource_tracker","level":"info","msg":"beginning heartbeat loop","time":"2024-04-11T08:27:52Z"}

here OnPartitionsAssigned({"pb_metering_resource":[4,7,10,11]}) called

2024-04-11 11:27:52.639	{"added":{"pb_metering_resource":[4,7,10,11]},"group":"resource_tracker","level":"info","lost":{},"msg":"new group session begun","time":"2024-04-11T08:27:52Z"}
2024-04-11 11:27:52.639	{"assigned":{"pb_metering_resource":[4,7,10,11]},"group":"resource_tracker","level":"info","msg":"synced","time":"2024-04-11T08:27:52Z"}

The problem is that sometimes, after a rebalance, OnPartitionsAssigned({"pb_metering_resource":[4,7,10,11]}) is called twice in a row.

I'd greatly appreciate any help.


franz version: v1.15.2

client initialisation:

kafka, err := kgo.NewClient(
	kgo.SeedBrokers(...),
	kgo.ConsumeTopics(...),
	kgo.ConsumerGroup(...),
	kgo.ClientID(fmt.Sprintf("%s-%s", params.KafkaParams.AppName, uuid.Must(uuid.NewV4()))),
	kgo.SessionTimeout(12 * time.Second),
	kgo.HeartbeatInterval(3 * time.Second),

	// Offset commit settings
	kgo.DisableAutoCommit(),
	kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),

	// Network settings
	kgo.DialTimeout(3 * time.Second),
	kgo.FetchMaxBytes(16<<20),      // 16 MiB
	kgo.BrokerMaxReadBytes(16<<20), // 16 MiB

	// Partition assignment and rebalancing settings
	kgo.BlockRebalanceOnPoll(),
	kgo.OnPartitionsAssigned(c.onPartitionsAssigned),
	kgo.AdjustFetchOffsetsFn(c.adjustFetchOffsetsFn),
	kgo.OnPartitionsRevoked(c.onPartitionsRevoked),
	kgo.OnPartitionsLost(c.onPartitionsRevoked),
)
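Until this is fixed upstream, one possible client-side mitigation is to make the assignment callback idempotent. The sketch below is a hypothetical guard (the `assignTracker` type and its methods are my own names, not part of kgo): it remembers which partitions have already been reported assigned, so a duplicate OnPartitionsAssigned notification for the same partitions becomes a no-op. You would call `filterNew` at the top of `c.onPartitionsAssigned` and `forget` in `c.onPartitionsRevoked`.

```go
package main

import (
	"fmt"
	"sync"
)

// assignTracker is a hypothetical client-side guard: it remembers which
// partitions the OnPartitionsAssigned callback has already seen, so a
// duplicate notification for the same partitions can be ignored.
type assignTracker struct {
	mu   sync.Mutex
	seen map[string]map[int32]bool
}

// filterNew returns only the partitions not already tracked as assigned,
// recording them as seen in the process.
func (t *assignTracker) filterNew(assigned map[string][]int32) map[string][]int32 {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.seen == nil {
		t.seen = make(map[string]map[int32]bool)
	}
	fresh := make(map[string][]int32)
	for topic, parts := range assigned {
		for _, p := range parts {
			if t.seen[topic] == nil {
				t.seen[topic] = make(map[int32]bool)
			}
			if !t.seen[topic][p] {
				t.seen[topic][p] = true
				fresh[topic] = append(fresh[topic], p)
			}
		}
	}
	return fresh
}

// forget drops partitions on revoke/lost so a later re-assignment
// of the same partitions is reported as new again.
func (t *assignTracker) forget(revoked map[string][]int32) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for topic, parts := range revoked {
		for _, p := range parts {
			delete(t.seen[topic], p)
		}
	}
}

func main() {
	var tr assignTracker
	a := map[string][]int32{"pb_metering_resource": {4, 7, 10, 11}}
	fmt.Println(tr.filterNew(a)) // first assignment: all partitions are new
	fmt.Println(tr.filterNew(a)) // duplicate assignment: empty map
}
```

This only deduplicates the callback's side effects; it does not change what the client fetches or commits.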
@iimos
Author

iimos commented Apr 11, 2024

Probably related to #658

@twmb
Owner

twmb commented Apr 11, 2024 via email

@iimos
Author

iimos commented Apr 12, 2024

@twmb Unfortunately not

@twmb
Owner

twmb commented Apr 24, 2024

I'll try to stare at this a bit, sooner than later. I haven't had time recently and, when I have, have been prioritizing some of my own fun for once. But, ... I'll try, soon.

@twmb twmb added the TODO label May 23, 2024
@twmb
Owner

twmb commented May 26, 2024

Now that I finally got to looking at this, the problem is pretty clear. This is hitting some code added for #98, specifically this function:

// If we are cooperatively consuming, we have a potential problem: if fetch
// offsets is canceled due to an immediate rebalance, when we resume, we will
// not re-fetch offsets for partitions we were previously assigned and are
// still assigned. We will only fetch offsets for new assignments.
//
// To work around that issue, we track everything we are fetching in g.fetching
// and only clear g.fetching if fetchOffsets returns with no error.
//
// Now, if fetching returns early due to an error, when we rejoin and re-fetch,
// we will resume fetching what we were previously:
//
// - first we remove what was lost
// - then we add anything new
// - then we translate our total set into the "added" list to be fetched on return
//
// Any time a group is completely lost, the manage loop clears fetching. When
// cooperative consuming, a hard error is basically losing the entire state and
// rejoining from scratch.
func (g *groupConsumer) adjustCooperativeFetchOffsets(added, lost map[string][]int32) map[string][]int32 {
	if g.fetching != nil {
		// We were fetching previously: remove anything lost.
		for topic, partitions := range lost {
			ft := g.fetching[topic]
			if ft == nil {
				continue // we were not fetching this topic
			}
			for _, partition := range partitions {
				delete(ft, partition)
			}
			if len(ft) == 0 {
				delete(g.fetching, topic)
			}
		}
	} else {
		// We were not fetching previously: start a new map for what we
		// are adding.
		g.fetching = make(map[string]map[int32]struct{})
	}
	// Merge everything we are newly fetching to our fetching map.
	for topic, partitions := range added {
		ft := g.fetching[topic]
		if ft == nil {
			ft = make(map[int32]struct{}, len(partitions))
			g.fetching[topic] = ft
		}
		for _, partition := range partitions {
			ft[partition] = struct{}{}
		}
	}
	// Now translate our full set (previously fetching ++ newly fetching --
	// lost) into a new "added" map to be fetched.
	added = make(map[string][]int32, len(g.fetching))
	for topic, partitions := range g.fetching {
		ps := make([]int32, 0, len(partitions))
		for partition := range partitions {
			ps = append(ps, partition)
		}
		added[topic] = ps
	}
	return added
}

  • In the first group session, OnPartitionsAssigned is called with what you're assigned (good so far)
  • A FetchOffsets request is issued
  • A rebalance happens so quickly that it actually kills the active FetchOffsets request
  • You do not add nor lose any partitions
  • The client needs to resume fetching offsets
  • Resuming fetching offsets adds what was being fetched back into "newly added partitions"
  • The client calls OnPartitionsAssigned with the same set of partitions
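The sequence above can be reproduced in miniature. The sketch below is a simplified stand-in for `adjustCooperativeFetchOffsets` (the package-level `fetching` variable and the `adjust` function are illustrative names, not the library's actual internals beyond what is quoted above): because the fetch state is not cleared when the fetch is canceled, a rejoin with nothing added and nothing lost still translates the surviving set back into an "added" list.

```go
package main

import (
	"fmt"
	"sort"
)

// fetching mirrors g.fetching: topic -> set of partitions whose offsets
// are (still) being fetched. It deliberately survives a canceled fetch.
var fetching map[string]map[int32]struct{}

// adjust is a simplified stand-in for adjustCooperativeFetchOffsets:
// remove `lost` from the surviving fetch state, merge in `added`, and
// return the full remaining set as the new "added" list.
func adjust(added, lost map[string][]int32) map[string][]int32 {
	if fetching == nil {
		fetching = make(map[string]map[int32]struct{})
	}
	for topic, partitions := range lost {
		for _, p := range partitions {
			delete(fetching[topic], p)
		}
	}
	for topic, partitions := range added {
		if fetching[topic] == nil {
			fetching[topic] = make(map[int32]struct{})
		}
		for _, p := range partitions {
			fetching[topic][p] = struct{}{}
		}
	}
	out := make(map[string][]int32)
	for topic, set := range fetching {
		for p := range set {
			out[topic] = append(out[topic], p)
		}
		sort.Slice(out[topic], func(i, j int) bool { return out[topic][i] < out[topic][j] })
	}
	return out
}

func main() {
	// Session 1: partitions 4,7,10,11 newly assigned; the offset fetch
	// starts, then is canceled by the rebalance, so fetching is NOT cleared.
	first := adjust(map[string][]int32{"pb_metering_resource": {4, 7, 10, 11}}, nil)
	fmt.Println(first)

	// Session 2: rejoin with added={} and lost={} -- yet the surviving
	// state is translated back into an "added" list, which re-triggers
	// the offset fetch and the assignment callbacks for the same set.
	second := adjust(map[string][]int32{}, map[string][]int32{})
	fmt.Println(second)
}
```

Both calls print the same partition set, matching the duplicated OnPartitionsAssigned observed in the logs.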

The fix is pretty straightforward, I'll fix this in the next release (which I hope finally to be quite soon).

@iimos
Author

iimos commented May 27, 2024

@twmb Thank you so much for looking into this and identifying the issue! It's great to hear that there's a clear path to a fix. I'll be looking forward to the next release for the resolution. Your help is greatly appreciated!
