Skip to content

Commit

Permalink
Add initialDelay to FlowCycler
Browse files Browse the repository at this point in the history
  • Loading branch information
janseeger committed Dec 8, 2023
1 parent a151365 commit f2c3926
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,33 @@ import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onStart
import kotlin.time.Duration

fun tickEvery(time: Duration) = flow {
fun tickEvery(
interval: Duration,
initialDelay: Duration = Duration.ZERO
) = flow {
delay(initialDelay)
while (true) {
emit(Unit)
delay(time)
delay(interval)
}
}

fun <T> cycleBetween(
tickerInterval: Duration,
interval: Duration,
flow1: Flow<T>,
flow2: Flow<T>,
): Flow<T> = cycleBetween(tickEvery(tickerInterval), flow1, flow2)
initialDelay: Duration = Duration.ZERO
): Flow<T> = cycleBetween(tickEvery(interval, initialDelay), flow1, flow2)

fun <I, T> cycleBetween(
ticker: Flow<I?>,
flow1: Flow<T>,
flow2: Flow<T>,
): Flow<T> {
var first = false
return combine(ticker.onStart { emit(null) }, flow1, flow2) { _, a, b ->
return combine(ticker, flow1, flow2) { _, a, b ->
first = !first
return@combine when {
first -> a
else -> b
}
return@combine if (first) a else b
}
}

Expand All @@ -43,7 +45,7 @@ fun <T> cycleBetweenNonNull(

fun <I, T> cycleBetweenNonNull(ticker: Flow<I?>, flow1: Flow<T>, flow2: Flow<T>): Flow<T> {
var first = false
return combine(ticker.onStart { emit(null) }, flow1, flow2) { _, a, b ->
return combine(ticker, flow1, flow2) { _, a, b ->
first = !first
when {
first -> a ?: b
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class CycleBetweenNonNullTest {
@Test
fun cyclerReturnsAlwaysAWhenBIsNull() = runTest {
cycleBetweenNonNull(ticker.toFlow(), flowA, nullFlow).test {
tick()
assertEquals(contentA, awaitItem())

tick()
Expand All @@ -33,6 +34,7 @@ class CycleBetweenNonNullTest {
@Test
fun cyclerReturnsAlwaysBWhenAIsNull() = runTest {
cycleBetweenNonNull(ticker.toFlow(), nullFlow, flowB).test {
tick()
assertEquals(contentB, awaitItem())

tick()
Expand All @@ -46,6 +48,7 @@ class CycleBetweenNonNullTest {
@Test
fun cyclerReturnsNullWhenBothAAndBAreNull() = runTest {
cycleBetweenNonNull(ticker.toFlow(), nullFlow, nullFlow).test {
tick()
Assertions.assertNull(awaitItem())

tick()
Expand All @@ -72,6 +75,7 @@ class CycleBetweenNonNullTest {
@Test
fun cycleBetweenWithDurationAndNullWorks() = runTest {
cycleBetweenNonNull(ticker.toFlow(), flowA, nullFlow).test {
tick()
assertEquals(contentA, awaitItem())

tick()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
package de.sipgate.dachlatten.flow

import app.cash.turbine.test
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEmpty
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime
import kotlin.time.TimedValue
import kotlin.time.measureTimedValue

class CycleBetweenTest {
@Test
fun cycleBetweenSwitchesTheFlowBeingEmitted() = runTest {
cycleBetween(ticker.toFlow(), flowA, flowB).test {
tick()
assertEquals(contentA, awaitItem())

tick()
Expand All @@ -29,6 +33,7 @@ class CycleBetweenTest {
@Test
fun cycleBetweenWorksWithNullValues() = runTest {
cycleBetween(ticker.toFlow(), flowA, nullFlow).test {
tick()
assertEquals(contentA, awaitItem())

tick()
Expand All @@ -39,19 +44,47 @@ class CycleBetweenTest {
}
}

@OptIn(ExperimentalCoroutinesApi::class)
@Test
fun cycleBetweenWithDurationWorks() = runTest {
fun cycleBetweenWithDurationProducesTheCorrectValues() = runTest {
cycleBetween(5.seconds, flowA, flowB).test {
assertEquals(contentA, awaitItem())

expectNoEvents()
advanceTimeBy(5.seconds)
assertEquals(contentB, awaitItem())
val b = measureTimedValue { awaitItem() }
assertEquals(contentB, b.value)

expectNoEvents()
advanceTimeBy(5.seconds)
val a = measureTimedValue { awaitItem() }
assertEquals(contentA, a.value)
}
}

@Test
fun cycleBetweenWithDurationHasProperTiming() = runTest {
cycleBetween(5.seconds, flowA, flowB).test {
assertEquals(contentA, awaitItem())

val b = measureTimedValue { awaitItem() }
assertEquals(5.seconds, b.duration)

val a = measureTimedValue { awaitItem() }
assertEquals(5.seconds, a.duration)
}
}

@Test
fun cycleBetweenWithInitialDelayWorks() = runTest {
cycleBetween(
interval = 5.seconds,
initialDelay = 2.seconds,
flow1 = flowA,
flow2 = flowB
).test {
val initial = measureTimedValue { awaitItem() }
assertEquals(contentA, initial.value)
assertEquals(2.seconds, initial.duration)

val subsequent = measureTimedValue { awaitItem() }
assertEquals(5.seconds, subsequent.duration)
assertEquals(contentB, subsequent.value)
}
}

Expand All @@ -70,4 +103,13 @@ class CycleBetweenTest {
}

private fun <T> Channel<T>.toFlow() = consumeAsFlow().onEmpty<T?> { emit(null) }

@OptIn(ExperimentalTime::class)
private suspend fun <T> TestScope.measureTimedValue(block: suspend () -> T): TimedValue<T> {
return testScheduler.timeSource.measureTimedValue { block() }
}

private fun assertEquals(expected: Duration, actual: Duration) {
assertEquals(expected.inWholeMicroseconds, actual.inWholeMicroseconds)
}
}

0 comments on commit f2c3926

Please sign in to comment.