
JetStream KV gets consumer stuck in a cluster node #691

Closed
ThalesValentim opened this issue Apr 25, 2024 · 11 comments · Fixed by #694
Labels
defect Suspected defect such as a bug or regression

Comments

@ThalesValentim

ThalesValentim commented Apr 25, 2024

Observed behavior

The KV consumer subscription remains active without interest after the client reconnects to another available cluster node.

I recently discussed the possibility of a nats-server issue (see 5352), and it was suggested that it might instead be a client bug where the client keeps interest in the old subscription.

Expected behavior

Consumers should be unsubscribed/removed once the client has reconnected to another available cluster node and no longer has interest in them.

Server and client version

  • nats-server 2.10.14
  • nats.js 2.23.0

Host environment

  • Unix OS
  • Docker
  • Cluster mode

Steps to reproduce

Assuming a nats-server cluster of 3 nodes is running:

  1. Run an app that watches for JetStream KV updates against the cluster, Node.js example:

  import * as nats from "nats";

  const nc = await nats.connect({
    servers: ['localhost:4111', 'localhost:4222', 'localhost:4333'],
  });
  const sc = nats.StringCodec();
  const js = nc.jetstream();
  const kv = await js.views.kv("testing");
  const jsm = await nc.jetstreamManager();
  const stream = await jsm.streams.get('KV_testing');
  const { config } = await stream.info();

  // replicate the KV backing stream across the 3 nodes
  await jsm.streams.update(config.name, {
    ...config,
    num_replicas: 3,
  });

  const watch = await kv.watch();
  (async () => {
    for await (const e of watch) {
      // do something with the change
      console.log(
        `watch: ${e.key}: ${e.operation} ${e.value ? sc.decode(e.value) : ""}`,
      );
    }
  })();

  await kv.put('hello.word', sc.encode("hi"));
  2. Check which node is serving the consumer: nats consumer report KV_testing
  3. Force a container restart / node-failure situation, so the client reconnects to another available cluster node
  4. Check the consumer report again: nats consumer report KV_testing
  5. The output shows two consumers instead of the one expected (a programmatic check is sketched below)
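Not part of the original report, but as referenced in step 5: a minimal sketch of how the duplicate consumer could also be confirmed programmatically with the nats.js JetStream manager instead of the CLI (stream name KV_testing and server addresses taken from the steps above).

import { connect } from "nats";

const nc = await connect({
  servers: ["localhost:4111", "localhost:4222", "localhost:4333"],
});
const jsm = await nc.jetstreamManager();

// List the consumers currently defined on the KV backing stream.
const consumers = await jsm.consumers.list("KV_testing").next();
console.log(`KV_testing has ${consumers.length} consumer(s)`);
for (const ci of consumers) {
  console.log(ci.name, ci.config.deliver_subject);
}
await nc.drain();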
ThalesValentim added the defect label on Apr 25, 2024
ThalesValentim changed the title from "JetStream KV gets consumer stuck in the cluster server" to "JetStream KV gets consumer stuck in a cluster node" on Apr 25, 2024
@aricart
Member

aricart commented Apr 26, 2024

The multiple consumers are not the problem - if the ordered consumer (the watcher) fails, the client will recreate it from underneath. The client will attempt to remove the old consumer, but that may not succeed, especially if the cluster is flapping. It will then attempt to recreate the consumer, and the server will prune the old one after 5 seconds.
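Not from the thread: a minimal sketch of how one might confirm that pruning window by reading the consumer's inactive_threshold via the JetStream manager. The consumer name is a placeholder to be taken from nats consumer report KV_testing; millis() is the nats.js helper for converting the nanosecond value.

import { connect, millis } from "nats";

const nc = await connect({ servers: ["localhost:4222"] });
const jsm = await nc.jetstreamManager();

// "<consumer-name>" is a placeholder: use a name shown by `nats consumer report KV_testing`.
const ci = await jsm.consumers.info("KV_testing", "<consumer-name>");
// inactive_threshold is reported in nanoseconds.
console.log("inactive_threshold (ms):", millis(ci.config.inactive_threshold));
await nc.drain();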

@aricart
Member

aricart commented Apr 26, 2024

Also, if the consumer is never reaped, you would have 2 processes watching/consuming. As you can see below, we create a new subscription with a new inbox, and the old consumer unsubscribes.

info.deliver = newDeliver;

Is your watch throwing an error?
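Not from the thread: a minimal sketch of one way to check this, wrapping the watch iterator from the reproduction example in a try/catch so that any failure of the underlying ordered consumer surfaces as an error.

import { connect } from "nats";

const nc = await connect({ servers: ["localhost:4222"] });
const js = nc.jetstream();
const kv = await js.views.kv("testing");

const watch = await kv.watch();
(async () => {
  try {
    for await (const e of watch) {
      console.log(`watch: ${e.key}: ${e.operation}`);
    }
  } catch (err) {
    // An error here would indicate the watcher itself failed
    // (e.g. the ordered consumer could not be re-established).
    console.error("watch failed:", err);
  }
})();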

@ThalesValentim
Author

ThalesValentim commented Apr 29, 2024

Also, if the consumer is never reaped, you would have 2 processes watching/consuming. As you can see below, we create a new subscription with a new inbox, and the old consumer unsubscribes.

info.deliver = newDeliver;

Is your watch throwing an error?

Thanks for the explanation.

It has been observed that the consumer is never reaped, even with only one KV watch started by the client.
We are running 3 clustered nodes; let's call them nats1, nats2, and nats3.

Assuming the app client starts and connects to the nats3 node:

  • If the nats3 node fails, KV.watch() creates a new subscription on one of the available nodes, which is the expected behavior when the client reconnects. That works well.
  • Let's assume it reconnected to the nats2 node. (see attached pics)
  • After reconnection, recover the failed nats3 node and bring it back online.
  • The client's KV.watch() should no longer have interest in the subscription on nats3, since the watch is now attached to the new subscription on the nats2 node.

The suspicion is that while the node (nats3) is unavailable, the client can't unsubscribe/remove the old consumer, and the old subscription stays active once nats3 is back.

Checking nats consumer report KV_testing: if nats3 is not recovered (failed state), it shows Inaccessible Consumers:

Screenshot 2024-04-29 at 10 27 41

When the nats3 is healthy again:

Screenshot 2024-04-29 at 11 30 00

Are updates being propagated through both existing consumers?
The expectation is that only one of the consumers listed above should receive the updates over the client watch callback.

I noticed that the Ack Floor receives an update when publishing a change to KV_testing: nats kv put testing test.key test.value

Screenshot 2024-04-29 at 14 31 40

I haven't experienced error exceptions following the steps described.

@aricart
Member

aricart commented Apr 29, 2024

A couple of things:

When the client disconnects, the server terminates all interest for that client (the subjects the client is interested in).
On reconnect, the client will resend its interest.

In the case of the ordered consumer, if the server comes back quickly enough, it may still have the consumer, which is only reaped after the specified inactive threshold. (It would be nice if you could print the full consumer info for the consumers on KV_testing.)

When the client reconnects, there's a slight chance that the consumer is still alive; the subscription is then rewired between the consumer and the client, and things resume.

If the client detects a sequence gap, or the heartbeat monitor detects that the client is not getting messages, the client will recreate the consumer. If you are seeing 2 different consumers and one of them is not going away, then there really are 2 different consumers on that KV.
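Not from the thread: a small sketch of how disconnect/reconnect events could be logged on the client, so that consumer re-creation can be correlated with the reconnects described above.

import { connect } from "nats";

const nc = await connect({
  servers: ["localhost:4111", "localhost:4222", "localhost:4333"],
});

// Log connection lifecycle events (disconnect, reconnect, server updates, etc.).
(async () => {
  for await (const s of nc.status()) {
    console.log(new Date().toISOString(), s.type, s.data);
  }
})();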

@ThalesValentim
Author

Please find the outputs of the consumers' info below:

The first one was the initial consumer, and the last one was created after the node failure:
Screenshot 2024-04-30 at 10 53 03

Before publishing a message:

9H5K3K4DYTB8N6538QY5X4
Screenshot 2024-04-30 at 10 54 30

9H5K3K4DYTB8N6538QY73H
Screenshot 2024-04-30 at 10 54 49

After publishing a message:

9H5K3K4DYTB8N6538QY5X4
Screenshot 2024-04-30 at 10 55 59

9H5K3K4DYTB8N6538QY73H
Screenshot 2024-04-30 at 10 56 22

Subscriptions:

{
  "account": "$G",
  "subject": "_INBOX.9H5K3K4DYTB8N6538QY65L",
  "sid": "2",
  "msgs": 63,
  "cid": 58
}
{
  "account": "$G",
  "subject": "_INBOX.9H5K3K4DYTB8N6538QY6V0",
  "sid": "3",
  "msgs": 80,
  "cid": 58
}

@aricart
Member

aricart commented Apr 30, 2024

I did another local test - it uses some refactored APIs, but it shares all the relevant code with your current client:

import { connect, millis } from "../src/mod.ts";
import { Kvm } from "../kv/mod.ts";

const nc = await connect();

const kvm = new Kvm(nc);
const kv = await kvm.create("A", { replicas: 3, history: 100 });

const w = await kv.watch();
// print the ordered consumer's inactive_threshold (converted to millis)
console.log(millis((await w._data.consumerInfo()).config.inactive_threshold));
(async () => {
  for await (const e of w) {
    console.log(`${e.key}: ${e.string()} - ${e.revision}`);
  }
})().catch((err) => {
  console.log(err);
  throw err;
});

let i = 1;
// put a new value every 3 seconds; roll the counter back if the put fails
setInterval(() => {
  const idx = i++;
  kv.put("a", `${idx}`).catch((err) => {
    i--;
  });
}, 3000);

// every second, log the subjects of all current client subscriptions
// (uses the client's internal protocol API for inspection only)
setInterval(() => {
  const subs = nc.protocol.subscriptions.all();
  const subjects = subs.map((sub) => {
    return sub.subject || sub.requestSubject;
  });
  console.log(subjects);
}, 1000);

And ran that against my cluster tool - https://github.com/nats-io/nats.deno/blob/d32538fa38bbd636826d53216ad5b86e5a71708e/tests/helpers/cluster.ts

And started chaos on it - where random servers are restarted, creating a very hostile environment:

5000
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 1 - 1
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 2 - 2
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 3 - 3
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 4 - 4
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 5 - 5
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 6 - 6
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 7 - 7
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6PP2" ]
a: 8 - 8
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6RVO" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6RVO" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6RVO" ]
a: 9 - 9
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6RVO" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6RVO" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6RVO" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
a: 10 - 10
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
a: 11 - 11
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6SYZ" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
a: 12 - 12
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6UI0" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6VT6" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6VT6" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6VT6" ]
[ "_INBOX.2HUPXNC4TQBO00A4SV6P1H.*", "_INBOX.2HUPXNC4TQBO00A4SV6VT6" ]

Note that at any one point the client only has 2 subscriptions; one of them changes whenever the ordered consumer resets.

This means that somehow a cluster route is possibly staying open (from a previous subscription), but it shows for a fact that the process is NOT listening on that subject. I don't know how long it takes before the server figures out it has that stale state. Have you observed when it goes away?
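Not from the thread: a small sketch of one way to check server-side interest for the stale inbox, assuming the server has HTTP monitoring enabled on port 8222. The inbox subject is a placeholder taken from the earlier subscription dump.

// Ask the nats-server monitoring endpoint for its subscription list and
// check whether anything still shows interest on the old deliver subject.
const staleInbox = "_INBOX.9H5K3K4DYTB8N6538QY65L"; // placeholder: the old consumer's deliver subject
const res = await fetch("http://localhost:8222/subsz?subs=1");
const subsz = await res.json();
const matches = (subsz.subscriptions_list ?? []).filter((s) => s.subject === staleInbox);
console.log(`server still reports ${matches.length} subscription(s) on ${staleInbox}`);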

@ThalesValentim
Author

Thanks for sharing the test and results! I added the same logging to my local tests and got the same behavior: the client didn't report listening to multiple subjects.
I ran the app test for different timespans, up to two hours, and observed that the consumer disappeared from the report only after stopping the application / dropping the connection.

@ThalesValentim
Author

Debugging the KV watcher via JetStreamSubscriptionImpl, the "old subscription" reference name was found at the path _data.info.last:

Screenshot 2024-05-02 at 12 17 11

I also found the current consumer in the watcher config:

Screenshot 2024-05-02 at 12 17 26

consumer report:

Screenshot 2024-05-02 at 12 26 56
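Not from the thread: a rough sketch of how the same internal state could be dumped from the script instead of a debugger. _data and info are private, unstable fields of JetStreamSubscriptionImpl (names taken from the screenshots above), so this is for debugging only; `watch` is the iterator from the reproduction example.

const internals = watch as unknown as { _data?: { info?: { last?: unknown } } };
console.log(internals._data?.info?.last); // the "old subscription" reference noted above
console.log(internals._data?.info);       // full internal watcher info for comparison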

@aricart
Member

aricart commented May 2, 2024

@ThalesValentim I found the issue - will have a fix and a release in a bit

@aricart
Member

aricart commented May 3, 2024

@ThalesValentim all the javascript clients have been released with the above fix! Thanks for helping me get to the bottom of this!

@ThalesValentim
Author

@ThalesValentim all the javascript clients have been released with the above fix! Thanks for helping me get to the bottom of this!

Thanks a lot!
