Skip to content

Commit

Permalink
feat:Add onErrorComplete stream operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 7, 2024
1 parent 688eac7 commit 7e15e73
Show file tree
Hide file tree
Showing 10 changed files with 463 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# onErrorComplete

Allows completing the stream when an upstream error occur.

@ref[Error handling](../index.md#error-handling)

## Signature

@apidoc[Source.onErrorComplete](Source) { scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.util.function.Predicate)" }
@apidoc[Source.onErrorComplete](Source) { scala="#onErrorComplete%5BT%20%3C%3A%20Throwable%5D()(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.lang.Class)" }
@apidoc[Flow.onErrorComplete](Flow) { scala="#onErrorComplete(pf%3A%20PartialFunction%5BThrowable%2C%20Boolean%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.util.function.Predicate)" }
@apidoc[Flow.onErrorComplete](Flow) { scala="#onErrorComplete%5BT%20%3C%3A%20Throwable%5D()(implicit%20tag%3A%20ClassTag%5BT%5D)%3AFlowOps.this.Repr%5BT%5D" java="#onErrorComplete(java.lang.Class)" }

## Description

Allows to complete the stream when an upstream error occur.

## Reactive Streams semantics

@@@div { .callout }

**emits** element is available from the upstream

**backpressures** downstream backpressures

**completes** upstream completes or upstream failed with exception this operator can handle

**Cancels when** downstream cancels
@@@
2 changes: 2 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
| |Operator|Description|
|--|--|--|
|Source/Flow|<a name="maperror"></a>@ref[mapError](Source-or-Flow/mapError.md)|While similar to `recover` this operators can be used to transform an error signal to a different one *without* logging it as an error in the process.|
|Source/Flow|<a name="onerrorcomplete"></a>@ref[onErrorComplete](Source-or-Flow/onErrorComplete.md)|Allows completing the stream when an upstream error occur.|
|RestartSource|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)|Wrap the given @apidoc[Source] with a @apidoc[Source] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Source] will not restart on completion of the wrapped flow.|
|RestartFlow|<a name="onfailureswithbackoff"></a>@ref[onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)|Wrap the given @apidoc[Flow] with a @apidoc[Flow] that will restart it when it fails using an exponential backoff. Notice that this @apidoc[Flow] will not restart on completion of the wrapped flow.|
|Source/Flow|<a name="recover"></a>@ref[recover](Source-or-Flow/recover.md)|Allow sending of one last element downstream when a failure has happened upstream.|
Expand Down Expand Up @@ -532,6 +533,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [never](Source/never.md)
* [never](Sink/never.md)
* [onComplete](Sink/onComplete.md)
* [onErrorComplete](Source-or-Flow/onErrorComplete.md)
* [onFailuresWithBackoff](RestartSource/onFailuresWithBackoff.md)
* [onFailuresWithBackoff](RestartFlow/onFailuresWithBackoff.md)
* [orElse](Source-or-Flow/orElse.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,79 @@ public void mustBeAbleToRecoverWithRetriesClass() throws Exception {
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}

@Test
public void mustBeAbleToOnErrorComplete() {
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw new RuntimeException("ex");
} else {
return elem;
}
})
.onErrorComplete()
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectComplete();
}

@Test
public void mustBeAbleToOnErrorCompleteWithDedicatedException() {
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw new IllegalArgumentException("ex");
} else {
return elem;
}
})
.onErrorComplete(IllegalArgumentException.class)
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectComplete();
}

@Test
public void mustBeAbleToFailWhenExceptionTypeNotMatch() {
final IllegalArgumentException ex = new IllegalArgumentException("ex");
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw ex;
} else {
return elem;
}
})
.onErrorComplete(TimeoutException.class)
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectError(ex);
}

@Test
public void mustBeAbleToOnErrorCompleteWithPredicate() {
Source.from(Arrays.asList(1, 2))
.map(
elem -> {
if (elem == 2) {
throw new IllegalArgumentException("Boom");
} else {
return elem;
}
})
.onErrorComplete(ex -> ex.getMessage().contains("Boom"))
.runWith(TestSink.probe(system), system)
.request(2)
.expectNext(1)
.expectComplete();
}

