diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index 29f3397b3e..7ea1e7d9ff 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -24,7 +24,6 @@ import org.apache.pekko.stream.*; import org.apache.pekko.stream.scaladsl.FlowSpec; import org.apache.pekko.stream.testkit.javadsl.TestSink; -import org.apache.pekko.util.ConstantFun; import org.apache.pekko.stream.javadsl.GraphDSL.Builder; import org.apache.pekko.stream.stage.*; import org.apache.pekko.testkit.PekkoSpec; @@ -1689,4 +1688,41 @@ public void useFlatMapPrefixSubSource() { .join(); Assert.assertEquals(Sets.newHashSet(1, 2), resultSet); } + + @Test + public void zipWithIndex() { + final List input = Arrays.asList(1, 2, 3); + final List> expected = + Arrays.asList(new Pair<>(1, 0L), new Pair<>(2, 1L), new Pair<>(3, 2L)); + + final List> result = + Source.from(input) + .via(Flow.of(Integer.class).zipWithIndex()) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .join(); + + assertEquals(expected, result); + } + + @Test + public void zipWithIndexInSubFlow() { + + final Set> resultSet = + Source.range(1, 5) + .via(Flow.of(Integer.class).groupBy(2, i -> i % 2).zipWithIndex().mergeSubstreams()) + .runWith(Sink.collect(Collectors.toSet()), system) + .toCompletableFuture() + .join(); + + Assert.assertEquals( + new HashSet<>( + Arrays.asList( + Pair.create(1, 0L), + Pair.create(3, 1L), + Pair.create(5, 2L), + Pair.create(2, 0L), + Pair.create(4, 1L))), + resultSet); + } } diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 96180c51b7..9be7dab422 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -36,7 +36,6 @@ import org.apache.pekko.testkit.PekkoJUnitActorSystemResource; import org.apache.pekko.testkit.PekkoSpec; import org.apache.pekko.testkit.javadsl.TestKit; -import org.apache.pekko.util.ConstantFun; import com.google.common.collect.Iterables; import org.junit.Assert; import org.junit.ClassRule; @@ -1529,4 +1528,33 @@ public void useFlatMapPrefixSubSource() { .join(); Assert.assertEquals(Sets.newHashSet(1, 2), resultSet); } + + @Test + public void zipWithIndex() { + final List> resultList = + Source.range(1, 3).zipWithIndex().runWith(Sink.seq(), system).toCompletableFuture().join(); + assertEquals( + Arrays.asList(Pair.create(1, 0L), Pair.create(2, 1L), Pair.create(3, 2L)), resultList); + } + + @Test + public void zipWithIndexOnSubSource() { + final Set> resultSet = + Source.range(1, 5) + .groupBy(2, i -> i % 2) + .zipWithIndex() + .mergeSubstreams() + .runWith(Sink.collect(Collectors.toSet()), system) + .toCompletableFuture() + .join(); + Assert.assertEquals( + new HashSet<>( + Arrays.asList( + Pair.create(1, 0L), + Pair.create(3, 1L), + Pair.create(5, 2L), + Pair.create(2, 0L), + Pair.create(4, 1L))), + resultSet); + } } diff --git a/stream/src/main/mima-filters/1.1.x.backwards.excludes/pr-1669-avoid-boxing-in-zipWithIndex.backwards.excludes b/stream/src/main/mima-filters/1.1.x.backwards.excludes/pr-1669-avoid-boxing-in-zipWithIndex.backwards.excludes new file mode 100644 index 0000000000..aaaac3bbcc --- /dev/null +++ b/stream/src/main/mima-filters/1.1.x.backwards.excludes/pr-1669-avoid-boxing-in-zipWithIndex.backwards.excludes @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Should be java.lang.Long instead of java.lang.Object +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.SubSource.zipWithIndex") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ZipWithIndex.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ZipWithIndex.scala new file mode 100644 index 0000000000..25c671d764 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ZipWithIndex.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.fusing + +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.japi.Pair +import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet } +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } + +/** + * INTERNAL API + */ +@InternalApi private[pekko] object ZipWithIndex extends GraphStage[FlowShape[Any, (Any, Long)]] { + val in = Inlet[Any]("ZipWithIndex.in") + val out = Outlet[(Any, Long)]("ZipWithIndex.out") + override val shape = FlowShape(in, out) + override def initialAttributes: Attributes = DefaultAttributes.zipWithIndex + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private var index = 0L + override def onPush(): Unit = { + push(out, (grab(in), index)) + index += 1 + } + + override def onPull(): Unit = pull(in) + setHandlers(in, out, this) + } +} + +/** + * INTERNAL API + */ +@InternalApi private[pekko] object ZipWithIndexJava extends GraphStage[FlowShape[Any, Pair[Any, Long]]] { + val in = Inlet[Any]("ZipWithIndex.in") + val out = Outlet[Pair[Any, Long]]("ZipWithIndex.out") + override val shape = FlowShape(in, out) + override def initialAttributes: Attributes = DefaultAttributes.zipWithIndex + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private var index = 0L + override def onPush(): Unit = { + push(out, new Pair(grab(in), index)) + index += 1 + } + + override def onPull(): Unit = pull(in) + setHandlers(in, out, this) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index df395c1ad5..1eb248c0c0 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -37,6 +37,7 @@ import pekko.japi.Pair import pekko.japi.function import pekko.japi.function.Creator import pekko.stream.{ javadsl, _ } +import pekko.stream.impl.fusing.ZipWithIndexJava import pekko.util.ConstantFun import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ @@ -3691,7 +3692,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * '''Cancels when''' downstream cancels */ def zipWithIndex: Flow[In, Pair[Out, java.lang.Long], Mat] = - new Flow(delegate.zipWithIndex.map { case (elem, index) => Pair[Out, java.lang.Long](elem, index) }) + via(ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]]) /** * If the first element has not passed through this operator before the provided timeout, the stream is failed diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 1f50fea255..7ad5bd0df7 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -35,7 +35,7 @@ import pekko.japi.{ function, JavaPartialFunction, Pair } import pekko.japi.function.Creator import pekko.stream._ import pekko.stream.impl.{ LinearTraversalBuilder, UnfoldAsyncJava, UnfoldJava } -import pekko.stream.impl.fusing.ArraySource +import pekko.stream.impl.fusing.{ ArraySource, ZipWithIndexJava } import pekko.util.{ unused, _ } import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ @@ -2173,7 +2173,8 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * '''Cancels when''' downstream cancels */ def zipWithIndex: javadsl.Source[Pair[Out @uncheckedVariance, java.lang.Long], Mat] = - new Source(delegate.zipWithIndex.map { case (elem, index) => Pair[Out, java.lang.Long](elem, index) }) + new Source(delegate.via( + ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]])) /** * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 37216e02d6..79d7ea4e43 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -29,6 +29,7 @@ import pekko.annotation.ApiMayChange import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import pekko.japi.{ function, Pair } import pekko.stream._ +import pekko.stream.impl.fusing.ZipWithIndexJava import pekko.util.ConstantFun import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ @@ -2272,7 +2273,8 @@ class SubFlow[In, Out, Mat]( * '''Cancels when''' downstream cancels */ def zipWithIndex: SubFlow[In, pekko.japi.Pair[Out @uncheckedVariance, java.lang.Long], Mat] = - new SubFlow(delegate.zipWithIndex.map { case (elem, index) => pekko.japi.Pair[Out, java.lang.Long](elem, index) }) + new SubFlow(delegate.via( + ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]])) /** * If the first element has not passed through this operator before the provided timeout, the stream is failed diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 89e676d958..3092262b7d 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -29,6 +29,7 @@ import pekko.annotation.ApiMayChange import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } import pekko.japi.{ function, Pair } import pekko.stream._ +import pekko.stream.impl.fusing.ZipWithIndexJava import pekko.util.ConstantFun import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ @@ -2246,8 +2247,9 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels */ - def zipWithIndex: javadsl.SubSource[pekko.japi.Pair[Out @uncheckedVariance, Long], Mat] = - new SubSource(delegate.zipWithIndex.map { case (elem, index) => pekko.japi.Pair(elem, index) }) + def zipWithIndex: javadsl.SubSource[pekko.japi.Pair[Out @uncheckedVariance, java.lang.Long], Mat] = + new SubSource(delegate.via( + ZipWithIndexJava.asInstanceOf[Graph[FlowShape[Out, Pair[Out, java.lang.Long]], NotUsed]])) /** * If the first element has not passed through this operator before the provided timeout, the stream is failed diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 9c66727d51..e2db2e8daf 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -3304,16 +3304,7 @@ trait FlowOps[+Out, +Mat] { * * '''Cancels when''' downstream cancels */ - def zipWithIndex: Repr[(Out, Long)] = { - statefulMapConcat[(Out, Long)] { () => - var index: Long = 0L - elem => { - val zipped = (elem, index) - index += 1 - immutable.Iterable[(Out, Long)](zipped) - } - } - } + def zipWithIndex: Repr[(Out, Long)] = via(ZipWithIndex.asInstanceOf[Graph[FlowShape[Out, (Out, Long)], NotUsed]]) /** * Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].