Skip to content

Commit

Permalink
Merge pull request #689 from nats-io/fix-688
Browse files Browse the repository at this point in the history
[FIX] Kv Watcher reliability during server restarts
  • Loading branch information
aricart committed Apr 22, 2024
2 parents 118dbdd + cdce825 commit 66a47b9
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 1 deletion.
3 changes: 3 additions & 0 deletions jetstream/jsclient.ts
Expand Up @@ -98,6 +98,7 @@ import {
PullOptions,
ReplayPolicy,
} from "./jsapi_types.ts";
import { nuid } from "../nats-base-client/nuid.ts";

export enum PubHeaders {
MsgIdHdr = "Nats-Msg-Id",
Expand Down Expand Up @@ -775,6 +776,7 @@ export class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
const nci = this.js.nc;
nci._resub(this.sub, newDeliver);
const info = this.info;
info.config.name = nuid.next();
info.ordered_consumer_sequence.delivery_seq = 0;
info.flow_control.heartbeat_count = 0;
info.flow_control.fc_count = 0;
Expand Down Expand Up @@ -842,6 +844,7 @@ export class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>
// reset the consumer
const seq = this.info?.ordered_consumer_sequence?.stream_seq || 0;
this._resetOrderedConsumer(seq + 1);
this.monitor?.restart();
// if we are ordered, we will reset the consumer and keep
// feeding the iterator or callback - we are not stopping
return false;
Expand Down
30 changes: 30 additions & 0 deletions jetstream/tests/kv_test.ts
Expand Up @@ -2134,3 +2134,33 @@ Deno.test("kv - honors checkAPI option", async () => {

await cleanup(ns, nc);
});

Deno.test("kv - watcher on server restart", async () => {
let { ns, nc } = await setup(
jetstreamServerConf({}),
);
const js = nc.jetstream();
const kv = await js.views.kv("A");
const iter = await kv.watch();
const d = deferred<KvEntry>();
(async () => {
for await (const e of iter) {
d.resolve(e);
break;
}
})().then();

ns = await ns.restart();
console.log("server restarted");
for (let i = 0; i < 10; i++) {
try {
await kv.put("hello", "world");
break;
} catch {
await delay(500);
}
}

await d;
await cleanup(ns, nc);
});
1 change: 1 addition & 0 deletions nats-base-client/idleheartbeat_monitor.ts
Expand Up @@ -78,6 +78,7 @@ export class IdleHeartbeatMonitor {
}
this.timer = 0;
this.autoCancelTimer = 0;
this.missed = 0;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/deno_transport.ts
Expand Up @@ -157,7 +157,7 @@ export class DenoTransport implements Transport {
const sto = await (this.loadTlsOptions(hostname));
this.conn = await Deno.startTls(
//@ts-ignore: just the conn
this.conn,
this.conn as Deno.TcpConn,
sto,
);
// this is necessary because the startTls process doesn't
Expand Down

0 comments on commit 66a47b9

Please sign in to comment.