From 6d0ccb15ea36ed020d08807f70117d9b5de46247 Mon Sep 17 00:00:00 2001 From: Anthony LC Date: Thu, 13 Feb 2025 16:42:56 +0100 Subject: [PATCH 01/10] =?UTF-8?q?=F0=9F=94=A7(y-provider)=20add=20missing?= =?UTF-8?q?=20Sentry=20environment?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The environment was missing in the Sentry configuration. This commit adds the environment to the Sentry configuration. --- src/frontend/servers/y-provider/src/env.ts | 5 +++-- src/frontend/servers/y-provider/src/services/sentry.ts | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/frontend/servers/y-provider/src/env.ts b/src/frontend/servers/y-provider/src/env.ts index 7f23bfc58..a4f86dd9e 100644 --- a/src/frontend/servers/y-provider/src/env.ts +++ b/src/frontend/servers/y-provider/src/env.ts @@ -4,7 +4,8 @@ export const COLLABORATION_SERVER_ORIGIN = process.env.COLLABORATION_SERVER_ORIGIN || 'http://localhost:3000'; export const COLLABORATION_SERVER_SECRET = process.env.COLLABORATION_SERVER_SECRET || 'secret-api-key'; -export const Y_PROVIDER_API_KEY = - process.env.Y_PROVIDER_API_KEY || 'yprovider-api-key'; export const PORT = Number(process.env.PORT || 4444); export const SENTRY_DSN = process.env.SENTRY_DSN || ''; +export const SENTRY_ENV = process.env.SENTRY_ENV || 'Development'; +export const Y_PROVIDER_API_KEY = + process.env.Y_PROVIDER_API_KEY || 'yprovider-api-key'; diff --git a/src/frontend/servers/y-provider/src/services/sentry.ts b/src/frontend/servers/y-provider/src/services/sentry.ts index 9857777a3..254631424 100644 --- a/src/frontend/servers/y-provider/src/services/sentry.ts +++ b/src/frontend/servers/y-provider/src/services/sentry.ts @@ -1,11 +1,12 @@ import * as Sentry from '@sentry/node'; import { nodeProfilingIntegration } from '@sentry/profiling-node'; -import { SENTRY_DSN } from '../env'; +import { SENTRY_DSN, SENTRY_ENV } from '../env'; Sentry.init({ dsn: SENTRY_DSN, integrations: [nodeProfilingIntegration()], tracesSampleRate: 0.1, profilesSampleRate: 1.0, + environment: SENTRY_ENV, }); From 971b7e099d1d75fe3e3627275ed7285e85a7192f Mon Sep 17 00:00:00 2001 From: Anthony LC Date: Thu, 13 Feb 2025 16:45:52 +0100 Subject: [PATCH 02/10] =?UTF-8?q?=E2=9C=A8(y-provider)=20collaboration=20p?= =?UTF-8?q?olling?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We can now interact with the collaboration server using http requests. It will be used as a fallback when the websocket is not working. 2 kind of requests: - to send messages to the server we use POST requests - to get messages from the server we use a GET request using SSE (Server Sent Events) --- .../y-provider/__tests__/PollSync.test.ts | 185 ++++++++++++++ .../collaborationPollHandler.test.ts | 193 +++++++++++++++ .../servers/y-provider/jest.config.js | 1 + src/frontend/servers/y-provider/package.json | 1 + .../src/handlers/collaborationPollHandler.ts | 132 ++++++++++ .../servers/y-provider/src/handlers/index.ts | 3 +- .../servers/y-provider/src/helpers.ts | 6 +- .../servers/y-provider/src/libs/PollSync.ts | 227 ++++++++++++++++++ src/frontend/servers/y-provider/src/routes.ts | 2 + .../y-provider/src/servers/appServer.ts | 20 +- src/frontend/servers/y-provider/src/utils.ts | 13 +- src/frontend/yarn.lock | 2 +- 12 files changed, 777 insertions(+), 8 deletions(-) create mode 100644 src/frontend/servers/y-provider/__tests__/PollSync.test.ts create mode 100644 src/frontend/servers/y-provider/__tests__/collaborationPollHandler.test.ts create mode 100644 src/frontend/servers/y-provider/src/handlers/collaborationPollHandler.ts create mode 100644 src/frontend/servers/y-provider/src/libs/PollSync.ts diff --git a/src/frontend/servers/y-provider/__tests__/PollSync.test.ts b/src/frontend/servers/y-provider/__tests__/PollSync.test.ts new file mode 100644 index 000000000..8cfb69a41 --- /dev/null +++ b/src/frontend/servers/y-provider/__tests__/PollSync.test.ts @@ -0,0 +1,185 @@ +/* eslint-disable @typescript-eslint/no-require-imports */ +import { EventEmitter } from 'events'; + +import { MessageType } from '@hocuspocus/server'; +import { Response } from 'express'; +import * as Y from 'yjs'; + +import { PollSync, PollSyncRequest } from '../src/libs/PollSync'; + +const { base64ToYDoc } = require('@/utils'); +const { logger } = require('@/utils'); + +const mockEncodeStateAsUpdate = jest.fn(); +jest.mock('yjs', () => ({ + ...jest.requireActual('yjs'), + encodeStateAsUpdate: () => mockEncodeStateAsUpdate(), +})); + +jest.mock('@/utils', () => ({ + base64ToYDoc: jest.fn((_b64: string) => new Y.Doc()), + toBase64: jest.fn((data: Uint8Array) => Buffer.from(data).toString('base64')), + logger: jest.fn(), +})); + +jest.mock('@hocuspocus/server', () => { + const originalModule = jest.requireActual('@hocuspocus/server'); + return { + __esModule: true, + ...originalModule, + IncomingMessage: jest.fn().mockImplementation((buf: Buffer) => ({ + buffer: buf, + readVarString: jest.fn(() => 'testRoom'), + readVarUint: jest.fn(() => MessageType.Sync), + writeVarUint: jest.fn(), + decoder: {}, + encoder: {}, + })), + }; +}); + +describe('PollSync', () => { + let req: PollSyncRequest; + let res: Response; + let dummyDoc: any; + let pollSync: PollSync; + let mockHocuspocusServer: any; + + beforeEach(() => { + req = Object.assign(new EventEmitter(), { + on: jest.fn((event, cb) => { + // For 'close' event, store the callback for manual trigger in tests. + if (event === 'close') { + req.destroy = cb; + } + }), + }) as unknown as PollSyncRequest; + + res = { + write: jest.fn(), + end: jest.fn(), + } as unknown as Response; + + // Create a dummy document with required methods/properties + dummyDoc = { + name: 'testRoom', + merge: jest.fn((_other: Y.Doc) => { + // Simulate merging by returning a new Y.Doc instance + return new Y.Doc(); + }), + getConnections: jest.fn(() => [{ handleMessage: jest.fn() }]), + awareness: { + on: jest.fn(), + off: jest.fn(), + }, + addDirectConnection: jest.fn(), + removeDirectConnection: jest.fn(), + on: jest.fn(), + off: jest.fn(), + }; + + pollSync = new PollSync(req, 'testRoom', true); + // Pre-set the document for non-init tests + (pollSync as any)._hpDocument = dummyDoc; + + // Create a dummy Hocuspocus server + mockHocuspocusServer = { + loadingDocuments: { get: jest.fn() }, + documents: new Map(), + createDocument: jest.fn(() => ({ + name: 'newDoc', + merge: (doc: Y.Doc) => doc, + })), + }; + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe('initHocuspocusDocument', () => { + it('should return document from loadingDocuments when available', async () => { + mockHocuspocusServer.loadingDocuments.get.mockResolvedValue(dummyDoc); + pollSync = new PollSync(req, 'testRoom', true); + const doc = await pollSync.initHocuspocusDocument(mockHocuspocusServer); + expect(doc).toBe(dummyDoc); + expect(mockHocuspocusServer.loadingDocuments.get).toHaveBeenCalledWith( + 'testRoom', + ); + }); + + it('should return document from documents when available', async () => { + mockHocuspocusServer.loadingDocuments.get.mockResolvedValue(undefined); + mockHocuspocusServer.documents.set('testRoom', dummyDoc); + pollSync = new PollSync(req, 'testRoom', false); + const doc = await pollSync.initHocuspocusDocument(mockHocuspocusServer); + expect(doc).toBe(dummyDoc); + }); + + it('should create a new document when none exists and canEdit is true', async () => { + mockHocuspocusServer.loadingDocuments.get.mockResolvedValue(undefined); + pollSync = new PollSync(req, 'testRoom', true); + const newDoc = { name: 'newDoc', merge: (doc: any) => doc }; + mockHocuspocusServer.createDocument.mockResolvedValue(newDoc); + const doc = await pollSync.initHocuspocusDocument(mockHocuspocusServer); + expect(doc).toBe(newDoc); + expect(mockHocuspocusServer.createDocument).toHaveBeenCalled(); + }); + }); + + describe('sync', () => { + it('should encode state without merging when canEdit is false', () => { + // When user cannot edit, merge should not be called. + pollSync = new PollSync(req, 'testRoom', false); + (pollSync as any)._hpDocument = dummyDoc; + mockEncodeStateAsUpdate.mockReturnValue(Uint8Array.from([1, 2, 3])); + + const result = pollSync.sync('dummyLocalDoc64'); + expect(dummyDoc.merge).not.toHaveBeenCalled(); + expect(result).toBe( + Buffer.from(Uint8Array.from([1, 2, 3])).toString('base64'), + ); + }); + + it('should merge local doc when canEdit is true', () => { + pollSync = new PollSync(req, 'testRoom', true); + (pollSync as any)._hpDocument = dummyDoc; + const localDoc = new Y.Doc(); + // Mock base64ToYDoc to return our localDoc + base64ToYDoc.mockReturnValue(localDoc); + mockEncodeStateAsUpdate.mockReturnValue(Uint8Array.from([4, 5, 6])); + pollSync.sync('localDoc64'); + expect(dummyDoc.merge).toHaveBeenCalledWith(localDoc); + }); + }); + + describe('sendClientsMessages', () => { + it('should log an error if room names do not match', () => { + // Set doc name different from what IncomingMessage returns ('testRoom') + dummyDoc.name = 'differentRoom'; + (pollSync as any)._hpDocument = dummyDoc; + const fakeMessage = Buffer.from('fakeMessage').toString('base64'); + pollSync.sendClientsMessages(fakeMessage); + expect(logger).toHaveBeenCalled(); + }); + }); + + describe('pullClientsMessages', () => { + it('should register event listeners and cleanup on request close', () => { + (pollSync as any)._hpDocument = dummyDoc; + pollSync.pullClientsMessages(res); + expect(dummyDoc.addDirectConnection).toHaveBeenCalled(); + expect(dummyDoc.on).toHaveBeenCalled(); + expect(dummyDoc.awareness.on).toHaveBeenCalled(); + + // Simulate a 'close' event on the request + if (req.destroy) { + req.destroy(); + } + expect(dummyDoc.removeDirectConnection).toHaveBeenCalled(); + // Verify that document listeners are removed (off was called) + expect(dummyDoc.off).toHaveBeenCalled(); + expect(dummyDoc.awareness.off).toHaveBeenCalled(); + }); + }); +}); diff --git a/src/frontend/servers/y-provider/__tests__/collaborationPollHandler.test.ts b/src/frontend/servers/y-provider/__tests__/collaborationPollHandler.test.ts new file mode 100644 index 000000000..f8b99e95e --- /dev/null +++ b/src/frontend/servers/y-provider/__tests__/collaborationPollHandler.test.ts @@ -0,0 +1,193 @@ +import { Response } from 'express'; + +import { + collaborationPollPostMessageHandler, + collaborationPollSSEMessageHandler, + collaborationPollSyncDocHandler, +} from '../src/handlers/collaborationPollHandler'; + +const mockInitHocuspocusDocument = jest.fn(); +const mockSendClientsMessages = jest.fn(); +const mockSync = jest.fn(); +const mockPullClientsMessages = jest.fn(); + +jest.mock('@/libs/PollSync', () => { + return { + PollSync: jest.fn().mockImplementation(() => ({ + initHocuspocusDocument: mockInitHocuspocusDocument, + sendClientsMessages: mockSendClientsMessages, + sync: mockSync, + pullClientsMessages: mockPullClientsMessages, + })), + }; +}); + +jest.mock('@/servers/hocusPocusServer', () => ({ + hocusPocusServer: {}, +})); + +// Helper function to create a mock response +function createResponse() { + const res: Partial> = {}; + res.status = jest.fn().mockReturnValue(res); + res.json = jest.fn().mockReturnValue(res); + res.setHeader = jest.fn(); + res.write = jest.fn(); + res.headersSent = false; + return res as Response; +} + +describe('collaborationPollPostMessageHandler', () => { + beforeEach(() => { + jest.clearAllMocks(); + mockInitHocuspocusDocument.mockResolvedValue({ doc: 'exists' }); + }); + + it('should return 403 if user is not allowed to edit', async () => { + const req = { + query: { room: 'test-room' }, + headers: { 'x-can-edit': 'False' }, + body: { message64: 'testMessage' }, + } as any; + const res = createResponse(); + await collaborationPollPostMessageHandler(req, res); + expect(res.status).toHaveBeenCalledWith(403); + expect(res.json).toHaveBeenCalledWith({ error: 'Forbidden' }); + }); + + it('should return 400 if room is not provided', async () => { + const req = { + query: {}, + headers: { 'x-can-edit': 'True' }, + body: { message64: 'testMessage' }, + } as any; + const res = createResponse(); + await collaborationPollPostMessageHandler(req, res); + expect(res.status).toHaveBeenCalledWith(400); + expect(res.json).toHaveBeenCalledWith({ error: 'Room name not provided' }); + }); + + it('should return 404 if document is not found', async () => { + mockInitHocuspocusDocument.mockResolvedValue(null); + const req = { + query: { room: 'test-room' }, + headers: { 'x-can-edit': 'True' }, + body: { message64: 'testMessage' }, + } as any; + const res = createResponse(); + res.headersSent = false; + await collaborationPollPostMessageHandler(req, res); + expect(res.status).toHaveBeenCalledWith(404); + expect(res.json).toHaveBeenCalledWith({ error: 'Document not found' }); + }); + + it('should process message and return updated true when successful', async () => { + // Reset headerSent to false to simulate a proper response + const req = { + query: { room: 'test-room' }, + headers: { 'x-can-edit': 'True' }, + body: { message64: 'testMessage' }, + } as any; + const res = createResponse(); + res.headersSent = false; + await collaborationPollPostMessageHandler(req, res); + expect(mockSendClientsMessages).toHaveBeenCalledWith('testMessage'); + expect(res.status).toHaveBeenCalledWith(200); + expect(res.json).toHaveBeenCalledWith({ updated: true }); + }); +}); + +describe('collaborationPollSyncDocHandler', () => { + beforeEach(() => { + jest.clearAllMocks(); + mockInitHocuspocusDocument.mockResolvedValue({ doc: 'exists' }); + mockSync.mockReturnValue('syncDocEncoded'); + }); + + it('should return 400 if room is not provided', async () => { + const req = { + query: {}, + headers: { 'x-can-edit': 'True' }, + body: { localDoc64: 'localDocEncoded' }, + } as any; + const res = createResponse(); + await collaborationPollSyncDocHandler(req, res); + expect(res.status).toHaveBeenCalledWith(400); + expect(res.json).toHaveBeenCalledWith({ error: 'Room name not provided' }); + }); + + it('should return 404 if document is not found', async () => { + mockInitHocuspocusDocument.mockResolvedValue(null); + const req = { + query: { room: 'test-room' }, + headers: { 'x-can-edit': 'True' }, + body: { localDoc64: 'localDocEncoded' }, + } as any; + const res = createResponse(); + await collaborationPollSyncDocHandler(req, res); + expect(res.status).toHaveBeenCalledWith(404); + expect(res.json).toHaveBeenCalledWith({ error: 'Document not found' }); + }); + + it('should sync document and return syncDoc64 when successful', async () => { + const req = { + query: { room: 'test-room' }, + headers: { 'x-can-edit': 'True' }, + body: { localDoc64: 'localDocEncoded' }, + } as any; + const res = createResponse(); + res.headersSent = false; + await collaborationPollSyncDocHandler(req, res); + expect(mockSync).toHaveBeenCalledWith('localDocEncoded'); + expect(res.status).toHaveBeenCalledWith(200); + expect(res.json).toHaveBeenCalledWith({ syncDoc64: 'syncDocEncoded' }); + }); +}); + +describe('collaborationPollSSEMessageHandler', () => { + beforeEach(() => { + jest.clearAllMocks(); + mockInitHocuspocusDocument.mockResolvedValue({ doc: 'exists' }); + }); + + it('should return 400 if room is not provided', async () => { + const req = { + query: {}, + headers: { 'x-can-edit': 'True' }, + } as any; + const res = createResponse(); + await collaborationPollSSEMessageHandler(req, res); + expect(res.status).toHaveBeenCalledWith(400); + expect(res.json).toHaveBeenCalledWith({ error: 'Room name not provided' }); + }); + + it('should return 404 if document is not found', async () => { + mockInitHocuspocusDocument.mockResolvedValue(null); + const req = { + query: { room: 'test-room' }, + headers: { 'x-can-edit': 'True' }, + } as any; + const res = createResponse(); + await collaborationPollSSEMessageHandler(req, res); + expect(res.status).toHaveBeenCalledWith(404); + expect(res.json).toHaveBeenCalledWith({ error: 'Document not found' }); + }); + + it('should set SSE headers and send connected message when successful', async () => { + const req = { + query: { room: 'test-room' }, + headers: { 'x-can-edit': 'True' }, + } as any; + const res = createResponse(); + res.headersSent = false; + await collaborationPollSSEMessageHandler(req, res); + expect(res.setHeader).toHaveBeenCalledWith( + 'Content-Type', + 'text/event-stream', + ); + expect(res.setHeader).toHaveBeenCalledWith('Cache-Control', 'no-cache'); + expect(res.setHeader).toHaveBeenCalledWith('Connection', 'keep-alive'); + expect(res.write).toHaveBeenCalledWith(': connected\n\n'); + expect(mockPullClientsMessages).toHaveBeenCalledWith(res); + }); +}); diff --git a/src/frontend/servers/y-provider/jest.config.js b/src/frontend/servers/y-provider/jest.config.js index 9ba612b33..cf8543911 100644 --- a/src/frontend/servers/y-provider/jest.config.js +++ b/src/frontend/servers/y-provider/jest.config.js @@ -7,6 +7,7 @@ var config = { moduleNameMapper: { '^@/(.*)$': '/../src/$1', '^@blocknote/server-util$': '/../__mocks__/mock.js', + '^y-protocols/awareness.js$': '/../__mocks__/mock.js', }, }; export default config; diff --git a/src/frontend/servers/y-provider/package.json b/src/frontend/servers/y-provider/package.json index 1da18db22..de3a49242 100644 --- a/src/frontend/servers/y-provider/package.json +++ b/src/frontend/servers/y-provider/package.json @@ -23,6 +23,7 @@ "cors": "2.8.5", "express": "4.21.2", "express-ws": "5.0.2", + "uuid": "11.0.5", "y-protocols": "1.0.6", "yjs": "13.6.23" }, diff --git a/src/frontend/servers/y-provider/src/handlers/collaborationPollHandler.ts b/src/frontend/servers/y-provider/src/handlers/collaborationPollHandler.ts new file mode 100644 index 000000000..b5a1a7697 --- /dev/null +++ b/src/frontend/servers/y-provider/src/handlers/collaborationPollHandler.ts @@ -0,0 +1,132 @@ +import { Response } from 'express'; + +import { PollSync, PollSyncRequest } from '@/libs/PollSync'; +import { hocusPocusServer } from '@/servers/hocusPocusServer'; + +interface CollaborationPollPostMessagePayload { + message64: string; +} +interface CollaborationPollPostMessageResponse { + updated?: boolean; + error?: string; +} + +export const collaborationPollPostMessageHandler = async ( + req: PollSyncRequest, + res: Response, +) => { + const room = req.query.room; + const canEdit = req.headers['x-can-edit'] === 'True'; + + // Only editors can send messages + if (!canEdit) { + res.status(403).json({ error: 'Forbidden' }); + return; + } + + if (!room) { + res.status(400).json({ error: 'Room name not provided' }); + return; + } + + const pollSynch = new PollSync( + req, + room, + canEdit, + ); + const hpDoc = await pollSynch.initHocuspocusDocument(hocusPocusServer); + + if (!res.headersSent && !hpDoc) { + res.status(404).json({ error: 'Document not found' }); + return; + } + + pollSynch.sendClientsMessages(req.body.message64); + + if (!res.headersSent) { + res.status(200).json({ updated: true }); + } +}; + +/** + * Polling way of handling collaboration + * @param req + * @param res + */ +interface CollaborationPollSyncDocResponse { + syncDoc64?: string; + error?: string; +} +interface CollaborationPollSyncDocBody { + localDoc64: string; +} + +export const collaborationPollSyncDocHandler = async ( + req: PollSyncRequest, + res: Response, +) => { + const room = req.query.room; + const canEdit = req.headers['x-can-edit'] === 'True'; + + if (!room) { + res.status(400).json({ error: 'Room name not provided' }); + return; + } + + const pollSynch = new PollSync( + req, + room, + canEdit, + ); + const hpDoc = await pollSynch.initHocuspocusDocument(hocusPocusServer); + + if (!hpDoc) { + res.status(404).json({ error: 'Document not found' }); + return; + } + + const syncDoc64 = pollSynch.sync(req.body.localDoc64); + + if (!res.headersSent) { + res.status(200).json({ syncDoc64 }); + } +}; + +/** + * SSE message handling + * @param req + * @param res + */ +interface CollaborationPollSSEMessageResponse { + updatedDoc64?: string; + stateFingerprint?: string; + awareness64?: string; + error?: string; +} +export const collaborationPollSSEMessageHandler = async ( + req: PollSyncRequest, + res: Response, +) => { + const room = req.query.room; + const canEdit = req.headers['x-can-edit'] === 'True'; + + if (!room) { + res.status(400).json({ error: 'Room name not provided' }); + return; + } + + const pollSynch = new PollSync(req, room, canEdit); + const hpDoc = await pollSynch.initHocuspocusDocument(hocusPocusServer); + + if (!hpDoc) { + res.status(404).json({ error: 'Document not found' }); + return; + } + + res.setHeader('Content-Type', 'text/event-stream'); + res.setHeader('Cache-Control', 'no-cache'); + res.setHeader('Connection', 'keep-alive'); + res.write(': connected\n\n'); + + pollSynch.pullClientsMessages(res); +}; diff --git a/src/frontend/servers/y-provider/src/handlers/index.ts b/src/frontend/servers/y-provider/src/handlers/index.ts index 75bd7f7bb..5b14542be 100644 --- a/src/frontend/servers/y-provider/src/handlers/index.ts +++ b/src/frontend/servers/y-provider/src/handlers/index.ts @@ -1,3 +1,4 @@ export * from './collaborationResetConnectionsHandler'; -export * from './collaborationWSHandler'; +export * from './collaborationPollHandler'; export * from './convertMarkdownHandler'; +export * from './collaborationWSHandler'; diff --git a/src/frontend/servers/y-provider/src/helpers.ts b/src/frontend/servers/y-provider/src/helpers.ts index c23b83686..2a3bd033b 100644 --- a/src/frontend/servers/y-provider/src/helpers.ts +++ b/src/frontend/servers/y-provider/src/helpers.ts @@ -1,6 +1,6 @@ -export const promiseDone = () => { - let done: (value: void | PromiseLike) => void = () => {}; - const promise = new Promise((resolve) => { +export const promiseDone = () => { + let done: (value: T | PromiseLike) => void = () => {}; + const promise = new Promise((resolve) => { done = resolve; }); diff --git a/src/frontend/servers/y-provider/src/libs/PollSync.ts b/src/frontend/servers/y-provider/src/libs/PollSync.ts new file mode 100644 index 000000000..7d262d43d --- /dev/null +++ b/src/frontend/servers/y-provider/src/libs/PollSync.ts @@ -0,0 +1,227 @@ +/* eslint-disable @typescript-eslint/no-unused-vars */ +import crypto from 'crypto'; + +import { + AwarenessUpdate, + Document, + Hocuspocus, + IncomingMessage, + MessageType, + OutgoingMessage, +} from '@hocuspocus/server'; +import { Request, Response } from 'express'; +import { v4 as uuid } from 'uuid'; +import { applyAwarenessUpdate } from 'y-protocols/awareness.js'; +import { readSyncMessage } from 'y-protocols/sync'; +import * as Y from 'yjs'; + +import { base64ToYDoc, logger, toBase64 } from '@/utils'; + +export type PollSyncRequestQuery = { + room?: string; +}; + +export type PollSyncRequest = Request< + object, + object, + T, + PollSyncRequestQuery +>; + +export class PollSync { + public readonly canEdit: boolean; + public readonly req: PollSyncRequest; + public readonly room: string; + private _hpDocument?: Document; + + constructor(req: PollSyncRequest, room: string, canEdit: boolean) { + this.room = room; + this.canEdit = canEdit; + this.req = req; + } + + public get hpDocument() { + return this._hpDocument; + } + + public async initHocuspocusDocument(hocusPocusServer: Hocuspocus) { + const { req, room, canEdit } = this; + this._hpDocument = await hocusPocusServer.loadingDocuments.get(room); + + if (this._hpDocument) { + return this._hpDocument; + } + + this._hpDocument = hocusPocusServer.documents.get(room); + + if (this._hpDocument || (!this._hpDocument && !canEdit)) { + return this._hpDocument; + } + + /** + * createDocument is used to create a new document if it does not exist. + * If the document exists, it will return the existing document. + */ + this._hpDocument = await hocusPocusServer.createDocument( + room, + req, + uuid(), + { + readOnly: false, + requiresAuthentication: false, + isAuthenticated: false, + }, + ); + + return this._hpDocument; + } + + /** + * Sync the document with the latest changes + * + * @param localDoc64 + * @returns + */ + public sync(localDoc64: string): string | undefined { + const hpDoc = this.getHpDocument(); + let syncYDoc = hpDoc; + + // Merge the coming document with the latest changes (only if the user can edit) + if (this.canEdit) { + const localDoc = base64ToYDoc(localDoc64); + syncYDoc = hpDoc.merge(localDoc); + } + + return toBase64(Y.encodeStateAsUpdate(syncYDoc)); + } + + /** + * Create a hash SHA-256 of the state vector of the document. + * Usefull to compare the state of the document. + * @param doc + * @returns + */ + protected getStateFingerprint(doc: Y.Doc): string { + const stateVector = Y.encodeStateVector(doc); + return crypto.createHash('sha256').update(stateVector).digest('base64'); // or 'hex' + } + + /** + * Send messages to other clients + */ + public sendClientsMessages(message64: string) { + const hpDoc = this.getHpDocument(); + const messageBuffer = Buffer.from(message64, 'base64'); + const message = new IncomingMessage(messageBuffer); + const room = message.readVarString(); + + if (hpDoc.name !== room) { + logger('Send messages problem, room different', room, hpDoc.name); + return; + } + + // We write the sync to the current doc - it will propagate to others by itself + const type = message.readVarUint() as MessageType; + if (type === MessageType.Sync) { + message.writeVarUint(MessageType.Sync); + readSyncMessage(message.decoder, message.encoder, hpDoc, null); + } else if (type === MessageType.Awareness) { + const awarenessUpdate = message.readVarUint8Array(); + applyAwarenessUpdate( + hpDoc.awareness, + awarenessUpdate, + hpDoc.awareness.clientID, + ); + } else { + hpDoc.getConnections().forEach((connection) => { + connection.handleMessage(messageBuffer); + }); + } + } + + /** + * Pull messages from other clients + * + * We listen 2 kind of messages: + * - Document updates (change in the document) + * - Awareness messages (cursor, selection, etc.) + * + * @param res + */ + public pullClientsMessages(res: Response) { + const hpDoc = this.getHpDocument(); + hpDoc.addDirectConnection(); + + const updateMessagesFn = ( + update: Uint8Array, + _origin: string, + updatedDoc: Y.Doc, + _transaction: Y.Transaction, + ) => { + res.write( + `data: ${JSON.stringify({ + time: new Date(), + updatedDoc64: toBase64(update), + stateFingerprint: this.getStateFingerprint(updatedDoc), + })}\n\n`, + ); + }; + + const destroyFn = (updatedDoc: Y.Doc) => { + res.write( + `data: ${JSON.stringify({ + time: new Date(), + updatedDoc64: undefined, + stateFingerprint: this.getStateFingerprint(updatedDoc), + })}\n\n`, + ); + + hpDoc.off('destroy', destroyFn); + hpDoc.off('update', updateMessagesFn); + + // Close the connection + res.end(); + }; + + const updateAwarenessFn = ({ + added, + updated, + removed, + }: AwarenessUpdate) => { + const changedClients = added.concat(updated, removed); + const awarenessMessage = new OutgoingMessage( + this.room, + ).createAwarenessUpdateMessage(hpDoc.awareness, changedClients); + + res.write( + `data: ${JSON.stringify({ + time: new Date(), + awareness64: toBase64(awarenessMessage.toUint8Array()), + stateFingerprint: this.getStateFingerprint(hpDoc), + })}\n\n`, + ); + }; + + hpDoc.awareness.off('update', updateAwarenessFn); + hpDoc.awareness.on('update', updateAwarenessFn); + hpDoc.off('update', updateMessagesFn); + hpDoc.off('destroy', destroyFn); + hpDoc.on('update', updateMessagesFn); + hpDoc.on('destroy', destroyFn); + + this.req.on('close', () => { + hpDoc.off('update', updateMessagesFn); + hpDoc.off('destroy', destroyFn); + hpDoc.awareness.off('update', updateAwarenessFn); + hpDoc.removeDirectConnection(); + }); + } + + protected getHpDocument() { + if (!this.hpDocument) { + throw new Error('HocusPocus document not initialized'); + } + + return this.hpDocument; + } +} diff --git a/src/frontend/servers/y-provider/src/routes.ts b/src/frontend/servers/y-provider/src/routes.ts index 98803b87f..44b294e8e 100644 --- a/src/frontend/servers/y-provider/src/routes.ts +++ b/src/frontend/servers/y-provider/src/routes.ts @@ -1,5 +1,7 @@ export const routes = { COLLABORATION_WS: '/collaboration/ws/', + COLLABORATION_POLL_MESSAGE: '/collaboration/ws/poll/message/', + COLLABORATION_POLL_SYNC: '/collaboration/ws/poll/sync/', COLLABORATION_RESET_CONNECTIONS: '/collaboration/api/reset-connections/', CONVERT_MARKDOWN: '/api/convert-markdown/', }; diff --git a/src/frontend/servers/y-provider/src/servers/appServer.ts b/src/frontend/servers/y-provider/src/servers/appServer.ts index 80077bb8e..804378ab2 100644 --- a/src/frontend/servers/y-provider/src/servers/appServer.ts +++ b/src/frontend/servers/y-provider/src/servers/appServer.ts @@ -6,6 +6,9 @@ import expressWebsockets from 'express-ws'; import { PORT } from '../env'; import { + collaborationPollPostMessageHandler, + collaborationPollSSEMessageHandler, + collaborationPollSyncDocHandler, collaborationResetConnectionsHandler, collaborationWSHandler, convertMarkdownHandler, @@ -27,9 +30,24 @@ export const initServer = () => { app.use(corsMiddleware); /** - * Route to handle WebSocket connections + * Routes to handle collaboration connections */ app.ws(routes.COLLABORATION_WS, wsSecurity, collaborationWSHandler); + app.get( + routes.COLLABORATION_POLL_MESSAGE, + httpSecurity, + collaborationPollSSEMessageHandler, + ); + app.post( + routes.COLLABORATION_POLL_MESSAGE, + httpSecurity, + collaborationPollPostMessageHandler, + ); + app.post( + routes.COLLABORATION_POLL_SYNC, + httpSecurity, + collaborationPollSyncDocHandler, + ); /** * Route to reset connections in a room: diff --git a/src/frontend/servers/y-provider/src/utils.ts b/src/frontend/servers/y-provider/src/utils.ts index 847c55689..2bc62bb5d 100644 --- a/src/frontend/servers/y-provider/src/utils.ts +++ b/src/frontend/servers/y-provider/src/utils.ts @@ -1,3 +1,5 @@ +import * as Y from 'yjs'; + import { COLLABORATION_LOGGING } from './env'; // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -8,6 +10,13 @@ export function logger(...args: any[]) { } } -export const toBase64 = function (str: Uint8Array) { - return Buffer.from(str).toString('base64'); +export const toBase64 = function (uInt8Array: Uint8Array) { + return Buffer.from(uInt8Array).toString('base64'); +}; + +export const base64ToYDoc = (base64: string) => { + const uint8Array = Buffer.from(base64, 'base64'); + const ydoc = new Y.Doc(); + Y.applyUpdate(ydoc, uint8Array); + return ydoc; }; diff --git a/src/frontend/yarn.lock b/src/frontend/yarn.lock index 38330b156..0a8f94609 100644 --- a/src/frontend/yarn.lock +++ b/src/frontend/yarn.lock @@ -13934,7 +13934,7 @@ utils-merge@1.0.1: resolved "https://registry.yarnpkg.com/utils-merge/-/utils-merge-1.0.1.tgz#9f95710f50a267947b2ccc124741c1028427e713" integrity sha512-pMZTvIkT1d+TFGvDOqodOclx0QWkkgi6Tdoa8gC8ffGAAqz9pzPTZWAybbsHHoED/ztMtkv/VoYTYyShUn81hA== -uuid@^11.0.3: +uuid@11.0.5, uuid@^11.0.3: version "11.0.5" resolved "https://registry.yarnpkg.com/uuid/-/uuid-11.0.5.tgz#07b46bdfa6310c92c3fb3953a8720f170427fc62" integrity sha512-508e6IcKLrhxKdBbcA2b4KQZlLVp2+J5UwQ6F7Drckkc5N9ZJwFa4TgWtsww9UG8fGHbm6gbV19TdM5pQ4GaIA== From f27e968c15bbf5d38804e2fea263cc73299cb424 Mon Sep 17 00:00:00 2001 From: Anthony LC Date: Thu, 13 Feb 2025 16:56:38 +0100 Subject: [PATCH 03/10] =?UTF-8?q?=F0=9F=9A=9A(frontend)=20move=20toBase64?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We will need toBase64 in different features, better to move it to "doc-management". --- .../impress/src/features/docs/doc-editor/hook/useSaveDoc.tsx | 4 +--- .../apps/impress/src/features/docs/doc-editor/utils.ts | 3 --- .../apps/impress/src/features/docs/doc-management/utils.ts | 3 +++ 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/frontend/apps/impress/src/features/docs/doc-editor/hook/useSaveDoc.tsx b/src/frontend/apps/impress/src/features/docs/doc-editor/hook/useSaveDoc.tsx index ca1ed0054..ae00dac95 100644 --- a/src/frontend/apps/impress/src/features/docs/doc-editor/hook/useSaveDoc.tsx +++ b/src/frontend/apps/impress/src/features/docs/doc-editor/hook/useSaveDoc.tsx @@ -2,12 +2,10 @@ import { useRouter } from 'next/router'; import { useCallback, useEffect, useRef, useState } from 'react'; import * as Y from 'yjs'; -import { useUpdateDoc } from '@/features/docs/doc-management/'; +import { toBase64, useUpdateDoc } from '@/features/docs/doc-management/'; import { KEY_LIST_DOC_VERSIONS } from '@/features/docs/doc-versioning'; import { isFirefox } from '@/utils/userAgent'; -import { toBase64 } from '../utils'; - const useSaveDoc = (docId: string, doc: Y.Doc, canSave: boolean) => { const { mutate: updateDoc } = useUpdateDoc({ listInvalideQueries: [KEY_LIST_DOC_VERSIONS], diff --git a/src/frontend/apps/impress/src/features/docs/doc-editor/utils.ts b/src/frontend/apps/impress/src/features/docs/doc-editor/utils.ts index a3d311180..325a2304e 100644 --- a/src/frontend/apps/impress/src/features/docs/doc-editor/utils.ts +++ b/src/frontend/apps/impress/src/features/docs/doc-editor/utils.ts @@ -22,6 +22,3 @@ function hslToHex(h: number, s: number, l: number) { }; return `#${f(0)}${f(8)}${f(4)}`; } - -export const toBase64 = (str: Uint8Array) => - Buffer.from(str).toString('base64'); diff --git a/src/frontend/apps/impress/src/features/docs/doc-management/utils.ts b/src/frontend/apps/impress/src/features/docs/doc-management/utils.ts index 2c229128e..2707e7468 100644 --- a/src/frontend/apps/impress/src/features/docs/doc-management/utils.ts +++ b/src/frontend/apps/impress/src/features/docs/doc-management/utils.ts @@ -12,6 +12,9 @@ export const currentDocRole = (abilities: Doc['abilities']): Role => { : Role.READER; }; +export const toBase64 = (str: Uint8Array) => + Buffer.from(str).toString('base64'); + export const base64ToYDoc = (base64: string) => { const uint8Array = Buffer.from(base64, 'base64'); const ydoc = new Y.Doc(); From a2f1e32f21c9d238c4bc70715e3b6b9aadadc447 Mon Sep 17 00:00:00 2001 From: Anthony LC Date: Thu, 13 Feb 2025 17:14:04 +0100 Subject: [PATCH 04/10] =?UTF-8?q?=E2=9C=A8(frontend)=20create=20class=20Co?= =?UTF-8?q?llaborationProvider?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Create the CollaborationProvider class. This class is inherited from HocuspocusProvider class. This class integrate a fallback mechanism to handle the case where the user cannot connect with websockets. It will use post request to send the data to the collaboration server. It will use an EventSource to receive the data from the collaboration server. --- CHANGELOG.md | 5 + .../app-impress/doc-collaboration.spec.ts | 97 ++++++ .../__tests__/app-impress/doc-editor.spec.ts | 64 ---- src/frontend/apps/impress/package.json | 1 + .../api/collaborationRequests.ts | 67 ++++ .../doc-management/hooks/useCollaboration.tsx | 19 +- .../libs/CollaborationProvider.ts | 325 ++++++++++++++++++ .../__tests__/CollaborationProvider.test.ts | 179 ++++++++++ .../stores/useProviderStore.tsx | 8 +- .../impress/src/pages/docs/[id]/index.tsx | 2 +- src/frontend/yarn.lock | 7 + 11 files changed, 704 insertions(+), 70 deletions(-) create mode 100644 src/frontend/apps/e2e/__tests__/app-impress/doc-collaboration.spec.ts create mode 100644 src/frontend/apps/impress/src/features/docs/doc-management/api/collaborationRequests.ts create mode 100644 src/frontend/apps/impress/src/features/docs/doc-management/libs/CollaborationProvider.ts create mode 100644 src/frontend/apps/impress/src/features/docs/doc-management/libs/__tests__/CollaborationProvider.test.ts diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ddf0ce23..3f8358a64 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,10 @@ and this project adheres to ## [Unreleased] +## Added + +- ✨Collaboration long polling fallback #517 + ## Changed - 🛂(frontend) Restore version visibility #629 @@ -18,6 +22,7 @@ and this project adheres to - ♻️(frontend) improve table pdf rendering + ## [2.2.0] - 2025-02-10 ## Added diff --git a/src/frontend/apps/e2e/__tests__/app-impress/doc-collaboration.spec.ts b/src/frontend/apps/e2e/__tests__/app-impress/doc-collaboration.spec.ts new file mode 100644 index 000000000..2c1640ffa --- /dev/null +++ b/src/frontend/apps/e2e/__tests__/app-impress/doc-collaboration.spec.ts @@ -0,0 +1,97 @@ +import { expect, test } from '@playwright/test'; + +import { createDoc, verifyDocName } from './common'; + +test.beforeEach(async ({ page }) => { + await page.goto('/'); +}); + +test.describe('Doc Collaboration', () => { + /** + * We check: + * - connection to the collaborative server + * - signal of the backend to the collaborative server (connection should close) + * - reconnection to the collaborative server + */ + test('checks the connection with collaborative server', async ({ + page, + browserName, + }) => { + let webSocketPromise = page.waitForEvent('websocket', (webSocket) => { + return webSocket + .url() + .includes('ws://localhost:8083/collaboration/ws/?room='); + }); + + const [title] = await createDoc(page, 'doc-editor', browserName, 1); + await verifyDocName(page, title); + + let webSocket = await webSocketPromise; + expect(webSocket.url()).toContain( + 'ws://localhost:8083/collaboration/ws/?room=', + ); + + // Is connected + let framesentPromise = webSocket.waitForEvent('framesent'); + + await page.locator('.ProseMirror.bn-editor').click(); + await page.locator('.ProseMirror.bn-editor').fill('Hello World'); + + let framesent = await framesentPromise; + expect(framesent.payload).not.toBeNull(); + + await page.getByRole('button', { name: 'Share' }).click(); + + const selectVisibility = page.getByLabel('Visibility', { exact: true }); + + // When the visibility is changed, the ws should closed the connection (backend signal) + const wsClosePromise = webSocket.waitForEvent('close'); + + await selectVisibility.click(); + await page + .getByRole('button', { + name: 'Connected', + }) + .click(); + + // Assert that the doc reconnects to the ws + const wsClose = await wsClosePromise; + expect(wsClose.isClosed()).toBeTruthy(); + + // Checkt the ws is connected again + webSocketPromise = page.waitForEvent('websocket', (webSocket) => { + return webSocket + .url() + .includes('ws://localhost:8083/collaboration/ws/?room='); + }); + + webSocket = await webSocketPromise; + framesentPromise = webSocket.waitForEvent('framesent'); + framesent = await framesentPromise; + expect(framesent.payload).not.toBeNull(); + }); + + test('checks the connection switch to polling after websocket failure', async ({ + page, + browserName, + }) => { + const responsePromise = page.waitForResponse( + (response) => + response.url().includes('/poll/') && response.status() === 200, + ); + + await page.routeWebSocket( + 'ws://localhost:8083/collaboration/ws/**', + async (ws) => { + await ws.close(); + }, + ); + + await page.reload(); + + await createDoc(page, 'doc-polling', browserName, 1); + + const response = await responsePromise; + expect(response.ok()).toBeTruthy(); + }); +}); diff --git a/src/frontend/apps/e2e/__tests__/app-impress/doc-editor.spec.ts b/src/frontend/apps/e2e/__tests__/app-impress/doc-editor.spec.ts index 035e39e06..ac069a857 100644 --- a/src/frontend/apps/e2e/__tests__/app-impress/doc-editor.spec.ts +++ b/src/frontend/apps/e2e/__tests__/app-impress/doc-editor.spec.ts @@ -88,70 +88,6 @@ test.describe('Doc Editor', () => { ).toBeVisible(); }); - /** - * We check: - * - connection to the collaborative server - * - signal of the backend to the collaborative server (connection should close) - * - reconnection to the collaborative server - */ - test('checks the connection with collaborative server', async ({ - page, - browserName, - }) => { - let webSocketPromise = page.waitForEvent('websocket', (webSocket) => { - return webSocket - .url() - .includes('ws://localhost:8083/collaboration/ws/?room='); - }); - - const randomDoc = await createDoc(page, 'doc-editor', browserName, 1); - await verifyDocName(page, randomDoc[0]); - - let webSocket = await webSocketPromise; - expect(webSocket.url()).toContain( - 'ws://localhost:8083/collaboration/ws/?room=', - ); - - // Is connected - let framesentPromise = webSocket.waitForEvent('framesent'); - - await page.locator('.ProseMirror.bn-editor').click(); - await page.locator('.ProseMirror.bn-editor').fill('Hello World'); - - let framesent = await framesentPromise; - expect(framesent.payload).not.toBeNull(); - - await page.getByRole('button', { name: 'Share' }).click(); - - const selectVisibility = page.getByLabel('Visibility', { exact: true }); - - // When the visibility is changed, the ws should closed the connection (backend signal) - const wsClosePromise = webSocket.waitForEvent('close'); - - await selectVisibility.click(); - await page - .getByRole('button', { - name: 'Connected', - }) - .click(); - - // Assert that the doc reconnects to the ws - const wsClose = await wsClosePromise; - expect(wsClose.isClosed()).toBeTruthy(); - - // Checkt the ws is connected again - webSocketPromise = page.waitForEvent('websocket', (webSocket) => { - return webSocket - .url() - .includes('ws://localhost:8083/collaboration/ws/?room='); - }); - - webSocket = await webSocketPromise; - framesentPromise = webSocket.waitForEvent('framesent'); - framesent = await framesentPromise; - expect(framesent.payload).not.toBeNull(); - }); - test('markdown button converts from markdown to the editor syntax json', async ({ page, browserName, diff --git a/src/frontend/apps/impress/package.json b/src/frontend/apps/impress/package.json index 290c80a89..5db34b667 100644 --- a/src/frontend/apps/impress/package.json +++ b/src/frontend/apps/impress/package.json @@ -62,6 +62,7 @@ "@types/node": "*", "@types/react": "18.3.12", "@types/react-dom": "*", + "@types/ws": "8.5.13", "cross-env": "7.0.3", "dotenv": "16.4.7", "eslint-config-impress": "*", diff --git a/src/frontend/apps/impress/src/features/docs/doc-management/api/collaborationRequests.ts b/src/frontend/apps/impress/src/features/docs/doc-management/api/collaborationRequests.ts new file mode 100644 index 000000000..7d0dd0318 --- /dev/null +++ b/src/frontend/apps/impress/src/features/docs/doc-management/api/collaborationRequests.ts @@ -0,0 +1,67 @@ +import { APIError, errorCauses } from '@/api'; + +interface PollOutgoingMessageParams { + pollUrl: string; + message64: string; +} +interface PollOutgoingMessageResponse { + updated?: boolean; +} + +export const pollOutgoingMessageRequest = async ({ + pollUrl, + message64, +}: PollOutgoingMessageParams): Promise => { + const response = await fetch(pollUrl, { + method: 'POST', + credentials: 'include', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + message64, + }), + }); + + if (!response.ok) { + throw new APIError( + `Post poll message request failed`, + await errorCauses(response), + ); + } + + return response.json() as Promise; +}; + +interface PollSyncParams { + pollUrl: string; + localDoc64: string; +} +interface PollSyncResponse { + syncDoc64?: string; +} + +export const postPollSyncRequest = async ({ + pollUrl, + localDoc64, +}: PollSyncParams): Promise => { + const response = await fetch(pollUrl, { + method: 'POST', + credentials: 'include', + headers: { + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ + localDoc64, + }), + }); + + if (!response.ok) { + throw new APIError( + `Sync request failed: ${response.status} ${response.statusText}`, + await errorCauses(response), + ); + } + + return response.json() as Promise; +}; diff --git a/src/frontend/apps/impress/src/features/docs/doc-management/hooks/useCollaboration.tsx b/src/frontend/apps/impress/src/features/docs/doc-management/hooks/useCollaboration.tsx index 14f3b7158..5ff96938b 100644 --- a/src/frontend/apps/impress/src/features/docs/doc-management/hooks/useCollaboration.tsx +++ b/src/frontend/apps/impress/src/features/docs/doc-management/hooks/useCollaboration.tsx @@ -6,17 +6,29 @@ import { useBroadcastStore } from '@/stores'; import { useProviderStore } from '../stores/useProviderStore'; import { Base64 } from '../types'; -export const useCollaboration = (room?: string, initialContent?: Base64) => { +export const useCollaboration = ( + room?: string, + initialContent?: Base64, + canEdit?: boolean, +) => { const collaborationUrl = useCollaborationUrl(room); const { setBroadcastProvider } = useBroadcastStore(); const { provider, createProvider, destroyProvider } = useProviderStore(); + /** + * Initialize the provider + */ useEffect(() => { - if (!room || !collaborationUrl || provider) { + if (!room || !collaborationUrl || provider || canEdit === undefined) { return; } - const newProvider = createProvider(collaborationUrl, room, initialContent); + const newProvider = createProvider( + collaborationUrl, + room, + canEdit, + initialContent, + ); setBroadcastProvider(newProvider); }, [ provider, @@ -25,6 +37,7 @@ export const useCollaboration = (room?: string, initialContent?: Base64) => { initialContent, createProvider, setBroadcastProvider, + canEdit, ]); /** diff --git a/src/frontend/apps/impress/src/features/docs/doc-management/libs/CollaborationProvider.ts b/src/frontend/apps/impress/src/features/docs/doc-management/libs/CollaborationProvider.ts new file mode 100644 index 000000000..083b49733 --- /dev/null +++ b/src/frontend/apps/impress/src/features/docs/doc-management/libs/CollaborationProvider.ts @@ -0,0 +1,325 @@ +import crypto from 'crypto'; + +import { + CompleteHocuspocusProviderConfiguration, + CompleteHocuspocusProviderWebsocketConfiguration, + HocuspocusProvider, + HocuspocusProviderConfiguration, + WebSocketStatus, + onOutgoingMessageParameters, + onStatusParameters, +} from '@hocuspocus/provider'; +import type { MessageEvent } from 'ws'; +import * as Y from 'yjs'; + +import { isAPIError } from '@/api'; + +import { + pollOutgoingMessageRequest, + postPollSyncRequest, +} from '../api/collaborationRequests'; +import { toBase64 } from '../utils'; + +type HocuspocusProviderConfigurationUrl = Required< + Pick +> & + Partial & + Required>; + +export const isHocuspocusProviderConfigurationUrl = ( + data: HocuspocusProviderConfiguration, +): data is HocuspocusProviderConfigurationUrl => { + return 'url' in data; +}; + +type CollaborationProviderConfiguration = HocuspocusProviderConfiguration & { + canEdit: boolean; +}; + +export class CollaborationProvider extends HocuspocusProvider { + /** + * If the user can edit the document + */ + public canEdit = false; + + /** + * If the long polling is started + * it is used to avoid starting it multiple times + * when the websocket is failed. + */ + public isLongPollingStarted = false; + + /** + * If the document is syncing with the server + * it is used to avoid starting it multiple times. + */ + public isSyncing = false; + /** + * The document can pass out of sync + * then sync again with a next updates so + * we add a counter to avoid syncing the document + * to quickly. + */ + public seemsUnsyncCount = 0; + public seemsUnsyncMaxCount = 5; + + /** + * In Safari or Firefox the websocket takes time before passing in + * mode failed, it can takes up to 1 minutes. To avoid this latence + * we set isWebsocketFailed to true, it is connects it will switch + * to false. + */ + public isWebsocketFailed = true; + /** + * There is a ping-pong mechanism with awareness, receipt awareness is send again, + * it creates useless requests. + * We use this variable to avoid treating the same awareness message twice. + */ + private treatedAwarenessMessage: string | null = null; + /** + * Server-Sent Events + */ + protected sse: EventSource | null = null; + /** + * Polling timeout + * It is used to avoid starting the polling to quickly + * to let the class init properly. + */ + protected pollTimeout: NodeJS.Timeout | null = null; + /** + * Easy way to get the url of the server + */ + protected url = ''; + + public constructor(configuration: CollaborationProviderConfiguration) { + let url = ''; + if (isHocuspocusProviderConfigurationUrl(configuration)) { + url = configuration.url; + } + + super(configuration); + + this.url = url; + this.canEdit = configuration.canEdit; + + if (configuration.canEdit) { + this.on('outgoingMessage', this.onPollOutgoingMessage.bind(this)); + } + } + + public setPollDefaultValues(): void { + this.isLongPollingStarted = false; + this.isWebsocketFailed = false; + this.seemsUnsyncCount = 0; + this.sse?.close(); + this.sse = null; + if (this.pollTimeout) { + clearTimeout(this.pollTimeout); + } + } + + public destroy(): void { + super.destroy(); + this.setPollDefaultValues(); + } + + public onStatus({ status }: onStatusParameters) { + if (status === WebSocketStatus.Connecting) { + this.isWebsocketFailed = true; + if (this.pollTimeout) { + clearTimeout(this.pollTimeout); + } + this.pollTimeout = setTimeout(() => { + this.initPolling(); + }, 5000); + } else if (status === WebSocketStatus.Connected) { + this.setPollDefaultValues(); + } + + super.onStatus({ status }); + } + + public initPolling() { + if (this.isLongPollingStarted || !this.isWebsocketFailed) { + return; + } + + this.isLongPollingStarted = true; + void this.pollSync(true); + this.initCollaborationSSE(); + } + + protected toPollUrl(endpoint: string): string { + let pollUrl = this.url.replace('ws:', 'http:'); + if (pollUrl.includes('wss:')) { + pollUrl = pollUrl.replace('wss:', 'https:'); + } + + pollUrl = pollUrl.replace('/ws/', '/ws/poll/' + endpoint + '/'); + + // To have our requests not cached + return `${pollUrl}&${Date.now()}`; + } + + protected isDuplicateAwareness(message64: string): boolean { + if (this.treatedAwarenessMessage === message64) { + return true; + } + + this.treatedAwarenessMessage = message64; + + return false; + } + + /** + * Outgoing message event + * + * Sent to the server the message to + * be sent to the other users + */ + public async onPollOutgoingMessage({ message }: onOutgoingMessageParameters) { + if (!this.isWebsocketFailed || !this.canEdit) { + return; + } + + const message64 = Buffer.from(message.toUint8Array()).toString('base64'); + if (this.isDuplicateAwareness(message64)) { + return; + } + + try { + const { updated } = await pollOutgoingMessageRequest({ + pollUrl: this.toPollUrl('message'), + message64, + }); + + if (!updated) { + await this.pollSync(); + } + } catch (error: unknown) { + if (isAPIError(error)) { + // The user is not allowed to send messages + if (error.status === 403) { + this.off('outgoingMessage', this.onPollOutgoingMessage.bind(this)); + this.canEdit = false; + } + } + } + } + + /** + * EventSource is a API for opening an HTTP + * connection for receiving push notifications + * from a server in real-time. + * We use it to sync the document with the server + */ + protected initCollaborationSSE() { + if (!this.isWebsocketFailed) { + return; + } + + this.sse = new EventSource(this.toPollUrl('message'), { + withCredentials: true, + }); + + this.sse.onmessage = (event) => { + const { updatedDoc64, stateFingerprint, awareness64 } = JSON.parse( + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + event.data, + ) as { + updatedDoc64?: string; + stateFingerprint?: string; + awareness64?: string; + }; + + if (awareness64) { + if (this.isDuplicateAwareness(awareness64)) { + return; + } + + this.treatedAwarenessMessage = awareness64; + const awareness = Buffer.from(awareness64, 'base64'); + + this.onMessage({ + data: awareness, + } as MessageEvent); + } + + if (updatedDoc64) { + this.document.transact(() => { + Y.applyUpdate(this.document, Buffer.from(updatedDoc64, 'base64')); + }, this); + } + + const localStateFingerprint = this.getStateFingerprint(this.document); + if (localStateFingerprint !== stateFingerprint) { + void this.pollSync(); + } else { + this.seemsUnsyncCount = 0; + } + }; + + this.sse.onopen = () => {}; + + this.sse.onerror = (err) => { + console.error('SSE error:', err); + this.sse?.close(); + + setTimeout(() => { + this.initCollaborationSSE(); + }, 5000); + }; + } + + /** + * Sync the document with the server. + * + * In some rare cases, the document may be out of sync. + * We use a fingerprint to compare documents, + * it happens that the local fingerprint is different from the server one + * when awareness plus the document are updated quickly. + * The system is resilient to this kind of problems, so `seemsUnsyncCount` should + * go back to 0 after a few seconds. If not, we will force a sync. + */ + public async pollSync(forseSync = false) { + if (!this.isWebsocketFailed || this.isSyncing) { + return; + } + + this.seemsUnsyncCount++; + + if (this.seemsUnsyncCount < this.seemsUnsyncMaxCount && !forseSync) { + return; + } + + this.isSyncing = true; + + try { + const { syncDoc64 } = await postPollSyncRequest({ + pollUrl: this.toPollUrl('sync'), + localDoc64: toBase64(Y.encodeStateAsUpdate(this.document)), + }); + + if (syncDoc64) { + const uint8Array = Buffer.from(syncDoc64, 'base64'); + Y.applyUpdate(this.document, uint8Array); + this.seemsUnsyncCount = 0; + } + } catch (error) { + console.error('Polling sync failed:', error); + } finally { + this.isSyncing = false; + } + } + + /** + * Create a hash SHA-256 of the state vector of the document. + * Usefull to compare the state of the document. + * @param doc + * @returns + */ + public getStateFingerprint(doc: Y.Doc): string { + const stateVector = Y.encodeStateVector(doc); + return crypto.createHash('sha256').update(stateVector).digest('base64'); + } +} diff --git a/src/frontend/apps/impress/src/features/docs/doc-management/libs/__tests__/CollaborationProvider.test.ts b/src/frontend/apps/impress/src/features/docs/doc-management/libs/__tests__/CollaborationProvider.test.ts new file mode 100644 index 000000000..348a672db --- /dev/null +++ b/src/frontend/apps/impress/src/features/docs/doc-management/libs/__tests__/CollaborationProvider.test.ts @@ -0,0 +1,179 @@ +import { WebSocketStatus } from '@hocuspocus/provider'; +import fetchMock from 'fetch-mock'; +import * as Y from 'yjs'; + +if (typeof EventSource === 'undefined') { + const mockEventSource = jest.fn(); + class MockEventSource { + constructor(...args: any[]) { + return mockEventSource(...args); + } + } + (global as any).EventSource = MockEventSource; +} + +import { CollaborationProvider } from '../CollaborationProvider'; + +const mockApplyUpdate = jest.fn(); +jest.mock('yjs', () => ({ + ...jest.requireActual('yjs'), + applyUpdate: (...args: any) => mockApplyUpdate(...args), +})); + +describe('CollaborationProvider', () => { + let config: any; + let provider: CollaborationProvider; + let fakeWebsocketProvider: any; + + beforeEach(() => { + fakeWebsocketProvider = { + on: jest.fn(), + open: jest.fn(), + attach: jest.fn(), + }; + config = { + name: 'test', + url: 'ws://localhost/ws/', + canEdit: true, + websocketProvider: fakeWebsocketProvider, + }; + provider = new CollaborationProvider(config); + }); + + afterEach(() => { + jest.clearAllMocks(); + fetchMock.restore(); + }); + + test('constructor initializes properties and attaches event handlers', () => { + expect(provider.canEdit).toBe(true); + expect((provider as any).url).toBe('ws://localhost/ws/'); + expect(fakeWebsocketProvider.on).toHaveBeenCalled(); + }); + + test('getStateFingerprint returns a consistent hash', () => { + const fingerprint1 = provider.getStateFingerprint(provider.document); + const fingerprint2 = provider.getStateFingerprint(provider.document); + expect(typeof fingerprint1).toBe('string'); + expect(fingerprint1).toBe(fingerprint2); + }); + + test('onPollOutgoingMessage does nothing when websocket is not failed', async () => { + fetchMock.post(/http:\/\/localhost\/ws\/poll\/message\/.*/, { + body: JSON.stringify({ updated: false }), + }); + + provider.isWebsocketFailed = false; + const dummyMessage = { + toUint8Array: () => new Uint8Array([1, 2, 3]), + } as any; + await provider.onPollOutgoingMessage({ message: dummyMessage }); + expect(fetchMock.called()).toBe(false); + }); + + test('onPollOutgoingMessage calls pollOutgoingMessageRequest and pollSync when updated is false', async () => { + provider.isWebsocketFailed = true; + const dummyMessage = { + toUint8Array: () => new Uint8Array([4, 5, 6]), + } as any; + fetchMock.post(/http:\/\/localhost\/ws\/poll\/message\/.*/, { + body: JSON.stringify({ updated: false }), + }); + const pollSyncSpy = jest.spyOn(provider, 'pollSync').mockResolvedValue(); + await provider.onPollOutgoingMessage({ message: dummyMessage }); + expect(fetchMock.lastUrl()).toContain('http://localhost/ws/poll/message/'); + expect(pollSyncSpy).toHaveBeenCalled(); + }); + + test('onPollOutgoingMessage disables editing (canEdit becomes false) if API returns a 403 error', async () => { + provider.isWebsocketFailed = true; + const dummyMessage = { + toUint8Array: () => new Uint8Array([7, 8, 9]), + } as any; + fetchMock.post(/http:\/\/localhost\/ws\/poll\/message\/.*/, { + status: 403, + body: JSON.stringify({}), + }); + + // Stub the off method (inherited from event emitter) to observe its call. + provider.off = jest.fn(); + await provider.onPollOutgoingMessage({ message: dummyMessage }); + expect(fetchMock.lastUrl()).toContain('http://localhost/ws/poll/message/'); + expect(provider.off).toHaveBeenCalled(); + expect(provider.canEdit).toBe(false); + }); + + test('pollSync does nothing if websocket is not failed', async () => { + fetchMock.post(/http:\/\/localhost\/ws\/poll\/sync\/.*/, { + body: JSON.stringify({ syncDoc64: '123456' }), + }); + + provider.isWebsocketFailed = false; + await provider.pollSync(); + expect(fetchMock.called()).toBe(false); + }); + + test('pollSync calls postPollSyncRequest when unsync count threshold is reached', async () => { + const update = Y.encodeStateAsUpdate(provider.document); + const syncDoc64 = Buffer.from(update).toString('base64'); + + fetchMock.post(/http:\/\/localhost\/ws\/poll\/sync\/.*/, { + body: JSON.stringify({ syncDoc64 }), + }); + + provider.isWebsocketFailed = true; + provider.seemsUnsyncCount = provider.seemsUnsyncMaxCount - 1; + + await provider.pollSync(); + const uint8Array = Buffer.from(syncDoc64, 'base64'); + expect(mockApplyUpdate).toHaveBeenCalledWith(provider.document, uint8Array); + }); + + describe('onStatus', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + test('sets websocket failed and schedules polling on Connecting', () => { + const initPollingSpy = jest + .spyOn(provider, 'initPolling') + .mockImplementation(() => {}); + const superOnStatusSpy = jest.spyOn( + Object.getPrototypeOf(provider), + 'onStatus', + ); + + provider.onStatus({ status: WebSocketStatus.Connecting }); + + expect(provider.isWebsocketFailed).toBe(true); + // Fast-forward timer to trigger the scheduled initPolling call. + jest.runAllTimers(); + expect(initPollingSpy).toHaveBeenCalled(); + + expect(superOnStatusSpy).toHaveBeenCalledWith({ + status: WebSocketStatus.Connecting, + }); + }); + + test('calls setPollDefaultValues on Connected', () => { + const setPollDefaultValuesSpy = jest + .spyOn(provider, 'setPollDefaultValues') + .mockImplementation(() => {}); + const superOnStatusSpy = jest.spyOn( + Object.getPrototypeOf(provider), + 'onStatus', + ); + + provider.onStatus({ status: WebSocketStatus.Connected }); + + expect(setPollDefaultValuesSpy).toHaveBeenCalled(); + expect(superOnStatusSpy).toHaveBeenCalledWith({ + status: WebSocketStatus.Connected, + }); + }); + }); +}); diff --git a/src/frontend/apps/impress/src/features/docs/doc-management/stores/useProviderStore.tsx b/src/frontend/apps/impress/src/features/docs/doc-management/stores/useProviderStore.tsx index a638045a1..92e34fd26 100644 --- a/src/frontend/apps/impress/src/features/docs/doc-management/stores/useProviderStore.tsx +++ b/src/frontend/apps/impress/src/features/docs/doc-management/stores/useProviderStore.tsx @@ -4,10 +4,13 @@ import { create } from 'zustand'; import { Base64 } from '@/features/docs/doc-management'; +import { CollaborationProvider } from '../libs/CollaborationProvider'; + export interface UseCollaborationStore { createProvider: ( providerUrl: string, storeId: string, + canEdit: boolean, initialDoc?: Base64, ) => HocuspocusProvider; destroyProvider: () => void; @@ -20,7 +23,7 @@ const defaultValues = { export const useProviderStore = create((set, get) => ({ ...defaultValues, - createProvider: (wsUrl, storeId, initialDoc) => { + createProvider: (wsUrl, storeId, canEdit, initialDoc) => { const doc = new Y.Doc({ guid: storeId, }); @@ -29,10 +32,11 @@ export const useProviderStore = create((set, get) => ({ Y.applyUpdate(doc, Buffer.from(initialDoc, 'base64')); } - const provider = new HocuspocusProvider({ + const provider = new CollaborationProvider({ url: wsUrl, name: storeId, document: doc, + canEdit, }); set({ diff --git a/src/frontend/apps/impress/src/pages/docs/[id]/index.tsx b/src/frontend/apps/impress/src/pages/docs/[id]/index.tsx index fdd266028..250279b04 100644 --- a/src/frontend/apps/impress/src/pages/docs/[id]/index.tsx +++ b/src/frontend/apps/impress/src/pages/docs/[id]/index.tsx @@ -63,7 +63,7 @@ const DocPage = ({ id }: DocProps) => { const { addTask } = useBroadcastStore(); const queryClient = useQueryClient(); const { replace } = useRouter(); - useCollaboration(doc?.id, doc?.content); + useCollaboration(doc?.id, doc?.content, doc?.abilities.partial_update); useEffect(() => { if (doc?.title) { diff --git a/src/frontend/yarn.lock b/src/frontend/yarn.lock index 0a8f94609..7f38995a6 100644 --- a/src/frontend/yarn.lock +++ b/src/frontend/yarn.lock @@ -5281,6 +5281,13 @@ dependencies: "@types/node" "*" +"@types/ws@8.5.13": + version "8.5.13" + resolved "https://registry.yarnpkg.com/@types/ws/-/ws-8.5.13.tgz#6414c280875e2691d0d1e080b05addbf5cb91e20" + integrity sha512-osM/gWBTPKgHV8XkTunnegTRIsvF6owmf5w+JtAfOw472dptdm0dlGv4xCt6GwQRcC2XVOvvRE/0bAoQcL2QkA== + dependencies: + "@types/node" "*" + "@types/yargs-parser@*": version "21.0.3" resolved "https://registry.yarnpkg.com/@types/yargs-parser/-/yargs-parser-21.0.3.tgz#815e30b786d2e8f0dcd85fd5bcf5e1a04d008f15" From 6098fe1e14a605c2e1de9a66db2fd3489df15bdd Mon Sep 17 00:00:00 2001 From: Anthony LC Date: Thu, 13 Feb 2025 17:14:21 +0100 Subject: [PATCH 05/10] =?UTF-8?q?=F0=9F=94=A7(ngnix)=20adapt=20nginx=20dev?= =?UTF-8?q?elopment?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We adapt the nginx configuration to works with http requests and on the collaboration routes. Requests are light but quite network intensive, so we add a cache system above "collaboration-auth". It means the backend will be called only once every 30 seconds after a 200 response. --- .../conf.d/00-collaboration-proxy-cache.conf | 1 + docker/files/etc/nginx/conf.d/default.conf | 18 +++++++++++++++--- 2 files changed, 16 insertions(+), 3 deletions(-) create mode 100644 docker/files/etc/nginx/conf.d/00-collaboration-proxy-cache.conf diff --git a/docker/files/etc/nginx/conf.d/00-collaboration-proxy-cache.conf b/docker/files/etc/nginx/conf.d/00-collaboration-proxy-cache.conf new file mode 100644 index 000000000..a39e34320 --- /dev/null +++ b/docker/files/etc/nginx/conf.d/00-collaboration-proxy-cache.conf @@ -0,0 +1 @@ +proxy_cache_path /tmp/auth_cache levels=1:2 keys_zone=auth_cache:10m inactive=60s max_size=100m; diff --git a/docker/files/etc/nginx/conf.d/default.conf b/docker/files/etc/nginx/conf.d/default.conf index e5967e333..e4821c00a 100644 --- a/docker/files/etc/nginx/conf.d/default.conf +++ b/docker/files/etc/nginx/conf.d/default.conf @@ -1,4 +1,3 @@ - server { listen 8083; server_name localhost; @@ -6,6 +5,14 @@ server { # Proxy auth for collaboration server location /collaboration/ws/ { + if ($request_method = OPTIONS) { + add_header 'Access-Control-Allow-Origin' 'http://localhost:3000'; + add_header 'Access-Control-Allow-Credentials' 'true'; + add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS'; + add_header 'Access-Control-Allow-Headers' 'Authorization, Content-Type'; + return 204; + } + # Collaboration Auth request configuration auth_request /collaboration-auth; auth_request_set $authHeader $upstream_http_authorization; @@ -34,6 +41,10 @@ server { } location /collaboration-auth { + proxy_cache auth_cache; + proxy_cache_key "$http_authorization"; + proxy_cache_valid 200 30s; + proxy_pass http://app-dev:8000/api/v1.0/documents/collaboration-auth/; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; @@ -43,10 +54,11 @@ server { # Prevent the body from being passed proxy_pass_request_body off; proxy_set_header Content-Length ""; + proxy_set_header Accept "application/json"; proxy_set_header X-Original-Method $request_method; } - location /collaboration/api/ { + location /collaboration/api/ { # Collaboration server proxy_pass http://y-provider:4444; proxy_set_header Host $host; @@ -76,7 +88,7 @@ server { proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Original-URL $request_uri; - + # Prevent the body from being passed proxy_pass_request_body off; proxy_set_header Content-Length ""; From 3ca07e0f4c414e518b3d228eb99c05715fd73240 Mon Sep 17 00:00:00 2001 From: Anthony LC Date: Thu, 13 Feb 2025 17:25:08 +0100 Subject: [PATCH 06/10] =?UTF-8?q?=F0=9F=94=A7(helm)=20adapt=20helm=20nginx?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We adapt the nginx configuration to works with http requests and on the collaboration routes. Requests are light but quite network intensive, so we add a cache system above "collaboration-auth". It means the backend will be called only once every 30 seconds after a 200 response. --- src/helm/impress/Chart.yaml | 2 +- src/helm/impress/values.yaml | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/helm/impress/Chart.yaml b/src/helm/impress/Chart.yaml index a4f4e3448..c6a63e669 100644 --- a/src/helm/impress/Chart.yaml +++ b/src/helm/impress/Chart.yaml @@ -1,5 +1,5 @@ apiVersion: v2 type: application name: docs -version: 2.2.0-beta.1 +version: 2.2.0-beta.2 appVersion: latest diff --git a/src/helm/impress/values.yaml b/src/helm/impress/values.yaml index 75ee8f7cd..3a7ad4771 100644 --- a/src/helm/impress/values.yaml +++ b/src/helm/impress/values.yaml @@ -82,7 +82,11 @@ ingressCollaborationWS: ## @param ingressCollaborationWS.annotations.nginx.ingress.kubernetes.io/proxy-send-timeout ## @param ingressCollaborationWS.annotations.nginx.ingress.kubernetes.io/upstream-hash-by annotations: + nginx.ingress.kubernetes.io/auth-cache-key: "$http_authorization" + nginx.ingress.kubernetes.io/auth-cache-duration: 200 30s nginx.ingress.kubernetes.io/auth-response-headers: "Authorization, X-Can-Edit, X-User-Id" + nginx.ingress.kubernetes.io/auth-snippet: | + proxy_set_header Accept "application/json"; nginx.ingress.kubernetes.io/auth-url: https://impress.example.com/api/v1.0/documents/collaboration-auth/ nginx.ingress.kubernetes.io/enable-websocket: "true" nginx.ingress.kubernetes.io/proxy-read-timeout: "86400" From 168904728bc2510e1d5dfadf37a47fc5b3464900 Mon Sep 17 00:00:00 2001 From: Anthony LC Date: Fri, 14 Feb 2025 20:35:31 +0100 Subject: [PATCH 07/10] for-testing Firefox with websocket Other without --- .github/workflows/docker-hub.yml | 1 + .../docs/doc-management/libs/CollaborationProvider.ts | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/.github/workflows/docker-hub.yml b/.github/workflows/docker-hub.yml index 7dca9637d..20f2dae6d 100644 --- a/.github/workflows/docker-hub.yml +++ b/.github/workflows/docker-hub.yml @@ -6,6 +6,7 @@ on: push: branches: - 'main' + - 'feature/collab-long-polling' tags: - 'v*' pull_request: diff --git a/src/frontend/apps/impress/src/features/docs/doc-management/libs/CollaborationProvider.ts b/src/frontend/apps/impress/src/features/docs/doc-management/libs/CollaborationProvider.ts index 083b49733..d3998bdf5 100644 --- a/src/frontend/apps/impress/src/features/docs/doc-management/libs/CollaborationProvider.ts +++ b/src/frontend/apps/impress/src/features/docs/doc-management/libs/CollaborationProvider.ts @@ -95,6 +95,14 @@ export class CollaborationProvider extends HocuspocusProvider { let url = ''; if (isHocuspocusProviderConfigurationUrl(configuration)) { url = configuration.url; + let withWS = true; + if ( + new URLSearchParams(window.location.search).get('withoutWS') === 'true' + ) { + withWS = false; + } + + configuration.url = !withWS ? 'ws://localhost:6666' : configuration.url; } super(configuration); From 15389156d3081129ecc8a268106015cfa41ccdd5 Mon Sep 17 00:00:00 2001 From: Anthony LC Date: Thu, 20 Feb 2025 11:16:01 +0100 Subject: [PATCH 08/10] =?UTF-8?q?fixup!=20=F0=9F=94=A7(ngnix)=20adapt=20ng?= =?UTF-8?q?inx=20development?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- docker/files/etc/nginx/conf.d/default.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/files/etc/nginx/conf.d/default.conf b/docker/files/etc/nginx/conf.d/default.conf index e4821c00a..22f8594b7 100644 --- a/docker/files/etc/nginx/conf.d/default.conf +++ b/docker/files/etc/nginx/conf.d/default.conf @@ -42,7 +42,7 @@ server { location /collaboration-auth { proxy_cache auth_cache; - proxy_cache_key "$http_authorization"; + proxy_cache_key "$http_authorization-$arg_room"; proxy_cache_valid 200 30s; proxy_pass http://app-dev:8000/api/v1.0/documents/collaboration-auth/; From 29a4147b5e1589ec752d09970ed4cac887f60dba Mon Sep 17 00:00:00 2001 From: Anthony LC Date: Thu, 20 Feb 2025 11:16:13 +0100 Subject: [PATCH 09/10] =?UTF-8?q?fixup!=20=F0=9F=94=A7(helm)=20adapt=20hel?= =?UTF-8?q?m=20nginx?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/helm/impress/values.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/helm/impress/values.yaml b/src/helm/impress/values.yaml index 3a7ad4771..72ce04d43 100644 --- a/src/helm/impress/values.yaml +++ b/src/helm/impress/values.yaml @@ -82,7 +82,7 @@ ingressCollaborationWS: ## @param ingressCollaborationWS.annotations.nginx.ingress.kubernetes.io/proxy-send-timeout ## @param ingressCollaborationWS.annotations.nginx.ingress.kubernetes.io/upstream-hash-by annotations: - nginx.ingress.kubernetes.io/auth-cache-key: "$http_authorization" + nginx.ingress.kubernetes.io/auth-cache-key: "$http_authorization-$arg_room" nginx.ingress.kubernetes.io/auth-cache-duration: 200 30s nginx.ingress.kubernetes.io/auth-response-headers: "Authorization, X-Can-Edit, X-User-Id" nginx.ingress.kubernetes.io/auth-snippet: | From f716c498e5f9c3a60ec7ff206736b893565773fc Mon Sep 17 00:00:00 2001 From: Anthony LC Date: Thu, 20 Feb 2025 11:16:21 +0100 Subject: [PATCH 10/10] =?UTF-8?q?=F0=9F=93=9D(documentation)=20add=20colla?= =?UTF-8?q?boration=20architecture=20doc?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Documentation to describe the collaboration architecture in the project. --- docs/collaboration-architecture.md | 84 ++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 docs/collaboration-architecture.md diff --git a/docs/collaboration-architecture.md b/docs/collaboration-architecture.md new file mode 100644 index 000000000..86bcb733e --- /dev/null +++ b/docs/collaboration-architecture.md @@ -0,0 +1,84 @@ +# Architecture Overview + +This architecture showcases different ways for clients to interact with a **Hocus Pocus Server** (a [Y.js](https://github.com/yjs/yjs) provider) through either WebSockets, HTTP fallbacks, or Server-Sent Events (SSE) when WebSockets are not available. + +**Main Components**: + +- **Client**: The front-end application or user agent. +- **Nginx**: A reverse proxy handling incoming requests, forwarding them to the appropriate services, and managing SSL/TLS termination if needed. +- **Auth Sub Request (Django)**: Handles authentication/authorization, ensuring requests have valid credentials or permissions. +- **Hocus Pocus Server**: The core collaborative editing server (powered by [Y.js](https://github.com/yjs/yjs) libraries) that manages document state and synchronization. +- **Express**: Fallback server to handle push or pull requests when WebSocket connections fail. +- **SSE**: A mechanism (Server-Sent Events) for real-time updates when WebSockets are unavailable. + +## Mermaid Diagram + +```mermaid +flowchart TD + title1[WebSocket Success]-->Client1(Client)<--->|WebSocket Success|WS1(Websocket) --> Nginx1(Ngnix) <--> Auth1("Auth Sub Request (Django)") --->|With the good right|YServer1("Hocus Pocus Server") + YServer1 --> WS1 + YServer1 <--> clients(Dispatch to clients) + + title2[WebSocket Fails - Push data]-->Client2(Client)---|WebSocket fails|HTTP2(HTTP) --> Nginx2(Ngnix) <--> Auth2("Auth Sub Request (Django)")--->|With the good right|Express2(Express) --> YServer2("Hocus Pocus Server") --> clients(Dispatch to clients) + + title3[WebSocket Fails - Pull data]-->Client3(Client)<--->|WebSocket fails|SSE(SSE) --> Nginx3(Ngnix) <--> Auth3("Auth Sub Request (Django)") --->|With the good right|Express3(Express) --> YServer3("Listen Hocus Pocus Server") + YServer3("Listen Hocus Pocus Server") --> SSE + YServer3("Listen Hocus Pocus Server") <--> clients(Data from clients) +``` + +--- + +## Detailed Flows + +### 1. WebSocket Success +1. **Client** attempts a WebSocket connection. +2. **Nginx** proxies the WebSocket connection through the **Auth Sub Request (Django)** for authentication. +3. Once authenticated, traffic is routed to the **Hocus Pocus Server**. +4. The server can broadcast data to all clients connected through WebSockets. + - Note: The path `YServer1 --> WS1` indicates the two-way real-time communication between the server and client(s). + +### 2. WebSocket Fails — Push Data (HTTP) +If WebSocket connections fail, clients can **push** data via HTTP: +1. **Client** detects WebSocket failure and falls back to sending data over **HTTP**. +2. **Nginx** handles HTTP requests and authenticates them via the **Auth Sub Request (Django)**. +3. After successful authentication, the requests go to an **Express** server. +4. The **Express** server relays changes to the **Hocus Pocus Server**. +5. The **Hocus Pocus Server** dispatches updated content to connected clients. + +### 3. WebSocket Fails — Pull Data (SSE) +For continuously receiving data when WebSockets fail, the client can **pull** data using SSE: +1. **Client** sets up an **SSE** connection. +2. **Nginx** proxies the SSE stream request through the **Auth Sub Request (Django)** for authentication. +3. Once authenticated, the **Express** server listens to the **Hocus Pocus Server** for changes. +4. The server then sends updates back to the **Client** through SSE in near real-time. + +--- + +## Component Responsibilities + +| **Component** | **Responsibility** | +|-----------------------------|-----------------------------------------------------------------------------------------| +| **Client** | Initiates connections (WebSocket/HTTP/SSE), displays and interacts with data | +| **Nginx** | Acts as a reverse proxy, routes traffic, handles SSL, and passes auth sub requests | +| **Auth Sub Request (Django)** | Validates requests, ensuring correct permissions and tokens | +| **WebSocket** | Real-time two-way communication channel | +| **HTTP** | Fallback method for sending updates when WebSockets are not available | +| **Express** | Fallback server for handling requests (push/pull of data) | +| **SSE** | Mechanism for real-time one-way updates from server to client | +| **Hocus Pocus Server** | Core Y.js server for collaboration, managing document states and synchronization | + +--- + +## Why This Setup? + +- **Reliability:** Ensures that when a user’s browser or network environment does not support WebSockets, there are fallback mechanisms (HTTP for push updates and SSE for server-initiated updates). +- **Scalability:** Nginx can efficiently proxy requests and scale horizontally, while the authentication step is centralized in Django. +- **Security:** The Auth Sub Request in Django enforces proper permissions before data is relayed to the collaboration server. +- **Real-time Collaboration:** The Hocus Pocus Server provides low-latency updates, essential for collaborative editing, supported by [Y.js](https://github.com/yjs/yjs). + +--- + +### Contributing +If you have any suggestions or improvements, feel free to open an issue or submit a pull request. + +**Thank you for exploring this architecture!** If you have any questions or need more detailed explanations, please let us know. \ No newline at end of file