Skip to content

Commit

Permalink
[WIP] stop strategy for consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed May 16, 2023
1 parent 2338783 commit 7ae657f
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 19 deletions.
3 changes: 2 additions & 1 deletion nats-base-client/consumer.ts
Expand Up @@ -23,6 +23,7 @@ import {
ConsumerEvents,
ConsumerInfo,
ConsumerMessages,
ConsumeStop,
DeliverPolicy,
FetchMessages,
FetchOptions,
Expand Down Expand Up @@ -161,7 +162,7 @@ export type OrderedConsumerOptions = {
opt_start_time: string;
replay_policy: ReplayPolicy;
inactive_threshold: number;
};
} & ConsumeStop;

export class OrderedPullConsumerImpl implements Consumer {
api: ConsumerAPIImpl;
Expand Down
65 changes: 59 additions & 6 deletions nats-base-client/consumermessages.ts
Expand Up @@ -26,6 +26,8 @@ import {
ConsumerCallbackFn,
ConsumerDebugEvents,
ConsumerEvents,
ConsumeStop,
ConsumeStopStrategy,
Events,
JsMsg,
MsgHdrs,
Expand Down Expand Up @@ -55,6 +57,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
timeout: Timeout<unknown> | null;
cleanupHandler?: () => void;
listeners: QueuedIterator<ConsumerStatus>[];
stopStrategy?: ConsumeStopStrategy;
stopStrategyFn?: (m: JsMsg) => boolean;
noMore: boolean;

// callback: ConsumerCallbackFn;
constructor(
Expand All @@ -67,6 +72,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>

this.opts = this.parseOptions(opts, refilling);
this.callback = (opts as ConsumeCallback).callback || null;
this.noMore = false;
this.noIterator = typeof this.callback === "function";
this.monitor = null;
this.pong = null;
Expand All @@ -77,6 +83,31 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
this.inbox = createInbox();
this.listeners = [];

const stopOpts = opts as ConsumeStop;
this.stopStrategy = stopOpts.strategy;
switch (this.stopStrategy) {
case ConsumeStopStrategy.NoMessages:
{
this.stopStrategyFn = (m): boolean => {
return m.info.pending === 0;
};
}
break;
case ConsumeStopStrategy.Sequence:
{
if (typeof stopOpts.arg !== "number") {
throw new Error("stop strategy args is not a number");
}
const seq = stopOpts.arg;
this.stopStrategyFn = (m): boolean => {
return m.seq >= seq;
};
}
break;
default:
// nothing
}

const {
max_messages,
max_bytes,
Expand All @@ -102,6 +133,9 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>

this.sub = c.api.nc.subscribe(this.inbox, {
callback: (err, msg) => {
if (this.noMore) {
return;
}
if (err) {
// this is possibly only a permissions error which means
// that the server rejected (eliminating the sub)
Expand Down Expand Up @@ -144,10 +178,7 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
// we got a bad request - no progress here
if (code === 400) {
const error = toErr();
//@ts-ignore: fn
this._push(() => {
this.stop(error);
});
this.stop(error);
} else if (code === 409 && description === "consumer deleted") {
const error = toErr();
this.stop(error);
Expand All @@ -160,21 +191,29 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
} else {
// push the user message
this._push(toJsMsg(msg));
const jsMsg = toJsMsg(msg);
this._push(jsMsg);
this.received++;
if (this.pending.msgs) {
this.pending.msgs--;
}
if (this.pending.bytes) {
this.pending.bytes -= (msg as MsgImpl).size();
}
// if we are just processing a stream, end when we get the last message
if (this.stopStrategyFn) {
this.noMore = this.stopStrategyFn(jsMsg);
}
}

// if we don't have pending bytes/messages we are done or starving
if (this.pending.msgs === 0 && this.pending.bytes === 0) {
this.pending.requests = 0;
}
if (this.refilling) {

if (this.noMore) {
this.stop();
} else if (this.refilling) {
// FIXME: this could result in 1/4 = 0
if (
(max_messages &&
Expand Down Expand Up @@ -239,6 +278,20 @@ export class PullConsumerMessagesImpl extends QueuedIteratorImpl<JsMsg>
}
})();

if (this.refilling && this.stopStrategyFn) {
// this is the initial pull if we have a consume, but we are
// checking pending, we want to verify that the info on the
// consumer is up-to-date, and whether we have any pending
// messages
this.consumer.info()
.then((ci) => {
if (ci.num_pending === 0) {
this.stop();
}
})
.catch();
}

// this is the initial pull
this.pull(this.pullOptions());
}
Expand Down
24 changes: 16 additions & 8 deletions nats-base-client/types.ts
Expand Up @@ -2647,10 +2647,6 @@ export interface ConsumerUpdateConfig {
metadata?: Record<string, string>;
}

export type Ordered = {
ordered: true;
};

export type NextOptions = Expires;

export type ConsumeBytes =
Expand All @@ -2659,19 +2655,21 @@ export type ConsumeBytes =
& ThresholdBytes
& Expires
& IdleHeartbeat
& ConsumeCallback;
& ConsumeCallback
& ConsumeStop;

export type ConsumeMessages =
& Partial<MaxMessages>
& ThresholdMessages
& Expires
& IdleHeartbeat
& ConsumeCallback;
& ConsumeCallback
& ConsumeStop;

export type ConsumeOptions = ConsumeBytes | ConsumeMessages;

/**
* Options for fetching
* Options for fetching bytes
*/
export type FetchBytes =
& MaxBytes
Expand All @@ -2680,7 +2678,7 @@ export type FetchBytes =
& IdleHeartbeat;

/**
* Options for a c
* Options for fetching messages
*/
export type FetchMessages =
& Partial<MaxMessages>
Expand Down Expand Up @@ -2738,6 +2736,16 @@ export type Expires = {
expires?: number;
};

export enum ConsumeStopStrategy {
NoMessages,
Sequence,
}

export type ConsumeStop = {
strategy?: ConsumeStopStrategy;
arg?: number;
};

export type IdleHeartbeat = {
/**
* Number of milliseconds to wait for a server heartbeat when not actively receiving
Expand Down
57 changes: 57 additions & 0 deletions tests/consumers_test.ts
Expand Up @@ -25,11 +25,13 @@ import {
} from "https://deno.land/std@0.75.0/testing/asserts.ts";
import {
AckPolicy,
ConsumeOptions,
Consumer,
ConsumerDebugEvents,
ConsumerEvents,
ConsumerMessages,
ConsumerStatus,
ConsumeStopStrategy,
Empty,
NatsConnection,
PubAck,
Expand Down Expand Up @@ -836,3 +838,58 @@ Deno.test("consumers - next", async () => {

await cleanup(ns, nc);
});

async function testConsumerStop(
conf: ConsumeOptions,
count: number,
): Promise<ConsumerMessages> {
const { ns, nc } = await setup(jetstreamServerConf());
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();
await jsm.consumers.add(stream, {
durable_name: stream,
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();
await Promise.all(
Array.from({ length: count }).map(() => {
return js.publish(subj);
}),
);

const c = await js.consumers.get(stream, stream);
const iter = await c.consume(conf);
await (async () => {
for await (const m of iter) {
m.ack();
}
})().then();

await cleanup(ns, nc);

return iter;
}

Deno.test("consumer - stopStrategy empty", async () => {
const messages = await testConsumerStop({
strategy: ConsumeStopStrategy.NoMessages,
}, 0);
assertEquals(messages.getProcessed(), 0);
});

Deno.test("consumer - stopStrategy all ", async () => {
const messages = await testConsumerStop({
strategy: ConsumeStopStrategy.NoMessages,
}, 10);
assertEquals(messages.getProcessed(), 10);
});

Deno.test("consumer - stopStrategy sequence", async () => {
const messages = await testConsumerStop({
strategy: ConsumeStopStrategy.Sequence,
arg: 5,
}, 10);
assertEquals(messages.getProcessed(), 5);
});
71 changes: 69 additions & 2 deletions tests/consumersordered_test.ts
Expand Up @@ -13,14 +13,26 @@
* limitations under the License.
*/

import { cleanup, jetstreamServerConf, setup } from "./jstest_util.ts";
import {
cleanup,
initStream,
jetstreamServerConf,
setup,
} from "./jstest_util.ts";
import {
assertEquals,
assertExists,
assertRejects,
} from "https://deno.land/std@0.125.0/testing/asserts.ts";
import { OrderedPullConsumerImpl } from "../nats-base-client/consumer.ts";
import { DeliverPolicy, JsMsg } from "../nats-base-client/types.ts";
import {
AckPolicy,
ConsumeOptions,
ConsumerMessages,
ConsumeStopStrategy,
DeliverPolicy,
JsMsg,
} from "../nats-base-client/types.ts";
import { deferred } from "../nats-base-client/mod.ts";
import { notCompatible } from "./helpers/mod.ts";
import { delay } from "../nats-base-client/util.ts";
Expand Down Expand Up @@ -608,3 +620,58 @@ Deno.test("ordered - next", async () => {

await cleanup(ns, nc);
});

async function testOrderedConsumerStop(
conf: ConsumeOptions,
count: number,
): Promise<ConsumerMessages> {
const { ns, nc } = await setup(jetstreamServerConf());
const { stream, subj } = await initStream(nc);

const jsm = await nc.jetstreamManager();
await jsm.consumers.add(stream, {
durable_name: stream,
ack_policy: AckPolicy.Explicit,
});

const js = nc.jetstream();
await Promise.all(
Array.from({ length: count }).map(() => {
return js.publish(subj);
}),
);

const c = await js.consumers.get(stream);
const iter = await c.consume(conf);
await (async () => {
for await (const m of iter) {
m.ack();
}
})().then();

await cleanup(ns, nc);

return iter;
}

Deno.test("ordered - stopStrategy empty", async () => {
const messages = await testOrderedConsumerStop({
strategy: ConsumeStopStrategy.NoMessages,
}, 0);
assertEquals(messages.getProcessed(), 0);
});

Deno.test("ordered - stopStrategy all ", async () => {
const messages = await testOrderedConsumerStop({
strategy: ConsumeStopStrategy.NoMessages,
}, 10);
assertEquals(messages.getProcessed(), 10);
});

Deno.test("ordered - stopStrategy sequence", async () => {
const messages = await testOrderedConsumerStop({
strategy: ConsumeStopStrategy.Sequence,
arg: 5,
}, 10);
assertEquals(messages.getProcessed(), 5);
});
4 changes: 2 additions & 2 deletions tests/jetstream_test.ts
Expand Up @@ -1125,7 +1125,7 @@ Deno.test("jetstream - fetch one - no wait breaks fast", async () => {

await done;
sw.mark();
assert(25 > sw.duration());
assertBetween(sw.duration(), 0, 500);
assertEquals(batch.getReceived(), 1);
await cleanup(ns, nc);
});
Expand Down Expand Up @@ -1159,7 +1159,7 @@ Deno.test("jetstream - fetch none - cancel timers", async () => {

await done;
sw.mark();
assert(25 > sw.duration());
assertBetween(sw.duration(), 0, 500);
assertEquals(batch.getReceived(), 0);
await cleanup(ns, nc);
});
Expand Down

0 comments on commit 7ae657f

Please sign in to comment.