Improve Buffer Management and Message Processing in BroadwayKafka.Producer #126

Open
wants to merge 7 commits into
base: main


slashmili
Contributor

Hello,

First, I apologize for submitting this PR without prior notice.

This PR includes the following changes:

  1. Refactor buffer management into a separate BroadwayKafka.Producer.Buffer module.
  2. After fetching messages from the broker, prepend the messages to the front of the queue, and then dequeue the items based on demand.

Here are my reasons for these changes:

1. Refactor buffer management into BroadwayKafka.Producer.Buffer

I've been trying to understand the buffer system in this library and found it a bit challenging to follow. In the main branch, when dequeuing items, we are also updating acks, which complicates the logic around dequeue.

In addition, because the buffer management is buried deep inside the producer, we are not able to test it in isolation. If it were just a simple queue push/pop, I wouldn't be worried about tests, but as you might know, there is quite a lot of logic going on in the buffer.

2. Enqueue and dequeue messages after each pull

The first reason for this change is that it provides a simpler approach, as messages follow the same path: first enqueued, then dequeued.

The second reason is that, when there are no new messages to enqueue, the dequeue operation picks up available messages from the queue, increasing data throughput.

defdelegate empty?(buffer), to: :queue, as: :is_empty

@spec dequeue(t, count :: non_neg_integer) ::
{t, list({key :: key, items :: list(Message.t()), last_item :: Message.t()}),
Contributor Author


Personally, I don't like the return value of this function, but it is shaped this way to avoid repeating some common operations on the list.

  • t: the changed buffer
  • list({key :: key, items :: list(Message.t()), last_item :: Message.t()}):
    • key: the key the items were popped from; it is needed for some ack operations.
    • items: the actual messages that are going to be processed.
    • last_item: the producer always needs the offset of the last popped item for a given key.
  • dequeued_count: the number of popped items, needed to calculate the new demand.

In an ideal world, I'd like this function to only return:

{t, list(Message.t())}

but then we have to traverse the list multiple times to get:

  • Number of dequeued items
  • list of unique keys in the items
  • the last item for each key
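
To illustrate the trade-off, here is a minimal sketch of what a caller would have to do if dequeue returned only a flat list. The module name, the `summarize/1` function, and the map shape (`:key`, `:offset`) are hypothetical stand-ins for illustration, not code from this PR.

```elixir
defmodule FlatDequeueSketch do
  # Hypothetical: each message is a plain map with a :key and an :offset,
  # standing in for the real Message struct.
  def summarize(messages) do
    # Pass 1: number of dequeued items.
    count = length(messages)

    # Pass 2: unique keys found in the items, in order of first appearance.
    keys = messages |> Enum.map(& &1.key) |> Enum.uniq()

    # Pass 3: last item per key. Later entries overwrite earlier ones,
    # so each key ends up mapped to its last dequeued message.
    last_by_key = Map.new(messages, fn message -> {message.key, message} end)

    {count, keys, last_by_key}
  end
end

messages = [
  %{key: :a, offset: 1},
  %{key: :a, offset: 2},
  %{key: :b, offset: 7}
]

{count, keys, last_by_key} = FlatDequeueSketch.summarize(messages)
```

Returning the grouped tuples and the count directly from dequeue avoids these extra passes, at the cost of a less pleasant return type.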

@josevalim
Member

Thank you @slashmili for the PR. Can you please try it in your production systems for a while and, if all is good, let us know?

if at == :rear do
:queue.in({key, list}, buffer)
else
:queue.in_r({key, list}, buffer)
Member


Previously we did not use in_r, but now we do. What is the rationale for needing this operation now?

Contributor Author

@slashmili Apr 12, 2023


In the main branch, we pull data from the broker for a given key, take to_send messages from the newly fetched batch, and enqueue the rest into the buffer:

messages = fetch_messages_from_kafka(state, key, offset)
to_send = min(demand, max_demand)
{new_acks, not_sent, messages, pending} = split_demand(messages, acks, key, to_send)
new_buffer = enqueue_many(state.buffer, key, pending)

What I changed in this PR is the following:

  1. Enqueue the messages in front of the queue,
  2. Dequeue to_send messages from the queue (which takes them from the messages it just enqueued),
  3. As this is a ring buffer, the rest of the messages for this key will be pushed back to the end of buffer to be consumed later.

https://github.com/slashmili/broadway_kafka/blob/2e0f559117da47fc5c1babb45561a015288bcfcb/lib/broadway_kafka/producer.ex#L671-L681

It seems counterintuitive at first, but in my opinion it leaves us with simpler logic. The Producer doesn't need to worry about reverse splitting the messages.
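
The three steps above can be sketched roughly as follows. This is a simplified illustration, not the code in this PR: the module name and `push_front_and_take/4` are hypothetical, messages are plain integers, and only the entry that was just enqueued is dequeued, whereas the real buffer dequeues across keys based on demand.

```elixir
defmodule RingBufferSketch do
  # Hypothetical helper: put freshly fetched messages at the front, take up
  # to `to_send` of them, and move whatever is left for that key to the rear.
  def push_front_and_take(buffer, key, fetched, to_send) do
    # Step 1: :queue.in_r/2 inserts at the front of the queue.
    buffer = :queue.in_r({key, fetched}, buffer)

    # Step 2: :queue.out/1 removes the front entry, i.e. the one just added.
    {{:value, {^key, messages}}, buffer} = :queue.out(buffer)
    {taken, rest} = Enum.split(messages, to_send)

    # Step 3: :queue.in/2 inserts at the rear, so leftovers wait their turn.
    buffer = if rest == [], do: buffer, else: :queue.in({key, rest}, buffer)

    {taken, buffer}
  end
end

# Key :b already has buffered messages; key :a has just been fetched.
buffer = :queue.from_list([{:b, [10, 11]}])
{taken, buffer} = RingBufferSketch.push_front_and_take(buffer, :a, [1, 2, 3], 2)
# taken is [1, 2]; :queue.to_list(buffer) is [{:b, [10, 11]}, {:a, [3]}]
```

Without in_r, the new messages would land behind everything already buffered, and the caller would have to split them before enqueuing, as the main branch does.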

Member


Perfect. I left one comment on the typespec. Once that and CI are fixed, this is good to go for me!

2 participants