Skip to content

Commit

Permalink
Valve: getMode fails after the stage is stopped (#191)
Browse files Browse the repository at this point in the history
Once the valve stage has been stopped, the future returned by `getMode`
used to never complete (an async callback is invoked with a `Promise`,
but the callback action is never process once the stage is completed).
By using `invokeWithFeedback` instead of `invoke`, we can be sure that
that future returned by `getMode` will either fail if the valve is
already stopped or else will be processed by the valve.

Fixes #119
  • Loading branch information
harpocrates authored Mar 4, 2022
1 parent 72c6f1c commit 0c48f32
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
7 changes: 4 additions & 3 deletions src/main/scala/akka/stream/contrib/Valve.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ final class Valve[A](mode: SwitchMode) extends GraphStageWithMaterializedValue[F
promise.success(succeed)
}

// FIXME will never complete promise if stage is stopped, use invokeWithFeedback when Akka 2.5.7 is released
val getModeCallback = getAsyncCallback[Promise[SwitchMode]](_.success(mode))

override def flip(flipToMode: SwitchMode): Future[Boolean] = {
Expand All @@ -124,8 +123,10 @@ final class Valve[A](mode: SwitchMode) extends GraphStageWithMaterializedValue[F

override def getMode(): Future[SwitchMode] = {
val promise = Promise[SwitchMode]()
getModeCallback.invoke(promise)
promise.future
implicit val ec = materializer.executionContext
getModeCallback
.invokeWithFeedback(promise)
.flatMap(_ => promise.future)
}
}

Expand Down
21 changes: 20 additions & 1 deletion src/test/scala/akka/stream/contrib/ValveSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package akka.stream.contrib

import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.ActorMaterializer
import akka.stream.{ActorMaterializer, StreamDetachedException}
import akka.stream.contrib.SwitchMode.{Close, Open}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.testkit.scaladsl._
Expand Down Expand Up @@ -307,4 +307,23 @@ class ValveSpec extends WordSpec with ScalaFutures {
}

}

"A completed valve" should {

"fail to report its mode" in {

val (switchFut, terminatedFut) = Source.empty
.viaMat(new Valve(SwitchMode.Close))(Keep.right)
.toMat(Sink.ignore)(Keep.both)
.run()

whenReady(switchFut.zip(terminatedFut)) {
case (switch, _) =>
after(100 millis, system.scheduler) {
switch.getMode
}.failed.futureValue shouldBe a[StreamDetachedException]
}
}
}

}

0 comments on commit 0c48f32

Please sign in to comment.