diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala index ac4bc14c89f..1999c624ad5 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreterSpec.scala @@ -412,21 +412,23 @@ class ActorGraphInterpreterSpec extends StreamSpec { val upstream = TestPublisher.probe[Int]() val downstream = TestSubscriber.probe[Int]() - Source - .fromPublisher(upstream) - .alsoTo(Sink.fromSubscriber(downstream)) - .runWith(Sink.fromSubscriber(filthySubscriber)) - - upstream.sendNext(0) - - downstream.requestNext(0) - val ise = downstream.expectError() - ise shouldBe an[IllegalStateException] - ise.getCause shouldBe a[SpecViolation] - ise.getCause.getCause shouldBe a[TE] - ise.getCause.getCause should (have.message("violating your spec")) - - upstream.expectCancellation() + EventFilter[NullPointerException](occurrences = 0).intercept { + Source + .fromPublisher(upstream) + .alsoTo(Sink.fromSubscriber(downstream)) + .runWith(Sink.fromSubscriber(filthySubscriber)) + + upstream.sendNext(0) + + downstream.requestNext(0) + val ise = downstream.expectError() + ise shouldBe an[IllegalStateException] + ise.getCause shouldBe a[SpecViolation] + ise.getCause.getCause shouldBe a[TE] + ise.getCause.getCause should (have.message("violating your spec")) + + upstream.expectCancellation() + } } "trigger postStop in all stages when abruptly terminated (and no upstream boundaries)" in { diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala index 9d4aab4a662..1962f1fcc72 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala @@ -239,6 +239,9 @@ import pekko.stream.stage._ logics(i).handlers.length } + // Marks whether a stage has been finalized (finalizeStage been called) or not + private[this] val finalizedMark = Array.fill(logics.length)(false) + private[this] var _subFusingMaterializer: Materializer = _ def subFusingMaterializer: Materializer = _subFusingMaterializer @@ -331,7 +334,10 @@ import pekko.stream.stage._ var i = 0 while (i < logics.length) { val logic = logics(i) - if (!isStageCompleted(logic)) finalizeStage(logic) + if (!isStageCompleted(logic) && !isStageFinalized(logic)) { + markStageFinalized(logic) + finalizeStage(logic) + } i += 1 } } @@ -589,7 +595,8 @@ import pekko.stream.stage._ } def afterStageHasRun(logic: GraphStageLogic): Unit = - if (isStageCompleted(logic)) { + if (isStageCompleted(logic) && !isStageFinalized(logic)) { + markStageFinalized(logic) runningStages -= 1 finalizeStage(logic) } @@ -597,6 +604,12 @@ import pekko.stream.stage._ // Returns true if the given stage is already completed def isStageCompleted(stage: GraphStageLogic): Boolean = stage != null && shutdownCounter(stage.stageId) == 0 + // Returns true if the given stage is already finalized + private def isStageFinalized(stage: GraphStageLogic): Boolean = finalizedMark(stage.stageId) + + // Mark the given stage as finalized + private def markStageFinalized(stage: GraphStageLogic): Unit = finalizedMark(stage.stageId) = true + // Register that a connection in which the given stage participated has been completed and therefore the stage // itself might stop, too. private def completeConnection(stageId: Int): Unit = {