Skip to content

Commit

Permalink
fix: Fix occasional ordering issue in FlowWithContext#unsafeOptionalD…
Browse files Browse the repository at this point in the history
…ataVia (#1681) (#1684)

* chore: Test more rounds for unsafeDataVia keeping order.

* fix: Fix flask ordering in FlowWithContext#unsafeOptionalDataVia operator.

(cherry picked from commit 477fd39)
  • Loading branch information
He-Pin authored Jan 8, 2025
1 parent 761003d commit 9d8b12f
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,36 +69,44 @@ object FlowWithContext {
FlowWithContext.fromTuples(Flow.fromGraph(GraphDSL.createGraph(flow, viaFlow)(combine) {
implicit b => (f, viaF) =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[(Option[FOut], Ctx)](2))
val merge = b.add(Merge[(Option[FViaOut], Ctx)](2))

val unzip = b.add(Unzip[FOut, Ctx]())
val zipper = b.add(Zip[FViaOut, Ctx]())

val filterAvailable = Flow[(Option[FOut], Ctx)].collect {
case (Some(f), ctx) => (f, ctx)
}

val filterUnavailable = Flow[(Option[FOut], Ctx)].collect {
case (None, ctx) => (Option.empty[FViaOut], ctx)
case class IndexedCtx(idx: Long, ctx: Ctx)
val partition = b.add(Partition[(Option[FOut], IndexedCtx)](2,
{
case (None, _) => 0
case (Some(_), _) => 1
}))

val sequence = Flow[(Option[FOut], Ctx)].zipWithIndex
.map {
case ((opt, ctx), idx) => (opt, IndexedCtx(idx, ctx))
}

val unzip = b.add(Unzip[Option[FOut], IndexedCtx]())
val zipper = b.add(Zip[FViaOut, IndexedCtx]())
val mergeSequence = b.add(MergeSequence[(Option[FViaOut], IndexedCtx)](2)(_._2.idx))
val unwrapSome = b.add(Flow[Option[FOut]].map {
case Some(elem) => elem
case _ => throw new IllegalStateException("Only expects Some")
})
val unwrap = Flow[(Option[FViaOut], IndexedCtx)].map {
case (opt, indexedCtx) => (opt, indexedCtx.ctx)
}

val mapIntoOption = Flow[(FViaOut, Ctx)].map {
case (f, ctx) => (Some(f), ctx)
val mapIntoOption = Flow[(FViaOut, IndexedCtx)].map {
case (elem, indexedCtx) => (Some(elem), indexedCtx)
}

f ~> broadcast.in

broadcast.out(0) ~> filterAvailable ~> unzip.in

unzip.out0 ~> viaF ~> zipper.in0
unzip.out1 ~> zipper.in1

zipper.out ~> mapIntoOption ~> merge.in(0)

broadcast.out(1) ~> filterUnavailable ~> merge.in(1)
//format: off
f ~> sequence ~> partition.in
partition.out(0).asInstanceOf[Outlet[(Option[FViaOut], IndexedCtx)]] ~> mergeSequence.in(0)
partition.out(1) ~> unzip.in
unzip.out0 ~> unwrapSome ~> viaF ~> zipper.in0
unzip.out1 ~> zipper.in1
zipper.out ~> mapIntoOption ~> mergeSequence.in(1)

FlowShape(f.in, merge.out)
//format: on
FlowShape(f.in, (mergeSequence.out ~> unwrap).outlet)
}))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ object SourceWithContext {

//format: off
s ~> sequence ~> partition.in
partition.out(0).asInstanceOf[Outlet[(Option[FOut], IndexedCtx)]] ~> mergeSequence.in(0)
partition.out(0).asInstanceOf[Outlet[(Option[FOut], IndexedCtx)]] ~> mergeSequence.in(0)
partition.out(1) ~> unzip.in
unzip.out0 ~> unwrapSome ~> viaF ~> zipper.in0
unzip.out1 ~> zipper.in1
Expand Down

0 comments on commit 9d8b12f

Please sign in to comment.