Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FAQ - JetStream and consumer queue groups #446

Closed
aricart opened this issue Aug 31, 2021 · 6 comments
Closed

FAQ - JetStream and consumer queue groups #446

aricart opened this issue Aug 31, 2021 · 6 comments

Comments

@aricart
Copy link
Member

aricart commented Aug 31, 2021

This is an FAQ for the changes related to server 2.4.0 and nats.js v2.2.0.

If you have a JetStream consumer that was created with a nats.js version prior to v2.2.0 that uses queue groups:

  • If you upgrade to server 2.4.0, your consumer will silently fail to get messages. If you use an old server, it may seem to work correctly but it doesn't (thus the changes to 2.4.0 ).
  • If you upgrade to v2.2.0 the client will throw an error if the consumer/and subscription don't match - queue must match the deliver_group specified.

To create a queue subscriber with the magic jetstream subscribe(), specify the consumer builder option queue(n):

const opts = consumerOpts();
opts.durable("me");
opts.deliverTo("x");
opts.queue("x");
opts.idleHeartbeat(1000);

await js.subscribe(subj, opts);

If using JetStreamManager to create your consumer, the consumer configuration exposes deliver_group:

export interface ConsumerConfig {
...
  "deliver_group"?: string;
 ...
}

For more information please read the release notes:

https://github.com/nats-io/nats.js/releases/tag/v2.2.0

@rabeeshcharles
Copy link

nats-server version - 2.5.0
nats.js - 2.2.0

I have a publisher for subject1
and two subscriber instance for the suject1

While publishing a msg to the subject, all the two subscriber instance were consuming the same msg.
But the requirement is any one subscriber instance should receive the msg at any cost.

Can you point me correct direction?

@aricart
Copy link
Member Author

aricart commented Sep 11, 2021

On the consumer configuration, you need to specify deliver_group or use the
ConsumerOptsBuilder.queue(name). Then use that same consumer from the two subscriptions. See https://github.com/nats-io/nats.deno/blob/main/tests/jetstream_test.ts#L1728

@rabeeshcharles
Copy link

Yes I have done it already, then only raised a question here
My config

const opts = consumerOpts();
opts.durable("me");
opts.ackAll()
opts.queue("ConsumerGroup1")
opts.deliverTo('time');
let sub = await js.subscribe("time",opts);

@aricart
Copy link
Member Author

aricart commented Sep 11, 2021

Yes I have done it already, then only raised a question here

My config
const opts = consumerOpts();
opts.durable("me");
opts.ackAll()
opts.queue("ConsumerGroup1")
opts.deliverTo('time');
let sub = await js.subscribe("time",opts);

Can you confirm your server version

@rabeeshcharles
Copy link

Nats Server Version - 2.5.0

Proof Pic
MicrosoftTeams-image

@aricart
Copy link
Member Author

aricart commented Sep 11, 2021

I wrote the following test, works for me under 2.5.0.

test("jetstream - qsub ackall", async (t) => {
  const ns = await NatsServer.start(jetstreamServerConf());
  let nc = await connect({ port: ns.port });
  const jsm = await nc.jetstreamManager();
  const stream = nuid.next();
  const subj = nuid.next();
  await jsm.streams.add({ name: stream, subjects: [subj] });

  const js = nc.jetstream();

  const opts = consumerOpts();
  opts.queue("q");
  opts.durable("n");
  opts.deliverTo("here");
  opts.ackAll();
  opts.callback((_err, m) => {});

  const sub = await js.subscribe(subj, opts);
  const sub2 = await js.subscribe(subj, opts);

  for (let i = 0; i < 100; i++) {
    await js.publish(subj, Empty);
  }
  await nc.flush();
  await sub.drain();
  await sub2.drain();

  t.true(sub.getProcessed() > 0);
  t.true(sub2.getProcessed() > 0);
  t.is(sub.getProcessed() + sub2.getProcessed(), 100);


  const ci = await jsm.consumers.info(stream, "n");
  t.is(ci.num_pending, 0);
  t.is(ci.num_ack_pending, 0);

  await nc.close();
  await ns.stop();
});

Please create a different issue if you need additional follow-up. This issue was intended as documentation.

@nats-io nats-io locked as off-topic and limited conversation to collaborators Sep 11, 2021
@aricart aricart closed this as completed Feb 2, 2022
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants