Skip to content

Commit

Permalink
[FEAT] exported consumer - this allows getting a Consumer without per…
Browse files Browse the repository at this point in the history
…forming any JSM operations on the server
  • Loading branch information
aricart committed Oct 6, 2023
1 parent aef226b commit 7b4e54d
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 1 deletion.
1 change: 1 addition & 0 deletions jetstream/internal_mod.ts
Expand Up @@ -139,6 +139,7 @@ export type {
ConsumerMessages,
ConsumerStatus,
Expires,
ExportedConsumer,
FetchBytes,
FetchMessages,
FetchOptions,
Expand Down
14 changes: 14 additions & 0 deletions jetstream/jsmstream_api.ts
Expand Up @@ -41,6 +41,7 @@ import {
} from "../nats-base-client/core.ts";
import {
ApiPagedRequest,
ConsumerInfo,
ExternalStream,
MsgDeleteRequest,
MsgRequest,
Expand All @@ -60,6 +61,7 @@ import {
} from "./jsapi_types.ts";
import {
Consumer,
ExportedConsumer,
OrderedConsumerOptions,
OrderedPullConsumerImpl,
PullConsumerImpl,
Expand Down Expand Up @@ -132,6 +134,18 @@ export class ConsumersImpl implements Consumers {
});
}

async bind(
stream: string,
name: string,
): Promise<ExportedConsumer> {
await this.checkVersion();
const ci: ConsumerInfo = {
stream_name: stream,
name: name,
} as unknown as ConsumerInfo;
return Promise.resolve(new PullConsumerImpl(this.api, ci));
}

async ordered(
stream: string,
opts?: Partial<OrderedConsumerOptions>,
Expand Down
1 change: 1 addition & 0 deletions jetstream/mod.ts
Expand Up @@ -61,6 +61,7 @@ export type {
DeliveryInfo,
Destroyable,
Expires,
ExportedConsumer,
ExternalStream,
FetchBytes,
FetchMessages,
Expand Down
72 changes: 72 additions & 0 deletions jetstream/tests/consumers_test.ts
Expand Up @@ -48,6 +48,7 @@ import {
ConsumerDebugEvents,
ConsumerEvents,
ConsumerStatus,
PullConsumerImpl,
PullConsumerMessagesImpl,
} from "../consumer.ts";
import { deadline } from "../../nats-base-client/util.ts";
Expand Down Expand Up @@ -1105,3 +1106,74 @@ Deno.test("consumers - fetch sync", async () => {
assertEquals(await sync.next(), null);
await cleanup(ns, nc);
});

Deno.test("consumers - bind", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });

const js = nc.jetstream();
await js.publish("hello");
await js.publish("hello");

await jsm.consumers.add("messages", {
durable_name: "a",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(3000),
max_waiting: 500,
});

await jsm.consumers.add("messages", {
durable_name: "b",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(3000),
max_waiting: 500,
});

await jsm.consumers.add("messages", {
durable_name: "c",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(3000),
max_waiting: 500,
});

jsm.consumers.info = () => {
throw new Error("exported is not allowed info");
};

let consumer = await js.consumers.bind("messages", "a");
let pci = consumer as PullConsumerImpl;
pci.info = () => {
throw new Error("exported is not allowed to info");
};
let iter = await consumer.consume({ max_messages: 2 });
for await (const m of iter) {
m.ack();
if (m.info.pending === 0) {
break;
}
}

consumer = await js.consumers.bind("messages", "b");
pci = consumer as PullConsumerImpl;
pci.info = () => {
throw new Error("exported is not allowed to info");
};
iter = await consumer.fetch({ max_messages: 2 });
for await (const m of iter) {
m.ack();
}

consumer = await js.consumers.bind("messages", "c");
pci = consumer as PullConsumerImpl;
pci.info = () => {
throw new Error("exported is not allowed to info");
};
assertExists(await consumer.next());
assertExists(await consumer.next());

await cleanup(ns, nc);
});
17 changes: 16 additions & 1 deletion jetstream/types.ts
Expand Up @@ -13,7 +13,11 @@
* limitations under the License.
*/

import { Consumer, OrderedConsumerOptions } from "./consumer.ts";
import {
Consumer,
ExportedConsumer,
OrderedConsumerOptions,
} from "./consumer.ts";
import {
JetStreamOptions,
MsgHdrs,
Expand Down Expand Up @@ -462,6 +466,17 @@ export interface Consumers {
stream: string,
name?: string | Partial<OrderedConsumerOptions>,
): Promise<Consumer>;

/**
* Returns a Consumer configured for the specified stream and consumer name. Note there
* are no JSAPI lookups or verifications.
* @param stream
* @param name
*/
bind(
stream: string,
name: string,
): Promise<ExportedConsumer>;
}

export interface ConsumerOpts {
Expand Down

0 comments on commit 7b4e54d

Please sign in to comment.