Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry producing on next partition if possible when a partition is not… #887

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

pkumar-singh
Copy link
Member

@pkumar-singh pkumar-singh commented Nov 3, 2022

Motivation

When a topic is a partitioned topic and a partition is not available for producing messages, currently pulsar client will still try to produce messages on unavailable partitions, which it may not necessarily need to do in certain cases. Pulsar Client may simply pick up another partition and try producing in certain cases.
Partition Unavailable
There could be a plethora of reasons a partition can become unavailable. But the most prominent reason is partition is moving from one broker to another, and until every actor is in sync with which broker owns the partition, the partition will be unavailable for producing. Actors are producers, old broker, new broker.

Client Behavior
This is the typical produce code.
producer.sendAsync(payLoad.getBytes(StandardCharsets.UTF_8));

When send is called message is enqueued in a queue(called pending message queue) and the future is returned.
And future is only completed when the message is picked from the queue and sent to the broker asynchronously and ack is received asynchronously again. Max size of the pending message queue is controlled by producer config maxPendingMessages.
When pending message queue is full, the application will start getting publish failures. Pending message queue provide a cushion towards unavailable partitions. But again it has some limits.

When another partitions can be picked

When the message is not keyed. That means the message is not ordered based on a key.
When routing mode is round-robin, that means a message can be produced to any of the partitions. So If a partition is unavailable try and pick up another partition for producing, by using the same round-robin algorithm.

Copy link
Member

@shibd shibd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That looks good to me. This is a new API, maybe other clients also need support. We need to write a PIP first and discuss it in the mail.

return producers[partition]
producerForPartition := producers[partition].(*partitionProducer)
if producerForPartition.getProducerState() != producerReady {
nextPartition := getNextConnectedPartition(p, msg, partition, 5)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
nextPartition := getNextConnectedPartition(p, msg, partition, 5)
nextPartition := getNextConnectedPartition(p, msg, partition, p.options.MaxRetryOtherPartitions)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants