Skip to content

Commit

Permalink
Merge pull request #2157 from vasilmkd/kleislice2
Browse files Browse the repository at this point in the history
Kleisli `parTraverse` CE2
  • Loading branch information
djspiewak authored Jul 27, 2021
2 parents da85bdf + 5b4a8db commit cfb40e6
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 1 deletion.
2 changes: 2 additions & 0 deletions core/shared/src/main/scala/cats/effect/IO.scala
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,8 @@ abstract private[effect] class IOParallelNewtype extends internals.IOTimerRef wi
par(unwrap(fa).map(f))
final override def unit: IO.Par[Unit] =
par(IO.unit)
override def map2Eval[A, B, Z](fa: IO.Par[A], fb: Eval[IO.Par[B]])(f: (A, B) => Z): Eval[IO.Par[Z]] =
Eval.now(map2(fa, par(IO.suspend(unwrap(fb.value))))(f))
}
}

Expand Down
10 changes: 10 additions & 0 deletions core/shared/src/main/scala/cats/effect/Resource.scala
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,14 @@ abstract private[effect] class ResourceMonad[F[_]] extends Monad[Resource[F, *]]

def tailRecM[A, B](a: A)(f: A => Resource[F, Either[A, B]]): Resource[F, B] =
Resource.tailRecM(a)(f)

override def map2Eval[A, B, Z](fa: Resource[F, A], fb: Eval[Resource[F, B]])(f: (A, B) => Z): Eval[Resource[F, Z]] =
Eval.now {
for {
a <- fa
b <- fb.value
} yield f(a, b)
}
}

abstract private[effect] class ResourceMonoid[F[_], A] extends ResourceSemigroup[F, A] with Monoid[Resource[F, A]] {
Expand Down Expand Up @@ -693,6 +701,8 @@ abstract private[effect] class ResourceParCommutativeApplicative[F[_]]
map(product(fa, fb)) { case (a, b) => f(a, b) }
final override def ap[A, B](ff: Par[F, A => B])(fa: Par[F, A]): Par[F, B] =
map(product(ff, fa)) { case (ff, a) => ff(a) }
override def map2Eval[A, B, Z](fa: Par[F, A], fb: Eval[Par[F, B]])(f: (A, B) => Z): Eval[Par[F, Z]] =
Eval.now(map2(fa, par(Resource.suspend(F0.delay(unwrap(fb.value)))))(f))
}

abstract private[effect] class ResourceParallel[F0[_]] extends Parallel[Resource[F0, *]] {
Expand Down
35 changes: 34 additions & 1 deletion laws/shared/src/test/scala/cats/effect/IOTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package effect

import java.util.concurrent.atomic.AtomicInteger

import cats.effect.concurrent.Deferred
import cats.data.Kleisli
import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.internals.{Callback, IOPlatform}
import cats.effect.laws.discipline.{ConcurrentEffectTests, EffectTests}
import cats.effect.laws.discipline.arbitrary._
Expand Down Expand Up @@ -1212,6 +1213,38 @@ class IOTests extends BaseTestsSuite {
ec.tick()
assertEquals(f.value, Some(Success(())))
}

testAsync("Should be stack safe in long traverse chains") { implicit ec =>
val N = 10000

val test = for {
ref <- Ref[IO].of(0)
_ <- List.fill(N)(0).traverse_(_ => Kleisli.liftF(ref.update(_ + 1))).run("Go...")
v <- ref.get
} yield v

val f = test.unsafeToFuture()

ec.tick()
assertEquals(f.value, Some(Success(N)))
}

testAsync("Should be stack safe in long parTraverse chains") { implicit ec =>
implicit val contextShift: ContextShift[IO] = ec.ioContextShift

val N = 10000

val test = for {
ref <- Ref[IO].of(0)
_ <- List.fill(N)(0).parTraverse_(_ => Kleisli.liftF(ref.update(_ + 1))).run("Go...")
v <- ref.get
} yield v

val f = test.unsafeToFuture()

ec.tick()
assertEquals(f.value, Some(Success(N)))
}
}

object IOTests {
Expand Down
32 changes: 32 additions & 0 deletions laws/shared/src/test/scala/cats/effect/ResourceTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -621,4 +621,36 @@ class ResourceTests extends BaseTestsSuite {
released <-> y :: acquired
}
}

testAsync("Should be stack safe in long traverse chains") { implicit ec =>
val N = 10000
var i = 0

val test = for {
_ <- List.fill(N)(0).traverse_(_ => Kleisli.liftF(Resource.eval(IO(i += 1)))).run("Go...")
v <- Resource.eval(IO(i))
} yield v

val f = test.use(IO.pure).unsafeToFuture()

ec.tick()
assertEquals(f.value, Some(Success(N)))
}

testAsync("Should be stack safe in long parTraverse chains") { implicit ec =>
implicit val contextShift: ContextShift[IO] = ec.ioContextShift

val N = 10000
var i = 0

val test = for {
_ <- List.fill(N)(0).parTraverse_(_ => Kleisli.liftF(Resource.eval(IO(i += 1)))).run("Go...")
v <- Resource.eval(IO(i))
} yield v

val f = test.use(IO.pure).unsafeToFuture()

ec.tick()
assertEquals(f.value, Some(Success(N)))
}
}

0 comments on commit cfb40e6

Please sign in to comment.