diff --git a/docs/src/main/paradox/stream/operators/Source/fromArray.md b/docs/src/main/paradox/stream/operators/Source/fromArray.md new file mode 100644 index 00000000000..0e0eb34e7ad --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source/fromArray.md @@ -0,0 +1,28 @@ +# Source.fromArray + +Stream the values of an `array`. + +@ref[Source operators](../index.md#source-operators) + +## Signature + +@apidoc[Source.from](Source$) { java="#fromArray(java.lang.Object[])" } + +## Description + +Stream the values of a Java `array`. + +## Examples + +Java +: @@snip [from.java](/docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #imports #source-from-array } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** the next value of the array + +**completes** when the last element of the seq has been emitted + +@@@ diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index eff0593c8b2..cc6d698a5b6 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -16,6 +16,7 @@ These built-in sources are available from @scala[`org.apache.pekko.stream.scalad |Source|@ref[empty](Source/empty.md)|Complete right away without ever emitting any elements.| |Source|@ref[failed](Source/failed.md)|Fail directly with a user specified exception.| |Source|@ref[@scala[apply]@java[from]](Source/from.md)|Stream the values of an @scala[`immutable.Seq`]@java[`Iterable`].| +|Source|@ref[fromArray](Source/fromArray.md)|Stream the values of an `array`.| |Source|@ref[fromCompletionStage](Source/fromCompletionStage.md)|Deprecated by @ref[`Source.completionStage`](Source/completionStage.md).| |Source|@ref[fromFuture](Source/fromFuture.md)|Deprecated by @ref[`Source.future`](Source/future.md).| |Source|@ref[fromFutureSource](Source/fromFutureSource.md)|Deprecated by @ref[`Source.futureSource`](Source/futureSource.md).| @@ -468,6 +469,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [foreachAsync](Sink/foreachAsync.md) * [foreachParallel](Sink/foreachParallel.md) * [from](Source/from.md) +* [fromArray](Source/fromArray.md) * [fromCompletionStage](Source/fromCompletionStage.md) * [fromFile](FileIO/fromFile.md) * [fromFuture](Source/fromFuture.md) diff --git a/docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java b/docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java index 531d4f38778..c2afdff2193 100644 --- a/docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java +++ b/docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java @@ -60,6 +60,14 @@ public static void fromExample() { // #source-from-example } + private static void fromArrayExample() { + final ActorSystem system = null; + // #source-from-array + Source words = Source.fromArray("Hello world".split("\\s")); + words.runForeach(System.out::println, system); + // #source-from-array + } + static void rangeExample() { final ActorSystem system = ActorSystem.create("Source"); diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index ced206a4a6c..8464688043c 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -105,6 +105,26 @@ public void mustBeAbleToUseSimpleOperators() { probe.expectMsgEquals("de"); } + @Test + public void mustBeAbleToCompleteWhenArrayIsEmpty() { + Source.fromArray(new String[] {}) + .runWith(TestSink.probe(system), system) + .ensureSubscription() + .expectComplete(); + } + + @Test + public void mustBeAbleToEmitEveryArrayElementSequentially() { + Source.fromArray(new String[] {"a", "b", "c"}) + .runWith(TestSink.probe(system), system) + .ensureSubscription() + .request(3) + .expectNext("a") + .expectNext("b") + .expectNext("c") + .expectComplete(); + } + @Test public void mustBeAbleToUseVoidTypeInForeach() { final TestKit probe = new TestKit(system); diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 0a18b880bba..1559a550dcc 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -116,6 +116,7 @@ import pekko.stream.Attributes._ val publisherSource = name("publisherSource") val iterableSource = name("iterableSource") + val arraySource = name("arraySource") val iterateSource = name("iterateSource") val cycledSource = name("cycledSource") val futureSource = name("futureSource") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ArraySource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ArraySource.scala new file mode 100644 index 00000000000..afbbbf2b3db --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ArraySource.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.impl.fusing + +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.stream.{ Attributes, Outlet, SourceShape } +import pekko.stream.impl.Stages.DefaultAttributes +import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } + +@InternalApi +private[pekko] final class ArraySource[T](elements: Array[T]) extends GraphStage[SourceShape[T]] { + require(elements ne null, "array must not be null") + override protected def initialAttributes: Attributes = DefaultAttributes.arraySource + private val out = Outlet[T]("ArraySource.out") + override val shape: SourceShape[T] = SourceShape(out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with OutHandler { + private var index: Int = 0 + + override def preStart(): Unit = if (elements.isEmpty) completeStage() + + override def onPull(): Unit = { + if (index < elements.length) { + push(out, elements(index)) + index += 1 + if (index == elements.length) { + complete(out) + } + } else { + complete(out) + } + } + + setHandler(out, this) + } + + override def toString: String = "ArraySource" +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala index 0bf99be6e3a..801803e7421 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/IterableSource.scala @@ -21,12 +21,14 @@ import scala.collection.immutable import scala.util.control.NonFatal import org.apache.pekko +import pekko.annotation.InternalApi import pekko.stream.{ Attributes, Outlet, SourceShape, Supervision } import pekko.stream.ActorAttributes.SupervisionStrategy import pekko.stream.impl.ReactiveStreamsCompliance import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler } +@InternalApi private[pekko] final class IterableSource[T](val elements: immutable.Iterable[T]) extends GraphStage[SourceShape[T]] { ReactiveStreamsCompliance.requireNonNullElement(elements) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index dd3747a2441..c53881266c1 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -26,6 +26,7 @@ import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag import org.apache.pekko +import org.apache.pekko.stream.impl.fusing.ArraySource import pekko.{ Done, NotUsed } import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider } import pekko.annotation.ApiMayChange @@ -166,6 +167,15 @@ object Source { new Source(scaladsl.Source(scalaIterable)) } + /** + * Creates a `Source` from an array, if the array is empty, the stream is completed immediately, + * otherwise, every element of the array will be emitted sequentially. + * + * @since 1.1.0 + */ + def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new Source(scaladsl.Source.fromGraph( + new ArraySource[T](array))) + /** * Creates [[Source]] that represents integer values in range ''[start;end]'', step equals to 1. * It allows to create `Source` out of range as simply as on Scala `Source(1 to N)`