Skip to content

Commit

Permalink
Added Store#stateFlow(Lifecycle) and `Store#labelsChannel(Lifecycle…
Browse files Browse the repository at this point in the history
…)` API, promoted to stable
  • Loading branch information
arkivanov committed Jan 12, 2025
1 parent f23a901 commit bce53e9
Show file tree
Hide file tree
Showing 8 changed files with 328 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ public final class com/arkivanov/mvikotlin/extensions/coroutines/StoreExtKt {
public static final fun getLabels (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/Flow;
public static final fun getStateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/StateFlow;
public static final fun getStates (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/Flow;
public static final fun labelsChannel (Lcom/arkivanov/mvikotlin/core/store/Store;Lcom/arkivanov/essenty/lifecycle/Lifecycle;I)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun labelsChannel (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;I)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun labelsChannel$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lcom/arkivanov/essenty/lifecycle/Lifecycle;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun labelsChannel$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun stateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;Lcom/arkivanov/essenty/lifecycle/Lifecycle;)Lkotlinx/coroutines/flow/StateFlow;
public static final fun stateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;)Lkotlinx/coroutines/flow/StateFlow;
public static synthetic fun stateFlow$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;ILjava/lang/Object;)Lkotlinx/coroutines/flow/StateFlow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ public final class com/arkivanov/mvikotlin/extensions/coroutines/StoreExtKt {
public static final fun getLabels (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/Flow;
public static final fun getStateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/StateFlow;
public static final fun getStates (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/Flow;
public static final fun labelsChannel (Lcom/arkivanov/mvikotlin/core/store/Store;Lcom/arkivanov/essenty/lifecycle/Lifecycle;I)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun labelsChannel (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;I)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun labelsChannel$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lcom/arkivanov/essenty/lifecycle/Lifecycle;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun labelsChannel$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun stateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;Lcom/arkivanov/essenty/lifecycle/Lifecycle;)Lkotlinx/coroutines/flow/StateFlow;
public static final fun stateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;)Lkotlinx/coroutines/flow/StateFlow;
public static synthetic fun stateFlow$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;ILjava/lang/Object;)Lkotlinx/coroutines/flow/StateFlow;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.arkivanov.mvikotlin.extensions.coroutines

import com.arkivanov.essenty.lifecycle.Lifecycle
import com.arkivanov.essenty.lifecycle.doOnDestroy
import com.arkivanov.mvikotlin.core.rx.observer
import com.arkivanov.mvikotlin.core.store.Store
import com.arkivanov.mvikotlin.core.utils.ExperimentalMviKotlinApi
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.ExperimentalForInheritanceCoroutinesApi
Expand All @@ -19,15 +20,15 @@ import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext

/**
* Returns a [Flow] that emits [Store] states.
* Creates and returns a [Flow] that emits [Store] states.
*
* Please note that the actual collection of the [Flow] may not be synchronous depending on [CoroutineContext] being used.
*/
val <State : Any> Store<*, State, *>.states: Flow<State>
get() = toFlow(Store<*, State, *>::states)

/**
* Returns a [StateFlow] that emits [Store] states. The returned [StateFlow] is hot,
* Creates and returns a [StateFlow] that emits [Store] states. The returned [StateFlow] is hot,
* started in the given coroutine [scope], sharing the most recently emitted state from
* a single subscription to the [Store] with multiple downstream subscribers.
*
Expand All @@ -42,7 +43,25 @@ fun <State : Any> Store<*, State, *>.stateFlow(
): StateFlow<State> = states.stateIn(scope, started, state)

/**
* Returns a [StateFlow] that emits [Store] states.
* Creates and returns a [StateFlow] that emits [Store] states. The returned [StateFlow] is hot,
* sharing the most recently emitted state from a single subscription to the [Store]
* with multiple downstream subscribers.
*
* Please note that the actual collection of the [StateFlow] may not be synchronous
* depending on [CoroutineContext] being used.
*
* @param lifecycle a [Lifecycle] used for cancelling the underlying [MutableStateFlow].
*/
fun <State : Any> Store<*, State, *>.stateFlow(lifecycle: Lifecycle): StateFlow<State> {
val stateFlow = MutableStateFlow(state)
val disposable = states(observer(onNext = { stateFlow.value = it }))
lifecycle.doOnDestroy { disposable.dispose() }

return stateFlow
}

/**
* Creates and returns a [StateFlow] that emits [Store] states.
*
* This API is experimental because [StateFlow] interface is not stable for inheritance in 3rd party libraries.
* Please mind binary compatibility when using this API.
Expand All @@ -53,6 +72,7 @@ fun <State : Any> Store<*, State, *>.stateFlow(
val <State : Any> Store<*, State, *>.stateFlow: StateFlow<State>
get() = StoreStateFlow(store = this)

@Suppress("UnnecessaryOptInAnnotation")
@OptIn(ExperimentalForInheritanceCoroutinesApi::class)
private class StoreStateFlow<out State : Any>(
private val store: Store<*, State, *>,
Expand All @@ -74,7 +94,7 @@ private class StoreStateFlow<out State : Any>(
}

/**
* Returns a [Flow] that emits [Store] labels.
* Creates and returns a [Flow] that emits [Store] labels.
*
* Please note that the actual collection of the [Flow] may not be synchronous depending on [CoroutineContext] being used.
*/
Expand All @@ -90,12 +110,12 @@ val <Label : Any> Store<*, *, Label>.labels: Flow<Label>
*
* Due to the nature of how channels work, it is recommended to have one [Channel] per subscriber.
*
* Please note that the actual collection of the [Flow] may not be synchronous depending on [CoroutineContext] being used.
* Please note that the actual collection of the [ReceiveChannel] may not be synchronous depending on
* [CoroutineContext] being used.
*
* @param scope a [CoroutineScope] used for cancelling the underlying [Channel].
* @param capacity a capacity of the underlying [Channel], default value is [Channel.BUFFERED].
*/
@ExperimentalMviKotlinApi
fun <Label : Any> Store<*, *, Label>.labelsChannel(
scope: CoroutineScope,
capacity: Int = Channel.BUFFERED,
Expand All @@ -115,3 +135,32 @@ fun <Label : Any> Store<*, *, Label>.labelsChannel(
return channel
}

/**
* Returns a [ReceiveChannel] that emits [Store] labels. Unlike [labels] that returns a [Flow], this API
* is useful when labels must not be skipped while there is no subscriber. Please keep in mind that labels
* still may be skipped if they are dispatched synchronously on [Store] initialization. If that's the case,
* you can disable the automatic initialization by passing `autoInit = false` parameter when creating a [Store],
* see [StoreFactory.create][com.arkivanov.mvikotlin.core.store.StoreFactory.create] for more information.
*
* Due to the nature of how channels work, it is recommended to have one [Channel] per subscriber.
*
* Please note that the actual collection of the [ReceiveChannel] may not be synchronous depending on
* [CoroutineContext] being used.
*
* @param lifecycle a [Lifecycle] used for cancelling the underlying [Channel].
* @param capacity a capacity of the underlying [Channel], default value is [Channel.BUFFERED].
*/
fun <Label : Any> Store<*, *, Label>.labelsChannel(
lifecycle: Lifecycle,
capacity: Int = Channel.BUFFERED,
): ReceiveChannel<Label> {
val channel = Channel<Label>(capacity = capacity)
val disposable = labels(observer(onNext = channel::trySend))

lifecycle.doOnDestroy {
disposable.dispose()
channel.cancel()
}

return channel
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.arkivanov.mvikotlin.extensions.coroutines

import com.arkivanov.essenty.lifecycle.Lifecycle
import com.arkivanov.essenty.lifecycle.LifecycleRegistry
import com.arkivanov.essenty.lifecycle.destroy
import com.arkivanov.mvikotlin.core.rx.Disposable
import com.arkivanov.mvikotlin.core.rx.Observer
import com.arkivanov.mvikotlin.core.store.Store
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertNull
import kotlin.test.assertTrue

@Suppress("TestFunctionName")
class LabelChannelWithLifecycleTest {

@Test
fun WHEN_label_emitted_THEN_label_collected() {
val store = TestStore()
val scope = CoroutineScope(Dispatchers.Unconfined)
val channel = store.labelsChannel(LifecycleRegistry())
val labels = ArrayList<Int>()

store.labelObserver?.onNext(1)

scope.launch {
for (label in channel) {
labels += label
}
}

store.labelObserver?.onNext(2)
store.labelObserver?.onNext(3)

assertContentEquals(listOf(1, 2, 3), labels)
}

@Test
fun WHEN_lifecycle_destroyed_THEN_unsubscribed_from_store() {
val store = TestStore()
val scope = CoroutineScope(Dispatchers.Unconfined)
val lifecycle = LifecycleRegistry(Lifecycle.State.CREATED)
val channel = store.labelsChannel(lifecycle)

scope.launch {
while (true) {
channel.receive()
}
}

lifecycle.destroy()

assertNull(store.labelObserver)
}

@OptIn(DelicateCoroutinesApi::class)
@Test
fun WHEN_lifecycle_destroyed_THEN_channel_cancelled() {
val store = TestStore()
val scope = CoroutineScope(Dispatchers.Unconfined)
val lifecycle = LifecycleRegistry(Lifecycle.State.CREATED)
val channel = store.labelsChannel(lifecycle)

scope.launch {
while (true) {
channel.receive()
}
}

lifecycle.destroy()

assertTrue(channel.isClosedForReceive)
}

private class TestStore : Store<Int, Int, Int> {
override val state: Int = 0
override val isDisposed: Boolean = false

var labelObserver: Observer<Int>? = null
private set

override fun states(observer: Observer<Int>): Disposable = error("Not required")

override fun labels(observer: Observer<Int>): Disposable {
labelObserver = observer

return Disposable { labelObserver = null }
}

override fun accept(intent: Int) {
// no-op
}

override fun init() {
// no-op
}

override fun dispose() {
// no-op
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package com.arkivanov.mvikotlin.extensions.coroutines
import com.arkivanov.mvikotlin.core.rx.Disposable
import com.arkivanov.mvikotlin.core.rx.Observer
import com.arkivanov.mvikotlin.core.store.Store
import com.arkivanov.mvikotlin.core.utils.ExperimentalMviKotlinApi
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
Expand All @@ -14,9 +13,8 @@ import kotlin.test.assertContentEquals
import kotlin.test.assertNull
import kotlin.test.assertTrue

@OptIn(ExperimentalMviKotlinApi::class)
@Suppress("TestFunctionName")
class LabelChannelTest {
class LabelChannelWithScopeTest {

@Test
fun WHEN_label_emitted_THEN_label_collected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ class StateFlowTest {
@Test
fun WHEN_state_emitted_THEN_state_collected() {
val store = TestStore()
val flow = store.stateFlow
val scope = CoroutineScope(Dispatchers.Unconfined)
val flow = store.stateFlow
val items = ArrayList<Int>()

scope.launch {
Expand All @@ -37,8 +37,8 @@ class StateFlowTest {
@Test
fun WHEN_collection_cancelled_THEN_unsubscribed_from_store() {
val store = TestStore()
val flow = store.stateFlow
val scope = CoroutineScope(Dispatchers.Unconfined)
val flow = store.stateFlow

scope.launch {
flow.collect {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.arkivanov.mvikotlin.extensions.coroutines

import com.arkivanov.essenty.lifecycle.Lifecycle
import com.arkivanov.essenty.lifecycle.LifecycleRegistry
import com.arkivanov.essenty.lifecycle.destroy
import com.arkivanov.mvikotlin.core.rx.Disposable
import com.arkivanov.mvikotlin.core.rx.Observer
import com.arkivanov.mvikotlin.core.store.Store
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertNull

@Suppress("TestFunctionName")
class StateFlowWithLifecycleTest {

@Test
fun WHEN_state_emitted_THEN_state_collected() {
val store = TestStore()
val scope = CoroutineScope(Dispatchers.Unconfined)
val flow = store.stateFlow(LifecycleRegistry())
val items = ArrayList<Int>()

scope.launch {
flow.collect { items += it }
}

store.stateObserver?.onNext(1)
store.stateObserver?.onNext(2)
store.stateObserver?.onNext(3)

assertContentEquals(listOf(0, 1, 2, 3), items)
}

@Test
fun WHEN_lifecycle_destroyed_THEN_unsubscribed_from_store() {
val store = TestStore()
val lifecycle = LifecycleRegistry(Lifecycle.State.CREATED)
val scope = CoroutineScope(Dispatchers.Unconfined)
val flow = store.stateFlow(lifecycle)

scope.launch {
flow.collect {}
}

lifecycle.destroy()

assertNull(store.stateObserver)
}

private class TestStore : Store<Int, Int, Int> {
override val state: Int = 0
override val isDisposed: Boolean = false

var stateObserver: Observer<Int>? = null
private set

override fun states(observer: Observer<Int>): Disposable {
stateObserver = observer

return Disposable { stateObserver = null }
}

override fun labels(observer: Observer<Int>): Disposable = error("Not required")

override fun accept(intent: Int) {
// no-op
}

override fun init() {
// no-op
}

override fun dispose() {
// no-op
}
}
}
Loading

0 comments on commit bce53e9

Please sign in to comment.