@Test
public void mustBeAbleToMapErrorClass() {
final String head = "foo";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.scaladsl

import org.apache.pekko.stream.testkit.StreamSpec
import org.apache.pekko.stream.testkit.scaladsl.TestSink

import scala.concurrent.TimeoutException
import scala.util.control.NoStackTrace

class FlowOnErrorCompleteSpec extends StreamSpec {
val ex = new RuntimeException("ex") with NoStackTrace

"A CompleteOn" must {
"can complete with all exceptions" in {
Source(List(1, 2))
.map { a =>
if (a == 2) throw ex else a
}
.onErrorComplete[Throwable]()
.runWith(TestSink[Int]())
.request(2)
.expectNext(1)
.expectComplete()
}

"can complete with dedicated exception type" in {
Source(List(1, 2))
.map { a =>
if (a == 2) throw new IllegalArgumentException() else a
}
.onErrorComplete[IllegalArgumentException]()
.runWith(TestSink[Int]())
.request(2)
.expectNext(1)
.expectComplete()
}

"can fail if an unexpected exception occur" in {
Source(List(1, 2))
.map { a =>
if (a == 2) throw new IllegalArgumentException() else a
}
.onErrorComplete[TimeoutException]()
.runWith(TestSink[Int]())
.request(1)
.expectNext(1)
.request(1)
.expectError()
}

"can complete if the pf is applied" in {
Source(List(1, 2))
.map { a =>
if (a == 2) throw new TimeoutException() else a
}
.onErrorComplete {
case _: IllegalArgumentException => false
case _: TimeoutException => true
}
.runWith(TestSink[Int]())
.request(2)
.expectNext(1)
.expectComplete()
}

"can fail if the pf is not applied" in {
Source(List(1, 2))
.map { a =>
if (a == 2) throw ex else a
}
.onErrorComplete {
case _: IllegalArgumentException => false
case _: TimeoutException => true
}
.runWith(TestSink[Int]())
.request(2)
.expectNext(1)
.expectError()
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ import pekko.stream.Attributes._
val mergePrioritized = name("mergePrioritized")
val flattenMerge = name("flattenMerge")
val recoverWith = name("recoverWith")
val onErrorComplete = name("onErrorComplete")
val broadcast = name("broadcast")
val wireTap = name("wireTap")
val balance = name("balance")
Expand Down
55 changes: 55 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1907,6 +1907,61 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
case elem if clazz.isInstance(elem) => supplier.get()
})

/**
* onErrorComplete allows to complete the stream when an upstream error occurs.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or failed with exception is an instance of the provided type
*
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
def onErrorComplete(): javadsl.Flow[In, Out, Mat] = onErrorComplete(classOf[Throwable])

/**
* onErrorComplete allows to complete the stream when an upstream error occurs.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or failed with exception is an instance of the provided type
*
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
def onErrorComplete(clazz: Class[_ <: Throwable]): javadsl.Flow[In, Out, Mat] =
onErrorComplete(ex => clazz.isInstance(ex))

/**
* onErrorComplete allows to complete the stream when an upstream error occurs.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or failed with predicate return ture
*
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
def onErrorComplete(predicate: java.util.function.Predicate[_ >: Throwable]): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.onErrorComplete {
case ex: Throwable if predicate.test(ex) => true
})

/**
* Terminate processing (and cancel the upstream publisher) after the given
* number of elements. Due to input buffering some elements may have been
Expand Down
55 changes: 55 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2382,6 +2382,61 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
case elem if clazz.isInstance(elem) => supplier.get()
}: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]])

/**
* onErrorComplete allows to complete the stream when an upstream error occurs.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or failed with exception is an instance of the provided type
*
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
def onErrorComplete(): javadsl.Source[Out, Mat] = onErrorComplete(classOf[Throwable])

/**
* onErrorComplete allows to complete the stream when an upstream error occurs.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or failed with exception is an instance of the provided type
*
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
def onErrorComplete(clazz: Class[_ <: Throwable]): javadsl.Source[Out, Mat] =
onErrorComplete(ex => clazz.isInstance(ex))

/**
* onErrorComplete allows to complete the stream when an upstream error occurs.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This operator can recover the failure signal, but not the skipped elements, which will be dropped.
*
* '''Emits when''' element is available from the upstream
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or failed with predicate return ture
*
* '''Cancels when''' downstream cancels
* @since 1.1.0
*/
def onErrorComplete(predicate: java.util.function.Predicate[_ >: Throwable]): javadsl.Source[Out, Mat] =
new Source(delegate.onErrorComplete {
case ex: Throwable if predicate.test(ex) => true
})

/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream.
Expand Down
Loading

0 comments on commit 7e15e73

Please sign in to comment.