Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

os changes #535

Draft
wants to merge 3 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
184 changes: 139 additions & 45 deletions jetstream/objectstore.ts
Expand Up @@ -202,19 +202,26 @@ function emptyReadableStream(): ReadableStream {
});
}

export class ObjectStoreImpl implements ObjectStore {
jsm: JetStreamManager;
js: JetStreamClient;
stream!: string;
name: string;
export interface ObjectStoreKeys {
keyName(key: string): { name: string; error?: Error };
metaSubject(key: string): string;
chunkSubject(id: string, key: string): string;
metaSubjectAll(): string;
streamSubjectNames(): string[];
version(): number;
}

constructor(name: string, jsm: JetStreamManager, js: JetStreamClient) {
export class ObjectStoreKeysV1 implements ObjectStoreKeys {
name: string;
constructor(name: string) {
this.name = name;
this.jsm = jsm;
this.js = js;
}

_sanitizeName(name: string): { name: string; error?: Error } {
version() {
return 1;
}

keyName(name: string): { name: string; error?: Error } {
if (!name || name.length === 0) {
return { name, error: new Error("name cannot be empty") };
}
Expand All @@ -232,6 +239,78 @@ export class ObjectStoreImpl implements ObjectStore {
return { name, error };
}

chunkSubject(id: string, _key: string): string {
return `$O.${this.name}.C.${id}`;
}

metaSubject(key: string): string {
return `$O.${this.name}.M.${Base64UrlPaddedCodec.encode(key)}`;
}

metaSubjectAll(): string {
return `$O.${this.name}.M.>`;
}

streamSubjectNames(): string[] {
return [`$O.${this.name}.C.>`, `$O.${this.name}.M.>`];
}
}

export class ObjectStoreKeysV2 implements ObjectStoreKeys {
name: string;
constructor(name: string) {
this.name = name;
}

version() {
return 2;
}

keyName(name: string): { name: string; error?: Error } {
let error = undefined;
try {
validateKey(name);
} catch (err) {
error = err;
}
return { name, error };
}

chunkSubject(id: string, key: string): string {
return `$O2.${this.name}.C.${id}.${key}`;
}

metaSubject(key: string): string {
return `$O2.${this.name}.M.${key}`;
}

metaSubjectAll(): string {
return `$O2.${this.name}.M.>`;
}

streamSubjectNames(): string[] {
return [`$O2.${this.name}.C.>`, `$O2.${this.name}.M.>`];
}
}

export class ObjectStoreImpl implements ObjectStore {
jsm: JetStreamManager;
js: JetStreamClient;
stream!: string;
name: string;
keys: ObjectStoreKeys;

constructor(name: string, jsm: JetStreamManager, js: JetStreamClient) {
this.name = name;
this.jsm = jsm;
this.js = js;
this.keys = new ObjectStoreKeysV2(this.name);
}

version(): number {
return this.keys.version();
}

async info(name: string): Promise<ObjectInfo | null> {
const info = await this.rawInfo(name);
return info ? new ObjectInfoImpl(info) : null;
Expand All @@ -255,12 +334,12 @@ export class ObjectStoreImpl implements ObjectStore {
}

async rawInfo(name: string): Promise<ServerObjectInfo | null> {
const { name: obj, error } = this._sanitizeName(name);
const { name: obj, error } = this.keys.keyName(name);
if (error) {
return Promise.reject(error);
}

const meta = this._metaSubject(obj);
const meta = this.keys.metaSubject(obj);
try {
const m = await this.jsm.streams.getMessage(this.stream, {
last_by_subj: meta,
Expand Down Expand Up @@ -334,14 +413,14 @@ export class ObjectStoreImpl implements ObjectStore {
meta.options.max_chunk_size = maxChunk;

const old = await this.info(meta.name);
const { name: n, error } = this._sanitizeName(meta.name);
const { name: n, error } = this.keys.keyName(meta.name);
if (error) {
return Promise.reject(error);
}

const id = nuid.next();
const chunkSubj = this._chunkSubject(id);
const metaSubj = this._metaSubject(n);
const chunkSubj = this.keys.chunkSubject(id, n);
const metaSubj = this.keys.metaSubject(n);

const info = Object.assign({
bucket: this.name,
Expand Down Expand Up @@ -402,7 +481,7 @@ export class ObjectStoreImpl implements ObjectStore {
if (old) {
try {
await this.jsm.streams.purge(this.stream, {
filter: `$O.${this.name}.C.${old.nuid}`,
filter: this.keys.chunkSubject(old.nuid, meta.name),
});
} catch (_err) {
// rejecting here, would mean send the wrong signal
Expand Down Expand Up @@ -538,7 +617,7 @@ export class ObjectStoreImpl implements ObjectStore {
const oc = consumerOpts();
oc.orderedConsumer();
const sha = new SHA256();
const subj = `$O.${this.name}.C.${info.nuid}`;
const subj = this.keys.chunkSubject(info.nuid, name);
const sub = await this.js.subscribe(subj, oc);
(async () => {
for await (const jm of sub) {
Expand Down Expand Up @@ -590,7 +669,7 @@ export class ObjectStoreImpl implements ObjectStore {
return Promise.reject("bucket required");
}
const osi = bucket as ObjectStoreImpl;
const { name: n, error } = this._sanitizeName(name);
const { name: n, error } = this.keys.keyName(name);
if (error) {
return Promise.reject(error);
}
Expand All @@ -606,7 +685,7 @@ export class ObjectStoreImpl implements ObjectStore {
if (info.deleted) {
return Promise.reject(new Error("object is deleted"));
}
const { name: n, error } = this._sanitizeName(name);
const { name: n, error } = this.keys.keyName(name);
if (error) {
return Promise.reject(error);
}
Expand Down Expand Up @@ -645,11 +724,11 @@ export class ObjectStoreImpl implements ObjectStore {
const h = headers();
h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject);

await this.js.publish(this._metaSubject(info.name), jc.encode(info), {
await this.js.publish(this.keys.metaSubject(info.name), jc.encode(info), {
headers: h,
});
return this.jsm.streams.purge(this.stream, {
filter: this._chunkSubject(info.nuid),
filter: this.keys.chunkSubject(info.nuid, name),
});
}

Expand All @@ -670,7 +749,7 @@ export class ObjectStoreImpl implements ObjectStore {
// effectively making the object available under 2 names, but it doesn't remove the
// older one.
meta.name = meta.name ?? info.name;
const { name: n, error } = this._sanitizeName(meta.name);
const { name: n, error } = this.keys.keyName(meta.name);
if (error) {
return Promise.reject(error);
}
Expand All @@ -687,7 +766,7 @@ export class ObjectStoreImpl implements ObjectStore {
const ii = Object.assign({}, info, toServerObjectStoreMeta(meta!));
const jc = JSONCodec();

return this.js.publish(this._metaSubject(ii.name), jc.encode(ii));
return this.js.publish(this.keys.metaSubject(ii.name), jc.encode(ii));
}

async watch(opts: Partial<
Expand All @@ -700,7 +779,7 @@ export class ObjectStoreImpl implements ObjectStore {
opts.ignoreDeletes = opts.ignoreDeletes ?? false;
let initialized = false;
const qi = new QueuedIteratorImpl<ObjectInfo | null>();
const subj = this._metaSubjectAll();
const subj = this.keys.metaSubjectAll();
try {
await this.jsm.streams.getMessage(this.stream, { last_by_subj: subj });
} catch (err) {
Expand Down Expand Up @@ -755,37 +834,52 @@ export class ObjectStoreImpl implements ObjectStore {
return qi;
}

_chunkSubject(id: string) {
return `$O.${this.name}.C.${id}`;
}

_metaSubject(n: string): string {
return `$O.${this.name}.M.${Base64UrlPaddedCodec.encode(n)}`;
}

_metaSubjectAll(): string {
return `$O.${this.name}.M.>`;
}

async init(opts: Partial<ObjectStoreOptions> = {}): Promise<void> {
const adapters = [
new ObjectStoreKeysV1(this.name),
new ObjectStoreKeysV2(this.name),
];
try {
this.stream = objectStoreStreamName(this.name);
} catch (err) {
return Promise.reject(err);
}
const sc = Object.assign({}, opts) as StreamConfig;
sc.name = this.stream;
sc.allow_rollup_hdrs = true;
sc.discard = DiscardPolicy.New;
sc.subjects = [`$O.${this.name}.C.>`, `$O.${this.name}.M.>`];
if (opts.placement) {
sc.placement = opts.placement;
}

try {
await this.jsm.streams.info(sc.name);
const si = await this.jsm.streams.info(this.stream);
const { subjects } = si.config;
const keys = adapters.find((k) => {
const a = k.streamSubjectNames();
return a.includes(subjects[0]) && a.includes(subjects[1]);
});
if (!keys) {
return Promise.reject(
new Error("unknown objectstore version configuration"),
);
}
this.keys = keys;
} catch (err) {
if (err.message === "stream not found") {
// honor the version given, if specified - otherwise best version
switch (opts.version) {
case 1:
this.keys = adapters[0];
break;
case 2:
this.keys = adapters[1];
break;
}
const sc = Object.assign({}, opts) as StreamConfig;
sc.name = this.stream;
sc.allow_rollup_hdrs = true;
sc.discard = DiscardPolicy.New;
sc.subjects = this.keys.streamSubjectNames();
if (opts.placement) {
sc.placement = opts.placement;
}
// FIXME: metadata would be good, but really the
// subject for the keys should be different in what
// the stream takes
// sc.metadata = { NatsObjectStoreVersion: "2" };
await this.jsm.streams.add(sc);
}
}
Expand Down