From 979967d88df12cb53ecdaa9e8f546f2f39974c3e Mon Sep 17 00:00:00 2001 From: Liviu Timar <65943217+liviu-timar@users.noreply.github.com> Date: Wed, 5 Jun 2024 10:51:37 +0300 Subject: [PATCH] [PBE-3935] Fix RtcSession.connect() indefinite suspend bug (#1103) * Replace Flow with Mutex in code that determines if join event has been received * Increase replay cache for socket events flow --- .../video/android/core/call/RtcSession.kt | 25 +++++++++++++------ .../android/core/socket/PersistentSocket.kt | 2 +- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt index 6387e6c4ff..210b903166 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/call/RtcSession.kt @@ -66,6 +66,7 @@ import io.getstream.video.android.core.utils.stringify import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.cancel @@ -77,13 +78,15 @@ import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.flow.debounce -import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.retry import kotlinx.coroutines.flow.retryWhen import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext +import kotlinx.coroutines.withTimeout import kotlinx.serialization.encodeToString import kotlinx.serialization.json.Json import okio.IOException @@ -221,7 +224,7 @@ public class RtcSession internal constructor( /** * We can't publish tracks till we've received the join event response */ - private val joinEventResponse: MutableStateFlow = MutableStateFlow(null) + private val joinEventReceivedMutex = Mutex(locked = true) // participants by session id -> participant state private val trackPrefixToSessionIdMap = @@ -424,8 +427,16 @@ public class RtcSession internal constructor( timer.finish() // ensure that the join event has been handled before starting RTC - joinEventResponse.first { it != null } - connectRtc() + try { + withTimeout(2000L) { + joinEventReceivedMutex.withLock { connectRtc() } + } + } catch (e: TimeoutCancellationException) { + throw RtcException( + message = "RtcSession.connect() timed out. JoinCallResponseEvent not received in time.", + cause = e, + ) + } } private fun initializeVideoTransceiver() { @@ -1058,9 +1069,9 @@ public class RtcSession internal constructor( fun handleEvent(event: VideoEvent) { logger.i { "[rtc handleEvent] #sfu; event: $event" } if (event is JoinCallResponseEvent) { - logger.i { "[rtc handleEvent] joinEventResponse.value: $event" } + logger.i { "[rtc handleEvent] unlocking joinEventReceivedMutex" } - joinEventResponse.value = event + joinEventReceivedMutex.unlock() } if (event is SfuDataEvent) { coroutineScope.launch { @@ -1309,7 +1320,7 @@ public class RtcSession internal constructor( } // the Sfu WS needs to be connected before calling SetPublisherRequest - if (joinEventResponse.value == null) { + if (joinEventReceivedMutex.isLocked) { logger.e { "[negotiate] #$id; #sfu; #${peerType.stringify()}; SFU WS isn't connected" } return@launch } diff --git a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/PersistentSocket.kt b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/PersistentSocket.kt index 1bf541ceee..b53499bdcd 100644 --- a/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/PersistentSocket.kt +++ b/stream-video-android-core/src/main/kotlin/io/getstream/video/android/core/socket/PersistentSocket.kt @@ -75,7 +75,7 @@ public open class PersistentSocket( var reconnectTimeout: Long = 500 /** flow with all the events, listen to this */ - val events = MutableSharedFlow() + val events = MutableSharedFlow(replay = 3) /** flow with temporary and permanent errors */ val errors = MutableSharedFlow()