Skip to content

Commit

Permalink
fix: avoid call finalizeStage more times than once.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Dec 31, 2024
1 parent ec70e11 commit bf57618
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -412,21 +412,23 @@ class ActorGraphInterpreterSpec extends StreamSpec {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()

Source
.fromPublisher(upstream)
.alsoTo(Sink.fromSubscriber(downstream))
.runWith(Sink.fromSubscriber(filthySubscriber))

upstream.sendNext(0)

downstream.requestNext(0)
val ise = downstream.expectError()
ise shouldBe an[IllegalStateException]
ise.getCause shouldBe a[SpecViolation]
ise.getCause.getCause shouldBe a[TE]
ise.getCause.getCause should (have.message("violating your spec"))

upstream.expectCancellation()
EventFilter[NullPointerException](occurrences = 0).intercept {
Source
.fromPublisher(upstream)
.alsoTo(Sink.fromSubscriber(downstream))
.runWith(Sink.fromSubscriber(filthySubscriber))

upstream.sendNext(0)

downstream.requestNext(0)
val ise = downstream.expectError()
ise shouldBe an[IllegalStateException]
ise.getCause shouldBe a[SpecViolation]
ise.getCause.getCause shouldBe a[TE]
ise.getCause.getCause should (have.message("violating your spec"))

upstream.expectCancellation()
}
}

"trigger postStop in all stages when abruptly terminated (and no upstream boundaries)" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,9 @@ import pekko.stream.stage._
logics(i).handlers.length
}

// Marks whether a stage has been finalized (finalizeStage been called) or not
private[this] val finalizedMark = Array.fill(logics.length)(false)

private[this] var _subFusingMaterializer: Materializer = _
def subFusingMaterializer: Materializer = _subFusingMaterializer

Expand Down Expand Up @@ -331,7 +334,10 @@ import pekko.stream.stage._
var i = 0
while (i < logics.length) {
val logic = logics(i)
if (!isStageCompleted(logic)) finalizeStage(logic)
if (!isStageCompleted(logic) && !isStageFinalized(logic)) {
markStageFinalized(logic)
finalizeStage(logic)
}
i += 1
}
}
Expand Down Expand Up @@ -589,14 +595,21 @@ import pekko.stream.stage._
}

def afterStageHasRun(logic: GraphStageLogic): Unit =
if (isStageCompleted(logic)) {
if (isStageCompleted(logic) && !isStageFinalized(logic)) {
markStageFinalized(logic)
runningStages -= 1
finalizeStage(logic)
}

// Returns true if the given stage is already completed
def isStageCompleted(stage: GraphStageLogic): Boolean = stage != null && shutdownCounter(stage.stageId) == 0

// Returns true if the given stage is already finalized
private def isStageFinalized(stage: GraphStageLogic): Boolean = finalizedMark(stage.stageId)

// Mark the given stage as finalized
private def markStageFinalized(stage: GraphStageLogic): Unit = finalizedMark(stage.stageId) = true

// Register that a connection in which the given stage participated has been completed and therefore the stage
// itself might stop, too.
private def completeConnection(stageId: Int): Unit = {
Expand Down

0 comments on commit bf57618

Please sign in to comment.