There is no option to set max.poll.interval.ms #708

Closed
GRbit opened this issue Apr 17, 2024 · 5 comments

@GRbit

GRbit commented Apr 17, 2024

First of all, thank you very much for your project. It's very nice to have a good Kafka consumer that respects interfaces (compared to the standard confluent library) and also delivers better performance. Impressive work!

Coming to the problem: I need to set max.poll.interval.ms manually for the consumer I use, but I see no option for it.

Are there any plans for implementing this functionality in the future? Are there any workarounds to achieve the same result?

If you want this feature, I can probably help, but I'll need some pointers to documentation on how it should be implemented, so any guidance is very much appreciated.

UPD:
I found the CommitUncommittedOffsets function VERY useful, what a nice thing to have!
Still, I cannot use it, because without max.poll.interval.ms the only approach left for my case is to poll every (SessionTimeout/2) seconds while I'm processing the collected records. Otherwise, the broker will consider the consumer frozen and trigger a rebalance, which is quite a bummer.
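
For illustration, a minimal sketch of that keep-alive approach (the group/topic names, the 45s session timeout, and the processBatch helper are made up for this example; this is not an endorsed library pattern). It also shows why CommitUncommittedOffsets does not fit this pattern: the keep-alive polls advance the uncommitted offsets past records that have not been processed yet, so the sketch commits the processed records explicitly instead.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

// processBatch stands in for the slow (possibly >5 minute) batch work.
func processBatch(records []*kgo.Record) { _ = records }

func main() {
	const sessionTimeout = 45 * time.Second

	cl, err := kgo.NewClient(
		kgo.ConsumerGroup("mygroup"), // illustrative group/topic names
		kgo.ConsumeTopics("events"),
		kgo.SessionTimeout(sessionTimeout),
		kgo.DisableAutoCommit(),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	ctx := context.Background()
	var pending []*kgo.Record // records returned by keep-alive polls, not yet processed

	for {
		fetches := cl.PollFetches(ctx)
		fetches.EachError(func(t string, p int32, err error) {
			log.Printf("fetch error on %s[%d]: %v", t, p, err)
		})
		batch := append(pending, fetches.Records()...)
		pending = nil

		// Run the slow batch processing in the background...
		done := make(chan struct{})
		go func() {
			defer close(done)
			processBatch(batch)
		}()

		// ...and keep polling every SessionTimeout/2 in the meantime, the way a
		// Java-style client would have to. Records from these keep-alive polls
		// are stashed for the next batch -- which is exactly why
		// CommitUncommittedOffsets doesn't fit here: it would also commit the
		// stashed-but-unprocessed records.
	wait:
		for {
			select {
			case <-done:
				break wait
			case <-time.After(sessionTimeout / 2):
				pollCtx, cancel := context.WithTimeout(ctx, time.Second)
				pending = append(pending, cl.PollFetches(pollCtx).Records()...)
				cancel()
			}
		}

		// Commit only the records that were actually processed.
		if err := cl.CommitRecords(ctx, batch...); err != nil {
			log.Println("commit error:", err)
		}
	}
}
```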

twmb added the TODO label May 23, 2024
@twmb
Owner

twmb commented May 24, 2024

What's the use case of max.poll.interval.ms?

Is it: if the client takes longer than, say, 5 minutes to process a message, the client leaves the group; another client will likely consume the same message once a rebalance happens; and if the original client eventually succeeds at processing the message, you end up with a duplicate processed message, while the original client then enters a fatal state and stops consuming?

I don't really understand the use case for it. I'm not opposed outright, but knowing the use case will allow me to better understand how to implement it and what to document on the config option.

@GRbit
Author

GRbit commented May 24, 2024

Thank you very much for your response.

What's the use case of max.poll.interval.ms?

The use case is that the consumer can spend more than 5 minutes processing a batch of messages, and we don't want a rebalance to happen during that processing. Not only will a rebalance cause some other consumer to take over the messages being processed at the time, it also takes quite a while itself, stresses the cluster, and pauses data consumption.

In more detail:
I know that most systems process only one event at a time and do it fast, so almost no one needs this peculiar max.poll.interval.ms setting. The fact that your already well-developed library (once again, many kudos to you and all other contributors) doesn't have this option is just more evidence that 99% of users don't need it. In our case, we collect a large batch of events and then process them all together, and sometimes the processing takes more than 5 minutes. It's not a regular occurrence, but when it happens it means the system is already running at higher bandwidth than usual, and a cluster rebalance makes it even worse by slowing down the whole system, which adds even more load and causes even longer processing... I think you can see where this is going.

If you have any further questions, I'll be glad to answer all of them.

@twmb
Owner

twmb commented May 25, 2024

This client does not ever kick a member from the group without an explicit choice by the end user (or if your client crashes or closes uncleanly, after which it'll miss heartbeats and be kicked) -- effectively, max.poll.interval.ms is unlimited. Is there a reason to add an option for a limit?
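
For completeness, a hedged sketch of what an application-side equivalent of max.poll.interval.ms could look like with this client: enforce a processing deadline yourself and explicitly leave the group when it is exceeded. The 5-minute deadline, the names, and processBatch are illustrative; the sketch assumes the client's LeaveGroup method.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

const maxProcessInterval = 5 * time.Minute // illustrative deadline

// processBatch stands in for the slow batch work.
func processBatch(records []*kgo.Record) { _ = records }

func main() {
	cl, err := kgo.NewClient(
		kgo.ConsumerGroup("mygroup"), // illustrative names
		kgo.ConsumeTopics("foo"),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()

	ctx := context.Background()
	for {
		records := cl.PollFetches(ctx).Records()

		done := make(chan struct{})
		go func() {
			defer close(done)
			processBatch(records)
		}()

		select {
		case <-done:
			// Processed in time; keep consuming.
		case <-time.After(maxProcessInterval):
			// Deadline exceeded: explicitly leave the group so another member
			// can take over -- the "explicit choice by the end user" above.
			cl.LeaveGroup()
			return
		}
	}
}
```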

@GRbit
Author

GRbit commented May 26, 2024

Excuse me if I'm wrong, but this is how I understood the documentation:
https://kafka.apache.org/documentation/#consumerconfigs_max.poll.interval.ms
If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance

I encountered this case over a year ago: a consumer was sending heartbeats but not polling, so the coordinator considered the consumer failed and initiated rebalancing. After I changed max.poll.interval.ms the problem was solved. Maybe I didn't do proper research, but it seemed pretty clear to me what had happened.

Are you sure that with the current version of Kafka a consumer that is not polling will not be kicked? Did you test it?

@twmb
Owner

twmb commented May 26, 2024

Yes, I'm positive. You can test this easily with the client. It sounds like you're coming in with experience of how the Java client works but haven't tested this client?

You can run this program against a topic with one partition that has one message to demonstrate:

package main

import (
	"os"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(
		kgo.ConsumerGroup("mygroup"),
		kgo.ConsumeTopics("foo"),
		kgo.SessionTimeout(15*time.Second),
		kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, func() string {
			return time.Now().Format("15:04:05 ")
		})),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Never poll; just block forever and watch the heartbeat loop in the debug logs.
	select {}
}

The heartbeat continues in the background indefinitely even though you are doing nothing with the client:

Logs:

```
11:50:07 [INFO] immediate metadata update triggered; why: querying metadata for consumer initialization
11:50:07 [DEBUG] opening connection to broker; addr: 127.0.0.1:9092, broker: seed_0
11:50:07 [DEBUG] connection opened to broker; addr: 127.0.0.1:9092, broker: seed_0
11:50:07 [DEBUG] issuing api versions request; broker: seed_0, version: 3
11:50:07 [DEBUG] wrote ApiVersions v3; broker: seed_0, bytes_written: 31, write_wait: 59.334µs, time_to_write: 24.375µs, err:
11:50:07 [DEBUG] read ApiVersions v3; broker: seed_0, bytes_read: 233, read_wait: 36.666µs, time_to_read: 92.042µs, err:
11:50:07 [DEBUG] connection initialized successfully; addr: 127.0.0.1:9092, broker: seed_0
11:50:07 [DEBUG] wrote Metadata v12; broker: seed_0, bytes_written: 43, write_wait: 1.018834ms, time_to_write: 17.75µs, err:
11:50:07 [DEBUG] read Metadata v12; broker: seed_0, bytes_read: 157, read_wait: 78.458µs, time_to_read: 107.542µs, err:
11:50:07 [INFO] beginning to manage the group lifecycle; group: mygroup
11:50:07 [INFO] beginning autocommit loop; group: mygroup
11:50:07 [DEBUG] blocking commits from join&sync
11:50:07 [INFO] joining group; group: mygroup
11:50:07 [DEBUG] prepared to issue find coordinator request; coordinator_type: 0, coordinator_keys: [mygroup]
11:50:07 [DEBUG] sharded request; req: FindCoordinator, destinations: [any]
11:50:07 [DEBUG] opening connection to broker; addr: 127.0.0.1:9093, broker: 1
11:50:07 [DEBUG] connection opened to broker; addr: 127.0.0.1:9093, broker: 1
11:50:07 [DEBUG] issuing api versions request; broker: 1, version: 3
11:50:07 [DEBUG] wrote ApiVersions v3; broker: 1, bytes_written: 31, write_wait: 6.375µs, time_to_write: 10.625µs, err:
11:50:07 [DEBUG] read ApiVersions v3; broker: 1, bytes_read: 233, read_wait: 12.916µs, time_to_read: 63.667µs, err:
11:50:07 [DEBUG] connection initialized successfully; addr: 127.0.0.1:9093, broker: 1
11:50:07 [DEBUG] wrote FindCoordinator v4; broker: 1, bytes_written: 29, write_wait: 374.25µs, time_to_write: 14.958µs, err:
11:50:07 [DEBUG] read FindCoordinator v4; broker: 1, bytes_read: 45, read_wait: 23.125µs, time_to_read: 82µs, err:
11:50:07 [DEBUG] opening connection to broker; addr: 127.0.0.1:9094, broker: 2
11:50:07 [DEBUG] connection opened to broker; addr: 127.0.0.1:9094, broker: 2
11:50:07 [DEBUG] issuing api versions request; broker: 2, version: 3
11:50:07 [DEBUG] wrote ApiVersions v3; broker: 2, bytes_written: 31, write_wait: 7.875µs, time_to_write: 10.292µs, err:
11:50:07 [DEBUG] read ApiVersions v3; broker: 2, bytes_read: 233, read_wait: 15.042µs, time_to_read: 104.416µs, err:
11:50:07 [DEBUG] connection initialized successfully; addr: 127.0.0.1:9094, broker: 2
11:50:07 [DEBUG] wrote JoinGroup v9; broker: 2, bytes_written: 137, write_wait: 333.625µs, time_to_write: 9.667µs, err:
11:50:07 [DEBUG] read JoinGroup v9; broker: 2, bytes_read: 62, read_wait: 19.625µs, time_to_read: 132.917µs, err:
11:50:07 [INFO] join returned MemberIDRequired, rejoining with response's MemberID; group: mygroup, member_id: kgo-a9ee52ec60197286d6723bc94fa4c303
11:50:07 [DEBUG] wrote JoinGroup v9; broker: 2, bytes_written: 173, write_wait: 8.25µs, time_to_write: 10.583µs, err:
11:50:07 [DEBUG] read JoinGroup v9; broker: 2, bytes_read: 193, read_wait: 15.75µs, time_to_read: 107.084µs, err:
11:50:07 [INFO] joined, balancing group; group: mygroup, member_id: kgo-a9ee52ec60197286d6723bc94fa4c303, instance_id: , generation: 1, balance_protocol: cooperative-sticky, leader: true
11:50:07 [INFO] balancing group as leader
11:50:07 [INFO] balance group member; id: kgo-a9ee52ec60197286d6723bc94fa4c303, interests: interested topics: [foo], previously owned:
11:50:07 [INFO] balanced; plan: kgo-a9ee52ec60197286d6723bc94fa4c303{foo[0]}
11:50:07 [INFO] syncing; group: mygroup, protocol_type: consumer, protocol: cooperative-sticky
11:50:07 [DEBUG] wrote SyncGroup v5; broker: 2, bytes_written: 160, write_wait: 8.875µs, time_to_write: 9.75µs, err:
11:50:07 [DEBUG] read SyncGroup v5; broker: 2, bytes_read: 68, read_wait: 19.083µs, time_to_read: 81µs, err:
11:50:07 [INFO] synced; group: mygroup, assigned: foo[0]
11:50:07 [DEBUG] unblocking commits from join&sync
11:50:07 [INFO] new group session begun; group: mygroup, added: foo[0], lost:
11:50:07 [DEBUG] entering OnPartitionsAssigned; with: map[foo:[0]]
11:50:07 [INFO] beginning heartbeat loop; group: mygroup
11:50:07 [DEBUG] sharded request; req: OffsetFetch, destinations: [2]
11:50:07 [DEBUG] opening connection to broker; addr: 127.0.0.1:9094, broker: 2
11:50:07 [DEBUG] connection opened to broker; addr: 127.0.0.1:9094, broker: 2
11:50:07 [DEBUG] connection initialized successfully; addr: 127.0.0.1:9094, broker: 2
11:50:07 [DEBUG] wrote OffsetFetch v8; broker: 2, bytes_written: 41, write_wait: 206.375µs, time_to_write: 10.125µs, err:
11:50:07 [DEBUG] read OffsetFetch v8; broker: 2, bytes_read: 53, read_wait: 35.375µs, time_to_read: 64.542µs, err:
11:50:07 [INFO] assigning partitions; why: newly fetched offsets for group mygroup, how: assigning everything new, keeping current assignment, input: foo[0{-2 e-1 ce0}]
11:50:07 [DEBUG] assign requires loading offsets
11:50:07 [DEBUG] offsets to load broker; broker: 1, load: {map[foo:map[0:{-2 e-1 ce0}]] map[]}
11:50:07 [DEBUG] wrote ListOffsets v7; broker: 1, bytes_written: 48, write_wait: 40µs, time_to_write: 9.042µs, err:
11:50:07 [DEBUG] read ListOffsets v7; broker: 1, bytes_read: 48, read_wait: 18.375µs, time_to_read: 65.5µs, err:
11:50:07 [DEBUG] handled list results; broker: 1, using: map[foo:map[0:{0 0}]], reloading: map[]
11:50:07 [DEBUG] opening connection to broker; addr: 127.0.0.1:9093, broker: 1
11:50:07 [DEBUG] connection opened to broker; addr: 127.0.0.1:9093, broker: 1
11:50:07 [DEBUG] connection initialized successfully; addr: 127.0.0.1:9093, broker: 1
11:50:07 [DEBUG] wrote Fetch v16; broker: 1, bytes_written: 94, write_wait: 193.416µs, time_to_write: 9.292µs, err:
11:50:07 [DEBUG] read Fetch v16; broker: 1, bytes_read: 147, read_wait: 17.917µs, time_to_read: 123.25µs, err:
11:50:08 [DEBUG] heartbeating; group: mygroup
11:50:08 [DEBUG] wrote Heartbeat v4; broker: 2, bytes_written: 69, write_wait: 18µs, time_to_write: 39.417µs, err:
11:50:08 [DEBUG] read Heartbeat v4; broker: 2, bytes_read: 16, read_wait: 24.208µs, time_to_read: 253.792µs, err:
11:50:08 [DEBUG] heartbeat complete; group: mygroup, err:
11:50:10 [DEBUG] heartbeating; group: mygroup
11:50:10 [DEBUG] wrote Heartbeat v4; broker: 2, bytes_written: 69, write_wait: 14.959µs, time_to_write: 33.25µs, err:
11:50:10 [DEBUG] read Heartbeat v4; broker: 2, bytes_read: 16, read_wait: 72.791µs, time_to_read: 158.125µs, err:
11:50:10 [DEBUG] heartbeat complete; group: mygroup, err:
11:50:12 [DEBUG] skipping autocommit due to no offsets to commit; group: mygroup
11:50:13 [DEBUG] heartbeating; group: mygroup
11:50:13 [DEBUG] wrote Heartbeat v4; broker: 2, bytes_written: 69, write_wait: 22.167µs, time_to_write: 36.375µs, err:
11:50:13 [DEBUG] read Heartbeat v4; broker: 2, bytes_read: 16, read_wait: 21.042µs, time_to_read: 201.25µs, err:
11:50:13 [DEBUG] heartbeat complete; group: mygroup, err:
11:50:16 [DEBUG] heartbeating; group: mygroup
11:50:16 [DEBUG] wrote Heartbeat v4; broker: 2, bytes_written: 69, write_wait: 13.541µs, time_to_write: 35.542µs, err:
11:50:16 [DEBUG] read Heartbeat v4; broker: 2, bytes_read: 16, read_wait: 24.292µs, time_to_read: 191.791µs, err:
11:50:16 [DEBUG] heartbeat complete; group: mygroup, err:
11:50:17 [DEBUG] skipping autocommit due to no offsets to commit; group: mygroup
11:50:19 [DEBUG] heartbeating; group: mygroup
11:50:19 [DEBUG] wrote Heartbeat v4; broker: 2, bytes_written: 69, write_wait: 17.208µs, time_to_write: 34.084µs, err:
11:50:19 [DEBUG] read Heartbeat v4; broker: 2, bytes_read: 16, read_wait: 23.125µs, time_to_read: 160.25µs, err:
11:50:19 [DEBUG] heartbeat complete; group: mygroup, err:
11:50:22 [DEBUG] skipping autocommit due to no offsets to commit; group: mygroup
11:50:22 [DEBUG] heartbeating; group: mygroup
11:50:22 [DEBUG] wrote Heartbeat v4; broker: 2, bytes_written: 69, write_wait: 15.792µs, time_to_write: 30.333µs, err:
11:50:22 [DEBUG] read Heartbeat v4; broker: 2, bytes_read: 16, read_wait: 22.875µs, time_to_read: 152.667µs, err:
11:50:22 [DEBUG] heartbeat complete; group: mygroup, err:
11:50:25 [DEBUG] heartbeating; group: mygroup
11:50:25 [DEBUG] wrote Heartbeat v4; broker: 2, bytes_written: 69, write_wait: 17.167µs, time_to_write: 43.541µs, err:
11:50:25 [DEBUG] read Heartbeat v4; broker: 2, bytes_read: 16, read_wait: 26.334µs, time_to_read: 209.291µs, err:
11:50:25 [DEBUG] heartbeat complete; group: mygroup, err:
11:50:27 [DEBUG] skipping autocommit due to no offsets to commit; group: mygroup
```

I'm going to close this. If there's a need for max.poll.interval.ms (to forcefully quit the group if the client is stalling) we can revisit, but I need to understand the use case first.

twmb closed this as not planned May 26, 2024