Skip to content

Commit

Permalink
parMapNotNullUnordered
Browse files Browse the repository at this point in the history
  • Loading branch information
serras committed Aug 2, 2023
1 parent cb03363 commit cef47ac
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 0 deletions.
2 changes: 2 additions & 0 deletions arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ public final class arrow/fx/coroutines/FlowExtensions {
public static final fun metered-HG0u8IE (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun parMap (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun parMap$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function3;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun parMapNotNullUnordered (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun parMapNotNullUnordered$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun parMapUnordered (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun parMapUnordered$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun repeat (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public inline fun <A, B> Flow<A>.parMap(
* <!--- KNIT example-flow-04.kt -->
*/
@FlowPreview
@ExperimentalCoroutinesApi
public inline fun <A, B> Flow<A>.parMapUnordered(
concurrency: Int = DEFAULT_CONCURRENCY,
crossinline transform: suspend (a: A) -> B
Expand All @@ -193,6 +194,43 @@ public inline fun <A, B> Flow<A>.parMapUnordered(
}
}.flattenMerge(concurrency)

/**
* Like [mapNotNull], but will evaluate effects in parallel, emitting the results downstream.
* The number of concurrent effects is limited by [concurrency].
*
* See [parMap] if retaining the original order of the stream is required.
*
* ```kotlin
* import kotlinx.coroutines.delay
* import kotlinx.coroutines.flow.flowOf
* import kotlinx.coroutines.flow.toList
* import kotlinx.coroutines.flow.collect
* import arrow.fx.coroutines.parMapNotNullUnordered
*
* //sampleStart
* suspend fun main(): Unit {
* flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
* .parMapNotNullUnordered { a ->
* delay(100)
* a.takeIf { a % 2 == 0 }
* }.toList() // [4, 6, 2, 8, 10]
* }
* //sampleEnd
* ```
* <!--- KNIT example-flow-05.kt -->
*/
@FlowPreview
@ExperimentalCoroutinesApi
public inline fun <A, B> Flow<A>.parMapNotNullUnordered(
concurrency: Int = DEFAULT_CONCURRENCY,
crossinline transform: suspend (a: A) -> B?
): Flow<B> =
map { o ->
flow {
transform(o)?.let { emit(it) }
}
}.flattenMerge(concurrency)

/** Repeats the Flow forever */
public fun <A> Flow<A>.repeat(): Flow<A> =
flow {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// This file was automatically generated from flow.kt by Knit tool. Do not edit.
package arrow.fx.coroutines.examples.exampleFlow05

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.collect
import arrow.fx.coroutines.parMapNotNullUnordered

suspend fun main(): Unit {
flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.parMapNotNullUnordered { a ->
delay(100)
a.takeIf { a % 2 == 0 }
}.toList() // [4, 6, 2, 8, 10]
}

0 comments on commit cef47ac

Please sign in to comment.