Skip to content

Commit

Permalink
Add alsoTo/alsoToContext to FlowWithContext/SourceWithContext
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Aug 23, 2024
1 parent 60c480a commit 302f38f
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 1 deletion.
1 change: 1 addition & 0 deletions docs/src/main/paradox/release-notes/releases-1.1.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The Stream API has been updated to add some extra functions.
* add Source.iterate operator ([PR1244](https://github.com/apache/pekko/pull/1244))
* added extra retry operators that allow users to provide a predicate to decide whether to retry based on the exception ([PR1269](https://github.com/apache/pekko/pull/1269))
* add optionalVia/unsafeOptionalDataVia operators ([PR1422](https://github.com/apache/pekko/pull/1422))
* add alsoTo/alsoToContext operators to `SourceWithContext`/`FlowWithContext` ([PR-1443](https://github.com/apache/pekko/pull/1443))

The Stream Testkit Java DSL has some extra functions.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers {
"getClass",
"shape",
"identityTraversalBuilder",
"contramapImpl",
// futures in scaladsl vs completion stage in javadsl
"lazyFutureSource", // lazyCompletionStageSource
"futureSource", // completionStageSource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import org.apache.pekko
import pekko.stream.testkit.StreamSpec
import pekko.stream.testkit.scaladsl.TestSink

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._

class FlowWithContextSpec extends StreamSpec {

"A FlowWithContext" must {
Expand Down Expand Up @@ -77,6 +80,52 @@ class FlowWithContextSpec extends StreamSpec {
.expectError(boom)
}

"pass through all data when using alsoTo" in {
val listBuffer = new ListBuffer[String]()
Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)))
.asSourceWithContext(_.offset)
.via(
FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), (String, Long)] { case (data, offset) =>
(data.data.toLowerCase, offset)
})
.alsoTo(Sink.foreach(string => listBuffer.+=(string)))
)
.toMat(TestSink.probe[(String, Long)])(Keep.right)
.run()
.request(4)
.expectNext(("a", 1L))
.expectNext(("b", 2L))
.expectNext(("d", 3L))
.expectNext(("c", 4L))
.expectComplete()
.within(10.seconds) {
listBuffer should contain theSameElementsInOrderAs List("a", "b", "d", "c")
}
}

"pass through all data when using alsoToContext" in {
val listBuffer = new ListBuffer[Long]()
Source(Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L)))
.asSourceWithContext(_.offset)
.via(
FlowWithContext.fromTuples(Flow.fromFunction[(Message, Long), (String, Long)] { case (data, offset) =>
(data.data.toLowerCase, offset)
})
.alsoToContext(Sink.foreach(offset => listBuffer.+=(offset)))
)
.toMat(TestSink.probe[(String, Long)])(Keep.right)
.run()
.request(4)
.expectNext(("a", 1L))
.expectNext(("b", 2L))
.expectNext(("d", 3L))
.expectNext(("c", 4L))
.expectComplete()
.within(10.seconds) {
listBuffer should contain theSameElementsInOrderAs List(1L, 2L, 3L, 4L)
}
}

"keep the same order for data and context when using unsafeDataVia" in {
val data = List(("1", 1), ("2", 2), ("3", 3), ("4", 4))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import org.apache.pekko
import pekko.stream.testkit.StreamSpec
import pekko.stream.testkit.scaladsl.TestSink

import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._

case class Message(data: String, offset: Long)

class SourceWithContextSpec extends StreamSpec {
Expand Down Expand Up @@ -74,6 +77,44 @@ class SourceWithContextSpec extends StreamSpec {
.expectComplete()
}

"pass through all data when using alsoTo" in {
val listBuffer = new ListBuffer[Message]()
val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))
Source(messages)
.asSourceWithContext(_.offset)
.alsoTo(Sink.foreach(message => listBuffer.+=(message)))
.toMat(TestSink.probe[(Message, Long)])(Keep.right)
.run()
.request(4)
.expectNext((Message("A", 1L), 1L))
.expectNext((Message("B", 2L), 2L))
.expectNext((Message("D", 3L), 3L))
.expectNext((Message("C", 4L), 4L))
.expectComplete()
.within(10.seconds) {
listBuffer.toVector shouldBe messages
}
}

"pass through all data when using alsoToContext" in {
val listBuffer = new ListBuffer[Long]()
val messages = Vector(Message("A", 1L), Message("B", 2L), Message("D", 3L), Message("C", 4L))
Source(messages)
.asSourceWithContext(_.offset)
.alsoToContext(Sink.foreach(offset => listBuffer.+=(offset)))
.toMat(TestSink.probe[(Message, Long)])(Keep.right)
.run()
.request(4)
.expectNext((Message("A", 1L), 1L))
.expectNext((Message("B", 2L), 2L))
.expectNext((Message("D", 3L), 3L))
.expectNext((Message("C", 4L), 4L))
.expectComplete()
.within(10.seconds) {
listBuffer.toVector shouldBe messages.map(_.offset)
}
}

"pass through contexts via a FlowWithContext" in {

def flowWithContext[T] = FlowWithContext[T, Long]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.alsoTo")
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.pekko.stream.scaladsl.FlowWithContextOps.alsoToContext")
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In
combine: (Mat, Mat2) => Mat3): FlowWithContext[In, CtxIn, Out2, Ctx2, Mat3] =
new FlowWithContext(delegate.viaMat(flow)(combine))

override def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out, CtxOut] =
FlowWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._1)))

