diff --git a/multicast/build.gradle.kts b/multicast/build.gradle.kts index 2b602120f..3d9b1a816 100644 --- a/multicast/build.gradle.kts +++ b/multicast/build.gradle.kts @@ -47,6 +47,16 @@ kotlin { implementation(libs.kotlinx.coroutines.core) } } + + val commonTest by getting { + dependencies { + implementation(kotlin("test")) + implementation(libs.junit) + implementation(libs.kotlinx.coroutines.test) + implementation(libs.turbine) + } + } + val jvmMain by getting val androidMain by getting val nativeMain by creating { diff --git a/multicast/src/commonMain/kotlin/org/mobilenativefoundation/store/multicast5/ChannelManager.kt b/multicast/src/commonMain/kotlin/org/mobilenativefoundation/store/multicast5/ChannelManager.kt index 6c35fa005..e0bb23bb6 100644 --- a/multicast/src/commonMain/kotlin/org/mobilenativefoundation/store/multicast5/ChannelManager.kt +++ b/multicast/src/commonMain/kotlin/org/mobilenativefoundation/store/multicast5/ChannelManager.kt @@ -22,6 +22,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.SendChannel import kotlinx.coroutines.flow.Flow import org.mobilenativefoundation.store.multicast5.ChannelManager.Message +import kotlin.coroutines.cancellation.CancellationException internal interface ChannelManager { @@ -55,7 +56,11 @@ internal interface ChannelManager { suspend fun dispatchValue(value: Message.Dispatch.Value) { _awaitsDispatch = false - channel.send(value) + try { + channel.send(value) + } catch (e: CancellationException) { + // ignore + } } fun dispatchError(error: Throwable) { diff --git a/multicast/src/commonTest/kotlin/org/mobilenativefoundation/store/multicast5/StoreChannelManagerTests.kt b/multicast/src/commonTest/kotlin/org/mobilenativefoundation/store/multicast5/StoreChannelManagerTests.kt new file mode 100644 index 000000000..60f4ff859 --- /dev/null +++ b/multicast/src/commonTest/kotlin/org/mobilenativefoundation/store/multicast5/StoreChannelManagerTests.kt @@ -0,0 +1,75 @@ +package org.mobilenativefoundation.store.multicast5 + + +import app.cash.turbine.test +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.flow.filterIsInstance +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertEquals + +class StoreChannelManagerTests { + + @Test + fun cancelledDownstreamChannelShouldNotCancelOtherChannels() = runTest { + val coroutineScope = CoroutineScope(Dispatchers.Default) + val lockUpstream = Mutex(true) + val testMessages = listOf(1, 2, 3) + val numChannels = 20 + val upstreamFlow = flow { + lockUpstream.withLock { + testMessages.onEach { emit(it) } + } + } + val channelManager = StoreChannelManager( + scope = coroutineScope, + bufferSize = 0, + upstream = upstreamFlow, + piggybackingDownstream = false, + keepUpstreamAlive = false, + onEach = { } + ) + val channels = createChannels(numChannels) + val channelToBeCancelled = Channel>(Channel.UNLIMITED) + .also { channel -> + coroutineScope.launch { + channel.consumeAsFlow().test { + cancelAndIgnoreRemainingEvents() + } + } + } + coroutineScope.launch { + channels.forEach { channelManager.addDownstream(it) } + lockUpstream.unlock() + } + coroutineScope.launch { + channelManager.addDownstream(channelToBeCancelled) + } + + channels.forEach { channel -> + val messagesFlow = channel.consumeAsFlow() + .filterIsInstance>() + .onEach { it.delivered.complete(Unit) } + + messagesFlow.test { + for (message in testMessages) { + val dispatchValue = awaitItem() + assertEquals(message, dispatchValue.value) + } + awaitComplete() + } + } + } + + private fun createChannels(count: Int): List>> { + return (1..count).map { Channel(Channel.UNLIMITED) } + } +}