diff --git a/plugins/wasm-rust/Cargo.lock b/plugins/wasm-rust/Cargo.lock index 06dc3d05ea..899d559b0d 100644 --- a/plugins/wasm-rust/Cargo.lock +++ b/plugins/wasm-rust/Cargo.lock @@ -20,12 +20,24 @@ version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" +[[package]] +name = "bytes" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" + [[package]] name = "cfg-if" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "getrandom" version = "0.2.15" @@ -51,6 +63,8 @@ dependencies = [ name = "higress-wasm-rust" version = "0.1.0" dependencies = [ + "http", + "lazy_static", "multimap", "proxy-wasm", "serde", @@ -58,12 +72,29 @@ dependencies = [ "uuid", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "itoa" version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.155" diff --git a/plugins/wasm-rust/Cargo.toml b/plugins/wasm-rust/Cargo.toml index ff7ab504ce..a1e5472c63 100644 --- a/plugins/wasm-rust/Cargo.toml +++ b/plugins/wasm-rust/Cargo.toml @@ -11,3 +11,5 @@ serde = "1.0" serde_json = "1.0" uuid = { version = "1.3.3", features = ["v4"] } multimap = "0" +http = "1" +lazy_static = "1" diff --git a/plugins/wasm-rust/extensions/demo-wasm/Cargo.lock b/plugins/wasm-rust/extensions/demo-wasm/Cargo.lock new file mode 100644 index 0000000000..85e2edaea3 --- /dev/null +++ b/plugins/wasm-rust/extensions/demo-wasm/Cargo.lock @@ -0,0 +1,263 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "ahash" +version = "0.8.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + +[[package]] +name = "allocator-api2" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" + +[[package]] +name = "bytes" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "demo-wasm" +version = "0.1.0" +dependencies = [ + "higress-wasm-rust", + "http", + "multimap", + "proxy-wasm", + "serde", +] + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "getrandom" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +dependencies = [ + "ahash", + "allocator-api2", +] + +[[package]] +name = "higress-wasm-rust" +version = "0.1.0" +dependencies = [ + "http", + "lazy_static", + "multimap", + "proxy-wasm", + "serde", + "serde_json", + "uuid", +] + +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "itoa" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + +[[package]] +name = "libc" +version = "0.2.157" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "374af5f94e54fa97cf75e945cce8a6b201e88a1a07e688b47dfd2a59c66dbd86" + +[[package]] +name = "log" +version = "0.4.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "multimap" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03" +dependencies = [ + "serde", +] + +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + +[[package]] +name = "proc-macro2" +version = "1.0.86" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e719e8df665df0d1c8fbfd238015744736151d4445ec0836b8e628aae103b77" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "proxy-wasm" +version = "0.2.2" +source = "git+https://github.com/higress-group/proxy-wasm-rust-sdk?branch=main#73833051f57d483570cf5aaa9d62bd7402fae63b" +dependencies = [ + "hashbrown", + "log", +] + +[[package]] +name = "quote" +version = "1.0.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "ryu" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + +[[package]] +name = "serde" +version = "1.0.208" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cff085d2cb684faa248efb494c39b68e522822ac0de72ccf08109abde717cfb2" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.208" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24008e81ff7613ed8e5ba0cfaf24e2c2f1e5b8a0495711e44fcd4882fca62bcf" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.125" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83c8e735a073ccf5be70aa8066aa984eaf2fa000db6c8d0100ae605b366d31ed" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "syn" +version = "2.0.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6af063034fc1935ede7be0122941bafa9bacb949334d090b77ca98b5817c7d9" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "unicode-ident" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" + +[[package]] +name = "uuid" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +dependencies = [ + "getrandom", +] + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/plugins/wasm-rust/extensions/demo-wasm/Cargo.toml b/plugins/wasm-rust/extensions/demo-wasm/Cargo.toml new file mode 100644 index 0000000000..a517c2b531 --- /dev/null +++ b/plugins/wasm-rust/extensions/demo-wasm/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "demo-wasm" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +crate-type = ["cdylib"] + +[dependencies] +higress-wasm-rust = { path = "../../", version = "0.1.0" } +proxy-wasm = { git="https://github.com/higress-group/proxy-wasm-rust-sdk", branch="main", version="0.2.2" } +serde = { version = "1.0", features = ["derive"] } +multimap = "*" +http = "*" \ No newline at end of file diff --git a/plugins/wasm-rust/extensions/demo-wasm/src/lib.rs b/plugins/wasm-rust/extensions/demo-wasm/src/lib.rs new file mode 100644 index 0000000000..55647a83cc --- /dev/null +++ b/plugins/wasm-rust/extensions/demo-wasm/src/lib.rs @@ -0,0 +1,203 @@ +use higress_wasm_rust::cluster_wrapper::DnsCluster; +use higress_wasm_rust::log::Log; +use higress_wasm_rust::plugin_wrapper::{ + HttpCallArgStorage, HttpCallbackFn, HttpContextWrapper, RootContextWrapper, +}; +use higress_wasm_rust::rule_matcher::{on_configure, RuleMatcher, SharedRuleMatcher}; +use http::Method; +use multimap::MultiMap; +use proxy_wasm::traits::{Context, HttpContext, RootContext}; +use proxy_wasm::types::{Bytes, ContextType, DataAction, HeaderAction, LogLevel}; + +use serde::Deserialize; +use std::cell::RefCell; +use std::ops::DerefMut; +use std::rc::Rc; +use std::time::Duration; + +proxy_wasm::main! {{ + proxy_wasm::set_log_level(LogLevel::Trace); + proxy_wasm::set_root_context(|_|Box::new(DemoWasmRoot::new())); +}} + +const PLUGIN_NAME: &str = "demo-wasm"; + +#[derive(Default, Debug, Deserialize, Clone)] +struct DemoWasmConfig { + // 配置文件结构体 + test: String, +} + +fn format_body(body: Option>) -> String { + if let Some(bd) = &body { + if let Ok(b) = std::str::from_utf8(bd) { + return b.to_string(); + } + } + format!("{:?}", body) +} + +fn test_callback( + this: &mut DemoWasm, + status_code: u16, + headers: &MultiMap, + body: Option>, +) { + this.log.info(&format!( + "test_callback status_code:{}, headers: {:?}, body: {}", + status_code, + headers, + format_body(body) + )); + this.reset_http_request(); +} +struct DemoWasm { + // 每个请求对应的插件实例 + log: Log, + config: Option, + + arg_storage: HttpCallArgStorage>>, +} + +impl Context for DemoWasm {} +impl HttpContext for DemoWasm {} +impl HttpContextWrapper>> for DemoWasm { + fn log(&self) -> &Log { + &self.log + } + fn get_http_call_storage( + &mut self, + ) -> Option<&mut HttpCallArgStorage>>> { + Some(&mut self.arg_storage) + } + fn on_config(&mut self, config: &DemoWasmConfig) { + // 获取config + self.log.info(&format!("on_config {}", config.test)); + self.config = Some(config.clone()) + } + fn on_http_request_complete_headers( + &mut self, + headers: &MultiMap, + ) -> HeaderAction { + // 请求header获取完成回调 + self.log + .info(&format!("on_http_request_complete_headers {:?}", headers)); + HeaderAction::Continue + } + fn on_http_response_complete_headers( + &mut self, + headers: &MultiMap, + ) -> HeaderAction { + // 返回header获取完成回调 + self.log + .info(&format!("on_http_response_complete_headers {:?}", headers)); + HeaderAction::Continue + } + fn cache_request_body(&self) -> bool { + // 是否缓存请求body + true + } + fn cache_response_body(&self) -> bool { + // 是否缓存返回body + true + } + fn on_http_call_response_detail( + &mut self, + _token_id: u32, + arg: Box>, + status_code: u16, + headers: &MultiMap, + body: Option>, + ) { + arg(self, status_code, headers, body) + } + fn on_http_request_complete_body(&mut self, req_body: &Bytes) -> DataAction { + // 请求body获取完成回调 + self.log.info(&format!( + "on_http_request_complete_body {}", + String::from_utf8(req_body.clone()).unwrap_or("".to_string()) + )); + let cluster = DnsCluster::new("httpbin", "httpbin.org", 80); + if self + .http_call( + &cluster, + &Method::POST, + "http://httpbin.org/post", + MultiMap::new(), + Some("test_body".as_bytes()), + // Box::new(move |this, _status_code, _headers, _body| this.resume_http_request()), + Box::new(test_callback), + Duration::from_secs(5), + ) + .is_ok() + { + DataAction::StopIterationAndBuffer + } else { + self.log.info("http_call fail"); + DataAction::Continue + } + } + fn on_http_response_complete_body(&mut self, res_body: &Bytes) -> DataAction { + // 返回body获取完成回调 + self.log.info(&format!( + "on_http_response_complete_body {}", + String::from_utf8(res_body.clone()).unwrap_or("".to_string()) + )); + DataAction::Continue + } +} +struct DemoWasmRoot { + log: Log, + rule_matcher: SharedRuleMatcher, +} +impl DemoWasmRoot { + fn new() -> Self { + let log = Log::new(PLUGIN_NAME.to_string()); + log.info("DemoWasmRoot::new"); + DemoWasmRoot { + log, + rule_matcher: Rc::new(RefCell::new(RuleMatcher::default())), + } + } +} + +impl Context for DemoWasmRoot {} + +impl RootContext for DemoWasmRoot { + fn on_configure(&mut self, _plugin_configuration_size: usize) -> bool { + self.log.info("DemoWasmRoot::on_configure"); + on_configure( + self, + _plugin_configuration_size, + self.rule_matcher.borrow_mut().deref_mut(), + &self.log, + ) + } + fn create_http_context(&self, context_id: u32) -> Option> { + self.log.info(&format!( + "DemoWasmRoot::create_http_context({})", + context_id + )); + self.create_http_context_use_wrapper(context_id) + } + fn get_type(&self) -> Option { + Some(ContextType::HttpContext) + } +} + +impl RootContextWrapper>> for DemoWasmRoot { + fn rule_matcher(&self) -> &SharedRuleMatcher { + &self.rule_matcher + } + + fn create_http_context_wrapper( + &self, + _context_id: u32, + ) -> Option>>>> { + Some(Box::new(DemoWasm { + config: None, + log: Log::new(PLUGIN_NAME.to_string()), + arg_storage: HttpCallArgStorage::new(), + })) + } +} diff --git a/plugins/wasm-rust/src/cluster_wrapper.rs b/plugins/wasm-rust/src/cluster_wrapper.rs new file mode 100644 index 0000000000..2891293fb3 --- /dev/null +++ b/plugins/wasm-rust/src/cluster_wrapper.rs @@ -0,0 +1,259 @@ +use crate::{internal::get_property, request_wrapper::get_request_host}; + +pub trait Cluster { + fn cluster_name(&self) -> String; + fn host_name(&self) -> String; +} +#[derive(Debug, Clone)] +pub struct RouteCluster { + host: String, +} +impl RouteCluster { + pub fn new(host: &str) -> Self { + RouteCluster { + host: host.to_string(), + } + } +} +impl Cluster for RouteCluster { + fn cluster_name(&self) -> String { + if let Some(res) = get_property(vec!["cluster_name"]) { + if let Ok(r) = String::from_utf8(res) { + return r; + } + } + String::new() + } + + fn host_name(&self) -> String { + if !self.host.is_empty() { + return self.host.clone(); + } + + get_request_host() + } +} + +#[derive(Debug, Clone)] +pub struct K8sCluster { + service_name: String, + namespace: String, + port: String, + version: String, + host: String, +} + +impl K8sCluster { + pub fn new(service_name: &str, namespace: &str, port: &str, version: &str, host: &str) -> Self { + K8sCluster { + service_name: service_name.to_string(), + namespace: namespace.to_string(), + port: port.to_string(), + version: version.to_string(), + host: host.to_string(), + } + } +} + +impl Cluster for K8sCluster { + fn cluster_name(&self) -> String { + format!( + "outbound|{}|{}|{}.{}.svc.cluster.local", + self.port, + self.version, + self.service_name, + if self.namespace.is_empty() { + "default" + } else { + &self.namespace + } + ) + } + + fn host_name(&self) -> String { + if self.host.is_empty() { + format!("{}.{}.svc.cluster.local", self.service_name, self.namespace) + } else { + self.host.clone() + } + } +} + +#[derive(Debug, Clone)] +pub struct NacosCluster { + service_name: String, + group: String, + namespace_id: String, + port: u16, + is_ext_registry: bool, + version: String, + host: String, +} + +impl NacosCluster { + pub fn new( + service_name: &str, + group: &str, + namespace_id: &str, + port: u16, + is_ext_registry: bool, + version: &str, + host: &str, + ) -> Self { + NacosCluster { + service_name: service_name.to_string(), + group: group.to_string(), + namespace_id: namespace_id.to_string(), + port, + is_ext_registry, + version: version.to_string(), + host: host.to_string(), + } + } +} +impl Cluster for NacosCluster { + fn cluster_name(&self) -> String { + let group = if self.group.is_empty() { + "DEFAULT-GROUP".to_string() + } else { + self.group.replace('_', "-") + }; + let tail = if self.is_ext_registry { + "nacos-ext" + } else { + "nacos" + }; + format!( + "outbound|{}|{}|{}.{}.{}.{}", + self.port, self.version, self.service_name, group, self.namespace_id, tail + ) + } + + fn host_name(&self) -> String { + if self.host.is_empty() { + self.service_name.clone() + } else { + self.host.clone() + } + } +} + +#[derive(Debug, Clone)] +pub struct StaticIpCluster { + service_name: String, + port: u16, + host: String, +} + +impl StaticIpCluster { + pub fn new(service_name: &str, port: u16, host: &str) -> Self { + StaticIpCluster { + service_name: service_name.to_string(), + port, + host: host.to_string(), + } + } +} +impl Cluster for StaticIpCluster { + fn cluster_name(&self) -> String { + format!("outbound|{}||{}.static", self.port, self.service_name) + } + + fn host_name(&self) -> String { + if self.host.is_empty() { + self.service_name.clone() + } else { + self.host.clone() + } + } +} + +#[derive(Debug, Clone)] +pub struct DnsCluster { + service_name: String, + domain: String, + port: u16, +} + +impl DnsCluster { + pub fn new(service_name: &str, domain: &str, port: u16) -> Self { + DnsCluster { + service_name: service_name.to_string(), + domain: domain.to_string(), + port, + } + } +} +impl Cluster for DnsCluster { + fn cluster_name(&self) -> String { + format!("outbound|{}||{}.dns", self.port, self.service_name) + } + + fn host_name(&self) -> String { + self.domain.clone() + } +} + +#[derive(Debug, Clone)] +pub struct ConsulCluster { + service_name: String, + datacenter: String, + port: u16, + host: String, +} + +impl ConsulCluster { + pub fn new(service_name: &str, datacenter: &str, port: u16, host: &str) -> Self { + ConsulCluster { + service_name: service_name.to_string(), + datacenter: datacenter.to_string(), + port, + host: host.to_string(), + } + } +} +impl Cluster for ConsulCluster { + fn cluster_name(&self) -> String { + format!( + "outbound|{}||{}.{}.consul", + self.port, self.service_name, self.datacenter + ) + } + + fn host_name(&self) -> String { + if self.host.is_empty() { + self.service_name.clone() + } else { + self.host.clone() + } + } +} + +#[derive(Debug, Clone)] +pub struct FQDNCluster { + fqdn: String, + host: String, + port: u16, +} + +impl FQDNCluster { + pub fn new(fqdn: &str, host: &str, port: u16) -> Self { + FQDNCluster { + fqdn: fqdn.to_string(), + host: host.to_string(), + port, + } + } +} +impl Cluster for FQDNCluster { + fn cluster_name(&self) -> String { + format!("outbound|{}||{}", self.port, self.fqdn) + } + fn host_name(&self) -> String { + if self.host.is_empty() { + self.fqdn.clone() + } else { + self.host.clone() + } + } +} diff --git a/plugins/wasm-rust/src/lib.rs b/plugins/wasm-rust/src/lib.rs index 1e38d0cb0b..3296ff648a 100644 --- a/plugins/wasm-rust/src/lib.rs +++ b/plugins/wasm-rust/src/lib.rs @@ -12,8 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod cluster_wrapper; pub mod error; mod internal; pub mod log; pub mod plugin_wrapper; +pub mod request_wrapper; pub mod rule_matcher; diff --git a/plugins/wasm-rust/src/plugin_wrapper.rs b/plugins/wasm-rust/src/plugin_wrapper.rs index ba4e0a2580..25d445f22f 100644 --- a/plugins/wasm-rust/src/plugin_wrapper.rs +++ b/plugins/wasm-rust/src/plugin_wrapper.rs @@ -12,15 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; +use std::time::Duration; + +use crate::cluster_wrapper::Cluster; +use crate::log::Log; use crate::rule_matcher::SharedRuleMatcher; +use http::{method::Method, Uri}; +use lazy_static::lazy_static; use multimap::MultiMap; -use proxy_wasm::hostcalls::log; use proxy_wasm::traits::{Context, HttpContext, RootContext}; -use proxy_wasm::types::LogLevel; -use proxy_wasm::types::{Action, Bytes, DataAction, HeaderAction}; +use proxy_wasm::types::{Action, Bytes, DataAction, HeaderAction, Status}; use serde::de::DeserializeOwned; -pub trait RootContextWrapper: RootContext +lazy_static! { + static ref LOG: Log = Log::new("plugin_wrapper".to_string()); +} + +pub trait RootContextWrapper: RootContext where PluginConfig: Default + DeserializeOwned + 'static + Clone, { @@ -39,11 +48,37 @@ where fn create_http_context_wrapper( &self, _context_id: u32, - ) -> Option>> { + ) -> Option>> { None } } -pub trait HttpContextWrapper: HttpContext { +pub type HttpCallbackFn = dyn FnOnce(&mut T, u16, &MultiMap, Option>); + +pub struct HttpCallArgStorage { + args: HashMap, +} +impl Default for HttpCallArgStorage { + fn default() -> Self { + Self::new() + } +} +impl HttpCallArgStorage { + pub fn new() -> Self { + HttpCallArgStorage { + args: HashMap::new(), + } + } + pub fn set(&mut self, token_id: u32, arg: HttpCallArg) { + self.args.insert(token_id, arg); + } + pub fn pop(&mut self, token_id: u32) -> Option { + self.args.remove(&token_id) + } +} +pub trait HttpContextWrapper: HttpContext { + fn log(&self) -> &Log { + &LOG + } fn on_config(&mut self, _config: &PluginConfig) {} fn on_http_request_complete_headers( &mut self, @@ -69,26 +104,96 @@ pub trait HttpContextWrapper: HttpContext { fn on_http_response_complete_body(&mut self, _res_body: &Bytes) -> DataAction { DataAction::Continue } + + #[allow(clippy::too_many_arguments)] + fn on_http_call_response_detail( + &mut self, + _token_id: u32, + _arg: HttpCallArg, + _status_code: u16, + _headers: &MultiMap, + _body: Option>, + ) { + } fn replace_http_request_body(&mut self, body: &[u8]) { self.set_http_request_body(0, i32::MAX as usize, body) } fn replace_http_response_body(&mut self, body: &[u8]) { self.set_http_response_body(0, i32::MAX as usize, body) } + + fn get_http_call_storage(&mut self) -> Option<&mut HttpCallArgStorage> { + None + } + + #[allow(clippy::too_many_arguments)] + fn http_call( + &mut self, + cluster: &dyn Cluster, + method: &Method, + raw_url: &str, + headers: MultiMap, + body: Option<&[u8]>, + arg: HttpCallArg, + timeout: Duration, + ) -> Result { + if let Ok(uri) = raw_url.parse::() { + let mut authority = cluster.host_name(); + if let Some(host) = uri.host() { + authority = host.to_string(); + } + let mut path = uri.path().to_string(); + if let Some(query) = uri.query() { + path = format!("{}?{}", path, query); + } + let mut headers_vec = Vec::new(); + for (k, v) in headers.iter() { + headers_vec.push((k.as_str(), v.as_str())); + } + headers_vec.push((":method", method.as_str())); + headers_vec.push((":path", &path)); + headers_vec.push((":authority", &authority)); + let ret = self.dispatch_http_call( + &cluster.cluster_name(), + headers_vec, + body, + Vec::new(), + timeout, + ); + + if let Ok(token_id) = ret { + if let Some(storage) = self.get_http_call_storage() { + storage.set(token_id, arg); + self.log().debug( + &format!( + "http call start, id: {}, cluster: {}, method: {}, url: {}, body: {:?}, timeout: {:?}", + token_id, cluster.cluster_name(), method.as_str(), raw_url, body, timeout + ) + ); + } else { + return Err(Status::InternalFailure); + } + } + ret + } else { + self.log().critical(&format!("invalid raw_url:{}", raw_url)); + Err(Status::ParseFailure) + } + } } -pub struct PluginHttpWrapper { +pub struct PluginHttpWrapper { req_headers: MultiMap, res_headers: MultiMap, req_body_len: usize, res_body_len: usize, config: Option, rule_matcher: SharedRuleMatcher, - http_content: Box>, + http_content: Box>, } -impl PluginHttpWrapper { +impl PluginHttpWrapper { pub fn new( rule_matcher: &SharedRuleMatcher, - http_content: Box>, + http_content: Box>, ) -> Self { PluginHttpWrapper { req_headers: MultiMap::new(), @@ -100,8 +205,15 @@ impl PluginHttpWrapper { http_content, } } + fn get_http_call_arg(&mut self, token_id: u32) -> Option { + if let Some(storage) = self.http_content.get_http_call_storage() { + storage.pop(token_id) + } else { + None + } + } } -impl Context for PluginHttpWrapper { +impl Context for PluginHttpWrapper { fn on_http_call_response( &mut self, token_id: u32, @@ -109,8 +221,50 @@ impl Context for PluginHttpWrapper { body_size: usize, num_trailers: usize, ) { - self.http_content - .on_http_call_response(token_id, num_headers, body_size, num_trailers) + if let Some(arg) = self.get_http_call_arg(token_id) { + let body = self.get_http_call_response_body(0, body_size); + let mut headers = MultiMap::new(); + let mut status_code = 502; + let mut normal_response = false; + for (k, v) in self.get_http_call_response_headers_bytes() { + match String::from_utf8(v) { + Ok(header_value) => { + if k == ":status" { + if let Ok(code) = header_value.parse::() { + status_code = code; + normal_response = true; + } else { + self.http_content + .log() + .error(&format!("failed to parse status: {}", header_value)); + status_code = 500; + } + } + headers.insert(k, header_value); + } + Err(_) => { + self.http_content.log().warn(&format!( + "http call response header contains non-ASCII characters header: {}", + k + )); + } + } + } + self.http_content.log().warn(&format!( + "http call end, id: {}, code: {}, normal: {}, body: {:?}", + token_id, status_code, normal_response, body + )); + self.http_content.on_http_call_response_detail( + token_id, + arg, + status_code, + &headers, + body, + ) + } else { + self.http_content + .on_http_call_response(token_id, num_headers, body_size, num_trailers) + } } fn on_grpc_call_response(&mut self, token_id: u32, status_code: u32, response_size: usize) { @@ -138,7 +292,7 @@ impl Context for PluginHttpWrapper { self.http_content.on_done() } } -impl HttpContext for PluginHttpWrapper +impl HttpContext for PluginHttpWrapper where PluginConfig: Default + DeserializeOwned + Clone, { @@ -152,15 +306,10 @@ where self.req_headers.insert(k, header_value); } Err(_) => { - log( - LogLevel::Warn, - format!( - "request http header contains non-ASCII characters header: {}", - k - ) - .as_str(), - ) - .unwrap(); + self.http_content.log().warn(&format!( + "request http header contains non-ASCII characters header: {}", + k + )); } } } @@ -212,15 +361,10 @@ where self.res_headers.insert(k, header_value); } Err(_) => { - log( - LogLevel::Warn, - format!( - "response http header contains non-ASCII characters header: {}", - k - ) - .as_str(), - ) - .unwrap(); + self.http_content.log().warn(&format!( + "response http header contains non-ASCII characters header: {}", + k + )); } } } diff --git a/plugins/wasm-rust/src/request_wrapper.rs b/plugins/wasm-rust/src/request_wrapper.rs new file mode 100644 index 0000000000..bc9624f6a9 --- /dev/null +++ b/plugins/wasm-rust/src/request_wrapper.rs @@ -0,0 +1,82 @@ +use proxy_wasm::hostcalls; + +use crate::internal; + +fn get_request_head(head: &str, log_flag: &str) -> String { + if let Some(value) = internal::get_http_request_header(head) { + value + } else { + hostcalls::log( + proxy_wasm::types::LogLevel::Error, + &format!("get request {} failed", log_flag), + ) + .unwrap(); + String::new() + } +} +pub fn get_request_scheme() -> String { + get_request_head(":scheme", "head") +} + +pub fn get_request_host() -> String { + get_request_head(":authority", "host") +} + +pub fn get_request_path() -> String { + get_request_head(":path", "path") +} + +pub fn get_request_method() -> String { + get_request_head(":method", "method") +} + +pub fn is_binary_request_body() -> bool { + if let Some(content_type) = internal::get_http_request_header("content-type") { + if content_type.contains("octet-stream") || content_type.contains("grpc") { + return true; + } + } + if let Some(encoding) = internal::get_http_request_header("content-encoding") { + if !encoding.is_empty() { + return true; + } + } + false +} + +pub fn is_binary_response_body() -> bool { + if let Some(content_type) = internal::get_http_response_header("content-type") { + if content_type.contains("octet-stream") || content_type.contains("grpc") { + return true; + } + } + if let Some(encoding) = internal::get_http_response_header("content-encoding") { + if !encoding.is_empty() { + return true; + } + } + false +} +pub fn has_request_body() -> bool { + let content_type = internal::get_http_request_header("content-type"); + let content_length_str = internal::get_http_request_header("content-length"); + let transfer_encoding = internal::get_http_request_header("transfer-encoding"); + hostcalls::log( + proxy_wasm::types::LogLevel::Debug, + &format!( + "check has request body: content_type:{:?}, content_length_str:{:?}, transfer_encoding:{:?}", + content_type, content_length_str, transfer_encoding + ) + ).unwrap(); + if !content_type.is_some_and(|x| !x.is_empty()) { + return true; + } + if let Some(cl) = content_length_str { + if let Ok(content_length) = cl.parse::() { + if content_length > 0 { + return true; + } + } + } + transfer_encoding.is_some_and(|x| x == "chunked") +}