Skip to content

Commit

Permalink
Remove the use of deprecated BroadcastChannel. (#659)
Browse files Browse the repository at this point in the history
* Remove the use of deprecated BroadcastChannel.

Signed-off-by: Roman Kalukiewicz <[email protected]>

* Renamed channels to flows.

Signed-off-by: Roman Kalukiewicz <[email protected]>

* Run ./gradlew ktlintFormat

Signed-off-by: Matt Ramotar <[email protected]>

---------

Signed-off-by: Roman Kalukiewicz <[email protected]>
Signed-off-by: Matt Ramotar <[email protected]>
  • Loading branch information
romkal authored Nov 17, 2024
1 parent 8b5db52 commit 56a06d0
Showing 1 changed file with 30 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package org.mobilenativefoundation.store.store5.util

import kotlinx.coroutines.channels.BroadcastChannel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import org.mobilenativefoundation.store.store5.SourceOfTruth

/**
Expand Down Expand Up @@ -58,75 +60,68 @@ fun <Key : Any, Output : Any> SimplePersisterAsFlowable<Key, Output>.asSourceOfT
internal class KeyTracker<Key> {
private val lock = Mutex()

// list of open key channels
private val channels = mutableMapOf<Key, KeyChannel>()
// list of open key flows
private val flows = mutableMapOf<Key, KeyFlow>()

// for testing
internal fun activeKeyCount() = channels.size
internal fun activeKeyCount() = flows.size

/**
* invalidates the given key. If there are flows returned from [keyFlow] for the given [key],
* they'll receive a new emission
*/
suspend fun invalidate(key: Key) {
lock.withLock {
channels[key]
}?.channel?.send(Unit)
flows[key]
}?.flow?.emit(Unit)
}

/**
* Returns a Flow that emits once and then every time the given [key] is invalidated via
* [invalidate]
*/
suspend fun keyFlow(key: Key): Flow<Unit> {
// it is important to allocate KeyChannel lazily (ony when the returned flow is collected
// it is important to allocate KeyFlow lazily (ony when the returned flow is collected
// from). Otherwise, we might just create many of them that are never observed hence never
// cleaned up
return flow {
val keyChannel =
val keyFlow =
lock.withLock {
channels.getOrPut(key) {
KeyChannel(
channel =
BroadcastChannel<Unit>(Channel.CONFLATED).apply {
// start w/ an initial value.
trySend(Unit).isSuccess
},
)
}.also {
it.acquire() // refcount
flows.getOrPut(key) { KeyFlow() }.also {
it.acquire()
}
}
emit(Unit)
try {
emitAll(keyChannel.channel.openSubscription())
emitAll(keyFlow.flow)
} finally {
lock.withLock {
keyChannel.release()
if (keyChannel.channel.isClosedForSend) {
channels.remove(key)
withContext(NonCancellable) {
lock.withLock {
if (keyFlow.release()) {
flows.remove(key)
}
}
}
}
}
}

/**
* A data structure to count how many active flows we have on this channel
* A data structure to count how many active flows we have on this flow
*/
private data class KeyChannel(
val channel: BroadcastChannel<Unit>,
var collectors: Int = 0,
) {
private class KeyFlow {
val flow =
MutableSharedFlow<Unit>(
extraBufferCapacity = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST,
)
private var collectors: Int = 0

fun acquire() {
collectors++
}

fun release() {
collectors--
if (collectors == 0) {
channel.close()
}
}
fun release() = (--collectors) == 0
}
}

Expand Down

0 comments on commit 56a06d0

Please sign in to comment.