Skip to content

Commit

Permalink
[FEAT] [JS] [CONSUMERS] added name_prefix option to ordered consume…
Browse files Browse the repository at this point in the history
…r that allows user-provided prefix to generated consumer names
  • Loading branch information
aricart committed May 9, 2024
1 parent 99e3f2e commit 412e11e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
8 changes: 7 additions & 1 deletion jetstream/consumer.ts
Expand Up @@ -22,7 +22,7 @@ import {
} from "../nats-base-client/util.ts";
import { ConsumerAPI, ConsumerAPIImpl } from "./jsmconsumer_api.ts";
import { nuid } from "../nats-base-client/nuid.ts";
import { isHeartbeatMsg } from "./jsutil.ts";
import { isHeartbeatMsg, minValidation } from "./jsutil.ts";
import { QueuedIteratorImpl } from "../nats-base-client/queued_iterator.ts";
import {
createInbox,
Expand Down Expand Up @@ -948,6 +948,7 @@ export class PullConsumerImpl implements Consumer {
* {@link ConsumerUpdateConfig}
*/
export type OrderedConsumerOptions = {
name_prefix: string;
filterSubjects: string[] | string;
deliver_policy: DeliverPolicy;
opt_start_seq: number;
Expand Down Expand Up @@ -981,6 +982,11 @@ export class OrderedPullConsumerImpl implements Consumer {
this.stream = stream;
this.cursor = { stream_seq: 1, deliver_seq: 0 };
this.namePrefix = nuid.next();
if (typeof opts.name_prefix === "string") {
// make sure the prefix is valid
minValidation("name_prefix", opts.name_prefix);
this.namePrefix = opts.name_prefix + this.namePrefix;
}
this.serial = 0;
this.currentConsumer = null;
this.userCallback = null;
Expand Down
31 changes: 31 additions & 0 deletions jetstream/tests/consumers_ordered_test.ts
Expand Up @@ -15,6 +15,7 @@

import { initStream } from "./jstest_util.ts";
import {
assert,
assertEquals,
assertExists,
assertRejects,
Expand Down Expand Up @@ -947,3 +948,33 @@ Deno.test("ordered consumers - bind is rejected", async () => {

await cleanup(ns, nc);
});

Deno.test("ordered consumers - name prefix", async () => {
const { ns, nc } = await setup(jetstreamServerConf());

const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "A", subjects: ["a"] });

const js = nc.jetstream();
const c = await js.consumers.get("A", { name_prefix: "hello" });
const ci = await c.info(true);
assert(ci.name.startsWith("hello"));

await assertRejects(
() => {
return js.consumers.get("A", { name_prefix: "" });
},
Error,
"name_prefix name required",
);

await assertRejects(
() => {
return js.consumers.get("A", { name_prefix: "one.two" });
},
Error,
"invalid name_prefix name - name_prefix name cannot contain '.'",
);

await cleanup(ns, nc);
});

0 comments on commit 412e11e

Please sign in to comment.