Skip to content

Commit

Permalink
[PBE-3935] Fix RtcSession.connect() indefinite suspend bug (#1103)
Browse files Browse the repository at this point in the history
* Replace Flow with Mutex in code that determines if join event has been received

* Increase replay cache for socket events flow
  • Loading branch information
liviu-timar authored Jun 5, 2024
1 parent 4457d5f commit 979967d
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ import io.getstream.video.android.core.utils.stringify
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.cancel
Expand All @@ -77,13 +78,15 @@ import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import okio.IOException
Expand Down Expand Up @@ -221,7 +224,7 @@ public class RtcSession internal constructor(
/**
* We can't publish tracks till we've received the join event response
*/
private val joinEventResponse: MutableStateFlow<JoinCallResponseEvent?> = MutableStateFlow(null)
private val joinEventReceivedMutex = Mutex(locked = true)

// participants by session id -> participant state
private val trackPrefixToSessionIdMap =
Expand Down Expand Up @@ -424,8 +427,16 @@ public class RtcSession internal constructor(
timer.finish()

// ensure that the join event has been handled before starting RTC
joinEventResponse.first { it != null }
connectRtc()
try {
withTimeout(2000L) {
joinEventReceivedMutex.withLock { connectRtc() }
}
} catch (e: TimeoutCancellationException) {
throw RtcException(
message = "RtcSession.connect() timed out. JoinCallResponseEvent not received in time.",
cause = e,
)
}
}

private fun initializeVideoTransceiver() {
Expand Down Expand Up @@ -1058,9 +1069,9 @@ public class RtcSession internal constructor(
fun handleEvent(event: VideoEvent) {
logger.i { "[rtc handleEvent] #sfu; event: $event" }
if (event is JoinCallResponseEvent) {
logger.i { "[rtc handleEvent] joinEventResponse.value: $event" }
logger.i { "[rtc handleEvent] unlocking joinEventReceivedMutex" }

joinEventResponse.value = event
joinEventReceivedMutex.unlock()
}
if (event is SfuDataEvent) {
coroutineScope.launch {
Expand Down Expand Up @@ -1309,7 +1320,7 @@ public class RtcSession internal constructor(
}

// the Sfu WS needs to be connected before calling SetPublisherRequest
if (joinEventResponse.value == null) {
if (joinEventReceivedMutex.isLocked) {
logger.e { "[negotiate] #$id; #sfu; #${peerType.stringify()}; SFU WS isn't connected" }
return@launch
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public open class PersistentSocket<T>(
var reconnectTimeout: Long = 500

/** flow with all the events, listen to this */
val events = MutableSharedFlow<VideoEvent>()
val events = MutableSharedFlow<VideoEvent>(replay = 3)

/** flow with temporary and permanent errors */
val errors = MutableSharedFlow<Throwable>()
Expand Down

0 comments on commit 979967d

Please sign in to comment.