diff --git a/docs/src/main/paradox/release-notes/releases-1.1.md b/docs/src/main/paradox/release-notes/releases-1.1.md index 9571bdda364..5fdee4669f9 100644 --- a/docs/src/main/paradox/release-notes/releases-1.1.md +++ b/docs/src/main/paradox/release-notes/releases-1.1.md @@ -42,6 +42,7 @@ The Stream API has been updated to add some extra functions. * add Source.iterate operator ([PR1244](https://github.com/apache/pekko/pull/1244)) * added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269)) * add optionalVia/unsafeOptionalDataVia operators ([PR1422](https://github.com/apache/pekko/pull/1422)) +* add alsoTo/alsoToContext operators to `SourceWithContext`/`FlowWithContext` ([PR-1443](https://github.com/apache/pekko/pull/1443)) The Stream Testkit Java DSL has some extra functions. diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala index 35f2eb70142..0d3de6261f6 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslFactoriesConsistencySpec.scala @@ -33,6 +33,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers { "getClass", "shape", "identityTraversalBuilder", + "contramapImpl", // futures in scaladsl vs completion stage in javadsl "lazyFutureSource", // lazyCompletionStageSource "futureSource", // completionStageSource diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala index 9ac07e2a39b..ed3d61ab6fd 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala @@ -19,6 +19,9 @@ import org.apache.pekko import pekko.stream.testkit.StreamSpec import pekko.stream.testkit.scaladsl.TestSink +import scala.collection.mutable.ListBuffer +import scala.concurrent.duration._ + class FlowWithContextSpec extends StreamSpec { "A FlowWithContext" must { @@ -77,6 +80,52 @@ class FlowWithContextSpec extends StreamSpec { .expectError(boom) } + "pass through all data when using alsoTo" in { + val listBuffer = new ListBuffer[String]() + Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))) + .asSourceWithContext(_.offset) + .via( + FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), (String, Long)] { case (data, offset) => + (data.data.toLowerCase, offset) + }) + .alsoTo(Sink.foreach(string => listBuffer.+=(string))) + ) + .toMat(TestSink.probe[(String, Long)])(Keep.right) + .run() + .request(4) + .expectNext(("a", 1L)) + .expectNext(("b", 2L)) + .expectNext(("d", 3L)) + .expectNext(("c", 4L)) + .expectComplete() + .within(10.seconds) { + listBuffer should contain theSameElementsInOrderAs List("a", "b", "d", "c") + } + } + + "pass through all data when using alsoToContext" in { + val listBuffer = new ListBuffer[Long]() + Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))) + .asSourceWithContext(_.offset) + .via( + FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), (String, Long)] { case (data, offset) => + (data.data.toLowerCase, offset) + }) + .alsoToContext(Sink.foreach(offset => listBuffer.+=(offset))) + ) + .toMat(TestSink.probe[(String, Long)])(Keep.right) + .run() + .request(4) + .expectNext(("a", 1L)) + .expectNext(("b", 2L)) + .expectNext(("d", 3L)) + .expectNext(("c", 4L)) + .expectComplete() + .within(10.seconds) { + listBuffer should contain theSameElementsInOrderAs List(1L, 2L, 3L, 4L) + } + } + "keep the same order for data and context when using unsafeDataVia" in { val data = List(("1", 1), ("2", 2), ("3", 3), ("4", 4)) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala index 62dda8a96fa..0c832f2beb5 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala @@ -19,6 +19,9 @@ import org.apache.pekko import pekko.stream.testkit.StreamSpec import pekko.stream.testkit.scaladsl.TestSink +import scala.collection.mutable.ListBuffer +import scala.concurrent.duration._ + case class Message(data: String, offset: Long) class SourceWithContextSpec extends StreamSpec { @@ -74,6 +77,44 @@ class SourceWithContextSpec extends StreamSpec { .expectComplete() } + "pass through all data when using alsoTo" in { + val listBuffer = new ListBuffer[Message]() + val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)) + Source(messages) + .asSourceWithContext(_.offset) + .alsoTo(Sink.foreach(message => listBuffer.+=(message))) + .toMat(TestSink.probe[(Message, Long)])(Keep.right) + .run() + .request(4) + .expectNext((Message("A", 1L), 1L)) + .expectNext((Message("B", 2L), 2L)) + .expectNext((Message("D", 3L), 3L)) + .expectNext((Message("C", 4L), 4L)) + .expectComplete() + .within(10.seconds) { + listBuffer.toVector shouldBe messages + } + } + + "pass through all data when using alsoToContext" in { + val listBuffer = new ListBuffer[Long]() + val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)) + Source(messages) + .asSourceWithContext(_.offset) + .alsoToContext(Sink.foreach(offset => listBuffer.+=(offset))) + .toMat(TestSink.probe[(Message, Long)])(Keep.right) + .run() + .request(4) + .expectNext((Message("A", 1L), 1L)) + .expectNext((Message("B", 2L), 2L)) + .expectNext((Message("D", 3L), 3L)) + .expectNext((Message("C", 4L), 4L)) + .expectComplete() + .within(10.seconds) { + listBuffer.toVector shouldBe messages.map(_.offset) + } + } + "pass through contexts via a FlowWithContext" in { def flowWithContext[T] = FlowWithContext[T, Long] diff --git a/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1443-also-alsoTo-source-flow-with-context.backwards.excludes b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1443-also-alsoTo-source-flow-with-context.backwards.excludes new file mode 100644 index 00000000000..4006d402a19 --- /dev/null +++ b/stream/src/main/mima-filters/1.0.x.backwards.excludes/pr-1443-also-alsoTo-source-flow-with-context.backwards.excludes @@ -0,0 +1,2 @@ +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.alsoTo") +ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.alsoToContext") diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala index 94141c95b94..13df8743952 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala @@ -138,6 +138,12 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In combine: (Mat, Mat2) => Mat3): FlowWithContext[In, CtxIn, Out2, Ctx2, Mat3] = new FlowWithContext(delegate.viaMat(flow)(combine)) + override def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out, CtxOut] = + FlowWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._1))) + + override def alsoToContext(that: Graph[SinkShape[CtxOut], _]): Repr[Out, CtxOut] = + FlowWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._2))) + /** * Context-preserving variant of [[pekko.stream.scaladsl.Flow.withAttributes]]. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala index 8edcc22c203..fe06cf0142f 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala @@ -88,6 +88,22 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])( combine: (Mat, Mat2) => Mat3): ReprMat[Out2, Ctx2, Mat3] + /** + * Data variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]] + * + * @see [[pekko.stream.scaladsl.FlowOps.alsoTo]] + * @since 1.1.0 + */ + def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx] + + /** + * Context variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]] + * + * @see [[pekko.stream.scaladsl.FlowOps.alsoTo]] + * @since 1.1.0 + */ + def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] + /** * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.map]]. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index 492115eba01..cf8b7151d12 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -51,7 +51,7 @@ final class Sink[-In, +Mat](override val traversalBuilder: LinearTraversalBuilde * '''Cancels when''' original [[Sink]] cancels * @since 1.1.0 */ - def contramap[In2](f: In2 => In): Sink[In2, Mat] = Flow.fromFunction(f).toMat(this)(Keep.right) + def contramap[In2](f: In2 => In): Sink[In2, Mat] = Sink.contramapImpl(this, f) /** * Connect this `Sink` to a `Source` and run it. The returned value is the materialized value @@ -138,6 +138,10 @@ object Sink { /** INTERNAL API */ def shape[T](name: String): SinkShape[T] = SinkShape(Inlet(name + ".in")) + @InternalApi private[pekko] final def contramapImpl[In, In2, Mat]( + sink: Graph[SinkShape[In], Mat], f: In2 => In): Sink[In2, Mat] = + Flow.fromFunction(f).toMat(sink)(Keep.right) + /** * A graph with the shape of a sink logically is a sink, this method makes * it so also in type. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala index 59e7af530dc..be9c09f7e16 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala @@ -155,6 +155,12 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc def toMat[Mat2, Mat3](sink: Graph[SinkShape[(Out, Ctx)], Mat2])(combine: (Mat, Mat2) => Mat3): RunnableGraph[Mat3] = delegate.toMat(sink)(combine) + override def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx] = + SourceWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._1))) + + override def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] = + SourceWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._2))) + /** * Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a [[pekko.stream.scaladsl.Sink]] and run it. * The returned value is the materialized value of the `Sink`.