Hi, we use AddConsumeTopics(foo) to add new topics to the client dynamically. This works great. Unfortunately, we saw old messages being redelivered when PurgeTopicsFromConsuming(foo) was called and AddConsumeTopics(foo) was then called again for the same topic. It seems that the client forgets that we set the offset to kgo.ConsumeResetOffset(kgo.NewOffset().AfterMilli(time.Now().UnixMilli())). Is this expected? I believe it is because PurgeTopicsFromConsuming(foo) also removes the offset state.
We were able to work around this by using PauseFetchTopics and ResumeFetchTopics.
I'm a bit confused here -- when you say "It seems that the client forget that we have set the offset to ... time.Now()", doesn't that mean that time.Now() is evaluated once, so that any time you purge and re-add the topic, re-adding it will re-consume from the original offset milli?
What is your expected goal? Purging does remove literally everything about the state of a topic within the client. What is not removed is any initial configuration -- initial configuration is used when re-adding the topic.
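To make the distinction between per-topic state and initial configuration concrete, here is a minimal toy model in plain Go. It is not franz-go's implementation: `toyClient` and its methods are hypothetical names invented for this sketch, and the fixed value 1000 stands in for the `time.Now().UnixMilli()` that is evaluated once when the client is created.

```go
package main

import "fmt"

// toyClient is a hypothetical stand-in for a consumer client (not franz-go).
type toyClient struct {
	resetMilli int64            // initial configuration, fixed at construction
	positions  map[string]int64 // per-topic state: where to read from next
}

func newToyClient(resetMilli int64) *toyClient {
	return &toyClient{resetMilli: resetMilli, positions: map[string]int64{}}
}

// add starts a topic at the configured reset point if it has no state yet.
func (c *toyClient) add(topic string) {
	if _, ok := c.positions[topic]; !ok {
		c.positions[topic] = c.resetMilli
	}
}

// advance records consumption progress for a topic that is being consumed.
func (c *toyClient) advance(topic string, toMilli int64) {
	if _, ok := c.positions[topic]; ok {
		c.positions[topic] = toMilli
	}
}

// purge drops all state for the topic; the initial configuration is untouched.
func (c *toyClient) purge(topic string) {
	delete(c.positions, topic)
}

// position reports the current read position and whether the topic is known.
func (c *toyClient) position(topic string) (int64, bool) {
	p, ok := c.positions[topic]
	return p, ok
}

func main() {
	c := newToyClient(1000) // stands in for time.Now().UnixMilli() at creation
	c.add("foo")
	c.advance("foo", 5000) // consumed up to t=5000
	c.purge("foo")         // all progress for "foo" is forgotten
	c.add("foo")           // re-adding falls back to the initial configuration
	pos, _ := c.position("foo")
	fmt.Println(pos) // prints 1000: messages between 1000 and 5000 are read again
}
```

In this model, the redelivery comes from the reset point being captured once at construction, so a purge followed by a re-add starts again from that original timestamp rather than from "now".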
If you intend to pause consuming and then resume later, PauseFetchTopics and ResumeFetchTopics are the two correct APIs to use.
I'm going to pre-emptively close this issue hoping my message answers your question, but please reply if it did not.
I'm a bit confused here -- when you say "It seems that the client forget that we have set the offset to ... time.Now()", doesn't that mean that time.Now() is evaluated once, so that any time you purge and re-add the topic, re-adding it will re-consume from the original offset milli?
Exactly, that's the conclusion I came to as well. It was not obvious to me because I created the client once with the offset config. Thanks for confirming.