Automatic offset tracking for stream queues #661
base: main
…e on init to facilitate simple write.
Co-authored-by: Carl Hörberg <carl@84codes.com>
```crystal
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1
end

it "should not use saved offset if x-stream-offset is set" do
```
Do we want this behavior? Suppose you set up a consumer with `x-stream-offset=10` and it consumes up to offset 20. If I restart that consumer, I would expect it to continue from 21, not 10, even though `x-stream-offset` says 10.
Otherwise I have to specify `x-stream-offset` the first time and then somehow remove the argument when the consumer is restarted and should continue. WDYT?
It's a bit tricky IMO. But in general I think that if you specify `x-stream-offset` when consuming, it should take precedence over other things.
Let's say you get messages 10-20 and then decide you want to get them again for whatever reason (or jump to another part of the stream). There would be no way for you to do that if we used our tracking.
An alternative could be an extra argument (`x-stream-auto-offset` or something), where we treat `x-stream-offset` as a starting point and then use our internal tracking for any subsequent requests.
In Kafka, offsets are stored per consumer group name, so if you want to start over from a new offset you need to rename the group. Changing the offset when using a group that already has a stored offset on the broker has no effect. Kafka does have an API call to reset a consumer group's offset, so you can make the group start reading from somewhere else, but that is just a side note.
For RabbitMQ Streams it looks like the offset is stored based on the consumer name (the same feature you can group consumers on), so in order to "reset" the offset and use another value you need to rename the consumer. Do we have a name or something for a consumer that we can use in this case?
We use the consumer tag, so you can always "start over" by just setting a new tag.
Okay, if we can reset by changing the consumer tag, then I think the broker should use the stored offset if there is one, and otherwise use the one the consumer gives in the `x-stream-offset` argument.
So: use the stored offset if one exists for the consumer tag; otherwise use the offset given by the consumer.
Hmm, I'm not sure I agree. I think that if you provide `x-stream-offset`, you explicitly say what you want. It would also change the existing behavior, where you always get what you request when providing an `x-stream-offset`. But let's discuss more during tomorrow's meeting!
@snichme added `x-stream-use-automatic-offset` so we can have it both ways :)
yay, nice!
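The precedence rules this thread settled on can be sketched in Crystal as below. This is a hypothetical sketch, not the code in this PR: `resolve_start_offset`, its parameters, and the idea that `stored_offsets` holds the last consumed offset per consumer tag (so a restart resumes at stored + 1, as in the 10 → 20 → 21 example) are all assumptions. The flag is modeled as a plain `Bool`, and symbolic offsets and timestamps are left out.

```crystal
# Hypothetical sketch of the precedence rules discussed above; not LavinMQ's code.
# Assumes stored_offsets holds the last offset consumed per consumer tag.
# Only numeric offsets are modeled ("first"/"next"/"last" and timestamps are omitted).
def resolve_start_offset(requested : Int64?, use_automatic : Bool,
                         consumer_tag : String,
                         stored_offsets : Hash(String, Int64),
                         default_offset : Int64) : Int64
  if stored = stored_offsets[consumer_tag]?
    # Resume right after the last consumed offset when the consumer sent no
    # x-stream-offset, or sent one together with x-stream-use-automatic-offset.
    return stored + 1 if requested.nil? || use_automatic
  end
  # Otherwise an explicit x-stream-offset wins; without one, use the default.
  requested || default_offset
end

stored = {"my-consumer" => 20_i64}
puts resolve_start_offset(10_i64, false, "my-consumer", stored, 0_i64) # explicit offset: 10
puts resolve_start_offset(10_i64, true, "my-consumer", stored, 0_i64)  # automatic: 21
puts resolve_start_offset(10_i64, true, "new-tag", stored, 0_i64)      # new tag starts over: 10
```

A new consumer tag has no stored offset, so it falls back to the requested offset. That is how a consumer "starts over", matching the point that resetting means picking a new tag.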
…g x-stream-use-automatic-offset
@@ -34,7 +36,10 @@ module LavinMQ
```crystal
raise Error::PreconditionFailed.new("x-priority not supported on stream queues")
end
case frame.arguments["x-stream-offset"]?
when Nil, Int, Time, "first", "next", "last"
when Nil
```
RabbitMQ says that if `x-stream-offset` isn't specified, it uses "next" as the default: "If it is unspecified the consumer will start reading from the next offset written to the log after the consumer starts."
Oh, yeah that's right. I've noticed that as well but forgot to do anything about it 🙈
I guess it's been like that since we released streams though - should we change our behavior to match, or should we note that as something that we do differently?
Well, that quote is actually from the stream client, as RabbitMQ doesn't support this for AMQP clients.
So we can do what we want here. Since we already have one way it works, leaving it at that for now might be good.
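How an `x-stream-offset` value (or its absence) could map onto a start position is sketched below. This is hypothetical: `start_position`, `first_in_log`, `last_in_log` and `default_when_unset` are illustrative names, and the default is a parameter because the thread leaves LavinMQ's current unset behavior as is. Passing "next" reproduces the RabbitMQ stream-client default quoted above. Timestamps and "last" are omitted.

```crystal
# Hypothetical sketch: turns an x-stream-offset argument into a numeric start
# position, given the first and last offsets currently in the stream.
# Timestamps and "last" are omitted; default_when_unset is an assumption.
alias StreamOffsetArg = Int64 | String | Nil

def start_position(arg : StreamOffsetArg, first_in_log : Int64, last_in_log : Int64,
                   default_when_unset : String = "next") : Int64
  value = arg || default_when_unset # nil means the consumer sent no x-stream-offset
  case value
  when Int64   then value
  when "first" then first_in_log
  when "next"  then last_in_log + 1 # only messages written after the consumer starts
  else              raise ArgumentError.new("unsupported x-stream-offset: #{value}")
  end
end

puts start_position(nil, 1_i64, 50_i64)          # "next" default: 51
puts start_position(nil, 1_i64, 50_i64, "first") # "first" default: 1
```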
WHAT is this pull request doing?
Adds broker tracking of consumer offsets in streams if no `x-stream-offset` is provided by the consumer. Does not track if the consumer tag is generated by the broker.
- ✅ When to run `cleanup_consumer_offsets`?
- ✅ `IndexError` when trying to clean up if `msg_size >= segment_size`
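The rule that offsets are not tracked for broker-generated tags can be sketched as an in-memory tracker. This is hypothetical: `ConsumerOffsetTracker`, `track` and `broker_generated_tag` are illustrative names, and persistence and cleanup are not modeled. In AMQP 0-9-1 the server generates a tag when `basic.consume` is sent with an empty consumer tag. Presumably such a tag is unlikely to be reused on restart, so a stored offset for it would never be read back.

```crystal
# Hypothetical in-memory sketch; persistence and cleanup are not modeled.
class ConsumerOffsetTracker
  def initialize
    @offsets = Hash(String, Int64).new
  end

  # Remember the latest offset for a consumer tag. Broker-generated tags are
  # skipped, presumably because clients are unlikely to reuse them on restart.
  def track(consumer_tag : String, offset : Int64, broker_generated_tag : Bool) : Nil
    return if broker_generated_tag
    @offsets[consumer_tag] = offset
    nil
  end

  def last_offset(consumer_tag : String) : Int64?
    @offsets[consumer_tag]?
  end
end

tracker = ConsumerOffsetTracker.new
tracker.track("my-consumer", 42_i64, broker_generated_tag: false)
tracker.track("generated-tag", 7_i64, broker_generated_tag: true)
p tracker.last_offset("my-consumer")   # => 42
p tracker.last_offset("generated-tag") # => nil
```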
HOW can this pull request be tested?
Run specs