diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/LazySource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/LazySource.scala index dc68fc139f..0f7ed2661b 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/LazySource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/LazySource.scala @@ -74,11 +74,7 @@ import pekko.stream.stage._ } }) - subSink.setHandler(new InHandler { - override def onPush(): Unit = { - push(out, subSink.grab()) - } - }) + subSink.setHandler(() => push(out, subSink.grab())) try { val matVal = subFusingMaterializer.materialize(source.toMat(subSink.sink)(Keep.left), inheritedAttributes) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala index ad5f3b325a..817bf3cd89 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/ActorGraphInterpreter.scala @@ -666,10 +666,7 @@ import org.reactivestreams.Subscription else { waitingForShutdown = true val subscriptionTimeout = attributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout - mat.scheduleOnce(subscriptionTimeout, - new Runnable { - override def run(): Unit = self ! Abort(GraphInterpreterShell.this) - }) + mat.scheduleOnce(subscriptionTimeout, () => self ! Abort(GraphInterpreterShell.this)) } } else if (interpreter.isSuspended && !resumeScheduled) sendResume(!usingShellLimit) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala index 4c9b9cea7b..8b536f7357 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/StreamOfStreams.scala @@ -199,9 +199,7 @@ import pekko.util.ccompat.JavaConverters._ setKeepGoing(false) cancelTimer(SubscriptionTimer) pull(in) - tailSource.setHandler(new OutHandler { - override def onPull(): Unit = pull(in) - }) + tailSource.setHandler(() => pull(in)) } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/DelayStrategy.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/DelayStrategy.scala index c36a98b796..2e1f5374d6 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/DelayStrategy.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/DelayStrategy.scala @@ -45,9 +45,7 @@ object DelayStrategy { * Fixed delay strategy, always returns constant delay for any element. * @param delay value of the delay */ - def fixedDelay[T](delay: java.time.Duration): DelayStrategy[T] = new DelayStrategy[T] { - override def nextDelay(elem: T): java.time.Duration = delay - } + def fixedDelay[T](delay: java.time.Duration): DelayStrategy[T] = (_: T) => delay /** * Strategy with linear increasing delay. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala index b486112c41..20ad8438db 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala @@ -477,8 +477,7 @@ object Zip { def create[A, B]: Graph[FanInShape2[A, B, A Pair B], NotUsed] = ZipWith.create(_toPair.asInstanceOf[Function2[A, B, A Pair B]]) - private[this] final val _toPair: Function2[Any, Any, Any Pair Any] = - new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) } + private[this] final val _toPair: Function2[Any, Any, Any Pair Any] = (a: Any, b: Any) => new Pair(a, b) } /** @@ -506,8 +505,7 @@ object ZipLatest { def create[A, B]: Graph[FanInShape2[A, B, A Pair B], NotUsed] = ZipLatestWith.create(_toPair.asInstanceOf[Function2[A, B, A Pair B]]) - private[this] final val _toPair: Function2[Any, Any, Any Pair Any] = - new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) } + private[this] final val _toPair: Function2[Any, Any, Any Pair Any] = (a: Any, b: Any) => new Pair(a, b) } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/DelayStrategy.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/DelayStrategy.scala index 15be91fbc2..549a158394 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/DelayStrategy.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/DelayStrategy.scala @@ -36,9 +36,7 @@ object DelayStrategy { * Fixed delay strategy, always returns constant delay for any element. * @param delay value of the delay */ - def fixedDelay(delay: FiniteDuration): DelayStrategy[Any] = new DelayStrategy[Any] { - override def nextDelay(elem: Any): FiniteDuration = delay - } + def fixedDelay(delay: FiniteDuration): DelayStrategy[Any] = (_: Any) => delay /** * Strategy with linear increasing delay.