diff --git a/packages/live-status-gateway/src/collections/adLibActionsHandler.ts b/packages/live-status-gateway/src/collections/adLibActionsHandler.ts index 256d1fb571..8411660e64 100644 --- a/packages/live-status-gateway/src/collections/adLibActionsHandler.ts +++ b/packages/live-status-gateway/src/collections/adLibActionsHandler.ts @@ -1,79 +1,56 @@ import { Logger } from 'winston' import { CoreHandler } from '../coreHandler' -import { CollectionBase, Collection, CollectionObserver } from '../wsHandler' +import { Collection, PickArr, PublicationCollection } from '../wsHandler' import { AdLibAction } from '@sofie-automation/corelib/dist/dataModel/AdlibAction' -import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance' import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections' import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' -import { AdLibActionId, RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids' -import { SelectedPartInstances } from './partInstancesHandler' +import { RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' +import { CollectionHandlers } from '../liveStatusServer' + +const PLAYLIST_KEYS = ['currentPartInfo', 'nextPartInfo'] as const +type Playlist = PickArr export class AdLibActionsHandler - extends CollectionBase - implements Collection, CollectionObserver + extends PublicationCollection + implements Collection { - public observerName: string - private _curRundownId: RundownId | undefined - private _curPartInstance: DBPartInstance | undefined + private _currentRundownId: RundownId | undefined constructor(logger: Logger, coreHandler: CoreHandler) { - super(AdLibActionsHandler.name, CollectionName.AdLibActions, CorelibPubSub.adLibActions, logger, coreHandler) - this.observerName = this._name + super(CollectionName.AdLibActions, CorelibPubSub.adLibActions, logger, coreHandler) + } + + init(handlers: CollectionHandlers): void { + super.init(handlers) + + handlers.playlistHandler.subscribe(this.onPlaylistUpdate, PLAYLIST_KEYS) } - async changed(id: AdLibActionId, changeType: string): Promise { - this.logDocumentChange(id, changeType) - if (!this._collectionName) return - const col = this._core.getCollection(this._collectionName) - if (!col) throw new Error(`collection '${this._collectionName}' not found!`) - this._collectionData = col.find({ rundownId: this._curRundownId }) - await this.notify(this._collectionData) + protected changed(): void { + this.updateAndNotify() } - async update(source: string, data: SelectedPartInstances | undefined): Promise { - this.logUpdateReceived('partInstances', source) - const prevRundownId = this._curRundownId - this._curPartInstance = data ? data.current ?? data.next : undefined - this._curRundownId = this._curPartInstance ? this._curPartInstance.rundownId : undefined + private onPlaylistUpdate = (data: Playlist | undefined): void => { + this.logUpdateReceived('playlist') + const prevRundownId = this._currentRundownId - await new Promise(process.nextTick.bind(this)) - if (!this._collectionName) return - if (!this._publicationName) return - if (prevRundownId !== this._curRundownId) { - if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId) - if (this._dbObserver) this._dbObserver.stop() - if (this._curRundownId && this._curPartInstance) { - this._subscriptionId = await this._coreHandler.setupSubscription(this._publicationName, [ - this._curRundownId, - ]) - this._dbObserver = this._coreHandler.setupObserver(this._collectionName) - this._dbObserver.added = (id) => { - void this.changed(id, 'added').catch(this._logger.error) - } - this._dbObserver.changed = (id) => { - void this.changed(id, 'changed').catch(this._logger.error) - } - this._dbObserver.removed = (id) => { - void this.changed(id, 'removed').catch(this._logger.error) - } + const rundownPlaylist = data - const collection = this._core.getCollection(this._collectionName) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) - this._collectionData = collection.find({ - rundownId: this._curRundownId, - }) - await this.notify(this._collectionData) + this._currentRundownId = rundownPlaylist?.currentPartInfo?.rundownId ?? rundownPlaylist?.nextPartInfo?.rundownId + + if (prevRundownId !== this._currentRundownId) { + this.stopSubscription() + if (this._currentRundownId) { + this.setupSubscription([this._currentRundownId]) } + // no need to trigger updateAndNotify() because the subscription will take care of this } } - // override notify to implement empty array handling - async notify(data: AdLibAction[] | undefined): Promise { - this.logNotifyingUpdate(data?.length) - if (data !== undefined) { - for (const observer of this._observers) { - await observer.update(this._name, data) - } - } + protected updateAndNotify(): void { + const col = this.getCollectionOrFail() + this._collectionData = col.find({ rundownId: this._currentRundownId }) + this.notify(this._collectionData) } } diff --git a/packages/live-status-gateway/src/collections/adLibsHandler.ts b/packages/live-status-gateway/src/collections/adLibsHandler.ts index e34fbcb11f..15ae8ce348 100644 --- a/packages/live-status-gateway/src/collections/adLibsHandler.ts +++ b/packages/live-status-gateway/src/collections/adLibsHandler.ts @@ -1,80 +1,55 @@ import { Logger } from 'winston' import { CoreHandler } from '../coreHandler' -import { CollectionBase, Collection, CollectionObserver } from '../wsHandler' +import { Collection, PickArr, PublicationCollection } from '../wsHandler' import { AdLibPiece } from '@sofie-automation/corelib/dist/dataModel/AdLibPiece' -import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance' import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections' import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' -import { PieceId, RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids' -import { SelectedPartInstances } from './partInstancesHandler' +import { RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' +import { CollectionHandlers } from '../liveStatusServer' + +const PLAYLIST_KEYS = ['currentPartInfo', 'nextPartInfo'] as const +type Playlist = PickArr export class AdLibsHandler - extends CollectionBase - implements Collection, CollectionObserver + extends PublicationCollection + implements Collection { - public observerName: string - // private _core: CoreConnection private _currentRundownId: RundownId | undefined - private _currentPartInstance: DBPartInstance | undefined constructor(logger: Logger, coreHandler: CoreHandler) { - super(AdLibsHandler.name, CollectionName.AdLibPieces, CorelibPubSub.adLibPieces, logger, coreHandler) - this.observerName = this._name + super(CollectionName.AdLibPieces, CorelibPubSub.adLibPieces, logger, coreHandler) + } + + init(handlers: CollectionHandlers): void { + super.init(handlers) + + handlers.playlistHandler.subscribe(this.onPlaylistUpdate, PLAYLIST_KEYS) } - async changed(id: PieceId, changeType: string): Promise { - this.logDocumentChange(id, changeType) - if (!this._collectionName) return - const col = this._core.getCollection(this._collectionName) - if (!col) throw new Error(`collection '${this._collectionName}' not found!`) - this._collectionData = col.find({ rundownId: this._currentRundownId }) - await this.notify(this._collectionData) + changed(): void { + this.updateAndNotify() } - async update(source: string, data: SelectedPartInstances | undefined): Promise { - this.logUpdateReceived('partInstances', source) + private onPlaylistUpdate = (data: Playlist | undefined): void => { + this.logUpdateReceived('playlist') const prevRundownId = this._currentRundownId - this._currentPartInstance = data ? data.current ?? data.next : undefined - this._currentRundownId = this._currentPartInstance?.rundownId + const rundownPlaylist = data - await new Promise(process.nextTick.bind(this)) - if (!this._collectionName) return - if (!this._publicationName) return - if (prevRundownId !== this._currentRundownId) { - if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId) - if (this._dbObserver) this._dbObserver.stop() - if (this._currentRundownId && this._currentPartInstance) { - this._subscriptionId = await this._coreHandler.setupSubscription(this._publicationName, [ - this._currentRundownId, - ]) - this._dbObserver = this._coreHandler.setupObserver(this._collectionName) - this._dbObserver.added = (id) => { - void this.changed(id, 'added').catch(this._logger.error) - } - this._dbObserver.changed = (id) => { - void this.changed(id, 'changed').catch(this._logger.error) - } - this._dbObserver.removed = (id) => { - void this.changed(id, 'removed').catch(this._logger.error) - } + this._currentRundownId = rundownPlaylist?.currentPartInfo?.rundownId ?? rundownPlaylist?.nextPartInfo?.rundownId - const collection = this._core.getCollection(this._collectionName) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) - this._collectionData = collection.find({ - rundownId: this._currentRundownId, - }) - await this.notify(this._collectionData) + if (prevRundownId !== this._currentRundownId) { + this.stopSubscription() + if (this._currentRundownId) { + this.setupSubscription([this._currentRundownId]) } + // no need to trigger updateAndNotify() because the subscription will take care of this } } - // override notify to implement empty array handling - async notify(data: AdLibPiece[] | undefined): Promise { - this.logNotifyingUpdate(data?.length) - if (data !== undefined) { - for (const observer of this._observers) { - await observer.update(this._name, data) - } - } + protected updateAndNotify(): void { + const collection = this.getCollectionOrFail() + this._collectionData = collection.find({ rundownId: this._currentRundownId }) + this.notify(this._collectionData) } } diff --git a/packages/live-status-gateway/src/collections/globalAdLibActionsHandler.ts b/packages/live-status-gateway/src/collections/globalAdLibActionsHandler.ts index 4ec34285b6..349ec2cbca 100644 --- a/packages/live-status-gateway/src/collections/globalAdLibActionsHandler.ts +++ b/packages/live-status-gateway/src/collections/globalAdLibActionsHandler.ts @@ -1,85 +1,63 @@ import { Logger } from 'winston' import { CoreHandler } from '../coreHandler' -import { CollectionBase, Collection, CollectionObserver } from '../wsHandler' +import { Collection, PickArr, PublicationCollection } from '../wsHandler' import { RundownBaselineAdLibAction } from '@sofie-automation/corelib/dist/dataModel/RundownBaselineAdLibAction' import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections' import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' -import { RundownBaselineAdLibActionId, RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids' -import { SelectedPartInstances } from './partInstancesHandler' +import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' +import { CollectionHandlers } from '../liveStatusServer' +import { RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids' + +const PLAYLIST_KEYS = ['currentPartInfo', 'nextPartInfo'] as const +type Playlist = PickArr export class GlobalAdLibActionsHandler - extends CollectionBase< + extends PublicationCollection< RundownBaselineAdLibAction[], CorelibPubSub.rundownBaselineAdLibActions, CollectionName.RundownBaselineAdLibActions > - implements Collection, CollectionObserver + implements Collection { - public observerName: string private _currentRundownId: RundownId | undefined constructor(logger: Logger, coreHandler: CoreHandler) { super( - GlobalAdLibActionsHandler.name, CollectionName.RundownBaselineAdLibActions, CorelibPubSub.rundownBaselineAdLibActions, logger, coreHandler ) - this.observerName = this._name } - async changed(id: RundownBaselineAdLibActionId, changeType: string): Promise { - this.logDocumentChange(id, changeType) - if (!this._collectionName) return - const col = this._core.getCollection(this._collectionName) - if (!col) throw new Error(`collection '${this._collectionName}' not found!`) - this._collectionData = col.find({ rundownId: this._currentRundownId }) - await this.notify(this._collectionData) + init(handlers: CollectionHandlers): void { + super.init(handlers) + + handlers.playlistHandler.subscribe(this.onPlaylistUpdate, PLAYLIST_KEYS) + } + + changed(): void { + this.updateAndNotify() } - async update(source: string, data: SelectedPartInstances | undefined): Promise { - this.logUpdateReceived('partInstances', source) + private onPlaylistUpdate = (data: Playlist | undefined): void => { + this.logUpdateReceived('playlist') const prevRundownId = this._currentRundownId - const partInstance = data ? data.current ?? data.next : undefined - this._currentRundownId = partInstance?.rundownId + const rundownPlaylist = data + + this._currentRundownId = rundownPlaylist?.currentPartInfo?.rundownId ?? rundownPlaylist?.nextPartInfo?.rundownId - await new Promise(process.nextTick.bind(this)) - if (!this._collectionName) return - if (!this._publicationName) return if (prevRundownId !== this._currentRundownId) { - if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId) - if (this._dbObserver) this._dbObserver.stop() + this.stopSubscription() if (this._currentRundownId) { - this._subscriptionId = await this._coreHandler.setupSubscription(this._publicationName, [ - this._currentRundownId, - ]) - this._dbObserver = this._coreHandler.setupObserver(this._collectionName) - this._dbObserver.added = (id) => { - void this.changed(id, 'added').catch(this._logger.error) - } - this._dbObserver.changed = (id) => { - void this.changed(id, 'changed').catch(this._logger.error) - } - this._dbObserver.removed = (id) => { - void this.changed(id, 'removed').catch(this._logger.error) - } - - const collection = this._core.getCollection(this._collectionName) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) - this._collectionData = collection.find({ rundownId: this._currentRundownId }) - await this.notify(this._collectionData) + this.setupSubscription([this._currentRundownId]) } } } - // override notify to implement empty array handling - async notify(data: RundownBaselineAdLibAction[] | undefined): Promise { - this.logNotifyingUpdate(data?.length) - if (data !== undefined) { - for (const observer of this._observers) { - await observer.update(this._name, data) - } - } + protected updateAndNotify(): void { + const collection = this.getCollectionOrFail() + this._collectionData = collection.find({ rundownId: this._currentRundownId }) + this.notify(this._collectionData) } } diff --git a/packages/live-status-gateway/src/collections/globalAdLibsHandler.ts b/packages/live-status-gateway/src/collections/globalAdLibsHandler.ts index 2f8f8fb662..fd449dd25e 100644 --- a/packages/live-status-gateway/src/collections/globalAdLibsHandler.ts +++ b/packages/live-status-gateway/src/collections/globalAdLibsHandler.ts @@ -1,85 +1,58 @@ import { Logger } from 'winston' import { CoreHandler } from '../coreHandler' -import { CollectionBase, Collection, CollectionObserver } from '../wsHandler' +import { Collection, PickArr, PublicationCollection } from '../wsHandler' import { RundownBaselineAdLibItem } from '@sofie-automation/corelib/dist/dataModel/RundownBaselineAdLibPiece' import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections' import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' -import { PieceId, RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids' -import { SelectedPartInstances } from './partInstancesHandler' +import { RundownId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' +import { CollectionHandlers } from '../liveStatusServer' + +const PLAYLIST_KEYS = ['currentPartInfo', 'nextPartInfo'] as const +type Playlist = PickArr export class GlobalAdLibsHandler - extends CollectionBase< + extends PublicationCollection< RundownBaselineAdLibItem[], CorelibPubSub.rundownBaselineAdLibPieces, CollectionName.RundownBaselineAdLibPieces > - implements Collection, CollectionObserver + implements Collection { - public observerName: string private _currentRundownId: RundownId | undefined constructor(logger: Logger, coreHandler: CoreHandler) { - super( - GlobalAdLibsHandler.name, - CollectionName.RundownBaselineAdLibPieces, - CorelibPubSub.rundownBaselineAdLibPieces, - logger, - coreHandler - ) - this.observerName = this._name + super(CollectionName.RundownBaselineAdLibPieces, CorelibPubSub.rundownBaselineAdLibPieces, logger, coreHandler) } - async changed(id: PieceId, changeType: string): Promise { - this.logDocumentChange(id, changeType) - if (!this._collectionName) return - const collection = this._core.getCollection(this._collectionName) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) - this._collectionData = collection.find({ rundownId: this._currentRundownId }) - await this.notify(this._collectionData) + init(handlers: CollectionHandlers): void { + super.init(handlers) + + handlers.playlistHandler.subscribe(this.onPlaylistUpdate, PLAYLIST_KEYS) + } + + changed(): void { + this.updateAndNotify() } - async update(source: string, data: SelectedPartInstances | undefined): Promise { - this.logUpdateReceived('globalAdLibs', source) + private onPlaylistUpdate = (data: Playlist | undefined): void => { + this.logUpdateReceived('playlist') const prevRundownId = this._currentRundownId - const partInstance = data ? data.current ?? data.next : undefined - this._currentRundownId = partInstance?.rundownId + const rundownPlaylist = data + + this._currentRundownId = rundownPlaylist?.currentPartInfo?.rundownId ?? rundownPlaylist?.nextPartInfo?.rundownId - await new Promise(process.nextTick.bind(this)) - if (!this._collectionName) return - if (!this._publicationName) return if (prevRundownId !== this._currentRundownId) { - if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId) - if (this._dbObserver) this._dbObserver.stop() + this.stopSubscription() if (this._currentRundownId) { - this._subscriptionId = await this._coreHandler.setupSubscription(this._publicationName, [ - this._currentRundownId, - ]) - this._dbObserver = this._coreHandler.setupObserver(this._collectionName) - this._dbObserver.added = (id) => { - void this.changed(id, 'added').catch(this._logger.error) - } - this._dbObserver.changed = (id) => { - void this.changed(id, 'changed').catch(this._logger.error) - } - this._dbObserver.removed = (id) => { - void this.changed(id, 'removed').catch(this._logger.error) - } - - const collection = this._core.getCollection(this._collectionName) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) - this._collectionData = collection.find({ rundownId: this._currentRundownId }) - await this.notify(this._collectionData) + this.setupSubscription([this._currentRundownId]) } } } - // override notify to implement empty array handling - async notify(data: RundownBaselineAdLibItem[] | undefined): Promise { - this.logNotifyingUpdate(data?.length) - if (data !== undefined) { - for (const observer of this._observers) { - await observer.update(this._name, data) - } - } + protected updateAndNotify(): void { + const collection = this.getCollectionOrFail() + this._collectionData = collection.find({ rundownId: this._currentRundownId }) + this.notify(this._collectionData) } } diff --git a/packages/live-status-gateway/src/collections/partHandler.ts b/packages/live-status-gateway/src/collections/partHandler.ts index c2df5416da..0af5bb9ee7 100644 --- a/packages/live-status-gateway/src/collections/partHandler.ts +++ b/packages/live-status-gateway/src/collections/partHandler.ts @@ -1,102 +1,71 @@ import { Logger } from 'winston' import { CoreHandler } from '../coreHandler' -import { CollectionBase, Collection, CollectionObserver } from '../wsHandler' +import { Collection, PickArr, PublicationCollection } from '../wsHandler' import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance' import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part' -import { PartInstancesHandler, SelectedPartInstances } from './partInstancesHandler' -import { PlaylistHandler } from './playlistHandler' +import { SelectedPartInstances } from './partInstancesHandler' import { PartsHandler } from './partsHandler' import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections' import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' -import { PartId } from '@sofie-automation/corelib/dist/dataModel/Ids' -import areElementsShallowEqual from '@sofie-automation/shared-lib/dist/lib/isShallowEqual' +import { CollectionHandlers } from '../liveStatusServer' + +const PLAYLIST_KEYS = ['_id', 'rundownIdsInOrder'] as const +type Playlist = PickArr + +const PART_INSTANCES_KEYS = ['current'] as const +type PartInstances = PickArr export class PartHandler - extends CollectionBase - implements Collection, CollectionObserver, CollectionObserver + extends PublicationCollection + implements Collection { - public observerName: string - private _activePlaylist: DBRundownPlaylist | undefined + private _activePlaylist: Playlist | undefined private _currentPartInstance: DBPartInstance | undefined constructor(logger: Logger, coreHandler: CoreHandler, private _partsHandler: PartsHandler) { - super(PartHandler.name, CollectionName.Parts, CorelibPubSub.parts, logger, coreHandler) - this.observerName = this._name + super(CollectionName.Parts, CorelibPubSub.parts, logger, coreHandler) + } + + init(handlers: CollectionHandlers): void { + super.init(handlers) + + handlers.playlistHandler.subscribe(this.onPlaylistUpdate, PLAYLIST_KEYS) + handlers.partInstancesHandler.subscribe(this.onPartInstanceUpdate, PART_INSTANCES_KEYS) } - async changed(id: PartId, changeType: string): Promise { - this.logDocumentChange(id, changeType) - if (!this._collectionName) return - const collection = this._core.getCollection(this._collectionName) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) + changed(): void { + const collection = this.getCollectionOrFail() const allParts = collection.find(undefined) - await this._partsHandler.setParts(allParts) + this._partsHandler.setParts(allParts) if (this._collectionData) { this._collectionData = collection.findOne(this._collectionData._id) - await this.notify(this._collectionData) + this.notify(this._collectionData) } } - async update(source: string, data: DBRundownPlaylist | SelectedPartInstances | undefined): Promise { - const prevRundownIds = this._activePlaylist?.rundownIdsInOrder ?? [] - const prevCurPartInstance = this._currentPartInstance + private onPlaylistUpdate = (rundownPlaylist: Playlist | undefined): void => { + this.logUpdateReceived('playlist', `rundownPlaylistId ${rundownPlaylist?._id}`) + this._activePlaylist = rundownPlaylist - const rundownPlaylist = data ? (data as DBRundownPlaylist) : undefined - const partInstances = data as SelectedPartInstances - switch (source) { - case PlaylistHandler.name: - this.logUpdateReceived('playlist', source, `rundownPlaylistId ${rundownPlaylist?._id}`) - this._activePlaylist = rundownPlaylist - break - case PartInstancesHandler.name: - this.logUpdateReceived('partInstances', source) - this._currentPartInstance = partInstances.current - break - default: - throw new Error(`${this._name} received unsupported update from ${source}}`) + this.stopSubscription() + if (this._activePlaylist) { + const rundownIds = this._activePlaylist.rundownIdsInOrder + this.setupSubscription(rundownIds, null) } + } - await new Promise(process.nextTick.bind(this)) - if (!this._collectionName) return - if (!this._publicationName) return - const rundownsChanged = !areElementsShallowEqual(this._activePlaylist?.rundownIdsInOrder ?? [], prevRundownIds) - if (rundownsChanged) { - if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId) - if (this._dbObserver) this._dbObserver.stop() - if (this._activePlaylist) { - const rundownIds = this._activePlaylist.rundownIdsInOrder - this._subscriptionId = await this._coreHandler.setupSubscription( - this._publicationName, - rundownIds, - null - ) - this._dbObserver = this._coreHandler.setupObserver(this._collectionName) - this._dbObserver.added = (id) => { - void this.changed(id, 'added').catch(this._logger.error) - } - this._dbObserver.changed = (id) => { - void this.changed(id, 'changed').catch(this._logger.error) - } - this._dbObserver.removed = (id) => { - void this.changed(id, 'removed').catch(this._logger.error) - } - } - } - const collection = this._core.getCollection(this._collectionName) - if (rundownsChanged) { - const allParts = collection.find(undefined) - await this._partsHandler.setParts(allParts) - } - if (prevCurPartInstance !== this._currentPartInstance) { - this._logger.debug( - `${this._name} found updated partInstances with current part ${this._activePlaylist?.currentPartInfo?.partInstanceId}` - ) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) - if (this._currentPartInstance) { - this._collectionData = collection.findOne(this._currentPartInstance.part._id) - await this.notify(this._collectionData) - } + private onPartInstanceUpdate = (partInstances: PartInstances | SelectedPartInstances | undefined): void => { + if (!partInstances) return + + this.logUpdateReceived('partInstances') + this._currentPartInstance = partInstances.current + + const collection = this.getCollectionOrFail() + + if (this._currentPartInstance) { + this._collectionData = collection.findOne(this._currentPartInstance.part._id) + this.notify(this._collectionData) } } } diff --git a/packages/live-status-gateway/src/collections/partInstancesHandler.ts b/packages/live-status-gateway/src/collections/partInstancesHandler.ts index acca11e154..0597800745 100644 --- a/packages/live-status-gateway/src/collections/partInstancesHandler.ts +++ b/packages/live-status-gateway/src/collections/partInstancesHandler.ts @@ -1,6 +1,6 @@ import { Logger } from 'winston' import { CoreHandler } from '../coreHandler' -import { CollectionBase, Collection, CollectionObserver } from '../wsHandler' +import { Collection, PickArr, PublicationCollection } from '../wsHandler' import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance' import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections' @@ -8,7 +8,8 @@ import areElementsShallowEqual from '@sofie-automation/shared-lib/dist/lib/isSha import _ = require('underscore') import throttleToNextTick from '@sofie-automation/shared-lib/dist/lib/throttleToNextTick' import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' -import { PartInstanceId, RundownId, RundownPlaylistActivationId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import { RundownId, RundownPlaylistActivationId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import { CollectionHandlers } from '../liveStatusServer' export interface SelectedPartInstances { previous: DBPartInstance | undefined @@ -18,24 +19,31 @@ export interface SelectedPartInstances { inCurrentSegment: DBPartInstance[] } +const PLAYLIST_KEYS = [ + '_id', + 'activationId', + 'previousPartInfo', + 'currentPartInfo', + 'nextPartInfo', + 'rundownIdsInOrder', +] as const +type Playlist = PickArr + export class PartInstancesHandler - extends CollectionBase - implements Collection, CollectionObserver + extends PublicationCollection + implements Collection { - public observerName: string - private _currentPlaylist: DBRundownPlaylist | undefined + private _currentPlaylist: Playlist | undefined private _rundownIds: RundownId[] = [] private _activationId: RundownPlaylistActivationId | undefined - private _subscriptionPending = false private _throttledUpdateAndNotify = throttleToNextTick(() => { this.updateCollectionData() - this.notify(this._collectionData).catch(this._logger.error) + this.notify(this._collectionData) }) constructor(logger: Logger, coreHandler: CoreHandler) { - super(PartInstancesHandler.name, CollectionName.PartInstances, CorelibPubSub.partInstances, logger, coreHandler) - this.observerName = this._name + super(CollectionName.PartInstances, CorelibPubSub.partInstances, logger, coreHandler) this._collectionData = { previous: undefined, current: undefined, @@ -45,17 +53,19 @@ export class PartInstancesHandler } } - async changed(id: PartInstanceId, changeType: string): Promise { - this.logDocumentChange(id, changeType) - if (!this._collectionName || this._subscriptionPending) return + init(handlers: CollectionHandlers): void { + super.init(handlers) + + handlers.playlistHandler.subscribe(this.onPlaylistUpdate, PLAYLIST_KEYS) + } + changed(): void { this._throttledUpdateAndNotify() } private updateCollectionData(): boolean { - if (!this._collectionName || !this._collectionData) return false - const collection = this._core.getCollection(this._collectionName) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) + if (!this._collectionData) return false + const collection = this.getCollectionOrFail() const previousPartInstance = this._currentPlaylist?.previousPartInfo?.partInstanceId ? collection.findOne(this._currentPlaylist.previousPartInfo.partInstanceId) : undefined @@ -99,25 +109,25 @@ export class PartInstancesHandler } private clearCollectionData() { - if (!this._collectionName || !this._collectionData) return - this._collectionData.previous = undefined - this._collectionData.current = undefined - this._collectionData.next = undefined - this._collectionData.firstInSegmentPlayout = undefined - this._collectionData.inCurrentSegment = [] + if (!this._collectionData) return + this._collectionData = { + previous: undefined, + current: undefined, + next: undefined, + firstInSegmentPlayout: undefined, + inCurrentSegment: [], + } } - async update(source: string, data: DBRundownPlaylist | undefined): Promise { - const prevRundownIds = this._rundownIds.map((rid) => rid) + private onPlaylistUpdate = (data: Playlist | undefined): void => { + const prevRundownIds = [...this._rundownIds] const prevActivationId = this._activationId this.logUpdateReceived( 'playlist', - source, `rundownPlaylistId ${data?._id}, active ${data?.activationId ? true : false}` ) this._currentPlaylist = data - if (!this._collectionName) return this._rundownIds = this._currentPlaylist ? this._currentPlaylist.rundownIdsInOrder : [] this._activationId = this._currentPlaylist?.activationId @@ -125,49 +135,27 @@ export class PartInstancesHandler const sameSubscription = areElementsShallowEqual(this._rundownIds, prevRundownIds) && prevActivationId === this._activationId if (!sameSubscription) { - await new Promise(process.nextTick.bind(this)) - if (!this._collectionName) return - if (!this._publicationName) return - if (!this._currentPlaylist) return - if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId) - this._subscriptionPending = true - this._subscriptionId = await this._coreHandler.setupSubscription( - this._publicationName, - this._rundownIds, - this._activationId - ) - this._subscriptionPending = false - this._dbObserver = this._coreHandler.setupObserver(this._collectionName) - this._dbObserver.added = (id) => { - void this.changed(id, 'added').catch(this._logger.error) - } - this._dbObserver.changed = (id) => { - void this.changed(id, 'changed').catch(this._logger.error) - } - this._dbObserver.removed = (id) => { - void this.changed(id, 'removed').catch(this._logger.error) - } - - await this.updateAndNotify() + this.stopSubscription() + this.setupSubscription(this._rundownIds, this._activationId) } else if (this._subscriptionId) { - await this.updateAndNotify() + this.updateAndNotify() } else { - await this.clearAndNotify() + this.clearAndNotify() } } else { - await this.clearAndNotify() + this.clearAndNotify() } } - private async clearAndNotify() { + private clearAndNotify() { this.clearCollectionData() - await this.notify(this._collectionData) + this.notify(this._collectionData) } - private async updateAndNotify() { + private updateAndNotify() { const hasAnythingChanged = this.updateCollectionData() if (hasAnythingChanged) { - await this.notify(this._collectionData) + this.notify(this._collectionData) } } } diff --git a/packages/live-status-gateway/src/collections/partsHandler.ts b/packages/live-status-gateway/src/collections/partsHandler.ts index aece1f9c6d..1d12956527 100644 --- a/packages/live-status-gateway/src/collections/partsHandler.ts +++ b/packages/live-status-gateway/src/collections/partsHandler.ts @@ -3,36 +3,21 @@ import { CoreHandler } from '../coreHandler' import { CollectionBase, Collection } from '../wsHandler' import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part' import _ = require('underscore') -import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections' const THROTTLE_PERIOD_MS = 200 -export class PartsHandler - extends CollectionBase - implements Collection -{ - public observerName: string - private throttledNotify: (data: DBPart[]) => Promise +export class PartsHandler extends CollectionBase implements Collection { + private throttledNotify: (data: DBPart[]) => void constructor(logger: Logger, coreHandler: CoreHandler) { - super(PartsHandler.name, CollectionName.Parts, CorelibPubSub.parts, logger, coreHandler) - this.observerName = this._name + super(CollectionName.Parts, logger, coreHandler) this.throttledNotify = _.throttle(this.notify.bind(this), THROTTLE_PERIOD_MS, { leading: true, trailing: true }) } - async setParts(parts: DBPart[]): Promise { + setParts(parts: DBPart[]): void { this.logUpdateReceived('parts', parts.length) this._collectionData = parts - await this.throttledNotify(this._collectionData) - } - - async notify(data: DBPart[] | undefined): Promise { - this.logNotifyingUpdate(this._collectionData?.length) - if (data !== undefined) { - for (const observer of this._observers) { - await observer.update(this._name, data) - } - } + this.throttledNotify(this._collectionData) } } diff --git a/packages/live-status-gateway/src/collections/pieceInstancesHandler.ts b/packages/live-status-gateway/src/collections/pieceInstancesHandler.ts index 74bff309e2..30fdf1f280 100644 --- a/packages/live-status-gateway/src/collections/pieceInstancesHandler.ts +++ b/packages/live-status-gateway/src/collections/pieceInstancesHandler.ts @@ -1,26 +1,42 @@ import { Logger } from 'winston' import { CoreHandler } from '../coreHandler' -import { CollectionBase, Collection, CollectionObserver } from '../wsHandler' +import { Collection, PickArr, PublicationCollection } from '../wsHandler' import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' import { PieceInstance } from '@sofie-automation/corelib/dist/dataModel/PieceInstance' -import { unprotectString } from '@sofie-automation/corelib/dist/protectedString' import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections' import areElementsShallowEqual from '@sofie-automation/shared-lib/dist/lib/isShallowEqual' -import throttleToNextTick from '@sofie-automation/shared-lib/dist/lib/throttleToNextTick' import _ = require('underscore') import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' -import { PartInstanceId, PieceInstanceId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import { PartInstanceId } from '@sofie-automation/corelib/dist/dataModel/Ids' import { + PieceInstanceWithTimings, processAndPrunePieceInstanceTimings, resolvePrunedPieceInstance, } from '@sofie-automation/corelib/dist/playout/processAndPrune' -import { ShowStyleBaseExt, ShowStyleBaseHandler } from './showStyleBaseHandler' -import { PlaylistHandler } from './playlistHandler' +import { ShowStyleBaseExt } from './showStyleBaseHandler' import { SourceLayers } from '@sofie-automation/corelib/dist/dataModel/ShowStyleBase' -import { PartInstancesHandler, SelectedPartInstances } from './partInstancesHandler' +import { SelectedPartInstances } from './partInstancesHandler' import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance' +import { arePropertiesDeepEqual } from '../helpers/equality' +import { CollectionHandlers } from '../liveStatusServer' import { ReadonlyDeep } from 'type-fest' +const PLAYLIST_KEYS = [ + '_id', + 'activationId', + 'currentPartInfo', + 'nextPartInfo', + 'previousPartInfo', + 'rundownIdsInOrder', +] as const +type Playlist = PickArr + +const PART_INSTANCES_KEYS = ['previous', 'current'] as const +type PartInstances = PickArr + +const SHOW_STYLE_BASE_KEYS = ['sourceLayers'] as const +type ShowStyle = PickArr + export type PieceInstanceMin = Omit, 'reportedStartedPlayback' | 'reportedStoppedPlayback'> export interface SelectedPieceInstances { @@ -35,30 +51,16 @@ export interface SelectedPieceInstances { } export class PieceInstancesHandler - extends CollectionBase - implements Collection, CollectionObserver + extends PublicationCollection + implements Collection { - public observerName: string - private _currentPlaylist: DBRundownPlaylist | undefined + private _currentPlaylist: Playlist | undefined private _partInstanceIds: PartInstanceId[] = [] - private _activationId: string | undefined - private _subscriptionPending = false private _sourceLayers: SourceLayers = {} - private _partInstances: SelectedPartInstances | undefined - - private _throttledUpdateAndNotify = throttleToNextTick(() => { - this.updateAndNotify().catch(this._logger.error) - }) + private _partInstances: PartInstances | undefined constructor(logger: Logger, coreHandler: CoreHandler) { - super( - PieceInstancesHandler.name, - CollectionName.PieceInstances, - CorelibPubSub.pieceInstances, - logger, - coreHandler - ) - this.observerName = this._name + super(CollectionName.PieceInstances, CorelibPubSub.pieceInstances, logger, coreHandler) this._collectionData = { active: [], currentPartInstance: [], @@ -66,17 +68,23 @@ export class PieceInstancesHandler } } - async changed(id: PieceInstanceId, changeType: string): Promise { - this.logDocumentChange(id, changeType) - if (!this._collectionName || this._subscriptionPending) return - this._throttledUpdateAndNotify() + init(handlers: CollectionHandlers): void { + super.init(handlers) + + handlers.playlistHandler.subscribe(this.onPlaylistUpdate, PLAYLIST_KEYS) + handlers.partInstancesHandler.subscribe(this.onPartInstancesUpdate, PART_INSTANCES_KEYS) + handlers.showStyleBaseHandler.subscribe(this.onShowStyleBaseUpdate, SHOW_STYLE_BASE_KEYS) + } + + changed(): void { + this.updateAndNotify() } private processAndPrunePieceInstanceTimings( partInstance: DBPartInstance | undefined, pieceInstances: PieceInstance[], filterActive: boolean - ): ReadonlyDeep[] { + ): PieceInstanceWithTimings[] { // Approximate when 'now' is in the PartInstance, so that any adlibbed Pieces will be timed roughly correctly const partStarted = partInstance?.timings?.plannedStartedPlayback const nowInPart = partStarted === undefined ? 0 : Date.now() - partStarted @@ -104,9 +112,8 @@ export class PieceInstancesHandler } private updateCollectionData(): boolean { - if (!this._collectionName || !this._collectionData) return false - const collection = this._core.getCollection(this._collectionName) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) + if (!this._collectionData) return false + const collection = this.getCollectionOrFail() const inPreviousPartInstance = this._currentPlaylist?.previousPartInfo?.partInstanceId ? this.processAndPrunePieceInstanceTimings( @@ -145,20 +152,35 @@ export class PieceInstancesHandler hasAnythingChanged = true } if ( - !_.isEqual(this._collectionData.currentPartInstance, inCurrentPartInstance) && - (this._collectionData.currentPartInstance.length !== inCurrentPartInstance.length || - this._collectionData.currentPartInstance.some((pieceInstance, index) => { - return !arePropertiesDeepEqual>( - inCurrentPartInstance[index], - pieceInstance, - ['reportedStartedPlayback', 'reportedStoppedPlayback'] - ) - })) + this._collectionData.currentPartInstance.length !== inCurrentPartInstance.length || + this._collectionData.currentPartInstance.some((pieceInstance, index) => { + return !arePropertiesDeepEqual(inCurrentPartInstance[index], pieceInstance, [ + 'plannedStartedPlayback', + 'plannedStoppedPlayback', + 'reportedStartedPlayback', + 'reportedStoppedPlayback', + 'resolvedEndCap', + 'priority', + ]) + }) ) { + this._logger.debug('xcur', { prev: this._collectionData.currentPartInstance, cur: inCurrentPartInstance }) this._collectionData.currentPartInstance = inCurrentPartInstance hasAnythingChanged = true } - if (!_.isEqual(this._collectionData.nextPartInstance, inNextPartInstance)) { + if ( + this._collectionData.nextPartInstance.length !== inNextPartInstance.length || + this._collectionData.nextPartInstance.some((pieceInstance, index) => { + return !arePropertiesDeepEqual(inNextPartInstance[index], pieceInstance, [ + 'plannedStartedPlayback', + 'plannedStoppedPlayback', + 'reportedStartedPlayback', + 'reportedStoppedPlayback', + 'resolvedEndCap', + 'priority', + ]) + }) + ) { this._collectionData.nextPartInstance = inNextPartInstance hasAnythingChanged = true } @@ -166,122 +188,70 @@ export class PieceInstancesHandler } private clearCollectionData() { - if (!this._collectionName || !this._collectionData) return + if (!this._collectionData) return this._collectionData.active = [] this._collectionData.currentPartInstance = [] this._collectionData.nextPartInstance = [] } - async update( - source: string, - data: DBRundownPlaylist | ShowStyleBaseExt | SelectedPartInstances | undefined - ): Promise { - switch (source) { - case PlaylistHandler.name: - return this.updateRundownPlaylist(source, data as DBRundownPlaylist | undefined) - case ShowStyleBaseHandler.name: { - this.logUpdateReceived('showStyleBase', source) - const showStyleBaseExt = data as ShowStyleBaseExt | undefined - this._sourceLayers = showStyleBaseExt?.sourceLayers ?? {} - this._throttledUpdateAndNotify() - break - } - case PartInstancesHandler.name: { - this.logUpdateReceived('partInstances', source) - this._partInstances = data as SelectedPartInstances - this._throttledUpdateAndNotify() - break - } - default: - throw new Error(`${this._name} received unsupported update from ${source}}`) - } + private onShowStyleBaseUpdate = (showStyleBase: ShowStyle | undefined): void => { + this.logUpdateReceived('showStyleBase') + this._sourceLayers = showStyleBase?.sourceLayers ?? {} + this.updateAndNotify() } - private async updateRundownPlaylist(source: string, data: DBRundownPlaylist | undefined): Promise { + private onPartInstancesUpdate = (partInstances: PartInstances | undefined): void => { + this.logUpdateReceived('partInstances') + this._partInstances = partInstances + this.updateAndNotify() + } + + private onPlaylistUpdate = (playlist: Playlist | undefined): void => { + this.logUpdateReceived('playlist', `rundownPlaylistId ${playlist?._id}, active ${!!playlist?.activationId}`) + const prevPartInstanceIds = this._partInstanceIds - const prevActivationId = this._activationId + const prevPlaylist = this._currentPlaylist - this.logUpdateReceived('playlist', source, `rundownPlaylistId ${data?._id}, active ${!!data?.activationId}`) - this._currentPlaylist = data - if (!this._collectionName) return + this._currentPlaylist = playlist this._partInstanceIds = this._currentPlaylist - ? _.compact([ - this._currentPlaylist.previousPartInfo?.partInstanceId, - this._currentPlaylist.nextPartInfo?.partInstanceId, - this._currentPlaylist.currentPartInfo?.partInstanceId, - ]) + ? _.compact( + [ + this._currentPlaylist.previousPartInfo?.partInstanceId, + this._currentPlaylist.nextPartInfo?.partInstanceId, + this._currentPlaylist.currentPartInfo?.partInstanceId, + ].sort() + ) : [] - this._activationId = unprotectString(this._currentPlaylist?.activationId) - if (this._currentPlaylist && this._partInstanceIds.length && this._activationId) { + if (this._currentPlaylist && this._partInstanceIds.length && this._currentPlaylist?.activationId) { const sameSubscription = areElementsShallowEqual(this._partInstanceIds, prevPartInstanceIds) && - prevActivationId === this._activationId + areElementsShallowEqual( + prevPlaylist?.rundownIdsInOrder ?? [], + this._currentPlaylist.rundownIdsInOrder + ) && + prevPlaylist?.activationId === this._currentPlaylist?.activationId if (!sameSubscription) { - await new Promise(process.nextTick.bind(this)) - if (!this._collectionName) return - if (!this._publicationName) return - if (!this._currentPlaylist) return - if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId) - this._subscriptionPending = true - this._subscriptionId = await this._coreHandler.setupSubscription( - this._publicationName, - this._currentPlaylist.rundownIdsInOrder, - this._partInstanceIds, - {} - ) - this._subscriptionPending = false - this._dbObserver = this._coreHandler.setupObserver(this._collectionName) - this._dbObserver.added = (id) => { - void this.changed(id, 'added').catch(this._logger.error) - } - this._dbObserver.changed = (id) => { - void this.changed(id, 'changed').catch(this._logger.error) - } - this._dbObserver.removed = (id) => { - void this.changed(id, 'removed').catch(this._logger.error) - } - - await this.updateAndNotify() + this.setupSubscription(this._currentPlaylist.rundownIdsInOrder, this._partInstanceIds, {}) } else if (this._subscriptionId) { - await this.updateAndNotify() + this.updateAndNotify() } else { - await this.clearAndNotify() + this.clearAndNotify() } } else { - this.clearCollectionData() - await this.notify(this._collectionData) + this.clearAndNotify() } } - private async clearAndNotify() { + private clearAndNotify() { this.clearCollectionData() - await this.notify(this._collectionData) + this.notify(this._collectionData) } - private async updateAndNotify() { + private updateAndNotify() { const hasAnythingChanged = this.updateCollectionData() if (hasAnythingChanged) { - await this.notify(this._collectionData) + this.notify(this._collectionData) } } } - -function arePropertiesDeepEqual>(a: T, b: T, omitProperties: Array): boolean { - if (typeof a !== 'object' || a == null || typeof b !== 'object' || b == null) { - return false - } - - const keysA = Object.keys(a).filter((key) => !omitProperties.includes(key)) - const keysB = Object.keys(b).filter((key) => !omitProperties.includes(key)) - - if (keysA.length !== keysB.length) return false - - for (const key of keysA) { - if (!keysB.includes(key) || !_.isEqual(a[key], b[key])) { - return false - } - } - - return true -} diff --git a/packages/live-status-gateway/src/collections/playlistHandler.ts b/packages/live-status-gateway/src/collections/playlistHandler.ts index 3c81b0818c..4251cc874f 100644 --- a/packages/live-status-gateway/src/collections/playlistHandler.ts +++ b/packages/live-status-gateway/src/collections/playlistHandler.ts @@ -1,89 +1,61 @@ import { Logger } from 'winston' import { CoreHandler } from '../coreHandler' -import { CollectionBase, Collection } from '../wsHandler' +import { CollectionBase, Collection, PublicationCollection } from '../wsHandler' import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections' import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' -import { RundownPlaylistId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import { CollectionHandlers } from '../liveStatusServer' export class PlaylistsHandler - extends CollectionBase + extends CollectionBase implements Collection { - public observerName: string - constructor(logger: Logger, coreHandler: CoreHandler) { - super(PlaylistsHandler.name, CollectionName.RundownPlaylists, undefined, logger, coreHandler) - this.observerName = this._name + super(CollectionName.RundownPlaylists, logger, coreHandler) } - async setPlaylists(playlists: DBRundownPlaylist[]): Promise { + setPlaylists(playlists: DBRundownPlaylist[]): void { this.logUpdateReceived('playlists', playlists.length) this._collectionData = playlists - await this.notify(this._collectionData) - } - - // override notify to implement empty array handling - async notify(data: DBRundownPlaylist[] | undefined): Promise { - this.logNotifyingUpdate(this._collectionData?.length) - if (data !== undefined) { - for (const observer of this._observers) { - await observer.update(this._name, data) - } - } + this.notify(this._collectionData) } } export class PlaylistHandler - extends CollectionBase + extends PublicationCollection implements Collection { - public observerName: string private _playlistsHandler: PlaylistsHandler constructor(logger: Logger, coreHandler: CoreHandler) { - super( - PlaylistHandler.name, - CollectionName.RundownPlaylists, - CorelibPubSub.rundownPlaylists, - logger, - coreHandler - ) - this.observerName = this._name + super(CollectionName.RundownPlaylists, CorelibPubSub.rundownPlaylists, logger, coreHandler) this._playlistsHandler = new PlaylistsHandler(this._logger, this._coreHandler) } - async init(): Promise { - await super.init() - if (!this._studioId) return - if (!this._collectionName) return - if (!this._publicationName) return - this._subscriptionId = await this._coreHandler.setupSubscription(this._publicationName, null, [this._studioId]) - this._dbObserver = this._coreHandler.setupObserver(this._collectionName) - if (this._collectionName) { - const col = this._core.getCollection(this._collectionName) - if (!col) throw new Error(`collection '${this._collectionName}' not found!`) - const playlists = col.find(undefined) - this._collectionData = playlists.find((p) => p.activationId) - await this._playlistsHandler.setPlaylists(playlists) - this._dbObserver.added = (id) => { - void this.changed(id, 'added').catch(this._logger.error) - } - this._dbObserver.changed = (id) => { - void this.changed(id, 'changed').catch(this._logger.error) - } - } + init(handlers: CollectionHandlers): void { + super.init(handlers) + this.setupSubscription(null, [this._studioId]) } - async changed(id: RundownPlaylistId, changeType: string): Promise { - this.logDocumentChange(id, changeType) - if (!this._collectionName) return - const collection = this._core.getCollection(this._collectionName) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) + changed(): void { + this.updateAndNotify() + } + + protected updateAndNotify(): void { + const collection = this.getCollectionOrFail() const playlists = collection.find(undefined) - await this._playlistsHandler.setPlaylists(playlists) - this._collectionData = playlists.find((p) => p.activationId) - await this.notify(this._collectionData) + this._playlistsHandler.setPlaylists(playlists) + + this.updateAndNotifyActivePlaylist(playlists) + } + + private updateAndNotifyActivePlaylist(playlists: DBRundownPlaylist[]) { + const prevActivePlaylist = this._collectionData + const activePlaylist = playlists.find((p) => p.activationId) + this._collectionData = activePlaylist + if (prevActivePlaylist !== activePlaylist) { + this.notify(this._collectionData) + } } get playlistsHandler(): PlaylistsHandler { diff --git a/packages/live-status-gateway/src/collections/rundownHandler.ts b/packages/live-status-gateway/src/collections/rundownHandler.ts index c59f077874..a29bd994e9 100644 --- a/packages/live-status-gateway/src/collections/rundownHandler.ts +++ b/packages/live-status-gateway/src/collections/rundownHandler.ts @@ -1,89 +1,69 @@ import { Logger } from 'winston' import { CoreHandler } from '../coreHandler' -import { CollectionBase, Collection, CollectionObserver } from '../wsHandler' +import { Collection, PickArr, PublicationCollection } from '../wsHandler' import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown' -import { unprotectString } from '@sofie-automation/shared-lib/dist/lib/protectedString' -import { PartInstancesHandler, SelectedPartInstances } from './partInstancesHandler' import { RundownId, RundownPlaylistId } from '@sofie-automation/corelib/dist/dataModel/Ids' import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections' -import { PlaylistHandler } from './playlistHandler' import { RundownsHandler } from './rundownsHandler' import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' +import { unprotectString } from '@sofie-automation/server-core-integration' +import { CollectionHandlers } from '../liveStatusServer' + +const PLAYLIST_KEYS = ['_id', 'currentPartInfo', 'nextPartInfo'] as const +type Playlist = PickArr export class RundownHandler - extends CollectionBase - implements Collection, CollectionObserver, CollectionObserver + extends PublicationCollection + implements Collection { - public observerName: string private _currentPlaylistId: RundownPlaylistId | undefined private _currentRundownId: RundownId | undefined constructor(logger: Logger, coreHandler: CoreHandler, private _rundownsHandler?: RundownsHandler) { - super(RundownHandler.name, CollectionName.Rundowns, CorelibPubSub.rundownsInPlaylists, logger, coreHandler) - this.observerName = this._name + super(CollectionName.Rundowns, CorelibPubSub.rundownsInPlaylists, logger, coreHandler) } - async changed(id: RundownId, changeType: string): Promise { - this.logDocumentChange(id, changeType) - if (id !== this._currentRundownId) - throw new Error(`${this._name} received change with unexpected id ${id} !== ${this._currentRundownId}`) - if (!this._collectionName) return - const collection = this._core.getCollection(this._collectionName) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) - await this._rundownsHandler?.setRundowns(collection.find(undefined)) - if (this._collectionData) this._collectionData = collection.findOne(this._collectionData._id) - await this.notify(this._collectionData) + init(handlers: CollectionHandlers): void { + super.init(handlers) + + handlers.playlistHandler.subscribe(this.onPlaylistUpdate, PLAYLIST_KEYS) } - async update(source: string, data: DBRundownPlaylist | SelectedPartInstances | undefined): Promise { + changed(): void { + this.updateAndNotify() + } + + protected updateAndNotify(): void { + const collection = this.getCollectionOrFail() + this._rundownsHandler?.setRundowns(collection.find(undefined)) + if (this._currentRundownId) { + this._collectionData = collection.findOne(this._currentRundownId) + } else { + this._collectionData = undefined + } + this.notify(this._collectionData) + } + + private onPlaylistUpdate = (data: Playlist | undefined): void => { const prevPlaylistId = this._currentPlaylistId const prevCurRundownId = this._currentRundownId - const rundownPlaylist = data ? (data as DBRundownPlaylist) : undefined - const partInstances = data as SelectedPartInstances - switch (source) { - case PlaylistHandler.name: - this.logUpdateReceived('playlist', source, unprotectString(rundownPlaylist?._id)) - this._currentPlaylistId = rundownPlaylist?._id - break - case PartInstancesHandler.name: - this.logUpdateReceived('partInstances', source) - this._currentRundownId = partInstances.current?.rundownId ?? partInstances.next?.rundownId - break - default: - throw new Error(`${this._name} received unsupported update from ${source}}`) - } + const rundownPlaylist = data + + this.logUpdateReceived('playlist', unprotectString(rundownPlaylist?._id)) + this._currentPlaylistId = rundownPlaylist?._id + this._currentRundownId = rundownPlaylist?.currentPartInfo?.rundownId ?? rundownPlaylist?.nextPartInfo?.rundownId - await new Promise(process.nextTick.bind(this)) - if (!this._collectionName) return - if (!this._publicationName) return if (prevPlaylistId !== this._currentPlaylistId) { - if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId) - if (this._dbObserver) this._dbObserver.stop() + this.stopSubscription() if (this._currentPlaylistId) { - this._subscriptionId = await this._coreHandler.setupSubscription(this._publicationName, [ - this._currentPlaylistId, - ]) - this._dbObserver = this._coreHandler.setupObserver(this._collectionName) - this._dbObserver.added = (id) => { - void this.changed(id, 'added').catch(this._logger.error) - } - this._dbObserver.changed = (id) => { - void this.changed(id, 'changed').catch(this._logger.error) - } + this.setupSubscription([this._currentPlaylistId]) } + return } - if (prevCurRundownId !== this._currentPlaylistId) { - const currentPlaylistId = this._currentRundownId - if (currentPlaylistId) { - const collection = this._core.getCollection(this._collectionName) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) - const rundown = collection.findOne(currentPlaylistId) - if (!rundown) throw new Error(`rundown '${currentPlaylistId}' not found!`) - this._collectionData = rundown - } else this._collectionData = undefined - await this.notify(this._collectionData) + if (prevCurRundownId !== this._currentRundownId) { + this.updateAndNotify() } } } diff --git a/packages/live-status-gateway/src/collections/rundownsHandler.ts b/packages/live-status-gateway/src/collections/rundownsHandler.ts index 0ffb07422c..539c4c87df 100644 --- a/packages/live-status-gateway/src/collections/rundownsHandler.ts +++ b/packages/live-status-gateway/src/collections/rundownsHandler.ts @@ -5,29 +5,16 @@ import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown' import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections' export class RundownsHandler - extends CollectionBase + extends CollectionBase implements Collection { - public observerName: string - constructor(logger: Logger, coreHandler: CoreHandler) { - super(RundownsHandler.name, CollectionName.Rundowns, undefined, logger, coreHandler) - this.observerName = this._name + super(CollectionName.Rundowns, logger, coreHandler) } - async setRundowns(rundowns: DBRundown[]): Promise { + setRundowns(rundowns: DBRundown[]): void { this.logUpdateReceived('rundowns', rundowns.length) this._collectionData = rundowns - await this.notify(this._collectionData) - } - - // override notify to implement empty array handling - async notify(data: DBRundown[] | undefined): Promise { - this.logNotifyingUpdate(this._collectionData?.length) - if (data !== undefined) { - for (const observer of this._observers) { - await observer.update(this._name, data) - } - } + this.notify(this._collectionData) } } diff --git a/packages/live-status-gateway/src/collections/segmentHandler.ts b/packages/live-status-gateway/src/collections/segmentHandler.ts index 830af41e0b..773c6126a2 100644 --- a/packages/live-status-gateway/src/collections/segmentHandler.ts +++ b/packages/live-status-gateway/src/collections/segmentHandler.ts @@ -1,101 +1,86 @@ import { Logger } from 'winston' import { CoreHandler } from '../coreHandler' -import { CollectionBase, Collection, CollectionObserver } from '../wsHandler' +import { Collection, PickArr, PublicationCollection } from '../wsHandler' import { DBSegment } from '@sofie-automation/corelib/dist/dataModel/Segment' -import { PartInstancesHandler, SelectedPartInstances } from './partInstancesHandler' +import { SelectedPartInstances } from './partInstancesHandler' import { RundownId, SegmentId } from '@sofie-automation/corelib/dist/dataModel/Ids' import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections' import areElementsShallowEqual from '@sofie-automation/shared-lib/dist/lib/isShallowEqual' import { SegmentsHandler } from './segmentsHandler' import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' -import { PlaylistHandler } from './playlistHandler' import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' +import { CollectionHandlers } from '../liveStatusServer' + +const PLAYLIST_KEYS = ['rundownIdsInOrder'] as const +type Playlist = PickArr + +const PART_INSTANCES_KEYS = ['current'] as const +type PartInstances = PickArr export class SegmentHandler - extends CollectionBase - implements Collection, CollectionObserver, CollectionObserver + extends PublicationCollection + implements Collection { - public observerName: string private _currentSegmentId: SegmentId | undefined private _rundownIds: RundownId[] = [] constructor(logger: Logger, coreHandler: CoreHandler, private _segmentsHandler: SegmentsHandler) { - super(SegmentHandler.name, CollectionName.Segments, CorelibPubSub.segments, logger, coreHandler) - this.observerName = this._name + super(CollectionName.Segments, CorelibPubSub.segments, logger, coreHandler) + } + + init(handlers: CollectionHandlers): void { + super.init(handlers) + + handlers.playlistHandler.subscribe(this.onPlaylistUpdate, PLAYLIST_KEYS) + handlers.partInstancesHandler.subscribe(this.onPartInstancesUpdate, PART_INSTANCES_KEYS) + } + + changed(): void { + this.updateAndNotify() } - async changed(id: SegmentId, changeType: string): Promise { - this.logDocumentChange(id, changeType) - if (!this._collectionName) return - const collection = this._core.getCollection(this._collectionName) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) + private updateAndNotify() { + const collection = this.getCollectionOrFail() const allSegments = collection.find(undefined) - await this._segmentsHandler.setSegments(allSegments) - await this.updateAndNotify() + this._segmentsHandler.setSegments(allSegments) + if (this._currentSegmentId && collection.findOne(this._currentSegmentId) !== this._collectionData) { + this.updateAndNotifyCurrentSegment() + } } - private async updateAndNotify() { - const collection = this._core.getCollection(this._collectionName) - const newData = this._currentSegmentId ? collection.findOne(this._currentSegmentId) : undefined - if (this._collectionData !== newData) { - this._collectionData = newData - await this.notify(this._collectionData) + private updateAndNotifyCurrentSegment() { + const collection = this.getCollectionOrFail() + if (this._currentSegmentId) { + this._collectionData = collection.findOne(this._currentSegmentId) + this.notify(this._collectionData) } } - async update(source: string, data: SelectedPartInstances | DBRundownPlaylist | undefined): Promise { + private onPlaylistUpdate = (playlist: Playlist | undefined): void => { const previousRundownIds = this._rundownIds - switch (source) { - case PartInstancesHandler.name: { - this.logUpdateReceived('partInstances', source) - const partInstanceMap = data as SelectedPartInstances - this._currentSegmentId = data ? partInstanceMap.current?.segmentId : undefined - break - } - case PlaylistHandler.name: { - this.logUpdateReceived('playlist', source) - this._rundownIds = (data as DBRundownPlaylist | undefined)?.rundownIdsInOrder ?? [] - break - } - default: - throw new Error(`${this._name} received unsupported update from ${source}}`) - } - await new Promise(process.nextTick.bind(this)) - if (!this._collectionName) return - if (!this._publicationName) return + this.logUpdateReceived('playlist') + this._rundownIds = playlist?.rundownIdsInOrder ?? [] const rundownsChanged = !areElementsShallowEqual(this._rundownIds, previousRundownIds) if (rundownsChanged) { - if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId) - if (this._dbObserver) this._dbObserver.stop() + this.stopSubscription() if (this._rundownIds.length) { - this._subscriptionId = await this._coreHandler.setupSubscription( - this._publicationName, - this._rundownIds, - { - omitHidden: true, - } - ) - this._dbObserver = this._coreHandler.setupObserver(this._collectionName) - this._dbObserver.added = (id) => { - void this.changed(id, 'added').catch(this._logger.error) - } - this._dbObserver.changed = (id) => { - void this.changed(id, 'changed').catch(this._logger.error) - } - this._dbObserver.removed = (id) => { - void this.changed(id, 'removed').catch(this._logger.error) - } + this.setupSubscription(this._rundownIds, { + omitHidden: true, + }) } } + } - const collection = this._core.getCollection(this._collectionName) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) - if (rundownsChanged) { - const allSegments = collection.find(undefined) - await this._segmentsHandler.setSegments(allSegments) + private onPartInstancesUpdate = (data: PartInstances | undefined): void => { + this.logUpdateReceived('partInstances') + + const previousSegmentId = this._currentSegmentId + this._currentSegmentId = data?.current?.segmentId + + if (previousSegmentId !== this._currentSegmentId) { + this.updateAndNotifyCurrentSegment() } - await this.updateAndNotify() } } diff --git a/packages/live-status-gateway/src/collections/segmentsHandler.ts b/packages/live-status-gateway/src/collections/segmentsHandler.ts index 401def1ead..75bd03b2dc 100644 --- a/packages/live-status-gateway/src/collections/segmentsHandler.ts +++ b/packages/live-status-gateway/src/collections/segmentsHandler.ts @@ -8,31 +8,19 @@ import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collect const THROTTLE_PERIOD_MS = 200 export class SegmentsHandler - extends CollectionBase + extends CollectionBase implements Collection { - public observerName: string - private throttledNotify: (data: DBSegment[]) => Promise + private throttledNotify: (data: DBSegment[]) => void constructor(logger: Logger, coreHandler: CoreHandler) { - super(SegmentsHandler.name, CollectionName.Segments, undefined, logger, coreHandler) - this.observerName = this._name + super(CollectionName.Segments, logger, coreHandler) this.throttledNotify = _.throttle(this.notify.bind(this), THROTTLE_PERIOD_MS, { leading: true, trailing: true }) } - async setSegments(segments: DBSegment[]): Promise { + setSegments(segments: DBSegment[]): void { this.logUpdateReceived('segments', segments.length) this._collectionData = segments - await this.throttledNotify(this._collectionData) - } - - // override notify to implement empty array handling - async notify(data: DBSegment[] | undefined): Promise { - this.logNotifyingUpdate(this._collectionData?.length) - if (data !== undefined) { - for (const observer of this._observers) { - await observer.update(this._name, data) - } - } + this.throttledNotify(this._collectionData) } } diff --git a/packages/live-status-gateway/src/collections/showStyleBaseHandler.ts b/packages/live-status-gateway/src/collections/showStyleBaseHandler.ts index 2cdcdd6541..81e4e8051b 100644 --- a/packages/live-status-gateway/src/collections/showStyleBaseHandler.ts +++ b/packages/live-status-gateway/src/collections/showStyleBaseHandler.ts @@ -1,6 +1,6 @@ import { Logger } from 'winston' import { CoreHandler } from '../coreHandler' -import { CollectionBase, Collection, CollectionObserver } from '../wsHandler' +import { Collection, PublicationCollection } from '../wsHandler' import { DBRundown } from '@sofie-automation/corelib/dist/dataModel/Rundown' import { DBShowStyleBase, OutputLayers, SourceLayers } from '@sofie-automation/corelib/dist/dataModel/ShowStyleBase' import { ShowStyleBaseId } from '@sofie-automation/corelib/dist/dataModel/Ids' @@ -8,6 +8,7 @@ import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collect import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' import { applyAndValidateOverrides } from '@sofie-automation/corelib/dist/settings/objectWithOverrides' import { IOutputLayer, ISourceLayer } from '@sofie-automation/blueprints-integration' +import { CollectionHandlers } from '../liveStatusServer' export interface ShowStyleBaseExt extends DBShowStyleBase { sourceLayerNamesById: ReadonlyMap @@ -16,66 +17,45 @@ export interface ShowStyleBaseExt extends DBShowStyleBase { } export class ShowStyleBaseHandler - extends CollectionBase - implements Collection, CollectionObserver + extends PublicationCollection + implements Collection { - public observerName: string private _showStyleBaseId: ShowStyleBaseId | undefined private _sourceLayersMap: Map = new Map() private _outputLayersMap: Map = new Map() constructor(logger: Logger, coreHandler: CoreHandler) { - super( - ShowStyleBaseHandler.name, - CollectionName.ShowStyleBases, - CorelibPubSub.showStyleBases, - logger, - coreHandler - ) - this.observerName = this._name + super(CollectionName.ShowStyleBases, CorelibPubSub.showStyleBases, logger, coreHandler) } - async changed(id: ShowStyleBaseId, changeType: string): Promise { - this.logDocumentChange(id, changeType) - if (!this._collectionName) return + init(handlers: CollectionHandlers): void { + super.init(handlers) + + handlers.rundownHandler.subscribe(this.onRundownUpdate) + } + + changed(): void { if (this._showStyleBaseId) { this.updateCollectionData() - await this.notify(this._collectionData) + this.notify(this._collectionData) } } - async update(source: string, data: DBRundown | undefined): Promise { - this.logUpdateReceived('rundown', source, `rundownId ${data?._id}, showStyleBaseId ${data?.showStyleBaseId}`) + onRundownUpdate = (data: DBRundown | undefined): void => { + this.logUpdateReceived('rundown', `rundownId ${data?._id}, showStyleBaseId ${data?.showStyleBaseId}`) const prevShowStyleBaseId = this._showStyleBaseId this._showStyleBaseId = data?.showStyleBaseId - await new Promise(process.nextTick.bind(this)) - if (!this._collectionName) return - if (!this._publicationName) return if (prevShowStyleBaseId !== this._showStyleBaseId) { - if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId) - if (this._dbObserver) this._dbObserver.stop() + this.stopSubscription() if (this._showStyleBaseId) { - this._subscriptionId = await this._coreHandler.setupSubscription(this._publicationName, [ - this._showStyleBaseId, - ]) - this._dbObserver = this._coreHandler.setupObserver(this._collectionName) - this._dbObserver.added = (id) => { - void this.changed(id, 'added').catch(this._logger.error) - } - this._dbObserver.changed = (id) => { - void this.changed(id, 'changed').catch(this._logger.error) - } - - this.updateCollectionData() - await this.notify(this._collectionData) + this.setupSubscription([this._showStyleBaseId]) } } } updateCollectionData(): void { - const collection = this._core.getCollection(this._collectionName) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) + const collection = this.getCollectionOrFail() if (!this._showStyleBaseId) return const showStyleBase = collection.findOne(this._showStyleBaseId) if (!showStyleBase) { diff --git a/packages/live-status-gateway/src/collections/studioHandler.ts b/packages/live-status-gateway/src/collections/studioHandler.ts index fb7be9b795..c709ce2e76 100644 --- a/packages/live-status-gateway/src/collections/studioHandler.ts +++ b/packages/live-status-gateway/src/collections/studioHandler.ts @@ -1,53 +1,29 @@ import { Logger } from 'winston' import { DBStudio } from '@sofie-automation/corelib/dist/dataModel/Studio' import { CoreHandler } from '../coreHandler' -import { CollectionBase, Collection } from '../wsHandler' +import { Collection, PublicationCollection } from '../wsHandler' import { CollectionName } from '@sofie-automation/corelib/dist/dataModel/Collections' import { CorelibPubSub } from '@sofie-automation/corelib/dist/pubsub' -import { StudioId } from '@sofie-automation/server-core-integration' +import { CollectionHandlers } from '../liveStatusServer' export class StudioHandler - extends CollectionBase + extends PublicationCollection implements Collection { - public observerName: string - constructor(logger: Logger, coreHandler: CoreHandler) { - super(StudioHandler.name, CollectionName.Studios, CorelibPubSub.studios, logger, coreHandler) - this.observerName = this._name + super(CollectionName.Studios, CorelibPubSub.studios, logger, coreHandler) } - async init(): Promise { - await super.init() - if (!this._collectionName) return - if (!this._publicationName) return - if (!this._studioId) return - this._subscriptionId = await this._coreHandler.setupSubscription(this._publicationName, [this._studioId]) - this._dbObserver = this._coreHandler.setupObserver(this._collectionName) + init(handlers: CollectionHandlers): void { + super.init(handlers) - if (this._collectionName) { - const col = this._core.getCollection(this._collectionName) - if (!col) throw new Error(`collection '${this._collectionName}' not found!`) - const studio = col.findOne(this._studioId) - if (!studio) throw new Error(`studio '${this._studioId}' not found!`) - this._collectionData = studio - this._dbObserver.added = (id) => { - void this.changed(id, 'added').catch(this._logger.error) - } - this._dbObserver.changed = (id) => { - void this.changed(id, 'changed').catch(this._logger.error) - } - } + this.setupSubscription([this._studioId]) } - async changed(id: StudioId, changeType: string): Promise { - this.logDocumentChange(id, changeType) - if (!(id === this._studioId && this._collectionName)) return - const collection = this._core.getCollection(this._collectionName) - if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) - const studio = collection.findOne(id) - if (!studio) throw new Error(`studio '${this._studioId}' not found on changed!`) + changed(): void { + const collection = this.getCollectionOrFail() + const studio = collection.findOne(this._studioId) this._collectionData = studio - await this.notify(this._collectionData) + this.notify(this._collectionData) } } diff --git a/packages/live-status-gateway/src/helpers/equality.ts b/packages/live-status-gateway/src/helpers/equality.ts new file mode 100644 index 0000000000..b720caeaaf --- /dev/null +++ b/packages/live-status-gateway/src/helpers/equality.ts @@ -0,0 +1,75 @@ +import _ = require('underscore') + +export function arePropertiesShallowEqual>( + a: T, + b: Partial, + omitProperties?: readonly (keyof T)[], + selectProperties?: readonly (keyof T)[] +): boolean { + if (typeof a !== 'object' || a == null || typeof b !== 'object' || b == null) { + return false + } + + const keysA = Object.keys(a).filter( + omitProperties + ? (key) => !omitProperties.includes(key) + : selectProperties + ? (key) => selectProperties.includes(key) + : () => true + ) + const keysB = Object.keys(b).filter( + omitProperties + ? (key) => !omitProperties.includes(key) + : selectProperties + ? (key) => selectProperties.includes(key) + : () => true + ) + + if (keysA.length !== keysB.length) return false + + for (const key of keysA) { + if (!keysB.includes(key) || a[key] !== b[key]) { + return false + } + } + + return true +} + +export function arePropertiesDeepEqual>( + a: T, + b: Partial, + omitProperties?: readonly (keyof T)[], + selectProperties?: readonly (keyof T)[] +): boolean { + if (typeof a !== 'object' || a == null || typeof b !== 'object' || b == null) { + return false + } + + const keysA = Object.keys(a).filter( + omitProperties + ? (key) => !omitProperties.includes(key) + : selectProperties + ? (key) => selectProperties.includes(key) + : () => true + ) + const keysB = Object.keys(b).filter( + omitProperties + ? (key) => !omitProperties.includes(key) + : selectProperties + ? (key) => selectProperties.includes(key) + : () => true + ) + + if (keysA.length !== keysB.length) { + return false + } + + for (const key of keysA) { + if (!keysB.includes(key) || !_.isEqual(a[key], b[key])) { + return false + } + } + + return true +} diff --git a/packages/live-status-gateway/src/liveStatusServer.ts b/packages/live-status-gateway/src/liveStatusServer.ts index 90bd64c471..7f0094993e 100644 --- a/packages/live-status-gateway/src/liveStatusServer.ts +++ b/packages/live-status-gateway/src/liveStatusServer.ts @@ -3,7 +3,7 @@ import { CoreHandler } from './coreHandler' import { WebSocket, WebSocketServer } from 'ws' import { StudioHandler } from './collections/studioHandler' import { ShowStyleBaseHandler } from './collections/showStyleBaseHandler' -import { PlaylistHandler } from './collections/playlistHandler' +import { PlaylistHandler, PlaylistsHandler } from './collections/playlistHandler' import { RundownHandler } from './collections/rundownHandler' // import { RundownsHandler } from './collections/rundownsHandler' import { SegmentHandler } from './collections/segmentHandler' @@ -24,6 +24,24 @@ import { PieceInstancesHandler } from './collections/pieceInstancesHandler' import { AdLibsTopic } from './topics/adLibsTopic' import { ActivePiecesTopic } from './topics/activePiecesTopic' +export interface CollectionHandlers { + studioHandler: StudioHandler + showStyleBaseHandler: ShowStyleBaseHandler + playlistHandler: PlaylistHandler + playlistsHandler: PlaylistsHandler + rundownHandler: RundownHandler + segmentsHandler: SegmentsHandler + segmentHandler: SegmentHandler + partsHandler: PartsHandler + partHandler: PartHandler + partInstancesHandler: PartInstancesHandler + pieceInstancesHandler: PieceInstancesHandler + adLibActionsHandler: AdLibActionsHandler + adLibsHandler: AdLibsHandler + globalAdLibActionsHandler: GlobalAdLibActionsHandler + globalAdLibsHandler: GlobalAdLibsHandler +} + export class LiveStatusServer { _logger: Logger _coreHandler: CoreHandler @@ -39,94 +57,55 @@ export class LiveStatusServer { const rootChannel = new RootChannel(this._logger) - const studioTopic = new StudioTopic(this._logger) - const activePiecesTopic = new ActivePiecesTopic(this._logger) - const activePlaylistTopic = new ActivePlaylistTopic(this._logger) - const segmentsTopic = new SegmentsTopic(this._logger) - const adLibsTopic = new AdLibsTopic(this._logger) - - rootChannel.addTopic(StatusChannels.studio, studioTopic) - rootChannel.addTopic(StatusChannels.activePlaylist, activePlaylistTopic) - rootChannel.addTopic(StatusChannels.activePieces, activePiecesTopic) - rootChannel.addTopic(StatusChannels.segments, segmentsTopic) - rootChannel.addTopic(StatusChannels.adLibs, adLibsTopic) - const studioHandler = new StudioHandler(this._logger, this._coreHandler) - await studioHandler.init() const showStyleBaseHandler = new ShowStyleBaseHandler(this._logger, this._coreHandler) - await showStyleBaseHandler.init() const playlistHandler = new PlaylistHandler(this._logger, this._coreHandler) - await playlistHandler.init() - // const rundownsHandler = new RundownsHandler(this._logger, this._coreHandler) - // await rundownsHandler.init() + const playlistsHandler = playlistHandler.playlistsHandler const rundownHandler = new RundownHandler(this._logger, this._coreHandler) - await rundownHandler.init() const segmentsHandler = new SegmentsHandler(this._logger, this._coreHandler) - await segmentsHandler.init() const segmentHandler = new SegmentHandler(this._logger, this._coreHandler, segmentsHandler) - await segmentHandler.init() const partsHandler = new PartsHandler(this._logger, this._coreHandler) - await partsHandler.init() const partHandler = new PartHandler(this._logger, this._coreHandler, partsHandler) - await partHandler.init() const partInstancesHandler = new PartInstancesHandler(this._logger, this._coreHandler) - await partInstancesHandler.init() const pieceInstancesHandler = new PieceInstancesHandler(this._logger, this._coreHandler) - await pieceInstancesHandler.init() const adLibActionsHandler = new AdLibActionsHandler(this._logger, this._coreHandler) - await adLibActionsHandler.init() const adLibsHandler = new AdLibsHandler(this._logger, this._coreHandler) - await adLibsHandler.init() const globalAdLibActionsHandler = new GlobalAdLibActionsHandler(this._logger, this._coreHandler) - await globalAdLibActionsHandler.init() const globalAdLibsHandler = new GlobalAdLibsHandler(this._logger, this._coreHandler) - await globalAdLibsHandler.init() - // add observers for collection subscription updates - await playlistHandler.subscribe(rundownHandler) - await playlistHandler.subscribe(segmentHandler) - await playlistHandler.subscribe(partHandler) - await playlistHandler.subscribe(partInstancesHandler) - await playlistHandler.subscribe(pieceInstancesHandler) - await rundownHandler.subscribe(showStyleBaseHandler) - await partInstancesHandler.subscribe(rundownHandler) - await partInstancesHandler.subscribe(segmentHandler) - // partInstancesHandler.subscribe(partHandler) - await partInstancesHandler.subscribe(adLibActionsHandler) - await partInstancesHandler.subscribe(globalAdLibActionsHandler) - await partInstancesHandler.subscribe(adLibsHandler) - await partInstancesHandler.subscribe(globalAdLibsHandler) - await showStyleBaseHandler.subscribe(pieceInstancesHandler) - await partInstancesHandler.subscribe(pieceInstancesHandler) + const handlers: CollectionHandlers = { + studioHandler, + showStyleBaseHandler, + playlistHandler, + playlistsHandler, + rundownHandler, + segmentsHandler, + segmentHandler, + partsHandler, + partHandler, + partInstancesHandler, + pieceInstancesHandler, + adLibActionsHandler, + adLibsHandler, + globalAdLibActionsHandler, + globalAdLibsHandler, + } + + for (const handlerName in handlers) { + handlers[handlerName as keyof CollectionHandlers].init(handlers) + } + + const studioTopic = new StudioTopic(this._logger, handlers) + const activePiecesTopic = new ActivePiecesTopic(this._logger, handlers) + const activePlaylistTopic = new ActivePlaylistTopic(this._logger, handlers) + const segmentsTopic = new SegmentsTopic(this._logger, handlers) + const adLibsTopic = new AdLibsTopic(this._logger, handlers) - // add observers for websocket topic updates - await studioHandler.subscribe(studioTopic) - await playlistHandler.playlistsHandler.subscribe(studioTopic) - - await playlistHandler.subscribe(activePlaylistTopic) - await showStyleBaseHandler.subscribe(activePlaylistTopic) - await partInstancesHandler.subscribe(activePlaylistTopic) - await partsHandler.subscribe(activePlaylistTopic) - await pieceInstancesHandler.subscribe(activePlaylistTopic) - await segmentHandler.subscribe(activePlaylistTopic) - await segmentsHandler.subscribe(activePlaylistTopic) - - await playlistHandler.subscribe(activePiecesTopic) - await showStyleBaseHandler.subscribe(activePiecesTopic) - await pieceInstancesHandler.subscribe(activePiecesTopic) - - await playlistHandler.subscribe(segmentsTopic) - await segmentsHandler.subscribe(segmentsTopic) - await partsHandler.subscribe(segmentsTopic) - - await showStyleBaseHandler.subscribe(adLibsTopic) - await partsHandler.subscribe(adLibsTopic) - await segmentsHandler.subscribe(adLibsTopic) - await playlistHandler.subscribe(adLibsTopic) - await adLibActionsHandler.subscribe(adLibsTopic) - await adLibsHandler.subscribe(adLibsTopic) - await globalAdLibActionsHandler.subscribe(adLibsTopic) - await globalAdLibsHandler.subscribe(adLibsTopic) + rootChannel.addTopic(StatusChannels.studio, studioTopic) + rootChannel.addTopic(StatusChannels.activePlaylist, activePlaylistTopic) + rootChannel.addTopic(StatusChannels.activePieces, activePiecesTopic) + rootChannel.addTopic(StatusChannels.segments, segmentsTopic) + rootChannel.addTopic(StatusChannels.adLibs, adLibsTopic) const wss = new WebSocketServer({ port: 8080 }) wss.on('connection', (ws, request) => { diff --git a/packages/live-status-gateway/src/topics/__tests__/activePieces.spec.ts b/packages/live-status-gateway/src/topics/__tests__/activePieces.spec.ts index 7e7f469e13..8513169c11 100644 --- a/packages/live-status-gateway/src/topics/__tests__/activePieces.spec.ts +++ b/packages/live-status-gateway/src/topics/__tests__/activePieces.spec.ts @@ -1,16 +1,16 @@ -import { makeMockLogger, makeMockSubscriber, makeTestPlaylist, makeTestShowStyleBase } from './utils' -import { PlaylistHandler } from '../../collections/playlistHandler' -import { ShowStyleBaseExt, ShowStyleBaseHandler } from '../../collections/showStyleBaseHandler' +import { makeMockHandlers, makeMockLogger, makeMockSubscriber, makeTestPlaylist, makeTestShowStyleBase } from './utils' +import { ShowStyleBaseExt } from '../../collections/showStyleBaseHandler' import { protectString } from '@sofie-automation/server-core-integration/dist' import { PartialDeep } from 'type-fest' import { literal } from '@sofie-automation/corelib/dist/lib' -import { PieceInstancesHandler, SelectedPieceInstances } from '../../collections/pieceInstancesHandler' +import { SelectedPieceInstances } from '../../collections/pieceInstancesHandler' import { PieceInstance } from '@sofie-automation/corelib/dist/dataModel/PieceInstance' import { ActivePiecesStatus, ActivePiecesTopic } from '../activePiecesTopic' describe('ActivePiecesTopic', () => { it('provides active pieces', async () => { - const topic = new ActivePiecesTopic(makeMockLogger()) + const handlers = makeMockHandlers() + const topic = new ActivePiecesTopic(makeMockLogger(), handlers) const mockSubscriber = makeMockSubscriber() const currentPartInstanceId = 'CURRENT_PART_INSTANCE_ID' @@ -23,10 +23,10 @@ describe('ActivePiecesTopic', () => { partInstanceId: protectString(currentPartInstanceId), rundownId: playlist.rundownIdsInOrder[0], } - await topic.update(PlaylistHandler.name, playlist) + handlers.playlistHandler.notify(playlist) const testShowStyleBase = makeTestShowStyleBase() - await topic.update(ShowStyleBaseHandler.name, testShowStyleBase as ShowStyleBaseExt) + handlers.showStyleBaseHandler.notify(testShowStyleBase as ShowStyleBaseExt) const testPieceInstances: PartialDeep = { currentPartInstance: [], @@ -44,7 +44,7 @@ describe('ActivePiecesTopic', () => { }), ] as PieceInstance[], } - await topic.update(PieceInstancesHandler.name, testPieceInstances as SelectedPieceInstances) + handlers.pieceInstancesHandler.notify(testPieceInstances as SelectedPieceInstances) topic.addSubscriber(mockSubscriber) diff --git a/packages/live-status-gateway/src/topics/__tests__/activePlaylist.spec.ts b/packages/live-status-gateway/src/topics/__tests__/activePlaylist.spec.ts index b9b1af95b3..79d94fa7bb 100644 --- a/packages/live-status-gateway/src/topics/__tests__/activePlaylist.spec.ts +++ b/packages/live-status-gateway/src/topics/__tests__/activePlaylist.spec.ts @@ -1,15 +1,12 @@ import { ActivePlaylistStatus, ActivePlaylistTopic } from '../activePlaylistTopic' -import { makeMockLogger, makeMockSubscriber, makeTestPlaylist, makeTestShowStyleBase } from './utils' -import { PlaylistHandler } from '../../collections/playlistHandler' -import { ShowStyleBaseExt, ShowStyleBaseHandler } from '../../collections/showStyleBaseHandler' -import { PartInstancesHandler, SelectedPartInstances } from '../../collections/partInstancesHandler' +import { makeMockHandlers, makeMockLogger, makeMockSubscriber, makeTestPlaylist, makeTestShowStyleBase } from './utils' +import { ShowStyleBaseExt } from '../../collections/showStyleBaseHandler' +import { SelectedPartInstances } from '../../collections/partInstancesHandler' import { protectString, unprotectString, unprotectStringArray } from '@sofie-automation/server-core-integration/dist' import { PartialDeep } from 'type-fest' import { literal } from '@sofie-automation/corelib/dist/lib' import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance' -import { PartsHandler } from '../../collections/partsHandler' import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part' -import { SegmentHandler } from '../../collections/segmentHandler' import { DBSegment } from '@sofie-automation/corelib/dist/dataModel/Segment' import { CountdownType } from '@sofie-automation/blueprints-integration' import { PlaylistTimingType } from '@sofie-automation/blueprints-integration' @@ -26,18 +23,19 @@ function makeEmptyTestPartInstances(): SelectedPartInstances { describe('ActivePlaylistTopic', () => { it('notifies subscribers', async () => { - const topic = new ActivePlaylistTopic(makeMockLogger()) + const handlers = makeMockHandlers() + const topic = new ActivePlaylistTopic(makeMockLogger(), handlers) const mockSubscriber = makeMockSubscriber() const playlist = makeTestPlaylist() playlist.activationId = protectString('somethingRandom') - await topic.update(PlaylistHandler.name, playlist) + handlers.playlistHandler.notify(playlist) const testShowStyleBase = makeTestShowStyleBase() - await topic.update(ShowStyleBaseHandler.name, testShowStyleBase as ShowStyleBaseExt) + handlers.showStyleBaseHandler.notify(testShowStyleBase as ShowStyleBaseExt) const testPartInstancesMap = makeEmptyTestPartInstances() - await topic.update(PartInstancesHandler.name, testPartInstancesMap) + handlers.partInstancesHandler.notify(testPartInstancesMap) topic.addSubscriber(mockSubscriber) @@ -64,7 +62,8 @@ describe('ActivePlaylistTopic', () => { }) it('provides segment and part', async () => { - const topic = new ActivePlaylistTopic(makeMockLogger()) + const handlers = makeMockHandlers() + const topic = new ActivePlaylistTopic(makeMockLogger(), handlers) const mockSubscriber = makeMockSubscriber() const currentPartInstanceId = 'CURRENT_PART_INSTANCE_ID' @@ -77,12 +76,11 @@ describe('ActivePlaylistTopic', () => { partInstanceId: protectString(currentPartInstanceId), rundownId: playlist.rundownIdsInOrder[0], } - await topic.update(PlaylistHandler.name, playlist) + handlers.playlistHandler.notify(playlist) const testShowStyleBase = makeTestShowStyleBase() - await topic.update(ShowStyleBaseHandler.name, testShowStyleBase as ShowStyleBaseExt) - const segment1id = protectString('SEGMENT_1') + handlers.showStyleBaseHandler.notify(testShowStyleBase as ShowStyleBaseExt) const part1: Partial = { _id: protectString('PART_1'), title: 'Test Part', @@ -107,11 +105,11 @@ describe('ActivePlaylistTopic', () => { }), ] as DBPartInstance[], } - await topic.update(PartInstancesHandler.name, testPartInstances as SelectedPartInstances) + handlers.partInstancesHandler.notify(testPartInstances as SelectedPartInstances) - await topic.update(PartsHandler.name, [part1] as DBPart[]) + handlers.partsHandler.notify([part1] as DBPart[]) - await topic.update(SegmentHandler.name, { + handlers.segmentHandler.notify({ _id: segment1id, } as DBSegment) @@ -154,7 +152,8 @@ describe('ActivePlaylistTopic', () => { }) it('provides segment and part with segment timing', async () => { - const topic = new ActivePlaylistTopic(makeMockLogger()) + const handlers = makeMockHandlers() + const topic = new ActivePlaylistTopic(makeMockLogger(), handlers) const mockSubscriber = makeMockSubscriber() const currentPartInstanceId = 'CURRENT_PART_INSTANCE_ID' @@ -167,10 +166,10 @@ describe('ActivePlaylistTopic', () => { partInstanceId: protectString(currentPartInstanceId), rundownId: playlist.rundownIdsInOrder[0], } - await topic.update(PlaylistHandler.name, playlist) + handlers.playlistHandler.notify(playlist) const testShowStyleBase = makeTestShowStyleBase() - await topic.update(ShowStyleBaseHandler.name, testShowStyleBase as ShowStyleBaseExt) + handlers.showStyleBaseHandler.notify(testShowStyleBase as ShowStyleBaseExt) const segment1id = protectString('SEGMENT_1') const part1: Partial = { @@ -198,11 +197,11 @@ describe('ActivePlaylistTopic', () => { }), ] as DBPartInstance[], } - await topic.update(PartInstancesHandler.name, testPartInstances as SelectedPartInstances) + handlers.partInstancesHandler.notify(testPartInstances as SelectedPartInstances) - await topic.update(PartsHandler.name, [part1] as DBPart[]) + handlers.partsHandler.notify([part1] as DBPart[]) - await topic.update(SegmentHandler.name, { + handlers.segmentHandler.notify({ _id: segment1id, segmentTiming: { budgetDuration: 12300, countdownType: CountdownType.SEGMENT_BUDGET_DURATION }, } as DBSegment) diff --git a/packages/live-status-gateway/src/topics/__tests__/adLibs.spec.ts b/packages/live-status-gateway/src/topics/__tests__/adLibs.spec.ts index 48d25ea994..b3579f616e 100644 --- a/packages/live-status-gateway/src/topics/__tests__/adLibs.spec.ts +++ b/packages/live-status-gateway/src/topics/__tests__/adLibs.spec.ts @@ -1,13 +1,16 @@ import { protectString, unprotectString } from '@sofie-automation/server-core-integration' -import { makeMockLogger, makeMockSubscriber, makeTestParts, makeTestPlaylist, makeTestShowStyleBase } from './utils' +import { + makeMockHandlers, + makeMockLogger, + makeMockSubscriber, + makeTestParts, + makeTestPlaylist, + makeTestShowStyleBase, +} from './utils' import { AdLibsStatus, AdLibsTopic } from '../adLibsTopic' -import { PlaylistHandler } from '../../collections/playlistHandler' -import { ShowStyleBaseExt, ShowStyleBaseHandler } from '../../collections/showStyleBaseHandler' +import { ShowStyleBaseExt } from '../../collections/showStyleBaseHandler' import { AdLibAction } from '@sofie-automation/corelib/dist/dataModel/AdlibAction' import { RundownBaselineAdLibAction } from '@sofie-automation/corelib/dist/dataModel/RundownBaselineAdLibAction' -import { AdLibActionsHandler } from '../../collections/adLibActionsHandler' -import { GlobalAdLibActionsHandler } from '../../collections/globalAdLibActionsHandler' -import { PartsHandler } from '../../collections/partsHandler' function makeTestAdLibActions(): AdLibAction[] { return [ @@ -52,26 +55,27 @@ function makeTestGlobalAdLibActions(): RundownBaselineAdLibAction[] { ] } -describe('ActivePlaylistTopic', () => { +describe('AdLibsTopic', () => { it('notifies subscribers', async () => { - const topic = new AdLibsTopic(makeMockLogger()) + const handlers = makeMockHandlers() + const topic = new AdLibsTopic(makeMockLogger(), handlers) const mockSubscriber = makeMockSubscriber() const playlist = makeTestPlaylist() playlist.activationId = protectString('somethingRandom') - await topic.update(PlaylistHandler.name, playlist) + handlers.playlistHandler.notify(playlist) const parts = makeTestParts() - await topic.update(PartsHandler.name, parts) + handlers.partsHandler.notify(parts) const testShowStyleBase = makeTestShowStyleBase() - await topic.update(ShowStyleBaseHandler.name, testShowStyleBase as ShowStyleBaseExt) + handlers.showStyleBaseHandler.notify(testShowStyleBase as ShowStyleBaseExt) const testAdLibActions = makeTestAdLibActions() - await topic.update(AdLibActionsHandler.name, testAdLibActions) + handlers.adLibActionsHandler.notify(testAdLibActions) const testGlobalAdLibActions = makeTestGlobalAdLibActions() - await topic.update(GlobalAdLibActionsHandler.name, testGlobalAdLibActions) + handlers.globalAdLibActionsHandler.notify(testGlobalAdLibActions) // TODO: AdLibPieces and Global AdLibPieces diff --git a/packages/live-status-gateway/src/topics/__tests__/segmentsTopic.spec.ts b/packages/live-status-gateway/src/topics/__tests__/segmentsTopic.spec.ts index c26f1f1763..3c9e06290b 100644 --- a/packages/live-status-gateway/src/topics/__tests__/segmentsTopic.spec.ts +++ b/packages/live-status-gateway/src/topics/__tests__/segmentsTopic.spec.ts @@ -1,11 +1,8 @@ import { SegmentsStatus, SegmentsTopic } from '../segmentsTopic' -import { PlaylistHandler } from '../../collections/playlistHandler' import { protectString, unprotectString } from '@sofie-automation/server-core-integration' -import { SegmentsHandler } from '../../collections/segmentsHandler' import { DBSegment } from '@sofie-automation/corelib/dist/dataModel/Segment' -import { makeMockLogger, makeMockSubscriber, makeTestPlaylist } from './utils' +import { makeMockHandlers, makeMockLogger, makeMockSubscriber, makeTestPlaylist } from './utils' import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part' -import { PartsHandler } from '../../collections/partsHandler' const RUNDOWN_1_ID = 'RUNDOWN_1' const RUNDOWN_2_ID = 'RUNDOWN_2' @@ -50,7 +47,8 @@ describe('SegmentsTopic', () => { }) it('notifies added subscribers immediately', async () => { - const topic = new SegmentsTopic(makeMockLogger()) + const handlers = makeMockHandlers() + const topic = new SegmentsTopic(makeMockLogger(), handlers) const mockSubscriber = makeMockSubscriber() topic.addSubscriber(mockSubscriber) @@ -65,35 +63,38 @@ describe('SegmentsTopic', () => { }) it('notifies subscribers when playlist changes from null', async () => { - const topic = new SegmentsTopic(makeMockLogger()) + const handlers = makeMockHandlers() + const topic = new SegmentsTopic(makeMockLogger(), handlers) const mockSubscriber = makeMockSubscriber() topic.addSubscriber(mockSubscriber) mockSubscriber.send.mockClear() const testPlaylist = makeTestPlaylist() - await topic.update(PlaylistHandler.name, testPlaylist) + handlers.playlistHandler.notify(testPlaylist) const expectedStatus: SegmentsStatus = { event: 'segments', rundownPlaylistId: unprotectString(testPlaylist._id), segments: [], } + jest.advanceTimersByTime(THROTTLE_PERIOD_MS) expect(mockSubscriber.send.mock.calls).toEqual([[JSON.stringify(expectedStatus)]]) }) it('notifies subscribers when playlist id changes', async () => { - const topic = new SegmentsTopic(makeMockLogger()) + const handlers = makeMockHandlers() + const topic = new SegmentsTopic(makeMockLogger(), handlers) const mockSubscriber = makeMockSubscriber() const testPlaylist = makeTestPlaylist() - await topic.update(PlaylistHandler.name, testPlaylist) + handlers.playlistHandler.notify(testPlaylist) topic.addSubscriber(mockSubscriber) mockSubscriber.send.mockClear() const testPlaylist2 = makeTestPlaylist('PLAYLIST_2') - await topic.update(PlaylistHandler.name, testPlaylist2) + handlers.playlistHandler.notify(testPlaylist2) jest.advanceTimersByTime(THROTTLE_PERIOD_MS) const expectedStatus2: SegmentsStatus = { @@ -104,45 +105,18 @@ describe('SegmentsTopic', () => { expect(mockSubscriber.send.mock.calls).toEqual([[JSON.stringify(expectedStatus2)]]) }) - it('does not notify subscribers when an unimportant property of the playlist changes', async () => { - const topic = new SegmentsTopic(makeMockLogger()) - const mockSubscriber = makeMockSubscriber() - - const testPlaylist = makeTestPlaylist() - await topic.update(PlaylistHandler.name, testPlaylist) - - topic.addSubscriber(mockSubscriber) - mockSubscriber.send.mockClear() - - const testPlaylist2 = makeTestPlaylist() - testPlaylist2.currentPartInfo = { - partInstanceId: protectString('PI_1'), - consumesQueuedSegmentId: true, - manuallySelected: false, - rundownId: protectString(RUNDOWN_1_ID), - } - testPlaylist2.name = 'Another Playlist' - testPlaylist2.startedPlayback = Date.now() - // ... this is enough to prove that it works as expected - - await topic.update(PlaylistHandler.name, testPlaylist2) - jest.advanceTimersByTime(THROTTLE_PERIOD_MS) - - // eslint-disable-next-line @typescript-eslint/unbound-method - expect(mockSubscriber.send).toHaveBeenCalledTimes(0) - }) - it('notifies subscribers when segments change', async () => { - const topic = new SegmentsTopic(makeMockLogger()) + const handlers = makeMockHandlers() + const topic = new SegmentsTopic(makeMockLogger(), handlers) const mockSubscriber = makeMockSubscriber() const testPlaylist = makeTestPlaylist() - await topic.update(PlaylistHandler.name, testPlaylist) + handlers.playlistHandler.notify(testPlaylist) topic.addSubscriber(mockSubscriber) mockSubscriber.send.mockClear() - await topic.update(SegmentsHandler.name, [ + handlers.segmentsHandler.notify([ makeTestSegment('2_1', 1, RUNDOWN_2_ID), makeTestSegment('2_2', 2, RUNDOWN_2_ID), makeTestSegment('1_2', 2, RUNDOWN_1_ID), @@ -188,12 +162,13 @@ describe('SegmentsTopic', () => { }) it('notifies subscribers when rundown order changes', async () => { - const topic = new SegmentsTopic(makeMockLogger()) + const handlers = makeMockHandlers() + const topic = new SegmentsTopic(makeMockLogger(), handlers) const mockSubscriber = makeMockSubscriber() const testPlaylist = makeTestPlaylist() - await topic.update(PlaylistHandler.name, testPlaylist) - await topic.update(SegmentsHandler.name, [ + handlers.playlistHandler.notify(testPlaylist) + handlers.segmentsHandler.notify([ makeTestSegment('2_1', 1, RUNDOWN_2_ID), makeTestSegment('2_2', 2, RUNDOWN_2_ID), makeTestSegment('1_2', 2, RUNDOWN_1_ID), @@ -205,7 +180,7 @@ describe('SegmentsTopic', () => { const testPlaylist2 = makeTestPlaylist() testPlaylist2.rundownIdsInOrder = [protectString(RUNDOWN_2_ID), protectString(RUNDOWN_1_ID)] - await topic.update(PlaylistHandler.name, testPlaylist2) + handlers.playlistHandler.notify(testPlaylist2) jest.advanceTimersByTime(THROTTLE_PERIOD_MS) const expectedStatus: SegmentsStatus = { @@ -246,11 +221,12 @@ describe('SegmentsTopic', () => { }) it('exposes budgetDuration', async () => { - const topic = new SegmentsTopic(makeMockLogger()) + const handlers = makeMockHandlers() + const topic = new SegmentsTopic(makeMockLogger(), handlers) const mockSubscriber = makeMockSubscriber() const testPlaylist = makeTestPlaylist() - await topic.update(PlaylistHandler.name, testPlaylist) + handlers.playlistHandler.notify(testPlaylist) topic.addSubscriber(mockSubscriber) mockSubscriber.send.mockClear() @@ -258,14 +234,14 @@ describe('SegmentsTopic', () => { const segment_1_1_id = '1_1' const segment_1_2_id = '1_2' const segment_2_2_id = '2_2' - await topic.update(SegmentsHandler.name, [ + handlers.segmentsHandler.notify([ makeTestSegment('2_1', 1, RUNDOWN_2_ID), makeTestSegment(segment_2_2_id, 2, RUNDOWN_2_ID, { segmentTiming: { budgetDuration: 51000 } }), makeTestSegment(segment_1_2_id, 2, RUNDOWN_1_ID, { segmentTiming: { budgetDuration: 15000 } }), makeTestSegment(segment_1_1_id, 1, RUNDOWN_1_ID, { segmentTiming: { budgetDuration: 5000 } }), ]) mockSubscriber.send.mockClear() - await topic.update(PartsHandler.name, [ + handlers.partsHandler.notify([ makeTestPart('1_2_1', 1, RUNDOWN_1_ID, segment_1_2_id), makeTestPart('2_2_1', 1, RUNDOWN_1_ID, segment_2_2_id), makeTestPart('1_2_2', 2, RUNDOWN_1_ID, segment_1_2_id), @@ -319,11 +295,12 @@ describe('SegmentsTopic', () => { }) it('exposes expectedDuration', async () => { - const topic = new SegmentsTopic(makeMockLogger()) + const handlers = makeMockHandlers() + const topic = new SegmentsTopic(makeMockLogger(), handlers) const mockSubscriber = makeMockSubscriber() const testPlaylist = makeTestPlaylist() - await topic.update(PlaylistHandler.name, testPlaylist) + handlers.playlistHandler.notify(testPlaylist) topic.addSubscriber(mockSubscriber) mockSubscriber.send.mockClear() @@ -331,14 +308,14 @@ describe('SegmentsTopic', () => { const segment_1_1_id = '1_1' const segment_1_2_id = '1_2' const segment_2_2_id = '2_2' - await topic.update(SegmentsHandler.name, [ + handlers.segmentsHandler.notify([ makeTestSegment('2_1', 1, RUNDOWN_2_ID), makeTestSegment(segment_2_2_id, 2, RUNDOWN_2_ID), makeTestSegment(segment_1_2_id, 2, RUNDOWN_1_ID), makeTestSegment(segment_1_1_id, 1, RUNDOWN_1_ID), ]) mockSubscriber.send.mockClear() - await topic.update(PartsHandler.name, [ + handlers.partsHandler.notify([ makeTestPart('1_2_1', 1, RUNDOWN_1_ID, segment_1_2_id, { expectedDurationWithTransition: 10000, }), @@ -401,12 +378,13 @@ describe('SegmentsTopic', () => { }) it('includes segment identifier', async () => { - const topic = new SegmentsTopic(makeMockLogger()) + const handlers = makeMockHandlers() + const topic = new SegmentsTopic(makeMockLogger(), handlers) const mockSubscriber = makeMockSubscriber() const testPlaylist = makeTestPlaylist() - await topic.update(PlaylistHandler.name, testPlaylist) - await topic.update(SegmentsHandler.name, [ + handlers.playlistHandler.notify(testPlaylist) + handlers.segmentsHandler.notify([ { ...makeTestSegment('1_2', 2, RUNDOWN_1_ID), identifier: 'SomeIdentifier' }, makeTestSegment('1_1', 1, RUNDOWN_1_ID), ]) @@ -416,7 +394,7 @@ describe('SegmentsTopic', () => { const testPlaylist2 = makeTestPlaylist() testPlaylist2.rundownIdsInOrder = [protectString(RUNDOWN_2_ID), protectString(RUNDOWN_1_ID)] - await topic.update(PlaylistHandler.name, testPlaylist2) + handlers.playlistHandler.notify(testPlaylist2) jest.advanceTimersByTime(THROTTLE_PERIOD_MS) const expectedStatus: SegmentsStatus = { diff --git a/packages/live-status-gateway/src/topics/__tests__/utils.ts b/packages/live-status-gateway/src/topics/__tests__/utils.ts index f3fb929a09..de3e007eb9 100644 --- a/packages/live-status-gateway/src/topics/__tests__/utils.ts +++ b/packages/live-status-gateway/src/topics/__tests__/utils.ts @@ -7,6 +7,7 @@ import { ShowStyleBaseExt } from '../../collections/showStyleBaseHandler' import { Logger } from 'winston' import { WebSocket } from 'ws' import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part' +import { CollectionHandlers } from '../../liveStatusServer' const RUNDOWN_1_ID = 'RUNDOWN_1' const RUNDOWN_2_ID = 'RUNDOWN_2' @@ -58,3 +59,41 @@ export function makeTestParts(): DBPart[] { }, ] } + +export function makeMockHandlers(): CollectionHandlers { + return { + adLibActionsHandler: makeMockHandler(), + adLibsHandler: makeMockHandler(), + bucketAdLibActionsHandler: makeMockHandler(), + bucketAdLibsHandler: makeMockHandler(), + bucketsHandler: makeMockHandler(), + globalAdLibActionsHandler: makeMockHandler(), + globalAdLibsHandler: makeMockHandler(), + partHandler: makeMockHandler(), + partInstancesHandler: makeMockHandler(), + partsHandler: makeMockHandler(), + pieceContentStatusesHandler: makeMockHandler(), + pieceInstancesHandler: makeMockHandler(), + playlistHandler: makeMockHandler(), + playlistsHandler: makeMockHandler(), + rundownHandler: makeMockHandler(), + segmentHandler: makeMockHandler(), + segmentsHandler: makeMockHandler(), + showStyleBaseHandler: makeMockHandler(), + studioHandler: makeMockHandler(), + } as unknown as CollectionHandlers +} + +function makeMockHandler() { + const subscribers: Array<(data: unknown) => void> = [] + return { + subscribe: (callback: (data: unknown) => void) => { + subscribers.push(callback) + }, + notify: (data: unknown) => { + subscribers.forEach((callback) => { + callback(data) + }) + }, + } +} diff --git a/packages/live-status-gateway/src/topics/activePiecesTopic.ts b/packages/live-status-gateway/src/topics/activePiecesTopic.ts index fcb53adf4f..abe735ecc1 100644 --- a/packages/live-status-gateway/src/topics/activePiecesTopic.ts +++ b/packages/live-status-gateway/src/topics/activePiecesTopic.ts @@ -1,15 +1,14 @@ import { Logger } from 'winston' import { WebSocket } from 'ws' import { unprotectString } from '@sofie-automation/shared-lib/dist/lib/protectedString' -import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' import { literal } from '@sofie-automation/shared-lib/dist/lib/lib' -import { WebSocketTopicBase, WebSocketTopic, CollectionObserver } from '../wsHandler' -import { PlaylistHandler } from '../collections/playlistHandler' -import { ShowStyleBaseExt, ShowStyleBaseHandler } from '../collections/showStyleBaseHandler' -import _ = require('underscore') -import { SelectedPieceInstances, PieceInstancesHandler, PieceInstanceMin } from '../collections/pieceInstancesHandler' +import { WebSocketTopicBase, WebSocketTopic, PickArr } from '../wsHandler' +import { ShowStyleBaseExt } from '../collections/showStyleBaseHandler' +import { SelectedPieceInstances, PieceInstanceMin } from '../collections/pieceInstancesHandler' import { PieceStatus, toPieceStatus } from './helpers/pieceStatus' import { RundownPlaylistId } from '@sofie-automation/corelib/dist/dataModel/Ids' +import { CollectionHandlers } from '../liveStatusServer' +import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' const THROTTLE_PERIOD_MS = 100 @@ -19,31 +18,23 @@ export interface ActivePiecesStatus { activePieces: PieceStatus[] } -export class ActivePiecesTopic - extends WebSocketTopicBase - implements - WebSocketTopic, - CollectionObserver, - CollectionObserver, - CollectionObserver -{ - public observerName = ActivePiecesTopic.name +const PLAYLIST_KEYS = ['_id', 'activationId'] as const +type Playlist = PickArr + +const PIECE_INSTANCES_KEYS = ['active'] as const +type PieceInstances = PickArr + +export class ActivePiecesTopic extends WebSocketTopicBase implements WebSocketTopic { private _activePlaylistId: RundownPlaylistId | undefined private _activePieceInstances: PieceInstanceMin[] | undefined private _showStyleBaseExt: ShowStyleBaseExt | undefined - private throttledSendStatusToAll: () => void - constructor(logger: Logger) { - super(ActivePiecesTopic.name, logger) - this.throttledSendStatusToAll = _.throttle(this.sendStatusToAll.bind(this), THROTTLE_PERIOD_MS, { - leading: false, - trailing: true, - }) - } + constructor(logger: Logger, handlers: CollectionHandlers) { + super(ActivePiecesTopic.name, logger, THROTTLE_PERIOD_MS) - addSubscriber(ws: WebSocket): void { - super.addSubscriber(ws) - this.sendStatus([ws]) + handlers.playlistHandler.subscribe(this.onPlaylistUpdate, PLAYLIST_KEYS) + handlers.showStyleBaseHandler.subscribe(this.onShowStyleBaseUpdate) + handlers.pieceInstancesHandler.subscribe(this.onPieceInstancesUpdate, PIECE_INSTANCES_KEYS) } sendStatus(subscribers: Iterable): void { @@ -63,53 +54,31 @@ export class ActivePiecesTopic this.sendMessage(subscribers, message) } - async update( - source: string, - data: DBRundownPlaylist | ShowStyleBaseExt | SelectedPieceInstances | undefined - ): Promise { - let hasAnythingChanged = false - switch (source) { - case PlaylistHandler.name: { - const rundownPlaylist = data ? (data as DBRundownPlaylist) : undefined - this._logger.info( - `${this._name} received playlist update ${rundownPlaylist?._id}, activationId ${rundownPlaylist?.activationId}` - ) - const previousActivePlaylistId = this._activePlaylistId - this._activePlaylistId = unprotectString(rundownPlaylist?.activationId) - ? rundownPlaylist?._id - : undefined + protected onShowStyleBaseUpdate = (showStyleBase: ShowStyleBaseExt | undefined): void => { + this.logUpdateReceived('showStyleBase') + this._showStyleBaseExt = showStyleBase + this.throttledSendStatusToAll() + } - if (previousActivePlaylistId !== this._activePlaylistId) { - hasAnythingChanged = true - } - break - } - case ShowStyleBaseHandler.name: { - const showStyleBaseExt = data ? (data as ShowStyleBaseExt) : undefined - this._logger.info(`${this._name} received showStyleBase update from ${source}`) - this._showStyleBaseExt = showStyleBaseExt - hasAnythingChanged = true - break - } - case PieceInstancesHandler.name: { - const pieceInstances = data as SelectedPieceInstances - this._logger.info(`${this._name} received pieceInstances update from ${source}`) - if (pieceInstances.active !== this._activePieceInstances) { - hasAnythingChanged = true - } - this._activePieceInstances = pieceInstances.active - break - } - default: - throw new Error(`${this._name} received unsupported update from ${source}}`) - } + protected onPlaylistUpdate = (rundownPlaylist: Playlist | undefined): void => { + this.logUpdateReceived( + 'playlist', + `rundownPlaylistId ${rundownPlaylist?._id}, activationId ${rundownPlaylist?.activationId}` + ) + const previousActivePlaylistId = this._activePlaylistId + this._activePlaylistId = unprotectString(rundownPlaylist?.activationId) ? rundownPlaylist?._id : undefined - if (hasAnythingChanged) { + if (previousActivePlaylistId !== this._activePlaylistId) { this.throttledSendStatusToAll() } } - private sendStatusToAll() { - this.sendStatus(this._subscribers) + protected onPieceInstancesUpdate = (pieceInstances: PieceInstances | undefined): void => { + this.logUpdateReceived('pieceInstances') + const prevPieceInstances = this._activePieceInstances + this._activePieceInstances = pieceInstances?.active + if (prevPieceInstances !== this._activePieceInstances) { + this.throttledSendStatusToAll() + } } } diff --git a/packages/live-status-gateway/src/topics/activePlaylistTopic.ts b/packages/live-status-gateway/src/topics/activePlaylistTopic.ts index b557a4e1d9..faa3cac47c 100644 --- a/packages/live-status-gateway/src/topics/activePlaylistTopic.ts +++ b/packages/live-status-gateway/src/topics/activePlaylistTopic.ts @@ -8,22 +8,20 @@ import { } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance' import { assertNever, literal } from '@sofie-automation/shared-lib/dist/lib/lib' -import { WebSocketTopicBase, WebSocketTopic, CollectionObserver } from '../wsHandler' -import { SelectedPartInstances, PartInstancesHandler } from '../collections/partInstancesHandler' -import { PlaylistHandler } from '../collections/playlistHandler' -import { ShowStyleBaseExt, ShowStyleBaseHandler } from '../collections/showStyleBaseHandler' +import { SelectedPartInstances } from '../collections/partInstancesHandler' +import { ShowStyleBaseExt } from '../collections/showStyleBaseHandler' +import { WebSocketTopicBase, WebSocketTopic, PickArr } from '../wsHandler' import { CurrentSegmentTiming, calculateCurrentSegmentTiming } from './helpers/segmentTiming' import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part' -import { PartsHandler } from '../collections/partsHandler' import _ = require('underscore') import { PartTiming, calculateCurrentPartTiming } from './helpers/partTiming' -import { SelectedPieceInstances, PieceInstancesHandler, PieceInstanceMin } from '../collections/pieceInstancesHandler' +import { SelectedPieceInstances, PieceInstanceMin } from '../collections/pieceInstancesHandler' import { PieceStatus, toPieceStatus } from './helpers/pieceStatus' import { DBSegment } from '@sofie-automation/corelib/dist/dataModel/Segment' -import { SegmentHandler } from '../collections/segmentHandler' import { PlaylistTimingType } from '@sofie-automation/blueprints-integration' -import { SegmentsHandler } from '../collections/segmentsHandler' import { normalizeArray } from '@sofie-automation/corelib/dist/lib' +import { CollectionHandlers } from '../liveStatusServer' +import areElementsShallowEqual from '@sofie-automation/shared-lib/dist/lib/isShallowEqual' const THROTTLE_PERIOD_MS = 100 @@ -78,19 +76,31 @@ export interface ActivePlaylistStatus { } } -export class ActivePlaylistTopic - extends WebSocketTopicBase - implements - WebSocketTopic, - CollectionObserver, - CollectionObserver, - CollectionObserver, - CollectionObserver, - CollectionObserver, - CollectionObserver -{ - public observerName = ActivePlaylistTopic.name - private _activePlaylist: DBRundownPlaylist | undefined +const PLAYLIST_KEYS = [ + '_id', + 'activationId', + 'name', + 'rundownIdsInOrder', + 'publicData', + 'currentPartInfo', + 'nextPartInfo', + 'timing', + 'startedPlayback', + 'quickLoop', +] as const +type Playlist = PickArr + +const PART_INSTANCES_KEYS = ['current', 'next', 'inCurrentSegment', 'firstInSegmentPlayout'] as const +type PartInstances = PickArr + +const PIECE_INSTANCES_KEYS = ['currentPartInstance', 'nextPartInstance'] as const +type PieceInstances = PickArr + +const SEGMENT_KEYS = ['_id', 'segmentTiming'] as const +type Segment = PickArr + +export class ActivePlaylistTopic extends WebSocketTopicBase implements WebSocketTopic { + private _activePlaylist: Playlist | undefined private _currentPartInstance: DBPartInstance | undefined private _nextPartInstance: DBPartInstance | undefined private _firstInstanceInSegmentPlayout: DBPartInstance | undefined @@ -101,25 +111,24 @@ export class ActivePlaylistTopic private _pieceInstancesInCurrentPartInstance: PieceInstanceMin[] | undefined private _pieceInstancesInNextPartInstance: PieceInstanceMin[] | undefined private _showStyleBaseExt: ShowStyleBaseExt | undefined - private _currentSegment: DBSegment | undefined - private throttledSendStatusToAll: () => void - - constructor(logger: Logger) { - super(ActivePlaylistTopic.name, logger) - this.throttledSendStatusToAll = _.throttle(this.sendStatusToAll.bind(this), THROTTLE_PERIOD_MS, { - leading: false, - trailing: true, - }) - } + private _currentSegment: Segment | undefined + + constructor(logger: Logger, handlers: CollectionHandlers) { + super(ActivePlaylistTopic.name, logger, THROTTLE_PERIOD_MS) - addSubscriber(ws: WebSocket): void { - super.addSubscriber(ws) - this.sendStatus([ws]) + handlers.playlistHandler.subscribe(this.onPlaylistUpdate, PLAYLIST_KEYS) + handlers.partsHandler.subscribe(this.onPartsUpdate) + handlers.partInstancesHandler.subscribe(this.onPartInstancesUpdate, PART_INSTANCES_KEYS) + handlers.pieceInstancesHandler.subscribe(this.onPieceInstancesUpdate, PIECE_INSTANCES_KEYS) + handlers.showStyleBaseHandler.subscribe(this.onShowStyleBaseUpdate) + handlers.segmentHandler.subscribe(this.onSegmentUpdate, SEGMENT_KEYS) + handlers.segmentsHandler.subscribe(this.onSegmentsUpdate) } sendStatus(subscribers: Iterable): void { if (this.isDataInconsistent()) { // data is inconsistent, let's wait + this._logger.debug('Encountered inconsistent data.') return } @@ -155,7 +164,7 @@ export class ActivePlaylistTopic ? literal({ id: unprotectString(currentPart.segmentId), timing: calculateCurrentSegmentTiming( - this._currentSegment, + this._currentSegment.segmentTiming, this._currentPartInstance, this._firstInstanceInSegmentPlayout, this._partInstancesInCurrentSegment, @@ -282,94 +291,69 @@ export class ActivePlaylistTopic ) } - async update( - source: string, - data: - | DBRundownPlaylist - | ShowStyleBaseExt - | SelectedPartInstances - | DBPart[] - | SelectedPieceInstances - | DBSegment - | DBSegment[] - | undefined - ): Promise { - let hasAnythingChanged = false - switch (source) { - case PlaylistHandler.name: { - const rundownPlaylist = data ? (data as DBRundownPlaylist) : undefined - this.logUpdateReceived( - 'playlist', - source, - `rundownPlaylistId ${rundownPlaylist?._id}, activationId ${rundownPlaylist?.activationId}` - ) - this._activePlaylist = unprotectString(rundownPlaylist?.activationId) ? rundownPlaylist : undefined - hasAnythingChanged = true - break - } - case ShowStyleBaseHandler.name: { - const showStyleBaseExt = data ? (data as ShowStyleBaseExt) : undefined - this.logUpdateReceived('showStyleBase', source) - this._showStyleBaseExt = showStyleBaseExt - hasAnythingChanged = true - break - } - case PartInstancesHandler.name: { - const partInstances = data as SelectedPartInstances - this.logUpdateReceived( - 'partInstances', - source, - `${partInstances.inCurrentSegment.length} instances in segment` - ) - this._currentPartInstance = partInstances.current - this._nextPartInstance = partInstances.next - this._firstInstanceInSegmentPlayout = partInstances.firstInSegmentPlayout - this._partInstancesInCurrentSegment = partInstances.inCurrentSegment - hasAnythingChanged = true - break - } - case PartsHandler.name: { - this._partsById = normalizeArray(data as DBPart[], '_id') - this._partsBySegmentId = _.groupBy(data as DBPart[], 'segmentId') - this.logUpdateReceived('parts', source) - hasAnythingChanged = true // TODO: can this be smarter? - break - } - case PieceInstancesHandler.name: { - const pieceInstances = data as SelectedPieceInstances - this.logUpdateReceived('pieceInstances', source) - if ( - pieceInstances.currentPartInstance !== this._pieceInstancesInCurrentPartInstance || - pieceInstances.nextPartInstance !== this._pieceInstancesInNextPartInstance - ) { - hasAnythingChanged = true - } - this._pieceInstancesInCurrentPartInstance = pieceInstances.currentPartInstance - this._pieceInstancesInNextPartInstance = pieceInstances.nextPartInstance - break - } - case SegmentHandler.name: { - this._currentSegment = data as DBSegment - this.logUpdateReceived('segment', source) - hasAnythingChanged = true - break - } - case SegmentsHandler.name: { - this._segmentsById = normalizeArray(data as DBSegment[], '_id') - this.logUpdateReceived('segments', source) - hasAnythingChanged = true // TODO: can this be smarter? - break - } - default: - throw new Error(`${this._name} received unsupported update from ${source}}`) - } + private onPlaylistUpdate = (rundownPlaylist: Playlist | undefined): void => { + this.logUpdateReceived( + 'playlist', + `rundownPlaylistId ${rundownPlaylist?._id}, activationId ${rundownPlaylist?.activationId}` + ) + this._activePlaylist = unprotectString(rundownPlaylist?.activationId) ? rundownPlaylist : undefined + + this.throttledSendStatusToAll() + } - if (hasAnythingChanged) { + private onPartsUpdate = (parts: DBPart[] | undefined): void => { + const previousParts = this._partsBySegmentId + this._partsBySegmentId = _.groupBy(parts ?? [], 'segmentId') + this.logUpdateReceived('parts') + + const currentSegmentId = unprotectString(this._currentPartInstance?.segmentId) + if ( + currentSegmentId && + !areElementsShallowEqual( + previousParts[currentSegmentId] ?? [], + this._partsBySegmentId[currentSegmentId] ?? [] + ) + ) { + // we have to collect all the parts, but only when those from the current segment change, we should update status this.throttledSendStatusToAll() } } - private sendStatusToAll() { - this.sendStatus(this._subscribers) + private onPartInstancesUpdate = (partInstances: PartInstances | undefined): void => { + this.logUpdateReceived('partInstances', `${partInstances?.inCurrentSegment.length} instances in segment`) + + if (!partInstances) return + this._currentPartInstance = partInstances.current + this._nextPartInstance = partInstances.next + this._firstInstanceInSegmentPlayout = partInstances.firstInSegmentPlayout + this._partInstancesInCurrentSegment = partInstances.inCurrentSegment + this.throttledSendStatusToAll() + } + + private onPieceInstancesUpdate = (pieceInstances: PieceInstances | undefined): void => { + this.logUpdateReceived('pieceInstances') + if (!pieceInstances) return + + this._pieceInstancesInCurrentPartInstance = pieceInstances.currentPartInstance + this._pieceInstancesInNextPartInstance = pieceInstances.nextPartInstance + this.throttledSendStatusToAll() + } + + private onShowStyleBaseUpdate = (showStyleBase: ShowStyleBaseExt | undefined): void => { + this.logUpdateReceived('showStyleBase') + this._showStyleBaseExt = showStyleBase + this.throttledSendStatusToAll() + } + + private onSegmentUpdate = (segment: Segment | undefined): void => { + this.logUpdateReceived('segment') + this._currentSegment = segment + this.throttledSendStatusToAll() + } + + private onSegmentsUpdate = (segments: DBSegment[] | undefined): void => { + this.logUpdateReceived('segments') + this._segmentsById = segments ? normalizeArray(segments, '_id') : {} + this.throttledSendStatusToAll() // TODO: can this be smarter? } } diff --git a/packages/live-status-gateway/src/topics/adLibsTopic.ts b/packages/live-status-gateway/src/topics/adLibsTopic.ts index f3f07b8557..cfcd40644d 100644 --- a/packages/live-status-gateway/src/topics/adLibsTopic.ts +++ b/packages/live-status-gateway/src/topics/adLibsTopic.ts @@ -1,29 +1,21 @@ import { Logger } from 'winston' import { WebSocket } from 'ws' import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' -import { WebSocketTopicBase, WebSocketTopic, CollectionObserver } from '../wsHandler' -import { PlaylistHandler } from '../collections/playlistHandler' +import { WebSocketTopicBase, WebSocketTopic, PickArr } from '../wsHandler' import { literal } from '@sofie-automation/corelib/dist/lib' import { unprotectString } from '@sofie-automation/shared-lib/dist/lib/protectedString' -import _ = require('underscore') import { AdLibAction } from '@sofie-automation/corelib/dist/dataModel/AdlibAction' import { RundownBaselineAdLibAction } from '@sofie-automation/corelib/dist/dataModel/RundownBaselineAdLibAction' -import { AdLibActionsHandler } from '../collections/adLibActionsHandler' -import { GlobalAdLibActionsHandler } from '../collections/globalAdLibActionsHandler' import { AdLibPiece } from '@sofie-automation/corelib/dist/dataModel/AdLibPiece' import { RundownBaselineAdLibItem } from '@sofie-automation/corelib/dist/dataModel/RundownBaselineAdLibPiece' import { IBlueprintActionManifestDisplayContent } from '@sofie-automation/blueprints-integration' -import { ShowStyleBaseExt, ShowStyleBaseHandler } from '../collections/showStyleBaseHandler' +import { ShowStyleBaseExt } from '../collections/showStyleBaseHandler' import { interpollateTranslation } from '@sofie-automation/corelib/dist/TranslatableMessage' -import { AdLibsHandler } from '../collections/adLibsHandler' -import { GlobalAdLibsHandler } from '../collections/globalAdLibsHandler' import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part' -import { PartsHandler } from '../collections/partsHandler' import { PartId, SegmentId } from '@sofie-automation/corelib/dist/dataModel/Ids' import { WithSortingMetadata, getRank, sortContent } from './helpers/contentSorting' -import { isDeepStrictEqual } from 'util' import { DBSegment } from '@sofie-automation/corelib/dist/dataModel/Segment' -import { SegmentsHandler } from '../collections/segmentsHandler' +import { CollectionHandlers } from '../liveStatusServer' const THROTTLE_PERIOD_MS = 100 @@ -57,39 +49,34 @@ interface AdLibStatusBase { optionsSchema?: any } -export class AdLibsTopic - extends WebSocketTopicBase - implements - WebSocketTopic, - CollectionObserver, - CollectionObserver, - CollectionObserver, - CollectionObserver, - CollectionObserver -{ - public observerName = AdLibsTopic.name - private _activePlaylist: DBRundownPlaylist | undefined +const PLAYLIST_KEYS = ['_id', 'rundownIdsInOrder', 'activationId'] as const +type Playlist = PickArr + +const SHOW_STYLE_BASE_KEYS = ['sourceLayerNamesById', 'outputLayerNamesById'] as const +type ShowStyle = PickArr + +export class AdLibsTopic extends WebSocketTopicBase implements WebSocketTopic { + private _activePlaylist: Playlist | undefined private _sourceLayersMap: ReadonlyMap = new Map() private _outputLayersMap: ReadonlyMap = new Map() private _adLibActions: AdLibAction[] | undefined - private _abLibs: AdLibPiece[] | undefined + private _adLibs: AdLibPiece[] | undefined private _parts: ReadonlyMap = new Map() private _segments: ReadonlyMap = new Map() private _globalAdLibActions: RundownBaselineAdLibAction[] | undefined private _globalAdLibs: RundownBaselineAdLibItem[] | undefined - private throttledSendStatusToAll: () => void - constructor(logger: Logger) { - super(AdLibsTopic.name, logger) - this.throttledSendStatusToAll = _.throttle(this.sendStatusToAll.bind(this), THROTTLE_PERIOD_MS, { - leading: true, - trailing: true, - }) - } + constructor(logger: Logger, handlers: CollectionHandlers) { + super(AdLibsTopic.name, logger, THROTTLE_PERIOD_MS) - addSubscriber(ws: WebSocket): void { - super.addSubscriber(ws) - this.sendStatus([ws]) + handlers.playlistHandler.subscribe(this.onPlaylistUpdate, PLAYLIST_KEYS) + handlers.showStyleBaseHandler.subscribe(this.onShowStyleBaseUpdate, SHOW_STYLE_BASE_KEYS) + handlers.adLibActionsHandler.subscribe(this.onAdLibActionsUpdate) + handlers.adLibsHandler.subscribe(this.onAdLibsUpdate) + handlers.globalAdLibActionsHandler.subscribe(this.onGlobalAdLibActionsUpdate) + handlers.globalAdLibsHandler.subscribe(this.onGlobalAdLibsUpdate) + handlers.segmentsHandler.subscribe(this.onSegmentsUpdate) + handlers.partsHandler.subscribe(this.onPartsUpdate) } sendStatus(subscribers: Iterable): void { @@ -139,9 +126,9 @@ export class AdLibsTopic ) } - if (this._abLibs) { + if (this._adLibs) { adLibs.push( - ...this._abLibs.map((adLib) => { + ...this._adLibs.map((adLib) => { const sourceLayerName = this._sourceLayersMap.get(adLib.sourceLayerId) const outputLayerName = this._outputLayersMap.get(adLib.outputLayerId) const segmentId = adLib.partId ? this._parts.get(adLib.partId)?.segmentId : undefined @@ -242,92 +229,65 @@ export class AdLibsTopic this.sendMessage(subscribers, adLibsStatus) } - async update( - source: string, - data: - | DBRundownPlaylist - | ShowStyleBaseExt - | AdLibAction[] - | RundownBaselineAdLibAction[] - | AdLibPiece[] - | RundownBaselineAdLibItem[] - | DBPart[] - | DBSegment[] - | undefined - ): Promise { - switch (source) { - case PlaylistHandler.name: { - const previousPlaylist = this._activePlaylist - this.logUpdateReceived('playlist', source) - this._activePlaylist = data as DBRundownPlaylist | undefined - // PlaylistHandler is quite chatty (will update on every take), so let's make sure there's a point - // in sending a status - if ( - previousPlaylist?._id === this._activePlaylist?._id && - isDeepStrictEqual(previousPlaylist?.rundownIdsInOrder, this._activePlaylist?.rundownIdsInOrder) - ) - return - break - } - case AdLibActionsHandler.name: { - const adLibActions = data ? (data as AdLibAction[]) : [] - this.logUpdateReceived('adLibActions', source) - this._adLibActions = adLibActions - break - } - case GlobalAdLibActionsHandler.name: { - const globalAdLibActions = data ? (data as RundownBaselineAdLibAction[]) : [] - this.logUpdateReceived('globalAdLibActions', source) - this._globalAdLibActions = globalAdLibActions - break - } - case AdLibsHandler.name: { - const adLibs = data ? (data as AdLibPiece[]) : [] - this.logUpdateReceived('adLibs', source) - this._abLibs = adLibs - break - } - case GlobalAdLibsHandler.name: { - const globalAdLibs = data ? (data as RundownBaselineAdLibItem[]) : [] - this.logUpdateReceived('globalAdLibs', source) - this._globalAdLibs = globalAdLibs - break - } - case ShowStyleBaseHandler.name: { - const showStyleBaseExt = data ? (data as ShowStyleBaseExt) : undefined - this.logUpdateReceived('showStyleBase', source) - this._sourceLayersMap = showStyleBaseExt?.sourceLayerNamesById ?? new Map() - this._outputLayersMap = showStyleBaseExt?.outputLayerNamesById ?? new Map() - break - } - case SegmentsHandler.name: { - const segments = data ? (data as DBPart[]) : [] - this.logUpdateReceived('segments', source) - const newSegments = new Map() - segments.forEach((segment) => { - newSegments.set(segment._id, segment) - }) - this._segments = newSegments - break - } - case PartsHandler.name: { - const parts = data ? (data as DBPart[]) : [] - this.logUpdateReceived('parts', source) - const newParts = new Map() - parts.forEach((part) => { - newParts.set(part._id, part) - }) - this._parts = newParts - break - } - default: - throw new Error(`${this._name} received unsupported update from ${source}}`) - } + private onPlaylistUpdate = (rundownPlaylist: Playlist | undefined): void => { + this.logUpdateReceived( + 'playlist', + `rundownPlaylistId ${rundownPlaylist?._id}, activationId ${rundownPlaylist?.activationId}` + ) + this._activePlaylist = rundownPlaylist + this.throttledSendStatusToAll() + } + private onShowStyleBaseUpdate = (showStyleBase: ShowStyle | undefined): void => { + this.logUpdateReceived('showStyleBase') + this._sourceLayersMap = showStyleBase?.sourceLayerNamesById ?? new Map() + this._outputLayersMap = showStyleBase?.outputLayerNamesById ?? new Map() this.throttledSendStatusToAll() } - private sendStatusToAll() { - this.sendStatus(this._subscribers) + protected onAdLibActionsUpdate = (adLibActions: AdLibAction[] | undefined): void => { + this.logUpdateReceived('adLibActions') + this._adLibActions = adLibActions + this.throttledSendStatusToAll() + } + + protected onAdLibsUpdate = (adLibs: AdLibPiece[] | undefined): void => { + this.logUpdateReceived('adLibs') + this._adLibs = adLibs + this.throttledSendStatusToAll() + } + + protected onGlobalAdLibActionsUpdate = (adLibActions: RundownBaselineAdLibAction[] | undefined): void => { + this.logUpdateReceived('globalAdLibActions') + this._globalAdLibActions = adLibActions + this.throttledSendStatusToAll() + } + + protected onGlobalAdLibsUpdate = (adLibs: RundownBaselineAdLibItem[] | undefined): void => { + this.logUpdateReceived('globalAdLibs') + this._globalAdLibs = adLibs + this.throttledSendStatusToAll() + } + + protected onSegmentsUpdate = (segments: DBSegment[] | undefined): void => { + this.logUpdateReceived('segments') + const newSegments = new Map() + segments ??= [] + segments.forEach((segment) => { + newSegments.set(segment._id, segment) + }) + this._segments = newSegments + this.throttledSendStatusToAll() + } + + protected onPartsUpdate = (parts: DBPart[] | undefined): void => { + this.logUpdateReceived('parts') + const newParts = new Map() + parts ??= [] + parts.forEach((part) => { + newParts.set(part._id, part) + }) + this._parts = newParts + this.throttledSendStatusToAll() } } diff --git a/packages/live-status-gateway/src/topics/helpers/segmentTiming.ts b/packages/live-status-gateway/src/topics/helpers/segmentTiming.ts index 693dff9555..1f80f8515a 100644 --- a/packages/live-status-gateway/src/topics/helpers/segmentTiming.ts +++ b/packages/live-status-gateway/src/topics/helpers/segmentTiming.ts @@ -1,6 +1,6 @@ +import { SegmentTimingInfo } from '@sofie-automation/blueprints-integration' import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part' import { DBPartInstance } from '@sofie-automation/corelib/dist/dataModel/PartInstance' -import { DBSegment } from '@sofie-automation/corelib/dist/dataModel/Segment' export interface SegmentTiming { budgetDurationMs?: number @@ -13,13 +13,13 @@ export interface CurrentSegmentTiming extends SegmentTiming { } export function calculateCurrentSegmentTiming( - segment: DBSegment, + segmentTimingInfo: SegmentTimingInfo | undefined, currentPartInstance: DBPartInstance, firstInstanceInSegmentPlayout: DBPartInstance | undefined, segmentPartInstances: DBPartInstance[], segmentParts: DBPart[] ): CurrentSegmentTiming { - const segmentTiming = calculateSegmentTiming(segment, segmentParts) + const segmentTiming = calculateSegmentTiming(segmentTimingInfo, segmentParts) const playedDurations = segmentPartInstances.reduce((sum, partInstance) => { return (partInstance.timings?.duration ?? 0) + sum }, 0) @@ -39,14 +39,17 @@ export function calculateCurrentSegmentTiming( } } -export function calculateSegmentTiming(segment: DBSegment, segmentParts: DBPart[]): SegmentTiming { +export function calculateSegmentTiming( + segmentTimingInfo: SegmentTimingInfo | undefined, + segmentParts: DBPart[] +): SegmentTiming { return { - budgetDurationMs: segment.segmentTiming?.budgetDuration, + budgetDurationMs: segmentTimingInfo?.budgetDuration, expectedDurationMs: segmentParts.reduce((sum, part): number => { return part.expectedDurationWithTransition != null && !part.untimed ? sum + part.expectedDurationWithTransition : sum }, 0), - countdownType: segment.segmentTiming?.countdownType, + countdownType: segmentTimingInfo?.countdownType, } } diff --git a/packages/live-status-gateway/src/topics/root.ts b/packages/live-status-gateway/src/topics/root.ts index 6307aae73a..0c3f3cdca1 100644 --- a/packages/live-status-gateway/src/topics/root.ts +++ b/packages/live-status-gateway/src/topics/root.ts @@ -158,4 +158,8 @@ export class RootChannel extends WebSocketTopicBase implements WebSocketTopic { ) } } + + sendStatus(): void { + // no status here + } } diff --git a/packages/live-status-gateway/src/topics/segmentsTopic.ts b/packages/live-status-gateway/src/topics/segmentsTopic.ts index 3f9e17247a..66a671a866 100644 --- a/packages/live-status-gateway/src/topics/segmentsTopic.ts +++ b/packages/live-status-gateway/src/topics/segmentsTopic.ts @@ -1,17 +1,14 @@ import { Logger } from 'winston' import { WebSocket } from 'ws' import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' -import { WebSocketTopicBase, WebSocketTopic, CollectionObserver } from '../wsHandler' +import { WebSocketTopicBase, WebSocketTopic, PickArr } from '../wsHandler' import { DBSegment } from '@sofie-automation/corelib/dist/dataModel/Segment' -import { PlaylistHandler } from '../collections/playlistHandler' import { groupByToMap } from '@sofie-automation/corelib/dist/lib' import { unprotectString } from '@sofie-automation/shared-lib/dist/lib/protectedString' -import { SegmentsHandler } from '../collections/segmentsHandler' -import areElementsShallowEqual from '@sofie-automation/shared-lib/dist/lib/isShallowEqual' -import { PartsHandler } from '../collections/partsHandler' import { DBPart } from '@sofie-automation/corelib/dist/dataModel/Part' import _ = require('underscore') import { SegmentTiming, calculateSegmentTiming } from './helpers/segmentTiming' +import { CollectionHandlers } from '../liveStatusServer' const THROTTLE_PERIOD_MS = 200 @@ -30,32 +27,21 @@ export interface SegmentsStatus { segments: SegmentStatus[] } -export class SegmentsTopic - extends WebSocketTopicBase - implements - WebSocketTopic, - CollectionObserver, - CollectionObserver, - CollectionObserver -{ - public observerName = SegmentsTopic.name - private _activePlaylist: DBRundownPlaylist | undefined +const PLAYLIST_KEYS = ['_id', 'rundownIdsInOrder', 'activationId'] as const +type Playlist = PickArr + +export class SegmentsTopic extends WebSocketTopicBase implements WebSocketTopic { + private _activePlaylist: Playlist | undefined private _segments: DBSegment[] = [] private _partsBySegment: Record = {} private _orderedSegments: DBSegment[] = [] - private throttledSendStatusToAll: () => void - constructor(logger: Logger) { - super(SegmentsTopic.name, logger) - this.throttledSendStatusToAll = _.throttle(this.sendStatusToAll.bind(this), THROTTLE_PERIOD_MS, { - leading: true, - trailing: true, - }) - } + constructor(logger: Logger, handlers: CollectionHandlers) { + super(SegmentsTopic.name, logger, THROTTLE_PERIOD_MS) - addSubscriber(ws: WebSocket): void { - super.addSubscriber(ws) - this.sendStatus([ws]) + handlers.playlistHandler.subscribe(this.onPlaylistUpdate, PLAYLIST_KEYS) + handlers.segmentsHandler.subscribe(this.onSegmentsUpdate) + handlers.partsHandler.subscribe(this.onPartsUpdate) } sendStatus(subscribers: Iterable): void { @@ -68,7 +54,7 @@ export class SegmentsTopic id: segmentId, rundownId: unprotectString(segment.rundownId), name: segment.name, - timing: calculateSegmentTiming(segment, this._partsBySegment[segmentId] ?? []), + timing: calculateSegmentTiming(segment.segmentTiming, this._partsBySegment[segmentId] ?? []), identifier: segment.identifier, publicData: segment.publicData, } @@ -78,51 +64,33 @@ export class SegmentsTopic this.sendMessage(subscribers, segmentsStatus) } - async update(source: string, data: DBRundownPlaylist | DBSegment[] | DBPart[] | undefined): Promise { - const prevSegments = this._segments - const prevRundownOrder = this._activePlaylist?.rundownIdsInOrder ?? [] - const prevParts = this._partsBySegment - const prevPlaylistId = this._activePlaylist?._id - switch (source) { - case PlaylistHandler.name: { - this._activePlaylist = data as DBRundownPlaylist | undefined - this.logUpdateReceived('playlist', source) - break - } - case SegmentsHandler.name: { - this._segments = data as DBSegment[] - this.logUpdateReceived('segments', source) - break - } - case PartsHandler.name: { - this._partsBySegment = _.groupBy(data as DBPart[], 'segmentId') - this.logUpdateReceived('parts', source) - break - } - default: - throw new Error(`${this._name} received unsupported update from ${source}}`) - } + private onPlaylistUpdate = (rundownPlaylist: Playlist | undefined): void => { + this.logUpdateReceived( + 'playlist', + `rundownPlaylistId ${rundownPlaylist?._id}, activationId ${rundownPlaylist?.activationId}` + ) + this._activePlaylist = rundownPlaylist + this.updateAndSendStatusToAll() + } - if (this._activePlaylist) { - if ( - this._activePlaylist._id !== prevPlaylistId || - prevSegments !== this._segments || - prevParts !== this._partsBySegment || - !areElementsShallowEqual(prevRundownOrder, this._activePlaylist.rundownIdsInOrder) - ) { - const segmentsByRundownId = groupByToMap(this._segments, 'rundownId') - this._orderedSegments = this._activePlaylist.rundownIdsInOrder.flatMap((rundownId) => { - return segmentsByRundownId.get(rundownId)?.sort((a, b) => a._rank - b._rank) ?? [] - }) - this.throttledSendStatusToAll() - } - } else { - this._orderedSegments = [] - this.throttledSendStatusToAll() - } + protected onSegmentsUpdate = (segments: DBSegment[] | undefined): void => { + this.logUpdateReceived('segments') + this._segments = segments ?? [] + this.updateAndSendStatusToAll() + } + + protected onPartsUpdate = (parts: DBPart[] | undefined): void => { + this.logUpdateReceived('parts') + this._partsBySegment = _.groupBy(parts ?? [], 'segmentId') + this.updateAndSendStatusToAll() } - private sendStatusToAll() { - this.sendStatus(this._subscribers) + private updateAndSendStatusToAll() { + const segmentsByRundownId = groupByToMap(this._segments, 'rundownId') + this._orderedSegments = + this._activePlaylist?.rundownIdsInOrder.flatMap((rundownId) => { + return segmentsByRundownId.get(rundownId)?.sort((a, b) => a._rank - b._rank) ?? [] + }) ?? [] + this.throttledSendStatusToAll() } } diff --git a/packages/live-status-gateway/src/topics/studioTopic.ts b/packages/live-status-gateway/src/topics/studioTopic.ts index 945317718d..9b4da5addf 100644 --- a/packages/live-status-gateway/src/topics/studioTopic.ts +++ b/packages/live-status-gateway/src/topics/studioTopic.ts @@ -4,9 +4,9 @@ import { unprotectString } from '@sofie-automation/shared-lib/dist/lib/protected import { DBStudio } from '@sofie-automation/corelib/dist/dataModel/Studio' import { DBRundownPlaylist } from '@sofie-automation/corelib/dist/dataModel/RundownPlaylist' import { literal } from '@sofie-automation/shared-lib/dist/lib/lib' -import { WebSocketTopicBase, WebSocketTopic, CollectionObserver } from '../wsHandler' -import { StudioHandler } from '../collections/studioHandler' -import { PlaylistsHandler } from '../collections/playlistHandler' +import { WebSocketTopicBase, WebSocketTopic } from '../wsHandler' +import { CollectionHandlers } from '../liveStatusServer' +import _ = require('underscore') type PlaylistActivationStatus = 'deactivated' | 'rehearsal' | 'activated' @@ -23,21 +23,16 @@ interface StudioStatus { playlists: PlaylistStatus[] } -export class StudioTopic - extends WebSocketTopicBase - implements WebSocketTopic, CollectionObserver, CollectionObserver -{ - public observerName = 'StudioTopic' +export class StudioTopic extends WebSocketTopicBase implements WebSocketTopic { private _studio: DBStudio | undefined private _playlists: PlaylistStatus[] = [] + private _lastSentPlaylists: PlaylistStatus[] = [] - constructor(logger: Logger) { + constructor(logger: Logger, handlers: CollectionHandlers) { super(StudioTopic.name, logger) - } - addSubscriber(ws: WebSocket): void { - super.addSubscriber(ws) - this.sendStatus([ws]) + handlers.studioHandler.subscribe(this.onStudioUpdate) + handlers.playlistsHandler.subscribe(this.onPlaylistsUpdate) } sendStatus(subscribers: Iterable): void { @@ -58,42 +53,35 @@ export class StudioTopic this.sendMessage(subscribers, studioStatus) } - async update(source: string, data: DBStudio | DBRundownPlaylist[] | undefined): Promise { - const prevPlaylistsStatus = this._playlists - const rundownPlaylists = data ? (data as DBRundownPlaylist[]) : [] - const studio = data ? (data as DBStudio) : undefined - switch (source) { - case StudioHandler.name: - this.logUpdateReceived('studio', source, `studioId ${studio?._id}`) - this._studio = studio - break - case PlaylistsHandler.name: - this.logUpdateReceived('playlists', source) - this._playlists = rundownPlaylists.map((p) => { - let activationStatus: PlaylistActivationStatus = - p.activationId === undefined ? 'deactivated' : 'activated' - if (p.activationId && p.rehearsal) activationStatus = 'rehearsal' - return literal({ - id: unprotectString(p._id), - name: p.name, - activationStatus: activationStatus, - }) + private onStudioUpdate = (studio: DBStudio | undefined): void => { + this.logUpdateReceived('studio', `studioId ${studio?._id}`) + this._studio = studio + this.sendStatusToAll() + } + + private onPlaylistsUpdate = (rundownPlaylists: DBRundownPlaylist[] | undefined): void => { + this.logUpdateReceived('playlists') + this._playlists = + rundownPlaylists?.map((p) => { + let activationStatus: PlaylistActivationStatus = + p.activationId === undefined ? 'deactivated' : 'activated' + if (p.activationId && p.rehearsal) activationStatus = 'rehearsal' + return literal({ + id: unprotectString(p._id), + name: p.name, + activationStatus: activationStatus, }) - break - default: - throw new Error(`${this._name} received unsupported update from ${source}}`) - } + }) ?? [] + this.sendStatusToAll() + } + protected sendStatusToAll = (): void => { const sameStatus = - this._playlists.length === prevPlaylistsStatus.length && - this._playlists.reduce( - (same, status, i) => - same && - !!prevPlaylistsStatus[i] && - status.id === prevPlaylistsStatus[i].id && - status.activationStatus === prevPlaylistsStatus[i].activationStatus, - true - ) - if (!sameStatus) this.sendStatus(this._subscribers) + this._playlists.length === this._lastSentPlaylists.length && + _.isEqual(this._playlists, this._lastSentPlaylists) + if (!sameStatus) { + this.sendStatus(this._subscribers) + this._lastSentPlaylists = this._playlists + } } } diff --git a/packages/live-status-gateway/src/wsHandler.ts b/packages/live-status-gateway/src/wsHandler.ts index dfa5ef411b..b3dcb54954 100644 --- a/packages/live-status-gateway/src/wsHandler.ts +++ b/packages/live-status-gateway/src/wsHandler.ts @@ -1,25 +1,47 @@ import { StudioId } from '@sofie-automation/corelib/dist/dataModel/Ids' -import { CoreConnection, Observer, ProtectedString, SubscriptionId } from '@sofie-automation/server-core-integration' +import { + CollectionDocCheck, + CoreConnection, + Observer, + PeripheralDevicePubSubCollections, + ProtectedString, + SubscriptionId, +} from '@sofie-automation/server-core-integration' import { Logger } from 'winston' import { WebSocket } from 'ws' import { CoreHandler } from './coreHandler' -import { CorelibPubSub, CorelibPubSubCollections, CorelibPubSubTypes } from '@sofie-automation/corelib/dist/pubsub' +import { CorelibPubSubCollections, CorelibPubSubTypes } from '@sofie-automation/corelib/dist/pubsub' +import throttleToNextTick from '@sofie-automation/shared-lib/dist/lib/throttleToNextTick' +import _ = require('underscore') +import { Collection as CoreCollection } from '@sofie-automation/server-core-integration' +import { CollectionHandlers } from './liveStatusServer' +import { arePropertiesShallowEqual } from './helpers/equality' +import { ParametersOfFunctionOrNever } from '@sofie-automation/server-core-integration/dist/lib/subscriptions' export abstract class WebSocketTopicBase { protected _name: string protected _logger: Logger protected _subscribers: Set = new Set() + protected throttledSendStatusToAll: () => void - constructor(name: string, logger: Logger) { + constructor(name: string, logger: Logger, throttlePeriodMs = 0) { this._name = name this._logger = logger this._logger.info(`Starting ${this._name} topic`) + this.throttledSendStatusToAll = + throttlePeriodMs > 0 + ? _.throttle(this.sendStatusToAll, throttlePeriodMs, { + leading: false, + trailing: true, + }) + : this.sendStatusToAll } addSubscriber(ws: WebSocket): void { this._logger.info(`${this._name} adding a websocket subscriber`) this._subscribers.add(ws) + this.sendStatus([ws]) } hasSubscriber(ws: WebSocket): boolean { @@ -54,13 +76,19 @@ export abstract class WebSocketTopicBase { } } - protected logUpdateReceived(collectionName: string, source: string, extraInfo?: string): void { - let message = `${this._name} received ${collectionName} update from ${source}` + protected logUpdateReceived(collectionName: string, extraInfo?: string): void { + let message = `${this._name} received ${collectionName} update` if (extraInfo) { message += `, ${extraInfo}` } this._logger.debug(message) } + + abstract sendStatus(_subscribers: Iterable): void + + protected sendStatusToAll = (): void => { + this.sendStatus(this._subscribers) + } } export interface WebSocketTopic { @@ -71,108 +99,226 @@ export interface WebSocketTopic { sendMessage(ws: WebSocket, msg: object): void } -export type ObserverForCollection = T extends keyof CorelibPubSubCollections - ? Observer - : undefined +const DEFAULT_THROTTLE_PERIOD_MS = 20 -export abstract class CollectionBase< - T, - TPubSub extends CorelibPubSub | undefined, - TCollection extends keyof CorelibPubSubCollections -> { +export abstract class CollectionBase { protected _name: string protected _collectionName: TCollection - protected _publicationName: TPubSub protected _logger: Logger protected _coreHandler: CoreHandler protected _studioId!: StudioId - protected _subscribers: Set = new Set() - protected _observers: Set> = new Set() + protected _observers: Map< + ObserverCallback, + { keysToPick: readonly (keyof T)[] | undefined; lastData: T | undefined } + > = new Map() protected _collectionData: T | undefined - protected _subscriptionId: SubscriptionId | undefined - protected _dbObserver: ObserverForCollection | undefined protected get _core(): CoreConnection { return this._coreHandler.core } + protected throttledChanged: () => void - constructor(name: string, collection: TCollection, publication: TPubSub, logger: Logger, coreHandler: CoreHandler) { - this._name = name + constructor( + collection: TCollection, + logger: Logger, + coreHandler: CoreHandler, + throttlePeriodMs = DEFAULT_THROTTLE_PERIOD_MS + ) { + this._name = this.constructor.name this._collectionName = collection - this._publicationName = publication this._logger = logger this._coreHandler = coreHandler + this.throttledChanged = throttleToNextTick( + throttlePeriodMs > 0 + ? _.throttle(() => this.changed(), throttlePeriodMs, { leading: true, trailing: true }) + : () => this.changed() + ) + this._logger.info(`Starting ${this._name} handler`) } - async init(): Promise { + init(_handlers: CollectionHandlers): void { if (!this._coreHandler.studioId) throw new Error('StudioId is not defined') this._studioId = this._coreHandler.studioId } close(): void { this._logger.info(`Closing ${this._name} handler`) - if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId) - if (this._dbObserver) this._dbObserver.stop() } - async subscribe(observer: CollectionObserver): Promise { - this._logger.info(`${observer.observerName}' added observer for '${this._name}'`) - if (this._collectionData) await observer.update(this._name, this._collectionData) - this._observers.add(observer) + subscribe(callback: ObserverCallback, keysToPick?: readonly K[]): void { + //this._logger.info(`${name}' added observer for '${this._name}'`) + if (this._collectionData) callback(this._collectionData) + this._observers.set(callback, { keysToPick, lastData: this.shallowClone(this._collectionData) }) } - async unsubscribe(observer: CollectionObserver): Promise { - this._logger.info(`${observer.observerName}' removed observer for '${this._name}'`) - this._observers.delete(observer) + /** + * Called after a batch of updates to documents in the collection + */ + protected changed(): void { + // override me } - async notify(data: T | undefined): Promise { - for (const observer of this._observers) { - await observer.update(this._name, data) + notify(data: T | undefined): void { + for (const [observer, o] of this._observers) { + if ( + !o.lastData || + !o.keysToPick || + !data || + !arePropertiesShallowEqual(o.lastData, data, undefined, o.keysToPick) + ) { + observer(data) + o.lastData = this.shallowClone(data) + } } } + protected shallowClone(data: T | undefined): T | undefined { + if (data === undefined) return undefined + if (Array.isArray(data)) return [...data] as T + if (typeof data === 'object') return { ...data } + return data + } + protected logDocumentChange(documentId: string | ProtectedString, changeType: string): void { this._logger.silly(`${this._name} ${changeType} ${documentId}`) } protected logUpdateReceived(collectionName: string, updateCount: number | undefined): void - protected logUpdateReceived(collectionName: string, source: string, extraInfo?: string): void + protected logUpdateReceived(collectionName: string, extraInfo?: string): void protected logUpdateReceived( collectionName: string, - sourceOrUpdateCount: string | number | undefined, - extraInfo?: string + extraInfoOrUpdateCount: string | number | undefined | null = null ): void { - if (typeof sourceOrUpdateCount === 'string') { - let message = `${this._name} received ${collectionName} update from ${sourceOrUpdateCount}` - if (extraInfo) { - message += `, ${extraInfo}` - } - this._logger.debug(message) - } else { - this._logger.debug(`'${this._name}' handler received ${sourceOrUpdateCount} ${collectionName}`) + let message = `${this._name} received ${collectionName} update` + if (typeof extraInfoOrUpdateCount === 'string') { + message += `, ${extraInfoOrUpdateCount}` + } else if (extraInfoOrUpdateCount !== null) { + message += `(${extraInfoOrUpdateCount})` } + this._logger.debug(message) } protected logNotifyingUpdate(updateCount: number | undefined): void { this._logger.debug(`${this._name} notifying update with ${updateCount} ${this._collectionName}`) } + + protected getCollectionOrFail(): CoreCollection> { + const collection = this._core.getCollection(this._collectionName) + if (!collection) throw new Error(`collection '${this._collectionName}' not found!`) + return collection + } +} + +export abstract class PublicationCollection< + T, + TPubSub extends keyof CorelibPubSubTypes, + TCollection extends keyof CorelibPubSubCollections +> extends CollectionBase { + protected _publicationName: TPubSub + protected _subscriptionId: SubscriptionId | undefined + protected _subscriptionPending = false + protected _dbObserver: + | Observer> + | undefined + + constructor( + collection: TCollection, + publication: TPubSub, + logger: Logger, + coreHandler: CoreHandler, + throttlePeriodMs = DEFAULT_THROTTLE_PERIOD_MS + ) { + super(collection, logger, coreHandler, throttlePeriodMs) + this._publicationName = publication + } + + close(): void { + super.close() + if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId) + this._dbObserver?.stop() + } + + subscribe(callback: ObserverCallback, keysToPick?: readonly K[]): void { + //this._logger.info(`${name}' added observer for '${this._name}'`) + if (this._collectionData) callback(this._collectionData) + this._observers.set(callback, { keysToPick, lastData: this.shallowClone(this._collectionData) }) + } + + /** + * Called after a batch of updates to documents in the collection + */ + protected changed(): void { + // override me + } + + protected onDocumentEvent(id: ProtectedString | string, changeType: string): void { + this.logDocumentChange(id, changeType) + if (!this._subscriptionId) { + this._logger.silly(`${this._name} ${changeType} ${id} skipping (lack of subscription)`) + return + } + if (this._subscriptionPending) { + this._logger.silly(`${this._name} ${changeType} ${id} skipping (subscription pending)`) + return + } + this.throttledChanged() + } + + protected setupObserver(): void { + this._dbObserver = this._coreHandler.setupObserver(this._collectionName) + this._dbObserver.added = (id) => { + this.onDocumentEvent(id, 'added') + } + this._dbObserver.changed = (id) => { + this.onDocumentEvent(id, 'changed') + } + this._dbObserver.removed = (id) => { + this.onDocumentEvent(id, 'removed') + } + } + + protected stopSubscription(): void { + if (this._subscriptionId) this._coreHandler.unsubscribe(this._subscriptionId) + this._subscriptionId = undefined + this._dbObserver?.stop() + this._dbObserver = undefined + } + + protected setupSubscription(...args: ParametersOfFunctionOrNever): void { + if (!this._publicationName) throw new Error(`Publication name not set for '${this._name}'`) + this.stopSubscription() + this._subscriptionPending = true + this._coreHandler + .setupSubscription(this._publicationName, ...args) + .then((subscriptionId) => { + this._subscriptionId = subscriptionId + this.setupObserver() + }) + .catch((e) => this._logger.error(e)) + .finally(() => { + this._subscriptionPending = false + this.changed() + }) + } } export interface Collection { - init(): Promise + init(handlers: CollectionHandlers): void close(): void - subscribe(observer: CollectionObserver): Promise - unsubscribe(observer: CollectionObserver): Promise - notify(data: T | undefined): Promise + subscribe(callback: ObserverCallback, keys?: K[]): void + notify(data: T | undefined): void } -export interface CollectionObserver { - observerName: string - update(source: string, data: T | undefined): Promise -} +export type ObserverCallback = (data: Pick | undefined) => void + +export type PickArr = Pick + +// export interface CollectionObserver { +// observerName: string +// update(source: string, data: Pick | undefined): void +// } function isIterable(obj: T | Iterable): obj is Iterable { // checks for null and undefined if (obj == null) {