Support force using ListOffsetsRequest to fetch offset #699

Closed
Max-Cheng opened this issue Mar 28, 2024 · 10 comments

Comments

@Max-Cheng
Contributor

Why this proposal

Some cloud service providers (such as Alibaba Cloud) have not implemented OffsetsForLeaderEpoch.

What happened

Franz-go uses OffsetsForLeaderEpoch to fetch the leader epoch and end offset, and compares the consumer's offset against the leader's end offset. After a restart, this comparison triggers the offset-reset behavior.
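To make the failure mode concrete, here is a simplified, hypothetical model of that comparison. It is not franz-go's code: the names `partitionState` and `validatePosition` are illustrative. It assumes, as this issue reports, that an epoch end offset below the consumer's position is treated as log truncation (in the spirit of KIP-320) and that the position is moved back to that end offset.

```go
package main

import "fmt"

// partitionState is a hypothetical, simplified view of one consumed partition.
type partitionState struct {
	current  int64 // offset the consumer will fetch next (e.g. its committed offset)
	epochEnd int64 // end offset the broker reported for the consumer's leader epoch
}

// validatePosition models a truncation check (assumed behavior): if the broker
// says the consumer's epoch ended before the consumer's position, the log is
// assumed to be truncated and the position moves back to that end offset.
// It returns the position to fetch from and whether truncation was assumed.
func validatePosition(p partitionState) (int64, bool) {
	if p.epochEnd < p.current {
		return p.epochEnd, true
	}
	return p.current, false
}

func main() {
	// Partition 0 in this issue's log: committed offset 270, but the broker
	// answered OffsetForLeaderEpoch with an end offset of 0.
	pos, truncated := validatePosition(partitionState{current: 270, epochEnd: 0})
	fmt.Printf("fetch from %d (truncation assumed: %v)\n", pos, truncated)
}
```

With a broker that always answers 0, every partition with a non-zero committed offset takes the "truncated" branch, which matches the re-consumption described below.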

Log

[root@xxxx dig]# ./kcl_linux_amd64 misc list-offsets TARGET-TOPIC
BROKER  TOPIC         PARTITION  START  END  ERROR
101     TARGET-TOPIC  0          170    270
102     TARGET-TOPIC  1          100    100
103     TARGET-TOPIC  2          100    100
101     TARGET-TOPIC  3          100    100
102     TARGET-TOPIC  4          36     100
103     TARGET-TOPIC  5          0      85
[root@xxxx dig]# ./kcl_linux_amd64 misc offset-for-leader-epoch TARGET-TOPIC
BROKER  TOPIC         PARTITION  LEADER EPOCH  END OFFSET  ERROR
101     TARGET-TOPIC  0          0             0
101     TARGET-TOPIC  3          0             0
102     TARGET-TOPIC  1          0             0
102     TARGET-TOPIC  4          0             0
103     TARGET-TOPIC  2          0             0
103     TARGET-TOPIC  5          0             0
[root@xxxx dig]# ./kcl_linux_amd64 group describe -v xxxx
GROUP        xxxx
COORDINATOR  103
STATE        Stable
BALANCER     cooperative-sticky
MEMBERS      3
TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  MEMBER-ID  CLIENT-ID  HOST
TARGET-TOPIC  0          270             270             0    kgo-xxxx   kgo        /xxxx
TARGET-TOPIC  1          100             100             0    kgo-xxxx   kgo        /xxxxx
TARGET-TOPIC  2          100             100             0    kgo-xxxx   kgo        /xxxx
TARGET-TOPIC  3          100             100             0    kgo-xxxx   kgo        /xxxx
TARGET-TOPIC  4          100             100             0    kgo-xxxx   kgo        /xxxx
TARGET-TOPIC  5          85              85              0    kgo-xxxx   kgo        /xxxxx
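Read together, ListOffsets and the group description agree (the committed offset equals the log end offset for every partition), yet OffsetForLeaderEpoch reports an end offset of 0 everywhere. The hedged sketch below cross-checks the logged numbers. The `row` type and both helper functions are hypothetical, and the "outside the ListOffsets range" test is a heuristic assumption, not a rule from the Kafka protocol.

```go
package main

import "fmt"

// row joins the three kcl outputs in the log for one partition.
type row struct {
	partition int32
	logStart  int64 // START from `misc list-offsets`
	logEnd    int64 // END from `misc list-offsets`
	epochEnd  int64 // END OFFSET from `misc offset-for-leader-epoch`
	committed int64 // CURRENT-OFFSET from `group describe`
}

// logged holds the values copied from the log in this issue.
var logged = []row{
	{0, 170, 270, 0, 270},
	{1, 100, 100, 0, 100},
	{2, 100, 100, 0, 100},
	{3, 100, 100, 0, 100},
	{4, 36, 100, 0, 100},
	{5, 0, 85, 0, 85},
}

// wouldReset reports whether an epoch end offset below the committed offset
// would look like log truncation to the consumer.
func wouldReset(r row) bool { return r.epochEnd < r.committed }

// outsideLog reports whether the epoch end offset falls outside the
// [logStart, logEnd] range that ListOffsets reports for the same partition.
func outsideLog(r row) bool { return r.epochEnd < r.logStart || r.epochEnd > r.logEnd }

func main() {
	for _, r := range logged {
		fmt.Printf("partition %d: looks truncated=%v, outside ListOffsets range=%v\n",
			r.partition, wouldReset(r), outsideLog(r))
	}
}
```

With these values all six partitions look truncated, and for partitions 0 to 4 the reported end offset is even below the log start offset.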
@Max-Cheng
Contributor Author

The main issue is that kgo.Client.loadEpochsForBrokerLoad() gets an end offset of 0 from the broker, so the client resets its offset to this FAKE end offset and re-consumes messages from Kafka.

@Max-Cheng
Contributor Author

Draft: #700

@Max-Cheng
Contributor Author

Update: use a parameter to force supportsOffsetForLeaderEpoch to return false, which skips the end-offset check.

@twmb
Owner

twmb commented Apr 3, 2024

Why is the provider returning the ApiKey for OffsetsForLeaderEpoch in the ApiVersions response?
There shouldn't need to be an escape hatch here.

@Max-Cheng
Contributor Author

Max-Cheng commented Apr 4, 2024

Yes, I am also puzzled as to why Alibaba would construct an ApiVersions response with a fake value. From the logs, however, I found that the reset happened because the end offset was 0, which made the consumer consume messages again (auto.offset.reset). I also asked their technical support whether this API works correctly internally, and they replied that they do not support it.

@twmb
Owner

twmb commented Apr 24, 2024

Can you file a ticket with Alibaba with evidence they are supporting the API?

Also, I think we can avoid your feature for now. What do you think about using a kversion.Versions that specifically pins OffsetForLeaderEpoch to a max version of -1?
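Pinning works because the client decides which API keys it is willing to send, whatever the broker advertises. The sketch below is a simplified, hypothetical model of ApiVersions negotiation in plain Go rather than franz-go: `versionRange` and `negotiate` are illustrative names, the broker's version ranges are made up, and only the API key numbers (2 for ListOffsets, 23 for OffsetForLeaderEpoch) come from the Kafka protocol. It assumes, as suggested here, that a max version of -1 makes the client treat the key as unsupported.

```go
package main

import "fmt"

// Kafka API keys used in this sketch (numbers from the Kafka protocol guide).
const (
	keyListOffsets          int16 = 2
	keyOffsetForLeaderEpoch int16 = 23
)

// versionRange is what a broker advertises for one key in ApiVersions.
type versionRange struct{ min, max int16 }

// negotiate is a simplified, hypothetical model of version selection: the
// client uses min(its cap, broker max) and skips the key entirely if its own
// cap is negative, the broker does not advertise the key, or the chosen
// version would fall below the broker's minimum.
func negotiate(clientMax map[int16]int16, broker map[int16]versionRange, key int16) (int16, bool) {
	cmax, ok := clientMax[key]
	if !ok || cmax < 0 {
		return -1, false
	}
	br, ok := broker[key]
	if !ok {
		return -1, false
	}
	v := cmax
	if br.max < v {
		v = br.max
	}
	if v < br.min {
		return -1, false
	}
	return v, true
}

func main() {
	// Illustrative broker that advertises OffsetForLeaderEpoch even if it
	// does not actually serve it correctly.
	broker := map[int16]versionRange{
		keyListOffsets:          {0, 7},
		keyOffsetForLeaderEpoch: {0, 4},
	}
	defaults := map[int16]int16{keyListOffsets: 7, keyOffsetForLeaderEpoch: 4}
	// Mirrors the suggestion: pin OffsetForLeaderEpoch to -1.
	pinned := map[int16]int16{keyListOffsets: 7, keyOffsetForLeaderEpoch: -1}

	_, useEpoch := negotiate(defaults, broker, keyOffsetForLeaderEpoch)
	fmt.Println("default client sends OffsetForLeaderEpoch:", useEpoch)
	_, useEpoch = negotiate(pinned, broker, keyOffsetForLeaderEpoch)
	fmt.Println("pinned client sends OffsetForLeaderEpoch:", useEpoch)
}
```

Under this model the default client negotiates OffsetForLeaderEpoch and would run the epoch check, while the pinned client never sends the request, regardless of what the broker's ApiVersions response claims.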

@Max-Cheng
Contributor Author

This ticket was handled through Alibaba Cloud's DingTalk and involves our client, so I am unable to show you its complete content.

Meeting Content:
The issue arises from the fact that the "cloud storage type" topic in Kafka does not support the relevant API calls.

Solution:
You can utilize the "local storage type" topic, which is entirely consistent with the open-source version.

My view is that Alibaba Cloud shouldn't have made such a ridiculous implementation, and that we shouldn't just modify kversion.Versions to work around this behavior. I don't know whether other cloud providers "implement" the API the way Alibaba does, but it doesn't make sense.

@Max-Cheng
Contributor Author

Why use a separate flag to avoid this behavior: they claim customers can specify which version to install, so this isn't a version issue.

@twmb
Owner

twmb commented May 23, 2024

Sorry for the delayed reply, I meant to post this much sooner.

What I mean is, rather than introducing a whole new config option to disable OffsetForLeaderEpoch, why don't you do something like this?

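func main() {
	v := kversion.Stable()
	v.SetMaxKeyVersion(kmsg.OffsetForLeaderEpoch.Int16(), -1) // cap the key at -1 to opt out of it
	cl, err := kgo.NewClient(kgo.MaxVersions(v) /* , seed brokers, group, topics, ... */)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
	// ...consume as usual
}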

This will opt out of the offset for leader epoch key (I think)

twmb closed this as completed May 26, 2024
twmb closed this as not planned May 26, 2024
@Max-Cheng
Contributor Author

Yep, I think this option will correct the wrong behavior. But on reflection, what if those cloud providers implement a "fake" Kafka protocol for whichever specific version they claim to support? Rolling back the version may not be a good idea.

But thank you for your reply.