override def alsoToContext(that: Graph[SinkShape[CtxOut], _]): Repr[Out, CtxOut] =
FlowWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._2)))

/**
* Context-preserving variant of [[pekko.stream.scaladsl.Flow.withAttributes]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(
combine: (Mat, Mat2) => Mat3): ReprMat[Out2, Ctx2, Mat3]

/**
* Data variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]]
*
* @see [[pekko.stream.scaladsl.FlowOps.alsoTo]]
* @since 1.1.0
*/
def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx]

/**
* Context variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]]
*
* @see [[pekko.stream.scaladsl.FlowOps.alsoTo]]
* @since 1.1.0
*/
def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx]

/**
* Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.map]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ final class Sink[-In, +Mat](override val traversalBuilder: LinearTraversalBuilde
* '''Cancels when''' original [[Sink]] cancels
* @since 1.1.0
*/
def contramap[In2](f: In2 => In): Sink[In2, Mat] = Flow.fromFunction(f).toMat(this)(Keep.right)
def contramap[In2](f: In2 => In): Sink[In2, Mat] = Sink.contramapImpl(this, f)

/**
* Connect this `Sink` to a `Source` and run it. The returned value is the materialized value
Expand Down Expand Up @@ -138,6 +138,10 @@ object Sink {
/** INTERNAL API */
def shape[T](name: String): SinkShape[T] = SinkShape(Inlet(name + ".in"))

@InternalApi private[pekko] final def contramapImpl[In, In2, Mat](
sink: Graph[SinkShape[In], Mat], f: In2 => In): Sink[In2, Mat] =
Flow.fromFunction(f).toMat(sink)(Keep.right)

/**
* A graph with the shape of a sink logically is a sink, this method makes
* it so also in type.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc
def toMat[Mat2, Mat3](sink: Graph[SinkShape[(Out, Ctx)], Mat2])(combine: (Mat, Mat2) => Mat3): RunnableGraph[Mat3] =
delegate.toMat(sink)(combine)

override def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out, Ctx] =
SourceWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._1)))

override def alsoToContext(that: Graph[SinkShape[Ctx], _]): Repr[Out, Ctx] =
SourceWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._2)))

/**
* Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a [[pekko.stream.scaladsl.Sink]] and run it.
* The returned value is the materialized value of the `Sink`.
Expand Down

0 comments on commit 302f38f

Please sign in to comment.