Skip to content

Commit

Permalink
experiments on #1287 - add plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
mathieuancelin committed Feb 4, 2025
1 parent 43d1ade commit 9742a5d
Show file tree
Hide file tree
Showing 4 changed files with 245 additions and 8 deletions.
2 changes: 1 addition & 1 deletion otoroshi/app/next/workflow/api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import otoroshi.events.AnalyticEvent
import otoroshi.utils.syntax.implicits._
import play.api.libs.json._

import java.util.concurrent.{ConcurrentLinkedQueue, Executors}
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.concurrent.TrieMap
import scala.concurrent._
import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
Expand Down
4 changes: 2 additions & 2 deletions otoroshi/app/next/workflow/extension.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ class WorkflowAdminExtension(val env: Env) extends AdminExtension {
private[workflow] lazy val datastores = new WorkflowConfigAdminExtensionDatastores(env, id)
private[workflow] lazy val states = new WorkflowConfigAdminExtensionState(env)

val engine = new WorkflowEngine(env)

override def id: AdminExtensionId = AdminExtensionId("otoroshi.extensions.Workflows")

override def name: String = "Otoroshi Workflows extension"
Expand Down Expand Up @@ -176,9 +178,7 @@ class WorkflowAdminExtension(val env: Env) extends AdminExtension {
val payload = bodyRaw.utf8String.parseJson
val input = payload.select("input").asString.parseJson.asObject
val workflow = payload.select("workflow").asObject
val engine = new WorkflowEngine(env)
val node = Node.from(workflow)
// Files.writeString(new File("./workflow_test.json").toPath, workflow.prettify)
engine.run(node, input).map { res =>
Results.Ok(res.json)
}
Expand Down
6 changes: 3 additions & 3 deletions otoroshi/app/next/workflow/nodes.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ case class ParallelFlowsNode(json: JsObject) extends Node {
override def run(wfr: WorkflowRun)(implicit env: Env, ec: ExecutionContext): Future[Either[WorkflowError, JsValue]] = {
Future.sequence(paths.filter { o =>
if (o.select("predicate").isDefined) {
o.select("predicate").asOptBoolean.getOrElse(false)
WorkflowOperator.processOperators(o.select("predicate").asValue, wfr, env).asOptBoolean.getOrElse(false)
} else {
true
}
Expand All @@ -107,10 +107,10 @@ case class ParallelFlowsNode(json: JsObject) extends Node {
case class SwitchNode(json: JsObject) extends Node {
override def run(wfr: WorkflowRun)(implicit env: Env, ec: ExecutionContext): Future[Either[WorkflowError, JsValue]] = {
val paths: Seq[JsObject] = json.select("paths").asOpt[Seq[JsObject]].getOrElse(Seq.empty)
paths.find(o => o.select("predicate").asOptBoolean.getOrElse(false)) match {
paths.find(o => WorkflowOperator.processOperators(o.select("predicate").asValue, wfr, env).asOptBoolean.getOrElse(false)) match {
case None => JsNull.rightf
case Some(path) => {
val node =Node.from(path.select("node").asObject)
val node = Node.from(path.select("node").asObject)
node.internalRun(wfr).recover {
case t: Throwable => WorkflowError(s"caught exception on task '${id}' at path: '${node.id}'", None, Some(t)).left
}
Expand Down
241 changes: 239 additions & 2 deletions otoroshi/app/next/workflow/plugins.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,242 @@
package next.workflow
package otoroshi.next.workflow

class plugins {
import akka.stream.Materializer
import otoroshi.env.Env
import otoroshi.gateway.Errors
import otoroshi.next.plugins.BodyHelper
import otoroshi.next.plugins.api._
import otoroshi.next.proxy.NgProxyEngineError
import otoroshi.utils.syntax.implicits._
import otoroshi.wasm.WasmUtils
import play.api.libs.json._
import play.api.mvc.{Result, Results}

import scala.concurrent.{ExecutionContext, Future}

case class WorkflowBackendConfig(json: JsValue = Json.obj()) extends NgPluginConfig {
lazy val ref: String = json.select("ref").asString
lazy val async: Boolean = json.select("async").asOptBoolean.getOrElse(false)
}

object WorkflowBackendConfig {
val configFlow: Seq[String] = Seq("ref", "async")
val configSchema: Option[JsObject] = Some(Json.obj(
"async" -> Json.obj("type" -> "bool", "label" -> "Async"),
"ref" -> Json.obj(
"type" -> "select",
"label" -> s"Workflow",
"props" -> Json.obj(
"optionsFrom" -> s"/bo/api/proxy/apis/plugins.otoroshi.io/v1/workflows",
"optionsTransformer" -> Json.obj(
"label" -> "name",
"value" -> "id",
),
),
)
))
val format = new Format[WorkflowBackendConfig] {
override def reads(json: JsValue): JsResult[WorkflowBackendConfig] = JsSuccess(WorkflowBackendConfig(json))
override def writes(o: WorkflowBackendConfig): JsValue = o.json
}
}

class WorkflowBackend extends NgBackendCall {

override def useDelegates: Boolean = false
override def multiInstance: Boolean = true
override def core: Boolean = true
override def name: String = "Workflow Backend"
override def description: Option[String] = "This plugin uses a workflow as a backend".some
override def defaultConfigObject: Option[NgPluginConfig] = WorkflowBackendConfig().some

override def visibility: NgPluginVisibility = NgPluginVisibility.NgUserLand
override def categories: Seq[NgPluginCategory] = Seq(NgPluginCategory.Custom("Workflow"))
override def steps: Seq[NgStep] = Seq(NgStep.CallBackend)
override def noJsForm: Boolean = true
override def configFlow: Seq[String] = WorkflowBackendConfig.configFlow
override def configSchema: Option[JsObject] = WorkflowBackendConfig.configSchema

override def callBackend(
ctx: NgbBackendCallContext,
delegates: () => Future[Either[NgProxyEngineError, BackendCallResponse]]
)(implicit
env: Env,
ec: ExecutionContext,
mat: Materializer
): Future[Either[NgProxyEngineError, BackendCallResponse]] = {
val config = ctx
.cachedConfig(internalName)(WorkflowBackendConfig.format)
.getOrElse(WorkflowBackendConfig())
env.adminExtensions.extension[WorkflowAdminExtension].flatMap(ext => ext.states.workflow(config.ref).map(w => (ext, w))) match {
case None =>
Errors
.craftResponseResult(
"workflow not found !",
Results.Status(500),
ctx.rawRequest,
None,
None,
attrs = ctx.attrs,
maybeRoute = ctx.route.some
)
.map(r => NgProxyEngineError.NgResultProxyEngineError(r).left)
case Some((extension, workflow)) => {
ctx.wasmJson
.flatMap { input =>
val f = extension.engine.run(Node.from(workflow.config), input.asObject)
if (config.async) {
Right(BackendCallResponse(NgPluginHttpResponse.fromResult(Results.Ok(Json.obj("ack" -> true))), None)).vfuture
} else {
f.map { res =>
if (res.hasError) {
Right(BackendCallResponse(NgPluginHttpResponse.fromResult(Results.InternalServerError(Json.obj("error" -> res.error.get.json))), None))
} else {
val respBody = res.json
val status = respBody.select("status").asOpt[Int]
val headers = respBody.select("headers").asOpt[Map[String, String]]
val body = BodyHelper.extractBodyFromOpt(respBody)
if (status.isDefined && headers.isDefined && body.isDefined) {
val heads = headers.get.getIgnoreCase("Content-Length") match {
case None => headers.get - "Content-Type" - "content-type" ++ Map("Content-Length" -> s"${body.get.length}")
case Some(_) => headers.get - "Content-Type" - "content-type"
}
val ctype = headers.get.getIgnoreCase("Content-Type").getOrElse("application/json")
Right(BackendCallResponse(NgPluginHttpResponse.fromResult(Results.Status(status.get)(body.get).withHeaders(heads.toSeq: _*).as(ctype)), None))
} else {
Right(BackendCallResponse(NgPluginHttpResponse.fromResult(Results.Ok(body.get).as("application/json")), None))
}
}
}
}
}
}
}
}
}

class WorkflowRequestTransformer extends NgRequestTransformer {

override def steps: Seq[NgStep] = Seq(NgStep.TransformRequest)
override def categories: Seq[NgPluginCategory] = Seq(NgPluginCategory.Custom("Workflow"), NgPluginCategory.Transformations)
override def visibility: NgPluginVisibility = NgPluginVisibility.NgUserLand

override def multiInstance: Boolean = true
override def core: Boolean = true
override def usesCallbacks: Boolean = false
override def transformsRequest: Boolean = true
override def transformsResponse: Boolean = false
override def transformsError: Boolean = false
override def name: String = "Workflow Request Transformer"
override def description: Option[String] =
"Transform the content of the request with a workflow".some
override def defaultConfigObject: Option[NgPluginConfig] = WorkflowBackendConfig().some

override def transformRequest(
ctx: NgTransformerRequestContext
)(implicit env: Env, ec: ExecutionContext, mat: Materializer): Future[Either[Result, NgPluginHttpRequest]] = {
val config = ctx
.cachedConfig(internalName)(WorkflowBackendConfig.format)
.getOrElse(WorkflowBackendConfig())
env.adminExtensions.extension[WorkflowAdminExtension].flatMap(ext => ext.states.workflow(config.ref).map(w => (ext, w))) match {
case None =>
Errors
.craftResponseResult(
"workflow not found !",
Results.Status(500),
ctx.request,
None,
None,
attrs = ctx.attrs,
maybeRoute = ctx.route.some
)
.map(r => r.left)
case Some((extension, workflow)) => {
ctx.wasmJson
.flatMap { input =>
extension.engine.run(Node.from(workflow.config), input.asObject).map { res =>
if (res.hasError) {
Results.InternalServerError(Json.obj("error" -> res.error.get.json)).left
} else {
val response = res.json
val body = BodyHelper.extractBodyFromOpt(response)
Right(
ctx.otoroshiRequest.copy(
method = (response \ "method").asOpt[String].getOrElse(ctx.otoroshiRequest.method),
url = (response \ "url").asOpt[String].getOrElse(ctx.otoroshiRequest.url),
headers =
(response \ "headers").asOpt[Map[String, String]].getOrElse(ctx.otoroshiRequest.headers),
cookies = WasmUtils.convertJsonCookies(response).getOrElse(ctx.otoroshiRequest.cookies),
body = body.map(_.chunks(16 * 1024)).getOrElse(ctx.otoroshiRequest.body)
)
)
}
}
}
}
}
}
}

class WorkflowResponseTransformer extends NgRequestTransformer {

override def steps: Seq[NgStep] = Seq(NgStep.TransformResponse)
override def categories: Seq[NgPluginCategory] = Seq(NgPluginCategory.Custom("Workflow"), NgPluginCategory.Transformations)
override def visibility: NgPluginVisibility = NgPluginVisibility.NgUserLand

override def multiInstance: Boolean = true
override def core: Boolean = true
override def usesCallbacks: Boolean = false
override def transformsRequest: Boolean = false
override def transformsResponse: Boolean = true
override def transformsError: Boolean = false
override def isTransformRequestAsync: Boolean = false
override def isTransformResponseAsync: Boolean = true
override def name: String = "Workflow Response Transformer"
override def description: Option[String] =
"Transform the content of a response with a workflow".some
override def defaultConfigObject: Option[NgPluginConfig] = WorkflowBackendConfig().some

override def transformResponse(
ctx: NgTransformerResponseContext
)(implicit env: Env, ec: ExecutionContext, mat: Materializer): Future[Either[Result, NgPluginHttpResponse]] = {
val config = ctx
.cachedConfig(internalName)(WorkflowBackendConfig.format)
.getOrElse(WorkflowBackendConfig())
env.adminExtensions.extension[WorkflowAdminExtension].flatMap(ext => ext.states.workflow(config.ref).map(w => (ext, w))) match {
case None =>
Errors
.craftResponseResult(
"workflow not found !",
Results.Status(500),
ctx.request,
None,
None,
attrs = ctx.attrs,
maybeRoute = ctx.route.some
)
.map(r => r.left)
case Some((extension, workflow)) => {
ctx.wasmJson
.flatMap { input =>
extension.engine.run(Node.from(workflow.config), input.asObject).map { res =>
if (res.hasError) {
Results.InternalServerError(Json.obj("error" -> res.error.get.json)).left
} else {
val response = res.json
val body = BodyHelper.extractBodyFromOpt(response)
Right(
ctx.otoroshiResponse.copy(
status = (response \ "status").asOpt[Int].getOrElse(200),
headers =
(response \ "headers").asOpt[Map[String, String]].getOrElse(ctx.otoroshiResponse.headers),
cookies = WasmUtils.convertJsonCookies(response).getOrElse(ctx.otoroshiResponse.cookies),
body = body.map(_.chunks(16 * 1024)).getOrElse(ctx.otoroshiResponse.body)
)
)
}
}
}
}
}
}
}

0 comments on commit 9742a5d

Please sign in to comment.