diff --git a/jetstream/kv.ts b/jetstream/kv.ts index 5abaab0c..dd28900d 100644 --- a/jetstream/kv.ts +++ b/jetstream/kv.ts @@ -900,6 +900,31 @@ export class Bucket implements KV, KvRemove { const si = await this.jsm.streams.info(bn); return new KvStatusImpl(si, cluster); } + + static mirror( + opts: Partial, + srcBucket: string, + destBucket: string, + ...filter: string[] + ): Partial { + opts = opts ?? {}; + if (opts.mirror) { + throw new Error("mirror already set"); + } + const mirror = {} as Partial; + mirror.name = `KV_${srcBucket}`; + + filter = filter ?? []; + if (filter.length === 0) { + filter.push(">"); + } + + mirror.subject_transforms = filter.map((f) => { + return { src: `$KV.${srcBucket}.${f}`, dest: `$KV.${destBucket}.${f}` }; + }); + + return Object.assign(opts, { mirror }); + } } export class KvStatusImpl implements KvStatus { diff --git a/jetstream/tests/kv_test.ts b/jetstream/tests/kv_test.ts index bd5d8512..277dbe2a 100644 --- a/jetstream/tests/kv_test.ts +++ b/jetstream/tests/kv_test.ts @@ -37,6 +37,8 @@ import { KvOptions, nanos, StorageType, + StreamSource, + SubjectTransformConfig, } from "../mod.ts"; import { @@ -2060,3 +2062,31 @@ Deno.test("kv - watcher will name and filter", async () => { await cleanup(ns, nc); }); + +Deno.test("kv - mirror check", async () => { + const { ns, nc } = await setup( + jetstreamServerConf({}, true), + ); + + const js = nc.jetstream(); + const kv = await js.views.kv("A"); + + await Promise.all([ + kv.put("a", "hello"), + kv.put("b", "hi"), + kv.put("c", "done"), + ]); + + const kv2 = await js.views.kv("B", Bucket.mirror({}, "A", "B", "a", "c")); + + // have to wait for the mirror to catch up + await delay(1000); + let s2 = await kv2.status(); + assertEquals(s2.streamInfo.state.messages, 2); + + assertExists(await kv2.get("a")); + assertEquals(await kv2.get("b"), null); + assertExists(await kv2.get("c")); + + await cleanup(ns, nc); +});