There is no option to set max.poll.interval.ms #708
What's the use case of max.poll.interval.ms? Is it: if the client takes longer than, say, 5 minutes to process a message, the client leaves the group, and another client will likely consume the same message once a rebalance happens. If the original client ever succeeds at processing the message, you have a duplicate, and the original client then enters a fatal state and stops consuming? I don't really understand the use case. I'm not opposed outright, but knowing the use case will let me better understand how to implement it and what to document on the config option.
Thank you very much for your response
The use case is that the consumer can spend more than 5 minutes processing a batch of messages, and we don't want a rebalance to happen during that processing. Not only will a rebalance cause some other consumer to take the messages that are being processed at the time, it also takes quite some time itself, stresses the cluster, and pauses data consumption. If you have any further questions, I'll be glad to answer all of them.
This client does not ever kick a member from the group without an explicit choice by the end user (or if your client crashes or closes uncleanly, after which it'll miss heartbeats and be kicked) -- effectively, the max.poll.interval.ms is unlimited. Is there a reason to add the option for a limit?
Excuse me if I'm wrong, but that's how I understood the documentation. I encountered this case over a year ago: a consumer was sending heartbeats but not polling, so the coordinator considered the consumer failed and initiated rebalancing. After I changed […] Are you sure that with the current version of Kafka, a consumer that is not polling will not be kicked? Did you test it?
Yes, I'm positive. You can test this easily with the client. It sounds like you're coming in with experience of how the Java client works but haven't tested this client? You can run this program against a topic with one partition that has one message to demonstrate:

```go
package main

import (
	"os"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, _ := kgo.NewClient(
		kgo.ConsumerGroup("mygroup"),
		kgo.ConsumeTopics("foo"),
		kgo.SessionTimeout(15*time.Second),
		kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, func() string {
			return time.Now().Format("15:04:05 ")
		})),
	)
	for {
		// do nothing
		_ = cl
	}
}
```

The heartbeat continues in the background indefinitely even though you are doing nothing with the client:

Logs:

```
11:50:07 [INFO] immediate metadata update triggered; why: querying metadata for consumer initialization
11:50:07 [DEBUG] opening connection to broker; addr: 127.0.0.1:9092, broker: seed_0
11:50:07 [DEBUG] connection opened to broker; addr: 127.0.0.1:9092, broker: seed_0
11:50:07 [DEBUG] issuing api versions request; broker: seed_0, version: 3
11:50:07 [DEBUG] wrote ApiVersions v3; broker: seed_0, bytes_written: 31, write_wait: 59.334µs, time_to_write: 24.375µs, err:
11:50:07 [DEBUG] read ApiVersions v3; broker: seed_0, bytes_read: 233, read_wait: 36.666µs, time_to_read: 92.042µs, err:
11:50:07 [DEBUG] connection initialized successfully; addr: 127.0.0.1:9092, broker: seed_0
11:50:07 [DEBUG] wrote Metadata v12; broker: seed_0, bytes_written: 43, write_wait: 1.018834ms, time_to_write: 17.75µs, err:
11:50:07 [DEBUG] read Metadata v12; broker: seed_0, bytes_read: 157, read_wait: 78.458µs, time_to_read: 107.542µs, err:
11:50:07 [INFO] beginning to manage the group lifecycle; group: mygroup
11:50:07 [INFO] beginning autocommit loop; group: mygroup
11:50:07 [DEBUG] blocking commits from join&sync
11:50:07 [INFO] joining group; group: mygroup
11:50:07 [DEBUG] prepared to issue find coordinator request; coordinator_type: 0, coordinator_keys: [mygroup]
11:50:07 [DEBUG] sharded request; req: FindCoordinator, destinations: [any]
11:50:07 [DEBUG] opening connection to broker; addr: 127.0.0.1:9093, broker: 1
11:50:07 [DEBUG] connection opened to broker; addr: 127.0.0.1:9093, broker: 1
11:50:07 [DEBUG] issuing api versions request; broker: 1, version: 3
11:50:07 [DEBUG] wrote ApiVersions v3; broker: 1, bytes_written: 31, write_wait: 6.375µs, time_to_write: 10.625µs, err:
11:50:07 [DEBUG] read ApiVersions v3; broker: 1, bytes_read: 233, read_wait: 12.916µs, time_to_read: 63.667µs, err:
11:50:07 [DEBUG] connection initialized successfully; addr: 127.0.0.1:9093, broker: 1
11:50:07 [DEBUG] wrote FindCoordinator v4; broker: 1, bytes_written: 29, write_wait: 374.25µs, time_to_write: 14.958µs, err:
11:50:07 [DEBUG] read FindCoordinator v4; broker: 1, bytes_read: 45, read_wait: 23.125µs, time_to_read: 82µs, err:
11:50:07 [DEBUG] opening connection to broker; addr: 127.0.0.1:9094, broker: 2
11:50:07 [DEBUG] connection opened to broker; addr: 127.0.0.1:9094, broker: 2
11:50:07 [DEBUG] issuing api versions request; broker: 2, version: 3
11:50:07 [DEBUG] wrote ApiVersions v3; broker: 2, bytes_written: 31, write_wait: 7.875µs, time_to_write: 10.292µs, err:
11:50:07 [DEBUG] read ApiVersions v3; broker: 2, bytes_read: 233, read_wait: 15.042µs, time_to_read: 104.416µs, err:
11:50:07 [DEBUG] connection initialized successfully; addr: 127.0.0.1:9094, broker: 2
11:50:07 [DEBUG] wrote JoinGroup v9; broker: 2, bytes_written: 137, write_wait: 333.625µs, time_to_write: 9.667µs, err:
11:50:07 [DEBUG] read JoinGroup v9; broker: 2, bytes_read: 62, read_wait: 19.625µs, time_to_read: 132.917µs, err:
11:50:07 [INFO] join returned MemberIDRequired, rejoining with response's MemberID; group: mygroup, member_id: kgo-a9ee52ec60197286d6723bc94fa4c303
11:50:07 [DEBUG] wrote JoinGroup v9; broker: 2, bytes_written: 173, write_wait: 8.25µs, time_to_write: 10.583µs, err:
11:50:07 [DEBUG] read JoinGroup v9; broker: 2, bytes_read: 193, read_wait: 15.75µs, time_to_read: 107.084µs, err:
11:50:07 [INFO] joined, balancing group; group: mygroup, member_id: kgo-a9ee52ec60197286d6723bc94fa4c303, instance_id: , generation: 1, balance_protocol: cooperative-sticky, leader: true
11:50:07 [INFO] balancing group as leader
11:50:07 [INFO] balance group member; id: kgo-a9ee52ec60197286d6723bc94fa4c303, interests: interested topics: [foo], previously owned:
11:50:07 [INFO] balanced; plan: kgo-a9ee52ec60197286d6723bc94fa4c303{foo[0]}
11:50:07 [INFO] syncing; group: mygroup, protocol_type: consumer, protocol: cooperative-sticky
11:50:07 [DEBUG] wrote SyncGroup v5; broker: 2, bytes_written: 160, write_wait: 8.875µs, time_to_write: 9.75µs, err:
11:50:07 [DEBUG] read SyncGroup v5; broker: 2, bytes_read: 68, read_wait: 19.083µs, time_to_read: 81µs, err:
11:50:07 [INFO] synced; group: mygroup, assigned: foo[0]
11:50:07 [DEBUG] unblocking commits from join&sync
11:50:07 [INFO] new group session begun; group: mygroup, added: foo[0], lost:
11:50:07 [DEBUG] entering OnPartitionsAssigned; with: map[foo:[0]]
11:50:07 [INFO] beginning heartbeat loop; group: mygroup
11:50:07 [DEBUG] sharded request; req: OffsetFetch, destinations: [2]
11:50:07 [DEBUG] opening connection to broker; addr: 127.0.0.1:9094, broker: 2
11:50:07 [DEBUG] connection opened to broker; addr: 127.0.0.1:9094, broker: 2
11:50:07 [DEBUG] connection initialized successfully; addr: 127.0.0.1:9094, broker: 2
11:50:07 [DEBUG] wrote OffsetFetch v8; broker: 2, bytes_written: 41, write_wait: 206.375µs, time_to_write: 10.125µs, err:
11:50:07 [DEBUG] read OffsetFetch v8; broker: 2, bytes_read: 53, read_wait: 35.375µs, time_to_read: 64.542µs, err:
11:50:07 [INFO] assigning partitions; why: newly fetched offsets for group mygroup, how: assigning everything new, keeping current assignment, input: foo[0{-2 e-1 ce0}]
11:50:07 [DEBUG] assign requires loading offsets
11:50:07 [DEBUG] offsets to load broker; broker: 1, load: {map[foo:map[0:{-2 e-1 ce0}]] map[]}
11:50:07 [DEBUG] wrote ListOffsets v7; broker: 1, bytes_written: 48, write_wait: 40µs, time_to_write: 9.042µs, err:
11:50:07 [DEBUG] read ListOffsets v7; broker: 1, bytes_read: 48, read_wait: 18.375µs, time_to_read: 65.5µs, err:
11:50:07 [DEBUG] handled list results; broker: 1, using: map[foo:map[0:{0 0}]], reloading: map[]
11:50:07 [DEBUG] opening connection to broker; addr: 127.0.0.1:9093, broker: 1
11:50:07 [DEBUG] connection opened to broker; addr: 127.0.0.1:9093, broker: 1
11:50:07 [DEBUG] connection initialized successfully; addr: 127.0.0.1:9093, broker: 1
11:50:07 [DEBUG] wrote Fetch v16; broker: 1, bytes_written: 94, write_wait: 193.416µs, time_to_write: 9.292µs, err:
11:50:07 [DEBUG] read Fetch v16; broker: 1, bytes_read: 147, read_wait: 17.917µs, time_to_read: 123.25µs, err:
11:50:08 [DEBUG] heartbeating; group: mygroup
11:50:08 [DEBUG] wrote Heartbeat v4; broker: 2, bytes_written: 69, write_wait: 18µs, time_to_write: 39.417µs, err:
11:50:08 [DEBUG] read Heartbeat v4; broker: 2, bytes_read: 16, read_wait: 24.208µs, time_to_read: 253.792µs, err:
11:50:08 [DEBUG] heartbeat complete; group: mygroup, err:
11:50:10 [DEBUG] heartbeating; group: mygroup
11:50:10 [DEBUG] wrote Heartbeat v4; broker: 2, bytes_written: 69, write_wait: 14.959µs, time_to_write: 33.25µs, err:
11:50:10 [DEBUG] read Heartbeat v4; broker: 2, bytes_read: 16, read_wait: 72.791µs, time_to_read: 158.125µs, err:
11:50:10 [DEBUG] heartbeat complete; group: mygroup, err:
11:50:12 [DEBUG] skipping autocommit due to no offsets to commit; group: mygroup
11:50:13 [DEBUG] heartbeating; group: mygroup
11:50:13 [DEBUG] wrote Heartbeat v4; broker: 2, bytes_written: 69, write_wait: 22.167µs, time_to_write: 36.375µs, err:
11:50:13 [DEBUG] read Heartbeat v4; broker: 2, bytes_read: 16, read_wait: 21.042µs, time_to_read: 201.25µs, err:
11:50:13 [DEBUG] heartbeat complete; group: mygroup, err:
11:50:16 [DEBUG] heartbeating; group: mygroup
11:50:16 [DEBUG] wrote Heartbeat v4; broker: 2, bytes_written: 69, write_wait: 13.541µs, time_to_write: 35.542µs, err:
11:50:16 [DEBUG] read Heartbeat v4; broker: 2, bytes_read: 16, read_wait: 24.292µs, time_to_read: 191.791µs, err:
11:50:16 [DEBUG] heartbeat complete; group: mygroup, err:
11:50:17 [DEBUG] skipping autocommit due to no offsets to commit; group: mygroup
11:50:19 [DEBUG] heartbeating; group: mygroup
11:50:19 [DEBUG] wrote Heartbeat v4; broker: 2, bytes_written: 69, write_wait: 17.208µs, time_to_write: 34.084µs, err:
11:50:19 [DEBUG] read Heartbeat v4; broker: 2, bytes_read: 16, read_wait: 23.125µs, time_to_read: 160.25µs, err:
11:50:19 [DEBUG] heartbeat complete; group: mygroup, err:
11:50:22 [DEBUG] skipping autocommit due to no offsets to commit; group: mygroup
11:50:22 [DEBUG] heartbeating; group: mygroup
11:50:22 [DEBUG] wrote Heartbeat v4; broker: 2, bytes_written: 69, write_wait: 15.792µs, time_to_write: 30.333µs, err:
11:50:22 [DEBUG] read Heartbeat v4; broker: 2, bytes_read: 16, read_wait: 22.875µs, time_to_read: 152.667µs, err:
11:50:22 [DEBUG] heartbeat complete; group: mygroup, err:
11:50:25 [DEBUG] heartbeating; group: mygroup
11:50:25 [DEBUG] wrote Heartbeat v4; broker: 2, bytes_written: 69, write_wait: 17.167µs, time_to_write: 43.541µs, err:
11:50:25 [DEBUG] read Heartbeat v4; broker: 2, bytes_read: 16, read_wait: 26.334µs, time_to_read: 209.291µs, err:
11:50:25 [DEBUG] heartbeat complete; group: mygroup, err:
11:50:27 [DEBUG] skipping autocommit due to no offsets to commit; group: mygroup
```

I'm going to close this, but if there's a need for max.poll.interval.ms (to forcefully quit the group if the client is stalling) we can revisit; I need to understand the use case first.
First of all, thank you very much for your project. It's very nice to have a good Kafka consumer that respects interfaces (compared to the std confluent library) and also gives better performance. Impressive work!
Coming to the problem: I need to manually set max.poll.interval.ms for the consumer I use, but I see no option for it. Are there any plans to implement this functionality in the future? Are there any workarounds to achieve the same result?
If you want to have this feature, I can probably help, but I'll need some help finding documentation on how it should be implemented, so any pointers are very much appreciated.
UPD: I find the CommitUncommittedOffsets function VERY useful, what a nice thing to have! Still, I cannot use it, because without max.poll.interval.ms the only approach left in my case is to poll every (SessionTimeout/2) seconds while I'm processing the collected records. Otherwise, the broker will consider the consumer frozen and trigger a rebalance, which is quite a bummer.
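(The SessionTimeout/2 cadence described here can be sketched as interleaving keep-alives with chunked processing. This is an illustrative model only: `keepAlive` is a hypothetical stand-in for whatever refreshes the session in a client that ties liveness to polling; with franz-go, heartbeats already run in the background and none of this is needed.)

```go
package main

import (
	"fmt"
	"time"
)

const sessionTimeout = 15 * time.Second

// keepAliveEvery is the workaround cadence: touch the broker at least
// twice per session timeout so it never sees a silent consumer.
const keepAliveEvery = sessionTimeout / 2

// processChunks runs fn over each chunk, calling keepAlive once up front
// and again whenever more than keepAliveEvery has elapsed since the last
// call, so long processing never starves the session.
func processChunks(chunks []string, fn func(string), keepAlive func()) {
	keepAlive()
	last := time.Now()
	for _, c := range chunks {
		if time.Since(last) >= keepAliveEvery {
			keepAlive()
			last = time.Now()
		}
		fn(c)
	}
}

func main() {
	calls := 0
	processChunks([]string{"a", "b", "c"},
		func(c string) { fmt.Println("processing", c) },
		func() { calls++ })
	// Fast chunks finish well inside the window, so only the initial
	// keep-alive fires here.
	fmt.Println("keep-alive calls:", calls)
}
```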