Skip to content

Commit

Permalink
wrap in withExecutionContext
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot committed Feb 28, 2024
1 parent 33a7913 commit bc208ef
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
5 changes: 3 additions & 2 deletions node/src/main/scala/com/wavesplatform/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import kamon.Kamon
import kamon.instrumentation.executor.ExecutorInstrumentation
import monix.eval.{Coeval, Task}
import monix.execution.schedulers.{ExecutorScheduler, SchedulerService}
import monix.execution.{Scheduler, UncaughtExceptionReporter}
import monix.execution.{ExecutionModel, Scheduler, UncaughtExceptionReporter}
import monix.reactive.Observable
import monix.reactive.subjects.ConcurrentSubject
import org.influxdb.dto.Point
Expand Down Expand Up @@ -371,7 +371,8 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con
val heavyRequestScheduler = Scheduler(
if (settings.config.getBoolean("kamon.enable"))
ExecutorInstrumentation.instrument(heavyRequestExecutor, "heavy-request-executor")
else heavyRequestExecutor
else heavyRequestExecutor,
ExecutionModel.AlwaysAsyncExecution
)

val serverRequestTimeout = FiniteDuration(settings.config.getDuration("akka.http.server.request-timeout").getSeconds, TimeUnit.SECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package com.wavesplatform.api.http

import akka.NotUsed
import akka.http.scaladsl.marshalling.{ToResponseMarshallable, ToResponseMarshaller}
import akka.http.scaladsl.server.Directives.{complete, handleExceptions}
import akka.http.scaladsl.server.Directives.{complete, handleExceptions, withExecutionContext}
import akka.http.scaladsl.server.{ExceptionHandler, Route}
import akka.stream.scaladsl.Source
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable

import scala.concurrent.ExecutionContext.fromExecutor
import scala.concurrent.TimeoutException
import scala.concurrent.duration.FiniteDuration

Expand All @@ -27,9 +28,11 @@ class RouteTimeout(timeout: FiniteDuration)(implicit sc: Scheduler) extends ApiM
.map(Source(_).map(f))(sc)
}

def executeFromObservable[T](observable: Observable[T])(implicit m: ToResponseMarshaller[Source[T, NotUsed]]): Route = {
handleExceptions(handler) & complete(Source.fromPublisher(observable.toReactivePublisher(sc)).initialTimeout(timeout))
}
def executeFromObservable[T](observable: Observable[T])(implicit m: ToResponseMarshaller[Source[T, NotUsed]]): Route =
withExecutionContext(fromExecutor(sc)) {
handleExceptions(handler) &
complete(Source.fromPublisher(observable.toReactivePublisher(sc)).initialTimeout(timeout))
}

def execute[T](task: Task[T])(f: (Task[T], Scheduler) => ToResponseMarshallable): Route =
handleExceptions(handler) & complete(f(task.timeout(timeout), sc))
Expand Down

0 comments on commit bc208ef

Please sign in to comment.