From 3451b6ed97e90b721e5e70800d1f5951d571f45e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5var=20Aamb=C3=B8=20Fosstveit?= Date: Tue, 12 Mar 2024 00:14:45 +0100 Subject: [PATCH] P2P update Have now moved the decision making of P2P producing down to the media level instead of inside the thunks. We might want to clone tracks instead of sharing one track among all the producers. --- .eslintrc | 4 +- Dockerfile | 2 - src/components/noiseslider/NoiseSlider.tsx | 16 +- src/components/unmutealert/UnmuteAlert.tsx | 7 +- src/components/videoview/VideoView.tsx | 4 +- src/components/volume/Volume.tsx | 6 +- src/services/mediaService.tsx | 369 ++++++++------------ src/services/signalingService.tsx | 6 + src/store/actions/managementActions.tsx | 15 +- src/store/actions/mediaActions.tsx | 340 ++++-------------- src/store/actions/roomActions.tsx | 9 +- src/store/middlewares/mediaMiddleware.tsx | 8 +- src/store/middlewares/peerMiddleware.tsx | 49 ++- src/store/selectors.tsx | 35 +- src/utils/mediaSender.tsx | 383 +++++++++++++++++++++ 15 files changed, 672 insertions(+), 581 deletions(-) create mode 100644 src/utils/mediaSender.tsx diff --git a/.eslintrc b/.eslintrc index 6d43cd10..08d19288 100644 --- a/.eslintrc +++ b/.eslintrc @@ -1,6 +1,6 @@ { "extends": [ "plugin:@typescript-eslint/recommended" ], - "plugins": ["react", "@typescript-eslint" ], + "plugins": [ "react", "@typescript-eslint" ], "env": { "browser": true, "es6": true @@ -155,4 +155,4 @@ "valid-typeof": 2, "yoda": 2 } -} \ No newline at end of file +} diff --git a/Dockerfile b/Dockerfile index faa8a73e..68b808da 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,8 +5,6 @@ WORKDIR /app COPY . . RUN yarn install --frozen-lockfile -RUN yarn add --dev eslint-config-react-app -RUN yarn add --dev eslint-plugin-import RUN yarn build FROM steebchen/nginx-spa:stable diff --git a/src/components/noiseslider/NoiseSlider.tsx b/src/components/noiseslider/NoiseSlider.tsx index 740b7a58..85e464b4 100644 --- a/src/components/noiseslider/NoiseSlider.tsx +++ b/src/components/noiseslider/NoiseSlider.tsx @@ -6,8 +6,6 @@ import { settingsActions } from '../../store/slices/settingsSlice'; import { ServiceContext } from '../../store/store'; import { VolumeWatcher } from '../../utils/volumeWatcher'; import { noiseSuppressionLabel } from '../translated/translatedComponents'; -import type { Producer } from 'mediasoup-client/lib/types'; -import type { Producer as PeerProducer } from 'ortc-p2p/src/types'; const StyledSlider = styled(Slider)(() => ({ '.MuiSlider-root': { @@ -43,14 +41,10 @@ const NoiseSlider = (): JSX.Element => { const [ sliderValue, setSliderValue ] = useState(noiseThreshold); useEffect(() => { - let producer: Producer | PeerProducer | undefined; let volumeWatcher: VolumeWatcher | undefined; if (micEnabled) - producer = mediaService.producers['mic']; - - if (producer) - volumeWatcher = producer.appData.volumeWatcher as VolumeWatcher | undefined; + volumeWatcher = mediaService.mediaSenders['mic'].volumeWatcher; const onVolumeChange = ({ volume }: { volume: number }): void => { setVolume(volume); @@ -63,7 +57,7 @@ const NoiseSlider = (): JSX.Element => { }; }, []); - const handleSliderChange = (event: Event, value: number | number[]): void => { + const handleSliderChange = (_event: Event, value: number | number[]): void => { setSliderValue(value as number); }; @@ -72,14 +66,10 @@ const NoiseSlider = (): JSX.Element => { value: number | number[] ): void => { if (sliderValue !== noiseThreshold) { - let producer: Producer | PeerProducer | undefined; let hark: Harker | undefined; if (micEnabled) - producer = mediaService.producers['mic']; - - if (producer) - hark = (producer.appData.volumeWatcher as VolumeWatcher | undefined)?.hark; + hark = mediaService.mediaSenders['mic'].volumeWatcher?.hark; hark?.setThreshold(value as number); dispatch(settingsActions.setNoiseThreshold(value as number)); diff --git a/src/components/unmutealert/UnmuteAlert.tsx b/src/components/unmutealert/UnmuteAlert.tsx index 7bb63059..41b4e317 100644 --- a/src/components/unmutealert/UnmuteAlert.tsx +++ b/src/components/unmutealert/UnmuteAlert.tsx @@ -2,7 +2,6 @@ import { Alert, Typography } from '@mui/material'; import { styled } from '@mui/material/styles'; import { useContext, useEffect, useState } from 'react'; import { ServiceContext } from '../../store/store'; -import { VolumeWatcher } from '../../utils/volumeWatcher'; import { mutedPTTLabel } from '../translated/translatedComponents'; import { useAppSelector } from '../../store/hooks'; @@ -31,11 +30,7 @@ const UnmuteAlert = (): JSX.Element => { const audioMuted = useAppSelector((state) => state.me.audioMuted); useEffect(() => { - const producer = mediaService.producers['mic']; - let volumeWatcher: VolumeWatcher | undefined; - - if (producer) - volumeWatcher = producer.appData.volumeWatcher as VolumeWatcher; + const volumeWatcher = mediaService.mediaSenders['mic'].volumeWatcher; const onVolumeChange = ({ scaledVolume }: { scaledVolume: number }): void => { setSpeaking(Boolean(scaledVolume)); diff --git a/src/components/videoview/VideoView.tsx b/src/components/videoview/VideoView.tsx index 416becf1..44f66bba 100644 --- a/src/components/videoview/VideoView.tsx +++ b/src/components/videoview/VideoView.tsx @@ -61,12 +61,12 @@ const VideoView = ({ useEffect(() => { let media: Consumer | PeerConsumer | undefined; - let track: MediaStreamTrack | undefined; + let track: MediaStreamTrack | null = null; if (previewTrack) track = mediaService.previewWebcamTrack; else if (source) - track = mediaService.tracks[source]; + track = mediaService.mediaSenders[source].track; else if (consumer) media = mediaService.getConsumer(consumer.id); diff --git a/src/components/volume/Volume.tsx b/src/components/volume/Volume.tsx index 6575fe1b..73e48bf8 100644 --- a/src/components/volume/Volume.tsx +++ b/src/components/volume/Volume.tsx @@ -1,11 +1,9 @@ import { styled } from '@mui/material'; import type { Consumer } from 'mediasoup-client/lib/Consumer'; -import type { Producer } from 'mediasoup-client/lib/Producer'; import { useContext, useEffect, useState } from 'react'; import { StateConsumer } from '../../store/slices/consumersSlice'; import { ServiceContext } from '../../store/store'; import { VolumeWatcher } from '../../utils/volumeWatcher'; -import type { Producer as PeerProducer } from 'ortc-p2p/src/types'; import type { Consumer as PeerConsumer } from 'ortc-p2p/src/types'; type VolumeBarProps = { @@ -50,13 +48,13 @@ const Volume = ({ const [ volume, setVolume ] = useState(0); useEffect(() => { - let media: Consumer | PeerConsumer | Producer | PeerProducer | undefined; + let media: Consumer | PeerConsumer | undefined; let volumeWatcher: VolumeWatcher | undefined; if (consumer) media = mediaService.getConsumer(consumer.id); else - media = mediaService.producers['mic']; + volumeWatcher = mediaService.mediaSenders['mic'].volumeWatcher; if (media) volumeWatcher = media.appData.volumeWatcher as VolumeWatcher | undefined; diff --git a/src/services/mediaService.tsx b/src/services/mediaService.tsx index b4bf135a..b38003c2 100644 --- a/src/services/mediaService.tsx +++ b/src/services/mediaService.tsx @@ -2,10 +2,8 @@ import EventEmitter from 'events'; import type { Device } from 'mediasoup-client'; import { Device as PeerDevice } from 'ortc-p2p/src/Device'; import type { Consumer as PeerConsumer } from 'ortc-p2p/src/Consumer'; -import type { Producer as PeerProducer } from 'ortc-p2p/src/Producer'; import type { Transport as PeerTransport } from 'ortc-p2p/src/Transport'; import type { Consumer } from 'mediasoup-client/lib/Consumer'; -import type { Producer, ProducerOptions } from 'mediasoup-client/lib/Producer'; import type { Transport } from 'mediasoup-client/lib/Transport'; import type { RtpCapabilities } from 'mediasoup-client/lib/RtpParameters'; import { SignalingService } from './signalingService'; @@ -17,6 +15,7 @@ import { ResolutionWatcher } from '../utils/resolutionWatcher'; import { Logger } from 'edumeet-common'; import { safePromise } from '../utils/safePromise'; import { ProducerSource } from '../utils/types'; +import { MediaSender } from '../utils/mediaSender'; const logger = new Logger('MediaService'); @@ -48,14 +47,9 @@ export interface PeerTranscript { done: boolean; } -type Producers = { +type MediaSenders = { // eslint-disable-next-line no-unused-vars - [key in ProducerSource]?: Producer; -}; - -type ProducersTracks = { - // eslint-disable-next-line no-unused-vars - [key in ProducerSource]?: MediaStreamTrack; + [key in ProducerSource]: MediaSender; }; export type Transcript = Omit; @@ -92,13 +86,7 @@ export declare interface MediaService { // eslint-disable-next-line no-unused-vars on(event: 'consumerScore', listener: (consumerId: string, score: number) => void): this; // eslint-disable-next-line no-unused-vars - on(event: 'producerClosed', listener: (producer: Producer) => void): this; - // eslint-disable-next-line no-unused-vars - on(event: 'producerPaused', listener: (producer: Producer) => void): this; - // eslint-disable-next-line no-unused-vars - on(event: 'producerResumed', listener: (producer: Producer) => void): this; - // eslint-disable-next-line no-unused-vars - on(event: 'producerScore', listener: (producerId: string, score: number) => void): this; + on(event: 'mediaClosed', listener: (source: ProducerSource) => void): this; // eslint-disable-next-line no-unused-vars on(event: 'transcriptionStarted', listener: () => void): this; // eslint-disable-next-line no-unused-vars @@ -113,29 +101,19 @@ export declare interface MediaService { export class MediaService extends EventEmitter { private signalingService: SignalingService; - private mediasoup?: Device; + public mediasoup?: Device; public iceServers: RTCIceServer[] = []; - private sendTransport: Transport | undefined; - private recvTransport: Transport | undefined; + public sendTransport: Transport | undefined; + public recvTransport: Transport | undefined; private consumers: Map = new Map(); private consumerCreationState: Map = new Map(); private dataConsumers: Map = new Map(); private dataProducers: Map = new Map(); - public previewMicTrack?: MediaStreamTrack; - public previewWebcamTrack?: MediaStreamTrack; + public previewMicTrack: MediaStreamTrack | null = null; + public previewWebcamTrack: MediaStreamTrack | null = null; - public producers: Producers = {}; - public peerProducers: Record> = { - mic: new Map(), - webcam: new Map(), - screen: new Map(), - screenaudio: new Map(), - extravideo: new Map(), - extraaudio: new Map(), - }; - - public tracks: ProducersTracks = {}; + public mediaSenders: MediaSenders; private peerDevices: Map = new Map(); // PeerId -> P2PDevice private peerSendTransports: Map> = new Map(); // PeerId -> P2PTransport @@ -160,6 +138,15 @@ export class MediaService extends EventEmitter { this.signalingService = signalingService; + this.mediaSenders = { + mic: new MediaSender(this, this.signalingService, 'mic').on('closed', () => this.emit('mediaClosed', 'mic')), + webcam: new MediaSender(this, this.signalingService, 'webcam').on('closed', () => this.emit('mediaClosed', 'webcam')), + screen: new MediaSender(this, this.signalingService, 'screen').on('closed', () => this.emit('mediaClosed', 'screen')), + screenaudio: new MediaSender(this, this.signalingService, 'screenaudio').on('closed', () => this.emit('mediaClosed', 'screenaudio')), + extravideo: new MediaSender(this, this.signalingService, 'extravideo').on('closed', () => this.emit('mediaClosed', 'extravideo')), + extraaudio: new MediaSender(this, this.signalingService, 'extraaudio').on('closed', () => this.emit('mediaClosed', 'extraaudio')), + }; + this.reset(); } @@ -196,6 +183,27 @@ export class MediaService extends EventEmitter { this.recvTransport?.close(); } + public setP2PMode(p2pMode: boolean): void { + logger.debug('setP2PMode() [p2pMode:%s]', p2pMode); + + for (const mediaSender of Object.values(this.mediaSenders)) { + if (p2pMode) mediaSender.startP2P(); + else mediaSender.stopP2P(); + } + } + + public addPeerId(peerId: string): void { + for (const mediaSender of Object.values(this.mediaSenders)) { + mediaSender.addPeerId(peerId); + } + } + + public removePeerId(peerId: string): void { + for (const mediaSender of Object.values(this.mediaSenders)) { + mediaSender.removePeerId(peerId); + } + } + public getConsumer(consumerId: string): Consumer | PeerConsumer | undefined { return this.consumers.get(consumerId); } @@ -264,40 +272,28 @@ export class MediaService extends EventEmitter { case 'peerConnect': { const { peerId, dtlsParameters, iceParameters, direction } = notification.data; + const transport = await this.getPeerTransport(peerId, direction === 'send' ? 'recv' : 'send'); - if (direction === 'recv') { - const p2pSendTransport = await this.getPeerSendTransport(peerId); - - await p2pSendTransport.connect({ dtlsParameters, iceParameters }); - } else { - const p2pRecvTransport = await this.getPeerRecvTransport(peerId); - - await p2pRecvTransport.connect({ dtlsParameters, iceParameters }); - } + await transport.connect({ dtlsParameters, iceParameters }); break; } case 'candidate': { const { peerId, candidate, direction } = notification.data; + const transport = await this.getPeerTransport(peerId, direction === 'send' ? 'recv' : 'send'); - if (direction === 'recv') { - const p2pSendTransport = await this.getPeerSendTransport(peerId); - - await p2pSendTransport.addIceCandidate({ candidate }); - } else { - const p2pRecvTransport = await this.getPeerRecvTransport(peerId); - - await p2pRecvTransport.addIceCandidate({ candidate }); - } - + await transport.addIceCandidate({ candidate }); + break; } case 'peerProduce': { const { peerId, id, kind, rtpParameters, appData } = notification.data; - const peerTransport = await this.getPeerRecvTransport(peerId); + this.consumerCreationState.set(id, { paused: false, closed: false }); + + const peerTransport = await this.getPeerTransport(peerId, 'recv'); const peerConsumer = await peerTransport.consume({ id, @@ -310,6 +306,23 @@ export class MediaService extends EventEmitter { }, }); + const { + paused: consumerPaused, + closed, + } = this.consumerCreationState.get(id) || { + paused: false, + closed: false, + }; + + peerConsumer.appData.producerPaused = consumerPaused; + + if (closed) { + this.consumerCreationState.delete(id); + peerConsumer.close(); + + return; + } + if (kind === 'audio') { const { track } = peerConsumer; const harkStream = new MediaStream(); @@ -333,6 +346,8 @@ export class MediaService extends EventEmitter { this.emit('consumerCreated', peerConsumer, false, false, true); + this.consumerCreationState.delete(id); + break; } @@ -341,6 +356,22 @@ export class MediaService extends EventEmitter { case 'peerResumeProducer': { const { producerId } = notification.data; + const consumer = this.consumers.get(producerId); + + if (!consumer) { + const consumerCreationState = this.consumerCreationState.get(producerId); + + if (consumerCreationState) { + if (notification.method === 'peerCloseProducer') consumerCreationState.closed = true; + if (notification.method === 'peerPauseProducer') consumerCreationState.paused = true; + if (notification.method === 'peerResumeProducer') consumerCreationState.paused = false; + + return; + } + + throw new Error('consumer not found'); + } + this.changeConsumer(producerId, changeEvent[notification.method] as MediaChange, false); break; @@ -393,13 +424,19 @@ export class MediaService extends EventEmitter { appData: { ...appData, peerId, + producerPaused, }, }); const { paused: consumerPaused, closed, - } = this.consumerCreationState.get(id) || {} as { paused: boolean; closed: boolean }; + } = this.consumerCreationState.get(id) || { + paused: producerPaused, + closed: false, + }; + + consumer.appData.producerPaused = consumerPaused; if (closed) { this.consumerCreationState.delete(id); @@ -447,6 +484,9 @@ export class MediaService extends EventEmitter { this.consumers.set(consumer.id, consumer); consumer.observer.once('close', () => this.consumers.delete(consumer.id)); consumer.once('transportclose', () => this.changeConsumer(consumer.id, 'close', false)); + + if (paused) this.changeConsumer(consumer.id, 'resume', true); + this.emit('consumerCreated', consumer, paused, consumerPaused, false); this.consumerCreationState.delete(id); @@ -522,6 +562,7 @@ export class MediaService extends EventEmitter { if (consumerCreationState) { if (notification.method === 'consumerClosed') consumerCreationState.closed = true; if (notification.method === 'consumerPaused') consumerCreationState.paused = true; + if (notification.method === 'consumerResumed') consumerCreationState.paused = false; return; } @@ -542,27 +583,6 @@ export class MediaService extends EventEmitter { break; } - case 'producerClosed': { - const { producerId } = notification.data; - - this.closeProducer(producerId, false); - - break; - } - - case 'newProducerLayer': { - const { producerId, spatialLayer } = notification.data; - - const producer = Object.values(this.producers).find((p) => p.id === producerId); - - if (!producer) - throw new Error('producer not found'); - - producer.setMaxSpatialLayer(spatialLayer); - - break; - } - case 'consumerScore': { const { consumerId, score: { score } } = notification.data; @@ -630,17 +650,25 @@ export class MediaService extends EventEmitter { const consumer = this.consumers.get(consumerId); - if (local && consumer) { + if (!consumer) return logger.warn('consumer not found'); + + if (local) { if (consumer.appData.peerConsumer) { this.signalingService.notify(`${change}PeerConsumer`, { consumerId: consumer.id }); } else { this.signalingService.notify(`${change}Consumer`, { consumerId: consumer.id }); } + } else if (!local) { + this.emit(`consumer${changeEvent[change]}`, consumer); } - if (!local) this.emit(`consumer${changeEvent[change]}`, consumer); - - consumer?.[`${change}`](); + if (change === 'close') { + consumer?.[`${change}`](); + } else if (local) { + consumer?.[`${change}`](); + } else { + consumer.appData.producerPaused = change === 'pause'; + } } public closeDataConsumer(dataConsumerId: string, local = true): void { @@ -657,23 +685,6 @@ export class MediaService extends EventEmitter { dataConsumer?.close(); } - public closeProducer(source: ProducerSource, local = true, notifyAll = false): void { - logger.debug('closeProducer() [source:%s]', source); - - const producer = this.producers[source]; - - if ((local || notifyAll) && producer) { - this.signalingService.notify('closeProducer', { producerId: producer.id }); - } - - if (producer?.kind === 'audio') - this.stopTranscription(); - - if (!local || notifyAll) this.emit('producerClosed', producer); - - producer?.close(); - } - public closeDataProducer(dataProducerId: string, local = true): void { logger.debug('closeDataProducer [dataProducerId:%s]', dataProducerId); @@ -785,92 +796,7 @@ export class MediaService extends EventEmitter { return transport; } - public async produce(source: ProducerSource, producerOptions: ProducerOptions, codec?: ProducerCodec): Promise { - logger.debug('produce() [options:%o]', producerOptions); - - await this.transportsReady; - - if (!this.sendTransport) throw new Error('Producer can not be created without sendTransport'); - - const producer = await this.sendTransport.produce({ - ...producerOptions, - codec: this.mediasoup?.rtpCapabilities.codecs?.find((c) => c.mimeType.toLowerCase() === codec) - }); - - const { kind, track } = producer; - - if (kind === 'audio' && track) { - const harkStream = new MediaStream(); - - harkStream.addTrack(track.clone()); - - const producerHark = hark(harkStream, { - play: false, - interval: 100, - threshold: -60, - history: 100 - }); - - producer.appData.hark = producerHark; - producer.appData.volumeWatcher = new VolumeWatcher({ hark: producerHark }); - } - - this.producers[source] = producer; - - producer.observer.once('close', () => delete this.producers[source]); - producer.observer.on('pause', () => this.signalingService.notify('pauseProducer', { producerId: producer.id })); - producer.observer.on('resume', () => this.signalingService.notify('resumeProducer', { producerId: producer.id })); - producer.once('transportclose', () => this.closeProducer(source, false)); - producer.once('trackended', () => this.closeProducer(source, true, true)); - - return producer; - } - - public async produceData(dataProducerOptions: DataProducerOptions): Promise { - logger.debug('produceData() [options:%o]', dataProducerOptions); - - if (!this.sendTransport) throw new Error('DataProducer can not be created without sendTransport'); - - const dataProducer = await this.sendTransport.produceData(dataProducerOptions); - - this.dataProducers.set(dataProducer.id, dataProducer); - - dataProducer.observer.once('close', () => this.dataProducers.delete(dataProducer.id)); - dataProducer.once('transportclose', () => this.closeDataProducer(dataProducer.id, false)); - - return dataProducer; - } - - public async peerProduce(peerId: string, source: ProducerSource, producerOptions: ProducerOptions, codec?: ProducerCodec): Promise { - logger.debug('p2pProduce() [peerId:%s, options:%o]', peerId, producerOptions); - - const peerDevice = this.getPeerDevice(peerId); - const transport = await this.getPeerSendTransport(peerId); - - const producer = await transport.produce({ - ...producerOptions, - codec: peerDevice.rtpCapabilities.codecs?.find((c) => c.mimeType.toLowerCase() === codec) - }); - - producer.appData.peerProducer = true; - producer.appData.peerId = peerId; - - this.peerProducers[source].set(peerId, producer); - - producer.observer.once('close', () => { - this.signalingService.notify('peerCloseProducer', { producerId: producer.id, peerId }); - - this.peerProducers[source].delete(peerId); - }); - - producer.observer.on('pause', () => this.signalingService.notify('peerPauseProducer', { producerId: producer.id, peerId })); - producer.observer.on('resume', () => this.signalingService.notify('peerResumeProducer', { producerId: producer.id, peerId })); - producer.once('trackended', () => producer.close()); - - return producer; - } - - private getPeerDevice(peerId: string): PeerDevice { + public getPeerDevice(peerId: string): PeerDevice { let p2pDevice = this.peerDevices.get(peerId); if (!p2pDevice) { @@ -888,67 +814,31 @@ export class MediaService extends EventEmitter { return p2pDevice; } - private async getPeerRecvTransport(peerId: string): Promise { - let p2pRecvTransport = this.peerRecvTransports.get(peerId); + public async getPeerTransport(peerId: string, direction: 'recv' | 'send'): Promise { + const map = direction === 'recv' ? this.peerRecvTransports : this.peerSendTransports; + + let peerTransport = map.get(peerId); - if (!p2pRecvTransport) { - p2pRecvTransport = (async () => { + if (!peerTransport) { + peerTransport = (async () => { const p2pDevice = this.getPeerDevice(peerId); await p2pDevice.ready; await this.mediaReady; - const transport = p2pDevice.createRecvTransport({ - iceServers: this.iceServers, - }); + let transport; - transport.on('icecandidate', (candidate) => { - this.signalingService.notify('candidate', { - peerId, - candidate, - direction: 'recv', - }); - }); - - transport.on('connect', ({ dtlsParameters, iceParameters }, callback, errback) => { - this.signalingService.sendRequest('peerConnect', { - peerId, - dtlsParameters, - iceParameters, - direction: 'recv' - }) - .then(callback) - .catch(errback); - }); - - return transport; - })(); - - this.peerRecvTransports.set(peerId, p2pRecvTransport); - } - - return p2pRecvTransport; - } - - private async getPeerSendTransport(peerId: string): Promise { - let p2pSendTransport = this.peerSendTransports.get(peerId); - - if (!p2pSendTransport) { - p2pSendTransport = (async () => { - const p2pDevice = this.getPeerDevice(peerId); - - await p2pDevice.ready; - await this.mediaReady; + if (direction === 'recv') { + transport = p2pDevice.createRecvTransport({ iceServers: this.iceServers }); + } else { + transport = p2pDevice.createSendTransport({ iceServers: this.iceServers }); + } - const transport = p2pDevice.createSendTransport({ - iceServers: this.iceServers, - }); - transport.on('icecandidate', (candidate) => { this.signalingService.notify('candidate', { peerId, candidate, - direction: 'send', + direction, }); }); @@ -957,7 +847,7 @@ export class MediaService extends EventEmitter { peerId, dtlsParameters, iceParameters, - direction: 'send' + direction }) .then(callback) .catch(errback); @@ -976,10 +866,23 @@ export class MediaService extends EventEmitter { return transport; })(); - this.peerSendTransports.set(peerId, p2pSendTransport); + map.set(peerId, peerTransport); } - return p2pSendTransport; + return peerTransport; + } + + public async produceData(options: DataProducerOptions): Promise { + await this.transportsReady; + + if (!this.sendTransport) throw new Error('Send transport not ready'); + + const dataProducer = await this.sendTransport.produceData(options); + + this.dataProducers.set(dataProducer.id, dataProducer); + dataProducer.observer.once('close', () => this.dataProducers.delete(dataProducer.id)); + + return dataProducer; } public async startTranscription(): Promise { diff --git a/src/services/signalingService.tsx b/src/services/signalingService.tsx index 09980a42..f98fd3ea 100644 --- a/src/services/signalingService.tsx +++ b/src/services/signalingService.tsx @@ -22,6 +22,12 @@ export class SignalingService extends EventEmitter { public connections = List(); private connected = false; + constructor() { + super(); + + this.setMaxListeners(Infinity); + } + @skipIfClosed public close(): void { logger.debug('close()'); diff --git a/src/store/actions/managementActions.tsx b/src/store/actions/managementActions.tsx index c55d2b37..21166a3a 100644 --- a/src/store/actions/managementActions.tsx +++ b/src/store/actions/managementActions.tsx @@ -1,7 +1,5 @@ import { Logger } from 'edumeet-common'; import { AppThunk } from '../store'; -import { notificationsActions } from '../slices/notificationsSlice'; -import { mgmtSvcUnavailable } from '../../components/translated/translatedComponents'; const logger = new Logger('ManagementActions'); @@ -18,16 +16,7 @@ export const getTenantFromFqdn = (fqdn: string): AppThunk> => async ( await managementService.service('rooms').create({ name: roomName }); -}; \ No newline at end of file +}; diff --git a/src/store/actions/mediaActions.tsx b/src/store/actions/mediaActions.tsx index a35d0c2f..53ed0be9 100644 --- a/src/store/actions/mediaActions.tsx +++ b/src/store/actions/mediaActions.tsx @@ -5,7 +5,6 @@ import { meActions } from '../slices/meSlice'; import { settingsActions } from '../slices/settingsSlice'; import { AppThunk } from '../store'; import { roomActions } from '../slices/roomSlice'; -import { p2pModeSelector, peersArraySelector } from '../selectors'; const logger = new Logger('MediaActions'); @@ -119,7 +118,7 @@ export const updatePreviewMic = ({ if (previewMicTrackId) { mediaService.previewMicTrack?.stop(); - mediaService.previewMicTrack = undefined; + mediaService.previewMicTrack = null; dispatch(meActions.setPreviewMicTrackId()); } @@ -179,7 +178,7 @@ export const stopPreviewMic = (): AppThunk> => async ( dispatch(meActions.setPreviewMicTrackId()); mediaService.previewMicTrack?.stop(); - mediaService.previewMicTrack = undefined; + mediaService.previewMicTrack = null; } dispatch(meActions.setAudioInProgress(false)); @@ -227,7 +226,7 @@ export const updatePreviewWebcam = ({ if (start && mediaService.previewWebcamTrack) { mediaService.previewWebcamTrack.stop(); effectsService.stop(mediaService.previewWebcamTrack.id); - mediaService.previewWebcamTrack = undefined; + mediaService.previewWebcamTrack = null; dispatch(meActions.setPreviewWebcamTrackId()); } @@ -287,7 +286,7 @@ export const stopPreviewWebcam = (): AppThunk> => async ( if (track) { track.stop(); effectsService.stop(track.id); - mediaService.previewWebcamTrack = undefined; + mediaService.previewWebcamTrack = null; } } @@ -342,7 +341,7 @@ export const updateMic = ({ dispatch(meActions.setAudioInProgress(true)); - let track: MediaStreamTrack | undefined; + let track: MediaStreamTrack | null = null; try { const canSendMic = getState().me.canSendMic; @@ -371,7 +370,7 @@ export const updateMic = ({ } = getState().settings; if (start) { - if (!replace) track = mediaService.tracks['mic']; + if (!replace) track = mediaService.mediaSenders['mic'].track; else track = mediaService.previewMicTrack; if (!track) { @@ -389,23 +388,23 @@ export const updateMic = ({ ([ track ] = stream.getAudioTracks()); - replace = true; + replace = mediaService.mediaSenders['mic'].running; } if (!track) throw new Error('no mic track'); dispatch(meActions.setPreviewMicTrackId()); - mediaService.previewMicTrack = undefined; + mediaService.previewMicTrack = null; const { deviceId: trackDeviceId } = track.getSettings(); dispatch(settingsActions.setSelectedAudioDevice(trackDeviceId)); - if (mediaService.producers['mic'] && replace) { - await mediaService.producers['mic'].replaceTrack({ track }); - } else if (!mediaService.producers['mic']) { - mediaService.producers['mic'] = await mediaService.produce('mic', { + if (replace) { + await mediaService.mediaSenders['mic'].replaceTrack(track); + } else if (!mediaService.mediaSenders['mic'].running) { + await mediaService.mediaSenders['mic'].start({ track, stopTracks: false, disableTrackOnPause: false, @@ -420,43 +419,8 @@ export const updateMic = ({ appData: { source: 'mic' } }); } - - if (replace) { - for (const micPeerProducer of mediaService.peerProducers['mic'].values()) { - await micPeerProducer.replaceTrack({ track }); - } - } - - if (!p2pModeSelector(getState())) { - if (!getState().me.audioMuted) mediaService.producers['mic']?.resume(); - - for (const peerProducer of mediaService.peerProducers['mic'].values()) { - peerProducer.close(); - } - } else { - for (const peer of peersArraySelector(getState())) { - if (!mediaService.peerProducers['mic'].has(peer.id)) { - const producer = await mediaService.peerProduce(peer.id, 'mic', { - track, - stopTracks: false, - disableTrackOnPause: false, - zeroRtpOnPause: true, - appData: { source: 'mic' } - }); - - if (getState().me.audioMuted) producer.pause(); - } - } - - mediaService.producers['mic']?.pause(); - } - - if (replace) { - mediaService.tracks['mic']?.stop(); - mediaService.tracks['mic'] = track; - } } else { - await mediaService.tracks['mic']?.applyConstraints({ + await mediaService.mediaSenders['mic'].track?.applyConstraints({ sampleRate, channelCount, autoGainControl, @@ -484,17 +448,10 @@ export const stopMic = (): AppThunk => ( ) => { logger.debug('stopMic()'); - mediaService.closeProducer('mic', true); - - for (const peerProducer of mediaService.peerProducers['mic'].values()) { - peerProducer.close(); - } + mediaService.mediaSenders['mic'].stop(); dispatch(meActions.setMicEnabled(false)); dispatch(meActions.setAudioMuted(true)); - - mediaService.tracks['mic']?.stop(); - mediaService.tracks['mic'] = undefined; }; export const pauseMic = (): AppThunk => ( @@ -504,11 +461,7 @@ export const pauseMic = (): AppThunk => ( ) => { logger.debug('pauseMic()'); - mediaService.producers['mic']?.pause(); - - for (const peerProducer of mediaService.peerProducers['mic'].values()) { - peerProducer.pause(); - } + mediaService.mediaSenders['mic'].pause(); dispatch(meActions.setAudioMuted(true)); }; @@ -520,13 +473,7 @@ export const resumeMic = (): AppThunk => ( ) => { logger.debug('resumeMic()'); - if (!p2pModeSelector(getState())) { - mediaService.producers['mic']?.resume(); - } else { - for (const peerProducer of mediaService.peerProducers['mic'].values()) { - peerProducer.resume(); - } - } + mediaService.mediaSenders['mic'].resume(); dispatch(meActions.setAudioMuted(false)); }; @@ -580,7 +527,7 @@ export const updateWebcam = ({ dispatch(meActions.setVideoInProgress(true)); - let track: MediaStreamTrack | undefined; + let track: MediaStreamTrack | null = null; try { const canSendWebcam = getState().me.canSendWebcam; @@ -602,7 +549,7 @@ export const updateWebcam = ({ if (!deviceId) logger.warn('no webcam devices'); if (start) { - if (!replace) track = mediaService.tracks['webcam']; + if (!replace) track = mediaService.mediaSenders['webcam'].track; else track = mediaService.previewWebcamTrack; if (!track) { @@ -616,14 +563,14 @@ export const updateWebcam = ({ ([ track ] = stream.getVideoTracks()); - replace = true; + replace = mediaService.mediaSenders['webcam'].running; } if (!track) throw new Error('no webcam track'); dispatch(meActions.setPreviewWebcamTrackId()); - mediaService.previewWebcamTrack = undefined; + mediaService.previewWebcamTrack = null; const { deviceId: trackDeviceId, width, height } = track.getSettings(); @@ -633,13 +580,16 @@ export const updateWebcam = ({ // so we need to update the selected device in the settings just in case dispatch(settingsActions.setSelectedVideoDevice(trackDeviceId)); - if (mediaService.producers['webcam'] && replace) { - await mediaService.producers['webcam'].replaceTrack({ track }); - } else if (!mediaService.producers['webcam']) { + if (replace) { + if (mediaService.mediaSenders['webcam'].track) + effectsService.stop(mediaService.mediaSenders['webcam'].track.id); + + await mediaService.mediaSenders['webcam'].replaceTrack(track); + } else if (!mediaService.mediaSenders['webcam'].running) { if (config.simulcast) { const encodings = getEncodings(width, height); - await mediaService.produce('webcam', { + await mediaService.mediaSenders['webcam'].start({ track, stopTracks: false, disableTrackOnPause: false, @@ -651,7 +601,7 @@ export const updateWebcam = ({ appData: { source: 'webcam' } }, 'video/h264'); } else { - await mediaService.produce('webcam', { + await mediaService.mediaSenders['webcam'].start({ track, stopTracks: false, disableTrackOnPause: false, @@ -660,48 +610,13 @@ export const updateWebcam = ({ }, 'video/h264'); } } - - if (replace) { - for (const webcamPeerProducer of mediaService.peerProducers['webcam'].values()) { - await webcamPeerProducer.replaceTrack({ track }); - } - } - - if (!p2pModeSelector(getState())) { - mediaService.producers['webcam']?.resume(); - - for (const peerProducer of mediaService.peerProducers['webcam'].values()) { - peerProducer.close(); - } - } else { - for (const peer of peersArraySelector(getState())) { - if (!mediaService.peerProducers['webcam'].has(peer.id)) { - await mediaService.peerProduce(peer.id, 'webcam', { - track, - stopTracks: false, - disableTrackOnPause: false, - zeroRtpOnPause: true, - appData: { source: 'webcam' } - }); - } - } - - mediaService.producers['webcam']?.pause(); - } - - if (replace && mediaService.tracks['webcam']) { - effectsService.stop(mediaService.tracks['webcam'].id); - mediaService.tracks['webcam'].stop(); - } - - mediaService.tracks['webcam'] = track; } else { - await mediaService.tracks['webcam']?.applyConstraints({ + await mediaService.mediaSenders['webcam'].track?.applyConstraints({ ...getVideoConstrains(resolution, aspectRatio), frameRate }); - await mediaService.tracks['extravideo']?.applyConstraints({ + await mediaService.mediaSenders['extravideo'].track?.applyConstraints({ ...getVideoConstrains(resolution, aspectRatio), frameRate }); @@ -725,19 +640,12 @@ export const stopWebcam = (): AppThunk => ( ): void => { logger.debug('stopWebcam()'); - mediaService.closeProducer('webcam', true); + if (mediaService.mediaSenders['webcam'].track) + effectsService.stop(mediaService.mediaSenders['webcam'].track.id); - for (const peerProducer of mediaService.peerProducers['webcam'].values()) { - peerProducer.close(); - } + mediaService.mediaSenders['webcam'].stop(); dispatch(meActions.setWebcamEnabled(false)); - - if (mediaService.tracks['webcam']) { - effectsService.stop(mediaService.tracks['webcam'].id); - mediaService.tracks['webcam'].stop(); - mediaService.tracks['webcam'] = undefined; - } }; /** @@ -787,8 +695,8 @@ export const updateScreenSharing = ({ dispatch(meActions.setScreenSharingInProgress(true)); - let audioTrack: MediaStreamTrack | undefined; - let videoTrack: MediaStreamTrack | undefined; + let audioTrack: MediaStreamTrack | null = null; + let videoTrack: MediaStreamTrack | null = null; let stream: MediaStream | undefined; try { @@ -814,7 +722,7 @@ export const updateScreenSharing = ({ } = getState().settings; if (start) { - if (!replace) videoTrack = mediaService.tracks['screen']; + if (!replace) videoTrack = mediaService.mediaSenders['screen'].track; if (!videoTrack) { const SCREENSHARE_CONSTRAINTS = { @@ -840,20 +748,20 @@ export const updateScreenSharing = ({ ([ videoTrack ] = stream.getVideoTracks()); - replace = true; + replace = mediaService.mediaSenders['screen'].running; } if (!videoTrack) throw new Error('no screen track'); const { width, height } = videoTrack.getSettings(); - if (mediaService.producers['screen'] && replace) { - await mediaService.producers['screen'].replaceTrack({ track: videoTrack }); - } else if (!mediaService.producers['screen']) { + if (replace) { + await mediaService.mediaSenders['screen'].replaceTrack(videoTrack); + } else if (!mediaService.mediaSenders['screen'].running) { if (config.simulcastSharing) { const encodings = getEncodings(width, height, false, true); - await mediaService.produce('screen', { + await mediaService.mediaSenders['screen'].start({ track: videoTrack, stopTracks: false, disableTrackOnPause: false, @@ -865,7 +773,7 @@ export const updateScreenSharing = ({ appData: { source: 'screen' } }, 'video/h264'); } else { - await mediaService.produce('screen', { + await mediaService.mediaSenders['screen'].start({ track: videoTrack, stopTracks: false, disableTrackOnPause: false, @@ -878,54 +786,21 @@ export const updateScreenSharing = ({ } } - if (replace) { - for (const screenPeerProducer of mediaService.peerProducers['screen'].values()) { - await screenPeerProducer.replaceTrack({ track: videoTrack }); - } - } - - if (!p2pModeSelector(getState())) { - mediaService.producers['screen']?.resume(); - - for (const peerProducer of mediaService.peerProducers['screen'].values()) { - peerProducer.close(); - } - } else { - for (const peer of peersArraySelector(getState())) { - if (!mediaService.peerProducers['screen'].has(peer.id)) { - await mediaService.peerProduce(peer.id, 'screen', { - track: videoTrack, - stopTracks: false, - disableTrackOnPause: false, - zeroRtpOnPause: true, - appData: { source: 'screen' } - }); - } - } - - mediaService.producers['screen']?.pause(); - } - - if (replace) { - mediaService.tracks['screen']?.stop(); - mediaService.tracks['screen'] = videoTrack; - } - if (enable) dispatch(meActions.setScreenEnabled(true)); - if (!replace) audioTrack = mediaService.tracks['screenaudio']; + if (!replace) audioTrack = mediaService.mediaSenders['screenaudio'].track; if (!audioTrack && stream) { ([ audioTrack ] = stream.getAudioTracks()); - replace = true; + replace = mediaService.mediaSenders['screenaudio'].running; } if (audioTrack) { - if (mediaService.producers['screenaudio'] && replace) { - await mediaService.producers['screenaudio'].replaceTrack({ track: audioTrack }); - } else if (!mediaService.producers['screenaudio']) { - await mediaService.produce('screenaudio', { + if (replace) { + await mediaService.mediaSenders['screenaudio'].replaceTrack(audioTrack); + } else if (!mediaService.mediaSenders['screenaudio'].running) { + await mediaService.mediaSenders['screenaudio'].start({ track: audioTrack, stopTracks: false, disableTrackOnPause: false, @@ -941,48 +816,15 @@ export const updateScreenSharing = ({ }); } - if (replace) { - for (const screenAudioPeerProducer of mediaService.peerProducers['screenaudio'].values()) { - await screenAudioPeerProducer.replaceTrack({ track: audioTrack }); - } - } - - if (!p2pModeSelector(getState())) { - mediaService.producers['screenaudio']?.resume(); - - for (const peerProducer of mediaService.peerProducers['screenaudio'].values()) { - peerProducer.close(); - } - } else { - for (const peer of peersArraySelector(getState())) { - if (!mediaService.peerProducers['screenaudio'].has(peer.id)) { - await mediaService.peerProduce(peer.id, 'screenaudio', { - track: audioTrack, - stopTracks: false, - disableTrackOnPause: false, - zeroRtpOnPause: true, - appData: { source: 'screenaudio' } - }); - } - } - - mediaService.producers['screenaudio']?.pause(); - } - - if (replace) { - mediaService.tracks['screenaudio']?.stop(); - mediaService.tracks['screenaudio'] = audioTrack; - } - if (enable) dispatch(meActions.setScreenAudioEnabled(true)); } } else { - await mediaService.tracks['screen']?.applyConstraints({ + await mediaService.mediaSenders['screen'].track?.applyConstraints({ ...getVideoConstrains(screenSharingResolution, aspectRatio), frameRate: screenSharingFrameRate }); - await mediaService.tracks['screenaudio']?.applyConstraints({ + await mediaService.mediaSenders['screenaudio'].track?.applyConstraints({ sampleRate, channelCount, autoGainControl, @@ -1005,24 +847,9 @@ export const stopScreenSharing = (): AppThunk => ( ) => { logger.debug('stopScreenSharing()'); - mediaService.closeProducer('screen', true); - - for (const peerProducer of mediaService.peerProducers['screen'].values()) { - peerProducer.close(); - } - - mediaService.closeProducer('screenaudio', true); - - for (const peerProducer of mediaService.peerProducers['screenaudio'].values()) { - peerProducer.close(); - } + mediaService.mediaSenders['screen'].stop(); dispatch(meActions.setScreenEnabled(false)); - - mediaService.tracks['screen']?.stop(); - mediaService.tracks['screen'] = undefined; - mediaService.tracks['screenaudio']?.stop(); - mediaService.tracks['screenaudio'] = undefined; }; /** @@ -1051,7 +878,7 @@ export const startExtraVideo = ({ dispatch(meActions.setVideoInProgress(true)); - let track: MediaStreamTrack | undefined; + let track: MediaStreamTrack | null = null; try { const canSendWebcam = getState().me.canSendWebcam; @@ -1070,7 +897,7 @@ export const startExtraVideo = ({ if (!deviceId) logger.warn('no extravideo device'); if (start) { - if (!replace) track = mediaService.tracks['extravideo']; + if (!replace) track = mediaService.mediaSenders['extravideo'].track; if (!track) { const stream = await navigator.mediaDevices.getUserMedia({ @@ -1083,7 +910,7 @@ export const startExtraVideo = ({ ([ track ] = stream.getVideoTracks()); - replace = true; + replace = mediaService.mediaSenders['extravideo'].running; } if (!track) throw new Error('no webcam track'); @@ -1092,13 +919,16 @@ export const startExtraVideo = ({ if (blurEnabled && replace) track = await effectsService.applyEffect(track); - if (mediaService.producers['extravideo'] && replace) { - await mediaService.producers['extravideo'].replaceTrack({ track }); - } else if (!mediaService.producers['extravideo']) { + if (replace) { + if (mediaService.mediaSenders['extravideo'].track) + effectsService.stop(mediaService.mediaSenders['extravideo'].track.id); + + await mediaService.mediaSenders['extravideo'].replaceTrack(track); + } else if (!mediaService.mediaSenders['extravideo'].running) { if (config.simulcast) { const encodings = getEncodings(width, height); - await mediaService.produce('extravideo', { + await mediaService.mediaSenders['extravideo'].start({ track, stopTracks: false, disableTrackOnPause: false, @@ -1110,7 +940,7 @@ export const startExtraVideo = ({ appData: { source: 'extravideo' } }, 'video/h264'); } else { - await mediaService.produce('extravideo', { + await mediaService.mediaSenders['extravideo'].start({ track, stopTracks: false, disableTrackOnPause: false, @@ -1119,43 +949,8 @@ export const startExtraVideo = ({ }, 'video/h264'); } } - - if (replace) { - for (const extraVideoPeerProducer of mediaService.peerProducers['extravideo'].values()) { - await extraVideoPeerProducer.replaceTrack({ track }); - } - } - - if (!p2pModeSelector(getState())) { - mediaService.producers['extravideo']?.resume(); - - for (const peerProducer of mediaService.peerProducers['extravideo'].values()) { - peerProducer.close(); - } - } else { - for (const peer of peersArraySelector(getState())) { - if (!mediaService.peerProducers['extravideo'].has(peer.id)) { - await mediaService.peerProduce(peer.id, 'extravideo', { - track, - stopTracks: false, - disableTrackOnPause: false, - zeroRtpOnPause: true, - appData: { source: 'extravideo' } - }); - } - } - - mediaService.producers['extravideo']?.pause(); - } - - if (replace && mediaService.tracks['extravideo']) { - effectsService.stop(mediaService.tracks['extravideo']?.id); - mediaService.tracks['extravideo']?.stop(); - } - - mediaService.tracks['extravideo'] = track; } else { - await mediaService.tracks['extravideo']?.applyConstraints({ + await mediaService.mediaSenders['extravideo'].track?.applyConstraints({ ...getVideoConstrains(resolution, aspectRatio), frameRate }); @@ -1176,14 +971,7 @@ export const stopExtraVideo = (): AppThunk => ( ) => { logger.debug('stopExtraVideo()'); - mediaService.closeProducer('extravideo', true); - - for (const peerProducer of mediaService.peerProducers['extravideo'].values()) { - peerProducer.close(); - } + mediaService.mediaSenders['extravideo'].stop(); dispatch(meActions.setExtraVideoEnabled(false)); - - mediaService.tracks['extravideo']?.stop(); - mediaService.tracks['extravideo'] = undefined; }; diff --git a/src/store/actions/roomActions.tsx b/src/store/actions/roomActions.tsx index 2491b856..0f49b84d 100644 --- a/src/store/actions/roomActions.tsx +++ b/src/store/actions/roomActions.tsx @@ -82,8 +82,8 @@ export const joinRoom = (): AppThunk> => async ( dispatch(roomSessionsActions.addFiles({ sessionId, files: fileHistory })); }); - if (!getState().me.audioMuted) dispatch(updateMic({ replace: true })); - if (!getState().me.videoMuted) dispatch(updateWebcam({ replace: true })); + if (!getState().me.audioMuted) dispatch(updateMic({ start: true })); + if (!getState().me.videoMuted) dispatch(updateWebcam({ start: true })); }; export const leaveRoom = (): AppThunk> => async ( @@ -174,9 +174,10 @@ export const joinBreakoutRoom = (sessionId: string): AppThunk> => dispatch(meActions.setSessionId(sessionId)); dispatch(roomSessionsActions.addMessages({ sessionId, messages: chatHistory })); dispatch(roomSessionsActions.addFiles({ sessionId, files: fileHistory })); - if (!audioMuted) dispatch(updateMic({ start: true })); - if (!videoMuted) dispatch(updateWebcam({ start: true })); }); + + if (!audioMuted) dispatch(updateMic({ start: true })); + if (!videoMuted) dispatch(updateWebcam({ start: true })); } catch (error) { logger.error('joinBreakoutRoom() [error:%o]', error); } finally { diff --git a/src/store/middlewares/mediaMiddleware.tsx b/src/store/middlewares/mediaMiddleware.tsx index c423b18b..38d9c64c 100644 --- a/src/store/middlewares/mediaMiddleware.tsx +++ b/src/store/middlewares/mediaMiddleware.tsx @@ -108,15 +108,17 @@ const createMediaMiddleware = ({ dispatch(consumersActions.setConsumerResumed({ consumerId: consumer.id, local: false })); }); - mediaService.on('producerClosed', (producer) => { - if (producer.appData.source === 'webcam' && !producer.paused) { + mediaService.on('mediaClosed', (source) => { + if (source === 'webcam') { dispatch(meActions.setLostVideo(true)); dispatch(meActions.setVideoMuted(true)); + dispatch(meActions.setWebcamEnabled(false)); } - if (producer.appData.source === 'mic' && !producer.paused) { + if (source === 'mic') { dispatch(meActions.setLostAudio(true)); dispatch(meActions.setAudioMuted(true)); + dispatch(meActions.setMicEnabled(false)); } }); diff --git a/src/store/middlewares/peerMiddleware.tsx b/src/store/middlewares/peerMiddleware.tsx index 9021c928..d5f91191 100644 --- a/src/store/middlewares/peerMiddleware.tsx +++ b/src/store/middlewares/peerMiddleware.tsx @@ -5,12 +5,15 @@ import { peersActions } from '../slices/peersSlice'; import { LobbyPeer, lobbyPeersActions } from '../slices/lobbyPeersSlice'; import { setRaisedHand } from '../actions/meActions'; import { Logger } from 'edumeet-common'; -import { startExtraVideo, stopMic, stopScreenSharing, stopWebcam, updateMic, updateScreenSharing, updateWebcam } from '../actions/mediaActions'; +import { stopMic, stopScreenSharing, stopWebcam } from '../actions/mediaActions'; +import { roomSessionsActions } from '../slices/roomSessionsSlice'; +import { p2pModeSelector } from '../selectors'; const logger = new Logger('PeerMiddleware'); const createPeerMiddleware = ({ signalingService, + mediaService, }: MiddlewareOptions): Middleware => { logger.debug('createPeerMiddleware()'); @@ -46,11 +49,6 @@ const createPeerMiddleware = ({ transcripts: [], })); - if (getState().me.micEnabled) dispatch(updateMic({ enable: false })); - if (getState().me.webcamEnabled) dispatch(updateWebcam({ enable: false })); - if (getState().me.screenEnabled) dispatch(updateScreenSharing({ enable: false })); - if (getState().me.extraVideoEnabled) dispatch(startExtraVideo({ enable: false })); - break; } @@ -59,11 +57,6 @@ const createPeerMiddleware = ({ dispatch(peersActions.removePeer({ id: peerId })); - if (getState().me.micEnabled) dispatch(updateMic({ enable: false })); - if (getState().me.webcamEnabled) dispatch(updateWebcam({ enable: false })); - if (getState().me.screenEnabled) dispatch(updateScreenSharing({ enable: false })); - if (getState().me.extraVideoEnabled) dispatch(startExtraVideo({ enable: false })); - break; } @@ -177,6 +170,40 @@ const createPeerMiddleware = ({ }); } + if (peersActions.addPeer.match(action)) { + mediaService.addPeerId(action.payload.id); + } + + if (peersActions.addPeers.match(action)) { + action.payload.forEach((peer) => { + mediaService.addPeerId(peer.id); + }); + } + + if (peersActions.removePeer.match(action)) { + mediaService.removePeerId(action.payload.id); + } + + if ( + peersActions.addPeer.match(action) || + peersActions.addPeers.match(action) || + peersActions.removePeer.match(action) || + roomSessionsActions.addRoomSession.match(action) || + roomSessionsActions.removeRoomSession.match(action) + ) { + const oldP2pMode = p2pModeSelector(getState()); + + next(action); + + const p2pMode = p2pModeSelector(getState()); + + if (oldP2pMode !== p2pMode) { + mediaService.setP2PMode(p2pMode); + } + + return; + } + return next(action); }; diff --git a/src/store/selectors.tsx b/src/store/selectors.tsx index 6933a8d1..add6a6d6 100644 --- a/src/store/selectors.tsx +++ b/src/store/selectors.tsx @@ -43,9 +43,30 @@ export const peersArraySelector = createSelector( (peers) => Object.values(peers) ); -export const p2pModeSelector = createSelector( +/** + * Returns the number of peers excluding the client. + * + * @returns {number} the number of peers. + */ +export const peersLengthSelector = createSelector( peersArraySelector, - (peers) => peers.length < 2 + (peers) => peers.length +); + +export const roomSessionsArraySelector = createSelector( + roomSessionsSelect, + (roomSessions) => Object.values(roomSessions) +); + +export const roomSessionsLengthSelector = createSelector( + roomSessionsArraySelector, + (roomSessions) => roomSessions.length +); + +export const p2pModeSelector = createSelector( + roomSessionsLengthSelector, + peersLengthSelector, + (sessions, peers) => sessions === 1 && peers < 2 ); /** @@ -292,16 +313,6 @@ export const roomSessionCreationTimestampSelector = createSelector( (roomSession) => roomSession.creationTimestamp ); -/** - * Returns the number of peers excluding the client. - * - * @returns {number} the number of peers. - */ -export const peersLengthSelector = createSelector( - peersArraySelector, - (peers) => peers.length -); - /** * Returns the number of peers in the lobby. * diff --git a/src/utils/mediaSender.tsx b/src/utils/mediaSender.tsx new file mode 100644 index 00000000..7b22f062 --- /dev/null +++ b/src/utils/mediaSender.tsx @@ -0,0 +1,383 @@ +import EventEmitter from 'events'; +import { Logger } from 'edumeet-common'; +import { MediaService, ProducerCodec } from '../services/mediaService'; +import { ProducerSource } from './types'; +import { Producer, ProducerOptions } from 'mediasoup-client/lib/types'; +import type { Producer as PeerProducer } from 'ortc-p2p/src/Producer'; +import { SignalingService } from '../services/signalingService'; +import { VolumeWatcher } from './volumeWatcher'; +import hark from 'hark'; + +const logger = new Logger('MediaSender'); + +// eslint-disable-next-line @typescript-eslint/no-unsafe-declaration-merging +export declare interface MediaSender { + // eslint-disable-next-line no-unused-vars + on(event: 'closed', listener: () => void): this; +} + +// eslint-disable-next-line @typescript-eslint/no-unsafe-declaration-merging +export class MediaSender extends EventEmitter { + private mediaService: MediaService; + private signalingService: SignalingService; + public source: ProducerSource; + public volumeWatcher?: VolumeWatcher; + + public running = false; + public paused = false; + + public track: MediaStreamTrack | null = null; + private producerOptions: ProducerOptions = {}; + private codec?: ProducerCodec; + public producer?: Producer; + public peerProducers: Map = new Map(); + private peerProducingPromises: Map> = new Map(); + + public peerIds: string[] = []; + public p2pProduce = false; + + constructor( + mediaService: MediaService, + signalingService: SignalingService, + source: ProducerSource, + p2pProduce = false + ) { + super(); + + logger.debug('constructor()'); + + this.mediaService = mediaService; + this.signalingService = signalingService; + this.source = source; + this.p2pProduce = p2pProduce; + + this.handleSignaling(); + } + + private handleSignaling(): void { + this.signalingService.on('notification', (notification) => { + switch (notification.method) { + case 'producerClosed': { + const { producerId } = notification.data; + + // If the server closes our producer, we stop all producing + if (this.producer?.id === producerId) + this.stop(false); + + break; + } + + case 'newProducerLayer': { + const { producerId, spatialLayer } = notification.data; + + if (this.producer?.id === producerId) + this.producer?.setMaxSpatialLayer(spatialLayer); + + break; + } + } + }); + } + + public async start(producerOptions: ProducerOptions, codec?: ProducerCodec): Promise { + logger.debug('produce() [options:%o]', producerOptions); + + if (this.running) throw new Error('Already producing'); + + this.running = true; + this.producerOptions = producerOptions; + this.codec = codec; + + const promises: Promise[] = [ this.sfuProduce() ]; + + if (this.p2pProduce) { + promises.push(...this.peerIds.map((peerId) => this.peerProduce(peerId))); + } + + const [ sfuResult ] = await Promise.allSettled(promises); + + if (sfuResult.status === 'rejected') { + this.stop(); + + throw sfuResult.reason; + } else if (this.p2pProduce) { + sfuResult.value.pause(); + } + + this.track = producerOptions.track ?? null; + + this.maybeAddHark(); + } + + public stop(local = true): void { + logger.debug('stop() [local:%s, source:%s]', local, this.source); + + if (!this.running) return; + + this.running = false; + + if (this.producer) { + this.producer.appData.remoteClosed = !local; + this.producer.close(); + } + + for (const producer of this.peerProducers.values()) { + producer.appData.remoteClosed = !local; + producer.close(); + } + + for (const [ peerId, producerPromise ] of this.peerProducingPromises.entries()) { + this.peerProducingPromises.delete(peerId); + + producerPromise.then((producer) => { + producer.appData.remoteClosed = !local; + producer.close(); + }); + } + + this.track?.stop(); + this.track = null; + this.producerOptions = {}; + this.codec = undefined; + + if (!local && !this.paused) this.emit('closed'); + } + + public addPeerId(peerId: string): void { + logger.debug('addPeerId() [peerId:%s]', peerId); + + this.peerIds.push(peerId); + + if (this.running && this.p2pProduce && !this.peerProducers.has(peerId)) { + this.peerProduce(peerId); + } + } + + public removePeerId(peerId: string): void { + logger.debug('removePeerId() [peerId:%s]', peerId); + + this.peerIds = this.peerIds.filter((id) => id !== peerId); + + const producer = this.peerProducers.get(peerId); + + producer?.close(); + } + + public startP2P(): void { + logger.debug('startP2P()'); + + this.p2pProduce = true; + + if (!this.running) return; + + for (const peerId of this.peerIds) { + this.peerProduce(peerId).then((producer) => { + if (this.paused) producer.pause(); + }); + } + + // Always pause the SFU producer when starting P2P + this.producer?.pause(); + } + + public stopP2P(): void { + logger.debug('stopP2P()'); + + this.p2pProduce = false; + + if (!this.running) return; + + for (const producer of this.peerProducers.values()) { + producer.close(); + } + + for (const [ peerId, producerPromise ] of this.peerProducingPromises.entries()) { + this.peerProducingPromises.delete(peerId); + + producerPromise.then((producer) => producer.close()); + } + + // Maybe resume the SFU producer when stopping P2P + if (!this.paused) this.producer?.resume(); + } + + public pause(): void { + logger.debug('pause()'); + + this.paused = true; + + this.producer?.pause(); + + for (const producer of this.peerProducers.values()) { + producer.pause(); + } + + for (const producerPromise of this.peerProducingPromises.values()) { + producerPromise.then((producer) => { + if (producer.closed || !this.p2pProduce) return; + + if (this.paused) producer.pause(); + else producer.resume(); + }); + } + } + + public resume(): void { + logger.debug('resume()'); + + this.paused = false; + + if (!this.p2pProduce) this.producer?.resume(); + + for (const producer of this.peerProducers.values()) { + producer.resume(); + } + + for (const producerPromise of this.peerProducingPromises.values()) { + producerPromise.then((producer) => { + if (producer.closed || !this.p2pProduce) return; + + if (this.paused) producer.pause(); + else producer.resume(); + }); + } + } + + private maybeAddHark(): void { + this.volumeWatcher?.hark.stop(); + + if (this.track?.kind === 'audio') { + const harkStream = new MediaStream(); + + harkStream.addTrack(this.track.clone()); + + const producerHark = hark(harkStream, { + play: false, + interval: 100, + threshold: -60, + history: 100 + }); + + this.volumeWatcher = new VolumeWatcher({ hark: producerHark }); + } + } + + public async replaceTrack(track: MediaStreamTrack): Promise { + logger.debug('replaceTrack() [track:%o]', track); + + if (!this.running) throw new Error('Not producing'); + + const oldTrack = this.track; + + this.track = track; + this.producerOptions.track = track; + + this.maybeAddHark(); + + await this.producer?.replaceTrack({ track }); + + for (const producer of this.peerProducers.values()) { + await producer.replaceTrack({ track }); + } + + for (const producerPromise of this.peerProducingPromises.values()) { + await producerPromise.then((producer) => producer.replaceTrack({ track: this.track })); + } + + oldTrack?.stop(); + } + + private async sfuProduce(): Promise { + logger.debug('sfuProducer() [options:%o]', this.producerOptions); + + await this.mediaService.transportsReady; + + if (!this.mediaService.sendTransport) throw new Error('Send transport not ready'); + + const producer = await this.mediaService.sendTransport.produce({ + ...this.producerOptions, + codec: this.mediaService.mediasoup?.rtpCapabilities.codecs?.find((c) => c.mimeType.toLowerCase() === this.codec) + }); + + const pauseListener = () => this.signalingService.notify('pauseProducer', { producerId: producer.id }); + const resumeListener = () => this.signalingService.notify('resumeProducer', { producerId: producer.id }); + + producer.observer.once('close', () => { + producer.observer.off('pause', pauseListener); + producer.observer.off('resume', resumeListener); + + if (!producer.appData.remoteClosed) + this.signalingService.notify('closeProducer', { producerId: producer.id }); + }); + + if (!this.running) { + producer.close(); + + throw new Error('Producer not needed'); + } + + this.producer = producer; + + producer.observer.on('pause', pauseListener); + producer.observer.on('resume', resumeListener); + producer.once('trackended', () => producer.close()); + + return producer; + } + + private async peerProduce(peerId: string): Promise { + logger.debug('peerProduce() [peerId:%s, options:%o]', peerId, this.producerOptions); + + let peerProducingPromise = this.peerProducingPromises.get(peerId); + + if (!peerProducingPromise) { + peerProducingPromise = (async () => { + const peerDevice = this.mediaService.getPeerDevice(peerId); + const transport = await this.mediaService.getPeerTransport(peerId, 'send'); + + const producer = await transport.produce({ + ...this.producerOptions, + codec: peerDevice.rtpCapabilities.codecs?.find((c) => c.mimeType.toLowerCase() === this.codec) + }); + + const pauseListener = () => this.signalingService.notify('peerPauseProducer', { producerId: producer.id, peerId }); + const resumeListener = () => this.signalingService.notify('peerResumeProducer', { producerId: producer.id, peerId }); + + producer.observer.once('close', () => { + producer.observer.off('pause', pauseListener); + producer.observer.off('resume', resumeListener); + + if (!producer.appData.remoteClosed) + this.signalingService.notify('peerCloseProducer', { producerId: producer.id, peerId }); + + this.peerProducers.delete(peerId); + }); + + if (!this.p2pProduce || !this.peerIds.find((id) => id === peerId) || !this.running) { + this.peerProducingPromises.delete(peerId); + + producer.close(); + + throw new Error('Producer not needed'); + } + + producer.appData.peerProducer = true; + producer.appData.peerId = peerId; + + this.peerProducers.set(peerId, producer); + + producer.observer.on('pause', pauseListener); + producer.observer.on('resume', resumeListener); + producer.once('trackended', () => producer.close()); + + this.peerProducingPromises.delete(peerId); + + return producer; + })(); + + this.peerProducingPromises.set(peerId, peerProducingPromise); + } + + return peerProducingPromise; + } +}