Skip to content

Commit

Permalink
chore: Tweak withAttribuets in Flow
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 7, 2025
1 parent 41b4cab commit aff004e
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,8 @@ trait FlowOps[+Out, +Mat] {
* @param onComplete a function that transforms the ongoing state into an optional output element
*/
def statefulMap[S, T](create: () => S)(f: (S, Out) => (S, T), onComplete: S => Option[T]): Repr[T] =
via(new StatefulMap[S, Out, T](create, f, onComplete).withAttributes(DefaultAttributes.statefulMap))
via(new StatefulMap[S, Out, T](create, f, onComplete)
.withAttributes(DefaultAttributes.statefulMap and SourceLocation.forLambda(f)))

/**
* Transform each stream element with the help of a resource.
Expand Down Expand Up @@ -1358,12 +1359,12 @@ trait FlowOps[+Out, +Mat] {
def mapAsyncPartitioned[T, P](parallelism: Int)(
partitioner: Out => P)(
f: (Out, P) => Future[T]): Repr[T] = {
(if (parallelism == 1) {
via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem))))
} else {
via(new MapAsyncPartitioned(parallelism, orderedOutput = true, partitioner, f))
})
.withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f))
val graph: Graph[FlowShape[Out, T], _] = if (parallelism == 1) {
MapAsyncUnordered(1, elem => f(elem, partitioner(elem)))
} else {
new MapAsyncPartitioned(parallelism, orderedOutput = true, partitioner, f)
}
via(graph.withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f)))
}

/**
Expand Down Expand Up @@ -1396,11 +1397,12 @@ trait FlowOps[+Out, +Mat] {
def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
partitioner: Out => P)(
f: (Out, P) => Future[T]): Repr[T] = {
(if (parallelism == 1) {
via(MapAsyncUnordered(1, elem => f(elem, partitioner(elem))))
} else {
via(new MapAsyncPartitioned(parallelism, orderedOutput = false, partitioner, f))
}).withAttributes(DefaultAttributes.mapAsyncPartitionUnordered and SourceLocation.forLambda(f))
val graph: Graph[FlowShape[Out, T], _] = if (parallelism == 1) {
MapAsyncUnordered(1, elem => f(elem, partitioner(elem)))
} else {
new MapAsyncPartitioned(parallelism, orderedOutput = false, partitioner, f)
}
via(graph.withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f)))
}

/**
Expand Down

0 comments on commit aff004e

Please sign in to comment.