From bb53494ce306b8bc2f49ca52d4d082f6b3492dd8 Mon Sep 17 00:00:00 2001 From: Michael Wang Date: Tue, 21 Jan 2025 23:39:35 +0800 Subject: [PATCH 1/4] feat: add `enableAwareness` option to `SocketIOProvider` --- src/y-socket-io/client.js | 154 +++++++++++++++++++++++--------------- 1 file changed, 94 insertions(+), 60 deletions(-) diff --git a/src/y-socket-io/client.js b/src/y-socket-io/client.js index c1a1d11..6e57faa 100644 --- a/src/y-socket-io/client.js +++ b/src/y-socket-io/client.js @@ -23,6 +23,9 @@ import { io } from 'socket.io-client' * @prop {boolean=} autoConnect * (Optional) This boolean specify if the provider should connect when the instance is created, by default is true * + * @prop {boolean=} enableAwareness + * (Optional) This boolean enable the awareness functionality, by default is true + * * @prop {AwarenessProtocol.Awareness=} awareness * (Optional) An existent awareness, by default is a new AwarenessProtocol.Awareness instance * @@ -73,9 +76,14 @@ export class SocketIOProvider extends Observable { * @public */ doc + /** + * Enable awareness + * @type {boolean} + */ + enableAwareness /** * The awareness - * @type {AwarenessProtocol.Awareness} + * @type {AwarenessProtocol.Awareness=} * @public */ awareness @@ -126,7 +134,8 @@ export class SocketIOProvider extends Observable { doc = new Y.Doc(), { autoConnect = true, - awareness = new AwarenessProtocol.Awareness(doc), + enableAwareness = true, + awareness = enableAwareness ? new AwarenessProtocol.Awareness(doc) : undefined, resyncInterval = -1, disableBc = false, auth = {} @@ -140,6 +149,8 @@ export class SocketIOProvider extends Observable { this._url = url this.roomName = roomName this.doc = doc + + this.enableAwareness = enableAwareness this.awareness = awareness this._broadcastChannel = `${url}/${roomName}` @@ -167,11 +178,13 @@ export class SocketIOProvider extends Observable { this.initSyncListeners() - this.initAwarenessListeners() + if (this.enableAwareness) { + this.initAwarenessListeners() + awareness?.on('update', this.awarenessUpdate) + } this.initSystemListeners() - awareness.on('update', this.awarenessUpdate) if (autoConnect) this.connect() } @@ -260,6 +273,8 @@ export class SocketIOProvider extends Observable { */ initAwarenessListeners = () => { this.socket.on('awareness-update', (/** @type {ArrayBuffer} */ update) => { + if (!this.awareness) return + AwarenessProtocol.applyAwarenessUpdate( this.awareness, new Uint8Array(update), @@ -310,7 +325,7 @@ export class SocketIOProvider extends Observable { Y.applyUpdate(this.doc, new Uint8Array(update), this) } ) - if (this.awareness.getLocalState() !== null) { + if (this.enableAwareness && this.awareness && this.awareness.getLocalState() !== null) { this.socket.emit( 'awareness-update', AwarenessProtocol.encodeAwarenessUpdate(this.awareness, [ @@ -355,13 +370,15 @@ export class SocketIOProvider extends Observable { this.emit('connection-close', [event, this]) this.synced = false - AwarenessProtocol.removeAwarenessStates( - this.awareness, - Array.from(this.awareness.getStates().keys()).filter( - (client) => client !== this.doc.clientID - ), - this - ) + if (this.enableAwareness && this.awareness) { + AwarenessProtocol.removeAwarenessStates( + this.awareness, + Array.from(this.awareness.getStates().keys()).filter( + (client) => client !== this.doc.clientID + ), + this + ) + } this.emit('status', [{ status: 'disconnected' }]) } @@ -382,8 +399,10 @@ export class SocketIOProvider extends Observable { if (this.resyncInterval != null) clearInterval(this.resyncInterval) this.disconnect() if (typeof window !== 'undefined') { window.removeEventListener('beforeunload', this.beforeUnloadHandler) } else if (typeof process !== 'undefined') { process.off('exit', this.beforeUnloadHandler) } - this.awareness.off('update', this.awarenessUpdate) - this.awareness.destroy() + if (this.enableAwareness) { + this.awareness?.off('update', this.awarenessUpdate) + this.awareness?.destroy() + } this.doc.off('update', this.onUpdateDoc) super.destroy() } @@ -429,6 +448,8 @@ export class SocketIOProvider extends Observable { * @readonly */ awarenessUpdate = ({ added, updated, removed }, origin) => { + if (!this.awareness) return + const changedClients = added.concat(updated).concat(removed) this.socket.emit( 'awareness-update', @@ -457,6 +478,8 @@ export class SocketIOProvider extends Observable { * @readonly */ beforeUnloadHandler = () => { + if (!this.enableAwareness || !this.awareness) return + AwarenessProtocol.removeAwarenessStates( this.awareness, [this.doc.clientID], @@ -485,21 +508,24 @@ export class SocketIOProvider extends Observable { { type: 'sync-step-2', data: Y.encodeStateAsUpdate(this.doc) }, this ) - bc.publish( - this._broadcastChannel, - { type: 'query-awareness', data: null }, - this - ) - bc.publish( - this._broadcastChannel, - { - type: 'awareness-update', - data: AwarenessProtocol.encodeAwarenessUpdate(this.awareness, [ - this.doc.clientID - ]) - }, - this - ) + + if (this.enableAwareness && this.awareness) { + bc.publish( + this._broadcastChannel, + { type: 'query-awareness', data: null }, + this + ) + bc.publish( + this._broadcastChannel, + { + type: 'awareness-update', + data: AwarenessProtocol.encodeAwarenessUpdate(this.awareness, [ + this.doc.clientID + ]) + }, + this + ) + } } /** @@ -509,18 +535,20 @@ export class SocketIOProvider extends Observable { * @readonly */ disconnectBc = () => { - bc.publish( - this._broadcastChannel, - { - type: 'awareness-update', - data: AwarenessProtocol.encodeAwarenessUpdate( - this.awareness, - [this.doc.clientID], - new Map() - ) - }, - this - ) + if (this.enableAwareness && this.awareness) { + bc.publish( + this._broadcastChannel, + { + type: 'awareness-update', + data: AwarenessProtocol.encodeAwarenessUpdate( + this.awareness, + [this.doc.clientID], + new Map() + ) + }, + this + ) + } if (this.bcconnected) { bc.unsubscribe(this._broadcastChannel, this.onBroadcastChannelMessage) this.bcconnected = false @@ -556,27 +584,33 @@ export class SocketIOProvider extends Observable { Y.applyUpdate(this.doc, new Uint8Array(message.data), this) break - case 'query-awareness': - bc.publish( - this._broadcastChannel, - { - type: 'awareness-update', - data: AwarenessProtocol.encodeAwarenessUpdate( - this.awareness, - Array.from(this.awareness.getStates().keys()) - ) - }, - this - ) + case 'query-awareness': { + if (this.enableAwareness && this.awareness) { + bc.publish( + this._broadcastChannel, + { + type: 'awareness-update', + data: AwarenessProtocol.encodeAwarenessUpdate( + this.awareness, + Array.from(this.awareness.getStates().keys()) + ) + }, + this + ) + } break - - case 'awareness-update': - AwarenessProtocol.applyAwarenessUpdate( - this.awareness, - new Uint8Array(message.data), - this - ) + } + + case 'awareness-update': { + if (this.enableAwareness && this.awareness) { + AwarenessProtocol.applyAwarenessUpdate( + this.awareness, + new Uint8Array(message.data), + this + ) + } break + } default: break From 64a1cb71f503496437e4693ddd2f5a4bee1e22c7 Mon Sep 17 00:00:00 2001 From: Michael Wang Date: Wed, 22 Jan 2025 22:42:40 +0800 Subject: [PATCH 2/4] feat: add `enableAwareness` option to API and SocketIO server --- src/api.js | 24 ++++++++++++++++-------- src/socketio.js | 11 +++++++++-- src/subscriber.js | 2 +- src/y-socket-io/y-socket-io.js | 20 ++++++++++++++------ 4 files changed, 40 insertions(+), 17 deletions(-) diff --git a/src/api.js b/src/api.js index 64860fd..e712ff6 100644 --- a/src/api.js +++ b/src/api.js @@ -84,10 +84,10 @@ const decodeRedisRoomStreamName = (rediskey, expectedPrefix) => { /** * @param {import('./storage.js').AbstractStorage} store - * @param {{ redisPrefix?: string, redisUrl?: string }} opts + * @param {{ redisPrefix?: string, redisUrl?: string, enableAwareness?: boolean }} opts */ -export const createApiClient = async (store, { redisPrefix, redisUrl }) => { - const a = new Api(store, redisPrefix, redisUrl) +export const createApiClient = async (store, { redisPrefix, redisUrl, enableAwareness = true }) => { + const a = new Api(store, redisPrefix, redisUrl, { enableAwareness }) await a.redis.connect() try { await a.redis.xGroupCreate(a.redisWorkerStreamName, a.redisWorkerGroupName, '0', { MKSTREAM: true }) @@ -100,10 +100,13 @@ export class Api { * @param {import('./storage.js').AbstractStorage} store * @param {string=} prefix * @param {string=} url + * @param {Object} opts + * @param {boolean=} opts.enableAwareness */ - constructor (store, prefix = 'y', url = env.ensureConf('ysr-redis')) { + constructor (store, prefix = 'y', url = env.ensureConf('ysr-redis'), { enableAwareness = true } = {}) { this.store = store this.prefix = prefix + this.enableAwareness = enableAwareness this.consumername = random.uuidv4() /** * After this timeout, a new worker will pick up the task @@ -240,8 +243,11 @@ export class Api { if (docMessages?.messages) logApi(`processing messages of length: ${docMessages?.messages.length} in room: ${room}`) const docstate = await this.store.retrieveDoc(room, docid) const ydoc = new Y.Doc() - const awareness = new awarenessProtocol.Awareness(ydoc) - awareness.setLocalState(null) // we don't want to propagate awareness state + let awareness = null + if (this.enableAwareness) { + awareness = new awarenessProtocol.Awareness(ydoc) + awareness.setLocalState(null) // we don't want to propagate awareness state + } const now = performance.now() if (docstate) { Y.applyUpdateV2(ydoc, docstate.doc) } let changed = false @@ -257,7 +263,9 @@ export class Api { break } case 1: { // awareness message - awarenessProtocol.applyAwarenessUpdate(awareness, decoding.readVarUint8Array(decoder), null) + if (this.enableAwareness && awareness) { + awarenessProtocol.applyAwarenessUpdate(awareness, decoding.readVarUint8Array(decoder), null) + } break } } @@ -394,7 +402,7 @@ export class Api { /** * @param {import('./storage.js').AbstractStorage} store - * @param {{ redisPrefix?: string, redisUrl?: string }} opts + * @param {{ redisPrefix?: string, redisUrl?: string, enableAwareness?: boolean }} opts */ export const createWorker = async (store, opts) => { const a = await createApiClient(store, opts) diff --git a/src/socketio.js b/src/socketio.js index 0d26540..2388385 100644 --- a/src/socketio.js +++ b/src/socketio.js @@ -38,9 +38,16 @@ class YSocketIOServer { * @param {string} [conf.redisUrl] * @param {import('./y-socket-io/y-socket-io.js').YSocketIOConfiguration['authenticate']} conf.authenticate * @param {import('worker_threads').Worker=} [conf.persistWorker] + * @param {boolean} [conf.enableAwareness] */ -export const registerYSocketIOServer = async (io, store, { authenticate, redisUrl, redisPrefix, persistWorker }) => { - const app = new YSocketIO(io, { authenticate }) +export const registerYSocketIOServer = async (io, store, { + authenticate, + redisUrl, + redisPrefix, + persistWorker, + enableAwareness = true +}) => { + const app = new YSocketIO(io, { authenticate, enableAwareness }) const { client, subscriber } = await app.initialize(store, { redisUrl, redisPrefix, persistWorker }) return new YSocketIOServer(app, client, subscriber) } diff --git a/src/subscriber.js b/src/subscriber.js index fe2fc47..530a1fa 100644 --- a/src/subscriber.js +++ b/src/subscriber.js @@ -32,7 +32,7 @@ const run = async subscriber => { /** * @param {import('./storage.js').AbstractStorage} store - * @param {{ redisPrefix?: string, redisUrl?: string }} opts + * @param {{ redisPrefix?: string, redisUrl?: string, enableAwareness?: boolean }} opts */ export const createSubscriber = async (store, opts) => { const client = await api.createApiClient(store, opts) diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 8a50f55..82fab70 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -57,6 +57,8 @@ process.on('SIGINT', function () { * * @prop {(socket: Socket)=> Promise | UserLike | null} authenticate * Callback to authenticate the client connection. + * @prop {boolean=} enableAwareness + * Enable/disable awareness functionality, defaults to true */ /** @@ -159,7 +161,10 @@ export class YSocketIO { */ constructor (io, configuration) { this.io = io - this.configuration = configuration + this.configuration = { + enableAwareness: true, + ...configuration + } } /** @@ -174,9 +179,10 @@ export class YSocketIO { * @public */ async initialize (store, { redisUrl, redisPrefix = 'y', persistWorker } = {}) { + const { enableAwareness } = this.configuration const [client, subscriber] = await promise.all([ - api.createApiClient(store, { redisUrl, redisPrefix }), - createSubscriber(store, { redisUrl, redisPrefix }) + api.createApiClient(store, { redisUrl, redisPrefix, enableAwareness }), + createSubscriber(store, { redisUrl, redisPrefix, enableAwareness }) ]) this.client = client this.subscriber = subscriber @@ -243,7 +249,9 @@ export class YSocketIO { this.streamNamespaceMap.set(stream, namespace) this.initSyncListeners(socket) - this.initAwarenessListeners(socket) + if (this.configuration.enableAwareness) { + this.initAwarenessListeners(socket) + } this.initSocketListeners(socket) ;(async () => { assert(this.client) @@ -404,7 +412,7 @@ export class YSocketIO { .catch(console.error) } ) - if (doc.awareness.states.size > 0) { + if (this.configuration.enableAwareness && doc.awareness.states.size > 0) { socket.emit( 'awareness-update', AwarenessProtocol.encodeAwarenessUpdate( @@ -436,7 +444,7 @@ export class YSocketIO { for (const m of messages) { const decoded = this.fromRedis(m) - if (decoded.type === 'awareness-update') awareness.push(decoded.message) + if (decoded.type === 'awareness-update' && this.configuration.enableAwareness) awareness.push(decoded.message) else updates.push(decoded.message) } From a13c93e89a4fe9abd1fc07a02b98d9cebb661faf Mon Sep 17 00:00:00 2001 From: Michael Wang Date: Wed, 22 Jan 2025 22:46:14 +0800 Subject: [PATCH 3/4] fix: typing --- src/y-socket-io/client.js | 1 - src/y-socket-io/y-socket-io.js | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/y-socket-io/client.js b/src/y-socket-io/client.js index 6e57faa..a84f140 100644 --- a/src/y-socket-io/client.js +++ b/src/y-socket-io/client.js @@ -185,7 +185,6 @@ export class SocketIOProvider extends Observable { this.initSystemListeners() - if (autoConnect) this.connect() } diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 82fab70..89267f5 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -44,7 +44,7 @@ process.on('SIGINT', function () { * * @typedef {{ * ydoc: Y.Doc; - * awareness: AwarenessProtocol.Awareness; + * awareness: AwarenessProtocol.Awareness | null; * redisLastId: string; * storeReferences: any[] | null; * }} RedisDoc @@ -412,7 +412,7 @@ export class YSocketIO { .catch(console.error) } ) - if (this.configuration.enableAwareness && doc.awareness.states.size > 0) { + if (this.configuration.enableAwareness && doc.awareness && doc.awareness.states.size > 0) { socket.emit( 'awareness-update', AwarenessProtocol.encodeAwarenessUpdate( From 941f3a267e48840bb24944b86f8d91b4216e116d Mon Sep 17 00:00:00 2001 From: Charlie Hsieh Date: Thu, 23 Jan 2025 15:24:10 +0800 Subject: [PATCH 4/4] fix: null check --- src/y-socket-io/y-socket-io.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/y-socket-io/y-socket-io.js b/src/y-socket-io/y-socket-io.js index 89267f5..1afa49b 100644 --- a/src/y-socket-io/y-socket-io.js +++ b/src/y-socket-io/y-socket-io.js @@ -470,9 +470,11 @@ export class YSocketIO { if (msg.length === 0) continue Y.applyUpdate(existDoc.ydoc, msg) } - for (const msg of awareness) { - if (msg.length === 0) continue - AwarenessProtocol.applyAwarenessUpdate(existDoc.awareness, msg, null) + if (existDoc.awareness) { + for (const msg of awareness) { + if (msg.length === 0) continue + AwarenessProtocol.applyAwarenessUpdate(existDoc.awareness, msg, null) + } } }) }