diff --git a/src/Observable.php b/src/Observable.php index 79db73ba..f06109bc 100644 --- a/src/Observable.php +++ b/src/Observable.php @@ -293,6 +293,33 @@ public function mergeAll(): Observable }); } + /** + * Combine an Observable together with another Observable by merging their emissions into a single Observable. + * This variant of merge will hold errors emitted from component observables until all observables have + * either completed or errored. + * + * @return Observable + * + * @operator + * @reactivex mergeDelayError + */ + public function mergeDelayError(Observable $o) : Observable { + return Observable::fromArray([$this, $o]) + ->reduce(function ($a, Observable $o) { + $s = new Subject(); + return [ + $a[0]->merge($o->catch(function (\Throwable $e) use ($s) { + $s->onError($e); + return Observable::empty(); + })), + $a[1]->merge($s) + ]; + }, [Observable::empty(), Observable::empty()]) + ->flatMap(function ($a) { + return $a[0]->concat($a[1]); + }); + } + /** * Converts an array to an observable sequence * diff --git a/test/Rx/Functional/Operator/MergeDelayErrorTest.php b/test/Rx/Functional/Operator/MergeDelayErrorTest.php new file mode 100644 index 00000000..48ebdfe7 --- /dev/null +++ b/test/Rx/Functional/Operator/MergeDelayErrorTest.php @@ -0,0 +1,129 @@ +createColdObservable(array( + onNext(100, 4), + onNext(200, 2), + onNext(300, 3), + onNext(400, 1), + onCompleted(500) + )); + + $ys = $this->createColdObservable(array( + onNext(50, 'foo'), + onNext(100, 'bar'), + onNext(150, 'baz'), + onError(160, new \Exception()), + onNext(200, 'qux'), + onCompleted(250) + )); + + $results = $this->scheduler->startWithCreate(function() use ($xs, $ys) { + return $xs->mergeDelayError($ys); + }); + + $this->assertMessages(array( + onNext(250, 'foo'), + onNext(300, 4), + onNext(300, 'bar'), + onNext(350, 'baz'), + onNext(400, 2), + onNext(500, 3), + onNext(600, 1), + onError(700, new \Exception()) + ), $results->getMessages()); + + $this->assertSubscriptions(array(subscribe(200, 700)), $xs->getSubscriptions()); + $this->assertSubscriptions(array(subscribe(200, 360)), $ys->getSubscriptions()); + } + + /** + * @test + */ + public function it_waits_for_complete_before_emitting_error_2() + { + $xs = $this->createColdObservable(array( + onNext(100, 4), + onError(160, new \Exception()), + onNext(200, 2), + onNext(300, 3), + onNext(400, 1), + onCompleted(500) + )); + + $ys = $this->createColdObservable(array( + onNext(50, 'foo'), + onNext(100, 'bar'), + onNext(150, 'baz'), + onNext(200, 'qux'), + onCompleted(250) + )); + + $results = $this->scheduler->startWithCreate(function() use ($xs, $ys) { + return $xs->mergeDelayError($ys); + }); + + $this->assertMessages(array( + onNext(250, 'foo'), + onNext(300, 4), + onNext(300, 'bar'), + onNext(350, 'baz'), + onNext(400, 'qux'), + onError(450, new \Exception()) + ), $results->getMessages()); + + $this->assertSubscriptions(array(subscribe(200, 360)), $xs->getSubscriptions()); + $this->assertSubscriptions(array(subscribe(200, 450)), $ys->getSubscriptions()); + } + + /** + * @test + */ + public function it_works_when_both_sources_error() + { + $xs = $this->createColdObservable(array( + onNext(100, 4), + onError(160, new \Exception()), + onNext(200, 2), + onNext(300, 3), + onNext(400, 1), + onCompleted(500) + )); + + $ys = $this->createColdObservable(array( + onNext(50, 'foo'), + onNext(100, 'bar'), + onNext(150, 'baz'), + onError(161, new \Exception()), + onNext(200, 'qux'), + onCompleted(250) + )); + + $results = $this->scheduler->startWithCreate(function() use ($xs, $ys) { + return $xs->mergeDelayError($ys); + }); + + $this->assertMessages(array( + onNext(250, 'foo'), + onNext(300, 4), + onNext(300, 'bar'), + onNext(350, 'baz'), + onError(361, new \Exception()) + ), $results->getMessages()); + + $this->assertSubscriptions(array(subscribe(200, 360)), $xs->getSubscriptions()); + $this->assertSubscriptions(array(subscribe(200, 361)), $ys->getSubscriptions()); + } +} \ No newline at end of file