From 56a06d0ec6df6924b70cea198cb984d823d2420d Mon Sep 17 00:00:00 2001 From: Roman Kalukiewicz Date: Sun, 17 Nov 2024 07:46:57 -0800 Subject: [PATCH] Remove the use of deprecated BroadcastChannel. (#659) * Remove the use of deprecated BroadcastChannel. Signed-off-by: Roman Kalukiewicz * Renamed channels to flows. Signed-off-by: Roman Kalukiewicz * Run ./gradlew ktlintFormat Signed-off-by: Matt Ramotar --------- Signed-off-by: Roman Kalukiewicz Signed-off-by: Matt Ramotar --- .../store/store5/util/AsFlowable.kt | 65 +++++++++---------- 1 file changed, 30 insertions(+), 35 deletions(-) diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/util/AsFlowable.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/util/AsFlowable.kt index c13b21990..60d8bb9ca 100644 --- a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/util/AsFlowable.kt +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/util/AsFlowable.kt @@ -1,12 +1,14 @@ package org.mobilenativefoundation.store.store5.util -import kotlinx.coroutines.channels.BroadcastChannel -import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.flow import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withContext import org.mobilenativefoundation.store.store5.SourceOfTruth /** @@ -58,11 +60,11 @@ fun SimplePersisterAsFlowable.asSourceOfT internal class KeyTracker { private val lock = Mutex() - // list of open key channels - private val channels = mutableMapOf() + // list of open key flows + private val flows = mutableMapOf() // for testing - internal fun activeKeyCount() = channels.size + internal fun activeKeyCount() = flows.size /** * invalidates the given key. If there are flows returned from [keyFlow] for the given [key], @@ -70,8 +72,8 @@ internal class KeyTracker { */ suspend fun invalidate(key: Key) { lock.withLock { - channels[key] - }?.channel?.send(Unit) + flows[key] + }?.flow?.emit(Unit) } /** @@ -79,31 +81,25 @@ internal class KeyTracker { * [invalidate] */ suspend fun keyFlow(key: Key): Flow { - // it is important to allocate KeyChannel lazily (ony when the returned flow is collected + // it is important to allocate KeyFlow lazily (ony when the returned flow is collected // from). Otherwise, we might just create many of them that are never observed hence never // cleaned up return flow { - val keyChannel = + val keyFlow = lock.withLock { - channels.getOrPut(key) { - KeyChannel( - channel = - BroadcastChannel(Channel.CONFLATED).apply { - // start w/ an initial value. - trySend(Unit).isSuccess - }, - ) - }.also { - it.acquire() // refcount + flows.getOrPut(key) { KeyFlow() }.also { + it.acquire() } } + emit(Unit) try { - emitAll(keyChannel.channel.openSubscription()) + emitAll(keyFlow.flow) } finally { - lock.withLock { - keyChannel.release() - if (keyChannel.channel.isClosedForSend) { - channels.remove(key) + withContext(NonCancellable) { + lock.withLock { + if (keyFlow.release()) { + flows.remove(key) + } } } } @@ -111,22 +107,21 @@ internal class KeyTracker { } /** - * A data structure to count how many active flows we have on this channel + * A data structure to count how many active flows we have on this flow */ - private data class KeyChannel( - val channel: BroadcastChannel, - var collectors: Int = 0, - ) { + private class KeyFlow { + val flow = + MutableSharedFlow( + extraBufferCapacity = 1, + onBufferOverflow = BufferOverflow.DROP_OLDEST, + ) + private var collectors: Int = 0 + fun acquire() { collectors++ } - fun release() { - collectors-- - if (collectors == 0) { - channel.close() - } - } + fun release() = (--collectors) == 0 } }