diff --git a/otoroshi/app/next/workflow/api.scala b/otoroshi/app/next/workflow/api.scala index ed70bb3c1..4ad361bcd 100644 --- a/otoroshi/app/next/workflow/api.scala +++ b/otoroshi/app/next/workflow/api.scala @@ -7,7 +7,7 @@ import otoroshi.events.AnalyticEvent import otoroshi.utils.syntax.implicits._ import play.api.libs.json._ -import java.util.concurrent.{ConcurrentLinkedQueue, Executors} +import java.util.concurrent.ConcurrentLinkedQueue import scala.collection.concurrent.TrieMap import scala.concurrent._ import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter diff --git a/otoroshi/app/next/workflow/extension.scala b/otoroshi/app/next/workflow/extension.scala index 2066bd41e..10986f6df 100644 --- a/otoroshi/app/next/workflow/extension.scala +++ b/otoroshi/app/next/workflow/extension.scala @@ -104,6 +104,8 @@ class WorkflowAdminExtension(val env: Env) extends AdminExtension { private[workflow] lazy val datastores = new WorkflowConfigAdminExtensionDatastores(env, id) private[workflow] lazy val states = new WorkflowConfigAdminExtensionState(env) + val engine = new WorkflowEngine(env) + override def id: AdminExtensionId = AdminExtensionId("otoroshi.extensions.Workflows") override def name: String = "Otoroshi Workflows extension" @@ -176,9 +178,7 @@ class WorkflowAdminExtension(val env: Env) extends AdminExtension { val payload = bodyRaw.utf8String.parseJson val input = payload.select("input").asString.parseJson.asObject val workflow = payload.select("workflow").asObject - val engine = new WorkflowEngine(env) val node = Node.from(workflow) - // Files.writeString(new File("./workflow_test.json").toPath, workflow.prettify) engine.run(node, input).map { res => Results.Ok(res.json) } diff --git a/otoroshi/app/next/workflow/nodes.scala b/otoroshi/app/next/workflow/nodes.scala index 35ff689a1..3577a4f1e 100644 --- a/otoroshi/app/next/workflow/nodes.scala +++ b/otoroshi/app/next/workflow/nodes.scala @@ -81,7 +81,7 @@ case class ParallelFlowsNode(json: JsObject) extends Node { override def run(wfr: WorkflowRun)(implicit env: Env, ec: ExecutionContext): Future[Either[WorkflowError, JsValue]] = { Future.sequence(paths.filter { o => if (o.select("predicate").isDefined) { - o.select("predicate").asOptBoolean.getOrElse(false) + WorkflowOperator.processOperators(o.select("predicate").asValue, wfr, env).asOptBoolean.getOrElse(false) } else { true } @@ -107,10 +107,10 @@ case class ParallelFlowsNode(json: JsObject) extends Node { case class SwitchNode(json: JsObject) extends Node { override def run(wfr: WorkflowRun)(implicit env: Env, ec: ExecutionContext): Future[Either[WorkflowError, JsValue]] = { val paths: Seq[JsObject] = json.select("paths").asOpt[Seq[JsObject]].getOrElse(Seq.empty) - paths.find(o => o.select("predicate").asOptBoolean.getOrElse(false)) match { + paths.find(o => WorkflowOperator.processOperators(o.select("predicate").asValue, wfr, env).asOptBoolean.getOrElse(false)) match { case None => JsNull.rightf case Some(path) => { - val node =Node.from(path.select("node").asObject) + val node = Node.from(path.select("node").asObject) node.internalRun(wfr).recover { case t: Throwable => WorkflowError(s"caught exception on task '${id}' at path: '${node.id}'", None, Some(t)).left } diff --git a/otoroshi/app/next/workflow/plugins.scala b/otoroshi/app/next/workflow/plugins.scala index e263d47a9..cf51b8935 100644 --- a/otoroshi/app/next/workflow/plugins.scala +++ b/otoroshi/app/next/workflow/plugins.scala @@ -1,5 +1,242 @@ -package next.workflow +package otoroshi.next.workflow -class plugins { +import akka.stream.Materializer +import otoroshi.env.Env +import otoroshi.gateway.Errors +import otoroshi.next.plugins.BodyHelper +import otoroshi.next.plugins.api._ +import otoroshi.next.proxy.NgProxyEngineError +import otoroshi.utils.syntax.implicits._ +import otoroshi.wasm.WasmUtils +import play.api.libs.json._ +import play.api.mvc.{Result, Results} +import scala.concurrent.{ExecutionContext, Future} + +case class WorkflowBackendConfig(json: JsValue = Json.obj()) extends NgPluginConfig { + lazy val ref: String = json.select("ref").asString + lazy val async: Boolean = json.select("async").asOptBoolean.getOrElse(false) +} + +object WorkflowBackendConfig { + val configFlow: Seq[String] = Seq("ref", "async") + val configSchema: Option[JsObject] = Some(Json.obj( + "async" -> Json.obj("type" -> "bool", "label" -> "Async"), + "ref" -> Json.obj( + "type" -> "select", + "label" -> s"Workflow", + "props" -> Json.obj( + "optionsFrom" -> s"/bo/api/proxy/apis/plugins.otoroshi.io/v1/workflows", + "optionsTransformer" -> Json.obj( + "label" -> "name", + "value" -> "id", + ), + ), + ) + )) + val format = new Format[WorkflowBackendConfig] { + override def reads(json: JsValue): JsResult[WorkflowBackendConfig] = JsSuccess(WorkflowBackendConfig(json)) + override def writes(o: WorkflowBackendConfig): JsValue = o.json + } } + +class WorkflowBackend extends NgBackendCall { + + override def useDelegates: Boolean = false + override def multiInstance: Boolean = true + override def core: Boolean = true + override def name: String = "Workflow Backend" + override def description: Option[String] = "This plugin uses a workflow as a backend".some + override def defaultConfigObject: Option[NgPluginConfig] = WorkflowBackendConfig().some + + override def visibility: NgPluginVisibility = NgPluginVisibility.NgUserLand + override def categories: Seq[NgPluginCategory] = Seq(NgPluginCategory.Custom("Workflow")) + override def steps: Seq[NgStep] = Seq(NgStep.CallBackend) + override def noJsForm: Boolean = true + override def configFlow: Seq[String] = WorkflowBackendConfig.configFlow + override def configSchema: Option[JsObject] = WorkflowBackendConfig.configSchema + + override def callBackend( + ctx: NgbBackendCallContext, + delegates: () => Future[Either[NgProxyEngineError, BackendCallResponse]] + )(implicit + env: Env, + ec: ExecutionContext, + mat: Materializer + ): Future[Either[NgProxyEngineError, BackendCallResponse]] = { + val config = ctx + .cachedConfig(internalName)(WorkflowBackendConfig.format) + .getOrElse(WorkflowBackendConfig()) + env.adminExtensions.extension[WorkflowAdminExtension].flatMap(ext => ext.states.workflow(config.ref).map(w => (ext, w))) match { + case None => + Errors + .craftResponseResult( + "workflow not found !", + Results.Status(500), + ctx.rawRequest, + None, + None, + attrs = ctx.attrs, + maybeRoute = ctx.route.some + ) + .map(r => NgProxyEngineError.NgResultProxyEngineError(r).left) + case Some((extension, workflow)) => { + ctx.wasmJson + .flatMap { input => + val f = extension.engine.run(Node.from(workflow.config), input.asObject) + if (config.async) { + Right(BackendCallResponse(NgPluginHttpResponse.fromResult(Results.Ok(Json.obj("ack" -> true))), None)).vfuture + } else { + f.map { res => + if (res.hasError) { + Right(BackendCallResponse(NgPluginHttpResponse.fromResult(Results.InternalServerError(Json.obj("error" -> res.error.get.json))), None)) + } else { + val respBody = res.json + val status = respBody.select("status").asOpt[Int] + val headers = respBody.select("headers").asOpt[Map[String, String]] + val body = BodyHelper.extractBodyFromOpt(respBody) + if (status.isDefined && headers.isDefined && body.isDefined) { + val heads = headers.get.getIgnoreCase("Content-Length") match { + case None => headers.get - "Content-Type" - "content-type" ++ Map("Content-Length" -> s"${body.get.length}") + case Some(_) => headers.get - "Content-Type" - "content-type" + } + val ctype = headers.get.getIgnoreCase("Content-Type").getOrElse("application/json") + Right(BackendCallResponse(NgPluginHttpResponse.fromResult(Results.Status(status.get)(body.get).withHeaders(heads.toSeq: _*).as(ctype)), None)) + } else { + Right(BackendCallResponse(NgPluginHttpResponse.fromResult(Results.Ok(body.get).as("application/json")), None)) + } + } + } + } + } + } + } + } +} + +class WorkflowRequestTransformer extends NgRequestTransformer { + + override def steps: Seq[NgStep] = Seq(NgStep.TransformRequest) + override def categories: Seq[NgPluginCategory] = Seq(NgPluginCategory.Custom("Workflow"), NgPluginCategory.Transformations) + override def visibility: NgPluginVisibility = NgPluginVisibility.NgUserLand + + override def multiInstance: Boolean = true + override def core: Boolean = true + override def usesCallbacks: Boolean = false + override def transformsRequest: Boolean = true + override def transformsResponse: Boolean = false + override def transformsError: Boolean = false + override def name: String = "Workflow Request Transformer" + override def description: Option[String] = + "Transform the content of the request with a workflow".some + override def defaultConfigObject: Option[NgPluginConfig] = WorkflowBackendConfig().some + + override def transformRequest( + ctx: NgTransformerRequestContext + )(implicit env: Env, ec: ExecutionContext, mat: Materializer): Future[Either[Result, NgPluginHttpRequest]] = { + val config = ctx + .cachedConfig(internalName)(WorkflowBackendConfig.format) + .getOrElse(WorkflowBackendConfig()) + env.adminExtensions.extension[WorkflowAdminExtension].flatMap(ext => ext.states.workflow(config.ref).map(w => (ext, w))) match { + case None => + Errors + .craftResponseResult( + "workflow not found !", + Results.Status(500), + ctx.request, + None, + None, + attrs = ctx.attrs, + maybeRoute = ctx.route.some + ) + .map(r => r.left) + case Some((extension, workflow)) => { + ctx.wasmJson + .flatMap { input => + extension.engine.run(Node.from(workflow.config), input.asObject).map { res => + if (res.hasError) { + Results.InternalServerError(Json.obj("error" -> res.error.get.json)).left + } else { + val response = res.json + val body = BodyHelper.extractBodyFromOpt(response) + Right( + ctx.otoroshiRequest.copy( + method = (response \ "method").asOpt[String].getOrElse(ctx.otoroshiRequest.method), + url = (response \ "url").asOpt[String].getOrElse(ctx.otoroshiRequest.url), + headers = + (response \ "headers").asOpt[Map[String, String]].getOrElse(ctx.otoroshiRequest.headers), + cookies = WasmUtils.convertJsonCookies(response).getOrElse(ctx.otoroshiRequest.cookies), + body = body.map(_.chunks(16 * 1024)).getOrElse(ctx.otoroshiRequest.body) + ) + ) + } + } + } + } + } + } +} + +class WorkflowResponseTransformer extends NgRequestTransformer { + + override def steps: Seq[NgStep] = Seq(NgStep.TransformResponse) + override def categories: Seq[NgPluginCategory] = Seq(NgPluginCategory.Custom("Workflow"), NgPluginCategory.Transformations) + override def visibility: NgPluginVisibility = NgPluginVisibility.NgUserLand + + override def multiInstance: Boolean = true + override def core: Boolean = true + override def usesCallbacks: Boolean = false + override def transformsRequest: Boolean = false + override def transformsResponse: Boolean = true + override def transformsError: Boolean = false + override def isTransformRequestAsync: Boolean = false + override def isTransformResponseAsync: Boolean = true + override def name: String = "Workflow Response Transformer" + override def description: Option[String] = + "Transform the content of a response with a workflow".some + override def defaultConfigObject: Option[NgPluginConfig] = WorkflowBackendConfig().some + + override def transformResponse( + ctx: NgTransformerResponseContext + )(implicit env: Env, ec: ExecutionContext, mat: Materializer): Future[Either[Result, NgPluginHttpResponse]] = { + val config = ctx + .cachedConfig(internalName)(WorkflowBackendConfig.format) + .getOrElse(WorkflowBackendConfig()) + env.adminExtensions.extension[WorkflowAdminExtension].flatMap(ext => ext.states.workflow(config.ref).map(w => (ext, w))) match { + case None => + Errors + .craftResponseResult( + "workflow not found !", + Results.Status(500), + ctx.request, + None, + None, + attrs = ctx.attrs, + maybeRoute = ctx.route.some + ) + .map(r => r.left) + case Some((extension, workflow)) => { + ctx.wasmJson + .flatMap { input => + extension.engine.run(Node.from(workflow.config), input.asObject).map { res => + if (res.hasError) { + Results.InternalServerError(Json.obj("error" -> res.error.get.json)).left + } else { + val response = res.json + val body = BodyHelper.extractBodyFromOpt(response) + Right( + ctx.otoroshiResponse.copy( + status = (response \ "status").asOpt[Int].getOrElse(200), + headers = + (response \ "headers").asOpt[Map[String, String]].getOrElse(ctx.otoroshiResponse.headers), + cookies = WasmUtils.convertJsonCookies(response).getOrElse(ctx.otoroshiResponse.cookies), + body = body.map(_.chunks(16 * 1024)).getOrElse(ctx.otoroshiResponse.body) + ) + ) + } + } + } + } + } + } +} \ No newline at end of file