diff --git a/packages/automerge-repo/src/Repo.ts b/packages/automerge-repo/src/Repo.ts index fb504ab7..7d80588d 100644 --- a/packages/automerge-repo/src/Repo.ts +++ b/packages/automerge-repo/src/Repo.ts @@ -51,6 +51,10 @@ export class Repo extends EventEmitter { /** @hidden */ sharePolicy: SharePolicy = async () => true + /** By default, we sync with all peers. */ + /** @hidden */ + syncPolicy: SyncPolicy = async () => true + /** maps peer id to to persistence information (storageId, isEphemeral), access by collection synchronizer */ /** @hidden */ peerMetadataByPeerId: Record = {} @@ -63,6 +67,7 @@ export class Repo extends EventEmitter { network = [], peerId, sharePolicy, + syncPolicy, isEphemeral = storage === undefined, enableRemoteHeadsGossiping = false, }: RepoConfig = {}) { @@ -70,6 +75,7 @@ export class Repo extends EventEmitter { this.#remoteHeadsGossipingEnabled = enableRemoteHeadsGossiping this.#log = debug(`automerge-repo:repo`) this.sharePolicy = sharePolicy ?? this.sharePolicy + this.syncPolicy = syncPolicy ?? this.syncPolicy // DOC COLLECTION @@ -551,6 +557,11 @@ export interface RepoConfig { */ sharePolicy?: SharePolicy + /** + * A callback can be provided to implement authorization based on document ID and peer ID. + */ + syncPolicy?: SyncPolicy + /** * Whether to enable the experimental remote heads gossiping feature */ @@ -570,6 +581,18 @@ export type SharePolicy = ( documentId?: DocumentId ) => Promise +/** A function that determines whether we should sync a document with a peer + * + * @remarks + * This function is called by the {@link Repo} every time a peer requests to + * sync a document. If this function returns `true` the document syncs normally; + * if `false`, it reports the document as unavailable. + * */ +export type SyncPolicy = ( + peerId: PeerId, + documentId?: DocumentId +) => Promise + // events & payloads export interface RepoEvents { /** A new document was created or discovered */ diff --git a/packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts b/packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts index ddfb4820..129ebf26 100644 --- a/packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts +++ b/packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts @@ -85,6 +85,18 @@ export class CollectionSynchronizer extends Synchronizer { ) const documentId = message.documentId + const senderId = message.senderId + const okToSync = await this.repo.syncPolicy(senderId, documentId) + if (!okToSync) { + log(`sync blocked by policy: ${message.senderId}, ${message.documentId}`) + this.emit("message", { + type: "doc-unavailable", + targetId: senderId, + documentId + }) + return + } + if (!documentId) { throw new Error("received a message with an invalid documentId") } diff --git a/packages/automerge-repo/test/CollectionSynchronizer.test.ts b/packages/automerge-repo/test/CollectionSynchronizer.test.ts index 7bbf44a2..3bf85611 100644 --- a/packages/automerge-repo/test/CollectionSynchronizer.test.ts +++ b/packages/automerge-repo/test/CollectionSynchronizer.test.ts @@ -75,4 +75,54 @@ describe("CollectionSynchronizer", () => { setTimeout(done) })) + + it("should not synchronize to a peer which is excluded from the sync policy", () => + new Promise((done, reject) => { + const handle = repo.create() + repo.sharePolicy = async () => false + repo.syncPolicy = async (peerId) => peerId !== "peer1" + + synchronizer.addPeer("peer1" as PeerId) + + synchronizer.on("message", (message) => { + if (message.type !== "doc-unavailable") { + reject(new Error("Should not have sent a sync message")) + } + }) + + synchronizer.receiveMessage({ + type: "request", + senderId: "peer1" as PeerId, + targetId: "repo" as PeerId, + documentId: handle.documentId, + data: new Uint8Array() + }) + + setTimeout(done) + })) + + it("should not synchronize a document which is excluded from the sync policy", () => + new Promise((done, reject) => { + const handle = repo.create() + repo.sharePolicy = async () => false + repo.syncPolicy = async (_, documentId) => documentId !== handle.documentId + + synchronizer.addPeer("peer1" as PeerId) + + synchronizer.on("message", (message) => { + if (message.type !== "doc-unavailable") { + reject(new Error("Should not have sent a sync message")) + } + }) + + synchronizer.receiveMessage({ + type: "request", + senderId: "peer1" as PeerId, + targetId: "repo" as PeerId, + documentId: handle.documentId, + data: new Uint8Array() + }) + + setTimeout(done) + })) }) diff --git a/packages/automerge-repo/test/Repo.test.ts b/packages/automerge-repo/test/Repo.test.ts index 2db92356..c4e9c66e 100644 --- a/packages/automerge-repo/test/Repo.test.ts +++ b/packages/automerge-repo/test/Repo.test.ts @@ -8,7 +8,7 @@ import { generateAutomergeUrl, stringifyAutomergeUrl, } from "../src/AutomergeUrl.js" -import { Repo } from "../src/Repo.js" +import { Repo, SyncPolicy } from "../src/Repo.js" import { eventPromise } from "../src/helpers/eventPromise.js" import { pause } from "../src/helpers/pause.js" import { @@ -673,6 +673,7 @@ describe("Repo", () => { const setup = async ({ connectAlice = true, isCharlieEphemeral = false, + charlieForbiddenDoc = false } = {}) => { const charlieExcludedDocuments: DocumentId[] = [] const bobExcludedDocuments: DocumentId[] = [] @@ -694,6 +695,18 @@ describe("Repo", () => { return true } + const syncPolicy: SyncPolicy = async (peerId, documentId) => { + if ( + charlieForbiddenDoc && + charlieExcludedDocuments.includes(documentId) && + peerId === "charlie" + ) { + return false + } + + return true + } + // Set up three repos; connect Alice to Bob, and Bob to Charlie const abChannel = new MessageChannel() @@ -709,6 +722,7 @@ describe("Repo", () => { network: connectAlice ? [aliceNetworkAdapter] : [], peerId: alice, sharePolicy, + syncPolicy }) const bob = "bob" as PeerId @@ -721,6 +735,7 @@ describe("Repo", () => { ], peerId: bob, sharePolicy, + syncPolicy }) const charlie = "charlie" as PeerId @@ -841,6 +856,18 @@ describe("Repo", () => { teardown() }) + it("charlieRepo can't sync with a document when forbidden by syncPolicy", async () => { + const { charlieRepo, notForCharlie, teardown } = await setup({charlieForbiddenDoc: true}) + + const handle = charlieRepo.find(notForCharlie) + + await pause(50) + + assert(handle.isUnavailable()) + + teardown() + }) + it("charlieRepo can request a document across a network of multiple peers", async () => { const { charlieRepo, notForBob, teardown } = await setup()