Skip to content

Commit

Permalink
Add Mode
Browse files Browse the repository at this point in the history
  • Loading branch information
kyri-petrou committed Aug 13, 2024
1 parent 66e040e commit 9e560c4
Showing 1 changed file with 20 additions and 13 deletions.
33 changes: 20 additions & 13 deletions zio-query/shared/src/main/scala/zio/query/ZQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1164,7 +1164,7 @@ object ZQuery {
)(implicit bf: BuildFrom[Collection[A], B, Collection[B]], trace: Trace): ZQuery[R, E, Collection[B]] = {
implicit val ct1: ClassTag[A] = anyRefClassTag
implicit val c2: ClassTag[B] = anyRefClassTag
foreachSequentialOuter[R, E, A, B, Collection](as.toArray, mode = 0, bf.fromSpecific(as)(_))(f)
foreachSequentialOuter[R, E, A, B, Collection](as.toArray, mode = Mode.Sequential, bf.fromSpecific(as)(_))(f)
}

/**
Expand All @@ -1189,7 +1189,7 @@ object ZQuery {
def foreach[R, E, A, B: ClassTag](in: Array[A])(f: A => ZQuery[R, E, B])(implicit
trace: Trace
): ZQuery[R, E, Array[B]] =
foreachSequentialOuter(in, mode = 0, ZIO.identityFn[Array[B]])(f)
foreachSequentialOuter(in, mode = Mode.Sequential, ZIO.identityFn[Array[B]])(f)

/**
* Applies the function `f` to each element of the `Map[Key, Value]` and
Expand Down Expand Up @@ -1236,7 +1236,7 @@ object ZQuery {
)(implicit bf: BuildFrom[Collection[A], B, Collection[B]], trace: Trace): ZQuery[R, E, Collection[B]] = {
implicit val ct1: ClassTag[A] = anyRefClassTag
implicit val ct2: ClassTag[B] = anyRefClassTag
foreachSequentialOuter[R, E, A, B, Collection](as.toArray, mode = 2, bf.fromSpecific(as)(_))(f)
foreachSequentialOuter[R, E, A, B, Collection](as.toArray, mode = Mode.Batched, bf.fromSpecific(as)(_))(f)
}

def foreachBatched[R, E, A, B](as: Set[A])(fn: A => ZQuery[R, E, B])(implicit
Expand All @@ -1254,7 +1254,7 @@ object ZQuery {
def foreachBatched[R, E, A, B: ClassTag](as: Array[A])(f: A => ZQuery[R, E, B])(implicit
trace: Trace
): ZQuery[R, E, Array[B]] =
foreachSequentialOuter(as, mode = 2, ZIO.identityFn[Array[B]])(f)
foreachSequentialOuter(as, mode = Mode.Batched, ZIO.identityFn[Array[B]])(f)

/**
* Performs a query for each element in a Map, batching requests to data
Expand Down Expand Up @@ -1298,7 +1298,7 @@ object ZQuery {
ZQuery {
ZIO
.foreachPar(as.toArray)(f(_).step)
.map(collectResults[R, E, A, B, Collection](_, mode = 1, bf.fromSpecific(as)(_)))
.map(collectResults[R, E, A, B, Collection](_, mode = Mode.Parallel, bf.fromSpecific(as)(_)))
}
}

Expand Down Expand Up @@ -1434,7 +1434,7 @@ object ZQuery {
ZQuery.currentCache.getWith {
case Some(cache) => CachedResult.foreachAsArr(as)(r => cachedResult(cache, dataSource, f(r)))
case _ => ZIO.foreach(as)(r => uncachedResult(dataSource, f(r))).map(_.toArray)
}.map(v => collectResults(v, mode = 2, Chunk.fromArray))
}.map(v => collectResults(v, mode = Mode.Batched, Chunk.fromArray))
}

/**
Expand All @@ -1450,7 +1450,7 @@ object ZQuery {
ZQuery.currentCache.getWith {
case Some(cache) => CachedResult.foreachAsArr(as)(r => cachedResult(cache, dataSource, f(r)))
case _ => ZIO.foreach(as)(r => uncachedResult(dataSource, f(r))).map(_.toArray)
}.map(v => collectResults(v, mode = 2, _.toList))
}.map(v => collectResults(v, mode = Mode.Batched, _.toList))
}

private def cachedResult[R, E, A, B](
Expand Down Expand Up @@ -1838,12 +1838,12 @@ object ZQuery {
*/
private def foreachSequentialOuter[R, E, A, B, F[_]](
as: Array[A],
mode: Int, // 0 or 2
mode: Mode, // Sequential or Batched
mapOut: Array[B] => F[B]
)(
f: A => ZQuery[R, E, B]
)(implicit trace: Trace, ct: ClassTag[B]): ZQuery[R, E, F[B]] = {
assert(BuildUtils.optimizationsEnabled || mode != 1)
assert(BuildUtils.optimizationsEnabled || mode != Mode.Parallel)

as.length match {
case 0 => ZQuery.succeedNow(mapOut(Array.empty[B]))
Expand Down Expand Up @@ -1874,7 +1874,7 @@ object ZQuery {

private def collectResults[R, E, A, B, F[_]](
results: Array[Result[R, E, B]],
mode: Int, // 0 = sequential, 1 = parallel, 2 = batched
mode: Mode,
mapOut: Array[B] => F[B]
)(implicit trace: Trace): Result[R, E, F[B]] = {
implicit val classTag: ClassTag[B] = anyRefClassTag
Expand Down Expand Up @@ -1965,9 +1965,9 @@ object ZQuery {
Continue.get(io)
} else {
val collect = (mode: @switch) match {
case 0 => ZQuery.collectAll[R, E, B](effects)
case 1 => ZQuery.collectAllPar[R, E, B](effects)
case 2 => ZQuery.collectAllBatched[R, E, B](effects)
case Mode.Sequential => ZQuery.collectAll[R, E, B](effects)
case Mode.Parallel => ZQuery.collectAllPar[R, E, B](effects)
case Mode.Batched => ZQuery.collectAllBatched[R, E, B](effects)
}
val query = collect.mapZIO { effects =>
collectArrayZIO(gets).map { gets =>
Expand Down Expand Up @@ -2078,4 +2078,11 @@ object ZQuery {
private implicit class IterableOps[A](private val it: Iterable[A]) extends AnyVal {
def knownSize: Int = it.size
}

private type Mode = Int
private object Mode {
final val Sequential = 0
final val Parallel = 1
final val Batched = 2
}
}

0 comments on commit 9e560c4

Please sign in to comment.