Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaJS Consumer Offset Reset Issue after Manual Commit #1660

Open
aderounmu opened this issue Jan 19, 2024 · 3 comments
Open

KafkaJS Consumer Offset Reset Issue after Manual Commit #1660

aderounmu opened this issue Jan 19, 2024 · 3 comments

Comments

@aderounmu
Copy link

Describe the bug:
When using manual commit and committing the next offset (offset + 1), if the consumer is shut down and restarted, the following errors occur: "The requested offset is not within the range of offsets maintained by the server" and "[ConsumerGroup] Offset out of range, resetting to default offset." After these errors, the consumer starts consuming from the first offset again.

To Reproduce:

  1. Start the consumer with the provided code snippet.
  2. Manually commit the next offset (offset + 1).
  3. Shutdown the consumer.
  4. Restart the consumer.

Expected behavior:
The consumer should not consume any more messages after a manual commit, similar to the behavior of automatic commit.

Observed behavior:
After restarting the consumer, the following errors are encountered:

  • "The requested offset is not within the range of offsets maintained by the server."
  • "[ConsumerGroup] Offset out of range, resetting to default offset."
    The consumer then starts consuming from the first offset again.

Environment:

  • OS: macOS 13.1
  • KafkaJS version: kafkajs@2.2.4
  • Kafka version: v3.3.2
  • NodeJS version: v20.3.0

Additional context:

  • The consumer code snippet is provided in the description.
  • The consumer is part of the 'test-group-4' consumer group.
  • The errors indicate that the offset requested is not within the server's maintained range, and the consumer group resets to the default offset.

Error Message:

{"level":"ERROR","timestamp":"2024-01-19T04:46:53.973Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 11)","broker":"127.0.0.1:9092","clientId":"example1-consumer","error":"The requested offset is not within the range of offsets maintained by the server","correlationId":0,"size":124} 
{"level":"ERROR","timestamp":"2024-01-19T04:46:53.975Z","logger":"kafkajs","message":"[ConsumerGroup] Offset out of range, resetting to default offset","topic":"TEST_KPI_MESSAGE","partition":0,"groupId":"test-group-4","memberId":"example1-consumer-85565b63-3099-4ec5-a2b0-1beb7ba74738"} 

Consumer Code

const { Kafka, logLevel } = require('kafkajs');
const kafka = new Kafka({
  logLevel: logLevel.ERROR,
  brokers: [`localhost:9092`],
  clientId: 'example1-consumer',
});

function timeout(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

const topic = 'TEST_KPI_MESSAGE';
const consumer = kafka.consumer({ groupId: 'test-group-4' });
let timeEventConsumer = 0;

const run = async () => {
  await consumer.connect();
  console.log('consumer connected');
  await consumer.subscribe({ topic, fromBeginning: true });
  await consumer.run({
    // eachBatch: async ({ batch }) => {
    //   console.log(batch)
    // },
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      const prefix = `${topic}[${partition} | ${message.offset}] / ${message.timestamp}`;
      console.log(`- ${prefix} ${message.key}#${message.value}`);

      await timeout(2000);
      timeEventConsumer = timeEventConsumer + 1;
      console.log(timeEventConsumer);

      await consumer.commitOffsets([
        {
          topic,
          partition,
          offset: Number(message.offset + 1).toString(),
        },
      ]);
    },
  });
};

console.log("App 2 Started");
// while (true){
run().catch(e => console.error(`[example/consumer] ${e.message}`, e));
//

const errorTypes = ['unhandledRejection', 'uncaughtException'];
const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'];

errorTypes.forEach(type => {
  process.on(type, async e => {
    try {
      console.log(`process.on ${type}`);
      console.error(e);
      await consumer.disconnect();
      process.exit(0);
    } catch (_) {
      process.exit(1);
    }
  });
});

signalTraps.forEach(type => {
  process.once(type, async () => {
    try {
      await consumer.disconnect();
    } finally {
      process.kill(process.pid, type);
    }
  });
});
@vihapr
Copy link

vihapr commented Feb 12, 2024

Probably where is an error in your code:

offset: Number(message.offset + 1).toString(),

Should be:

offset:  (Number(message.offset) + 1).toString(),

@xtrembaker
Copy link

I agree with @vihapr but still get the error: "The requested offset is not within the range of offsets maintained by the server".

@lpinho77
Copy link

lpinho77 commented Apr 18, 2024

Had the same problem, fixed by parsing the message.offset before adding 1

offset: (parseInt(message.offset, 10) + 1).toString(),

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants