Skip to content

Commit

Permalink
Merge pull request #2239 from djspiewak/bug/short-circuit-par
Browse files Browse the repository at this point in the history
  • Loading branch information
djspiewak authored Aug 16, 2021
2 parents 6e45e80 + 29a9f46 commit d6817ff
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,57 @@ trait GenSpawnInstances {
fiberA <- F.start(ParallelF.value(fa))
fiberB <- F.start(ParallelF.value(fb))

// start a pair of supervisors to ensure that the opposite is canceled on error
_ <- F start {
fiberB.join flatMap {
case Outcome.Succeeded(_) => F.unit
case _ => fiberA.cancel
}
}

_ <- F start {
fiberA.join flatMap {
case Outcome.Succeeded(_) => F.unit
case _ => fiberB.cancel
}
}

a <- F
.onCancel(poll(fiberA.join), F.both(fiberA.cancel, fiberB.cancel).void)
.flatMap[A] {
case Outcome.Succeeded(fa) => fa
case Outcome.Errored(e) => fiberB.cancel *> F.raiseError(e)
case Outcome.Canceled() => fiberB.cancel *> poll(F.canceled *> F.never)
case Outcome.Succeeded(fa) =>
fa

case Outcome.Errored(e) =>
fiberB.cancel *> F.raiseError(e)

case Outcome.Canceled() =>
fiberB.cancel *> poll {
fiberB.join flatMap {
case Outcome.Succeeded(_) | Outcome.Canceled() =>
F.canceled *> F.never
case Outcome.Errored(e) =>
F.raiseError(e)
}
}
}

z <- F.onCancel(poll(fiberB.join), fiberB.cancel).flatMap[Z] {
case Outcome.Succeeded(fb) => fb.map(b => f(a, b))
case Outcome.Errored(e) => F.raiseError(e)
case Outcome.Canceled() => poll(F.canceled *> F.never)
case Outcome.Succeeded(fb) =>
fb.map(b => f(a, b))

case Outcome.Errored(e) =>
F.raiseError(e)

case Outcome.Canceled() =>
poll {
fiberA.join flatMap {
case Outcome.Succeeded(_) | Outcome.Canceled() =>
F.canceled *> F.never
case Outcome.Errored(e) =>
F.raiseError(e)
}
}
}
} yield z
}
Expand All @@ -84,18 +123,57 @@ trait GenSpawnInstances {
fiberA <- F.start(ParallelF.value(fa))
fiberB <- F.start(ParallelF.value(fb.value))

// start a pair of supervisors to ensure that the opposite is canceled on error
_ <- F start {
fiberB.join flatMap {
case Outcome.Succeeded(_) => F.unit
case _ => fiberA.cancel
}
}

_ <- F start {
fiberA.join flatMap {
case Outcome.Succeeded(_) => F.unit
case _ => fiberB.cancel
}
}

a <- F
.onCancel(poll(fiberA.join), F.both(fiberA.cancel, fiberB.cancel).void)
.flatMap[A] {
case Outcome.Succeeded(fa) => fa
case Outcome.Errored(e) => fiberB.cancel *> F.raiseError(e)
case Outcome.Canceled() => fiberB.cancel *> poll(F.canceled *> F.never)
case Outcome.Succeeded(fa) =>
fa

case Outcome.Errored(e) =>
fiberB.cancel *> F.raiseError(e)

case Outcome.Canceled() =>
fiberB.cancel *> poll {
fiberB.join flatMap {
case Outcome.Succeeded(_) | Outcome.Canceled() =>
F.canceled *> F.never
case Outcome.Errored(e) =>
F.raiseError(e)
}
}
}

z <- F.onCancel(poll(fiberB.join), fiberB.cancel).flatMap[Z] {
case Outcome.Succeeded(fb) => fb.map(b => f(a, b))
case Outcome.Errored(e) => F.raiseError(e)
case Outcome.Canceled() => poll(F.canceled *> F.never)
case Outcome.Succeeded(fb) =>
fb.map(b => f(a, b))

case Outcome.Errored(e) =>
F.raiseError(e)

case Outcome.Canceled() =>
poll {
fiberA.join flatMap {
case Outcome.Succeeded(_) | Outcome.Canceled() =>
F.canceled *> F.never
case Outcome.Errored(e) =>
F.raiseError(e)
}
}
}
} yield z
}
Expand Down
14 changes: 14 additions & 0 deletions laws/shared/src/test/scala/cats/effect/laws/PureConcSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ class PureConcSpec extends Specification with Discipline with BaseSpec {
implicit def exec(fb: TimeT[PureConc[Int, *], Boolean]): Prop =
Prop(pure.run(TimeT.run(fb)).fold(false, _ => false, _.getOrElse(false)))

"parallel utilities" should {
import cats.effect.kernel.{GenConcurrent, Outcome}
import cats.effect.kernel.implicits._
import cats.syntax.all._

type F[A] = PureConc[Int, A]
val F = GenConcurrent[F]

"short-circuit on error" in {
pure.run((F.never[Unit], F.raiseError[Unit](42)).parTupled) mustEqual Outcome.Errored(42)
pure.run((F.raiseError[Unit](42), F.never[Unit]).parTupled) mustEqual Outcome.Errored(42)
}
}

checkAll(
"TimeT[PureConc]",
GenTemporalTests[TimeT[PureConc[Int, *], *], Int].temporal[Int, Int, Int](10.millis)
Expand Down
31 changes: 20 additions & 11 deletions tests/shared/src/test/scala/cats/effect/IOSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,26 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {

}

"parallel" should {
"run parallel actually in parallel" in real {
val x = IO.sleep(2.seconds) >> IO.pure(1)
val y = IO.sleep(2.seconds) >> IO.pure(2)

List(x, y).parSequence.timeout(3.seconds).flatMap { res =>
IO {
res mustEqual List(1, 2)
}
}
}

"short-circuit on error" in ticked { implicit ticker =>
case object TestException extends RuntimeException

(IO.never[Unit], IO.raiseError[Unit](TestException)).parTupled.void must failAs(TestException)
(IO.raiseError[Unit](TestException), IO.never[Unit]).parTupled.void must failAs(TestException)
}
}

"miscellaneous" should {

"round trip non-canceled through s.c.Future" in ticked { implicit ticker =>
Expand All @@ -1081,17 +1101,6 @@ class IOSpec extends BaseSpec with Discipline with IOPlatformSpecification {
test must completeAs(42)
}

"run parallel actually in parallel" in real {
val x = IO.sleep(2.seconds) >> IO.pure(1)
val y = IO.sleep(2.seconds) >> IO.pure(2)

List(x, y).parSequence.timeout(3.seconds).flatMap { res =>
IO {
res mustEqual List(1, 2)
}
}
}

"run a synchronous IO" in ticked { implicit ticker =>
val ioa = IO(1).map(_ + 2)
val test = IO.fromFuture(IO(ioa.unsafeToFuture()))
Expand Down

0 comments on commit d6817ff

Please sign in to comment.