Skip to content

Commit

Permalink
chore: Use more lambda instread in stream module. (#1686)
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin authored Jan 6, 2025
1 parent 477fd39 commit 7782cf5
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,7 @@ import pekko.stream.stage._
}
})

subSink.setHandler(new InHandler {
override def onPush(): Unit = {
push(out, subSink.grab())
}
})
subSink.setHandler(() => push(out, subSink.grab()))

try {
val matVal = subFusingMaterializer.materialize(source.toMat(subSink.sink)(Keep.left), inheritedAttributes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,10 +666,7 @@ import org.reactivestreams.Subscription
else {
waitingForShutdown = true
val subscriptionTimeout = attributes.mandatoryAttribute[ActorAttributes.StreamSubscriptionTimeout].timeout
mat.scheduleOnce(subscriptionTimeout,
new Runnable {
override def run(): Unit = self ! Abort(GraphInterpreterShell.this)
})
mat.scheduleOnce(subscriptionTimeout, () => self ! Abort(GraphInterpreterShell.this))
}
} else if (interpreter.isSuspended && !resumeScheduled) sendResume(!usingShellLimit)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,7 @@ import pekko.util.ccompat.JavaConverters._
setKeepGoing(false)
cancelTimer(SubscriptionTimer)
pull(in)
tailSource.setHandler(new OutHandler {
override def onPull(): Unit = pull(in)
})
tailSource.setHandler(() => pull(in))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ object DelayStrategy {
* Fixed delay strategy, always returns constant delay for any element.
* @param delay value of the delay
*/
def fixedDelay[T](delay: java.time.Duration): DelayStrategy[T] = new DelayStrategy[T] {
override def nextDelay(elem: T): java.time.Duration = delay
}
def fixedDelay[T](delay: java.time.Duration): DelayStrategy[T] = (_: T) => delay

/**
* Strategy with linear increasing delay.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,7 @@ object Zip {
def create[A, B]: Graph[FanInShape2[A, B, A Pair B], NotUsed] =
ZipWith.create(_toPair.asInstanceOf[Function2[A, B, A Pair B]])

private[this] final val _toPair: Function2[Any, Any, Any Pair Any] =
new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) }
private[this] final val _toPair: Function2[Any, Any, Any Pair Any] = (a: Any, b: Any) => new Pair(a, b)
}

/**
Expand Down Expand Up @@ -506,8 +505,7 @@ object ZipLatest {
def create[A, B]: Graph[FanInShape2[A, B, A Pair B], NotUsed] =
ZipLatestWith.create(_toPair.asInstanceOf[Function2[A, B, A Pair B]])

private[this] final val _toPair: Function2[Any, Any, Any Pair Any] =
new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) }
private[this] final val _toPair: Function2[Any, Any, Any Pair Any] = (a: Any, b: Any) => new Pair(a, b)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ object DelayStrategy {
* Fixed delay strategy, always returns constant delay for any element.
* @param delay value of the delay
*/
def fixedDelay(delay: FiniteDuration): DelayStrategy[Any] = new DelayStrategy[Any] {
override def nextDelay(elem: Any): FiniteDuration = delay
}
def fixedDelay(delay: FiniteDuration): DelayStrategy[Any] = (_: Any) => delay

/**
* Strategy with linear increasing delay.
Expand Down

0 comments on commit 7782cf5

Please sign in to comment.