diff --git a/Sources/RawStream.swift b/Sources/RawStream.swift index 15162ed..9c4fe24 100644 --- a/Sources/RawStream.swift +++ b/Sources/RawStream.swift @@ -162,6 +162,7 @@ public extension RawStreamType { public func scan(initial: U, _ combine: (U, Event) -> U) -> RawStream { return RawStream { observer in var accumulator = initial + observer.observer(accumulator) return self.observe { event in accumulator = combine(accumulator, event) observer.observer(accumulator) @@ -775,10 +776,7 @@ public extension RawStreamType { /// Reduce stream events to a single event by applying given function on each emission. @warn_unused_result public func reduce(initial: U, _ combine: (U, Event) -> U) -> RawStream { - return RawStream { observer in - observer.observer(initial) - return self.scan(initial, combine).observe(observer.observer) - }.takeLast() + return scan(initial, combine).takeLast() } } diff --git a/Tests/OperationTests.swift b/Tests/OperationTests.swift index b79e095..37372f3 100644 --- a/Tests/OperationTests.swift +++ b/Tests/OperationTests.swift @@ -77,7 +77,7 @@ class OperatorsTests: XCTestCase { func testScan() { let operation = Operation.sequence([1, 2, 3]) let scanned = operation.scan(0, +) - scanned.expectNext([1, 3, 6]) + scanned.expectNext([0, 1, 3, 6]) } func testToOperation() { diff --git a/Tests/StreamTests.swift b/Tests/StreamTests.swift index 3f06d0a..6867fe2 100644 --- a/Tests/StreamTests.swift +++ b/Tests/StreamTests.swift @@ -68,7 +68,7 @@ class StreamTests: XCTestCase { func testScan() { let stream = Stream.sequence([1, 2, 3]) let scanned = stream.scan(0, +) - scanned.expectNext([1, 3, 6]) + scanned.expectNext([0, 1, 3, 6]) } func testToOperation() {