"KafkaJSGroupCoordinatorNotFound" errors in EoS tests #230

Closed
ianwsperber opened this issue Dec 6, 2018 · 4 comments

ianwsperber commented Dec 6, 2018

While developing the EoS project locally I've repeatedly hit "KafkaJSGroupCoordinatorNotFound" errors ("Failed to find group coordinator"). The underlying error code from Kafka is GROUP_COORDINATOR_NOT_AVAILABLE. I believe this has something to do with the local test setup and the docker container: these errors have not shown up when my branches have run against a fresh container in CI. I'm not sure whether this has any bearing on how these features will behave in production, but it's been enough of a pain that I wanted to raise awareness in case it comes up in the future. The errors are non-deterministic: if you re-run the tests, eventually you get a run where it finds the coordinator.

I've been running my tests by starting the docker container and leaving it running, then running tests as needed. It's possible things are getting into a bad state because of that.

Example:

 FAIL  src/producer/index.spec.js (20.545s)
  ● Producer › transactions › supports sending offsets

    KafkaJSGroupCoordinatorNotFound: Failed to find group coordinator

      293 |     }
      294 |
    > 295 |     throw new KafkaJSGroupCoordinatorNotFound('Failed to find group coordinator')
          |           ^
      296 |   }
      297 |
      298 |   /**

      at Cluster.findGroupCoordinatorMetadata (src/cluster/index.js:295:11)

The stack trace isn't very helpful, but the error likely originates in our call to initProducerId, as this is the first call to find the group coordinator.

ianwsperber commented

Example debug logs:

  console.error src/loggers/console.js:15
    {"level":"ERROR","timestamp":"2018-12-06T23:15:19.790Z","logger":"kafkajs","message":"[Connection] Response GroupCoordinator(key: 10, version: 1)","broker":"192.168.1.5:9098","clientId":"test-96fa365332fa8397f323-5119-07d294b5-275e-4009-92a2-a3f26fb0bce0","error":"The group coordinator is not available","correlationId":15,"size":22}

  console.log src/loggers/console.js:19
    {"level":"DEBUG","timestamp":"2018-12-06T23:15:19.791Z","logger":"kafkajs","message":"[Connection] Response GroupCoordinator(key: 10, version: 1)","broker":"192.168.1.5:9098","clientId":"test-96fa365332fa8397f323-5119-07d294b5-275e-4009-92a2-a3f26fb0bce0","error":"The group coordinator is not available","correlationId":15,"payload":{"type":"Buffer","data":[0,0,0,0,0,15,255,255,255,255,255,255,0,0,255,255,255,255]}}

  console.log src/loggers/console.js:19
    {"level":"DEBUG","timestamp":"2018-12-06T23:15:19.791Z","logger":"kafkajs","message":"[Cluster] Tried to find group coordinator","nodeId":"2","error":{"name":"KafkaJSProtocolError","retriable":true,"type":"GROUP_COORDINATOR_NOT_AVAILABLE","code":15}}

  console.log src/loggers/console.js:19
    {"level":"DEBUG","timestamp":"2018-12-06T23:15:19.792Z","logger":"kafkajs","message":"[Cluster] Group coordinator not available, retrying...","nodeId":"2","retryCount":14,"retryTime":1000}
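
The logs above show the client treating GROUP_COORDINATOR_NOT_AVAILABLE as a retriable error (`"retriable":true`) and retrying after a fixed delay (`"retryTime":1000`). A minimal sketch of that retry pattern follows. It is not KafkaJS's actual implementation; `retryWhileRetriable`, `retries` and `delayMs` are hypothetical names used only for illustration.

```javascript
// Hypothetical helper, not part of KafkaJS: retry an async operation for as
// long as the thrown error is flagged `retriable`, as in the logs above.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

async function retryWhileRetriable(operation, { retries = 5, delayMs = 1000 } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await operation(attempt)
    } catch (error) {
      // Give up on non-retriable errors, or once the retry budget is spent.
      if (!error.retriable || attempt >= retries) throw error
      await sleep(delayMs)
    }
  }
}
```

If the coordinator never becomes available (for example because a broker has exited), the last error is rethrown once the retry budget is exhausted, which matches the failure seen in the test run.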

ianwsperber commented

I think I just needed to increase the memory limit on docker. I realized one of the brokers was exiting with code 137. IIRC kafka transactions require 3 brokers to work properly. After bumping my memory limit from 2 GB to 4 GB the bug stopped appearing. See moby/moby#21083
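
For context on the exit code: by shell and Docker convention, an exit code above 128 means the process was terminated by signal number (code - 128), so 137 corresponds to signal 9 (SIGKILL). That is what a container reports when the kernel's out-of-memory killer stops it, although SIGKILL can come from other sources too. A small sketch that decodes such codes in Node; `signalForExitCode` is a hypothetical helper name.

```javascript
const os = require('os')

// Hypothetical helper: map an exit code above 128 to the name of the signal
// that terminated the process, or null if the code does not encode a signal.
function signalForExitCode(code) {
  if (!Number.isInteger(code) || code <= 128) return null
  const signalNumber = code - 128
  const entry = Object.entries(os.constants.signals).find(
    ([, number]) => number === signalNumber
  )
  return entry ? entry[0] : null
}

console.log(signalForExitCode(137)) // prints "SIGKILL"
```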

Exactly Once Delivery and Transactional Messaging automation moved this from To do to Done Dec 7, 2018

tulios commented Dec 7, 2018

@ianwsperber maybe we should add a note to the "development" section; that's the reason I split the tests into groups for the CI.

arupsarkar-sfdc commented Dec 21, 2023

I am having the same problem. My app is running on Heroku and my Kafka broker is also hosted on Heroku. I get the following error when I call "consumer.run":

Cluster.findGroupCoordinatorMetadata (/app/node_modules/kafkajs/src/cluster/index.js:420:11)
process.processTicksAndRejections (node:internal/process/task_queues:95:5)

async /app/node_modules/kafkajs/src/cluster/index.js:346:33
async [private:ConsumerGroup:join] (/app/node_modules/kafkajs/src/consumer/consumerGroup.js:167:24)
async /app/node_modules/kafkajs/src/consumer/consumerGroup.js:335:9
async Runner.start (/app/node_modules/kafkajs/src/consumer/runner.js:84:7)
async start (/app/node_modules/kafkajs/src/consumer/index.js:243:7)
async Object.run (/app/node_modules/kafkajs/src/consumer/index.js:304:5)

Object.startConsumer (/app/server/kafka-server.js:106:9)
async /app/server/app.js:477:5

The following is my consumer.run code. I have tried everything from a very simple version to what you see below. I am able to successfully publish messages. FYI, I have SSL enabled on Heroku and also when creating my Kafka client.

What am I missing?

        // Create the consumer
        const consumer = kafka.consumer({ groupId: 'my-app' });

        // Connect the consumer
        try {
          await consumer.connect();
          console.log("consumer connected");
        } catch (error) {
          console.error("Error connecting consumer", error);
        }

        // Subscribe to the topic
        await consumer.subscribe({ topic: 'pearl-3815.datacloud-streaming-channel', fromBeginning: true });

        // Run the consumer
        try {
          await consumer.run({
            eachBatchAutoResolve: true,
            eachBatch: async ({ batch, resolveOffset, heartbeat }) => {
              for (const message of batch.messages) {
                console.log({
                  topic: batch.topic,
                  partition: batch.partition,
                  highWatermark: batch.highWatermark,
                  message: {
                    offset: message.offset,
                    // key and value may be null, so guard before calling toString()
                    key: message.key ? message.key.toString() : null,
                    value: message.value ? message.value.toString() : null,
                    headers: message.headers,
                  },
                });

                resolveOffset(message.offset);
                await heartbeat();
              }
            },
          });
          console.log("consumer started");
        } catch (error) {
          console.error("consumer run error", error);
        }
