Skip to content

Commit

Permalink
Wait for createDocument to be loaded for subsequent createConnections (
Browse files Browse the repository at this point in the history
  • Loading branch information
jordangarcia authored May 17, 2024
1 parent 60c5202 commit bc42146
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 54 deletions.
31 changes: 26 additions & 5 deletions packages/server/src/Hocuspocus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ export class Hocuspocus {
onDestroy: () => new Promise(r => r(null)),
}

loadingDocuments: Map<string, Promise<Document>> = new Map()

documents: Map<string, Document> = new Map()

server?: HocuspocusServer
Expand Down Expand Up @@ -415,14 +417,33 @@ export class Hocuspocus {
* Create a new document by the given request
*/
public async createDocument(documentName: string, request: Partial<Pick<IncomingMessage, 'headers' | 'url'>>, socketId: string, connection: ConnectionConfiguration, context?: any): Promise<Document> {
if (this.documents.has(documentName)) {
const document = this.documents.get(documentName)
const existingLoadingDoc = this.loadingDocuments.get(documentName)

if (document) {
return document
}
if (existingLoadingDoc) {
return existingLoadingDoc
}

const existingDoc = this.documents.get(documentName)
if (existingDoc) {
return Promise.resolve(existingDoc)
}

const loadDocPromise = this.loadDocument(documentName, request, socketId, connection, context)

this.loadingDocuments.set(documentName, loadDocPromise)

try {
await loadDocPromise
this.loadingDocuments.delete(documentName)
} catch (e) {
this.loadingDocuments.delete(documentName)
throw e
}

return loadDocPromise
}

async loadDocument(documentName: string, request: Partial<Pick<IncomingMessage, 'headers' | 'url'>>, socketId: string, connection: ConnectionConfiguration, context?: any): Promise<Document> {
const requestHeaders = request.headers ?? {}
const requestParameters = getParameters(request)

Expand Down
145 changes: 96 additions & 49 deletions tests/server/onLoadDocument.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import test from 'ava'
import { newHocuspocus, newHocuspocusProvider } from '../utils/index.js'

const sleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms))

test('executes the onLoadDocument callback', async t => {
await new Promise(async resolve => {
const server = await newHocuspocus({
Expand Down Expand Up @@ -130,6 +132,56 @@ test('multiple simultaneous connections do not create multiple documents', async
})
})

test('multiple simultaneous connections wait for the document to be loaded', async t => {
t.plan(6)

await new Promise(async resolve => {
let resolveOnLoadDocument: () => void = () => {}

const server = await newHocuspocus({
onLoadDocument({ document }) {
// delay more accurately simulates a database fetch
return new Promise(async innerResolve => {
resolveOnLoadDocument = () => {
document.getArray('foo').insert(0, ['bar'])
innerResolve(document)
}
})
},
})

const provider1 = newHocuspocusProvider(server)
const provider2 = newHocuspocusProvider(server)
let provider1Synced = false
let provider2Synced = false

provider1.on('synced', () => {
provider1Synced = true
const value = provider1.document.getArray('foo').get(0)
t.is(value, 'bar')
})
provider2.on('synced', () => {
provider2Synced = true
const value = provider2.document.getArray('foo').get(0)
t.is(value, 'bar')
})

await sleep(100)

t.false(provider1Synced, 'provider1Synced')
t.false(provider2Synced, 'provider2Synced')

resolveOnLoadDocument()

await sleep(100)

t.true(provider1Synced, 'provider1Synced')
t.true(provider2Synced, 'provider2Synced')

resolve('done')
})
})

test('has the server instance', async t => {
await new Promise(async resolve => {
const server = await newHocuspocus({
Expand Down Expand Up @@ -164,8 +216,32 @@ test('stops when an error is thrown in onLoadDocument', async t => {
})
})

test('stops when an error is thrown in onLoadDocument, even when authenticated', async t => {
await new Promise(async resolve => {
const server = await newHocuspocus({
async onAuthenticate() {
return true
},
async onLoadDocument() {
throw new Error()
},
})

newHocuspocusProvider(server, {
token: 'super-secret-token',
onAuthenticationFailed() {
t.pass()
resolve('done')
},
onClose() {
t.fail()
},
})
})
})

test('disconnects all clients related to the document when an error is thrown in onLoadDocument', async t => {
const resolvesNeeded = 4
const resolvesNeeded = 2

await new Promise(async resolve => {

Expand Down Expand Up @@ -195,19 +271,13 @@ test('disconnects all clients related to the document when an error is thrown in
}

const provider1 = newHocuspocusProvider(server, {
onConnect() {
resolver()
},
onClose(event) {
provider1.disconnect()
resolver()
},
})

const provider2 = newHocuspocusProvider(server, {
onConnect() {
resolver()
},
onClose() {
provider2.disconnect()
resolver()
Expand All @@ -218,33 +288,11 @@ test('disconnects all clients related to the document when an error is thrown in

})

test('stops when an error is thrown in onLoadDocument, even when authenticated', async t => {
await new Promise(async resolve => {
const server = await newHocuspocus({
async onAuthenticate() {
return true
},
async onLoadDocument() {
throw new Error()
},
})

newHocuspocusProvider(server, {
token: 'super-secret-token',
onAuthenticationFailed() {
t.pass()
resolve('done')
},
onClose() {
t.fail()
},
})
})
})

test('if a new connection connects while the previous connection still fetches the document, it will just work properly', async t => {
t.plan(11)

let callsToOnLoadDocument = 0
const resolvesNeeded = 11
const resolvesNeeded = 10

await new Promise(async resolve => {

Expand Down Expand Up @@ -313,33 +361,32 @@ test('if a new connection connects while the previous connection still fetches t

t.is(server.documents.size, 1)

const value = provider.document.getArray('foo').get(0)
t.is(value, undefined) // document hasnt loaded yet because it loads for 5sec, but this runs after ~2sec
const value = provider2.document.getArray('foo').get(0)
t.is(value, 'bar-1')

resolver()
},
onMessage(data) {
if (!provider2.isSynced) return
provider2MessagesReceived += 1

const value = provider.document.getArray('foo').get(0)
setTimeout(() => {
const value = provider2.document.getArray('foo').get(0)

if (provider2MessagesReceived === 1) {
// do nothing, this is just the ACK for the sync
t.is(value, undefined)
} else if (provider2MessagesReceived === 2) {
if (provider2MessagesReceived === 1) {
// initial state is now synced
t.is(value, undefined)
} else if (provider2MessagesReceived === 3) {
t.is(value, 'bar-updatedAfterProvider1Synced')
setTimeout(() => {
provider.document.getArray('foo').insert(0, ['bar-updatedAfterProvider2ReceivedMessageFrom1'])
}, 100)
} else {
t.is(value, 'bar-updatedAfterProvider2ReceivedMessageFrom1')
}
t.is(value, 'bar-1')
} else if (provider2MessagesReceived === 2) {
t.is(value, 'bar-updatedAfterProvider1Synced')
setTimeout(() => {
provider.document.getArray('foo').insert(0, ['bar-updatedAfterProvider2ReceivedMessageFrom1'])
}, 100)
} else {
t.is(value, 'bar-updatedAfterProvider2ReceivedMessageFrom1')
}
resolver()
})

resolver()
},
})

Expand Down

0 comments on commit bc42146

Please sign in to comment.