Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Enable sync with all peers for not local documents #292

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
4 changes: 3 additions & 1 deletion packages/automerge-repo/src/Repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ export class Repo extends EventEmitter<RepoEvents> {
// The `document` event is fired by the DocCollection any time we create a new document or look
// up a document by ID. We listen for it in order to wire up storage and network synchronization.
this.on("document", async ({ handle, isNew }) => {
let isLocal = isNew
if (storageSubsystem) {
// Save when the document changes, but no more often than saveDebounceRate.
const saveFn = ({
Expand All @@ -90,6 +91,7 @@ export class Repo extends EventEmitter<RepoEvents> {
// Try to load from disk
const loadedDoc = await storageSubsystem.loadDoc(handle.documentId)
if (loadedDoc) {
isLocal = true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is another side effect here. Let's say that we have a peer with a sharePolicy of false. If it boots up for the first time and tries to find a document, will this now be immediately synced with all peers?

handle.update(() => loadedDoc)
}
}
Expand Down Expand Up @@ -117,7 +119,7 @@ export class Repo extends EventEmitter<RepoEvents> {
}

// Register the document with the synchronizer. This advertises our interest in the document.
this.#synchronizer.addDocument(handle.documentId)
this.#synchronizer.addDocument({ documentId: handle.documentId, isLocal })
})

this.on("delete-document", ({ documentId }) => {
Expand Down
18 changes: 14 additions & 4 deletions packages/automerge-repo/src/synchronizer/CollectionSynchronizer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,25 @@ export class CollectionSynchronizer extends Synchronizer {
/**
* Starts synchronizing the given document with all peers that we share it generously with.
*/
addDocument(documentId: DocumentId) {
addDocument({
documentId,
isLocal,
}: {
documentId: DocumentId
isLocal: boolean
}) {
// HACK: this is a hack to prevent us from adding the same document twice
if (this.#docSetUp[documentId]) {
return
}
const docSynchronizer = this.#fetchDocSynchronizer(documentId)
void this.#documentGenerousPeers(documentId).then(peers => {
docSynchronizer.beginSync(peers)
})
if (isLocal) {
void this.#documentGenerousPeers(documentId).then(peers =>
docSynchronizer.beginSync(peers)
)
} else {
docSynchronizer.beginSync(Array.from(this.#peers.values()))
}
}

// TODO: implement this
Expand Down
8 changes: 4 additions & 4 deletions packages/automerge-repo/test/CollectionSynchronizer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ describe("CollectionSynchronizer", () => {
done()
})

synchronizer.addDocument(handle.documentId)
synchronizer.addDocument({ documentId: handle.documentId, isLocal: true })
}))

it("starts synchronizing existing documents when a peer is added", () =>
new Promise<void>(done => {
const handle = repo.create()
synchronizer.addDocument(handle.documentId)
synchronizer.addDocument({ documentId: handle.documentId, isLocal: true })
synchronizer.once("message", event => {
const { targetId, documentId } = event as SyncMessage
assert(targetId === "peer1")
Expand All @@ -52,7 +52,7 @@ describe("CollectionSynchronizer", () => {

repo.sharePolicy = async (peerId: PeerId) => peerId !== "peer1"

synchronizer.addDocument(handle.documentId)
synchronizer.addDocument({ documentId: handle.documentId, isLocal: true })
synchronizer.once("message", () => {
reject(new Error("Should not have sent a message"))
})
Expand All @@ -73,7 +73,7 @@ describe("CollectionSynchronizer", () => {
reject(new Error("Should not have sent a message"))
})

synchronizer.addDocument(handle.documentId)
synchronizer.addDocument({ documentId: handle.documentId, isLocal: true })

setTimeout(done)
}))
Expand Down
66 changes: 66 additions & 0 deletions packages/automerge-repo/test/Repo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1127,6 +1127,72 @@ describe("Repo", () => {
assert.equal(bobDoc.isReady(), true)
})

it("share policy `false`", async () => {
const alice = "alice" as PeerId
const bob = "bob" as PeerId
const [aliceAdapter, bobAdapter] = DummyNetworkAdapter.createConnectedPair()

const aliceRepo = new Repo({
network: [aliceAdapter],
peerId: alice,
sharePolicy: async () => false,
})
const bobRepo = new Repo({
network: [bobAdapter],
peerId: bob,
sharePolicy: async () => false,
})

aliceAdapter.peerCandidate(bob)
bobAdapter.peerCandidate(alice)

const aliceDoc = aliceRepo.create()
aliceDoc.change((doc: any) => (doc.text = "Hello world"))

const bobDoc = bobRepo.find(aliceDoc.url)

await bobDoc.whenReady()

assert.equal(bobDoc.isReady(), true)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there should be a test here for a 3rd peer, to show it can't find a document from across the network.

eg. alice --> bob --> charlie

Alice creates a document.
Charlie requests it. Show that it doesn't resolve.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

})

it("share policy `false` with `MessageChannelNetworkAdapter`", async () => {
const alice = "alice" as PeerId
const bob = "bob" as PeerId
const { port1: ab, port2: ba } = new MessageChannel()

const aliceNetworkAdapter = new MessageChannelNetworkAdapter(ab)
const bobNetworkAdapter = new MessageChannelNetworkAdapter(ba)

const aliceRepo = new Repo({
network: [aliceNetworkAdapter],
peerId: alice,
sharePolicy: async () => false,
})
const bobRepo = new Repo({
network: [bobNetworkAdapter],
peerId: bob,
sharePolicy: async () => false,
})

await Promise.all([
eventPromise(aliceRepo.networkSubsystem, "peer"),
eventPromise(bobRepo.networkSubsystem, "peer"),
])

const aliceDoc = aliceRepo.create()
aliceDoc.change((doc: any) => (doc.text = "Hello world"))

const bobDoc = bobRepo.find(aliceDoc.url)

await bobDoc.whenReady()

assert.equal(bobDoc.isReady(), true)

ab.close()
ba.close()
})

describe("with peers (mesh network)", () => {
const setup = async () => {
// Set up three repos; connect Alice to Bob, Bob to Charlie, and Alice to Charlie
Expand Down
Loading