Skip to content

Commit

Permalink
fix #1863
Browse files Browse the repository at this point in the history
  • Loading branch information
mathieuancelin committed Mar 21, 2024
1 parent 3aa85bf commit 0e17845
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 15 deletions.
24 changes: 19 additions & 5 deletions otoroshi/app/next/models/plugins.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ case class PluginIndex(
matchRoute: Option[Double] = None,
handlesTunnel: Option[Double] = None,
handlesWebsocket: Option[Double] = None,
handlesWebsocketBackend: Option[Double] = None,
callBackend: Option[Double] = None
) {
def json: JsValue = PluginIndex.format.writes(this)
Expand Down Expand Up @@ -541,11 +542,24 @@ case class NgContextualPlugins(
) ++ plsWithoutIndex
}

lazy val hasWebsocketPlugins = websocketPlugins.nonEmpty
lazy val hasNoWebsocketPlugins = websocketPlugins.isEmpty
lazy val hasTunnelHandlerPlugin = tunnelHandlerPlugins.nonEmpty
lazy val tunnelHandlerPlugin = tunnelHandlerPlugins.head
lazy val tunnelHandlerPluginOption = tunnelHandlerPlugins.headOption
lazy val websocketBackendPlugins = {
val pls = allPlugins
.map(inst => (inst, inst.getPlugin[NgWebsocketBackendPlugin]))
.collect { case (inst, Some(plugin)) =>
NgPluginWrapper.NgSimplePluginWrapper(inst, plugin)
}
val (plsWithIndex, plsWithoutIndex) = pls.partition(_.instance.pluginIndex.exists(_.handlesWebsocketBackend.isDefined))
plsWithIndex.sortWith((a, b) =>
a.instance.pluginIndex.get.handlesWebsocketBackend.get.compareTo(b.instance.pluginIndex.get.handlesWebsocketBackend.get) < 0
) ++ plsWithoutIndex
}

lazy val hasWebsocketPlugins = websocketPlugins.nonEmpty
lazy val hasNoWebsocketPlugins = websocketPlugins.isEmpty
lazy val hasWebsocketBackendPlugins = websocketBackendPlugins.nonEmpty
lazy val hasTunnelHandlerPlugin = tunnelHandlerPlugins.nonEmpty
lazy val tunnelHandlerPlugin = tunnelHandlerPlugins.head
lazy val tunnelHandlerPluginOption = tunnelHandlerPlugins.headOption

lazy val backendCallPlugins = {
val pls = allPlugins
Expand Down
22 changes: 12 additions & 10 deletions otoroshi/app/next/plugins/api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package otoroshi.next.plugins.api
import akka.Done
import akka.http.scaladsl.model.{StatusCodes, Uri}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.util.ByteString
import com.github.blemale.scaffeine.{Cache, Scaffeine}
import otoroshi.env.Env
Expand All @@ -18,14 +18,7 @@ import otoroshi.utils.TypedMap
import otoroshi.utils.http.WSCookieWithSameSite
import otoroshi.utils.syntax.implicits._
import play.api.http.HttpEntity
import play.api.http.websocket.{
CloseMessage,
Message,
PingMessage,
PongMessage,
BinaryMessage => PlayWSBinaryMessage,
TextMessage => PlayWSTextMessage
}
import play.api.http.websocket.{CloseMessage, Message, PingMessage, PongMessage, BinaryMessage => PlayWSBinaryMessage, TextMessage => PlayWSTextMessage}
import play.api.libs.json._
import play.api.libs.ws.{DefaultWSCookie, WSCookie, WSResponse}
import play.api.mvc.{Cookie, RequestHeader, Result, Results}
Expand Down Expand Up @@ -1456,6 +1449,15 @@ trait NgWebsocketPlugin extends NgPlugin {
): Future[Either[NgWebsocketError, WebsocketMessage]] = message.rightf
}

trait NgWebsocketBackendPlugin extends NgPlugin {

import play.api.http.websocket.{ Message => PlayWSMessage }

def callBackend(ctx: NgWebsocketPluginContext)(implicit env: Env, ec: ExecutionContext): Flow[PlayWSMessage, PlayWSMessage, _] = {
Flow.fromSinkAndSource(Sink.ignore, Source.empty)
}
}

trait NgWebsocketValidatorPlugin extends NgWebsocketPlugin {}

case class NgWebsocketError(
Expand All @@ -1465,4 +1467,4 @@ case class NgWebsocketError(
)
object NgWebsocketError {
def apply(statusCode: Int, reason: String): NgWebsocketError = NgWebsocketError(statusCode.some, reason.some, None)
}
}
41 changes: 41 additions & 0 deletions otoroshi/app/next/proxy/engine.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2677,6 +2677,47 @@ class ProxyEngine() extends RequestHandler {
.map(r => Left(NgResultProxyEngineError(r)))
)
}
} else if (ctxPlugins.hasWebsocketBackendPlugins) {
val handler = ctxPlugins.websocketBackendPlugins.head
val wsEngine = if (ctxPlugins.hasWebsocketPlugins) {
new WebsocketEngine(route, ctxPlugins, rawRequest, finalTarget, attrs)
} else {
new WebsocketEngine(NgRoute.empty, NgContextualPlugins.empty(rawRequest), rawRequest, finalTarget, attrs)
}
val ctx = NgWebsocketPluginContext(
idx = 0,
snowflake = snowflake,
request = rawRequest,
route = route,
config = handler.instance.config.raw,
attrs = attrs,
target = finalTarget,
)
val flow: Flow[PlayWSMessage, PlayWSMessage, _] = handler.plugin.callBackend(ctx)
val outFlow: Flow[PlayWSMessage, PlayWSMessage, _] = flow.mapAsync(1) { mess =>
WebsocketMessage.PlayMessage(mess).asAkka.flatMap { m =>
wsEngine.handleResponse(m)(_ => ())
}
}
.takeWhile(_.isRight)
.collect {
case Right(message) => message
}
.mapAsync(1) { message =>
message.asPlay
}
val inFlow = Flow.fromFunction[PlayWSMessage, PlayWSMessage](identity).mapAsync(1) { mess =>
wsEngine.handleRequest(mess)(_ => ())
}
.takeWhile(_.isRight)
.collect {
case Right(message) => message
}
.mapAsync(1) { message =>
message.asPlay
}
val finalFlow = inFlow.via(outFlow)
FEither(finalFlow.right.vfuture)
} else {
if (route.useAkkaHttpWsClient && ctxPlugins.hasNoWebsocketPlugins) {
FEither(
Expand Down

0 comments on commit 0e17845

Please sign in to comment.