From 7782cf55e83dc3c2c2b2675ec91e1fa6a90586ca Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Mon, 6 Jan 2025 10:10:51 +0800 Subject: [PATCH] chore: Use more lambda instread in stream module. (#1686) --- .../scala/org/apache/pekko/stream/impl/LazySource.scala | 6 +----- .../pekko/stream/impl/fusing/ActorGraphInterpreter.scala | 5 +---- .../apache/pekko/stream/impl/fusing/StreamOfStreams.scala | 4 +--- .../org/apache/pekko/stream/javadsl/DelayStrategy.scala | 4 +--- .../main/scala/org/apache/pekko/stream/javadsl/Graph.scala | 6 ++---- .../org/apache/pekko/stream/scaladsl/DelayStrategy.scala | 4 +--- 6 files changed, 7 insertions(+), 22 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/LazySource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/LazySource.scala index dc68fc139ff..0f7ed2661b6 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/LazySource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/LazySource.scala @@ -74,11 +74,7 @@ import pekko.stream.stage._ } }) - subSink.setHandler(new InHandler { - override def onPush(): Unit = { - push(out, subSink.grab()) - } - }) + subSink.setHandler(() => push(out, subSink.grab())) try { val matVal = subFusingMaterializer.materialize(source.toMat(subSink.sink)(Keep.left), inheritedAttributes) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala index ad5f3b325ab..817bf3cd899 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala @@ -666,10 +666,7 @@ import org.reactivestreams.Subscription else { waitingForShutdown = true val subscriptionTimeout = attributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout - mat.scheduleOnce(subscriptionTimeout, - new Runnable { - override def run(): Unit = self ! Abort(GraphInterpreterShell.this) - }) + mat.scheduleOnce(subscriptionTimeout, () => self ! Abort(GraphInterpreterShell.this)) } } else if (interpreter.isSuspended && !resumeScheduled) sendResume(!usingShellLimit) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala index 4c9b9cea7b2..8b536f7357e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala @@ -199,9 +199,7 @@ import pekko.util.ccompat.JavaConverters._ setKeepGoing(false) cancelTimer(SubscriptionTimer) pull(in) - tailSource.setHandler(new OutHandler { - override def onPull(): Unit = pull(in) - }) + tailSource.setHandler(() => pull(in)) } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/DelayStrategy.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/DelayStrategy.scala index c36a98b796c..2e1f5374d67 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/DelayStrategy.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/DelayStrategy.scala @@ -45,9 +45,7 @@ object DelayStrategy { * Fixed delay strategy, always returns constant delay for any element. * @param delay value of the delay */ - def fixedDelay[T](delay: java.time.Duration): DelayStrategy[T] = new DelayStrategy[T] { - override def nextDelay(elem: T): java.time.Duration = delay - } + def fixedDelay[T](delay: java.time.Duration): DelayStrategy[T] = (_: T) => delay /** * Strategy with linear increasing delay. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala index b486112c416..20ad8438db1 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala @@ -477,8 +477,7 @@ object Zip { def create[A, B]: Graph[FanInShape2[A, B, A Pair B], NotUsed] = ZipWith.create(_toPair.asInstanceOf[Function2[A, B, A Pair B]]) - private[this] final val _toPair: Function2[Any, Any, Any Pair Any] = - new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) } + private[this] final val _toPair: Function2[Any, Any, Any Pair Any] = (a: Any, b: Any) => new Pair(a, b) } /** @@ -506,8 +505,7 @@ object ZipLatest { def create[A, B]: Graph[FanInShape2[A, B, A Pair B], NotUsed] = ZipLatestWith.create(_toPair.asInstanceOf[Function2[A, B, A Pair B]]) - private[this] final val _toPair: Function2[Any, Any, Any Pair Any] = - new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) } + private[this] final val _toPair: Function2[Any, Any, Any Pair Any] = (a: Any, b: Any) => new Pair(a, b) } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/DelayStrategy.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/DelayStrategy.scala index 15be91fbc20..549a1583948 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/DelayStrategy.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/DelayStrategy.scala @@ -36,9 +36,7 @@ object DelayStrategy { * Fixed delay strategy, always returns constant delay for any element. * @param delay value of the delay */ - def fixedDelay(delay: FiniteDuration): DelayStrategy[Any] = new DelayStrategy[Any] { - override def nextDelay(elem: Any): FiniteDuration = delay - } + def fixedDelay(delay: FiniteDuration): DelayStrategy[Any] = (_: Any) => delay /** * Strategy with linear increasing delay.