
Batch processing #518

Open
sauravaggarwal opened this issue Apr 26, 2022 · 5 comments

@sauravaggarwal

Description

We need to consume messages in batches with automatic rebalancing.

@arnaud-lb
Owner

Hi

Did you try the high level consumer + RdKafka\ConsumerTopic::consumeBatch ?
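For reference, a minimal sketch of that legacy call could look like the following (not taken from this thread; the broker address, topic name and partition number are placeholders):

```php
<?php
// Sketch only: low-level consumer using RdKafka\ConsumerTopic::consumeBatch.
// Broker address, topic name and partition number are placeholder values.
$conf = new \RdKafka\Conf();
$conf->set('metadata.broker.list', '127.0.0.1:9092');

$consumer = new \RdKafka\Consumer($conf);
$topic = $consumer->newTopic('test');

// Start consuming partition 0 from the last stored offset.
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

// Fetch up to 500 messages, waiting at most 1000 ms.
// Returns an array of RdKafka\Message objects.
$messages = $topic->consumeBatch(0, 1000, 500);
foreach ($messages as $message) {
    if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        // process $message->payload
    }
}

$topic->consumeStop(0);
```

Note that `consumeBatch` lives on the low-level `ConsumerTopic`, so partition assignment is manual here; it does not participate in consumer-group rebalancing.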

@sauravaggarwal
Author

Hi,

Thanks for the update. We are using the configuration below to consume messages, but we only get a single message per call.

    $conf = new \RdKafka\Conf();
    $conf->set('group.id', 'testing');
    $conf->set('metadata.broker.list', config('kafka'));
    $conf->set('sasl.username',config('kafka'));
    $conf->set('sasl.password',config('kafka'));
    $conf->set('sasl.mechanisms',config('kafka'));
    $conf->set('security.protocol',config('kafka'));
    $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
        switch ($err) {
            case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                echo "Assign: ";
                var_dump($partitions);
                $kafka->assign($partitions);
                break;

            case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                echo "Revoke: ";
                var_dump($partitions);
                $kafka->assign(null);
                break;

             default:
                throw new \Exception($err);
        }
    });
    $consumer = new \RdKafka\KafkaConsumer($conf);
    $consumer->subscribe(['test']);
    while (true) {
        $message = $consumer->consume(120*1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                var_dump($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "No more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "Timed out\n";
                break;
            default:
                throw new \Exception($message->errstr(), $message->err);
    }

Also, when we run multiple consumers, we are passing all partitions to both workers, so both workers consume the same data. Is there a way to detect that a partition is already being consumed by one consumer and block another consumer from reading it?
Do you have a reference link with an example of how to implement this?

@nick-zh
Collaborator

nick-zh commented Apr 28, 2022

consumeBatch is from the legacy API (low-level consumer) and in general it is not advised to use it.
The high-level consumer hides a lot of the internal logic and does a very good job of handling multiple consumers / topics / partitions on its own, so you don't need to care about that.
Furthermore, consuming in batches is not really in the Kafka spirit, since Kafka is all about streaming constant data.
If you want to implement this, you should do it in your project.
I saw your question here; the same goes for this library, but you are free to create your own consumer class based on AbstractKafkaConsumer and implement that functionality.
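As a rough illustration of that userland approach (a sketch, not part of the library; the batch size and timeout values are arbitrary), batching can be layered on top of the high-level consumer by accumulating messages until a size or timeout threshold is reached:

```php
<?php
// Sketch: userland batching on top of the high-level consumer.
// $consumer is assumed to be a subscribed \RdKafka\KafkaConsumer,
// configured as in the snippet earlier in this thread.
function consumeUserlandBatch(\RdKafka\KafkaConsumer $consumer, int $batchSize, int $timeoutMs): array
{
    $batch = [];
    while (count($batch) < $batchSize) {
        $message = $consumer->consume($timeoutMs);
        if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
            $batch[] = $message;
        } elseif ($message->err === RD_KAFKA_RESP_ERR__TIMED_OUT
            || $message->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
            break; // flush whatever has accumulated so far
        } else {
            throw new \Exception($message->errstr(), $message->err);
        }
    }
    return $batch;
}

// $batch = consumeUserlandBatch($consumer, 500, 1000);
// ... process the whole batch, then commit ...
```

If at-least-once delivery matters, offsets should only be committed after the whole batch has been processed (e.g. with `enable.auto.commit` set to `false` and an explicit `$consumer->commit()` after processing).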

@nick-zh
Collaborator

nick-zh commented Apr 28, 2022

@arnaud-lb if you want to explore whether this could be implemented, there is an approach mentioned here, but I think doing this in userland should be just fine 😃

@sauravaggarwal
Author

Hi, we are using the low-level consumer for batch processing. Can we identify whether another consumer is already consuming messages from a given partition? Basically, with multiple consumers we need to divide the load between them; if multiple low-level consumers read from the same topic and partition, the same data is consumed twice, and we need to avoid that.
