diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index e2db2e8daf..a12a0cd9c1 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -1149,7 +1149,8 @@ trait FlowOps[+Out, +Mat] { * @param onComplete a function that transforms the ongoing state into an optional output element */ def statefulMap[S, T](create: () => S)(f: (S, Out) => (S, T), onComplete: S => Option[T]): Repr[T] = - via(new StatefulMap[S, Out, T](create, f, onComplete).withAttributes(DefaultAttributes.statefulMap)) + via(new StatefulMap[S, Out, T](create, f, onComplete) + .withAttributes(DefaultAttributes.statefulMap and SourceLocation.forLambda(f))) /** * Transform each stream element with the help of a resource. @@ -1358,12 +1359,12 @@ trait FlowOps[+Out, +Mat] { def mapAsyncPartitioned[T, P](parallelism: Int)( partitioner: Out => P)( f: (Out, P) => Future[T]): Repr[T] = { - (if (parallelism == 1) { - via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem)))) - } else { - via(new MapAsyncPartitioned(parallelism, orderedOutput = true, partitioner, f)) - }) - .withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f)) + val graph: Graph[FlowShape[Out, T], _] = if (parallelism == 1) { + MapAsyncUnordered(1, elem => f(elem, partitioner(elem))) + } else { + new MapAsyncPartitioned(parallelism, orderedOutput = true, partitioner, f) + } + via(graph.withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f))) } /** @@ -1396,11 +1397,12 @@ trait FlowOps[+Out, +Mat] { def mapAsyncPartitionedUnordered[T, P](parallelism: Int)( partitioner: Out => P)( f: (Out, P) => Future[T]): Repr[T] = { - (if (parallelism == 1) { - via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem)))) - } else { - via(new MapAsyncPartitioned(parallelism, orderedOutput = false, partitioner, f)) - }).withAttributes(DefaultAttributes.mapAsyncPartitionUnordered and SourceLocation.forLambda(f)) + val graph: Graph[FlowShape[Out, T], _] = if (parallelism == 1) { + MapAsyncUnordered(1, elem => f(elem, partitioner(elem))) + } else { + new MapAsyncPartitioned(parallelism, orderedOutput = false, partitioner, f) + } + via(graph.withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f))) } /**