Skip to content

Commit

Permalink
[FEAT] object store compatibility test
Browse files Browse the repository at this point in the history
[FIX] [JETSTREAM] [OS] allow_direct was not defaulted to true
[FIX] [JETSTREAM] [OS] ttl was not copied over to max_age
  • Loading branch information
aricart committed Jun 30, 2023
1 parent f019f3b commit 711e35c
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 3 deletions.
76 changes: 76 additions & 0 deletions bin/os_compatibility_test.ts
@@ -0,0 +1,76 @@
/*
* Copyright 2023 The NATS Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { connect, millis, Msg } from "../src/mod.ts";

const nc = await connect({ servers: "demo.nats.io" });
const js = nc.jetstream();
const jsm = await js.jetstreamManager();

const sub = nc.subscribe("tests.object_store.>");

const create = async function (m: Msg): Promise<void> {
const config = m.json<{ bucket: string }>();
await js.views.os(config.bucket);
m.respond();
};

const customized = async function (m: Msg): Promise<void> {
const config = m.json<Record<string, unknown>>();
const name = config.bucket as string || "";
delete config.bucket;
config.millis = millis(config.max_age as number || 0);
await js.views.os(name, config);
m.respond();
};

const entry = async function (m: Msg): Promise<void> {
const t = m.json<{
bucket: string;
config: { description: string; link: null; name: string };
url: string;
}>();

const name = t.bucket as string || "";
const os = await js.views.os(name);
const d = await fetch(t.url);
if (d.ok && d.body) {
await os.put(
{ name: t.config.name, description: t.config.description },
d.body,
);
}
m.respond();
};

const opts = [
create,
customized,
entry,
];

let i = 0;
for await (const m of sub) {
const r = m.json<{ bucket: string }>();
// if (r.bucket) {
// try {
// await jsm.streams.delete(`OBJ_${r.bucket as string}`);
// } catch (err) {
// // ignore
// }
// }
console.log(r);
await opts[i++](m);
}
6 changes: 4 additions & 2 deletions jetstream/objectstore.ts
Expand Up @@ -773,15 +773,17 @@ export class ObjectStoreImpl implements ObjectStore {
} catch (err) {
return Promise.reject(err);
}
const sc = Object.assign({}, opts) as StreamConfig;
const max_age = opts?.ttl || 0
delete opts.ttl
const sc = Object.assign({max_age}, opts) as StreamConfig;
sc.name = this.stream;
sc.allow_direct = true;
sc.allow_rollup_hdrs = true;
sc.discard = DiscardPolicy.New;
sc.subjects = [`$O.${this.name}.C.>`, `$O.${this.name}.M.>`];
if (opts.placement) {
sc.placement = opts.placement;
}

try {
await this.jsm.streams.info(sc.name);
} catch (err) {
Expand Down
2 changes: 1 addition & 1 deletion jetstream/types.ts
Expand Up @@ -1244,7 +1244,7 @@ export type ObjectStoreStatus = {
export type ObjectStoreInfo = ObjectStoreStatus;
export type ObjectStoreOptions = {
description?: string;
ttl?: Nanos;
"ttl"?: Nanos;
storage: StorageType;
replicas: number;
"max_bytes": number;
Expand Down

0 comments on commit 711e35c

Please sign in to comment.