From ce3620fc1a6a17b00ad29fe103c8c77cff796776 Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Sat, 4 Jan 2025 00:56:50 +0800 Subject: [PATCH] chore: Fix leak in FlatMapPrefix operator. (#1622) --- .../stream/impl/fusing/FlatMapPrefix.scala | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala index c3672ae1eaa..1191e44eafd 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala @@ -43,8 +43,10 @@ import pekko.util.OptionVal .mandatoryAttribute[Attributes.NestedMaterializationCancellationPolicy] .propagateToNestedMaterialization val matPromise = Promise[M]() - val logic = new GraphStageLogic(shape) with InHandler with OutHandler { - val accumulated = collection.mutable.Buffer.empty[In] + object FlatMapPrefixLogic extends GraphStageLogic(shape) with InHandler with OutHandler { + private var left = n + private var builder = Vector.newBuilder[In] + builder.sizeHint(left) private var subSource = OptionVal.none[SubSourceOutlet[In]] private var subSink = OptionVal.none[SubSinkInlet[Out]] @@ -65,11 +67,12 @@ import pekko.util.OptionVal subSource match { case OptionVal.Some(s) => s.push(grab(in)) case _ => - accumulated.append(grab(in)) - if (accumulated.size == n) { + builder += grab(in) + left -= 1 + if (left == 0) { materializeFlow() } else { - // gi'me some more! + // give me some more! pull(in) } } @@ -98,12 +101,12 @@ import pekko.util.OptionVal // delegate to subSink s.pull() case _ => - if (accumulated.size < n) pull(in) - else if (accumulated.size == n) { + if (left > 0) pull(in) + else if (left == 0) { // corner case for n = 0, can be handled in FlowOps materializeFlow() } else { - throw new IllegalStateException(s"Unexpected accumulated size: ${accumulated.size} (n: $n)") + throw new IllegalStateException(s"Unexpected accumulated size, left : $left (n: $n)") } } } @@ -114,7 +117,7 @@ import pekko.util.OptionVal case _ => if (propagateToNestedMaterialization) { downstreamCause = OptionVal.Some(cause) - if (accumulated.size == n) { + if (left == 0) { // corner case for n = 0, can be handled in FlowOps materializeFlow() } else if (!hasBeenPulled(in)) { // if in was already closed, nested flow would have already been materialized @@ -128,8 +131,8 @@ import pekko.util.OptionVal def materializeFlow(): Unit = try { - val prefix = accumulated.toVector - accumulated.clear() + val prefix = builder.result() + builder = null // free for GC subSource = OptionVal.Some(new SubSourceOutlet[In]("FlatMapPrefix.subSource")) val theSubSource = subSource.get subSink = OptionVal.Some(new SubSinkInlet[Out]("FlatMapPrefix.subSink")) @@ -196,6 +199,6 @@ import pekko.util.OptionVal case NonFatal(ex) => failStage(ex) } } - (logic, matPromise.future) + (FlatMapPrefixLogic, matPromise.future) } }