diff --git a/docusaurus/docs/Flutter/05-advanced/10-manual-video-quality-selection.mdx b/docusaurus/docs/Flutter/05-advanced/10-manual-video-quality-selection.mdx new file mode 100644 index 000000000..d2c4462e2 --- /dev/null +++ b/docusaurus/docs/Flutter/05-advanced/10-manual-video-quality-selection.mdx @@ -0,0 +1,66 @@ +--- +title: Manual Video Quality Selection +slug: /manual-video-quality-selection +sidebar_position: 10 +description: Learn how to manually select the incoming video quality in the Stream Video Flutter SDK. +--- + +By default, our SDK chooses the incoming video quality that best matches the size of a video element for a given participant. It makes less sense to waste bandwidth receiving Full HD video when it's going to be displayed in a 320 by 240 pixel rectangle. + +However, it's still possible to override this behavior and manually request higher resolution video for better quality, or lower resolution to save bandwidth. It's also possible to disable incoming video altogether for an audio-only experience. + +## Overriding Preferred Resolution + +To override the preferred incoming video resolution, use the `call.setPreferredIncomingVideoResolution` method: + +```dart +await call.setPreferredIncomingVideoResolution(VideoResolution(width: 640, height: 480)); +``` + +:::note +Actual incoming video quality depends on a number of factors, such as the quality of the source video, and network conditions. Manual video quality selection allows you to specify your preference, while the actual resolution is automatically selected from the available resolutions to match that preference as closely as possible. +::: + +It's also possible to override the incoming video resolution for only a selected subset of call participants. The `call.setPreferredIncomingVideoResolution()` method optionally takes a list of participant session identifiers as its optional argument. Session identifiers can be obtained from the call participant state: + +```dart +final [first, second, ..._] = call.state.value.otherParticipants; + +// Set preferred incoming video resolution for the first two participants only: +await call.setPreferredIncomingVideoResolution( + VideoResolution(width: 640, height: 480), + sessionIds: [first.sessionId, second.sessionId], +); +``` + +Calling this method will enable incoming video for the selected participants if it was previously disabled. + +To clear a previously set preference, pass `null` instead of resolution: + +```dart +// Clear resolution preference for selected participants: +await call.setPreferredIncomingVideoResolution( + null, + sessionIds: [ + participant.sessionId, + ], +); +// Clear resolution preference for all participants: +await call.setPreferredIncomingVideoResolution(null); +``` + +## Disabling Incoming Video + +To completely disable incoming video (either to save data, or for an audio-only experience), use the `call.setIncomingVideoEnabled()` method: + +```dart +await call.setIncomingVideoEnabled(false); +``` + +To enable incoming video again, pass `true` as an argument: + +```js +await call.setIncomingVideoEnabled(true); +``` + +Calling this method will clear the previously set resolution preferences. \ No newline at end of file diff --git a/docusaurus/docs/Flutter/05-advanced/11-session-timers.mdx b/docusaurus/docs/Flutter/05-advanced/11-session-timers.mdx new file mode 100644 index 000000000..e6f512769 --- /dev/null +++ b/docusaurus/docs/Flutter/05-advanced/11-session-timers.mdx @@ -0,0 +1,47 @@ +--- +title: Session Timers +slug: /session-timers +sidebar_position: 11 +description: Learn how to limit the maximum duration of a call in the Stream Video Flutter SDK. +--- + +A session timer allows you to limit the maximum duration of a call. The duration [can be configured](https://getstream.io/video/docs/api/calls/#session-timers) for all calls of a certain type, or on a per-call basis. When a session timer reaches zero, the call automatically ends. + +## Creating a call with a session timer + +Let's see how to create a single call with a limited duration: + +```dart +final call = client.makeCall(callType: StreamCallType.defaultType(), id: 'REPLACE_WITH_CALL_ID'); +await call.getOrCreate( + limits: const StreamLimitsSettings( + maxDurationSeconds: 3600, + ), +); +``` + +This code creates a call with a duration of 3600 seconds (1 hour) from the time the session is starts (a participant joins the call). + +After joining the call with the specified `maxDurationSeconds`, you can examine a call state's `timerEndsAt` field, which provides the timestamp when the call will end. When a call ends, all participants are removed from the call. + +```dart +await call.join(); +print(call.state.value.timerEndsAt); +``` + +## Extending a call + +​You can also extend the duration of a call, both before or during the call. To do that, you should use the `call.update` method: + +```dart +final duration = + call.state.value.settings.limits.maxDurationSeconds! + 60; + +call.update( + limits: StreamLimitsSettings( + maxDurationSeconds: duration, + ), +); +``` + +If the call duration is extended, the `timerEndsAt` is updated to reflect this change. Call participants will receive the `call.updated` event to notify them about this change. \ No newline at end of file diff --git a/dogfooding/lib/widgets/settings_menu.dart b/dogfooding/lib/widgets/settings_menu.dart index 6c989dc36..bf5e01161 100644 --- a/dogfooding/lib/widgets/settings_menu.dart +++ b/dogfooding/lib/widgets/settings_menu.dart @@ -11,6 +11,23 @@ CallReactionData _raisedHandReaction = const CallReactionData( icon: '✋', ); +enum IncomingVideoQuality { + auto('Auto'), + p2160('2160p'), + p1080('1080p'), + p720('720p'), + p480('480p'), + p144('144p'), + off('Off'); + + final String name; + + const IncomingVideoQuality(this.name); + + @override + String toString() => name; +} + class SettingsMenu extends StatefulWidget { const SettingsMenu({ required this.call, @@ -40,7 +57,9 @@ class _SettingsMenuState extends State { bool showAudioOutputs = false; bool showAudioInputs = false; - bool get showMainSettings => !showAudioOutputs && !showAudioInputs; + bool showIncomingQuality = false; + bool get showMainSettings => + !showAudioOutputs && !showAudioInputs && !showIncomingQuality; @override void initState() { @@ -83,11 +102,15 @@ class _SettingsMenuState extends State { if (showMainSettings) ..._buildMenuItems(), if (showAudioOutputs) ..._buildAudioOutputsMenu(), if (showAudioInputs) ..._buildAudioInputsMenu(), + if (showIncomingQuality) ..._buildIncomingQualityMenu(), ]), ); } List _buildMenuItems() { + final incomingVideoQuality = getIncomingVideoQuality( + widget.call.dynascaleManager.incomingVideoSettings); + return [ Row( mainAxisAlignment: MainAxisAlignment.spaceEvenly, @@ -155,6 +178,24 @@ class _SettingsMenuState extends State { showAudioInputs = true; }); }, + ), + const SizedBox(height: 16), + StandardActionMenuItem( + icon: Icons.high_quality_sharp, + label: 'Incoming video quality', + trailing: Text( + incomingVideoQuality.name, + style: TextStyle( + color: incomingVideoQuality != IncomingVideoQuality.auto + ? AppColorPalette.appGreen + : null, + ), + ), + onPressed: () { + setState(() { + showIncomingQuality = true; + }); + }, ) ]; } @@ -229,6 +270,87 @@ class _SettingsMenuState extends State { .insertBetween(const SizedBox(height: 16)), ]; } + + List _buildIncomingQualityMenu() { + return [ + GestureDetector( + onTap: () { + setState(() { + showIncomingQuality = false; + }); + }, + child: const Align( + alignment: Alignment.centerLeft, + child: Icon(Icons.arrow_back, size: 24), + ), + ), + const SizedBox(height: 16), + ...IncomingVideoQuality.values + .map( + (quality) { + return StandardActionMenuItem( + icon: Icons.video_settings, + label: quality.name, + color: getIncomingVideoQuality(widget + .call.dynascaleManager.incomingVideoSettings) == + quality + ? AppColorPalette.appGreen + : null, + onPressed: () { + if (quality == IncomingVideoQuality.off) { + widget.call.setIncomingVideoEnabled(false); + } else { + widget.call.setPreferredIncomingVideoResolution( + getIncomingVideoResolution(quality)); + } + }, + ); + }, + ) + .cast() + .insertBetween(const SizedBox(height: 16)), + ]; + } + + VideoResolution? getIncomingVideoResolution(IncomingVideoQuality quality) { + switch (quality) { + case IncomingVideoQuality.auto: + case IncomingVideoQuality.off: + return null; + case IncomingVideoQuality.p2160: + return VideoResolution(width: 3840, height: 2160); + case IncomingVideoQuality.p1080: + return VideoResolution(width: 1920, height: 1080); + case IncomingVideoQuality.p720: + return VideoResolution(width: 1280, height: 720); + case IncomingVideoQuality.p480: + return VideoResolution(width: 640, height: 480); + case IncomingVideoQuality.p144: + return VideoResolution(width: 256, height: 144); + } + } + + IncomingVideoQuality getIncomingVideoQuality(IncomingVideoSettings? setting) { + final preferredResolution = setting?.preferredResolution; + if (setting?.enabled == false) { + return IncomingVideoQuality.off; + } + if (preferredResolution == null) { + return IncomingVideoQuality.auto; + } else if (preferredResolution.height >= 2160) { + return IncomingVideoQuality.p2160; + } else if (preferredResolution.height >= 1080) { + return IncomingVideoQuality.p1080; + } else if (preferredResolution.height >= 720) { + return IncomingVideoQuality.p720; + } else if (preferredResolution.height >= 480) { + return IncomingVideoQuality.p480; + } else if (preferredResolution.height >= 144) { + return IncomingVideoQuality.p144; + } else { + return IncomingVideoQuality.auto; + } + } } class SettingsMenuItem extends StatelessWidget { @@ -265,10 +387,12 @@ class StandardActionMenuItem extends StatelessWidget { required this.label, this.color, this.onPressed, + this.trailing, }); final IconData icon; final String label; + final Widget? trailing; final Color? color; final void Function()? onPressed; @@ -285,8 +409,16 @@ class StandardActionMenuItem extends StatelessWidget { color: color, ), const SizedBox(width: 8), - Text(label, - style: TextStyle(color: color, fontWeight: FontWeight.bold)), + Text( + label, + style: TextStyle( + color: color, + fontWeight: FontWeight.bold, + ), + ), + const Spacer(), + if (trailing != null) trailing!, + const SizedBox(width: 8), ], ), ); diff --git a/packages/stream_video/lib/src/call/call.dart b/packages/stream_video/lib/src/call/call.dart index dbca8ad06..9e471ca47 100644 --- a/packages/stream_video/lib/src/call/call.dart +++ b/packages/stream_video/lib/src/call/call.dart @@ -191,7 +191,8 @@ class Call { _streamVideo = streamVideo, _preferences = preferences, _retryPolicy = retryPolicy, - _credentials = credentials { + _credentials = credentials, + dynascaleManager = DynascaleManager(stateManager: stateManager) { streamLog.i(_tag, () => ' state: ${stateManager.callState}'); if (stateManager.callState.isRingingFlow) { @@ -213,6 +214,7 @@ class Call { final CallSessionFactory _sessionFactory; final CallStateNotifier _stateManager; final PermissionsManager _permissionsManager; + final DynascaleManager dynascaleManager; CallCredentials? _credentials; CallSession? _session; @@ -640,6 +642,7 @@ class Call { sessionId: performingRejoin ? null : _previousSession?.sessionId, credentials: _credentials!, stateManager: _stateManager, + dynascaleManager: dynascaleManager, onPeerConnectionFailure: (pc) async { if (state.value.status is! CallStatusReconnecting) { await pc.pc.restartIce().onError((_, __) { @@ -648,6 +651,11 @@ class Call { } }, ); + + dynascaleManager.init( + sfuClient: _session!.sfuClient, + sessionId: _session!.sessionId, + ); } else { _logger.v( () => @@ -1259,6 +1267,7 @@ class Call { _cancelables.cancelAll(); await _session?.dispose(); _session = null; + await dynascaleManager.dispose(); await _streamVideo.state.setActiveCall(null); await _streamVideo.state.setOutgoingCall(null); _logger.v(() => '[clear] completed'); @@ -1789,7 +1798,7 @@ class Call { /// can be override by passing a [track] to the function. /// /// Note: The user calling this function must have permission to perform the - /// action else it will result in an error. + /// action else it will result in an error. Future> muteOthers({TrackType track = TrackType.audio}) { return _permissionsManager.muteOthers(track: track); } @@ -1798,7 +1807,7 @@ class Call { /// calling the function. /// /// Note: The user calling this function must have permission to perform the - // action else it will result in an error. + /// action else it will result in an error. Future> muteAllUsers() { return _permissionsManager.muteAllUsers(); } @@ -2050,15 +2059,7 @@ class Call { Future> setSubscriptions( List changes, ) async { - final result = await _session?.setSubscriptions(changes) ?? - Result.error('Session is null'); - - // TODO: Verify this is not needed - // if (result.isSuccess) { - // _stateManager - // .participantUpdateSubscriptions(); - // } - + final result = await dynascaleManager.setSubscriptions(changes); return result; } @@ -2075,24 +2076,9 @@ class Call { trackTypes: trackTypes, ); - final result = await _session?.setSubscriptions([ - change, - ]) ?? - Result.error('Session is null'); - - // TODO: Verify this is not needed - // if (result.isSuccess) { - // _stateManager.participantUpdateSubscriptions( - // UpdateSubscriptions([ - // UpdateSubscription( - // userId: userId, - // sessionId: sessionId, - // trackIdPrefix: trackIdPrefix, - // trackType: trackTypes.keys.first, - // ), - // ]), - // ); - // } + final result = await dynascaleManager.setSubscriptions([ + change, + ]); return result; } @@ -2104,16 +2090,15 @@ class Call { required SfuTrackTypeVideo trackType, RtcVideoDimension? videoDimension, }) async { - final result = await _session?.updateSubscription( - SubscriptionChange.update( - userId: userId, - sessionId: sessionId, - trackIdPrefix: trackIdPrefix, - trackType: trackType, - videoDimension: videoDimension, - ), - ) ?? - Result.error('Session is null'); + final result = await dynascaleManager.updateSubscription( + SubscriptionChange.update( + userId: userId, + sessionId: sessionId, + trackIdPrefix: trackIdPrefix, + trackType: trackType, + videoDimension: videoDimension, + ), + ); if (result.isSuccess) { _stateManager.participantUpdateSubscription( @@ -2135,15 +2120,14 @@ class Call { required SfuTrackTypeVideo trackType, RtcVideoDimension? videoDimension, }) async { - final result = await _session?.updateSubscription( - SubscriptionChange.update( - userId: userId, - sessionId: sessionId, - trackIdPrefix: trackIdPrefix, - trackType: trackType, - ), - ) ?? - Result.error('Session is null'); + final result = await dynascaleManager.updateSubscription( + SubscriptionChange.update( + userId: userId, + sessionId: sessionId, + trackIdPrefix: trackIdPrefix, + trackType: trackType, + ), + ); if (result.isSuccess) { _stateManager.participantRemoveSubscription( @@ -2157,6 +2141,43 @@ class Call { return result; } + /// Specifies the preference for incoming video resolution. The preference will + /// be matched as closely as possible, but the actual resolution will depend + /// on the video source quality and client network conditions. This will enable + /// incoming video if it was previously disabled. + /// + /// [resolution] is the preferred resolution, or `null` to clear the preference. + /// [sessionIds] optionally specifies the session IDs of the participants this + /// preference affects. By default, it affects all participants. + Future> setPreferredIncomingVideoResolution( + VideoResolution? resolution, { + List? sessionIds, + }) async { + dynascaleManager.setVideoTrackSubscriptionOverrides( + override: resolution != null + ? VideoTrackSubscriptionOverride( + dimension: RtcVideoDimension( + width: resolution.width, + height: resolution.height, + ), + ) + : null, + sessionIds: sessionIds, + ); + + return dynascaleManager.applyTrackSubscriptions(); + } + + /// Enables or disables incoming video from all remote call participants, + /// and removes any preference for preferred resolution. + Future> setIncomingVideoEnabled(bool enabled) async { + dynascaleManager.setVideoTrackSubscriptionOverrides( + override: VideoTrackSubscriptionOverride(enabled: enabled), + ); + + return dynascaleManager.applyTrackSubscriptions(); + } + Future> queryMembers({ required Map filterConditions, String? next, diff --git a/packages/stream_video/lib/src/call/session/call_session.dart b/packages/stream_video/lib/src/call/session/call_session.dart index 1cd05e8f9..f8784d6d9 100644 --- a/packages/stream_video/lib/src/call/session/call_session.dart +++ b/packages/stream_video/lib/src/call/session/call_session.dart @@ -50,6 +50,7 @@ class CallSession extends Disposable { required this.sessionId, required this.config, required this.stateManager, + required this.dynascaleManager, required this.onPeerConnectionIssue, required SdpEditor sdpEditor, this.joinResponseTimeout = const Duration(seconds: 5), @@ -79,17 +80,15 @@ class CallSession extends Disposable { final String sessionId; final CallSessionConfig config; final CallStateNotifier stateManager; + final DynascaleManager dynascaleManager; final SfuClient sfuClient; final SfuWebSocket sfuWS; final RtcManagerFactory rtcManagerFactory; final OnPeerConnectionIssue onPeerConnectionIssue; final Duration joinResponseTimeout; - final BehaviorSubject> - _currentTrackSubscriptionsSubject = BehaviorSubject.seeded({}); RtcManager? rtcManager; - BehaviorSubject? _rtcManagerSubject; StreamSubscription? _eventsSubscription; StreamSubscription>? _statsSubscription; @@ -102,12 +101,6 @@ class CallSession extends Disposable { SharedEmitter get events => sfuWS.events; - late final _saBuffer = DebounceBuffer>( - duration: _debounceDuration, - onBuffered: updateSubscriptions, - onCancel: () => Result.error('SubscriptionChange cancelled'), - ); - late final _vvBuffer = DebounceBuffer>( duration: _debounceDuration, onBuffered: updateViewportVisibilities, @@ -191,8 +184,11 @@ class CallSession extends Disposable { int? reconnectAttempts, }) { final announcedTracks = rtcManager?.getPublisherTrackInfos().toDTO(); - final subscribedTracks = - _currentTrackSubscriptionsSubject.value.values.toList().toDTO(); + final subscribedTracks = dynascaleManager + .getTrackSubscriptions(ignoreOverride: true) + .values + .toList() + .toDTO(); return sfu_events.ReconnectDetails( strategy: strategy.toDto(), @@ -412,12 +408,12 @@ class CallSession extends Disposable { _logger.d(() => '[close] code: $code, closeReason: $closeReason'); await _stats.close(); - await _saBuffer.cancel(); + // await _saBuffer.cancel(); await _eventsSubscription?.cancel(); _eventsSubscription = null; await _statsSubscription?.cancel(); _statsSubscription = null; - await _currentTrackSubscriptionsSubject.close(); + // await _currentTrackSubscriptionsSubject.close(); await sfuWS.disconnect( code.value, @@ -803,79 +799,6 @@ class CallSession extends Disposable { return const Result.success(none); } - Future> setSubscriptions( - List subscriptionChanges, - ) async { - _logger.d( - () => '[setSubscriptions] subscriptionChanges: $subscriptionChanges', - ); - - final participants = stateManager.callState.callParticipants; - final exclude = {SfuTrackType.video, SfuTrackType.screenShare}; - final subscriptions = { - ...participants.getSubscriptions(exclude: exclude), - }; - _logger.v(() => '[setSubscriptions] source: $subscriptions'); - for (final change in subscriptionChanges) { - final changeSubscriptions = change.getSubscriptions(); - subscriptions.addAll(changeSubscriptions); - } - - _logger.v(() => '[setSubscriptions] updated: $subscriptions'); - final result = await sfuClient.update( - sessionId: sessionId, - subscriptions: subscriptions.values, - ); - - _currentTrackSubscriptionsSubject.add(subscriptions); - - _logger.v(() => '[setSubscriptions] result: $result'); - return result; - } - - Future> updateSubscription( - SubscriptionChange subscriptionChange, - ) async { - _logger.d( - () => '[updateSubscription] subscriptionChange: $subscriptionChange', - ); - - if (!_saBuffer.isClosed) { - return _saBuffer.post(subscriptionChange); - } - - //Ignore the subscription change if the buffer is closed - return const Result.success(none); - } - - Future> updateSubscriptions( - List changes, - ) async { - _logger.d(() => '[updateSubscriptions] changes: $changes'); - final participants = stateManager.callState.callParticipants; - final subscriptions = { - ...participants.getSubscriptions(), - }; - _logger.v(() => '[updateSubscriptions] source: $subscriptions'); - for (final change in changes) { - if (change.subscribed) { - subscriptions[change.trackId!] = change.toSubscription(); - } else if (!change.subscribed) { - subscriptions.remove(change.trackId); - } - } - _logger.v(() => '[updateSubscriptions] updated: $subscriptions'); - final result = await sfuClient.update( - sessionId: sessionId, - subscriptions: subscriptions.values, - ); - - _currentTrackSubscriptionsSubject.add(subscriptions); - - _logger.v(() => '[updateSubscriptions] result: $result'); - return result; - } - Future> setAudioOutputDevice(RtcMediaDevice device) async { final rtcManager = this.rtcManager; if (rtcManager == null) { @@ -1013,100 +936,3 @@ extension SfuSubscriptionDetailsEx on List { }).toList(); } } - -extension on SfuClient { - Future> update({ - required String sessionId, - required Iterable subscriptions, - }) async { - final result = await updateSubscriptions( - sfu.UpdateSubscriptionsRequest( - sessionId: sessionId, - tracks: subscriptions.map( - (it) => sfu.TrackSubscriptionDetails( - userId: it.userId, - sessionId: it.sessionId, - trackType: it.trackType.toDTO(), - dimension: it.dimension?.toDTO(), - ), - ), - ), - ); - - return result.fold( - failure: (it) => it, - success: (it) { - if (it.data.hasError()) { - final error = it.data.error; - return Result.error('${error.code} - ${error.message}'); - } - return const Result.success(none); - }, - ); - } -} - -extension on List { - Map getSubscriptions({ - Set exclude = const {}, - }) { - final subscriptions = {}; - - for (final participant in this) { - // We only care about remote participants. - if (participant.isLocal) continue; - - streamLog.v( - _tag, - () => '[getSubscriptions] userId: ${participant.userId}, ' - 'published: ${participant.publishedTracks.keys}', - ); - - subscriptions.addAll( - participant.getSubscriptions(exclude: exclude), - ); - } - - return subscriptions; - } -} - -extension on CallParticipantState { - Map getSubscriptions({ - Set exclude = const {}, - }) { - final subscriptions = {}; - - for (final trackType in publishedTracks.keys) { - final trackState = publishedTracks[trackType]; - - streamLog.v( - _tag, - () => '[getSubscriptions] trackType: $trackType, ' - 'trackState: $trackState', - ); - - // We only care about remote tracks. - if (trackState is! RemoteTrackState) continue; - - // Continue if we should exclude this trackType. - final shouldExclude = exclude.contains(trackType); - if (shouldExclude) continue; - - // We only care about tracks that are subscribed. - if (!trackState.subscribed) continue; - - final detail = SfuSubscriptionDetails( - userId: userId, - sessionId: sessionId, - trackIdPrefix: trackIdPrefix, - trackType: trackType, - dimension: trackState.videoDimension, - ); - - subscriptions[detail.trackId] = detail; - } - - return subscriptions; - } -} diff --git a/packages/stream_video/lib/src/call/session/call_session_factory.dart b/packages/stream_video/lib/src/call/session/call_session_factory.dart index b0cd6a339..f7f0898a3 100644 --- a/packages/stream_video/lib/src/call/session/call_session_factory.dart +++ b/packages/stream_video/lib/src/call/session/call_session_factory.dart @@ -9,6 +9,7 @@ import '../../webrtc/sdp/editor/sdp_editor.dart'; import '../state/call_state_notifier.dart'; import 'call_session.dart'; import 'call_session_config.dart'; +import 'dynascale_manager.dart'; int _sessionSeq = 1; @@ -27,6 +28,7 @@ class CallSessionFactory { String? sessionId, required CallCredentials credentials, required CallStateNotifier stateManager, + required DynascaleManager dynascaleManager, required OnPeerConnectionIssue onPeerConnectionFailure, }) async { final finalSessionId = sessionId ?? const Uuid().v4(); @@ -53,6 +55,7 @@ class CallSessionFactory { sessionId: finalSessionId, config: sessionConfig, stateManager: stateManager, + dynascaleManager: dynascaleManager, sdpEditor: sdpEditor, onPeerConnectionIssue: onPeerConnectionFailure, ); diff --git a/packages/stream_video/lib/src/call/session/dynascale_manager.dart b/packages/stream_video/lib/src/call/session/dynascale_manager.dart new file mode 100644 index 000000000..06b8366dc --- /dev/null +++ b/packages/stream_video/lib/src/call/session/dynascale_manager.dart @@ -0,0 +1,277 @@ +import 'dart:async'; + +import 'package:rxdart/rxdart.dart'; + +import '../../../protobuf/video/sfu/signal_rpc/signal.pb.dart' as sfu; +import '../../../stream_video.dart'; +import '../../extensions/call_participant_state_ext.dart'; +import '../../sfu/data/models/sfu_model_mapper_extensions.dart'; +import '../../sfu/data/models/sfu_subscription_details.dart'; +import '../../sfu/sfu_client.dart'; +import '../../utils/debounce_buffer.dart'; +import '../../webrtc/model/rtc_model_mapper_extensions.dart'; +import '../state/call_state_notifier.dart'; + +class IncomingVideoSettings { + IncomingVideoSettings({ + required this.enabled, + required this.preferredResolution, + required this.participants, + }); + + factory IncomingVideoSettings.default_() => IncomingVideoSettings( + enabled: null, + preferredResolution: null, + participants: {}, + ); + + final bool? enabled; + final RtcVideoDimension? preferredResolution; + final Map + participants; + + bool isParticipantVideoEnabled(String sessionId) { + return participants[sessionId]?.enabled ?? enabled ?? true; + } +} + +const _debounceDuration = Duration(milliseconds: 200); + +class DynascaleManager { + DynascaleManager({ + required this.stateManager, + }); + + late final _logger = taggedLogger(tag: 'dynascaleManager'); + + final CallStateNotifier stateManager; + late SfuClient sfuClient; + late String sessionId; + + final BehaviorSubject> + _currentTrackSubscriptionsSubject = BehaviorSubject.seeded({}); + // final BehaviorSubject> + // _videoTrackSubscriptionsOverrides = BehaviorSubject.seeded({}); + + final BehaviorSubject + _videoTrackSubscriptionsOverrides = BehaviorSubject.seeded(null); + + IncomingVideoSettings? get incomingVideoSettings => + _videoTrackSubscriptionsOverrides.value; + + late final _saBuffer = DebounceBuffer>( + duration: _debounceDuration, + onBuffered: updateSubscriptions, + onCancel: () => Result.error('SubscriptionChange cancelled'), + ); + + void init({ + required SfuClient sfuClient, + required String sessionId, + }) { + this.sfuClient = sfuClient; + this.sessionId = sessionId; + } + + Map getTrackSubscriptions({ + bool ignoreOverride = false, + }) { + final subscribtions = {}; + + for (final entry in _currentTrackSubscriptionsSubject.value.entries) { + final key = entry.key; + final value = entry.value; + + if (value.trackType == SfuTrackType.video && !ignoreOverride) { + final enabled = incomingVideoSettings?.participants[key]?.enabled ?? + incomingVideoSettings?.enabled; + + final preferredResolution = + incomingVideoSettings?.participants[key]?.preferredResolution ?? + incomingVideoSettings?.preferredResolution; + + if (enabled == null) { + subscribtions[key] = value; + } else if (enabled) { + subscribtions[key] = value.copyWith(dimension: preferredResolution); + } + } else { + subscribtions[key] = value; + } + } + + return subscribtions; + } + + Future> setSubscriptions( + List subscriptionChanges, + ) async { + _logger.d( + () => '[setSubscriptions] subscriptionChanges: $subscriptionChanges', + ); + + final participants = stateManager.callState.callParticipants; + final exclude = {SfuTrackType.video, SfuTrackType.screenShare}; + final subscriptions = { + ...participants.getSubscriptions(exclude: exclude), + }; + + _logger.v(() => '[setSubscriptions] source: $subscriptions'); + + for (final change in subscriptionChanges) { + final changeSubscriptions = change.getSubscriptions(); + subscriptions.addAll(changeSubscriptions); + } + + _logger.v(() => '[setSubscriptions] updated: $subscriptions'); + + _currentTrackSubscriptionsSubject.add(subscriptions); + + final result = await sfuClient.update( + sessionId: sessionId, + subscriptions: getTrackSubscriptions().values, + ); + + _logger.v(() => '[setSubscriptions] result: $result'); + return result; + } + + Future> updateSubscription( + SubscriptionChange subscriptionChange, + ) async { + _logger.d( + () => '[updateSubscription] subscriptionChange: $subscriptionChange', + ); + + if (!_saBuffer.isClosed) { + return _saBuffer.post(subscriptionChange); + } + + //Ignore the subscription change if the buffer is closed + return const Result.success(none); + } + + Future> updateSubscriptions( + List changes, + ) async { + _logger.d(() => '[updateSubscriptions] changes: $changes'); + + final participants = stateManager.callState.callParticipants; + final subscriptions = { + ...participants.getSubscriptions(), + }; + + _logger.v(() => '[updateSubscriptions] source: $subscriptions'); + + for (final change in changes) { + if (change.subscribed) { + subscriptions[change.trackId!] = change.toSubscription(); + } else if (!change.subscribed) { + subscriptions.remove(change.trackId); + } + } + + _logger.v(() => '[updateSubscriptions] updated: $subscriptions'); + _currentTrackSubscriptionsSubject.add(subscriptions); + + final result = await applyTrackSubscriptions(); + + _logger.v(() => '[updateSubscriptions] result: $result'); + return result; + } + + Future> applyTrackSubscriptions() async { + final result = await sfuClient.update( + sessionId: sessionId, + subscriptions: getTrackSubscriptions().values, + ); + + return result; + } + + void setVideoTrackSubscriptionOverrides({ + required VideoTrackSubscriptionOverride? override, + List? sessionIds, + }) { + _logger.d( + () => '[setVideoTrackSubscriptionOverrides] sessionIds: $sessionIds, ' + 'override: $override', + ); + + if (sessionIds == null) { + if (override == null) { + return _videoTrackSubscriptionsOverrides + .add(IncomingVideoSettings.default_()); + } + + return _videoTrackSubscriptionsOverrides.add( + IncomingVideoSettings( + enabled: override.enabled, + preferredResolution: override.dimension, + participants: {}, + ), + ); + } + + return _videoTrackSubscriptionsOverrides.add( + IncomingVideoSettings( + enabled: _videoTrackSubscriptionsOverrides.value?.enabled, + preferredResolution: + _videoTrackSubscriptionsOverrides.value?.preferredResolution, + participants: { + ...(_videoTrackSubscriptionsOverrides.value?.participants ?? {}) + ..removeWhere( + (key, _) => override == null && sessionIds.contains(key), + ), + if (override != null) + ...sessionIds.asMap().map( + (_, id) => MapEntry( + id, + ( + enabled: override.enabled, + preferredResolution: override.dimension + ), + ), + ), + }, + ), + ); + } + + Future dispose() async { + await _saBuffer.cancel(); + await _currentTrackSubscriptionsSubject.close(); + } +} + +extension on SfuClient { + Future> update({ + required String sessionId, + required Iterable subscriptions, + }) async { + final result = await updateSubscriptions( + sfu.UpdateSubscriptionsRequest( + sessionId: sessionId, + tracks: subscriptions.map( + (it) => sfu.TrackSubscriptionDetails( + userId: it.userId, + sessionId: it.sessionId, + trackType: it.trackType.toDTO(), + dimension: it.dimension?.toDTO(), + ), + ), + ), + ); + + return result.fold( + failure: (it) => it, + success: (it) { + if (it.data.hasError()) { + final error = it.data.error; + return Result.error('${error.code} - ${error.message}'); + } + return const Result.success(none); + }, + ); + } +} diff --git a/packages/stream_video/lib/src/call_state.dart b/packages/stream_video/lib/src/call_state.dart index 2f106364f..9f5fb8041 100644 --- a/packages/stream_video/lib/src/call_state.dart +++ b/packages/stream_video/lib/src/call_state.dart @@ -36,6 +36,7 @@ class CallState extends Equatable { endedAt: null, liveStartedAt: null, liveEndedAt: null, + timerEndsAt: null, publisherStats: null, subscriberStats: null, localStats: null, @@ -73,6 +74,7 @@ class CallState extends Equatable { required this.endedAt, required this.liveStartedAt, required this.liveEndedAt, + required this.timerEndsAt, required this.publisherStats, required this.subscriberStats, required this.localStats, @@ -107,6 +109,7 @@ class CallState extends Equatable { final DateTime? updatedAt; final DateTime? liveStartedAt; final DateTime? liveEndedAt; + final DateTime? timerEndsAt; final PeerConnectionStats? publisherStats; final PeerConnectionStats? subscriberStats; final LocalStats? localStats; @@ -155,6 +158,7 @@ class CallState extends Equatable { DateTime? endedAt, DateTime? liveStartedAt, DateTime? liveEndedAt, + DateTime? timerEndsAt, PeerConnectionStats? publisherStats, PeerConnectionStats? subscriberStats, LocalStats? localStats, @@ -189,6 +193,7 @@ class CallState extends Equatable { endedAt: endedAt ?? this.endedAt, liveStartedAt: liveStartedAt ?? this.liveStartedAt, liveEndedAt: liveEndedAt ?? this.liveEndedAt, + timerEndsAt: timerEndsAt ?? this.timerEndsAt, publisherStats: publisherStats ?? this.publisherStats, subscriberStats: subscriberStats ?? this.subscriberStats, localStats: localStats ?? this.localStats, @@ -222,6 +227,7 @@ class CallState extends Equatable { ownCapabilities: capabilities.isEmpty ? null : capabilities, liveStartedAt: metadata.session.liveStartedAt, liveEndedAt: metadata.session.liveEndedAt, + timerEndsAt: metadata.session.timerEndsAt, ); } @@ -250,6 +256,7 @@ class CallState extends Equatable { endedAt, liveStartedAt, liveEndedAt, + timerEndsAt, publisherStats, subscriberStats, localStats, diff --git a/packages/stream_video/lib/src/extensions/call_participant_state_ext.dart b/packages/stream_video/lib/src/extensions/call_participant_state_ext.dart new file mode 100644 index 000000000..10eae53e5 --- /dev/null +++ b/packages/stream_video/lib/src/extensions/call_participant_state_ext.dart @@ -0,0 +1,57 @@ +import '../models/call_participant_state.dart'; +import '../models/call_track_state.dart'; +import '../sfu/data/models/sfu_subscription_details.dart'; +import '../sfu/data/models/sfu_track_type.dart'; + +extension CallParticipantStateListEx on List { + Map getSubscriptions({ + Set exclude = const {}, + }) { + final subscriptions = {}; + + for (final participant in this) { + // We only care about remote participants. + if (participant.isLocal) continue; + + subscriptions.addAll( + participant.getSubscriptions(exclude: exclude), + ); + } + + return subscriptions; + } +} + +extension CallParticipantStateEx on CallParticipantState { + Map getSubscriptions({ + Set exclude = const {}, + }) { + final subscriptions = {}; + + for (final trackType in publishedTracks.keys) { + final trackState = publishedTracks[trackType]; + + // We only care about remote tracks. + if (trackState is! RemoteTrackState) continue; + + // Continue if we should exclude this trackType. + final shouldExclude = exclude.contains(trackType); + if (shouldExclude) continue; + + // We only care about tracks that are subscribed. + if (!trackState.subscribed) continue; + + final detail = SfuSubscriptionDetails( + userId: userId, + sessionId: sessionId, + trackIdPrefix: trackIdPrefix, + trackType: trackType, + dimension: trackState.videoDimension, + ); + + subscriptions[detail.trackId] = detail; + } + + return subscriptions; + } +} diff --git a/packages/stream_video/lib/src/models/subscription_change.dart b/packages/stream_video/lib/src/models/subscription_change.dart index faec85b47..bf160c893 100644 --- a/packages/stream_video/lib/src/models/subscription_change.dart +++ b/packages/stream_video/lib/src/models/subscription_change.dart @@ -73,3 +73,13 @@ class SubscriptionChange { 'subscribed: $subscribed, trackId: $trackId}'; } } + +class VideoTrackSubscriptionOverride { + VideoTrackSubscriptionOverride({ + this.dimension, + this.enabled = true, + }); + + final RtcVideoDimension? dimension; + final bool enabled; +} diff --git a/packages/stream_video/lib/src/sfu/data/models/sfu_subscription_details.dart b/packages/stream_video/lib/src/sfu/data/models/sfu_subscription_details.dart index f2386dc25..3d5a7b62f 100644 --- a/packages/stream_video/lib/src/sfu/data/models/sfu_subscription_details.dart +++ b/packages/stream_video/lib/src/sfu/data/models/sfu_subscription_details.dart @@ -19,6 +19,22 @@ class SfuSubscriptionDetails with EquatableMixin { final String trackId; + SfuSubscriptionDetails copyWith({ + String? userId, + String? sessionId, + String? trackIdPrefix, + SfuTrackType? trackType, + RtcVideoDimension? dimension, + }) { + return SfuSubscriptionDetails( + userId: userId ?? this.userId, + sessionId: sessionId ?? this.sessionId, + trackIdPrefix: trackIdPrefix ?? this.trackIdPrefix, + trackType: trackType ?? this.trackType, + dimension: dimension ?? this.dimension, + ); + } + @override bool? get stringify => true; diff --git a/packages/stream_video/lib/stream_video.dart b/packages/stream_video/lib/stream_video.dart index 31c262c84..3c4962af9 100644 --- a/packages/stream_video/lib/stream_video.dart +++ b/packages/stream_video/lib/stream_video.dart @@ -30,6 +30,7 @@ export 'src/call/call_events.dart'; export 'src/call/call_reject_reason.dart'; export 'src/call/call_ringing_state.dart'; export 'src/call/call_type.dart'; +export 'src/call/session/dynascale_manager.dart'; export 'src/call_state.dart'; export 'src/coordinator/coordinator_client.dart'; export 'src/coordinator/models/coordinator_events.dart'; diff --git a/packages/stream_video_flutter/lib/src/renderer/video_renderer.dart b/packages/stream_video_flutter/lib/src/renderer/video_renderer.dart index c77aa521e..462711a59 100644 --- a/packages/stream_video_flutter/lib/src/renderer/video_renderer.dart +++ b/packages/stream_video_flutter/lib/src/renderer/video_renderer.dart @@ -50,8 +50,18 @@ class StreamVideoRenderer extends StatelessWidget { // The video track is local and is already published. child = _buildVideoTrackRenderer(context, trackState); } else if (trackState.subscribed && trackState.received) { - // The video track is remote and has been received. - child = _buildVideoTrackRenderer(context, trackState); + final incomingVideoSettingsEnabled = call + .dynascaleManager.incomingVideoSettings + ?.isParticipantVideoEnabled(participant.sessionId) ?? + true; + + if (!incomingVideoSettingsEnabled) { + // The video track is remote and has been received, but has been disabled. + child = placeholderBuilder.call(context); + } else { + // The video track is remote and has been received. + child = _buildVideoTrackRenderer(context, trackState); + } } else { // The video track is remote and hasn't been received yet. child = placeholderBuilder.call(context);