Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] stop strategy for consumer #513

Draft
wants to merge 2 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion nats-base-client/consumer.ts
Expand Up @@ -23,6 +23,7 @@ import {
ConsumerEvents,
ConsumerInfo,
ConsumerMessages,
ConsumeStop,
DeliverPolicy,
FetchMessages,
FetchOptions,
Expand Down Expand Up @@ -161,7 +162,7 @@ export type OrderedConsumerOptions = {
opt_start_time: string;
replay_policy: ReplayPolicy;
inactive_threshold: number;
};
} & ConsumeStop;

export class OrderedPullConsumerImpl implements Consumer {
api: ConsumerAPIImpl;
Expand Down
65 changes: 59 additions & 6 deletions nats-base-client/consumermessages.ts
Expand Up @@ -26,6 +26,8 @@ import {
ConsumerCallbackFn,
ConsumerDebugEvents,
ConsumerEvents,
ConsumeStop,
ConsumeStopStrategy,
Events,
JsMsg,
MsgHdrs,
Expand Down Expand Up @@ -55,6 +57,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
timeout: Timeout<unknown> | null;
cleanupHandler?: () => void;
listeners: QueuedIterator<ConsumerStatus>[];
stopStrategy?: ConsumeStopStrategy;
stopStrategyFn?: (m: JsMsg) => boolean;
noMore: boolean;

// callback: ConsumerCallbackFn;
constructor(
Expand All @@ -67,6 +72,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>

this.opts = this.parseOptions(opts, refilling);
this.callback = (opts as ConsumeCallback).callback || null;
this.noMore = false;
this.noIterator = typeof this.callback === "function";
this.monitor = null;
this.pong = null;
Expand All @@ -77,6 +83,31 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
this.inbox = createInbox();
this.listeners = [];

const stopOpts = opts as ConsumeStop;
this.stopStrategy = stopOpts.strategy;
switch (this.stopStrategy) {
case ConsumeStopStrategy.NoMessages:
{
this.stopStrategyFn = (m): boolean => {
return m.info.pending === 0;
};
}
break;
case ConsumeStopStrategy.Sequence:
{
if (typeof stopOpts.arg !== "number") {
throw new Error("stop strategy args is not a number");
}
const seq = stopOpts.arg;
this.stopStrategyFn = (m): boolean => {
return m.seq >= seq;
};
}
break;
default:
// nothing
}

const {
max_messages,
max_bytes,
Expand All @@ -102,6 +133,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>

this.sub = c.api.nc.subscribe(this.inbox, {
callback: (err, msg) => {
if (this.noMore) {
return;
}
if (err) {
// this is possibly only a permissions error which means
// that the server rejected (eliminating the sub)
Expand Down Expand Up @@ -144,10 +178,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
// we got a bad request - no progress here
if (code === 400) {
const error = toErr();
//@ts-ignore: fn
this._push(() => {
this.stop(error);
});
this.stop(error);
} else if (code === 409 && description === "consumer deleted") {
const error = toErr();
this.stop(error);
Expand All @@ -160,21 +191,29 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
} else {
// push the user message
this._push(toJsMsg(msg));
const jsMsg = toJsMsg(msg);
this._push(jsMsg);
this.received++;
if (this.pending.msgs) {
this.pending.msgs--;
}
if (this.pending.bytes) {
this.pending.bytes -= (msg as MsgImpl).size();
}
// if we are just processing a stream, end when we get the last message
if (this.stopStrategyFn) {
this.noMore = this.stopStrategyFn(jsMsg);
}
}

// if we don't have pending bytes/messages we are done or starving
if (this.pending.msgs === 0 && this.pending.bytes === 0) {
this.pending.requests = 0;
}
if (this.refilling) {

if (this.noMore) {
this.stop();
} else if (this.refilling) {
// FIXME: this could result in 1/4 = 0
if (
(max_messages &&
Expand Down Expand Up @@ -239,6 +278,20 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
})();

if (this.refilling && this.stopStrategyFn) {
// this is the initial pull if we have a consume, but we are
// checking pending, we want to verify that the info on the
// consumer is up-to-date, and whether we have any pending
// messages
this.consumer.info()
.then((ci) => {
if (ci.num_pending === 0) {
this.stop();
}
})
.catch();
}

// this is the initial pull
this.pull(this.pullOptions());
}
Expand Down
24 changes: 16 additions & 8 deletions nats-base-client/types.ts
Expand Up @@ -2647,10 +2647,6 @@ export interface ConsumerUpdateConfig {
metadata?: Record<string, string>;
}

export type Ordered = {
ordered: true;
};

export type NextOptions = Expires;

export type ConsumeBytes =
Expand All @@ -2659,19 +2655,21 @@ export type ConsumeBytes =
& ThresholdBytes
& Expires
& IdleHeartbeat
& ConsumeCallback;
& ConsumeCallback
& ConsumeStop;

export type ConsumeMessages =
& Partial<MaxMessages>
& ThresholdMessages
& Expires
& IdleHeartbeat
& ConsumeCallback;
& ConsumeCallback
& ConsumeStop;

export type ConsumeOptions = ConsumeBytes | ConsumeMessages;

/**
* Options for fetching
* Options for fetching bytes
*/
export type FetchBytes =
& MaxBytes
Expand All @@ -2680,7 +2678,7 @@ export type FetchBytes =
& IdleHeartbeat;

/**
* Options for a c
* Options for fetching messages
*/
export type FetchMessages =
& Partial<MaxMessages>
Expand Down Expand Up @@ -2738,6 +2736,16 @@ export type Expires = {
expires?: number;
};

export enum ConsumeStopStrategy {
NoMessages,
Sequence,
}

export type ConsumeStop = {
strategy?: ConsumeStopStrategy;
arg?: number;
};

export type IdleHeartbeat = {
/**
* Number of milliseconds to wait for a server heartbeat when not actively receiving
Expand Down
57 changes: 57 additions & 0 deletions tests/consumers_test.ts
Expand Up @@ -25,11 +25,13 @@ import {
} from "https://deno.land/std@0.75.0/testing/asserts.ts";
import {
AckPolicy,
ConsumeOptions,
Consumer,
ConsumerDebugEvents,
ConsumerEvents,
ConsumerMessages,
ConsumerStatus,
ConsumeStopStrategy,
Empty,
NatsConnection,
PubAck,
Expand Down Expand Up @@ -836,3 +838,58 @@ Deno.test("consumers - next", async () => {

await cleanup(ns, nc);
});

async function testConsumerStop(
conf: ConsumeOptions,
count: number,
): Promise<ConsumerMessages> {
const { ns, nc } = await setup(jetstreamServerConf());
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();
await jsm.consumers.add(stream, {
durable_name: stream,
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();
await Promise.all(
Array.from({ length: count }).map(() => {
return js.publish(subj);
}),
);

const c = await js.consumers.get(stream, stream);
const iter = await c.consume(conf);
await (async () => {
for await (const m of iter) {
m.ack();
}
})().then();

await cleanup(ns, nc);

return iter;
}

Deno.test("consumer - stopStrategy empty", async () => {
const messages = await testConsumerStop({
strategy: ConsumeStopStrategy.NoMessages,
}, 0);
assertEquals(messages.getProcessed(), 0);
});

Deno.test("consumer - stopStrategy all ", async () => {
const messages = await testConsumerStop({
strategy: ConsumeStopStrategy.NoMessages,
}, 10);
assertEquals(messages.getProcessed(), 10);
});

Deno.test("consumer - stopStrategy sequence", async () => {
const messages = await testConsumerStop({
strategy: ConsumeStopStrategy.Sequence,
arg: 5,
}, 10);
assertEquals(messages.getProcessed(), 5);
});
71 changes: 69 additions & 2 deletions tests/consumersordered_test.ts
Expand Up @@ -13,14 +13,26 @@
* limitations under the License.
*/

import { cleanup, jetstreamServerConf, setup } from "./jstest_util.ts";
import {
cleanup,
initStream,
jetstreamServerConf,
setup,
} from "./jstest_util.ts";
import {
assertEquals,
assertExists,
assertRejects,
} from "https://deno.land/std@0.125.0/testing/asserts.ts";
import { OrderedPullConsumerImpl } from "../nats-base-client/consumer.ts";
import { DeliverPolicy, JsMsg } from "../nats-base-client/types.ts";
import {
AckPolicy,
ConsumeOptions,
ConsumerMessages,
ConsumeStopStrategy,
DeliverPolicy,
JsMsg,
} from "../nats-base-client/types.ts";
import { deferred } from "../nats-base-client/mod.ts";
import { notCompatible } from "./helpers/mod.ts";
import { delay } from "../nats-base-client/util.ts";
Expand Down Expand Up @@ -608,3 +620,58 @@ Deno.test("ordered - next", async () => {

await cleanup(ns, nc);
});

async function testOrderedConsumerStop(
conf: ConsumeOptions,
count: number,
): Promise<ConsumerMessages> {
const { ns, nc } = await setup(jetstreamServerConf());
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();
await jsm.consumers.add(stream, {
durable_name: stream,
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();
await Promise.all(
Array.from({ length: count }).map(() => {
return js.publish(subj);
}),
);

const c = await js.consumers.get(stream);
const iter = await c.consume(conf);
await (async () => {
for await (const m of iter) {
m.ack();
}
})().then();

await cleanup(ns, nc);

return iter;
}

Deno.test("ordered - stopStrategy empty", async () => {
const messages = await testOrderedConsumerStop({
strategy: ConsumeStopStrategy.NoMessages,
}, 0);
assertEquals(messages.getProcessed(), 0);
});

Deno.test("ordered - stopStrategy all ", async () => {
const messages = await testOrderedConsumerStop({
strategy: ConsumeStopStrategy.NoMessages,
}, 10);
assertEquals(messages.getProcessed(), 10);
});

Deno.test("ordered - stopStrategy sequence", async () => {
const messages = await testOrderedConsumerStop({
strategy: ConsumeStopStrategy.Sequence,
arg: 5,
}, 10);
assertEquals(messages.getProcessed(), 5);
});
4 changes: 2 additions & 2 deletions tests/jetstream_test.ts
Expand Up @@ -1125,7 +1125,7 @@ Deno.test("jetstream - fetch one - no wait breaks fast", async () => {

await done;
sw.mark();
assert(25 > sw.duration());
assertBetween(sw.duration(), 0, 500);
assertEquals(batch.getReceived(), 1);
await cleanup(ns, nc);
});
Expand Down Expand Up @@ -1159,7 +1159,7 @@ Deno.test("jetstream - fetch none - cancel timers", async () => {

await done;
sw.mark();
assert(25 > sw.duration());
assertBetween(sw.duration(), 0, 500);
assertEquals(batch.getReceived(), 0);
await cleanup(ns, nc);
});
Expand Down