Skip to content

Commit

Permalink
chore: Add Source.fromArray operator for Java dsl. (#1248)
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin authored Apr 30, 2024
1 parent b5eb6ff commit 2c3e9b4
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 0 deletions.
28 changes: 28 additions & 0 deletions docs/src/main/paradox/stream/operators/Source/fromArray.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Source.fromArray

Stream the values of an `array`.

@ref[Source operators](../index.md#source-operators)

## Signature

@apidoc[Source.from](Source$) { java="#fromArray(java.lang.Object[])" }

## Description

Stream the values of a Java `array`.

## Examples

Java
: @@snip [from.java](/docs/src/test/java/jdocs/stream/operators/SourceDocExamples.java) { #imports #source-from-array }

## Reactive Streams semantics

@@@div { .callout }

**emits** the next value of the array

**completes** when the last element of the seq has been emitted

@@@
2 changes: 2 additions & 0 deletions docs/src/main/paradox/stream/operators/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ These built-in sources are available from @scala[`org.apache.pekko.stream.scalad
|Source|<a name="empty"></a>@ref[empty](Source/empty.md)|Complete right away without ever emitting any elements.|
|Source|<a name="failed"></a>@ref[failed](Source/failed.md)|Fail directly with a user specified exception.|
|Source|<a name="from"></a>@ref[@scala[apply]@java[from]](Source/from.md)|Stream the values of an @scala[`immutable.Seq`]@java[`Iterable`].|
|Source|<a name="fromarray"></a>@ref[fromArray](Source/fromArray.md)|Stream the values of an `array`.|
|Source|<a name="fromcompletionstage"></a>@ref[fromCompletionStage](Source/fromCompletionStage.md)|Deprecated by @ref[`Source.completionStage`](Source/completionStage.md).|
|Source|<a name="fromfuture"></a>@ref[fromFuture](Source/fromFuture.md)|Deprecated by @ref[`Source.future`](Source/future.md).|
|Source|<a name="fromfuturesource"></a>@ref[fromFutureSource](Source/fromFutureSource.md)|Deprecated by @ref[`Source.futureSource`](Source/futureSource.md).|
Expand Down Expand Up @@ -468,6 +469,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [foreachAsync](Sink/foreachAsync.md)
* [foreachParallel](Sink/foreachParallel.md)
* [from](Source/from.md)
* [fromArray](Source/fromArray.md)
* [fromCompletionStage](Source/fromCompletionStage.md)
* [fromFile](FileIO/fromFile.md)
* [fromFuture](Source/fromFuture.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ public static void fromExample() {
// #source-from-example
}

private static void fromArrayExample() {
final ActorSystem system = null;
// #source-from-array
Source<String, NotUsed> words = Source.fromArray("Hello world".split("\\s"));
words.runForeach(System.out::println, system);
// #source-from-array
}

static void rangeExample() {

final ActorSystem system = ActorSystem.create("Source");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,26 @@ public void mustBeAbleToUseSimpleOperators() {
probe.expectMsgEquals("de");
}

@Test
public void mustBeAbleToCompleteWhenArrayIsEmpty() {
Source.fromArray(new String[] {})
.runWith(TestSink.probe(system), system)
.ensureSubscription()
.expectComplete();
}

@Test
public void mustBeAbleToEmitEveryArrayElementSequentially() {
Source.fromArray(new String[] {"a", "b", "c"})
.runWith(TestSink.probe(system), system)
.ensureSubscription()
.request(3)
.expectNext("a")
.expectNext("b")
.expectNext("c")
.expectComplete();
}

@Test
public void mustBeAbleToUseVoidTypeInForeach() {
final TestKit probe = new TestKit(system);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ import pekko.stream.Attributes._

val publisherSource = name("publisherSource")
val iterableSource = name("iterableSource")
val arraySource = name("arraySource")
val iterateSource = name("iterateSource")
val cycledSource = name("cycledSource")
val futureSource = name("futureSource")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.impl.fusing

import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.stream.{ Attributes, Outlet, SourceShape }
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }

@InternalApi
private[pekko] final class ArraySource[T](elements: Array[T]) extends GraphStage[SourceShape[T]] {
require(elements ne null, "array must not be null")
override protected def initialAttributes: Attributes = DefaultAttributes.arraySource
private val out = Outlet[T]("ArraySource.out")
override val shape: SourceShape[T] = SourceShape(out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
private var index: Int = 0

override def preStart(): Unit = if (elements.isEmpty) completeStage()

override def onPull(): Unit = {
if (index < elements.length) {
push(out, elements(index))
index += 1
if (index == elements.length) {
complete(out)
}
} else {
complete(out)
}
}

setHandler(out, this)
}

override def toString: String = "ArraySource"
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import scala.collection.immutable
import scala.util.control.NonFatal

import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.stream.{ Attributes, Outlet, SourceShape, Supervision }
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.impl.ReactiveStreamsCompliance
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage.{ GraphStage, GraphStageLogic, OutHandler }

@InternalApi
private[pekko] final class IterableSource[T](val elements: immutable.Iterable[T]) extends GraphStage[SourceShape[T]] {
ReactiveStreamsCompliance.requireNonNullElement(elements)

Expand Down
10 changes: 10 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import scala.concurrent.duration.FiniteDuration
import scala.reflect.ClassTag

import org.apache.pekko
import org.apache.pekko.stream.impl.fusing.ArraySource
import pekko.{ Done, NotUsed }
import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
import pekko.annotation.ApiMayChange
Expand Down Expand Up @@ -166,6 +167,15 @@ object Source {
new Source(scaladsl.Source(scalaIterable))
}

/**
* Creates a `Source` from an array, if the array is empty, the stream is completed immediately,
* otherwise, every element of the array will be emitted sequentially.
*
* @since 1.1.0
*/
def fromArray[T](array: Array[T]): javadsl.Source[T, NotUsed] = new Source(scaladsl.Source.fromGraph(
new ArraySource[T](array)))

/**
* Creates [[Source]] that represents integer values in range ''[start;end]'', step equals to 1.
* It allows to create `Source` out of range as simply as on Scala `Source(1 to N)`
Expand Down

0 comments on commit 2c3e9b4

Please sign in to comment.