From 477fd393c20cd507917e589a2b919f37449a08f5 Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Mon, 6 Jan 2025 00:10:18 +0800 Subject: [PATCH] fix: Fix occasional ordering issue in FlowWithContext#unsafeOptionalDataVia (#1681) * chore: Test more rounds for unsafeDataVia keeping order. * fix: Fix flask ordering in FlowWithContext#unsafeOptionalDataVia operator. --- .../stream/scaladsl/FlowWithContext.scala | 56 +++++++++++-------- .../stream/scaladsl/SourceWithContext.scala | 2 +- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala index 88e9e392c6..71ba23fe10 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala @@ -69,36 +69,44 @@ object FlowWithContext { FlowWithContext.fromTuples(Flow.fromGraph(GraphDSL.createGraph(flow, viaFlow)(combine) { implicit b => (f, viaF) => import GraphDSL.Implicits._ - val broadcast = b.add(Broadcast[(Option[FOut], Ctx)](2)) - val merge = b.add(Merge[(Option[FViaOut], Ctx)](2)) - val unzip = b.add(Unzip[FOut, Ctx]()) - val zipper = b.add(Zip[FViaOut, Ctx]()) - - val filterAvailable = Flow[(Option[FOut], Ctx)].collect { - case (Some(f), ctx) => (f, ctx) - } - - val filterUnavailable = Flow[(Option[FOut], Ctx)].collect { - case (None, ctx) => (Option.empty[FViaOut], ctx) + case class IndexedCtx(idx: Long, ctx: Ctx) + val partition = b.add(Partition[(Option[FOut], IndexedCtx)](2, + { + case (None, _) => 0 + case (Some(_), _) => 1 + })) + + val sequence = Flow[(Option[FOut], Ctx)].zipWithIndex + .map { + case ((opt, ctx), idx) => (opt, IndexedCtx(idx, ctx)) + } + + val unzip = b.add(Unzip[Option[FOut], IndexedCtx]()) + val zipper = b.add(Zip[FViaOut, IndexedCtx]()) + val mergeSequence = b.add(MergeSequence[(Option[FViaOut], IndexedCtx)](2)(_._2.idx)) + val unwrapSome = b.add(Flow[Option[FOut]].map { + case Some(elem) => elem + case _ => throw new IllegalStateException("Only expects Some") + }) + val unwrap = Flow[(Option[FViaOut], IndexedCtx)].map { + case (opt, indexedCtx) => (opt, indexedCtx.ctx) } - val mapIntoOption = Flow[(FViaOut, Ctx)].map { - case (f, ctx) => (Some(f), ctx) + val mapIntoOption = Flow[(FViaOut, IndexedCtx)].map { + case (elem, indexedCtx) => (Some(elem), indexedCtx) } - f ~> broadcast.in - - broadcast.out(0) ~> filterAvailable ~> unzip.in - - unzip.out0 ~> viaF ~> zipper.in0 - unzip.out1 ~> zipper.in1 - - zipper.out ~> mapIntoOption ~> merge.in(0) - - broadcast.out(1) ~> filterUnavailable ~> merge.in(1) + //format: off + f ~> sequence ~> partition.in + partition.out(0).asInstanceOf[Outlet[(Option[FViaOut], IndexedCtx)]] ~> mergeSequence.in(0) + partition.out(1) ~> unzip.in + unzip.out0 ~> unwrapSome ~> viaF ~> zipper.in0 + unzip.out1 ~> zipper.in1 + zipper.out ~> mapIntoOption ~> mergeSequence.in(1) - FlowShape(f.in, merge.out) + //format: on + FlowShape(f.in, (mergeSequence.out ~> unwrap).outlet) })) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala index 70ff3e38b2..b7323ca4db 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala @@ -87,7 +87,7 @@ object SourceWithContext { //format: off s ~> sequence ~> partition.in - partition.out(0).asInstanceOf[Outlet[(Option[FOut], IndexedCtx)]] ~> mergeSequence.in(0) + partition.out(0).asInstanceOf[Outlet[(Option[FOut], IndexedCtx)]] ~> mergeSequence.in(0) partition.out(1) ~> unzip.in unzip.out0 ~> unwrapSome ~> viaF ~> zipper.in0 unzip.out1 ~> zipper.in1