Skip to content

Commit

Permalink
[FEAT] exported consumer - this allows getting a Consumer without per…
Browse files Browse the repository at this point in the history
…forming any JSM operations on the server
  • Loading branch information
aricart committed Nov 7, 2023
1 parent 9b25658 commit c9f9279
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 2 deletions.
3 changes: 2 additions & 1 deletion README.md
Expand Up @@ -31,7 +31,8 @@ point into the JS Doc is the
[NatsConnection](https://nats-io.github.io/nats.deno/interfaces/NatsConnection.html)
all functionality starts with a connection.

**Check out [NATS by example](https://natsbyexample.com) - An evolving collection of runnable, cross-client reference examples for NATS.**
**Check out [NATS by example](https://natsbyexample.com) - An evolving
collection of runnable, cross-client reference examples for NATS.**

## Basics

Expand Down
1 change: 1 addition & 0 deletions jetstream/internal_mod.ts
Expand Up @@ -140,6 +140,7 @@ export type {
ConsumerMessages,
ConsumerStatus,
Expires,
ExportedConsumer,
FetchBytes,
FetchMessages,
FetchOptions,
Expand Down
14 changes: 14 additions & 0 deletions jetstream/jsmstream_api.ts
Expand Up @@ -41,6 +41,7 @@ import {
} from "../nats-base-client/core.ts";
import {
ApiPagedRequest,
ConsumerInfo,
ExternalStream,
MsgDeleteRequest,
MsgRequest,
Expand All @@ -60,6 +61,7 @@ import {
} from "./jsapi_types.ts";
import {
Consumer,
ExportedConsumer,
OrderedConsumerOptions,
OrderedPullConsumerImpl,
PullConsumerImpl,
Expand Down Expand Up @@ -132,6 +134,18 @@ export class ConsumersImpl implements Consumers {
});
}

async bind(
stream: string,
name: string,
): Promise<ExportedConsumer> {
await this.checkVersion();
const ci: ConsumerInfo = {
stream_name: stream,
name: name,
} as unknown as ConsumerInfo;
return Promise.resolve(new PullConsumerImpl(this.api, ci));
}

async ordered(
stream: string,
opts?: Partial<OrderedConsumerOptions>,
Expand Down
1 change: 1 addition & 0 deletions jetstream/mod.ts
Expand Up @@ -61,6 +61,7 @@ export type {
DeliveryInfo,
Destroyable,
Expires,
ExportedConsumer,
ExternalStream,
FetchBytes,
FetchMessages,
Expand Down
72 changes: 72 additions & 0 deletions jetstream/tests/consumers_test.ts
Expand Up @@ -40,6 +40,7 @@ import {
ConsumerDebugEvents,
ConsumerEvents,
ConsumerStatus,
PullConsumerImpl,
PullConsumerMessagesImpl,
} from "../consumer.ts";

Expand Down Expand Up @@ -593,3 +594,74 @@ Deno.test("consumers - inboxPrefix is respected", async () => {
await done;
await cleanup(ns, nc);
});

Deno.test("consumers - bind", async () => {
const { ns, nc } = await setup(jetstreamServerConf());
const jsm = await nc.jetstreamManager();
await jsm.streams.add({ name: "messages", subjects: ["hello"] });

const js = nc.jetstream();
await js.publish("hello");
await js.publish("hello");

await jsm.consumers.add("messages", {
durable_name: "a",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(3000),
max_waiting: 500,
});

await jsm.consumers.add("messages", {
durable_name: "b",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(3000),
max_waiting: 500,
});

await jsm.consumers.add("messages", {
durable_name: "c",
deliver_policy: DeliverPolicy.All,
ack_policy: AckPolicy.Explicit,
ack_wait: nanos(3000),
max_waiting: 500,
});

jsm.consumers.info = () => {
throw new Error("exported is not allowed info");
};

let consumer = await js.consumers.bind("messages", "a");
let pci = consumer as PullConsumerImpl;
pci.info = () => {
throw new Error("exported is not allowed to info");
};
let iter = await consumer.consume({ max_messages: 2 });
for await (const m of iter) {
m.ack();
if (m.info.pending === 0) {
break;
}
}

consumer = await js.consumers.bind("messages", "b");
pci = consumer as PullConsumerImpl;
pci.info = () => {
throw new Error("exported is not allowed to info");
};
iter = await consumer.fetch({ max_messages: 2 });
for await (const m of iter) {
m.ack();
}

consumer = await js.consumers.bind("messages", "c");
pci = consumer as PullConsumerImpl;
pci.info = () => {
throw new Error("exported is not allowed to info");
};
assertExists(await consumer.next());
assertExists(await consumer.next());

await cleanup(ns, nc);
});
17 changes: 16 additions & 1 deletion jetstream/types.ts
Expand Up @@ -13,7 +13,11 @@
* limitations under the License.
*/

import { Consumer, OrderedConsumerOptions } from "./consumer.ts";
import {
Consumer,
ExportedConsumer,
OrderedConsumerOptions,
} from "./consumer.ts";
import {
JetStreamOptions,
MsgHdrs,
Expand Down Expand Up @@ -465,6 +469,17 @@ export interface Consumers {
stream: string,
name?: string | Partial<OrderedConsumerOptions>,
): Promise<Consumer>;

/**
* Returns a Consumer configured for the specified stream and consumer name. Note there
* are no JSAPI lookups or verifications.
* @param stream
* @param name
*/
bind(
stream: string,
name: string,
): Promise<ExportedConsumer>;
}

export interface ConsumerOpts {
Expand Down

0 comments on commit c9f9279

Please sign in to comment.