diff --git a/Cargo.lock b/Cargo.lock
index ab158a2b..28da0b4d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -277,6 +277,12 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "concat-string"
+version = "1.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7439becb5fafc780b6f4de382b1a7a3e70234afe783854a4702ee8adbb838609"
+
 [[package]]
 name = "const-random"
 version = "0.1.13"
@@ -2004,6 +2010,7 @@ dependencies = [
  "aho-corasick",
  "base64",
  "chrono",
+ "concat-string",
  "const-random",
  "dashmap",
  "dbpnoise",
diff --git a/Cargo.toml b/Cargo.toml
index 121c2528..8fc97490 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -56,6 +56,7 @@ rayon = { version = "1.5", optional = true }
 dbpnoise = { version = "0.1.2", optional = true }
 pathfinding = { version = "3.0.13", optional = true }
 num = { version = "0.4.0", optional = true }
+concat-string = { version = "1.0.1", optional = true }
 
 [features]
 default = [
@@ -101,6 +102,7 @@ hash = [
     "serde",
     "serde_json",
 ]
+influxdb2 = ["concat-string", "serde", "serde_json", "http"]
 pathfinder = ["num", "pathfinding", "serde", "serde_json"]
 redis_pubsub = ["flume", "redis", "serde", "serde_json"]
 unzip = ["zip", "jobs"]
diff --git a/dmsrc/influxdb2.dm b/dmsrc/influxdb2.dm
new file mode 100644
index 00000000..e9b52092
--- /dev/null
+++ b/dmsrc/influxdb2.dm
@@ -0,0 +1,2 @@
+#define rustg_influxdb2_publish(data, endpoint, token) RUSTG_CALL(RUST_G, "influxdb2_publish")(data, endpoint, token)
+#define rustg_influxdb2_publish_profile(data, endpoint, token, round_id) RUSTG_CALL(RUST_G, "influxdb2_publish_profile")(data, endpoint, token, round_id)
diff --git a/dmsrc/time.dm b/dmsrc/time.dm
index f4ac5e5c..d1f58e65 100644
--- a/dmsrc/time.dm
+++ b/dmsrc/time.dm
@@ -4,3 +4,6 @@
 
 /proc/rustg_unix_timestamp()
 	return text2num(RUSTG_CALL(RUST_G, "unix_timestamp")())
+
+/proc/rustg_unix_timestamp_int()
+	return RUSTG_CALL(RUST_G, "unix_timestamp_int")()
diff --git a/src/error.rs b/src/error.rs
index 104e80b7..37b95366 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -61,6 +61,9 @@ pub enum Error {
     #[cfg(feature = "hash")]
     #[error("Unable to decode hex value.")]
     HexDecode,
+    #[cfg(feature = "influxdb2")]
+    #[error("Invalid metrics format")]
+    InvalidMetrics,
 }
 
 impl From for Error {
diff --git a/src/http.rs b/src/http.rs
index 5da2ff1a..13ffe78b 100644
--- a/src/http.rs
+++ b/src/http.rs
@@ -83,12 +83,12 @@ pub static HTTP_CLIENT: Lazy = Lazy::new(setup_http_c
 // ----------------------------------------------------------------------------
 // Request construction and execution
 
-struct RequestPrep {
+pub struct RequestPrep {
     req: reqwest::blocking::RequestBuilder,
     output_filename: Option<String>,
 }
 
-fn construct_request(
+pub fn construct_request(
     method: &str,
     url: &str,
     body: &str,
@@ -130,7 +130,7 @@ fn construct_request(
     })
 }
 
-fn submit_request(prep: RequestPrep) -> Result<String> {
+pub fn submit_request(prep: RequestPrep) -> Result<String> {
     let mut response = prep.req.send()?;
 
     let body;
diff --git a/src/influxdb2.rs b/src/influxdb2.rs
new file mode 100644
index 00000000..8a8f795a
--- /dev/null
+++ b/src/influxdb2.rs
@@ -0,0 +1,115 @@
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+use crate::error::Error;
+use crate::http::{construct_request, submit_request, RequestPrep};
+use crate::jobs;
+
+byond_fn!(
+    fn influxdb2_publish(data, endpoint, token) {
+        let data = data.to_owned();
+        let endpoint = endpoint.to_owned();
+        let token = token.to_owned();
+        Some(jobs::start(move || {
+            fn handle(data: &str, endpoint: &str, token: &str) -> Result<RequestPrep, Error> {
+                let mut lines = vec!();
+
+                let data: Value = serde_json::from_str(data)?;
+                for entry in data.as_array().unwrap() {
+                    let entry = entry.as_object().ok_or(Error::InvalidMetrics)?;
+
+                    let measurement = entry.get("@measurement").ok_or(Error::InvalidMetrics)?.as_str().ok_or(Error::InvalidMetrics)?.to_owned();
+                    let mut measurement_tags = vec!{measurement};
+
+                    let tags = entry.get("@tags").ok_or(Error::InvalidMetrics)?.as_object().ok_or(Error::InvalidMetrics)?;
+                    for (key, val) in tags {
+                        measurement_tags.push(concat_string!(key, "=", val.as_str().ok_or(Error::InvalidMetrics)?))
+                    };
+
+                    let mut fields = vec!{};
+                    for (key, val) in entry {
+                        if key.starts_with('@') {
+                            continue;
+                        }
+                        fields.push(concat_string!(key, "=", val.to_string()))
+                    };
+
+                    let timestamp = entry.get("@timestamp").ok_or(Error::InvalidMetrics)?.as_str().ok_or(Error::InvalidMetrics)?;
+                    lines.push(concat_string!(measurement_tags.join(","), " ", fields.join(",") , " ", timestamp));
+                }
+
+                construct_request(
+                    "post",
+                    endpoint,
+                    lines.join("\n").as_str(),
+                    concat_string!("{\"Authorization\":\"Token ", token ,"\"}").as_str(),
+                    ""
+                )
+            }
+
+            let req = match handle(data.as_str(), endpoint.as_str(), token.as_str()) {
+                Ok(r) => r,
+                Err(e) => return e.to_string()
+            };
+            match submit_request(req) {
+                Ok(r) => r,
+                Err(e) => e.to_string()
+            }
+        }))
+    }
+);
+
+#[derive(Serialize, Deserialize)]
+struct ProfileProcEntry {
+    name: String,
+    #[serde(rename = "self")]
+    self_: f32,
+    total: f32,
+    real: f32,
+    over: f32,
+    calls: f32,
+}
+byond_fn!(
+    fn influxdb2_publish_profile(data, endpoint, token, round_id) {
+        let data = data.to_owned();
+        let endpoint = endpoint.to_owned();
+        let token = token.to_owned();
+        let round_id = round_id.to_owned();
+        Some(jobs::start(move || {
+            fn handle(data: &str, endpoint: &str, token: &str, round_id: &str) -> Result<RequestPrep, Error> {
+                let mut lines = vec!();
+
+                let data: Vec<ProfileProcEntry> = serde_json::from_str(data)?;
+                let timestamp = std::time::SystemTime::now()
+                    .duration_since(std::time::UNIX_EPOCH)
+                    .unwrap()
+                    .as_secs()
+                    .to_string();
+                for entry in data {
+                    let mut name = entry.name;
+                    if name.is_empty() {
+                        name = String::from("(no_name)");
+                    }
+                    lines.push(concat_string!("profile,proc=", name, " self=", entry.self_.to_string(), ",total=", entry.total.to_string(), ",real=", entry.real.to_string(), ",over=", entry.over.to_string(), ",calls=", entry.calls.to_string(), ",round_id=", round_id.to_string(), " ", timestamp));
+                }
+
+                construct_request(
+                    "post",
+                    endpoint,
+                    lines.join("\n").as_str(),
+                    concat_string!("{\"Authorization\":\"Token ", token ,"\"}").as_str(),
+                    ""
+                )
+            }
+
+            let req = match handle(data.as_str(), endpoint.as_str(), token.as_str(), round_id.as_str()) {
+                Ok(r) => r,
+                Err(e) => return e.to_string()
+            };
+            match submit_request(req) {
+                Ok(r) => r,
+                Err(e) => e.to_string()
+            }
+        }))
+    }
+);
diff --git a/src/lib.rs b/src/lib.rs
index bbe40556..c6c07e9e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,5 +1,8 @@
 #![forbid(unsafe_op_in_unsafe_fn)]
 
+#[cfg(feature = "concat-string")]
+#[macro_use(concat_string)]
+extern crate concat_string;
 #[macro_use]
 mod byond;
 #[allow(dead_code)]
@@ -24,6 +27,8 @@ pub mod git;
 pub mod hash;
 #[cfg(feature = "http")]
 pub mod http;
+#[cfg(feature = "influxdb2")]
+pub mod influxdb2;
 #[cfg(feature = "json")]
 pub mod json;
 #[cfg(feature = "log")]
diff --git a/src/time.rs b/src/time.rs
index f27b9ddb..6ad21eea 100644
--- a/src/time.rs
+++ b/src/time.rs
@@ -47,3 +47,15 @@
         )
     }
 );
+
+byond_fn!(
+    fn unix_timestamp_int() {
+        Some(
+            std::time::SystemTime::now()
+                .duration_since(std::time::UNIX_EPOCH)
+                .unwrap()
+                .as_secs()
+                .to_string(),
+        )
+    }
+);
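
For reference, here is a minimal DM-side sketch of how the new rustg_influxdb2_publish macro could be driven. Everything beyond the macro and proc names defined in this patch is illustrative: send_example_metrics, the measurement/tag/field names, the endpoint URL (including the org/bucket query parameters), and the token are placeholders, and padding the timestamp out to nanoseconds assumes the InfluxDB v2 write endpoint is left at its default precision.

/proc/send_example_metrics()
	// "@timestamp" must be a string: the Rust side reads it with as_str(), and a
	// nanosecond epoch value would not fit in BYOND's single-precision numbers anyway.
	var/timestamp = "[rustg_unix_timestamp_int()]000000000"
	var/list/entry = list(
		"@measurement" = "server_stats",   // becomes the line-protocol measurement
		"@tags" = list("server" = "main"), // tag values must be strings
		"@timestamp" = timestamp,
		"players" = 42                     // any key not starting with @ becomes a field
	)
	// Produces the line: server_stats,server=main players=42 <timestamp>
	rustg_influxdb2_publish(json_encode(list(entry)), "https://influx.example.com/api/v2/write?org=example&bucket=example", "example-token")

rustg_influxdb2_publish_profile follows the same shape; its ProfileProcEntry fields (name, self, total, real, over, calls) line up with the JSON emitted by BYOND's world.Profile(), which appears to be the intended data source.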