Skip to content

Commit

Permalink
Added Store#labelsChannel API
Browse files Browse the repository at this point in the history
  • Loading branch information
arkivanov committed Aug 8, 2024
1 parent 018af7c commit 54cf368
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public final class com/arkivanov/mvikotlin/extensions/coroutines/StoreExtKt {
public static final fun getLabels (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/Flow;
public static final fun getStateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/StateFlow;
public static final fun getStates (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/Flow;
public static final fun labelsChannel (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;I)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun labelsChannel$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun stateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;)Lkotlinx/coroutines/flow/StateFlow;
public static synthetic fun stateFlow$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;ILjava/lang/Object;)Lkotlinx/coroutines/flow/StateFlow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ public final class com/arkivanov/mvikotlin/extensions/coroutines/StoreExtKt {
public static final fun getLabels (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/Flow;
public static final fun getStateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/StateFlow;
public static final fun getStates (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/Flow;
public static final fun labelsChannel (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;I)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun labelsChannel$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun stateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;)Lkotlinx/coroutines/flow/StateFlow;
public static synthetic fun stateFlow$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;ILjava/lang/Object;)Lkotlinx/coroutines/flow/StateFlow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@ package com.arkivanov.mvikotlin.extensions.coroutines

import com.arkivanov.mvikotlin.core.rx.observer
import com.arkivanov.mvikotlin.core.store.Store
import com.arkivanov.mvikotlin.core.utils.ExperimentalMviKotlinApi
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext

/**
Expand Down Expand Up @@ -69,7 +74,42 @@ private class StoreStateFlow<out State : Any>(
/**
* Returns a [Flow] that emits [Store] labels.
*
* Please note that the actual collection of the [Flow] may not be synchronous depending on [CoroutineContext] being used.
* Please note that the actual collection of the [Flow] may not be synchronous depending on [CoroutineContext] being used.
*/
val <Label : Any> Store<*, *, Label>.labels: Flow<Label>
get() = toFlow(Store<*, *, Label>::labels)

/**
* Returns a [ReceiveChannel] that emits [Store] labels. Unlike [labels] that returns a [Flow], this API
* is useful when labels must not be skipped while there is no subscriber. Please keep in mind that labels
* still may be skipped if they are dispatched synchronously on [Store] initialization. If that's the case,
* you can disable the automatic initialization by passing `autoInit = false` parameter when creating a [Store],
* see [StoreFactory.create][com.arkivanov.mvikotlin.core.store.StoreFactory.create] for more information.
*
* Due to the nature of how channels work, it is recommended to have one [Channel] per subscriber.
*
* Please note that the actual collection of the [Flow] may not be synchronous depending on [CoroutineContext] being used.
*
* @param scope a [CoroutineScope] used for cancelling the underlying [Channel].
* @param capacity a capacity of the underlying [Channel], default value is [Channel.BUFFERED].
*/
@ExperimentalMviKotlinApi
fun <Label : Any> Store<*, *, Label>.labelsChannel(
scope: CoroutineScope,
capacity: Int = Channel.BUFFERED,
): ReceiveChannel<Label> {
val channel = Channel<Label>(capacity = capacity)
val disposable = labels(observer(onNext = channel::trySend))

scope.launch {
try {
awaitCancellation()
} finally {
disposable.dispose()
channel.cancel()
}
}

return channel
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package com.arkivanov.mvikotlin.extensions.coroutines

import com.arkivanov.mvikotlin.core.rx.Disposable
import com.arkivanov.mvikotlin.core.rx.Observer
import com.arkivanov.mvikotlin.core.store.Store
import com.arkivanov.mvikotlin.core.utils.ExperimentalMviKotlinApi
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertNull
import kotlin.test.assertTrue

@OptIn(ExperimentalMviKotlinApi::class)
@Suppress("TestFunctionName")
class LabelChannelTest {

@Test
fun WHEN_label_emitted_THEN_label_collected() {
val store = TestStore()
val scope = CoroutineScope(Dispatchers.Unconfined)
val channel = store.labelsChannel(scope)
val labels = ArrayList<Int>()

store.labelObserver?.onNext(1)

scope.launch {
for (label in channel) {
labels += label
}
}

store.labelObserver?.onNext(2)
store.labelObserver?.onNext(3)

assertContentEquals(listOf(1, 2, 3), labels)
}

@Test
fun WHEN_scope_cancelled_THEN_unsubscribed_from_store() {
val store = TestStore()
val scope = CoroutineScope(Dispatchers.Unconfined)
val channel = store.labelsChannel(scope)

scope.launch {
while (true) {
channel.receive()
}
}

scope.cancel()

assertNull(store.labelObserver)
}

@OptIn(DelicateCoroutinesApi::class)
@Test
fun WHEN_scope_cancelled_THEN_channel_cancelled() {
val store = TestStore()
val scope = CoroutineScope(Dispatchers.Unconfined)
val channel = store.labelsChannel(scope)

scope.launch {
while (true) {
channel.receive()
}
}

scope.cancel()

assertTrue(channel.isClosedForReceive)
}

private class TestStore : Store<Int, Int, Int> {
override val state: Int = 0
override val isDisposed: Boolean = false

var labelObserver: Observer<Int>? = null
private set

override fun states(observer: Observer<Int>): Disposable = error("Not required")

override fun labels(observer: Observer<Int>): Disposable {
labelObserver = observer

return Disposable { labelObserver = null }
}

override fun accept(intent: Int) {
// no-op
}

override fun init() {
// no-op
}

override fun dispose() {
// no-op
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package com.arkivanov.mvikotlin.extensions.coroutines

import com.arkivanov.mvikotlin.core.rx.Disposable
import com.arkivanov.mvikotlin.core.rx.Observer
import com.arkivanov.mvikotlin.core.store.Store
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertNull

@Suppress("TestFunctionName")
class LabelFlowTest {

@Test
fun WHEN_label_emitted_THEN_label_collected() {
val store = TestStore()
val flow = store.labels
val scope = CoroutineScope(Dispatchers.Unconfined)
val items = ArrayList<Int>()

scope.launch {
flow.collect { items += it }
}

store.labelObserver?.onNext(1)
store.labelObserver?.onNext(2)
store.labelObserver?.onNext(3)

assertContentEquals(listOf(1, 2, 3), items)
}

@Test
fun WHEN_collection_cancelled_THEN_unsubscribed_from_store() {
val store = TestStore()
val flow = store.labels
val scope = CoroutineScope(Dispatchers.Unconfined)

scope.launch {
flow.collect {}
}

scope.cancel()

assertNull(store.labelObserver)
}

private class TestStore : Store<Int, Int, Int> {
override val state: Int = 0
override val isDisposed: Boolean = false

var labelObserver: Observer<Int>? = null
private set

override fun states(observer: Observer<Int>): Disposable = error("Not required")

override fun labels(observer: Observer<Int>): Disposable {
labelObserver = observer

return Disposable { labelObserver = null }
}

override fun accept(intent: Int) {
// no-op
}

override fun init() {
// no-op
}

override fun dispose() {
// no-op
}
}
}

0 comments on commit 54cf368

Please sign in to comment.