Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to ensure that all offsets are committed successfully when commit manually #543

Open
hhxsv5 opened this issue Jun 21, 2023 · 0 comments
Labels

Comments

@hhxsv5
Copy link

hhxsv5 commented Jun 21, 2023

Description

The following code:

    $settings = [
        'socket.keepalive.enable'  => true,
        'log_level'                => LOG_WARNING,
        'enable.auto.offset.store' => 'true',
        'auto.offset.reset'        => 'earliest',
        'enable.partition.eof'     => 'false',
        'enable.auto.commit'       => 'false',
        'max.poll.interval.ms'     => 300000,
        'session.timeout.ms'       => 45000,
        'group.id'                 => 'test-group',
        'group.instance.id'        => uniqid('', true),
        'metadata.broker.list'     => 'kafka1:9092,kafka2:9092,kafka3:9092',
    ];
    foreach ($settings as $key => $value) {
        $conf->set($key, $value);
    }
    $consumer = new KafkaConsumer($conf);
    $consumer->subscribe(['dave-test1']);
    while (!$quit) {
        $message = $consumer->consume(60 * 1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo "consume: {$message->payload}\n";
                $consumer->commitAsync($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "consume: no more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "consume: timed out\n";
                break;
            default:
                throw new \RuntimeException($message->errstr(), $message->err);
        }
    }
    try {
        // Is this the best practice to ensure the success of commit offset, such as commitAsync failure ?
        $consumer->commit(null);
    } catch (Exception $e) {
        if ($e->getCode() !== RD_KAFKA_RESP_ERR__NO_OFFSET) {
            echo 'commit fail: ' . $e->getMessage();
        }
    }
    echo "consumer process end PID {$process->pid}\n";

Resulted in this output:

Not sure commit all offsets 

But I expected this output instead:

Commit all offsets even if commitAsync fails

php-rdkafka Version

6.0.1

librdkafka Version

1.8.2

PHP Version

7.4.33

Operating System

Aws linux 2

Kafka Version

2.1.1

@hhxsv5 hhxsv5 added the bug label Jun 21, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

1 participant