Remove the BatchQueue type #1117

Open · wants to merge 3 commits into main

Conversation

@PleasingFungus (Contributor) commented Apr 17, 2023

The BatchQueue type was added in #716 to 'preserve message order across batches'. However, as noted in #1058, a simple channel performs the same function. To allow us to add backpressure for producers, let's switch back to a channel and add a new MaxBufferedBatches configuration option. This prevents producers from seeing unbounded memory usage and quickly OOMing when they produce faster than the brokers can keep up.
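Roughly, the mechanism is the one sketched below: a bounded channel takes over BatchQueue's role, so enqueueing blocks once the bound is reached while dequeue order stays FIFO. (This is an illustrative Go sketch of the idea only, not the actual kafka-go internals; the batch type and constants are made up.)

package main

import "fmt"

type batch struct{ id int }

func main() {
	const maxBufferedBatches = 2 // hypothetical value for the proposed option

	// A bounded channel plays the role BatchQueue played, but with a cap: once
	// maxBufferedBatches batches are waiting, producers block instead of growing
	// memory without limit. FIFO ordering across batches is preserved.
	queue := make(chan *batch, maxBufferedBatches)

	done := make(chan struct{})
	go func() {
		// Sender side: drain batches in the order they were enqueued.
		for b := range queue {
			fmt.Println("sending batch", b.id)
		}
		close(done)
	}()

	for i := 0; i < 5; i++ {
		queue <- &batch{id: i} // blocks while the queue is full: backpressure
	}
	close(queue)
	<-done
}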

Commit message: This was added in segmentio#716 to 'preserve message order across batches'. However, as noted in segmentio#1058, a simple channel performs the same function. To allow us to add backpressure for producers (coming in a later commit), let's switch back to a channel.
@PleasingFungus (Contributor, Author)

Looking into test failures.

@rhansen2 (Collaborator)

Hi @PleasingFungus, with the unbuffered channel, are all writes now synchronous until the last batch for a given topic-partition is being written? And are writes to one topic-partition now blocking writes to other topic-partitions until the final batch for that topic-partition is being sent?
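(Background on the concern: with an unbuffered Go channel, every send blocks until a receiver takes the value, so the producer runs in lockstep with whatever drains the channel. This is a generic Go illustration of that behavior, not kafka-go code.)

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan string) // unbuffered: capacity 0

	done := make(chan struct{})
	go func() {
		for msg := range ch {
			time.Sleep(100 * time.Millisecond) // slow "broker"
			fmt.Println("wrote", msg)
		}
		close(done)
	}()

	start := time.Now()
	for _, m := range []string{"a", "b", "c"} {
		ch <- m // each send blocks until the receiver is ready for it
	}
	fmt.Println("producer blocked for", time.Since(start)) // roughly 200ms
	close(ch)
	<-done
}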

@PleasingFungus (Contributor, Author)

https://app.circleci.com/pipelines/github/segmentio/kafka-go/1243/workflows/144442e6-8c6b-4872-8dde-f6fb0e9cb1b0/jobs/13622 seems to be a pre-existing test failure in TestCloseLeavesGroup. I can repro the same issue in main.

In https://app.circleci.com/pipelines/github/segmentio/kafka-go/1243/workflows/144442e6-8c6b-4872-8dde-f6fb0e9cb1b0/jobs/13625, TestClientCreateTopics also seems to have been broken before this PR. I can't reproduce the failure in TestClientCreatePartitionsNoAssignments; it may be an issue with multiple tests running in parallel.

In https://app.circleci.com/pipelines/github/segmentio/kafka-go/1243/workflows/144442e6-8c6b-4872-8dde-f6fb0e9cb1b0/jobs/13618 , I likewise can't reproduce the failure in TestReaderReadCompactedMessage.

@PleasingFungus (Contributor, Author)

Hi, @rhansen2!

I'm sorry, you're quite right - the initial version of this PR was flawed. I tried to separate out the MaxBufferedBatches configuration option to simplify the initial PR, but that would result in undesired blocking, as you suggested. I've pushed a fixed version.

@rhansen2 self-assigned this Apr 28, 2023
@rhansen2 (Collaborator) commented May 5, 2023

I think these changes still represent a behavior change. Keeping the behavior the same as it was, while still reintroducing ordering, was the original motivation for the current queuing implementation. Could you describe a bit why this behavior is needed in the library, versus potentially implementing it at a different layer of the application?

@PleasingFungus (Contributor, Author)

> Could you describe a bit why this behavior is needed in the library, versus potentially implementing it at a different layer of the application?

Sure thing! The core use case that we encountered was a producer which writes messages significantly faster than the Kafka brokers are able to respond. We encountered this during performance testing while evaluating this library, but it's easy to imagine this happening in production - perhaps there's a burst of incoming data, or perhaps the Kafka brokers are under heavy load or otherwise have their response time degraded.

Without the changes in this PR, BatchQueue would rapidly grow, causing unbounded memory growth in the application. This would eventually cause an OOM crash.

It would not be possible to fix this issue at a higher level of the application. The application does not know how fast the brokers are responding to messages - that information is only available to kafka-go. It needs to write asynchronously under normal circumstances, because waiting for a round trip response is far too slow.

There are three responses that kafka-go can make in this case:

  1. Allow unbounded buffer growth. (Status quo.)
  2. Drop messages above some buffer size.
  3. Block production above some buffer size.

It seems to me that (3) is the only response that will not lead to crashes or data loss in this scenario. What do you think?
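(To make the difference between (2) and (3) concrete: with a bounded queue, the choice is between a non-blocking send that drops and a blocking send that applies backpressure. Generic Go sketch, not kafka-go code.)

package main

import "fmt"

func main() {
	queue := make(chan string, 2) // bounded buffer of two messages
	queue <- "a"
	queue <- "b" // the queue is now full

	// (2) Drop messages above the buffer size: non-blocking send, data loss.
	select {
	case queue <- "c":
		fmt.Println("enqueued c")
	default:
		fmt.Println("dropped c") // this branch runs because the queue is full
	}

	// (3) Block production above the buffer size: backpressure, no data loss.
	go func() { fmt.Println("drained", <-queue) }() // a consumer frees a slot
	queue <- "d"                                    // blocks until the slot is free
	fmt.Println("enqueued d after waiting")
}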

@rhansen2 (Collaborator)

Thanks for the detailed response!

> It would not be possible to fix this issue at a higher level of the application. The application does not know how fast the brokers are responding to messages - that information is only available to kafka-go.

Could this be accomplished by tracking the number of unwritten messages? I think this could be done by tracking the difference between the messages passed to WriteMessages and those that have been passed to a Completion function on the writer.

Given the change in behavior here, do you think it'd be possible to implement this in a backwards-compatible way?

@PleasingFungus (Contributor, Author)

> Could this be accomplished by tracking the number of unwritten messages? I think this could be done by tracking the difference between the messages passed to WriteMessages and those that have been passed to a Completion function on the writer.

Good question! Are you imagining something like this?

package main

import (
	"context"

	kafka "github.com/segmentio/kafka-go"
)

type sender struct {
	w           *kafka.Writer
	maxInFlight chan struct{}
}

func newSender() (s *sender) {
	const batchSize = 100
	const maxBatchesInFlight = 100
	// Note: s is the named return value, so by the time Completion runs, the
	// closure below sees the *sender assigned by this return statement.
	return &sender{
		w: &kafka.Writer{
			// Free one in-flight slot per message once its batch completes.
			Completion: func(messages []kafka.Message, err error) {
				for i := 0; i < len(messages); i++ {
					<-s.maxInFlight
				}
			},
			/*...*/
		},
		maxInFlight: make(chan struct{}, batchSize*maxBatchesInFlight),
	}
}

func (s *sender) sendMessageWithBackpressure(ctx context.Context, msg kafka.Message) error {
	// Blocks once batchSize*maxBatchesInFlight messages are awaiting completion.
	s.maxInFlight <- struct{}{}
	return s.w.WriteMessages(ctx, msg)
}

It seems possible to me that this might work, though it's substantially more complex than either the status quo or this PR - it's very possible that I'm overlooking some subtle issue. It also leaves the default behavior as the unsafe "unbounded memory usage" described in the last sentence of the original PR description.
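(For concreteness, a hypothetical caller of the sketch above might look like the following; it reuses the sender type and sendMessageWithBackpressure from that snippet, and the loop bound and payload are made up.)

func produce(ctx context.Context, s *sender) error {
	for i := 0; i < 1_000_000; i++ {
		msg := kafka.Message{Value: []byte("some event")}
		// Blocks once batchSize*maxBatchesInFlight messages are awaiting
		// completion, instead of buffering all of them in memory.
		if err := s.sendMessageWithBackpressure(ctx, msg); err != nil {
			return err
		}
	}
	return nil
}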

> Given the change in behavior here, do you think it'd be possible to implement this in a backwards-compatible way?

Sure, fair enough. I've taken a shot at this in d9c7ab9 (only lightly tested) - PTAL.

@rhansen2 (Collaborator) commented Jul 7, 2023

Thanks for your continued patience with this issue!

The example you posted is what I was thinking.

In the backwards compatible commit, does the use of goroutines maintain the ordering of messages in all cases?

@PleasingFungus (Contributor, Author)

> Thanks for your continued patience with this issue!

No worries :)

> In the backwards compatible commit, does the use of goroutines maintain the ordering of messages in all cases?

Yes.
