Skip to content

Commit

Permalink
Merge pull request #661 from nats-io/kv-bind
Browse files Browse the repository at this point in the history
[FIX] [KV] bind doesn't auto-initiate stream info
  • Loading branch information
aricart committed Mar 12, 2024
2 parents 608e3c1 + 7971e6c commit d045fd7
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 13 deletions.
2 changes: 1 addition & 1 deletion jetstream/jsclient.ts
Expand Up @@ -118,7 +118,7 @@ class ViewsImpl implements Views {
);
}
if (opts.bindOnly) {
return Bucket.bind(this.js, name);
return Bucket.bind(this.js, name, opts);
}

return Bucket.create(this.js, name, opts);
Expand Down
12 changes: 9 additions & 3 deletions jetstream/kv.ts
Expand Up @@ -207,13 +207,19 @@ export class Bucket implements KV, KvRemove {
static async bind(
js: JetStreamClient,
name: string,
opts: Partial<{ codec: KvCodecs }> = {},
opts: Partial<KvOptions> = {},
): Promise<KV> {
const jsm = await js.jetstreamManager();
const info = await jsm.streams.info(`${kvPrefix}${name}`);
validateBucket(info.config.name);
const info = {
config: {
allow_direct: opts.allow_direct,
},
} as StreamInfo;
validateBucket(name);
const bucket = new Bucket(name, js, jsm);
info.config.name = opts.streamName ?? bucket.bucketName();
Object.assign(bucket, info);
bucket.stream = info.config.name;
bucket.codec = opts.codec || NoopKvCodecs();
bucket.direct = info.config.allow_direct ?? false;
bucket.initializePrefixes(info);
Expand Down
40 changes: 31 additions & 9 deletions jetstream/tests/kv_test.ts
Expand Up @@ -191,13 +191,6 @@ Deno.test("kv - bind to existing KV", async () => {
const status = await kv.status();
assertEquals(status.bucket, `${n}`);
await crud(kv);
await assertRejects(
async () => {
await js.views.kv("does_not_exist", { bindOnly: true });
},
NatsError,
"stream not found",
);
await cleanup(ns, nc);
});

Expand Down Expand Up @@ -567,8 +560,7 @@ Deno.test("kv - ttl", async () => {
const e = await b.get("x");
assert(e);
assertEquals(sc.decode(e.value), "hello");

await delay(1500);
await delay(2000);
assertEquals(await b.get("x"), null);

await cleanup(ns, nc);
Expand Down Expand Up @@ -2014,3 +2006,33 @@ Deno.test("kv - purge key if revision", async () => {
await b.purge("a", { previousSeq: seq });
await cleanup(ns, nc);
});

Deno.test("kv - bind no info", async () => {
const { ns, nc } = await setup(
jetstreamServerConf({}, true),
);
if (await notCompatible(ns, nc, "2.6.3")) {
return;
}
const js = nc.jetstream();
await js.views.kv("A");

const d = deferred();
nc.subscribe("$JS.API.STREAM.INFO.>", {
callback: (_err, msg) => {
d.reject(new Error("saw stream info"));
},
});

const kv = await js.views.kv("A", { bindOnly: true, allow_direct: true });
await kv.put("a", "hello");
const e = await kv.get("a");
assertEquals(e?.string(), "hello");
await kv.delete("a");

d.resolve();
// shouldn't have rejected earlier
await d;

await cleanup(ns, nc);
});

0 comments on commit d045fd7

Please sign in to comment.