diff --git a/otoroshi/app/models/wasm.scala b/otoroshi/app/models/wasm.scala index 909f62f01a..5cbd4a624f 100644 --- a/otoroshi/app/models/wasm.scala +++ b/otoroshi/app/models/wasm.scala @@ -31,7 +31,6 @@ case class WasmPlugin( override def theDescription: String = description override def theTags: Seq[String] = tags override def theMetadata: Map[String, String] = metadata - //def pool()(implicit env: Env): WasmVmPool = WasmVmPool.forPlugin(this.id)(env.wasmIntegrationCtx) } object WasmPlugin { @@ -125,19 +124,6 @@ class WasmPluginsCacheManager extends Job { override def predicate(ctx: JobContext, env: Env): Option[Boolean] = None override def jobRun(ctx: JobContext)(implicit env: Env, ec: ExecutionContext): Future[Unit] = { - // env.proxyState.allWasmPlugins().foreach { plugin => - // val now = System.currentTimeMillis() - // WasmUtils.scriptCache(env).get(plugin.config.source.cacheKey) match { - // case None => plugin.config.source.getWasm() - // case Some(CacheableWasmScript.CachedWasmScript(_, createAt)) if (createAt + env.wasmCacheTtl) < now => - // plugin.config.source.getWasm() - // case Some(CacheableWasmScript.CachedWasmScript(_, createAt)) - // if (createAt + env.wasmCacheTtl) > now && (createAt + env.wasmCacheTtl + 1000) < now => - // plugin.config.source.getWasm() - // case _ => () - // } - // } - // funit env.wasmIntegration.runVmLoaderJob() } } diff --git a/otoroshi/app/wasm/host.scala b/otoroshi/app/wasm/host.scala index a06c90f8fd..de4668ff3f 100644 --- a/otoroshi/app/wasm/host.scala +++ b/otoroshi/app/wasm/host.scala @@ -42,22 +42,7 @@ object Utils { Json.parse(rawBytePtrToString(plugin, params(0).v.i64, params(1).v.i32)) } } -/* -case class EnvUserData( - env: Env, - executionContext: ExecutionContext, - mat: Materializer, - config: WasmConfig -) extends WasmOtoroshiHostUserData - -case class StateUserData( - env: Env, - executionContext: ExecutionContext, - mat: Materializer, - cache: UnboundedTrieMap[String, UnboundedTrieMap[String, ByteString]] -) extends WasmOtoroshiHostUserData -case class EmptyUserData() extends WasmOtoroshiHostUserData -*/ + object LogLevel extends Enumeration { type LogLevel = Value @@ -71,11 +56,6 @@ object Status extends Enumeration { StatusUnimplemented = Value } -// case class HostFunctionWithAuthorization( -// function: WasmOtoroshiHostFunction[_ <: WasmOtoroshiHostUserData], -// authorized: WasmAuthorizations => Boolean -// ) - trait AwaitCapable { def await[T](future: Future[T], atMost: FiniteDuration = 5.seconds)(implicit env: Env): T = { Await.result(future, atMost) // TODO: atMost from env diff --git a/otoroshi/app/wasm/opa.scala b/otoroshi/app/wasm/opa.scala deleted file mode 100644 index ab03051c36..0000000000 --- a/otoroshi/app/wasm/opa.scala +++ /dev/null @@ -1,313 +0,0 @@ -/*package otoroshi.wasm; - -import org.extism.sdk.wasmotoroshi._ -import otoroshi.utils.syntax.implicits.{BetterJsValue, BetterSyntax} -import play.api.libs.json.{JsString, JsValue, Json} - -import java.nio.ByteBuffer -import java.nio.charset.StandardCharsets -import java.util.Optional -import java.util.concurrent.atomic.AtomicReference; - -object OPA extends AwaitCapable { - - def opaAbortFunction: WasmOtoroshiExtismFunction[EmptyUserData] = - ( - plugin: WasmOtoroshiInternal, - params: Array[WasmBridge.ExtismVal], - returns: Array[WasmBridge.ExtismVal], - data: Optional[EmptyUserData] - ) => { - System.out.println("opaAbortFunction"); - } - - def opaPrintlnFunction: WasmOtoroshiExtismFunction[EmptyUserData] = - ( - plugin: WasmOtoroshiInternal, - params: Array[WasmBridge.ExtismVal], - returns: Array[WasmBridge.ExtismVal], - data: Optional[EmptyUserData] - ) => { - System.out.println("opaPrintlnFunction"); - } - - def opaBuiltin0Function: WasmOtoroshiExtismFunction[EmptyUserData] = - ( - plugin: WasmOtoroshiInternal, - params: Array[WasmBridge.ExtismVal], - returns: Array[WasmBridge.ExtismVal], - data: Optional[EmptyUserData] - ) => { - System.out.println("opaBuiltin0Function"); - } - - def opaBuiltin1Function: WasmOtoroshiExtismFunction[EmptyUserData] = - ( - plugin: WasmOtoroshiInternal, - params: Array[WasmBridge.ExtismVal], - returns: Array[WasmBridge.ExtismVal], - data: Optional[EmptyUserData] - ) => { - System.out.println("opaBuiltin1Function"); - } - - def opaBuiltin2Function: WasmOtoroshiExtismFunction[EmptyUserData] = - ( - plugin: WasmOtoroshiInternal, - params: Array[WasmBridge.ExtismVal], - returns: Array[WasmBridge.ExtismVal], - data: Optional[EmptyUserData] - ) => { - System.out.println("opaBuiltin2Function"); - } - - def opaBuiltin3Function: WasmOtoroshiExtismFunction[EmptyUserData] = - ( - plugin: WasmOtoroshiInternal, - params: Array[WasmBridge.ExtismVal], - returns: Array[WasmBridge.ExtismVal], - data: Optional[EmptyUserData] - ) => { - System.out.println("opaBuiltin3Function"); - }; - - def opaBuiltin4Function: WasmOtoroshiExtismFunction[EmptyUserData] = - ( - plugin: WasmOtoroshiInternal, - params: Array[WasmBridge.ExtismVal], - returns: Array[WasmBridge.ExtismVal], - data: Optional[EmptyUserData] - ) => { - System.out.println("opaBuiltin4Function"); - } - - def opaAbort() = new WasmOtoroshiHostFunction[EmptyUserData]( - "opa_abort", - Array(WasmBridge.ExtismValType.I32), - Array(), - opaAbortFunction, - Optional.empty() - ) - - def opaPrintln() = new WasmOtoroshiHostFunction[EmptyUserData]( - "opa_println", - Array(WasmBridge.ExtismValType.I64), - Array(WasmBridge.ExtismValType.I64), - opaPrintlnFunction, - Optional.empty() - ) - - def opaBuiltin0() = new WasmOtoroshiHostFunction[EmptyUserData]( - "opa_builtin0", - Array(WasmBridge.ExtismValType.I32, WasmBridge.ExtismValType.I32), - Array(WasmBridge.ExtismValType.I32), - opaBuiltin0Function, - Optional.empty() - ) - - def opaBuiltin1() = new WasmOtoroshiHostFunction[EmptyUserData]( - "opa_builtin1", - Array(WasmBridge.ExtismValType.I32, WasmBridge.ExtismValType.I32, WasmBridge.ExtismValType.I32), - Array(WasmBridge.ExtismValType.I32), - opaBuiltin1Function, - Optional.empty() - ) - - def opaBuiltin2() = new WasmOtoroshiHostFunction[EmptyUserData]( - "opa_builtin2", - Array( - WasmBridge.ExtismValType.I32, - WasmBridge.ExtismValType.I32, - WasmBridge.ExtismValType.I32, - WasmBridge.ExtismValType.I32 - ), - Array(WasmBridge.ExtismValType.I32), - opaBuiltin2Function, - Optional.empty() - ) - - def opaBuiltin3() = new WasmOtoroshiHostFunction[EmptyUserData]( - "opa_builtin3", - Array( - WasmBridge.ExtismValType.I32, - WasmBridge.ExtismValType.I32, - WasmBridge.ExtismValType.I32, - WasmBridge.ExtismValType.I32, - WasmBridge.ExtismValType.I32 - ), - Array(WasmBridge.ExtismValType.I32), - opaBuiltin3Function, - Optional.empty() - ) - - def opaBuiltin4() = new WasmOtoroshiHostFunction[EmptyUserData]( - "opa_builtin4", - Array( - WasmBridge.ExtismValType.I32, - WasmBridge.ExtismValType.I32, - WasmBridge.ExtismValType.I32, - WasmBridge.ExtismValType.I32, - WasmBridge.ExtismValType.I32, - WasmBridge.ExtismValType.I32 - ), - Array(WasmBridge.ExtismValType.I32), - opaBuiltin4Function, - Optional.empty() - ) - - def getFunctions(config: WasmConfig): Seq[HostFunctionWithAuthorization] = { - Seq( - HostFunctionWithAuthorization(opaAbort(), _ => config.opa), - HostFunctionWithAuthorization(opaPrintln(), _ => config.opa), - HostFunctionWithAuthorization(opaBuiltin0(), _ => config.opa), - HostFunctionWithAuthorization(opaBuiltin1(), _ => config.opa), - HostFunctionWithAuthorization(opaBuiltin2(), _ => config.opa), - HostFunctionWithAuthorization(opaBuiltin3(), _ => config.opa), - HostFunctionWithAuthorization(opaBuiltin4(), _ => config.opa) - ) - } - - def getLinearMemories(): Seq[WasmOtoroshiLinearMemory] = { - Seq( - new WasmOtoroshiLinearMemory("memory", "env", new WasmOtoroshiLinearMemoryOptions(5, Optional.empty())) - ) - } - - def loadJSON(plugin: WasmOtoroshiInstance, value: Array[Byte]): Either[JsValue, Int] = { - if (value.length == 0) { - 0.right - } else { - val value_buf_len = value.length - var parameters = new WasmOtoroshiParameters(1) - .pushInt(value_buf_len) - - val raw_addr = plugin.call("opa_malloc", parameters, 1) - - if ( - plugin.writeBytes( - value, - value_buf_len, - raw_addr.getValue(0).v.i32 - ) == -1 - ) { - JsString("Cant' write in memory").left - } else { - parameters = new WasmOtoroshiParameters(2) - .pushInts(raw_addr.getValue(0).v.i32, value_buf_len) - val parsed_addr = plugin.call( - "opa_json_parse", - parameters, - 1 - ) - - if (parsed_addr.getValue(0).v.i32 == 0) { - JsString("failed to parse json value").left - } else { - parsed_addr.getValue(0).v.i32.right - } - } - } - } - - def initialize(plugin: WasmOtoroshiInstance): Either[JsValue, (String, ResultsWrapper)] = { - loadJSON(plugin, "{}".getBytes(StandardCharsets.UTF_8)) - .flatMap(dataAddr => { - val base_heap_ptr = plugin.call( - "opa_heap_ptr_get", - new WasmOtoroshiParameters(0), - 1 - ) - - val data_heap_ptr = base_heap_ptr.getValue(0).v.i32 - ( - Json.obj("dataAddr" -> dataAddr, "baseHeapPtr" -> data_heap_ptr).stringify, - ResultsWrapper(new WasmOtoroshiResults(0)) - ).right - }) - } - - def evaluate( - plugin: WasmOtoroshiInstance, - dataAddr: Int, - baseHeapPtr: Int, - input: String - ): Either[JsValue, (String, ResultsWrapper)] = { - val entrypoint = 0 - - // TODO - read and load builtins functions by calling dumpJSON - val input_len = input.getBytes(StandardCharsets.UTF_8).length - plugin.writeBytes( - input.getBytes(StandardCharsets.UTF_8), - input_len, - baseHeapPtr - ) - - val heap_ptr = baseHeapPtr + input_len - val input_addr = baseHeapPtr - - val ptr = new WasmOtoroshiParameters(7) - .pushInts(0, entrypoint, dataAddr, input_addr, input_len, heap_ptr, 0) - - val ret = plugin.call("opa_eval", ptr, 1) - - val memory = plugin.getMemory("memory") - - val offset: Int = ret.getValue(0).v.i32 - val arraySize: Int = 65356 - - val mem: Array[Byte] = memory.getByteArray(offset, arraySize) - val size: Int = lastValidByte(mem) - - ( - new String(java.util.Arrays.copyOf(mem, size), StandardCharsets.UTF_8), - ResultsWrapper(new WasmOtoroshiResults(0)) - ).right - } - - def lastValidByte(arr: Array[Byte]): Int = { - for (i <- arr.indices) { - if (arr(i) == 0) { - return i - } - } - arr.length - } -} - -object LinearMemories { - - private val memories: AtomicReference[Seq[WasmOtoroshiLinearMemory]] = - new AtomicReference[Seq[WasmOtoroshiLinearMemory]](Seq.empty[WasmOtoroshiLinearMemory]) - - def getMemories(config: WasmConfig): Array[WasmOtoroshiLinearMemory] = { - if (config.opa) { - if (memories.get.isEmpty) { - memories.set( - OPA.getLinearMemories() - ) - } - memories.get().toArray - } else { - Array.empty - } - } -} - -/* - String dumpJSON() { - Results addr = plugin.call("builtins", new WasmOtoroshiParameters(0), 1); - - Parameters parameters = new WasmOtoroshiParameters(1); - IntegerParameter builder = new IntegerParameter(); - builder.add(parameters, addr.getValue(0).v.i32, 0); - - Results rawAddr = plugin.call("opa_json_dump", parameters, 1); - - Pointer memory = WasmBridge.INSTANCE.extism_get_memory(plugin.getPointer(), plugin.getIndex(), "memory"); - byte[] mem = memory.getByteArray(rawAddr.getValue(0).v.i32, 65356); - int size = lastValidByte(mem); - - return new String(Arrays.copyOf(mem, size), StandardCharsets.UTF_8); - } -}*/ -*/ \ No newline at end of file diff --git a/otoroshi/app/wasm/runtimev1.scala b/otoroshi/app/wasm/runtimev1.scala deleted file mode 100644 index f71b3092f2..0000000000 --- a/otoroshi/app/wasm/runtimev1.scala +++ /dev/null @@ -1,537 +0,0 @@ -/*package otoroshi.wasm - -import akka.stream.OverflowStrategy -import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete} -import akka.util.ByteString -import org.extism.sdk.manifest.{Manifest, MemoryOptions} -import org.extism.sdk.wasmotoroshi._ -import org.extism.sdk.wasm.WasmSourceResolver -import org.joda.time.DateTime -import otoroshi.env.Env -import otoroshi.security.IdGenerator -import otoroshi.utils.TypedMap -import otoroshi.utils.cache.types.UnboundedTrieMap -import otoroshi.utils.syntax.implicits._ -import otoroshi.wasm.proxywasm.VmData -import play.api.Logger -import play.api.libs.json._ -import play.api.libs.ws.{DefaultWSCookie, WSCookie} -import play.api.mvc.Cookie - -import java.util.concurrent.Executors -import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} -import scala.concurrent.duration.DurationInt -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.jdk.CollectionConverters._ - -sealed trait WasmAction - -object WasmAction { - case class WasmOpaInvocation(call: () => Either[JsValue, String], promise: Promise[Either[JsValue, String]]) - extends WasmAction - case class WasmInvocation( - call: () => Either[JsValue, (String, ResultsWrapper)], - promise: Promise[Either[JsValue, (String, ResultsWrapper)]] - ) extends WasmAction - case class WasmUpdate(call: () => Unit) extends WasmAction -} - -class WasmContextSlot( - id: String, - instance: Int, - plugin: WasmOtoroshiInstance, - cfg: WasmConfig, - wsm: ByteString, - closed: AtomicBoolean, - updating: AtomicBoolean, - instanceId: String, - functions: Array[WasmOtoroshiHostFunction[_ <: WasmOtoroshiHostUserData]] -) { - - def callSync( - wasmFunctionParameters: WasmFunctionParameters, - context: Option[VmData] - )(implicit env: Env, ec: ExecutionContext): Either[JsValue, (String, ResultsWrapper)] = { - if (closed.get()) { - val plug = WasmUtils.pluginCache.apply(s"$id-$instance") - plug.callSync(wasmFunctionParameters, context) - } else { - try { - // context.foreach(ctx => WasmContextSlot.setCurrentContext(ctx)) - if (WasmUtils.logger.isDebugEnabled) WasmUtils.logger.debug(s"calling instance $id-$instance") - WasmUtils.debugLog.debug(s"calling '${wasmFunctionParameters.functionName}' on instance '$id-$instance'") - val res: Either[JsValue, (String, ResultsWrapper)] = env.metrics.withTimer("otoroshi.wasm.core.call") { - wasmFunctionParameters.call(plugin) - } - env.metrics.withTimer("otoroshi.wasm.core.reset") { - plugin.reset() - } - env.metrics.withTimer("otoroshi.wasm.core.count-thunks") { - WasmUtils.logger.debug(s"thunks: ${functions.size}") - } - res - } catch { - case e: Throwable if e.getMessage.contains("wasm backtrace") => - WasmUtils.logger.error(s"error while invoking wasm function '${wasmFunctionParameters.functionName}'", e) - Json - .obj( - "error" -> "wasm_error", - "error_description" -> JsArray(e.getMessage.split("\\n").filter(_.trim.nonEmpty).map(JsString.apply)) - ) - .left - case e: Throwable => - WasmUtils.logger.error(s"error while invoking wasm function '${wasmFunctionParameters.functionName}'", e) - Json.obj("error" -> "wasm_error", "error_description" -> JsString(e.getMessage)).left - } finally { - // context.foreach(ctx => WasmContextSlot.clearCurrentContext()) - } - } - } - - def callOpaSync(input: String)(implicit env: Env, ec: ExecutionContext): Either[JsValue, String] = { - if (closed.get()) { - val plug = WasmUtils.pluginCache.apply(s"$id-$instance") - plug.callOpaSync(input) - } else { - try { - val res = env.metrics.withTimer("otoroshi.wasm.core.call-opa") { - val result = OPA.initialize(plugin).right - val str = result.get._1 - val parts = str.split("@") - OPA - .evaluate(plugin, parts(0).toInt, parts(1).toInt, input) - .map(r => r._1) - } - res - } catch { - case e: Throwable if e.getMessage.contains("wasm backtrace") => - WasmUtils.logger.error(s"error while invoking wasm function 'opa'", e) - Json - .obj( - "error" -> "wasm_error", - "error_description" -> JsArray(e.getMessage.split("\\n").filter(_.trim.nonEmpty).map(JsString.apply)) - ) - .left - case e: Throwable => - WasmUtils.logger.error(s"error while invoking wasm function 'opa'", e) - Json.obj("error" -> "wasm_error", "error_description" -> JsString(e.getMessage)).left - } - } - } - - def call( - wasmFunctionParameters: WasmFunctionParameters, - context: Option[VmData] - )(implicit env: Env, ec: ExecutionContext): Future[Either[JsValue, (String, ResultsWrapper)]] = { - val promise = Promise.apply[Either[JsValue, (String, ResultsWrapper)]]() - WasmUtils - .getInvocationQueueFor(id, instance) - .offer(WasmAction.WasmInvocation(() => callSync(wasmFunctionParameters, context), promise)) - promise.future - } - - def callOpa(input: String)(implicit env: Env, ec: ExecutionContext): Future[Either[JsValue, String]] = { - val promise = Promise.apply[Either[JsValue, String]]() - WasmUtils.getInvocationQueueFor(id, instance).offer(WasmAction.WasmOpaInvocation(() => callOpaSync(input), promise)) - promise.future - } - - def close(lifetime: WasmVmLifetime): Unit = { - if (lifetime == WasmVmLifetime.Invocation) { - if (WasmUtils.logger.isDebugEnabled) WasmUtils.logger.debug(s"calling close on WasmContextSlot of ${id}") - forceClose() - } - } - - def forceClose(): Unit = { - if (WasmUtils.logger.isDebugEnabled) WasmUtils.logger.debug(s"calling forceClose on WasmContextSlot of ${id}") - if (closed.compareAndSet(false, true)) { - try { - plugin.close() - } catch { - case e: Throwable => e.printStackTrace() - } - } - } - - def needsUpdate(wasmConfig: WasmConfig, wasm: ByteString): Boolean = { - val configHasChanged = wasmConfig != cfg - val wasmHasChanged = wasm != wsm - if (WasmUtils.logger.isDebugEnabled && configHasChanged) - WasmUtils.logger.debug(s"plugin ${id} needs update because of config change") - if (WasmUtils.logger.isDebugEnabled && wasmHasChanged) - WasmUtils.logger.debug(s"plugin ${id} needs update because of wasm change") - configHasChanged || wasmHasChanged - } - - def updateIfNeeded( - pluginId: String, - config: WasmConfig, - wasm: ByteString, - attrsOpt: Option[TypedMap], - addHostFunctions: Seq[WasmOtoroshiHostFunction[_ <: WasmOtoroshiHostUserData]] - )(implicit env: Env, ec: ExecutionContext): WasmContextSlot = { - if (needsUpdate(config, wasm) && updating.compareAndSet(false, true)) { - - if (config.instances < cfg.instances) { - env.otoroshiActorSystem.scheduler.scheduleOnce(20.seconds) { // TODO: config ? - if (WasmUtils.logger.isDebugEnabled) WasmUtils.logger.debug(s"trying to kill unused instances of ${pluginId}") - (config.instances to cfg.instances).map { idx => - WasmUtils.pluginCache.get(s"${pluginId}-${instance}").foreach(p => p.forceClose()) - WasmUtils.queues.remove(s"${pluginId}-${instance}") - WasmUtils.pluginCache.remove(s"$pluginId-$instance") - } - } - } - if (WasmUtils.logger.isDebugEnabled) WasmUtils.logger.debug(s"scheduling update ${instanceId}") - WasmUtils - .getInvocationQueueFor(id, instance) - .offer(WasmAction.WasmUpdate(() => { - val plugin = WasmUtils.actuallyCreatePlugin( - instance, - wasm, - config, - pluginId, - attrsOpt, - addHostFunctions - ) - if (WasmUtils.logger.isDebugEnabled) WasmUtils.logger.debug(s"updating ${instanceId}") - WasmUtils.pluginCache.put(s"$pluginId-$instance", plugin) - env.otoroshiActorSystem.scheduler.scheduleOnce(20.seconds) { // TODO: config ? - if (WasmUtils.logger.isDebugEnabled) WasmUtils.logger.debug(s"delayed force close ${instanceId}") - if (!closed.get()) { - forceClose() - } - } - })) - } - this - } -} - -object WasmUtils { - - private[wasm] val logger = Logger("otoroshi-wasm") - - val debugLog = Logger("otoroshi-wasm-debug") - - implicit val executor = ExecutionContext.fromExecutorService( - Executors.newWorkStealingPool(Math.max(32, (Runtime.getRuntime.availableProcessors * 4) + 1)) - ) - - // TODO: handle env.wasmCacheSize based on creation date ? - private[wasm] val _script_cache: UnboundedTrieMap[String, CacheableWasmScript] = - new UnboundedTrieMap[String, CacheableWasmScript]() - private[wasm] val pluginCache = new UnboundedTrieMap[String, WasmContextSlot]() - private[wasm] val queues = new UnboundedTrieMap[String, (DateTime, SourceQueueWithComplete[WasmAction])]() - private[wasm] val instancesCounter = new AtomicInteger(0) - - def scriptCache(implicit env: Env): UnboundedTrieMap[String, CacheableWasmScript] = _script_cache - - def convertJsonCookies(wasmResponse: JsValue): Option[Seq[WSCookie]] = - wasmResponse - .select("cookies") - .asOpt[Seq[JsObject]] - .map { arr => - arr.map { c => - DefaultWSCookie( - name = c.select("name").asString, - value = c.select("value").asString, - maxAge = c.select("maxAge").asOpt[Long], - path = c.select("path").asOpt[String], - domain = c.select("domain").asOpt[String], - secure = c.select("secure").asOpt[Boolean].getOrElse(false), - httpOnly = c.select("httpOnly").asOpt[Boolean].getOrElse(false) - ) - } - } - - def convertJsonPlayCookies(wasmResponse: JsValue): Option[Seq[Cookie]] = - wasmResponse - .select("cookies") - .asOpt[Seq[JsObject]] - .map { arr => - arr.map { c => - Cookie( - name = c.select("name").asString, - value = c.select("value").asString, - maxAge = c.select("maxAge").asOpt[Int], - path = c.select("path").asOpt[String].getOrElse("/"), - domain = c.select("domain").asOpt[String], - secure = c.select("secure").asOpt[Boolean].getOrElse(false), - httpOnly = c.select("httpOnly").asOpt[Boolean].getOrElse(false), - sameSite = c.select("domain").asOpt[String].flatMap(Cookie.SameSite.parse) - ) - } - } - - private[wasm] def getInvocationQueueFor(id: String, instance: Int)(implicit - env: Env - ): SourceQueueWithComplete[WasmAction] = { - val key = s"$id-$instance" - queues.getOrUpdate(key) { - val stream = Source - .queue[WasmAction](env.wasmQueueBufferSize, OverflowStrategy.dropHead) - .mapAsync(1) { action => - Future.apply { - action match { - case WasmAction.WasmInvocation(invoke, promise) => - try { - val res = invoke() - promise.trySuccess(res) - } catch { - case e: Throwable => promise.tryFailure(e) - } - case WasmAction.WasmOpaInvocation(invoke, promise) => - try { - val res = invoke() - promise.trySuccess(res) - } catch { - case e: Throwable => promise.tryFailure(e) - } - case WasmAction.WasmUpdate(update) => - try { - update() - } catch { - case e: Throwable => e.printStackTrace() - } - } - }(executor) - } - (DateTime.now(), stream.toMat(Sink.ignore)(Keep.both).run()(env.otoroshiMaterializer)._1) - } - }._2 - - private[wasm] def internalCreateManifest(config: WasmConfig, wasm: ByteString, env: Env) = - env.metrics.withTimer("otoroshi.wasm.core.create-plugin.manifest") { - val resolver = new WasmSourceResolver() - val source = resolver.resolve("wasm", wasm.toByteBuffer.array()) - new Manifest( - Seq[org.extism.sdk.wasm.WasmSource](source).asJava, - new MemoryOptions(config.memoryPages), - config.config.asJava, - config.allowedHosts.asJava, - config.allowedPaths.asJava - ) - } - - private[wasm] def actuallyCreatePlugin( - instance: Int, - wasm: ByteString, - config: WasmConfig, - pluginId: String, - attrsOpt: Option[TypedMap], - addHostFunctions: Seq[WasmOtoroshiHostFunction[_ <: WasmOtoroshiHostUserData]] - )(implicit env: Env, ec: ExecutionContext): WasmContextSlot = - env.metrics.withTimer("otoroshi.wasm.core.act-create-plugin") { - if (WasmUtils.logger.isDebugEnabled) - WasmUtils.logger.debug(s"creating wasm plugin instance for ${pluginId}") - val engine = WasmVmPool.engine - val manifest = internalCreateManifest(config, wasm, env) - val hash = java.security.MessageDigest - .getInstance("SHA-256") - .digest(wasm.toArray) - .map("%02x".format(_)) - .mkString - val template = new WasmOtoroshiTemplate(engine, hash, manifest) - // val context = env.metrics.withTimer("otoroshi.wasm.core.create-plugin.context")(new Context()) - val functions: Array[WasmOtoroshiHostFunction[_ <: WasmOtoroshiHostUserData]] = - HostFunctions.getFunctions(config, pluginId, attrsOpt) ++ addHostFunctions - val plugin = env.metrics.withTimer("otoroshi.wasm.core.create-plugin.plugin") { - template.instantiate( - engine, - functions, - LinearMemories.getMemories(config), - config.wasi - ) - } - new WasmContextSlot( - pluginId, - instance, - plugin, - config, - wasm, - functions = functions, - closed = new AtomicBoolean(false), - updating = new AtomicBoolean(false), - instanceId = IdGenerator.uuid - ) - } - - private def callWasm( - wasm: ByteString, - config: WasmConfig, - wasmFunctionParameters: WasmFunctionParameters, - pluginId: String, - attrsOpt: Option[TypedMap], - ctx: Option[VmData], - addHostFunctions: Seq[WasmOtoroshiHostFunction[_ <: WasmOtoroshiHostUserData]] - )(implicit env: Env, ec: ExecutionContext): Future[Either[JsValue, (String, ResultsWrapper)]] = - env.metrics.withTimerAsync("otoroshi.wasm.core.call-wasm") { - - WasmUtils.debugLog.debug("callWasm") - - val functionName = config.functionName.filter(_.nonEmpty).getOrElse(wasmFunctionParameters.functionName) - val instance = instancesCounter.incrementAndGet() % config.instances - - def createPlugin(): WasmContextSlot = { - if (config.lifetime == WasmVmLifetime.Forever) { - pluginCache - .getOrUpdate(s"$pluginId-$instance") { - actuallyCreatePlugin(instance, wasm, config, pluginId, None, addHostFunctions) - } - .seffectOn(_.updateIfNeeded(pluginId, config, wasm, None, addHostFunctions)) - } else { - actuallyCreatePlugin(instance, wasm, config, pluginId, attrsOpt, addHostFunctions) - } - } - - attrsOpt match { - case None => { - val slot = createPlugin() - if (config.opa) { - slot.callOpa(wasmFunctionParameters.input.get).map { output => - slot.close(config.lifetime) - output.map(str => (str, ResultsWrapper(new WasmOtoroshiResults(0)))) - } - } else { - slot.call(wasmFunctionParameters, ctx).map { output => - slot.close(config.lifetime) - output - } - } - } - case Some(attrs) => { - val context = attrs.get(otoroshi.next.plugins.Keys.WasmContextKey) match { - case None => { - val context = new WasmContext() - attrs.put(otoroshi.next.plugins.Keys.WasmContextKey -> context) - context - } - case Some(context) => context - } - val slot = context.get(pluginId) match { - case None => { - val plugin = createPlugin() - if (config.lifetime == WasmVmLifetime.Invocation) context.put(pluginId, plugin) - plugin - } - case Some(plugin) => plugin - } - if (config.opa) { - slot.callOpa(wasmFunctionParameters.input.get).map { output => - slot.close(config.lifetime) - output.map(str => (str, ResultsWrapper(new WasmOtoroshiResults(0)))) - } - } else { - slot.call(wasmFunctionParameters, ctx).map { output => - slot.close(config.lifetime) - output - } - } - } - } - } - - @deprecated(message = "Use WasmVmPool and WasmVm apis instead", since = "v16.6.0") - def execute( - config: WasmConfig, - defaultFunctionName: String, - input: JsValue, - attrs: Option[TypedMap], - ctx: Option[VmData] - )(implicit env: Env): Future[Either[JsValue, String]] = { - rawExecute( - config, - WasmFunctionParameters.ExtismFuntionCall(config.functionName.getOrElse(defaultFunctionName), input.stringify), - attrs, - ctx, - Seq.empty - ).map(r => r.map(_._1)) - } - - @deprecated(message = "Use WasmVmPool and WasmVm apis instead", since = "v16.6.0") - def rawExecute( - _config: WasmConfig, - wasmFunctionParameters: WasmFunctionParameters, - attrs: Option[TypedMap], - ctx: Option[VmData], - addHostFunctions: Seq[WasmOtoroshiHostFunction[_ <: WasmOtoroshiHostUserData]] - )(implicit env: Env): Future[Either[JsValue, (String, ResultsWrapper)]] = - env.metrics.withTimerAsync("otoroshi.wasm.core.raw-execute") { - val config = _config // if (_config.opa) _config.copy(lifetime = WasmVmLifetime.Invocation) else _config - WasmUtils.debugLog.debug("execute") - val pluginId = config.source.kind match { - case WasmSourceKind.Local => { - env.proxyState.wasmPlugin(config.source.path) match { - case None => config.source.cacheKey - case Some(plugin) => plugin.config.source.cacheKey - } - } - case _ => config.source.cacheKey - } - scriptCache.get(pluginId) match { - case Some(CacheableWasmScript.FetchingWasmScript(fu)) => - fu.flatMap { _ => - rawExecute(config, wasmFunctionParameters, attrs, ctx, addHostFunctions) - } - case Some(CacheableWasmScript.CachedWasmScript(script, _)) => { - env.metrics.withTimerAsync("otoroshi.wasm.core.get-config")(config.source.getConfig()).flatMap { - case None => - WasmUtils.callWasm( - script, - config, - wasmFunctionParameters, - pluginId, - attrs, - ctx, - addHostFunctions - ) - case Some(finalConfig) => - val functionName = config.functionName.filter(_.nonEmpty).orElse(finalConfig.functionName) - WasmUtils.callWasm( - script, - finalConfig.copy(functionName = functionName), - wasmFunctionParameters.withFunctionName(functionName.getOrElse(wasmFunctionParameters.functionName)), - pluginId, - attrs, - ctx, - addHostFunctions - ) - } - } - case None if config.source.kind == WasmSourceKind.Unknown => Left(Json.obj("error" -> "missing source")).future - case _ => - env.metrics.withTimerAsync("otoroshi.wasm.core.get-wasm")(config.source.getWasm()).flatMap { - case Left(err) => err.left.vfuture - case Right(wasm) => { - env.metrics.withTimerAsync("otoroshi.wasm.core.get-config")(config.source.getConfig()).flatMap { - case None => - WasmUtils.callWasm( - wasm, - config, - wasmFunctionParameters, - pluginId, - attrs, - ctx, - addHostFunctions - ) - case Some(finalConfig) => - val functionName = config.functionName.filter(_.nonEmpty).orElse(finalConfig.functionName) - WasmUtils.callWasm( - wasm, - finalConfig.copy(functionName = functionName), - wasmFunctionParameters - .withFunctionName(functionName.getOrElse(wasmFunctionParameters.functionName)), - pluginId, - attrs, - ctx, - addHostFunctions - ) - } - } - } - } - } -} -*/ \ No newline at end of file diff --git a/otoroshi/app/wasm/runtimev2.scala b/otoroshi/app/wasm/runtimev2.scala deleted file mode 100644 index 34e192ad41..0000000000 --- a/otoroshi/app/wasm/runtimev2.scala +++ /dev/null @@ -1,653 +0,0 @@ -/*package otoroshi.wasm - -import akka.stream.OverflowStrategy -import akka.stream.scaladsl.{Keep, Sink, Source} -import com.codahale.metrics.UniformReservoir -import org.extism.sdk.manifest.{Manifest, MemoryOptions} -import org.extism.sdk.wasm.WasmSourceResolver -import org.extism.sdk.wasmotoroshi._ -import otoroshi.env.Env -import otoroshi.models.WasmPlugin -import otoroshi.next.plugins.api.{NgPluginVisibility, NgStep} -import otoroshi.script._ -import otoroshi.utils.cache.types.UnboundedTrieMap -import otoroshi.utils.syntax.implicits._ -import otoroshi.wasm.CacheableWasmScript.CachedWasmScript -import otoroshi.wasm.WasmVm.logger -import otoroshi.wasm.proxywasm.VmData -import play.api.Logger -import play.api.libs.json._ - -import java.util.concurrent.ConcurrentLinkedQueue -import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicLong, AtomicReference} -import scala.concurrent.duration.{DurationInt, DurationLong, FiniteDuration} -import scala.concurrent.{Await, ExecutionContext, Future, Promise} -import scala.jdk.CollectionConverters._ -import scala.util.{Failure, Success, Try} - -sealed trait WasmVmAction - -object WasmVmAction { - case object WasmVmKillAction extends WasmVmAction - case class WasmVmCallAction( - parameters: WasmFunctionParameters, - context: Option[VmData], - promise: Promise[Either[JsValue, (String, ResultsWrapper)]] - ) extends WasmVmAction -} - -object WasmVm { - val logger = Logger("otoroshi-wasm-vm") - def fromConfig(config: WasmConfig)(implicit env: Env, ec: ExecutionContext): Future[Option[(WasmVm, WasmConfig)]] = { - if (config.source.kind == WasmSourceKind.Local) { - env.proxyState.wasmPlugin(config.source.path) match { - case None => None.vfuture - case Some(localPlugin) => { - val localConfig = localPlugin.config - localPlugin.pool().getPooledVm().map(vm => Some((vm, localConfig))) - } - } - } else { - config.pool().getPooledVm().map(vm => Some((vm, config))) - } - } -} - -case class OPAWasmVm(opaDataAddr: Int, opaBaseHeapPtr: Int) - -case class WasmVm( - index: Int, - maxCalls: Int, - maxMemory: Long, - resetMemory: Boolean, - instance: WasmOtoroshiInstance, - vmDataRef: AtomicReference[VmData], - memories: Array[WasmOtoroshiLinearMemory], - functions: Array[WasmOtoroshiHostFunction[_ <: WasmOtoroshiHostUserData]], - pool: WasmVmPool, - var opaPointers: Option[OPAWasmVm] = None -) { - - private val callDurationReservoirNs = new UniformReservoir() - private val lastUsage: AtomicLong = new AtomicLong(System.currentTimeMillis()) - private val initializedRef: AtomicBoolean = new AtomicBoolean(false) - private val killAtRelease: AtomicBoolean = new AtomicBoolean(false) - private val inFlight = new AtomicInteger(0) - private val callCounter = new AtomicInteger(0) - private val queue = { - val env = pool.env - Source - .queue[WasmVmAction](env.wasmQueueBufferSize, OverflowStrategy.dropTail) - .mapAsync(1)(handle) - .toMat(Sink.ignore)(Keep.both) - .run()(env.otoroshiMaterializer) - ._1 - } - - def calls: Int = callCounter.get() - def current: Int = inFlight.get() - - private def handle(act: WasmVmAction): Future[Unit] = { - Future.apply { - lastUsage.set(System.currentTimeMillis()) - act match { - case WasmVmAction.WasmVmKillAction => destroy() - case action: WasmVmAction.WasmVmCallAction => { - try { - inFlight.decrementAndGet() - // action.context.foreach(ctx => WasmContextSlot.setCurrentContext(ctx)) - action.context.foreach(ctx => vmDataRef.set(ctx)) - if (WasmVm.logger.isDebugEnabled) - WasmVm.logger.debug(s"call vm ${index} with method ${action.parameters.functionName} on thread ${Thread - .currentThread() - .getName} on path ${action.context.get.properties.get("request.path").map(v => new String(v))}") - val start = System.nanoTime() - val res = action.parameters.call(instance) - callDurationReservoirNs.update(System.nanoTime() - start) - if (res.isRight && res.right.get._2.results.getValues() != null) { - val ret = res.right.get._2.results.getValues()(0).v.i32 - if (ret > 7 || ret < 0) { // weird multi thread issues - ignore() - killAtRelease.set(true) - } - } - action.promise.trySuccess(res) - } catch { - case t: Throwable => action.promise.tryFailure(t) - } finally { - if (resetMemory) { - instance.reset() - } - WasmVm.logger.debug(s"functions: ${functions.size}") - WasmVm.logger.debug(s"memories: ${memories.size}") - // WasmContextSlot.clearCurrentContext() - // vmDataRef.set(null) - val count = callCounter.incrementAndGet() - if (count >= maxCalls) { - callCounter.set(0) - if (WasmVm.logger.isDebugEnabled) - WasmVm.logger.debug(s"killing vm ${index} with remaining ${inFlight.get()} calls (${count})") - destroyAtRelease() - } - } - } - } - () - }(WasmUtils.executor) - } - - def reset(): Unit = instance.reset() - - def destroy(): Unit = { - if (WasmVm.logger.isDebugEnabled) WasmVm.logger.debug(s"destroy vm: ${index}") - WasmVm.logger.debug(s"destroy vm: ${index}") - pool.clear(this) - instance.close() - } - - def isBusy(): Boolean = { - inFlight.get() > 0 - } - - def destroyAtRelease(): Unit = { - ignore() - killAtRelease.set(true) - } - - def release(): Unit = { - if (killAtRelease.get()) { - queue.offer(WasmVmAction.WasmVmKillAction) - } else { - pool.release(this) - } - } - - def lastUsedAt(): Long = lastUsage.get() - - def hasNotBeenUsedInTheLast(duration: FiniteDuration): Boolean = - if (duration.toNanos == 0L) false else !hasBeenUsedInTheLast(duration) - - def consumesMoreThanMemoryPercent(percent: Double): Boolean = if (percent == 0.0) { - false - } else { - val consumed: Double = instance.getMemorySize.toDouble / maxMemory.toDouble - val res = consumed > percent - if (logger.isDebugEnabled) - logger.debug( - s"consumesMoreThanMemoryPercent($percent) = (${instance.getMemorySize} / $maxMemory) > $percent : $res : (${consumed * 100.0}%)" - ) - res - } - - def tooSlow(max: Long): Boolean = { - if (max == 0L) { - false - } else { - callDurationReservoirNs.getSnapshot.getMean.toLong > max - } - } - - def hasBeenUsedInTheLast(duration: FiniteDuration): Boolean = { - val now = System.currentTimeMillis() - val limit = lastUsage.get() + duration.toMillis - now < limit - } - - def ignore(): Unit = pool.ignore(this) - - def initialized(): Boolean = initializedRef.get() - - def initialize(f: => Any): Unit = { - if (initializedRef.compareAndSet(false, true)) { - f - } - } - - def finitialize[A](f: => Future[A]): Future[Unit] = { - if (initializedRef.compareAndSet(false, true)) { - f.map(_ => ())(pool.env.otoroshiExecutionContext) - } else { - ().vfuture - } - } - - def call( - parameters: WasmFunctionParameters, - context: Option[VmData] - )(implicit env: Env, ec: ExecutionContext): Future[Either[JsValue, (String, ResultsWrapper)]] = { - val promise = Promise[Either[JsValue, (String, ResultsWrapper)]]() - inFlight.incrementAndGet() - lastUsage.set(System.currentTimeMillis()) - queue.offer(WasmVmAction.WasmVmCallAction(parameters, context, promise)) - promise.future - } -} - -case class WasmVmPoolAction(promise: Promise[WasmVm], options: WasmVmInitOptions) { - private[wasm] def provideVm(vm: WasmVm): Unit = promise.trySuccess(vm) - private[wasm] def fail(e: Throwable): Unit = promise.tryFailure(e) -} - -object WasmVmPool { - - private[wasm] val logger = Logger("otoroshi-wasm-vm-pool") - private[wasm] val engine = new WasmOtoroshiEngine() - private val instances = new UnboundedTrieMap[String, WasmVmPool]() - - def allInstances(): Map[String, WasmVmPool] = instances.synchronized { - instances.toMap - } - - def forPlugin(plugin: WasmPlugin)(implicit env: Env): WasmVmPool = instances.synchronized { - val key = plugin.id // s"plugin://${plugin.id}?cfg=${plugin.config.json.stringify.sha512}" - instances.getOrUpdate(key) { - new WasmVmPool(key, None, env) - } - } - - def forConfig(config: => WasmConfig)(implicit env: Env): WasmVmPool = instances.synchronized { - val key = s"${config.source.cacheKey}?cfg=${config.json.stringify.sha512}" - instances.getOrUpdate(key) { - new WasmVmPool(key, config.some, env) - } - } - - private[wasm] def removePlugin(id: String): Unit = instances.synchronized { - instances.remove(id) - } -} - -class WasmVmPool(stableId: => String, optConfig: => Option[WasmConfig], val env: Env) { - - WasmVmPool.logger.debug("new WasmVmPool") - - private val engine = new WasmOtoroshiEngine() - private val counter = new AtomicInteger(-1) - private val templateRef = new AtomicReference[WasmOtoroshiTemplate](null) - private[wasm] val availableVms = new ConcurrentLinkedQueue[WasmVm]() - private[wasm] val inUseVms = new ConcurrentLinkedQueue[WasmVm]() - private val creatingRef = new AtomicBoolean(false) - private val lastPluginVersion = new AtomicReference[String](null) - private val requestsSource = Source.queue[WasmVmPoolAction](env.wasmQueueBufferSize, OverflowStrategy.dropTail) - private val prioritySource = Source.queue[WasmVmPoolAction](env.wasmQueueBufferSize, OverflowStrategy.dropTail) - private val (priorityQueue, requestsQueue) = { - prioritySource - .mergePrioritizedMat(requestsSource, 99, 1, false)(Keep.both) - .map(handleAction) - .toMat(Sink.ignore)(Keep.both) - .run()(env.otoroshiMaterializer) - ._1 - } - - // unqueue actions from the action queue - private def handleAction(action: WasmVmPoolAction): Unit = try { - wasmConfig() match { - case None => - // if we cannot find the current wasm config, something is wrong, we destroy the pool - destroyCurrentVms() - WasmVmPool.removePlugin(stableId) - action.fail(new RuntimeException(s"No more plugin ${stableId}")) - case Some(wcfg) => { - // first we ensure the wasm source has been fetched - if (!wcfg.source.isCached()(env)) { - wcfg.source - .getWasm()(env, env.otoroshiExecutionContext) - .andThen { case _ => - priorityQueue.offer(action) - }(env.otoroshiExecutionContext) - } else { - val changed = hasChanged(wcfg) - val available = hasAvailableVm(wcfg) - val creating = isVmCreating() - val atMax = atMaxPoolCapacity(wcfg) - // then we check if the underlying wasmcode + config has not changed since last time - if (changed) { - // if so, we destroy all current vms and recreate a new one - WasmVmPool.logger.warn("plugin has changed, destroying old instances") - destroyCurrentVms() - createVm(wcfg, action.options) - } - // check if a vm is available - if (!available) { - // if not, but a new one is creating, just wait a little bit more - if (creating) { - priorityQueue.offer(action) - } else { - // check if we hit the max possible instances - if (atMax) { - // if so, just wait - priorityQueue.offer(action) - } else { - // if not, create a new instance because we need one - createVm(wcfg, action.options) - priorityQueue.offer(action) - } - } - } else { - // if so, acquire one - val vm = acquireVm() - action.provideVm(vm) - } - } - } - } - } catch { - case t: Throwable => - t.printStackTrace() - action.fail(t) - } - - // create a new vm for the pool - // we try to create vm one by one and to not create more than needed - private def createVm(config: WasmConfig, options: WasmVmInitOptions): Unit = synchronized { - if (creatingRef.compareAndSet(false, true)) { - val index = counter.incrementAndGet() - WasmVmPool.logger.debug(s"creating vm: ${index}") - if (templateRef.get() == null) { - if (!config.source.isCached()(env)) { - // this part should never happen anymore, but just in case - WasmVmPool.logger.warn("fetching missing source") - Await.result(config.source.getWasm()(env, env.otoroshiExecutionContext), 30.seconds) - } - lastPluginVersion.set(computeHash(config, config.source.cacheKey, WasmUtils.scriptCache(env))) - val cache = WasmUtils.scriptCache(env) - val key = config.source.cacheKey - val wasm = cache(key).asInstanceOf[CachedWasmScript].script - val hash = wasm.sha256 - val resolver = new WasmSourceResolver() - val source = resolver.resolve("wasm", wasm.toByteBuffer.array()) - templateRef.set( - new WasmOtoroshiTemplate( - engine, - hash, - new Manifest( - Seq[org.extism.sdk.wasm.WasmSource](source).asJava, - new MemoryOptions(config.memoryPages), - config.config.asJava, - config.allowedHosts.asJava, - config.allowedPaths.asJava - ) - ) - ) - } - val template = templateRef.get() - val vmDataRef = new AtomicReference[VmData](null) - val addedFunctions = options.addHostFunctions(vmDataRef) - val functions: Array[WasmOtoroshiHostFunction[_ <: WasmOtoroshiHostUserData]] = - if (options.importDefaultHostFunctions) { - HostFunctions.getFunctions(config, stableId, None)(env, env.otoroshiExecutionContext) ++ addedFunctions - } else { - addedFunctions.toArray[WasmOtoroshiHostFunction[_ <: WasmOtoroshiHostUserData]] - } - val memories = LinearMemories.getMemories(config) - val instance = template.instantiate(engine, functions, memories, config.wasi) - val vm = WasmVm( - index, - config.killOptions.maxCalls, - config.memoryPages * (64L * 1024L), - options.resetMemory, - instance, - vmDataRef, - memories, - functions, - this - ) - availableVms.offer(vm) - creatingRef.compareAndSet(true, false) - } - } - - // acquire an available vm for work - private def acquireVm(): WasmVm = synchronized { - if (availableVms.size() > 0) { - availableVms.synchronized { - val vm = availableVms.poll() - availableVms.remove(vm) - inUseVms.offer(vm) - vm - } - } else { - throw new RuntimeException("no instances available") - } - } - - // release the vm to be available for other tasks - private[wasm] def release(vm: WasmVm): Unit = synchronized { - availableVms.synchronized { - availableVms.offer(vm) - inUseVms.remove(vm) - } - } - - // do not consider the vm anymore for more work (the vm is being dropped for some reason) - private[wasm] def ignore(vm: WasmVm): Unit = synchronized { - availableVms.synchronized { - inUseVms.remove(vm) - } - } - - // do not consider the vm anymore for more work (the vm is being dropped for some reason) - private[wasm] def clear(vm: WasmVm): Unit = synchronized { - availableVms.synchronized { - availableVms.remove(vm) - } - } - - private[wasm] def wasmConfig(): Option[WasmConfig] = { - optConfig.orElse(env.proxyState.wasmPlugin(stableId).map(_.config)) - } - - private def hasAvailableVm(plugin: WasmConfig): Boolean = - availableVms.size() > 0 && (inUseVms.size < plugin.instances) - - private def isVmCreating(): Boolean = creatingRef.get() - - private def atMaxPoolCapacity(plugin: WasmConfig): Boolean = (availableVms.size + inUseVms.size) >= plugin.instances - - // close the current pool - private[wasm] def close(): Unit = availableVms.synchronized { - engine.close() - } - - // destroy all vms and clear everything in order to destroy the current pool - private[wasm] def destroyCurrentVms(): Unit = availableVms.synchronized { - WasmVmPool.logger.info("destroying all vms") - availableVms.asScala.foreach(_.destroy()) - availableVms.clear() - inUseVms.clear() - //counter.set(0) - templateRef.set(null) - creatingRef.set(false) - lastPluginVersion.set(null) - } - - // compute the current hash for a tuple (wasmcode + config) - private def computeHash( - config: WasmConfig, - key: String, - cache: UnboundedTrieMap[String, CacheableWasmScript] - ): String = { - config.json.stringify.sha512 + "#" + cache - .get(key) - .map { - case CacheableWasmScript.CachedWasmScript(wasm, _) => wasm.sha512 - case _ => "fetching" - } - .getOrElse("null") - } - - // compute if the source (wasm code + config) is the same than current - private def hasChanged(config: WasmConfig): Boolean = availableVms.synchronized { - val key = config.source.cacheKey - val cache = WasmUtils.scriptCache(env) - var oldHash = lastPluginVersion.get() - if (oldHash == null) { - oldHash = computeHash(config, key, cache) - lastPluginVersion.set(oldHash) - } - cache.get(key) match { - case Some(CacheableWasmScript.CachedWasmScript(_, _)) => { - val currentHash = computeHash(config, key, cache) - oldHash != currentHash - } - case _ => false - } - } - - // get a pooled vm when one available. - // Do not forget to release it after usage - def getPooledVm(options: WasmVmInitOptions = WasmVmInitOptions.empty()): Future[WasmVm] = { - val p = Promise[WasmVm]() - requestsQueue.offer(WasmVmPoolAction(p, options)) - p.future - } - - // borrow a vm for sync operations - def withPooledVm[A](options: WasmVmInitOptions = WasmVmInitOptions.empty())(f: WasmVm => A): Future[A] = { - implicit val ev = env - implicit val ec = env.otoroshiExecutionContext - getPooledVm(options).flatMap { vm => - val p = Promise[A]() - try { - val ret = f(vm) - p.trySuccess(ret) - } catch { - case e: Throwable => - p.tryFailure(e) - } finally { - vm.release() - } - p.future - } - } - - // borrow a vm for async operations - def withPooledVmF[A](options: WasmVmInitOptions = WasmVmInitOptions.empty())(f: WasmVm => Future[A]): Future[A] = { - implicit val ev = env - implicit val ec = env.otoroshiExecutionContext - getPooledVm(options).flatMap { vm => - f(vm).andThen { case _ => - vm.release() - } - } - } -} - -case class WasmVmInitOptions( - importDefaultHostFunctions: Boolean = true, - resetMemory: Boolean = true, - addHostFunctions: (AtomicReference[VmData]) => Seq[WasmOtoroshiHostFunction[_ <: WasmOtoroshiHostUserData]] = _ => - Seq.empty -) - -object WasmVmInitOptions { - def empty(): WasmVmInitOptions = WasmVmInitOptions( - importDefaultHostFunctions = true, - resetMemory = true, - addHostFunctions = _ => Seq.empty - ) -} - -// this job tries to kill unused wasm vms and unused pools to save memory -class WasmVmPoolCleaner extends Job { - - private val logger = Logger("otoroshi-wasm-vm-pool-cleaner") - - override def uniqueId: JobId = JobId("otoroshi.wasm.WasmVmPoolCleaner") - - override def visibility: NgPluginVisibility = NgPluginVisibility.NgInternal - - override def steps: Seq[NgStep] = Seq(NgStep.Job) - - override def kind: JobKind = JobKind.ScheduledEvery - - override def starting: JobStarting = JobStarting.Automatically - - override def instantiation(ctx: JobContext, env: Env): JobInstantiation = - JobInstantiation.OneInstancePerOtoroshiInstance - - override def initialDelay(ctx: JobContext, env: Env): Option[FiniteDuration] = 10.seconds.some - - override def interval(ctx: JobContext, env: Env): Option[FiniteDuration] = 60.seconds.some - - override def jobRun(ctx: JobContext)(implicit env: Env, ec: ExecutionContext): Future[Unit] = { - val config = env.datastores.globalConfigDataStore - .latest() - .plugins - .config - .select("wasm-vm-pool-cleaner-config") - .asOpt[JsObject] - .getOrElse(Json.obj()) - val globalNotUsedDuration = config.select("not-used-duration").asOpt[Long].map(v => v.millis).getOrElse(5.minutes) - WasmVmPool.allInstances().foreach { case (key, pool) => - if (pool.inUseVms.isEmpty && pool.availableVms.isEmpty) { - logger.warn(s"will destroy 1 wasm vms pool") - pool.destroyCurrentVms() - pool.close() - WasmVmPool.removePlugin(key) - } else { - val options = pool.wasmConfig().map(_.killOptions) - if (!options.exists(_.immortal)) { - val maxDur = options.map(_.maxUnusedDuration).getOrElse(globalNotUsedDuration) - val unusedVms = pool.availableVms.asScala.filter(_.hasNotBeenUsedInTheLast(maxDur)) - val tooMuchMemoryVms = (pool.availableVms.asScala ++ pool.inUseVms.asScala) - .filter(_.consumesMoreThanMemoryPercent(options.map(_.maxMemoryUsage).getOrElse(0.9))) - val tooSlowVms = (pool.availableVms.asScala ++ pool.inUseVms.asScala) - .filter(_.tooSlow(options.map(_.maxAvgCallDuration.toNanos).getOrElse(1.day.toNanos))) - val allVms = unusedVms ++ tooMuchMemoryVms ++ tooSlowVms - if (allVms.nonEmpty) { - logger.warn(s"will destroy ${allVms.size} wasm vms") - if (unusedVms.nonEmpty) logger.warn(s" - ${unusedVms.size} because unused for more than ${maxDur.toHours}") - if (tooMuchMemoryVms.nonEmpty) logger.warn(s" - ${tooMuchMemoryVms.size} because of too much memory used") - if (tooSlowVms.nonEmpty) logger.warn(s" - ${tooSlowVms.size} because of avg call duration too long") - } - allVms.foreach { vm => - if (vm.isBusy()) { - vm.destroyAtRelease() - } else { - vm.ignore() - vm.destroy() - } - } - } - } - } - ().vfuture - } -} - -case class WasmVmKillOptions( - immortal: Boolean = false, - maxCalls: Int = Int.MaxValue, - maxMemoryUsage: Double = 0.0, - maxAvgCallDuration: FiniteDuration = 0.nano, - maxUnusedDuration: FiniteDuration = 5.minutes -) { - def json: JsValue = WasmVmKillOptions.format.writes(this) -} - -object WasmVmKillOptions { - val default = WasmVmKillOptions() - val format = new Format[WasmVmKillOptions] { - override def writes(o: WasmVmKillOptions): JsValue = Json.obj( - "immortal" -> o.immortal, - "max_calls" -> o.maxCalls, - "max_memory_usage" -> o.maxMemoryUsage, - "max_avg_call_duration" -> o.maxAvgCallDuration.toMillis, - "max_unused_duration" -> o.maxUnusedDuration.toMillis - ) - override def reads(json: JsValue): JsResult[WasmVmKillOptions] = Try { - WasmVmKillOptions( - immortal = json.select("immortal").asOpt[Boolean].getOrElse(false), - maxCalls = json.select("max_calls").asOpt[Int].getOrElse(Int.MaxValue), - maxMemoryUsage = json.select("max_memory_usage").asOpt[Double].getOrElse(0.0), - maxAvgCallDuration = json.select("max_avg_call_duration").asOpt[Long].map(_.millis).getOrElse(0.nano), - maxUnusedDuration = json.select("max_unused_duration").asOpt[Long].map(_.millis).getOrElse(5.minutes) - ) - } match { - case Failure(e) => JsError(e.getMessage) - case Success(e) => JsSuccess(e) - } - } -} -*/ \ No newline at end of file diff --git a/otoroshi/app/wasm/types.scala b/otoroshi/app/wasm/types.scala deleted file mode 100644 index afdcb91b9a..0000000000 --- a/otoroshi/app/wasm/types.scala +++ /dev/null @@ -1,138 +0,0 @@ -/*package otoroshi.wasm - -import org.extism.sdk.Results -import org.extism.sdk.wasmotoroshi._ -import play.api.libs.json._ -import otoroshi.utils.syntax.implicits._ - -import java.nio.charset.StandardCharsets - -sealed abstract class WasmFunctionParameters { - def functionName: String - def input: Option[String] - def parameters: Option[WasmOtoroshiParameters] - def resultSize: Option[Int] - def call(plugin: WasmOtoroshiInstance): Either[JsValue, (String, ResultsWrapper)] - def withInput(input: Option[String]): WasmFunctionParameters - def withFunctionName(functionName: String): WasmFunctionParameters -} - -object WasmFunctionParameters { - def from( - functionName: String, - input: Option[String], - parameters: Option[WasmOtoroshiParameters], - resultSize: Option[Int] - ) = { - (input, parameters, resultSize) match { - case (_, Some(p), Some(s)) => BothParamsResults(functionName, p, s) - case (_, Some(p), None) => NoResult(functionName, p) - case (_, None, Some(s)) => NoParams(functionName, s) - case (Some(in), None, None) => ExtismFuntionCall(functionName, in) - case _ => UnknownCombination() - } - } - - case class UnknownCombination( - functionName: String = "unknown", - input: Option[String] = None, - parameters: Option[WasmOtoroshiParameters] = None, - resultSize: Option[Int] = None - ) extends WasmFunctionParameters { - override def call(plugin: WasmOtoroshiInstance): Either[JsValue, (String, ResultsWrapper)] = { - Left(Json.obj("error" -> "bad call combination")) - } - def withInput(input: Option[String]): WasmFunctionParameters = this.copy(input = input) - def withFunctionName(functionName: String): WasmFunctionParameters = this.copy(functionName = functionName) - } - - case class NoResult( - functionName: String, - params: WasmOtoroshiParameters, - input: Option[String] = None, - resultSize: Option[Int] = None - ) extends WasmFunctionParameters { - override def parameters: Option[WasmOtoroshiParameters] = Some(params) - override def call(plugin: WasmOtoroshiInstance): Either[JsValue, (String, ResultsWrapper)] = { - plugin.callWithoutResults(functionName, parameters.get) - Right[JsValue, (String, ResultsWrapper)](("", ResultsWrapper(new WasmOtoroshiResults(0), plugin))) - } - override def withInput(input: Option[String]): WasmFunctionParameters = this.copy(input = input) - override def withFunctionName(functionName: String): WasmFunctionParameters = this.copy(functionName = functionName) - } - - case class NoParams( - functionName: String, - result: Int, - input: Option[String] = None, - parameters: Option[WasmOtoroshiParameters] = None - ) extends WasmFunctionParameters { - override def resultSize: Option[Int] = Some(result) - override def call(plugin: WasmOtoroshiInstance): Either[JsValue, (String, ResultsWrapper)] = { - plugin - .callWithoutParams(functionName, resultSize.get) - .right - .map(_ => ("", ResultsWrapper(new WasmOtoroshiResults(0), plugin))) - } - override def withInput(input: Option[String]): WasmFunctionParameters = this.copy(input = input) - override def withFunctionName(functionName: String): WasmFunctionParameters = this.copy(functionName = functionName) - } - - case class BothParamsResults( - functionName: String, - params: WasmOtoroshiParameters, - result: Int, - input: Option[String] = None - ) extends WasmFunctionParameters { - override def parameters: Option[WasmOtoroshiParameters] = Some(params) - override def resultSize: Option[Int] = Some(result) - override def call(plugin: WasmOtoroshiInstance): Either[JsValue, (String, ResultsWrapper)] = { - plugin - .call(functionName, parameters.get, resultSize.get) - .right - .map(res => ("", ResultsWrapper(res, plugin))) - } - override def withInput(input: Option[String]): WasmFunctionParameters = this.copy(input = input) - override def withFunctionName(functionName: String): WasmFunctionParameters = this.copy(functionName = functionName) - } - - case class ExtismFuntionCall( - functionName: String, - in: String, - parameters: Option[WasmOtoroshiParameters] = None, - resultSize: Option[Int] = None - ) extends WasmFunctionParameters { - override def input: Option[String] = Some(in) - override def call(plugin: WasmOtoroshiInstance): Either[JsValue, (String, ResultsWrapper)] = { - plugin - .extismCall(functionName, input.get.getBytes(StandardCharsets.UTF_8)) - .right - .map { str => - (str, ResultsWrapper(new WasmOtoroshiResults(0), plugin)) - } - } - - override def withInput(input: Option[String]): WasmFunctionParameters = this.copy(in = input.get) - override def withFunctionName(functionName: String): WasmFunctionParameters = this.copy(functionName = functionName) - } - - case class OPACall(functionName: String, pointers: Option[OPAWasmVm] = None, in: String) - extends WasmFunctionParameters { - override def input: Option[String] = Some(in) - - override def call(plugin: WasmOtoroshiInstance): Either[JsValue, (String, ResultsWrapper)] = { - if (functionName == "initialize") - OPA.initialize(plugin) - else - OPA.evaluate(plugin, pointers.get.opaDataAddr, pointers.get.opaBaseHeapPtr, in) - } - - override def withInput(input: Option[String]): WasmFunctionParameters = this.copy(in = input.get) - - override def withFunctionName(functionName: String): WasmFunctionParameters = this - override def parameters: Option[WasmOtoroshiParameters] = None - override def resultSize: Option[Int] = None - } -} - -*/ diff --git a/otoroshi/app/wasm/wasm.scala b/otoroshi/app/wasm/wasm.scala index 70482541f5..750f2abeb3 100644 --- a/otoroshi/app/wasm/wasm.scala +++ b/otoroshi/app/wasm/wasm.scala @@ -6,9 +6,7 @@ import org.extism.sdk.wasmotoroshi.{WasmOtoroshiHostFunction, WasmOtoroshiHostUs import otoroshi.env.Env import otoroshi.next.models.NgTlsConfig import otoroshi.next.plugins.api.{NgPluginConfig, NgPluginVisibility, NgStep} -import otoroshi.script.{Job, JobContext, JobId, JobInstantiation, JobKind, JobStarting} -import otoroshi.utils.TypedMap -import otoroshi.utils.cache.types.UnboundedTrieMap +import otoroshi.script._ import otoroshi.utils.syntax.implicits._ import play.api.Logger import play.api.libs.json._ @@ -21,334 +19,6 @@ import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} -/* -import akka.util.ByteString -import org.extism.sdk.wasmotoroshi._ -import otoroshi.env.Env -import otoroshi.models.{WSProxyServerJson, WasmManagerSettings} -import otoroshi.next.models.NgTlsConfig -import otoroshi.next.plugins.api._ -import otoroshi.utils.cache.types.UnboundedTrieMap -import otoroshi.utils.http.MtlsConfig -import otoroshi.utils.syntax.implicits._ -import play.api.libs.json._ - -import java.nio.file.{Files, Paths} -import scala.concurrent.duration.{DurationLong, FiniteDuration, MILLISECONDS} -import scala.concurrent.{ExecutionContext, Future, Promise} -import scala.util.{Failure, Success, Try} - -sealed trait WasmSourceKind { - def name: String - def json: JsValue = JsString(name) - def getWasm(path: String, opts: JsValue)(implicit env: Env, ec: ExecutionContext): Future[Either[JsValue, ByteString]] - def getConfig(path: String, opts: JsValue)(implicit env: Env, ec: ExecutionContext): Future[Option[WasmConfig]] = - None.vfuture -} - -object WasmSourceKind { - case object Unknown extends WasmSourceKind { - def name: String = "Unknown" - def getWasm(path: String, opts: JsValue)(implicit - env: Env, - ec: ExecutionContext - ): Future[Either[JsValue, ByteString]] = { - Left(Json.obj("error" -> "unknown source")).vfuture - } - } - case object Base64 extends WasmSourceKind { - def name: String = "Base64" - def getWasm(path: String, opts: JsValue)(implicit - env: Env, - ec: ExecutionContext - ): Future[Either[JsValue, ByteString]] = { - ByteString(path.replace("base64://", "")).decodeBase64.right.future - } - } - case object Http extends WasmSourceKind { - def name: String = "Http" - def getWasm(path: String, opts: JsValue)(implicit - env: Env, - ec: ExecutionContext - ): Future[Either[JsValue, ByteString]] = { - val method = opts.select("method").asOpt[String].getOrElse("GET") - val headers = opts.select("headers").asOpt[Map[String, String]].getOrElse(Map.empty) - val timeout = opts.select("timeout").asOpt[Long].getOrElse(10000L).millis - val followRedirect = opts.select("followRedirect").asOpt[Boolean].getOrElse(true) - val proxy = opts.select("proxy").asOpt[JsObject].flatMap(v => WSProxyServerJson.proxyFromJson(v)) - val tlsConfig = - opts.select("tls").asOpt(NgTlsConfig.format).map(_.legacy).orElse(opts.select("tls").asOpt(MtlsConfig.format)) - (tlsConfig match { - case None => env.Ws.url(path) - case Some(cfg) => env.MtlsWs.url(path, cfg) - }) - .withMethod(method) - .withFollowRedirects(followRedirect) - .withHttpHeaders(headers.toSeq: _*) - .withRequestTimeout(timeout) - .applyOnWithOpt(proxy) { case (req, proxy) => - req.withProxyServer(proxy) - } - .execute() - .map { resp => - if (resp.status == 200) { - val body = resp.bodyAsBytes - Right(body) - } else { - val body: String = resp.body - Left( - Json.obj( - "error" -> "bad response", - "status" -> resp.status, - "headers" -> resp.headers.mapValues(_.last), - "body" -> body - ) - ) - } - } - } - } - case object WasmManager extends WasmSourceKind { - def name: String = "WasmManager" - def getWasm(path: String, opts: JsValue)(implicit - env: Env, - ec: ExecutionContext - ): Future[Either[JsValue, ByteString]] = { - env.datastores.globalConfigDataStore.singleton().flatMap { globalConfig => - globalConfig.wasmManagerSettings match { - case Some(WasmManagerSettings(url, clientId, clientSecret, kind)) => { - env.Ws - .url(s"$url/wasm/$path") - .withFollowRedirects(false) - .withRequestTimeout(FiniteDuration(5 * 1000, MILLISECONDS)) - .withHttpHeaders( - "Accept" -> "application/json", - "Otoroshi-Client-Id" -> clientId, - "Otoroshi-Client-Secret" -> clientSecret, - "kind" -> kind.getOrElse("*") - ) - .get() - .flatMap { resp => - if (resp.status == 400) { - Left(Json.obj("error" -> "missing signed plugin url")).vfuture - } else { - Right(resp.bodyAsBytes).vfuture - } - } - } - case _ => - Left(Json.obj("error" -> "missing wasm manager url")).vfuture - } - } - - } - } - case object Local extends WasmSourceKind { - def name: String = "Local" - override def getWasm(path: String, opts: JsValue)(implicit - env: Env, - ec: ExecutionContext - ): Future[Either[JsValue, ByteString]] = { - env.proxyState.wasmPlugin(path) match { - case None => Left(Json.obj("error" -> "resource not found")).vfuture - case Some(plugin) => plugin.config.source.getWasm() - } - } - override def getConfig(path: String, opts: JsValue)(implicit - env: Env, - ec: ExecutionContext - ): Future[Option[WasmConfig]] = { - env.proxyState.wasmPlugin(path).map(_.config).vfuture - } - } - case object File extends WasmSourceKind { - def name: String = "File" - def getWasm(path: String, opts: JsValue)(implicit - env: Env, - ec: ExecutionContext - ): Future[Either[JsValue, ByteString]] = { - Right(ByteString(Files.readAllBytes(Paths.get(path.replace("file://", ""))))).vfuture - } - } - - def apply(value: String): WasmSourceKind = value.toLowerCase match { - case "base64" => Base64 - case "http" => Http - case "wasmmanager" => WasmManager - case "local" => Local - case "file" => File - case _ => Unknown - } -} - -case class WasmSource(kind: WasmSourceKind, path: String, opts: JsValue = Json.obj()) { - def json: JsValue = WasmSource.format.writes(this) - def cacheKey = s"${kind.name.toLowerCase}://${path}" - def getConfig()(implicit env: Env, ec: ExecutionContext): Future[Option[WasmConfig]] = kind.getConfig(path, opts) - def isCached()(implicit env: Env): Boolean = { - val cache = WasmUtils.scriptCache(env) - cache.get(cacheKey) match { - case Some(CacheableWasmScript.CachedWasmScript(_, _)) => true - case _ => false - } - } - def getWasm()(implicit env: Env, ec: ExecutionContext): Future[Either[JsValue, ByteString]] = { - val cache = WasmUtils.scriptCache(env) - def fetchAndAddToCache(): Future[Either[JsValue, ByteString]] = { - val promise = Promise[Either[JsValue, ByteString]]() - cache.put(cacheKey, CacheableWasmScript.FetchingWasmScript(promise.future)) - kind.getWasm(path, opts).map { - case Left(err) => - promise.trySuccess(err.left) - err.left - case Right(bs) => { - cache.put(cacheKey, CacheableWasmScript.CachedWasmScript(bs, System.currentTimeMillis())) - promise.trySuccess(bs.right) - bs.right - } - } - } - cache.get(cacheKey) match { - case None => fetchAndAddToCache() - case Some(CacheableWasmScript.FetchingWasmScript(fu)) => fu - case Some(CacheableWasmScript.CachedWasmScript(script, createAt)) - if createAt + env.wasmCacheTtl < System.currentTimeMillis => - fetchAndAddToCache() - script.right.vfuture - case Some(CacheableWasmScript.CachedWasmScript(script, _)) => script.right.vfuture - } - } -} - -object WasmSource { - val format = new Format[WasmSource] { - override def writes(o: WasmSource): JsValue = Json.obj( - "kind" -> o.kind.json, - "path" -> o.path, - "opts" -> o.opts - ) - override def reads(json: JsValue): JsResult[WasmSource] = Try { - WasmSource( - kind = json.select("kind").asOpt[String].map(WasmSourceKind.apply).getOrElse(WasmSourceKind.Unknown), - path = json.select("path").asString, - opts = json.select("opts").asOpt[JsValue].getOrElse(Json.obj()) - ) - } match { - case Success(s) => JsSuccess(s) - case Failure(e) => JsError(e.getMessage) - } - } -} - -case class WasmAuthorizations( - httpAccess: Boolean = false, - globalDataStoreAccess: WasmDataRights = WasmDataRights(), - pluginDataStoreAccess: WasmDataRights = WasmDataRights(), - globalMapAccess: WasmDataRights = WasmDataRights(), - pluginMapAccess: WasmDataRights = WasmDataRights(), - proxyStateAccess: Boolean = false, - configurationAccess: Boolean = false, - proxyHttpCallTimeout: Int = 5000 -) { - def json: JsValue = WasmAuthorizations.format.writes(this) -} - -object WasmAuthorizations { - val format = new Format[WasmAuthorizations] { - override def writes(o: WasmAuthorizations): JsValue = Json.obj( - "httpAccess" -> o.httpAccess, - "proxyHttpCallTimeout" -> o.proxyHttpCallTimeout, - "globalDataStoreAccess" -> WasmDataRights.fmt.writes(o.globalDataStoreAccess), - "pluginDataStoreAccess" -> WasmDataRights.fmt.writes(o.pluginDataStoreAccess), - "globalMapAccess" -> WasmDataRights.fmt.writes(o.globalMapAccess), - "pluginMapAccess" -> WasmDataRights.fmt.writes(o.pluginMapAccess), - "proxyStateAccess" -> o.proxyStateAccess, - "configurationAccess" -> o.configurationAccess - ) - override def reads(json: JsValue): JsResult[WasmAuthorizations] = Try { - WasmAuthorizations( - httpAccess = (json \ "httpAccess").asOpt[Boolean].getOrElse(false), - proxyHttpCallTimeout = (json \ "proxyHttpCallTimeout").asOpt[Int].getOrElse(5000), - globalDataStoreAccess = (json \ "globalDataStoreAccess") - .asOpt[WasmDataRights](WasmDataRights.fmt.reads) - .getOrElse(WasmDataRights()), - pluginDataStoreAccess = (json \ "pluginDataStoreAccess") - .asOpt[WasmDataRights](WasmDataRights.fmt.reads) - .getOrElse(WasmDataRights()), - globalMapAccess = (json \ "globalMapAccess") - .asOpt[WasmDataRights](WasmDataRights.fmt.reads) - .getOrElse(WasmDataRights()), - pluginMapAccess = (json \ "pluginMapAccess") - .asOpt[WasmDataRights](WasmDataRights.fmt.reads) - .getOrElse(WasmDataRights()), - proxyStateAccess = (json \ "proxyStateAccess").asOpt[Boolean].getOrElse(false), - configurationAccess = (json \ "configurationAccess").asOpt[Boolean].getOrElse(false) - ) - } match { - case Failure(ex) => JsError(ex.getMessage) - case Success(value) => JsSuccess(value) - } - } -} - -sealed trait WasmVmLifetime { - def name: String - def json: JsValue = JsString(name) -} - -object WasmVmLifetime { - - case object Invocation extends WasmVmLifetime { def name: String = "Invocation" } - case object Request extends WasmVmLifetime { def name: String = "Request" } - case object Forever extends WasmVmLifetime { def name: String = "Forever" } - - def parse(str: String): Option[WasmVmLifetime] = str.toLowerCase() match { - case "invocation" => Invocation.some - case "request" => Request.some - case "forever" => Forever.some - case _ => None - } -} - -object ResultsWrapper { - def apply(results: WasmOtoroshiResults): ResultsWrapper = new ResultsWrapper(results, None) - def apply(results: WasmOtoroshiResults, plugin: WasmOtoroshiInstance): ResultsWrapper = - new ResultsWrapper(results, Some(plugin)) -} - -case class ResultsWrapper(results: WasmOtoroshiResults, pluginOpt: Option[WasmOtoroshiInstance]) { - def free(): Unit = try { - if (results.getLength > 0) { - results.close() - } - } catch { - case t: Throwable => - t.printStackTrace() - () - } -} - -class WasmContext( - plugins: UnboundedTrieMap[String, WasmContextSlot] = new UnboundedTrieMap[String, WasmContextSlot]() -) { - def put(id: String, slot: WasmContextSlot): Unit = plugins.put(id, slot) - def get(id: String): Option[WasmContextSlot] = plugins.get(id) - def close(): Unit = { - if (WasmUtils.logger.isDebugEnabled) - WasmUtils.logger.debug(s"[WasmContext] will close ${plugins.size} wasm plugin instances") - plugins.foreach(_._2.forceClose()) - plugins.clear() - } -} - -sealed trait CacheableWasmScript - -object CacheableWasmScript { - case class CachedWasmScript(script: ByteString, createAt: Long) extends CacheableWasmScript - case class FetchingWasmScript(f: Future[Either[JsValue, ByteString]]) extends CacheableWasmScript -} -*/ - case class WasmDataRights(read: Boolean = false, write: Boolean = false) object WasmDataRights {