diff --git a/jetstream/objectstore.ts b/jetstream/objectstore.ts index 389f1e4b..5db65c5f 100644 --- a/jetstream/objectstore.ts +++ b/jetstream/objectstore.ts @@ -202,19 +202,26 @@ function emptyReadableStream(): ReadableStream { }); } -export class ObjectStoreImpl implements ObjectStore { - jsm: JetStreamManager; - js: JetStreamClient; - stream!: string; - name: string; +export interface ObjectStoreKeys { + keyName(key: string): { name: string; error?: Error }; + metaSubject(key: string): string; + chunkSubject(id: string, key: string): string; + metaSubjectAll(): string; + streamSubjectNames(): string[]; + version(): number; +} - constructor(name: string, jsm: JetStreamManager, js: JetStreamClient) { +export class ObjectStoreKeysV1 implements ObjectStoreKeys { + name: string; + constructor(name: string) { this.name = name; - this.jsm = jsm; - this.js = js; } - _sanitizeName(name: string): { name: string; error?: Error } { + version() { + return 1; + } + + keyName(name: string): { name: string; error?: Error } { if (!name || name.length === 0) { return { name, error: new Error("name cannot be empty") }; } @@ -232,6 +239,78 @@ export class ObjectStoreImpl implements ObjectStore { return { name, error }; } + chunkSubject(id: string, _key: string): string { + return `$O.${this.name}.C.${id}`; + } + + metaSubject(key: string): string { + return `$O.${this.name}.M.${Base64UrlPaddedCodec.encode(key)}`; + } + + metaSubjectAll(): string { + return `$O.${this.name}.M.>`; + } + + streamSubjectNames(): string[] { + return [`$O.${this.name}.C.>`, `$O.${this.name}.M.>`]; + } +} + +export class ObjectStoreKeysV2 implements ObjectStoreKeys { + name: string; + constructor(name: string) { + this.name = name; + } + + version() { + return 2; + } + + keyName(name: string): { name: string; error?: Error } { + let error = undefined; + try { + validateKey(name); + } catch (err) { + error = err; + } + return { name, error }; + } + + chunkSubject(id: string, key: string): string { + return `$O2.${this.name}.C.${id}.${key}`; + } + + metaSubject(key: string): string { + return `$O2.${this.name}.M.${key}`; + } + + metaSubjectAll(): string { + return `$O2.${this.name}.M.>`; + } + + streamSubjectNames(): string[] { + return [`$O2.${this.name}.C.>`, `$O2.${this.name}.M.>`]; + } +} + +export class ObjectStoreImpl implements ObjectStore { + jsm: JetStreamManager; + js: JetStreamClient; + stream!: string; + name: string; + keys: ObjectStoreKeys; + + constructor(name: string, jsm: JetStreamManager, js: JetStreamClient) { + this.name = name; + this.jsm = jsm; + this.js = js; + this.keys = new ObjectStoreKeysV2(this.name); + } + + version(): number { + return this.keys.version(); + } + async info(name: string): Promise { const info = await this.rawInfo(name); return info ? new ObjectInfoImpl(info) : null; @@ -255,12 +334,12 @@ export class ObjectStoreImpl implements ObjectStore { } async rawInfo(name: string): Promise { - const { name: obj, error } = this._sanitizeName(name); + const { name: obj, error } = this.keys.keyName(name); if (error) { return Promise.reject(error); } - const meta = this._metaSubject(obj); + const meta = this.keys.metaSubject(obj); try { const m = await this.jsm.streams.getMessage(this.stream, { last_by_subj: meta, @@ -334,14 +413,14 @@ export class ObjectStoreImpl implements ObjectStore { meta.options.max_chunk_size = maxChunk; const old = await this.info(meta.name); - const { name: n, error } = this._sanitizeName(meta.name); + const { name: n, error } = this.keys.keyName(meta.name); if (error) { return Promise.reject(error); } const id = nuid.next(); - const chunkSubj = this._chunkSubject(id); - const metaSubj = this._metaSubject(n); + const chunkSubj = this.keys.chunkSubject(id, n); + const metaSubj = this.keys.metaSubject(n); const info = Object.assign({ bucket: this.name, @@ -402,7 +481,7 @@ export class ObjectStoreImpl implements ObjectStore { if (old) { try { await this.jsm.streams.purge(this.stream, { - filter: `$O.${this.name}.C.${old.nuid}`, + filter: this.keys.chunkSubject(old.nuid, meta.name), }); } catch (_err) { // rejecting here, would mean send the wrong signal @@ -538,7 +617,7 @@ export class ObjectStoreImpl implements ObjectStore { const oc = consumerOpts(); oc.orderedConsumer(); const sha = new SHA256(); - const subj = `$O.${this.name}.C.${info.nuid}`; + const subj = this.keys.chunkSubject(info.nuid, name); const sub = await this.js.subscribe(subj, oc); (async () => { for await (const jm of sub) { @@ -590,7 +669,7 @@ export class ObjectStoreImpl implements ObjectStore { return Promise.reject("bucket required"); } const osi = bucket as ObjectStoreImpl; - const { name: n, error } = this._sanitizeName(name); + const { name: n, error } = this.keys.keyName(name); if (error) { return Promise.reject(error); } @@ -606,7 +685,7 @@ export class ObjectStoreImpl implements ObjectStore { if (info.deleted) { return Promise.reject(new Error("object is deleted")); } - const { name: n, error } = this._sanitizeName(name); + const { name: n, error } = this.keys.keyName(name); if (error) { return Promise.reject(error); } @@ -645,11 +724,11 @@ export class ObjectStoreImpl implements ObjectStore { const h = headers(); h.set(JsHeaders.RollupHdr, JsHeaders.RollupValueSubject); - await this.js.publish(this._metaSubject(info.name), jc.encode(info), { + await this.js.publish(this.keys.metaSubject(info.name), jc.encode(info), { headers: h, }); return this.jsm.streams.purge(this.stream, { - filter: this._chunkSubject(info.nuid), + filter: this.keys.chunkSubject(info.nuid, name), }); } @@ -670,7 +749,7 @@ export class ObjectStoreImpl implements ObjectStore { // effectively making the object available under 2 names, but it doesn't remove the // older one. meta.name = meta.name ?? info.name; - const { name: n, error } = this._sanitizeName(meta.name); + const { name: n, error } = this.keys.keyName(meta.name); if (error) { return Promise.reject(error); } @@ -687,7 +766,7 @@ export class ObjectStoreImpl implements ObjectStore { const ii = Object.assign({}, info, toServerObjectStoreMeta(meta!)); const jc = JSONCodec(); - return this.js.publish(this._metaSubject(ii.name), jc.encode(ii)); + return this.js.publish(this.keys.metaSubject(ii.name), jc.encode(ii)); } async watch(opts: Partial< @@ -700,7 +779,7 @@ export class ObjectStoreImpl implements ObjectStore { opts.ignoreDeletes = opts.ignoreDeletes ?? false; let initialized = false; const qi = new QueuedIteratorImpl(); - const subj = this._metaSubjectAll(); + const subj = this.keys.metaSubjectAll(); try { await this.jsm.streams.getMessage(this.stream, { last_by_subj: subj }); } catch (err) { @@ -755,37 +834,52 @@ export class ObjectStoreImpl implements ObjectStore { return qi; } - _chunkSubject(id: string) { - return `$O.${this.name}.C.${id}`; - } - - _metaSubject(n: string): string { - return `$O.${this.name}.M.${Base64UrlPaddedCodec.encode(n)}`; - } - - _metaSubjectAll(): string { - return `$O.${this.name}.M.>`; - } - async init(opts: Partial = {}): Promise { + const adapters = [ + new ObjectStoreKeysV1(this.name), + new ObjectStoreKeysV2(this.name), + ]; try { this.stream = objectStoreStreamName(this.name); } catch (err) { return Promise.reject(err); } - const sc = Object.assign({}, opts) as StreamConfig; - sc.name = this.stream; - sc.allow_rollup_hdrs = true; - sc.discard = DiscardPolicy.New; - sc.subjects = [`$O.${this.name}.C.>`, `$O.${this.name}.M.>`]; - if (opts.placement) { - sc.placement = opts.placement; - } - try { - await this.jsm.streams.info(sc.name); + const si = await this.jsm.streams.info(this.stream); + const { subjects } = si.config; + const keys = adapters.find((k) => { + const a = k.streamSubjectNames(); + return a.includes(subjects[0]) && a.includes(subjects[1]); + }); + if (!keys) { + return Promise.reject( + new Error("unknown objectstore version configuration"), + ); + } + this.keys = keys; } catch (err) { if (err.message === "stream not found") { + // honor the version given, if specified - otherwise best version + switch (opts.version) { + case 1: + this.keys = adapters[0]; + break; + case 2: + this.keys = adapters[1]; + break; + } + const sc = Object.assign({}, opts) as StreamConfig; + sc.name = this.stream; + sc.allow_rollup_hdrs = true; + sc.discard = DiscardPolicy.New; + sc.subjects = this.keys.streamSubjectNames(); + if (opts.placement) { + sc.placement = opts.placement; + } + // FIXME: metadata would be good, but really the + // subject for the keys should be different in what + // the stream takes + // sc.metadata = { NatsObjectStoreVersion: "2" }; await this.jsm.streams.add(sc); } } diff --git a/jetstream/tests/objectstore_test.ts b/jetstream/tests/objectstore_test.ts index 946161ba..f3d21097 100644 --- a/jetstream/tests/objectstore_test.ts +++ b/jetstream/tests/objectstore_test.ts @@ -33,7 +33,7 @@ import { Empty, headers, StringCodec } from "../../src/mod.ts"; import { equals } from "https://deno.land/std@0.190.0/bytes/mod.ts"; import { SHA256 } from "../../nats-base-client/sha256.js"; import { Base64UrlPaddedCodec } from "../../nats-base-client/base64.ts"; -import { digestType } from "../objectstore.ts"; +import { digestType, ObjectStoreImpl } from "../objectstore.ts"; function readableStreamFrom(data: Uint8Array): ReadableStream { return new ReadableStream({ @@ -314,8 +314,28 @@ Deno.test("objectstore - object names", async () => { const os = await js.views.os("test", { storage: StorageType.Memory }); const sc = StringCodec(); await os.put({ name: "blob.txt" }, readableStreamFrom(sc.encode("A"))); - await os.put({ name: "foo bar" }, readableStreamFrom(sc.encode("A"))); - await os.put({ name: " " }, readableStreamFrom(sc.encode("A"))); + if (os.version() === 1) { + await os.put({ name: "foo bar" }, readableStreamFrom(sc.encode("A"))); + } else { + await assertRejects( + async () => { + await os.put({ name: "foo bar" }, readableStreamFrom(sc.encode("A"))); + }, + Error, + "invalid key", + ); + } + if (os.version() === 1) { + await os.put({ name: " " }, readableStreamFrom(sc.encode("A"))); + } else { + await assertRejects( + async () => { + await os.put({ name: "foo bar" }, readableStreamFrom(sc.encode("A"))); + }, + Error, + "invalid key", + ); + } await assertRejects(async () => { await os.put({ name: "*" }, readableStreamFrom(sc.encode("A"))); @@ -669,26 +689,42 @@ Deno.test("objectstore - sanitize", async () => { const js = nc.jetstream(); const os = await js.views.os("test"); await os.put({ name: "has.dots.here" }, readableStreamFrom(makeData(1))); - await os.put( - { name: "the spaces are here" }, - readableStreamFrom(makeData(1)), - ); + if (os.version() === 1) { + await os.put( + { name: "the spaces are here" }, + readableStreamFrom(makeData(1)), + ); + } else { + await assertRejects( + async () => { + await os.put( + { name: "the spaces are here" }, + readableStreamFrom(makeData(1)), + ); + }, + Error, + "invalid key", + ); + } const info = await os.status({ subjects_filter: ">", }); + const osi = os as ObjectStoreImpl; assertEquals( info.streamInfo.state - ?.subjects![`$O.test.M.${Base64UrlPaddedCodec.encode("has_dots_here")}`], - 1, - ); - assertEquals( - info.streamInfo.state - .subjects![ - `$O.test.M.${Base64UrlPaddedCodec.encode("the_spaces_are_here")}` - ], + ?.subjects![osi.keys.metaSubject("has.dots.here")], 1, ); + if (os.version() === 1) { + assertEquals( + info.streamInfo.state + .subjects![ + osi.keys.metaSubject("the spaces are here") + ], + 1, + ); + } await cleanup(ns, nc); }); @@ -795,6 +831,9 @@ Deno.test("objectstore - no store", async () => { Deno.test("objectstore - hashtests", async () => { const { ns, nc } = await setup(jetstreamServerConf({ max_payload: 1024 * 1024, + jetstream: { + max_file_store: 1024 * 1024 * 2, + }, }, true)); if (await notCompatible(ns, nc, "2.6.3")) { return; @@ -1016,3 +1055,50 @@ Deno.test("objectstore - put/get blob", async () => { await cleanup(ns, nc); }); + +Deno.test("objectstore - v1/v2", async () => { + const { ns, nc } = await setup(jetstreamServerConf({}, true)); + if (await notCompatible(ns, nc, "2.6.3")) { + return; + } + + const js = nc.jetstream(); + const v1 = await js.views.os("v1", { description: "testing", version: 1 }); + assertEquals(v1.version(), 1); + await v1.putBlob({ name: "A" }, new Uint8Array(10)); + + let { subjects } = (await v1.status()).streamInfo.config; + assert(subjects[0].startsWith("$O.")); + + let v = await js.views.os("v1"); + assertEquals(v.version(), 1); + let data = await v.getBlob("A"); + assertEquals(data?.length, 10); + + const v2 = await js.views.os("v2", { version: 2 }); + assertEquals(v2.version(), 2); + await v2.putBlob({ name: "A" }, new Uint8Array(11)); + + v = await js.views.os("v2", { version: 1 }); + assertEquals(v.version(), 2); + data = await v.getBlob("A"); + assertEquals(data?.length, 11); + + let v3 = await js.views.os("v3", { description: "testing" }); + const sc = (await v3.status()).streamInfo.config; + subjects = sc.subjects.map((s) => { + return s.replaceAll("$O2.", "$O3."); + }); + const jsm = await js.jetstreamManager(); + await jsm.streams.update(sc.name, { subjects }); + + await assertRejects( + async () => { + await js.views.os("v3"); + }, + Error, + "unknown objectstore version configuration", + ); + + await cleanup(ns, nc); +}); diff --git a/jetstream/types.ts b/jetstream/types.ts index 258929c6..8612cecd 100644 --- a/jetstream/types.ts +++ b/jetstream/types.ts @@ -1250,6 +1250,7 @@ export type ObjectStoreOptions = { replicas: number; "max_bytes": number; placement: Placement; + version: number; }; export type ObjectResult = { info: ObjectInfo; @@ -1312,6 +1313,8 @@ export interface ObjectStore { update(name: string, meta: Partial): Promise; destroy(): Promise; + + version(): number; } export enum DirectMsgHeaders {