From de960892a98daef4c458d453044f2d2515b234e3 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Sat, 4 Jan 2025 02:36:11 +0800 Subject: [PATCH] perf: avoid boxing in zipWithIndex --- .../apache/pekko/stream/javadsl/FlowTest.java | 39 +++++++++++ .../pekko/stream/javadsl/SourceTest.java | 30 +++++++++ ...-boxing-in-zipWithIndex.backwards.excludes | 19 ++++++ .../pekko/stream/impl/JavaStreamSource.scala | 6 +- .../stream/impl/fusing/ZipWithIndex.scala | 67 +++++++++++++++++++ .../apache/pekko/stream/javadsl/Flow.scala | 3 +- .../apache/pekko/stream/javadsl/Source.scala | 5 +- .../apache/pekko/stream/javadsl/SubFlow.scala | 4 +- .../pekko/stream/javadsl/SubSource.scala | 6 +- .../apache/pekko/stream/scaladsl/Flow.scala | 11 +-- .../stream/scaladsl/FlowWithContext.scala | 2 +- 11 files changed, 172 insertions(+), 20 deletions(-) create mode 100644 stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1669-avoid-boxing-in-zipWithIndex.backwards.excludes create mode 100644 stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ZipWithIndex.scala diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index f2e92c88638..09314a6c5ea 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -13,6 +13,7 @@ package org.apache.pekko.stream.javadsl; +import com.google.common.collect.Sets; import org.apache.pekko.Done; import org.apache.pekko.NotUsed; import org.apache.pekko.actor.ActorRef; @@ -1663,4 +1664,42 @@ public void mustBeAbleToConvertToJavaInJava() { org.apache.pekko.stream.scaladsl.Flow.apply(); Flow javaFlow = scalaFlow.asJava(); } + + @Test + public void zipWithIndex() { + final List input = Arrays.asList(1, 2, 3); + final List> expected = + Arrays.asList(new Pair<>(1, 0L), new Pair<>(2, 1L), new Pair<>(3, 2L)); + + final List> result = + Source.from(input) + .via(Flow.of(Integer.class).zipWithIndex()) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .join(); + + assertEquals(expected, result); + } + + @Test + public void zipWithIndexInSubFlow() { + + final Set> resultSet = + new HashSet<>( + Source.range(1, 5) + .via(Flow.of(Integer.class).groupBy(2, i -> i % 2).zipWithIndex().mergeSubstreams()) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .join()); + + Assert.assertEquals( + new HashSet<>( + Arrays.asList( + Pair.create(1, 0L), + Pair.create(3, 1L), + Pair.create(5, 2L), + Pair.create(2, 0L), + Pair.create(4, 1L))), + resultSet); + } } diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 8464688043c..2f2a68a04cb 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -1504,4 +1504,34 @@ public void flattenOptionalOptional() throws Exception { .get(3, TimeUnit.SECONDS); Assert.assertEquals(Arrays.asList(2, 4, 6, 8, 10), resultList); } + + @Test + public void zipWithIndex() { + final List> resultList = + Source.range(1, 3).zipWithIndex().runWith(Sink.seq(), system).toCompletableFuture().join(); + assertEquals( + Arrays.asList(Pair.create(1, 0L), Pair.create(2, 1L), Pair.create(3, 2L)), resultList); + } + + @Test + public void zipWithIndexOnSubSource() { + final Set> resultSet = + new HashSet<>( + Source.range(1, 5) + .groupBy(2, i -> i % 2) + .zipWithIndex() + .mergeSubstreams() + .runWith(Sink.seq(), system) + .toCompletableFuture() + .join()); + Assert.assertEquals( + new HashSet<>( + Arrays.asList( + Pair.create(1, 0L), + Pair.create(3, 1L), + Pair.create(5, 2L), + Pair.create(2, 0L), + Pair.create(4, 1L))), + resultSet); + } } diff --git a/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1669-avoid-boxing-in-zipWithIndex.backwards.excludes b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1669-avoid-boxing-in-zipWithIndex.backwards.excludes new file mode 100644 index 00000000000..7726fda48eb --- /dev/null +++ b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1669-avoid-boxing-in-zipWithIndex.backwards.excludes @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Should be java.lang.Long instead of java.lang.Object +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.SubSource.zipWithIndex") \ No newline at end of file diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala index 74bba55d0a0..9ce904425af 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/JavaStreamSource.scala @@ -13,14 +13,14 @@ package org.apache.pekko.stream.impl +import java.util.Spliterator +import java.util.function.Consumer + import org.apache.pekko import pekko.annotation.InternalApi import pekko.stream._ import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } -import java.util.Spliterator -import java.util.function.Consumer - /** INTERNAL API */ @InternalApi private[stream] final class JavaStreamSource[T, S <: java.util.stream.BaseStream[T, S]]( open: () => java.util.stream.BaseStream[T, S]) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ZipWithIndex.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ZipWithIndex.scala new file mode 100644 index 00000000000..25c671d7643 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ZipWithIndex.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.fusing + +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.japi.Pair +import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet } +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } + +/** + * INTERNAL API + */ +@InternalApi private[pekko] object ZipWithIndex extends GraphStage[FlowShape[Any, (Any, Long)]] { + val in = Inlet[Any]("ZipWithIndex.in") + val out = Outlet[(Any, Long)]("ZipWithIndex.out") + override val shape = FlowShape(in, out) + override def initialAttributes: Attributes = DefaultAttributes.zipWithIndex + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private var index = 0L + override def onPush(): Unit = { + push(out, (grab(in), index)) + index += 1 + } + + override def onPull(): Unit = pull(in) + setHandlers(in, out, this) + } +} + +/** + * INTERNAL API + */ +@InternalApi private[pekko] object ZipWithIndexJava extends GraphStage[FlowShape[Any, Pair[Any, Long]]] { + val in = Inlet[Any]("ZipWithIndex.in") + val out = Outlet[Pair[Any, Long]]("ZipWithIndex.out") + override val shape = FlowShape(in, out) + override def initialAttributes: Attributes = DefaultAttributes.zipWithIndex + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private var index = 0L + override def onPush(): Unit = { + push(out, new Pair(grab(in), index)) + index += 1 + } + + override def onPull(): Unit = pull(in) + setHandlers(in, out, this) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 91b0385e490..c2f8cf581ed 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -26,6 +26,7 @@ import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag import org.apache.pekko +import org.apache.pekko.stream.impl.fusing.ZipWithIndexJava import pekko.Done import pekko.NotUsed import pekko.actor.ActorRef @@ -3691,7 +3692,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * '''Cancels when''' downstream cancels */ def zipWithIndex: Flow[In, Pair[Out, java.lang.Long], Mat] = - new Flow(delegate.zipWithIndex.map { case (elem, index) => Pair[Out, java.lang.Long](elem, index) }) + via(ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]]) /** * If the first element has not passed through this operator before the provided timeout, the stream is failed diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index bcd289b7857..563ac472742 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -35,7 +35,7 @@ import pekko.japi.{ function, JavaPartialFunction, Pair } import pekko.japi.function.Creator import pekko.stream._ import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava } -import pekko.stream.impl.fusing.ArraySource +import pekko.stream.impl.fusing.{ ArraySource, ZipWithIndexJava } import pekko.util.{ unused, _ } import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ @@ -2173,7 +2173,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * '''Cancels when''' downstream cancels */ def zipWithIndex: javadsl.Source[Pair[Out @uncheckedVariance, java.lang.Long], Mat] = - new Source(delegate.zipWithIndex.map { case (elem, index) => Pair[Out, java.lang.Long](elem, index) }) + new Source(delegate.via( + ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]])) /** * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 20f73c4f010..8e96be733ff 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -29,6 +29,7 @@ import pekko.annotation.ApiMayChange import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import pekko.japi.{ function, Pair } import pekko.stream._ +import pekko.stream.impl.fusing.ZipWithIndexJava import pekko.util.ConstantFun import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ @@ -2272,7 +2273,8 @@ class SubFlow[In, Out, Mat]( * '''Cancels when''' downstream cancels */ def zipWithIndex: SubFlow[In, pekko.japi.Pair[Out @uncheckedVariance, java.lang.Long], Mat] = - new SubFlow(delegate.zipWithIndex.map { case (elem, index) => pekko.japi.Pair[Out, java.lang.Long](elem, index) }) + new SubFlow(delegate.via( + ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]])) /** * If the first element has not passed through this operator before the provided timeout, the stream is failed diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 9ac6f9c714b..b9ad87a782c 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -29,6 +29,7 @@ import pekko.annotation.ApiMayChange import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import pekko.japi.{ function, Pair } import pekko.stream._ +import pekko.stream.impl.fusing.ZipWithIndexJava import pekko.util.ConstantFun import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ @@ -2246,8 +2247,9 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels */ - def zipWithIndex: javadsl.SubSource[pekko.japi.Pair[Out @uncheckedVariance, Long], Mat] = - new SubSource(delegate.zipWithIndex.map { case (elem, index) => pekko.japi.Pair(elem, index) }) + def zipWithIndex: javadsl.SubSource[pekko.japi.Pair[Out @uncheckedVariance, java.lang.Long], Mat] = + new SubSource(delegate.via( + ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]])) /** * If the first element has not passed through this operator before the provided timeout, the stream is failed diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 9c66727d519..e2db2e8daf9 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -3304,16 +3304,7 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def zipWithIndex: Repr[(Out, Long)] = { - statefulMapConcat[(Out, Long)] { () => - var index: Long = 0L - elem => { - val zipped = (elem, index) - index += 1 - immutable.Iterable[(Out, Long)](zipped) - } - } - } + def zipWithIndex: Repr[(Out, Long)] = via(ZipWithIndex.asInstanceOf[Graph[FlowShape[Out, (Out, Long)], NotUsed]]) /** * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]]. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala index 88e9e392c62..91a72fc90a8 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala @@ -16,8 +16,8 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.unchecked.uncheckedVariance import org.apache.pekko -import pekko.annotation.ApiMayChange import pekko.NotUsed +import pekko.annotation.ApiMayChange import pekko.japi.Pair import pekko.stream._