Automatic offset tracking for stream queues #661

Open
wants to merge 50 commits into
base: main
Choose a base branch
from

Conversation

@viktorerlingsson (Member) commented Apr 17, 2024

WHAT is this pull request doing?

Adds broker tracking of consumer offsets in streams if no x-stream-offset is provided by the consumer. Does not track if the consumer tag is generated by the broker.
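A minimal sketch of the tracking rule described above. It assumes the consumer arguments arrive as a plain Hash and that the broker already knows whether it generated the consumer tag. The method name `track_offset?` and the `tag_generated_by_broker` flag are hypothetical and not LavinMQ's actual code.

```crystal
# Hypothetical sketch of the rule in the PR description, not LavinMQ's real code.
# Returns true when the broker should track this consumer's offset.
def track_offset?(arguments : Hash, tag_generated_by_broker : Bool) : Bool
  # The consumer chose its own starting point, so the broker does not track.
  return false if arguments.has_key?("x-stream-offset")
  # Broker-generated consumer tags are excluded, per the PR description.
  return false if tag_generated_by_broker
  true
end

puts track_offset?({} of String => String, false)        # => true
puts track_offset?({"x-stream-offset" => "first"}, false) # => false
puts track_offset?({} of String => String, true)         # => false
```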

✅ When to run cleanup_consumer_offsets?
✅ IndexError when trying to clean up if msg_size >= segment_size

HOW can this pull request be tested?

Run specs

@viktorerlingsson force-pushed the streams_automatic_offset_tracking branch 2 times, most recently from 2ecc8d3 to e00b4e5 on April 23, 2024 13:56
Outdated, resolved review threads on src/lavinmq/mfile.cr and src/lavinmq/queue/stream_queue_message_store.cr
@viktorerlingsson linked an issue on May 14, 2024 that may be closed by this pull request
@viktorerlingsson force-pushed the streams_automatic_offset_tracking branch from 194926e to e58ad9b on May 23, 2024 14:38
@viktorerlingsson marked this pull request as ready for review on June 3, 2024 08:09
StreamQueueSpecHelpers.offset_from_headers(msg_2.properties.headers).should eq 1
end

it "should not use saved offset if x-stream-offset is set" do
Member:

Do we want this behavior? I'm thinking that if you set up a consumer with x-stream-offset=10 and it starts consuming up to offset=20, then if I restart that consumer I would expect it to continue from 21, not 10, even though x-stream-offset says 10.
Otherwise I have to specify x-stream-offset the first time and then somehow remove the argument when the consumer is restarted and should continue. WDYT?

Member Author:

It's a bit tricky IMO. But in general I think that if you specify x-stream-offset when consuming, that should take precedence over everything else.

Let's say you get messages 10-20 and then decide you want to get them again for whatever reason (or jump to another part of the stream). There would be no way for you to do that if we used our tracking.

An alternative could be to use an extra argument (x-stream-auto-offset or something), where we treat x-stream-offset as a starting point and then use our internal tracking for any subsequent requests.

Member:

In Kafka, offsets are stored per consumer group name, so if you want to start over from a new offset you need to rename the group. Changing the offset when using a group that already has a stored offset on the broker has no effect. Kafka does have an API call to reset a consumer group's offset, so you can tell a consumer to start reading from somewhere else, but that is just a side note.

For RabbitMQ Streams it looks like the offset is stored based on the consumer name (the same feature that lets you group consumers), so in order to "reset" the offset and use another value you need to rename the consumer. Do we have a name or something for a consumer that we can use in this case?

Member Author:

> For RabbitMQ Streams it looks like the offset is stored based on the consumer name (the same feature that lets you group consumers), so in order to "reset" the offset and use another value you need to rename the consumer. Do we have a name or something for a consumer that we can use in this case?

We use the consumer tag, so you can always "start over" by just setting a new tag.
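To illustrate why a new consumer tag means starting over, here is a minimal sketch of an offset store keyed by consumer tag. The class `ConsumerOffsets` and its methods are hypothetical; the sketch keeps offsets only in memory and does not reflect how the broker actually stores them. It assumes the stored value is the last offset delivered, so a restarted consumer resumes at the next one (20 stored, resume at 21, as in the example earlier in this thread).

```crystal
# Hypothetical in-memory offset store keyed by consumer tag (sketch only).
class ConsumerOffsets
  def initialize
    @offsets = {} of String => Int64
  end

  # Record the last offset delivered to the consumer with this tag.
  def save(tag : String, offset : Int64) : Nil
    @offsets[tag] = offset
  end

  # Offset to resume from, or nil if nothing is stored for this tag.
  def resume_from(tag : String) : Int64?
    if last = @offsets[tag]?
      last + 1
    end
  end
end

store = ConsumerOffsets.new
store.save("my-consumer", 20_i64)
p store.resume_from("my-consumer")    # => 21
p store.resume_from("my-consumer-v2") # => nil (a new tag has nothing stored)
```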

Member:

Okay, if we can reset by changing the consumer tag, then I think the broker should use the stored offset if there is one, and otherwise use the one given by the consumer in the x-stream-offset argument.

So: use the stored offset if one exists for the consumer tag; otherwise use the offset given by the consumer.

Member Author:

Hmm, I'm not sure I agree. I think that if you provide x-stream-offset, you explicitly say what you want. That would also change existing behavior, where you always get what you request when providing an x-stream-offset. But let's discuss more during tomorrow's meeting!

Member Author:

@snichme added x-stream-use-automatic-offset so we can have it both ways :)
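One way to read "both ways" is sketched below. With x-stream-use-automatic-offset set, x-stream-offset is only a starting point and a stored offset wins; without it, an explicit x-stream-offset always wins. The method `start_offset` is hypothetical, offsets are simplified to Int64 (the real argument also accepts a timestamp and "first"/"next"/"last"), and falling back to the stored offset when no x-stream-offset is given is an assumption based on the PR description. A nil result stands for "use the broker's default", which this thread does not pin down.

```crystal
# Hypothetical sketch of choosing a starting offset; not LavinMQ's real code.
# requested: x-stream-offset from the consumer (nil if absent)
# stored:    offset saved for this consumer tag (nil if none)
def start_offset(requested : Int64?, stored : Int64?, use_automatic : Bool) : Int64?
  if use_automatic
    # x-stream-offset is only a starting point: a stored offset takes precedence.
    stored || requested
  else
    # An explicit x-stream-offset always wins; otherwise fall back to tracking.
    requested || stored
  end
end

p start_offset(10_i64, 21_i64, true)  # => 21
p start_offset(10_i64, 21_i64, false) # => 10
p start_offset(nil, 21_i64, false)    # => 21
p start_offset(10_i64, nil, true)     # => 10
```

Note that Crystal treats 0 as truthy, so a stored offset of 0 is still preferred by `||`; only nil falls through.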

Member:

yay, nice!

@@ -34,7 +36,10 @@ module LavinMQ
raise Error::PreconditionFailed.new("x-priority not supported on stream queues")
end
case frame.arguments["x-stream-offset"]?
when Nil, Int, Time, "first", "next", "last"
when Nil
Member:

RabbitMQ says that if x-stream-offset isn't specified, it uses "next" as the default:

> If it is unspecified the consumer will start reading from the next offset written to the log after the consumer starts

Member Author:

Oh, yeah, that's right. I've noticed that as well but forgot to do anything about it 🙈

I guess it's been like that since we released streams, though. Should we change our behavior to match, or should we note it as something we do differently?

Member:

Well, that quote is actually from the stream client, as RabbitMQ doesn't support this for AMQP clients.
So we are free to choose. Since it works one way now, it might be good to leave it at that for now.


Successfully merging this pull request may close these issues.

Stream queues: Automatic offset tracking
3 participants