Skip to content

Commit

Permalink
use pub-sub for tracking board data
Browse files Browse the repository at this point in the history
Signed-off-by: Nikolay Chunosov <[email protected]>
  • Loading branch information
Chunosov committed Feb 6, 2025
1 parent 6f3fe0c commit c31902d
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 73 deletions.
103 changes: 85 additions & 18 deletions plugins/presence-resources/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,42 @@ import presence from '@hcengineering/presence'
import presentation from '@hcengineering/presentation'
import { type Unsubscriber, get } from 'svelte/store'

import { myPresence, myData, onPersonUpdate, onPersonLeave, onPersonData } from './store'
import { myPresence, myData, onPersonUpdate, onPersonLeave, onPersonData, followee, toggleFollowee } from './store'
import type { RoomPresence, MyDataItem } from './types'

interface Message {
interface PresenceMessage {
id: Ref<Person>
type: 'update' | 'remove'
presence?: RoomPresence[]
lastUpdate?: number
}

interface DataMessage {
id: Ref<Person>
type: 'data'
key: string
sender: Ref<Person>
topic: string
data: any
}

interface FollowMessage {
type: 'follow'
follower: Ref<Person>
followee: Ref<Person>
}

interface UnfollowMessage {
type: 'unfollow'
follower: Ref<Person>
}

interface FollowedMessage {
type: 'followed'
follower: Ref<Person>
active: boolean
}

type IncomingMessage = PresenceMessage | DataMessage | FollowedMessage | UnfollowMessage

export class PresenceClient implements Disposable {
private ws: WebSocket | null = null
private closed = false
Expand All @@ -48,14 +67,19 @@ export class PresenceClient implements Disposable {
private readonly myDataTimestamps = new Map<string, number>()
private readonly myPresenceUnsub: Unsubscriber
private readonly myDataUnsub: Unsubscriber
private readonly followeeUnsub: Unsubscriber
private readonly followers = new Set<Ref<Person>>()

constructor (private readonly url: string | URL) {
this.presence = get(myPresence)
this.myPresenceUnsub = myPresence.subscribe((presence) => {
this.handlePresenceChanged(presence)
})
this.myDataUnsub = myData.subscribe((data) => {
this.handleMyDataChanged(data)
this.handleMyDataChanged(data, false)
})
this.followeeUnsub = followee.subscribe((followee) => {
this.handleFolloweeChanged(followee)
})

this.connect()
Expand All @@ -67,6 +91,7 @@ export class PresenceClient implements Disposable {

this.myPresenceUnsub()
this.myDataUnsub()
this.followeeUnsub()

if (this.ws !== null) {
this.ws.close()
Expand Down Expand Up @@ -128,18 +153,28 @@ export class PresenceClient implements Disposable {

private handleConnect (): void {
this.sendPresence(getCurrentEmployee(), this.presence)
this.sendMyData(getCurrentEmployee(), get(myData))

this.handleMyDataChanged(get(myData), true)

const f = get(followee)
if (f !== undefined) {
this.handleFolloweeChanged(f)
}
}

private handleMessage (data: string): void {
try {
const message = JSON.parse(data) as Message | DataMessage
const message = JSON.parse(data) as IncomingMessage
if (message.type === 'update' && message.presence !== undefined) {
onPersonUpdate(message.id, message.presence ?? [])
} else if (message.type === 'remove') {
onPersonLeave(message.id)
} else if (message.type === 'data') {
onPersonData(message.id, message.key, message.data)
onPersonData(message.sender, message.topic, message.data)
} else if (message.type === 'followed') {
this.onFollowed(message.follower, message.active)
} else if (message.type === 'unfollow') {
toggleFollowee(undefined)
} else {
console.warn('Unknown message type', message)
}
Expand All @@ -155,28 +190,60 @@ export class PresenceClient implements Disposable {

private sendPresence (person: Ref<Person>, presence: RoomPresence[]): void {
if (!this.closed && this.ws !== null && this.ws.readyState === WebSocket.OPEN) {
const message: Message = { id: person, type: 'update', presence }
const message: PresenceMessage = { id: person, type: 'update', presence }
this.ws.send(JSON.stringify(message))
}
}

private handleMyDataChanged (data: Map<string, MyDataItem>): void {
this.sendMyData(getCurrentEmployee(), data)
private handleMyDataChanged (data: Map<string, MyDataItem>, forceSend: boolean): void {
if (this.followers.size === 0) {
return
}
if (!this.closed && this.ws !== null && this.ws.readyState === WebSocket.OPEN) {
for (const [topic, value] of data) {
const lastSend = this.myDataTimestamps.get(topic) ?? 0
if (value.lastUpdated >= lastSend + this.myDataThrottleInterval || forceSend) {
this.myDataTimestamps.set(topic, value.lastUpdated)
const message: DataMessage = {
sender: getCurrentEmployee(),
type: 'data',
topic,
data: value.data
}
this.ws.send(JSON.stringify(message))
}
}
}
}

private sendMyData (person: Ref<Person>, data: Map<string, MyDataItem>): void {
private handleFolloweeChanged (followee: Ref<Person> | undefined): void {
if (!this.closed && this.ws !== null && this.ws.readyState === WebSocket.OPEN) {
for (const [key, value] of data) {
const lastSend = this.myDataTimestamps.get(key) ?? 0
if (value.lastUpdated >= lastSend + this.myDataThrottleInterval || value.forceSend) {
this.myDataTimestamps.set(key, value.lastUpdated)
const message: DataMessage = { id: person, type: 'data', key, data: value.data }
this.ws.send(JSON.stringify(message))
if (followee !== undefined) {
const message: FollowMessage = {
type: 'follow',
follower: getCurrentEmployee(),
followee
}
this.ws.send(JSON.stringify(message))
} else {
const message: UnfollowMessage = {
type: 'unfollow',
follower: getCurrentEmployee()
}
this.ws.send(JSON.stringify(message))
}
}
}

private onFollowed (follower: Ref<Person>, active: boolean): void {
if (active) {
this.followers.add(follower)
this.handleMyDataChanged(get(myData), true)
} else {
this.followers.delete(follower)
}
}

[Symbol.dispose] (): void {
this.close()
}
Expand Down
8 changes: 4 additions & 4 deletions plugins/presence-resources/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { type Resources } from '@hcengineering/platform'
import Presence from './components/Presence.svelte'
import PresenceAvatars from './components/PresenceAvatars.svelte'
import WorkbenchExtension from './components/WorkbenchExtension.svelte'
import { getFollowee, sendMyData, subscribeToOtherData, unsubscribeFromOtherData } from './store'
import { getFollowee, publishData, followeeDataSubscribe, followeeDataUnsubscribe } from './store'

export { Presence, PresenceAvatars }
export { updateMyPresence, removeMyPresence, presenceByObjectId } from './store'
Expand All @@ -32,9 +32,9 @@ export default async (): Promise<Resources> => ({
WorkbenchExtension
},
function: {
SendMyData: sendMyData,
PublishData: publishData,
GetFollowee: getFollowee,
SubscribeToOtherData: subscribeToOtherData,
UnsubscribeFromOtherData: unsubscribeFromOtherData
FolloweeDataSubscribe: followeeDataSubscribe,
FolloweeDataUnsubscribe: followeeDataUnsubscribe
}
})
68 changes: 34 additions & 34 deletions plugins/presence-resources/src/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ export const myData = writable<Map<string, MyDataItem>>(new Map())
export const otherPresence = writable<PersonPresenceMap>(new Map())
export const followee = writable<Ref<Person> | undefined>(undefined)

const otherDataMap = new Map<Ref<Person>, Map<string, any>>()
const otherDataHandlers = new Map<string, Set<(data: any) => void>>()
const personDataMap = new Map<Ref<Person>, Map<string, any>>()
const followeeDataHandlers = new Map<string, Set<(data: any) => void>>()

export const presenceByObjectId = derived<Readable<PersonPresenceMap>, Map<Ref<Doc>, PersonRoomPresence[]>>(
otherPresence,
Expand Down Expand Up @@ -86,15 +86,15 @@ export function onPersonLeave (person: Ref<Person>): void {
})
}

export function onPersonData (person: Ref<Person>, key: string, data: any): void {
const otherData = otherDataMap.get(person)
if (otherData !== undefined) {
otherData.set(key, data)
export function onPersonData (person: Ref<Person>, topic: string, data: any): void {
const personData = personDataMap.get(person)
if (personData !== undefined) {
personData.set(topic, data)
} else {
otherDataMap.set(person, new Map([[key, data]]))
personDataMap.set(person, new Map([[topic, data]]))
}
if (person === get(followee)) {
const handlers = otherDataHandlers.get(key)
const handlers = followeeDataHandlers.get(topic)
if (handlers !== undefined) {
for (const handler of handlers) {
handler(data)
Expand All @@ -103,52 +103,52 @@ export function onPersonData (person: Ref<Person>, key: string, data: any): void
}
}

export function subscribeToOtherData (key: string, callback: (data: any) => void): void {
const handlers = otherDataHandlers.get(key)
export function followeeDataSubscribe (topic: string, handler: (data: any) => void): void {
const handlers = followeeDataHandlers.get(topic)
if (handlers !== undefined) {
handlers.add(callback)
handlers.add(handler)
} else {
otherDataHandlers.set(key, new Set([callback]))
followeeDataHandlers.set(topic, new Set([handler]))
}
const p = get(followee)
if (p !== undefined) {
const otherData = otherDataMap.get(p)
if (otherData !== undefined) {
const data = otherData.get(key)
const f = get(followee)
if (f !== undefined) {
const followeeData = personDataMap.get(f)
if (followeeData !== undefined) {
const data = followeeData.get(topic)
if (data !== undefined) {
callback(data)
handler(data)
}
}
}
}

export function unsubscribeFromOtherData (key: string, callback: (data: any) => void): void {
const handlers = otherDataHandlers.get(key)
export function followeeDataUnsubscribe (topic: string, handler: (data: any) => void): void {
const handlers = followeeDataHandlers.get(topic)
if (handlers !== undefined) {
handlers.delete(callback)
handlers.delete(handler)
}
}

export function toggleFollowee (person: Ref<Person>): void {
export function toggleFollowee (person: Ref<Person> | undefined): void {
followee.update((p) => (p === person ? undefined : person))

const p = get(followee)
if (p !== undefined) {
const otherData = otherDataMap.get(p)
const f = get(followee)
if (f !== undefined) {
const otherData = personDataMap.get(f)
if (otherData !== undefined) {
for (const [key, data] of otherData) {
const handlers = otherDataHandlers.get(key)
for (const [topic, data] of otherData) {
const handlers = followeeDataHandlers.get(topic)
if (handlers !== undefined) {
for (const callback of handlers) {
callback(data)
for (const handler of handlers) {
handler(data)
}
}
}
}
} else {
for (const handlers of otherDataHandlers.values()) {
for (const callback of handlers) {
callback(undefined)
for (const handlers of followeeDataHandlers.values()) {
for (const handler of handlers) {
handler(undefined)
}
}
}
Expand All @@ -163,9 +163,9 @@ export function getFollowee (): Person | undefined {
return personMap.get(followeeId)
}

export function sendMyData (key: string, data: any, forceSend: boolean = false): void {
export function publishData (topic: string, data: any): void {
myData.update((map) => {
map.set(key, { lastUpdated: Date.now(), data, forceSend })
map.set(topic, { lastUpdated: Date.now(), data })
return map
})
}
1 change: 0 additions & 1 deletion plugins/presence-resources/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,4 @@ export interface PersonPresence {
export interface MyDataItem {
data: any
lastUpdated: number
forceSend: boolean
}
6 changes: 3 additions & 3 deletions plugins/presence/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ export const presencePlugin = plugin(presenceId, {
PresenceAvatars: '' as AnyComponent
},
function: {
SendMyData: '' as Resource<(key: string, data: any) => void>,
PublishData: '' as Resource<(topic: string, data: any) => void>,
GetFollowee: '' as Resource<() => Person | undefined>,
SubscribeToOtherData: '' as Resource<(key: string, callback: (data: any) => void) => void>,
UnsubscribeFromOtherData: '' as Resource<(key: string, callback: (data: any) => void) => void>
FolloweeDataSubscribe: '' as Resource<(topic: string, handler: (data: any) => void) => void>,
FolloweeDataUnsubscribe: '' as Resource<(topic: string, handler: (data: any) => void) => void>
}
})

Expand Down
Loading

0 comments on commit c31902d

Please sign in to comment.