Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add syncPolicy callback for per-document authorization #350

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ export class Repo extends EventEmitter<RepoEvents> {
/** @hidden */
sharePolicy: SharePolicy = async () => true

/** By default, we sync with all peers. */
/** @hidden */
syncPolicy: SyncPolicy = async () => true

/** maps peer id to to persistence information (storageId, isEphemeral), access by collection synchronizer */
/** @hidden */
peerMetadataByPeerId: Record<PeerId, PeerMetadata> = {}
Expand All @@ -63,13 +67,15 @@ export class Repo extends EventEmitter<RepoEvents> {
network = [],
peerId,
sharePolicy,
syncPolicy,
isEphemeral = storage === undefined,
enableRemoteHeadsGossiping = false,
}: RepoConfig = {}) {
super()
this.#remoteHeadsGossipingEnabled = enableRemoteHeadsGossiping
this.#log = debug(`automerge-repo:repo`)
this.sharePolicy = sharePolicy ?? this.sharePolicy
this.syncPolicy = syncPolicy ?? this.syncPolicy

// DOC COLLECTION

Expand Down Expand Up @@ -551,6 +557,11 @@ export interface RepoConfig {
*/
sharePolicy?: SharePolicy

/**
* A callback can be provided to implement authorization based on document ID and peer ID.
*/
syncPolicy?: SyncPolicy

/**
* Whether to enable the experimental remote heads gossiping feature
*/
Expand All @@ -570,6 +581,18 @@ export type SharePolicy = (
documentId?: DocumentId
) => Promise<boolean>

/** A function that determines whether we should sync a document with a peer
*
* @remarks
* This function is called by the {@link Repo} every time a peer requests to
* sync a document. If this function returns `true` the document syncs normally;
* if `false`, it reports the document as unavailable.
* */
export type SyncPolicy = (
peerId: PeerId,
documentId?: DocumentId
) => Promise<boolean>

// events & payloads
export interface RepoEvents {
/** A new document was created or discovered */
Expand Down
12 changes: 12 additions & 0 deletions packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ export class CollectionSynchronizer extends Synchronizer {
)

const documentId = message.documentId
const senderId = message.senderId
const okToSync = await this.repo.syncPolicy(senderId, documentId)
Copy link

@twop twop Jun 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was casually observing Discord and took a look at the PR, hence pardon my intrusion, just wanted to mention that await somePromise will always put a new event into the event loop at the very end, thus even an innocent no-op (like async () => true) can create a lot of latency if the event loop is busy/saturated. So it is possible that it might be better to make it optional and thus not awaiting as the default behavior.

if (!okToSync) {
log(`sync blocked by policy: ${message.senderId}, ${message.documentId}`)
this.emit("message", {
type: "doc-unavailable",
targetId: senderId,
documentId
})
return
}

if (!documentId) {
throw new Error("received a message with an invalid documentId")
}
Expand Down
50 changes: 50 additions & 0 deletions packages/automerge-repo/test/CollectionSynchronizer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,54 @@ describe("CollectionSynchronizer", () => {

setTimeout(done)
}))

it("should not synchronize to a peer which is excluded from the sync policy", () =>
new Promise<void>((done, reject) => {
const handle = repo.create()
repo.sharePolicy = async () => false
repo.syncPolicy = async (peerId) => peerId !== "peer1"

synchronizer.addPeer("peer1" as PeerId)

synchronizer.on("message", (message) => {
if (message.type !== "doc-unavailable") {
reject(new Error("Should not have sent a sync message"))
}
})

synchronizer.receiveMessage({
type: "request",
senderId: "peer1" as PeerId,
targetId: "repo" as PeerId,
documentId: handle.documentId,
data: new Uint8Array()
})

setTimeout(done)
}))

it("should not synchronize a document which is excluded from the sync policy", () =>
new Promise<void>((done, reject) => {
const handle = repo.create()
repo.sharePolicy = async () => false
repo.syncPolicy = async (_, documentId) => documentId !== handle.documentId

synchronizer.addPeer("peer1" as PeerId)

synchronizer.on("message", (message) => {
if (message.type !== "doc-unavailable") {
reject(new Error("Should not have sent a sync message"))
}
})

synchronizer.receiveMessage({
type: "request",
senderId: "peer1" as PeerId,
targetId: "repo" as PeerId,
documentId: handle.documentId,
data: new Uint8Array()
})

setTimeout(done)
}))
})
29 changes: 28 additions & 1 deletion packages/automerge-repo/test/Repo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
generateAutomergeUrl,
stringifyAutomergeUrl,
} from "../src/AutomergeUrl.js"
import { Repo } from "../src/Repo.js"
import { Repo, SyncPolicy } from "../src/Repo.js"
import { eventPromise } from "../src/helpers/eventPromise.js"
import { pause } from "../src/helpers/pause.js"
import {
Expand Down Expand Up @@ -673,6 +673,7 @@ describe("Repo", () => {
const setup = async ({
connectAlice = true,
isCharlieEphemeral = false,
charlieForbiddenDoc = false
} = {}) => {
const charlieExcludedDocuments: DocumentId[] = []
const bobExcludedDocuments: DocumentId[] = []
Expand All @@ -694,6 +695,18 @@ describe("Repo", () => {
return true
}

const syncPolicy: SyncPolicy = async (peerId, documentId) => {
if (
charlieForbiddenDoc &&
charlieExcludedDocuments.includes(documentId) &&
peerId === "charlie"
) {
return false
}

return true
}

// Set up three repos; connect Alice to Bob, and Bob to Charlie

const abChannel = new MessageChannel()
Expand All @@ -709,6 +722,7 @@ describe("Repo", () => {
network: connectAlice ? [aliceNetworkAdapter] : [],
peerId: alice,
sharePolicy,
syncPolicy
})

const bob = "bob" as PeerId
Expand All @@ -721,6 +735,7 @@ describe("Repo", () => {
],
peerId: bob,
sharePolicy,
syncPolicy
})

const charlie = "charlie" as PeerId
Expand Down Expand Up @@ -841,6 +856,18 @@ describe("Repo", () => {
teardown()
})

it("charlieRepo can't sync with a document when forbidden by syncPolicy", async () => {
const { charlieRepo, notForCharlie, teardown } = await setup({charlieForbiddenDoc: true})

const handle = charlieRepo.find<TestDoc>(notForCharlie)

await pause(50)

assert(handle.isUnavailable())

teardown()
})

it("charlieRepo can request a document across a network of multiple peers", async () => {
const { charlieRepo, notForBob, teardown } = await setup()

Expand Down