diff --git a/README.md b/README.md index c4f81da..0055444 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,381 @@ # AsyncExtensions -AsyncExtensions mimics combine operators for async sequences. + +

+Build Status + +CombineExt supports Swift Package Manager (SPM) + + +AsyncExtensions provides a collection of operators, async sequences and async streams that mimics Combine behaviour. + +The purpose is to be able to chain operators, just as you would do with any reactive programming framework: + +```swift +AsyncSequences + .Merge(sequence1, sequence2, sequence3) + .prepend(0) + .handleEvents(onElement: { print($0) }, onFinish: { print("Finished") }) + .scan("") { accumulator, element in accumulator + "\(element)" } + .collect { print($0) } +``` + +### Async Sequences +* [Just](#Just) +* [Empty](#Empty) +* [Fail](#Fail) +* [From](#From) +* [Merge](#Merge) +* [Zip2](#Zip2) +* [Zip3](#Zip3) +* [Zip](#Zip) + +### Async Streams +* [Passthrough](#Passthrough) +* [CurrentValue](#CurrentValue) +* [Replay](#Replay) + +### Operators +* [Collect](#Collect) +* [Scan](#Scan) +* [SwitchToLatest](#SwitchToLatest) +* [FlatMapLatest](#FlatMapLatest) +* [HandleEvents](#HandleEvents) +* [EraseToAnyAsyncSequence](#EraseToAnyAsyncSequence) + +More operators and extensions are to come. Pull requests are of course welcome. + +## Async Sequences + +### Just + +`Just` is an AsyncSequence that outputs a single value and finishes. + +```swift +let justSequence = AsyncSequences.Just(1) +for try await element in justSequence { + // will be called once with element = 1 +} +``` + +### Empty + +`Empty` is an AsyncSequence that immediately finishes without emitting values. + +```swift +let emptySequence = AsyncSequences.Empty() +for try await element in emptySequence { + // will never be called +} +``` + +### Fail + +`Fail` is an AsyncSequence that outputs no elements and throws an error. + +```swift +let failSequence = AsyncSequences.Fail(error: NSError(domain: "", code: 1)) +do { + for try await element in failSequence { + // will never be called + } +} catch { + // will catch `NSError(domain: "", code: 1)` here +} +``` + +### From + +`From` is an AsyncSequence that outputs elements from a traditional Sequence. + +```swift +let fromSequence = AsyncSequences.From([1, 2, 3, 4, 5]) + +for await element in fromSequence { + print(element) // will print 1 2 3 4 5 +} +``` + +A variation offers to set an interval of time between each element. + +```swift +let fromSequence = AsyncSequences.From([1, 2, 3, 4, 5], interval: .milliSeconds(10)) + +for await element in fromSequence { + print(element) // will print 1 2 3 4 5 with an interval of 10ms between elements +} +``` + +### Merge + +`Merge` is an AsyncSequence that merges several async sequences respecting their temporality while being iterated over. +When all the async sequences have finished, so too does the merged async sequence. +If an async sequence fails, so too does the merged async sequence. + +```swift +// 0.1ms 1ms 1.5ms 2ms 3ms 4.5ms +// 4 1 5 2 3 6 + +let asyncSequence1 = AsyncStream(Int.self, bufferingPolicy: .unbounded) { continuation in + Task { + try await Task.sleep(nanoseconds: 1_000_000) + continuation.yield(1) + try await Task.sleep(nanoseconds: 1_000_000) + continuation.yield(2) + try await Task.sleep(nanoseconds: 1_000_000) + continuation.yield(3) + continuation.finish() + } +} + +let asyncSequence2 = AsyncStream(Int.self, bufferingPolicy: .unbounded) { continuation in + Task { + try await Task.sleep(nanoseconds: 100_000) + continuation.yield(4) + try await Task.sleep(nanoseconds: 1_400_000) + continuation.yield(5) + try await Task.sleep(nanoseconds: 3_000_000) + continuation.yield(6) + continuation.finish() + } +} + +let mergedAsyncSequence = AsyncSequences.Merge(asyncSequence1, asyncSequence2) + +for try await element in mergedAsyncSequence { + print(element) // will print -> 4 1 5 2 3 6 +} +``` + +### Zip2 + +`Zip2` is an AsyncSequence that combines the latest elements from two sequences according to their temporality and emits a tuple to the client. +If any async sequence ends successfully or fails with an error, so too does the zipped async Sequence. + +```swift +let asyncSequence1 = AsyncSequences.From([1, 2, 3, 4, 5]) +let asyncSequence2 = AsyncSequences.From(["5", "4", "3", "2", "1"]) + +let zippedAsyncSequence = AsyncSequences.Zip2(asyncSequence1, asyncSequence2) + +for try await element in zippedAsyncSequence { + print(element) // will print -> (1, "5") (2, "4") (3, "3") (4, "2") (5, "1") +} +``` + +### Zip3 + +`Zip3` is an AsyncSequence that combines the latest elements from two sequences according to their temporality and emits a tuple to the client. +If any async sequence ends successfully or fails with an error, so too does the zipped async Sequence. + +```swift +let asyncSequence1 = AsyncSequences.From([1, 2, 3, 4, 5]) +let asyncSequence2 = AsyncSequences.From(["5", "4", "3", "2", "1"]) +let asyncSequence3 = AsyncSequences.From([true, false, true, false, true]) + +let zippedAsyncSequence = AsyncSequences.Zip3(asyncSequence1, asyncSequence2, asyncSequence3) + +for try await element in zippedAsyncSequence { + print(element) // will print -> (1, "5", true) (2, "4", false) (3, "3", true) (4, "2", false) (5, "1", true) +} +``` + +### Zip + +`Zip` is an AsyncSequence that combines the latest elements from several sequences according to their temporality and emits an array to the client. +If any async sequence ends successfully or fails with an error, so too does the zipped async Sequence. + +```swift +let asyncSequence1 = AsyncSequences.From([1, 2, 3]) +let asyncSequence2 = AsyncSequences.From([1, 2, 3]) +let asyncSequence3 = AsyncSequences.From([1, 2, 3]) +let asyncSequence4 = AsyncSequences.From([1, 2, 3]) +let asyncSequence5 = AsyncSequences.From([1, 2, 3]) + +let zippedAsyncSequence = AsyncSequences.Zip(asyncSequence1, asyncSequence2, asyncSequence3, asyncSequence4, asyncSequence5) + +for try await element in zippedAsyncSequence { + print(element) // will print -> [1, 1, 1, 1, 1] [2, 2, 2, 2, 2] [3, 3, 3, 3, 3] +} +``` + +## Async Streams + +### Passthrough + +`Passthrough` is an async sequence in which one can send values over time. + +```swift +let passthrough = AsyncStreams.Passthrough() + +Task { + for try await element in passthrough { + print(element) // will print 1 2 + } +} + +Task { + for try await element in passthrough { + print(element) // will print 1 2 + } +} + +.. later in the application flow + +passthrough.send(1) +passthrough.send(2) +``` + +### CurrentValue + +`CurrentValue` is an async sequence in which one can send values over time. +The current value is always accessible as an instance variable. +The current value is replayed for any new async loop. + +```swift +let currentValue = AsyncStreams.CurrentValue(1) + +Task { + for try await element in passthrough { + print(element) // will print 1 2 + } +} + +Task { + for try await element in passthrough { + print(element) // will print 1 2 + } +} + +.. later in the application flow + +currentValue.send(2) + +print(currentValue.element) // will print 2 +``` + +### Replay + +`Replay`is an async sequence in which one can send values over time. +Values are buffered in a FIFO fashion so they can be iterated over by new loops. +When the `bufferSize` is outreached the oldest value is dropped. + +```swift +let replay = AsyncStreams.Replay(bufferSize: 3) + +(1...5).forEach { replay.send($0) } + +for try await element in replay { + print(element) // will print 3, 4, 5 +} +``` + +## Operators + +### Collect + +`collect(_:)` iterates over each element of the AsyncSequence and give it to the async block. + +```swift +let fromSequence = AsyncSequences.From([1, 2, 3]) +fromSequence + .collect { print($0) } // will print 1 2 3 +``` + +### Scan + +`scan(_:_:)` transforms elements from the upstream async sequence by providing the current element to a closure along with the last value returned by the closure. Each intermediate value will be emitted in the downstream async sequence. + +```swift +let sourceSequence = AsyncSequences.From([1, 2, 3, 4, 5]) +let scannedSequence = sourceSequence.scan("") { accumulator, element in accumulator + "\(element)"} + +for try await element in scannedSequence { + print(element) +} + +// will print: +1 +12 +123 +1234 +12345 +``` + +### SwitchToLatest + +`switchToLatest()` re-emits elements sent by the most recently received async sequence. This operator applies only in the case where the upstream async sequence's `Element` is it-self an async sequence. + +``` +let sourceSequence = AsyncSequences.From([1, 2, 3]) +let mappedSequence = sourceSequence.map { element in + AsyncSequences.From(["a\(element)", "b\(element)"]) +} +let switchedSequence = mappedSequence.switchToLatest() + +for try await element in switchedSequence { + print(element) // will print a3 b3 +} +``` + +### FlatMapLatest + +`flatMapLatest(_:)` transforms the upstream async sequence elements into an async sequence and flattens the sequence of events from these multiple sources async sequences to appear as if they were coming from a single async sequence of events. Mapping to a new async sequence will cancel the task related to the previous one. + +This operator is basically a shortcut for `map()` and `switchToLatest()`. + +```swift +let sourceSequence = AsyncSequences.From([1, 2, 3]) +let flatMapLatestSequence = sourceSequence.flatMapLatest { element in + AsyncSequences.From(["a\(element)", "b\(element)"]) +} + +for try await element in flatMapLatestSequence { + print(element) // will print a3 b3 +} +``` + +### Prepend + +`prepend(_:)` prepends an element to the upstream async sequence. + +```swift +let sourceSequence = AsyncSequences.From([1, 2, 3]) +let prependSequence = sourceSequence.prepend(0) + +for try await element in prependSequence { + print(element) // will print 0 1 2 3 +} +``` + +### HandleEvents + +`handleEvents(onStart:onElement:onCancel:onFinish)` performs the specified closures when async sequences events occur. + +```swift +let sourceSequence = AsyncSequences.From([1, 2, 3, 4, 5]) +let handledSequence = sourceSequence.handleEvents { + print("Begin iterating") +} onElement: { element in + print("Element is \(element)") +} onCancel: { + print("Cancelled") +} onFinish: { termination in + print(termination) +} + +for try await element in handledSequence {} + +// will print: +// Begin iterating +// Element is 1 +// Element is 2 +// Element is 3 +// Element is 4 +// Element is 5 +// finished +``` + +### EraseToAnyAsyncSequence + +`eraseToAnyAsyncSequence()` type-erases the async sequence into an AnyAsyncSequence. diff --git a/Sources/AsyncSequences/AsyncSequences+From.swift b/Sources/AsyncSequences/AsyncSequences+From.swift index 2234e0a..901391b 100644 --- a/Sources/AsyncSequences/AsyncSequences+From.swift +++ b/Sources/AsyncSequences/AsyncSequences+From.swift @@ -8,6 +8,24 @@ public extension AsyncSequences { /// `From` is an AsyncSequence that outputs elements from a traditional Sequence. /// If the parent task is cancelled while iterating then the iteration finishes. + /// + /// ``` + /// let fromSequence = AsyncSequences.From([1, 2, 3, 4, 5]) + /// + /// for await element in fromSequence { + /// print(element) // will print 1 2 3 4 5 + /// } + /// ``` + /// + /// A variation offers to set an interval of time between each element. + /// + /// ``` + /// let fromSequence = AsyncSequences.From([1, 2, 3, 4, 5], interval: .milliSeconds(10)) + /// + /// for await element in fromSequence { + /// print(element) // will print 1 2 3 4 5 with an interval of 10ms between elements + /// } + /// ``` typealias From = AsyncFromSequence } diff --git a/Sources/AsyncSequences/AsyncSequences+Merge.swift b/Sources/AsyncSequences/AsyncSequences+Merge.swift index 8e8288f..fcd2cda 100644 --- a/Sources/AsyncSequences/AsyncSequences+Merge.swift +++ b/Sources/AsyncSequences/AsyncSequences+Merge.swift @@ -9,8 +9,9 @@ public extension AsyncSequences { /// `Merge` is an AsyncSequence that merges several async sequences respecting /// their temporality while being iterated over. If the parent task is cancelled while iterating /// then the iteration finishes. + /// When all the async sequences have finished, so too does the merged async sequence. /// - /// ``` + /// ``` /// // 0.1ms 1ms 1.5ms 2ms 3ms 4.5ms /// // 4 1 5 2 3 6 /// @@ -43,7 +44,7 @@ public extension AsyncSequences { /// for try await element in mergedAsyncSequence { /// print(element) // will print -> 4 1 5 2 3 6 /// } - /// ``` + /// ``` typealias Merge = AsyncMergeSequence } diff --git a/Sources/AsyncSequences/AsyncSequences+Zip.swift b/Sources/AsyncSequences/AsyncSequences+Zip.swift index 9517d3f..1e121eb 100644 --- a/Sources/AsyncSequences/AsyncSequences+Zip.swift +++ b/Sources/AsyncSequences/AsyncSequences+Zip.swift @@ -11,7 +11,7 @@ public extension AsyncSequences { /// and emits a tuple to the client. If any Async Sequence ends successfully or fails with an error, so too does the zipped /// Async Sequence. /// - /// ``` + /// ``` /// let asyncSequence1 = [1, 2, 3, 4, 5].asyncElements /// let asyncSequence2 = ["1", "2", "3", "4", "5"].asyncElements /// @@ -20,7 +20,7 @@ public extension AsyncSequences { /// for try await element in zippedAsyncSequence { /// print(element) // will print -> (1, "1") (2, "2") (3, "3") (4, "4") (5, "5") /// } - /// ``` + /// ``` typealias Zip2 = AsyncZip2Sequence @@ -29,7 +29,7 @@ public extension AsyncSequences { /// and emits a tuple to the client. If any Async Sequence ends successfully or fails with an error, so too does the zipped /// Async Sequence. /// - /// ``` + /// ``` /// let asyncSequence1 = [1, 2, 3, 4, 5].asyncElements /// let asyncSequence2 = ["1", "2", "3", "4", "5"].asyncElements /// let asyncSequence3 = [true, false, true, false, true].asyncElements @@ -39,7 +39,7 @@ public extension AsyncSequences { /// for try await element in zippedAsyncSequence { /// print(element) // will print -> (1, "1", true) (2, "2", false) (3, "3", true) (4, "4", false) (5, "5", true) /// } - /// ``` + /// ``` typealias Zip3 = AsyncZip3Sequence [1, 1, 1, 1, 1] [2, 2, 2, 2, 2] [3, 3, 3, 3, 3] /// } - /// ``` + /// ``` typealias Zip = AsyncZipSequence } diff --git a/Sources/AsyncStreams/AsyncStreams+CurrentValue.swift b/Sources/AsyncStreams/AsyncStreams+CurrentValue.swift index b0d8001..0cbf8d1 100644 --- a/Sources/AsyncStreams/AsyncStreams+CurrentValue.swift +++ b/Sources/AsyncStreams/AsyncStreams+CurrentValue.swift @@ -13,30 +13,25 @@ public extension AsyncStreams { /// The current value is replayed in any new async for in loops. /// /// ``` - /// let subject = AsyncStreams.CurrentValue(5) + /// let currentValue = AsyncStreams.CurrentValue(1) /// - /// for try await element in subject { - /// print(element) + /// Task { + /// for try await element in passthrough { + /// print(element) // will print 1 2 + /// } /// } /// - /// // will print: - /// 1 - /// - /// ... - /// - /// let subject = AsyncStreams.CurrentValue(5) - /// /// Task { - /// for try await element in subject { - /// print(element) + /// for try await element in passthrough { + /// print(element) // will print 1 2 /// } /// } /// - /// subject.send(1) + /// .. later in the application flow + /// + /// currentValue.send(2) /// - /// // will print: - /// 5 - /// 1 + /// print(currentValue.element) // will print 2 /// ``` typealias CurrentValue = AsyncCurrentValueStream } diff --git a/Sources/AsyncStreams/AsyncStreams+Passthrough.swift b/Sources/AsyncStreams/AsyncStreams+Passthrough.swift index af94f72..38b8c38 100644 --- a/Sources/AsyncStreams/AsyncStreams+Passthrough.swift +++ b/Sources/AsyncStreams/AsyncStreams+Passthrough.swift @@ -8,25 +8,28 @@ import Foundation public extension AsyncStreams { - /// A `Passthrough`is an async sequence in which one can send values over time. + /// A `Passthrough` is an async sequence in which one can send values over time. /// /// ``` - /// let subject = AsyncStreams.Passthrough() + /// let passthrough = AsyncStreams.Passthrough() /// /// Task { - /// for try await element in subject { - /// print(element) + /// for try await element in passthrough { + /// print(element) // will print 1 2 /// } /// } /// - /// subject.send(1) - /// subject.send(2) + /// Task { + /// for try await element in passthrough { + /// print(element) // will print 1 2 + /// } + /// } /// - /// // will print: - /// 1 - /// 2 - /// ``` + /// .. later in the application flow /// + /// passthrough.send(1) + /// passthrough.send(2) + /// ``` typealias Passthrough = AsyncPassthroughStream } diff --git a/Sources/AsyncStreams/AsyncStreams+Replay.swift b/Sources/AsyncStreams/AsyncStreams+Replay.swift index b65169c..0a0840a 100644 --- a/Sources/AsyncStreams/AsyncStreams+Replay.swift +++ b/Sources/AsyncStreams/AsyncStreams+Replay.swift @@ -13,35 +13,13 @@ public extension AsyncStreams { /// When the `bufferSize` is outreached the oldest value is dropped. /// /// ``` - /// let subject = AsyncStreams.Replay(bufferSize: 3) + /// let replay = AsyncStreams.Replay(bufferSize: 3) /// - /// (1...5).forEach { subject.send($0) } + /// (1...5).forEach { replay.send($0) } /// - /// for try await element in subject { - /// print(element) + /// for try await element in replay { + /// print(element) // will print 3, 4, 5 /// } - /// - /// // will print: - /// 3 - /// 4 - /// 5 - /// - /// ... - /// - /// let subject = AsyncStreams.Replay(bufferSize: 0) - /// - /// Task { - /// for try await element in subject { - /// print(element) - /// } - /// } - /// - /// subject.send(1) - /// - /// // will print: - /// 3 - /// 4 - /// 5 /// ``` typealias Replay = AsyncReplayStream } diff --git a/Sources/Operators/AsyncSequence+Collect.swift b/Sources/Operators/AsyncSequence+Collect.swift index b036045..0eeddbc 100644 --- a/Sources/Operators/AsyncSequence+Collect.swift +++ b/Sources/Operators/AsyncSequence+Collect.swift @@ -6,11 +6,18 @@ // public extension AsyncSequence { - /// Iterates over each element of the AsyncSequence and give it to the block. + /// Iterates over each element of the AsyncSequence and give it to the block.\ + /// + /// ``` + /// let fromSequence = AsyncSequences.From([1, 2, 3]) + /// fromSequence + /// .collect { print($0) } // will print 1 2 3 + /// ``` + /// /// - Parameter block: The closure to execute on each element of the async sequence. - func collect(_ block: ((Element) -> Void)? = nil) async rethrows { + func collect(_ block: ((Element) async -> Void)? = nil) async rethrows { for try await element in self { - block?(element) + await block?(element) } } } diff --git a/Sources/Operators/AsyncSequence+FlatMapLatest.swift b/Sources/Operators/AsyncSequence+FlatMapLatest.swift index 2921c71..210ed57 100644 --- a/Sources/Operators/AsyncSequence+FlatMapLatest.swift +++ b/Sources/Operators/AsyncSequence+FlatMapLatest.swift @@ -11,7 +11,7 @@ public extension AsyncSequence { /// Mapping to a new async sequence will cancel the task related to the previous one. /// /// ``` - /// let sourceSequence = [1, 2, 3].asyncElements + /// let sourceSequence = AsyncSequences.From([1, 2, 3]) /// let flatMapLatestSequence = sourceSequence.map { element in ["a\(element)", "b\(element)"] } /// /// for try await element in flatMapLatestSequence { diff --git a/Sources/Operators/AsyncSequence+HandleEvents.swift b/Sources/Operators/AsyncSequence+HandleEvents.swift index 2457dc1..c03d5c1 100644 --- a/Sources/Operators/AsyncSequence+HandleEvents.swift +++ b/Sources/Operators/AsyncSequence+HandleEvents.swift @@ -5,11 +5,13 @@ // Created by Thibault Wittemberg on 31/12/2021. // +import Combine + public extension AsyncSequence { - /// Executes the `receiveElement` closure for each element of the upstream async sequence. + /// Performs the specified closures when async sequences events occur. /// /// ``` - /// let sourceSequence = [1, 2, 3, 4, 5].asyncElements + /// let sourceSequence = AsyncSequences.From([1, 2, 3, 4, 5]) /// let handledSequence = sourceSequence.handleEvents { /// print("Begin looping") /// } onElement: { element in @@ -30,7 +32,6 @@ public extension AsyncSequence { /// // Element is 4 /// // Element is 5 /// // finished - /// /// ``` /// /// - Parameters: diff --git a/Sources/Operators/AsyncSequence+Prepend.swift b/Sources/Operators/AsyncSequence+Prepend.swift index 34c0842..504c23f 100644 --- a/Sources/Operators/AsyncSequence+Prepend.swift +++ b/Sources/Operators/AsyncSequence+Prepend.swift @@ -9,7 +9,7 @@ public extension AsyncSequence { /// Prepends an element to the upstream async sequence. /// /// ``` - /// let sourceSequence = [1, 2, 3].asyncElements + /// let sourceSequence = AsyncSequences.From([1, 2, 3]) /// let prependSequence = sourceSequence.prepend(0) /// /// for try await element in prependSequence { diff --git a/Sources/Operators/AsyncSequence+Scan.swift b/Sources/Operators/AsyncSequence+Scan.swift index d25ba6d..ce79621 100644 --- a/Sources/Operators/AsyncSequence+Scan.swift +++ b/Sources/Operators/AsyncSequence+Scan.swift @@ -10,7 +10,7 @@ public extension AsyncSequence { /// along with the last value returned by the closure. /// /// ``` - /// let sourceSequence = [1, 2, 3, 4, 5].asyncElements + /// let sourceSequence = AsyncSequences.From([1, 2, 3, 4, 5]) /// let scannedSequence = sourceSequence.scan("") { accumulator, element in /// return accumulator + "\(element)" /// } diff --git a/Sources/Operators/AsyncSequence+SwitchToLatest.swift b/Sources/Operators/AsyncSequence+SwitchToLatest.swift index ab86e80..e9cd4c0 100644 --- a/Sources/Operators/AsyncSequence+SwitchToLatest.swift +++ b/Sources/Operators/AsyncSequence+SwitchToLatest.swift @@ -9,7 +9,7 @@ public extension AsyncSequence where Element: AsyncSequence { /// Republishes elements sent by the most recently received async sequence. /// /// ``` - /// let sourceSequence = [1, 2, 3].asyncElements + /// let sourceSequence = AsyncSequences.From([1, 2, 3]) /// let mappedSequence = sourceSequence.map { element in ["a\(element)", "b\(element)"].asyncElements } /// let switchedSequence = mappedSequence.switchToLatest() ///