From d2ad8cdf7f662ac4d218afda648f74a751247a5e Mon Sep 17 00:00:00 2001 From: Jesse White Date: Wed, 8 May 2024 09:56:02 -0400 Subject: [PATCH] working push to grafana cloud --- Cargo.lock | 41 ++++----- Cargo.toml | 15 ++-- features/metrics.json | 107 ----------------------- features/o11y.feature | 6 +- features/step_definitions/mf.ts | 1 + features/step_definitions/otel_server.ts | 10 ++- src/gql.rs | 8 ++ src/lib.rs | 85 ++++++++++++++---- src/metrics.rs | 98 ++++++++++++--------- 9 files changed, 169 insertions(+), 202 deletions(-) delete mode 100644 features/metrics.json diff --git a/Cargo.lock b/Cargo.lock index 6e93c7f..12b41a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -153,9 +153,10 @@ dependencies = [ "graphql_client", "opentelemetry", "opentelemetry-otlp", - "opentelemetry-stdout", + "opentelemetry-proto", "opentelemetry_sdk", "prometheus", + "prost", "reqwest", "serde", "serde_json", @@ -460,6 +461,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "0.2.12" @@ -860,9 +867,11 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a8fddc9b68f5b80dae9d6f510b88e02396f006ad48cac349411fbecc80caae4" dependencies = [ + "hex", "opentelemetry", "opentelemetry_sdk", "prost", + "serde", "tonic", ] @@ -872,22 +881,6 @@ version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9ab5bd6c42fb9349dcf28af2ba9a0667f697f9bdcca045d39f2cec5543e2910" -[[package]] -name = "opentelemetry-stdout" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bdf28b381f23afcd150afc0b38a4183dd321fc96320c1554752b6b761648f78" -dependencies = [ - "async-trait", - "chrono", - "futures-util", - "opentelemetry", - "opentelemetry_sdk", - "ordered-float", - "serde", - "serde_json", -] - [[package]] name = "opentelemetry_sdk" version = "0.22.1" @@ -906,8 +899,6 @@ dependencies = [ "percent-encoding", "rand", "thiserror", - "tokio", - "tokio-stream", ] [[package]] @@ -1216,9 +1207,9 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.200" +version = "1.0.201" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddc6f9cc94d67c0e21aaf7eda3a010fd3af78ebf6e096aa6e2e13c79749cce4f" +checksum = "780f1cebed1629e4753a1a38a3c72d30b97ec044f0aef68cb26650a3c5cf363c" dependencies = [ "serde_derive", ] @@ -1247,9 +1238,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.200" +version = "1.0.201" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "856f046b9400cee3c8c94ed572ecdb752444c24528c035cd35882aad6f492bcb" +checksum = "c5e405930b9796f1c00bee880d03fc7e0bb4b9a11afc776885ffe84320da2865" dependencies = [ "proc-macro2", "quote", @@ -1258,9 +1249,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.116" +version = "1.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813" +checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" dependencies = [ "itoa", "ryu", diff --git a/Cargo.toml b/Cargo.toml index 9556151..c1f2e51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ 
-9,18 +9,19 @@ crate-type = ["cdylib"] [dependencies] graphql_client = "0.14.0" -serde = "1.0.200" +serde = "1.0.201" worker = "0.2.0" reqwest = { version = "0.12.4", features = ["json"] } -opentelemetry = { version = "=0.22.0", features = ["otel_unstable"] } -opentelemetry_sdk = { version="=0.22.1", default-features = false, features = ["trace", "metrics", "rt-tokio-current-thread"] } -opentelemetry-stdout = { version="0.3.0", default-features = false, features=["trace", "metrics"] } -getrandom = { version = "0.2.14", features = ["js"] } -serde_json = "1.0.116" -opentelemetry-otlp = { version="0.15.0", features = ["metrics", "http-proto"], default-features = false } +opentelemetry = { version = "=0.22.0", default-features = false, features = ["metrics"] } +opentelemetry_sdk = { version="=0.22.1", default-features = false, features = ["metrics"] } +getrandom = { version = "0.2.15", features = ["js"] } +serde_json = "1.0.117" +opentelemetry-otlp = { version="0.15.0", default-features = false, features = ["metrics", "http-proto"] } +opentelemetry-proto = { version = "0.5.0", default-features = false, features = ["metrics", "with-serde"] } prometheus = "0.13.4" web-time = "1.1.0" chrono = "0.4.38" +prost = "0.12.4" [profile.release] opt-level = "s" # optimize for size in release builds diff --git a/features/metrics.json b/features/metrics.json deleted file mode 100644 index b3143a5..0000000 --- a/features/metrics.json +++ /dev/null @@ -1,107 +0,0 @@ -{ - "resourceMetrics": [ - { - "resource": { - "attributes": [ - { - "key": "service.name", - "value": { - "stringValue": "my.service" - } - } - ] - }, - "scopeMetrics": [ - { - "scope": { - "name": "my.library", - "version": "1.0.0", - "attributes": [ - { - "key": "my.scope.attribute", - "value": { - "stringValue": "some scope attribute" - } - } - ] - }, - "metrics": [ - { - "name": "my.counter", - "unit": "1", - "description": "I'm a Counter", - "sum": { - "aggregationTemporality": 1, - "isMonotonic": true, - "dataPoints": [ - { - "asDouble": 5, - "startTimeUnixNano": 1544712660300000000, - "timeUnixNano": 1544712660300000000, - "attributes": [ - { - "key": "my.counter.attr", - "value": { - "stringValue": "some value" - } - } - ] - } - ] - } - }, - { - "name": "my.gauge", - "unit": "1", - "description": "I'm a Gauge", - "gauge": { - "dataPoints": [ - { - "asDouble": 10, - "timeUnixNano": 1544712660300000000, - "attributes": [ - { - "key": "my.gauge.attr", - "value": { - "stringValue": "some value" - } - } - ] - } - ] - } - }, - { - "name": "my.histogram", - "unit": "1", - "description": "I'm a Histogram", - "histogram": { - "aggregationTemporality": 1, - "dataPoints": [ - { - "startTimeUnixNano": 1544712660300000000, - "timeUnixNano": 1544712660300000000, - "count": 3, - "sum": 3, - "bucketCounts": [1,1,1], - "explicitBounds": [1], - "min": 1, - "max": 1, - "attributes": [ - { - "key": "my.histogram.attr", - "value": { - "stringValue": "some value" - } - } - ] - } - ] - } - } - ] - } - ] - } - ] -} \ No newline at end of file diff --git a/features/o11y.feature b/features/o11y.feature index c410e56..40ece2d 100644 --- a/features/o11y.feature +++ b/features/o11y.feature @@ -4,7 +4,5 @@ Feature: OpenTelemetry metrics Given Worker is configured to send metrics to a mock OpenTelemetry collector When Worker is triggered Then Worker metrics are published - And Metric name should include "cloudflare_worker_requests" - And Metric name should include "cloudflare_worker_errors" - And Metric name should include "cloudflare_worker_cpu_time" - And Metric 
name should include "cloudflare_worker_duration" + And Metric name should include "cloudflare_worker" + And Metric name should include "cloudflare_worker_cpu" diff --git a/features/step_definitions/mf.ts b/features/step_definitions/mf.ts index 9b36bae..88abf8a 100644 --- a/features/step_definitions/mf.ts +++ b/features/step_definitions/mf.ts @@ -62,6 +62,7 @@ export class MiniflareDriver { CLOUDFLARE_API_URL: self.config.cloudflareApiUrl, CLOUDFLARE_API_KEY: "fake-key", CLOUDFLARE_ACCOUNT_ID: "1234", + OTLP_ENCODING: "json", }, modulesRules: [ { type: "CompiledWasm", include: ["**/*.wasm"], fallthrough: true }, diff --git a/features/step_definitions/otel_server.ts b/features/step_definitions/otel_server.ts index 52f22fa..0932a3c 100644 --- a/features/step_definitions/otel_server.ts +++ b/features/step_definitions/otel_server.ts @@ -40,11 +40,13 @@ export class OpenTelemetryServer { indexMetrics() { let self = this; this.metricNames.clear(); + console.log(JSON.stringify(this.metrics, null, 2)); for (let metrics of this.metrics) { - let resourceMetrics = metrics.resourceMetrics as unknown as IResourceMetrics; - for (let scopeMetrics of resourceMetrics.scopeMetrics) { - for (let metric of scopeMetrics.metrics) { - self.metricNames.set(metric.name, 1); + for (let resourceMetrics of metrics.resourceMetrics) { + for (let scopeMetrics of resourceMetrics.scopeMetrics) { + for (let metric of scopeMetrics.metrics) { + self.metricNames.set(metric.name, 1); + } } } } diff --git a/src/gql.rs b/src/gql.rs index 69ae752..a5a7d6f 100644 --- a/src/gql.rs +++ b/src/gql.rs @@ -5,6 +5,7 @@ use prometheus::{CounterVec, GaugeVec, Opts, Registry}; use crate::metrics::prometheus_registry_to_opentelemetry_metrics; use web_time::SystemTime; use chrono::NaiveDateTime; +use worker::console_log; // The paths are relative to the directory where your `Cargo.toml` is located. 
// Both json and the GraphQL schema language are supported as sources for the schema @@ -37,10 +38,17 @@ pub async fn perform_my_query(cloudflare_api_url: String, cloudflare_api_key: St let res = client.post(cloudflare_api_url) .bearer_auth(cloudflare_api_key) .json(&request_body).send().await?; + if !res.status().is_success() { + console_log!("GraphQL query failed: {:?}", res.status()); return Err(Box::new(res.error_for_status().unwrap_err())); } + let response_body: Response = res.json().await?; + if response_body.errors.is_some() { + console_log!("GraphQL query failed: {:?}", response_body.errors); + return Err(Box::new(worker::Error::JsError("graphql".parse().unwrap()))); + } let response_data: get_workers_analytics_query::ResponseData = response_body.data.expect("missing response data"); let registry = Registry::new(); diff --git a/src/lib.rs b/src/lib.rs index 4cca1b3..9daa6c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,12 @@ +use std::env; +use chrono::SubsecRound; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_sdk::metrics::data::{ResourceMetrics, ScopeMetrics}; use opentelemetry_sdk::Resource; -use opentelemetry_stdout::MetricsData; +use prost::Message; + use worker::*; +use worker::js_sys::Uint8Array; use worker::wasm_bindgen::JsValue; use crate::gql::{get_workers_analytics_query, perform_my_query}; @@ -13,18 +18,20 @@ pub async fn do_fetch( url: String, headers: String, data: Option, + content_type: String ) -> Result { let mut http_headers = Headers::new(); // split headers by command, and then by = for header in headers.split(",") { - let parts: Vec<&str> = header.split("=").collect(); + console_log!("header: {}", header); + let parts: Vec<&str> = header.splitn(2, "=").collect(); if parts.len() == 2 { let key = parts[0].trim(); let value = parts[1].trim(); http_headers.set(key, value).expect("failed to construct header"); } } - + http_headers.set("Content-Type", &*content_type).expect("failed to construct content-type header"); let mut init = RequestInit::new(); init.method = Method::Post; init.with_body(data).with_headers(http_headers); @@ -34,7 +41,24 @@ pub async fn do_fetch( } #[event(fetch)] -async fn main(_req: Request, env: Env, _ctx: Context) -> Result { +async fn fetch(_req: Request, env: Env, _ctx: Context) -> Result { + let res = do_trigger(env).await; + match res { + Ok(_) => Response::ok("OK"), + Err(_) => Response::error("Error", 500) + } +} + +#[event(scheduled)] +async fn main(_req: ScheduledEvent, env: Env, _ctx: ScheduleContext) -> () { + let res = do_trigger(env).await; + match res { + Ok(_) => console_log!("OK"), + Err(e) => console_log!("Error: {:?}", e), + } +} + +async fn do_trigger(env: Env) -> Result<()> { let metrics_url = env.var("METRICS_URL")?.to_string(); let cloudflare_api_url = env.var("CLOUDFLARE_API_URL")?.to_string(); let cloudflare_api_key = env.var("CLOUDFLARE_API_KEY")?.to_string(); @@ -43,24 +67,34 @@ async fn main(_req: Request, env: Env, _ctx: Context) -> Result { Ok(val) => val.to_string(), Err(_) => String::from(""), }; + let otlp_encoding_json: bool = match env.var("OTLP_ENCODING") { + Ok(val) => match val.to_string().to_lowercase().as_str() { + "json" => true, + _ => false, + } + Err(_) => false, + }; - let end = chrono::Utc::now(); - let start = end - chrono::Duration::minutes(1); + let end = chrono::Utc::now().round_subsecs(0); + let start = (end - chrono::Duration::minutes(1)).round_subsecs(0); + console_log!("Fetching!"); let result = 
perform_my_query(cloudflare_api_url, cloudflare_api_key, get_workers_analytics_query::Variables { account_tag: Some(cloudflare_account_id), - datetime_start: Some(start.to_string()), - datetime_end: Some(end.to_string()), + datetime_start: Some(start.to_rfc3339()), + datetime_end: Some(end.to_rfc3339()), limit: 9999, }).await; let cf_metrics = match result { Ok(metrics) => metrics, Err(e) => { console_log!("Querying Cloudflare API failed: {:?}", e); - return Response::error(format!("Error: {:?}", e), 500); + return Err(Error::JsError(e.to_string())); } }; + console_log!("Done fetching!"); + console_log!("Converting metrics to OTLP."); let library = opentelemetry::InstrumentationLibrary::new( "cloudflare-otlp-exporter", Some(env!("CARGO_PKG_VERSION")), @@ -71,12 +105,33 @@ async fn main(_req: Request, env: Env, _ctx: Context) -> Result { scope: library, metrics: cf_metrics, }; - let mut resource_metrics = ResourceMetrics { - resource: Resource::default(), + let resource_metrics = ResourceMetrics { + resource: Resource::empty(), scope_metrics: vec![scope_metrics], }; - let metrics = MetricsData::from(&mut resource_metrics); - let metrics_json = serde_json::to_string(&metrics).unwrap(); - let response = do_fetch(metrics_url, otlp_headers, Some(JsValue::from_str(&metrics_json)).into()).await?; - return Ok(response); + + let metrics = ExportMetricsServiceRequest::from(&resource_metrics); + let js_value: JsValue; + let content_type: String; + if otlp_encoding_json { + let metrics_json = serde_json::to_string(&metrics).unwrap(); + js_value = JsValue::from_str(&metrics_json); + content_type = "application/json".to_string(); + } else { + let bytes = metrics.encode_to_vec(); + let array = Uint8Array::from(bytes.as_slice()); + js_value = JsValue::from(array); + content_type = "application/x-protobuf".to_string(); + } + console_log!("Done converting metrics to OTLP."); + + console_log!("Posting metrics to OTLP endpoint."); + let mut res = do_fetch(metrics_url, otlp_headers, Some(js_value).into(), content_type).await?; + let body = res.text().await?; + console_log!("Done posting metrics status={} body={:?}", res.status_code(), body); + + if res.status_code() != 200 { + return Err(Error::JsError(body)); + } + return Ok(()); } diff --git a/src/metrics.rs b/src/metrics.rs index ba5f3e8..c47add9 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -2,69 +2,87 @@ use std::borrow::Cow; use std::time::SystemTime; use opentelemetry::KeyValue; use opentelemetry::metrics::Unit; +use opentelemetry_sdk::AttributeSet; use opentelemetry_sdk::metrics::data::{DataPoint, Metric, Temporality}; -use prometheus::proto::MetricFamily; +use prometheus::proto::{LabelPair, MetricFamily}; use prometheus::Registry; pub fn prometheus_registry_to_opentelemetry_metrics(registry: Registry, timestamp: SystemTime) -> Vec { let mut vec = Vec::new(); for metric_family in registry.gather() { - for metric in metric_family.get_metric() { - vec.push(create_metric_prom(&metric_family, metric, timestamp)); - } + vec.push(create_metric_prom(&metric_family, timestamp)); } return vec; } -fn create_metric_prom(metric_family: &MetricFamily, metric: &prometheus::proto::Metric, timestamp: SystemTime) -> Metric { - // convert metric labels to key value pairs - let mut labels = Vec::new(); - for label_pair in metric.get_label() { - let key_value = KeyValue::new(label_pair.get_name().to_owned(), label_pair.get_value().to_owned()); - labels.push(key_value); +fn to_attributes(labels: &[LabelPair]) -> AttributeSet { + let mut attributes = Vec::new(); + for 
label in labels { + attributes.push(KeyValue::new(label.get_name().to_string(), label.get_value().to_string())); } + attributes.as_slice().into() +} + +fn get_otlp_name_and_unit_from_prom_name(name: &str) -> (String, String) { + let mut parts = name.rsplitn(2, '_'); + let unit = parts.next().unwrap(); + let otlp_name = parts.next().unwrap(); + (otlp_name.to_string(), unit.to_string()) +} +fn create_metric_prom(metric_family: &MetricFamily, timestamp: SystemTime) -> Metric { + let is_counter = metric_family.get_metric().get(0).map(|metric| metric.has_counter()).unwrap_or(false); let otlp_metric: Metric; - if metric.has_gauge() { - let gauge = metric.get_gauge(); - let data_point = DataPoint { - attributes: labels.as_slice().into(), - start_time: None, - time: Some(timestamp), - value: gauge.get_value(), - exemplars: vec![], - }; - let sample: opentelemetry_sdk::metrics::data::Gauge = opentelemetry_sdk::metrics::data::Gauge { - data_points: vec![data_point], + if is_counter { + let mut data_points = Vec::new(); + for metric in metric_family.get_metric() { + let counter = metric.get_counter(); + let data_point = DataPoint { + attributes: to_attributes(metric.get_label()), + start_time: Some(timestamp), + time: Some(timestamp), + value: counter.get_value(), + exemplars: vec![], + }; + data_points.push(data_point); + } + let sample: opentelemetry_sdk::metrics::data::Sum = opentelemetry_sdk::metrics::data::Sum { + data_points, + temporality: Temporality::Cumulative, + // See https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#otlp-metric-points-to-prometheus + // if the metric is monotonic, then "_total" gets appended to name + is_monotonic: false }; + let (name, unit) = get_otlp_name_and_unit_from_prom_name(metric_family.get_name()); otlp_metric = Metric { - name: Cow::from(metric_family.get_name().to_owned()), + name: Cow::from(name.to_owned()), description: Cow::from(metric_family.get_help().to_owned()), - unit: Unit::new(""), + unit: Unit::new(unit), data: Box::new(sample), } - } else if metric.has_counter() { - let counter = metric.get_counter(); - let data_point = DataPoint { - attributes: labels.as_slice().into(), - start_time: None, - time: Some(timestamp), - value: counter.get_value(), - exemplars: vec![], - }; - let sample: opentelemetry_sdk::metrics::data::Sum = opentelemetry_sdk::metrics::data::Sum { - data_points: vec![data_point], - temporality: Temporality::Cumulative, - is_monotonic: true + } else { + let mut data_points = Vec::new(); + for metric in metric_family.get_metric() { + let gauge = metric.get_gauge(); + let data_point = DataPoint { + attributes: to_attributes(metric.get_label()), + start_time: Some(timestamp), + time: Some(timestamp), + value: gauge.get_value(), + exemplars: vec![], + }; + data_points.push(data_point); + } + let sample: opentelemetry_sdk::metrics::data::Gauge = opentelemetry_sdk::metrics::data::Gauge { + data_points }; + let (name, unit) = get_otlp_name_and_unit_from_prom_name(metric_family.get_name()); otlp_metric = Metric { - name: Cow::from(metric_family.get_name().to_owned()), + name: Cow::from(name.to_owned()), description: Cow::from(metric_family.get_help().to_owned()), - unit: Unit::new(""), + unit: Unit::new(unit), data: Box::new(sample), } - } else { - panic!("Unsupported metric type") } otlp_metric }
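
Note on the metric naming introduced above: get_otlp_name_and_unit_from_prom_name splits a Prometheus metric name at its last underscore and treats the trailing segment as the OTLP unit, which lines up with the change to features/o11y.feature that now asserts only on the shortened names "cloudflare_worker" and "cloudflare_worker_cpu". Below is a minimal standalone sketch of that behavior; the function body is copied from the patch, while the main harness and the example metric names are illustrative only.

fn get_otlp_name_and_unit_from_prom_name(name: &str) -> (String, String) {
    // rsplitn(2, '_') yields the text after the last underscore first,
    // then everything before it.
    let mut parts = name.rsplitn(2, '_');
    let unit = parts.next().unwrap();
    let otlp_name = parts.next().unwrap();
    (otlp_name.to_string(), unit.to_string())
}

fn main() {
    // "cloudflare_worker_requests" -> name "cloudflare_worker", unit "requests"
    assert_eq!(
        get_otlp_name_and_unit_from_prom_name("cloudflare_worker_requests"),
        ("cloudflare_worker".to_string(), "requests".to_string())
    );
    // "cloudflare_worker_cpu_time" -> name "cloudflare_worker_cpu", unit "time"
    assert_eq!(
        get_otlp_name_and_unit_from_prom_name("cloudflare_worker_cpu_time"),
        ("cloudflare_worker_cpu".to_string(), "time".to_string())
    );
    println!("name/unit split behaves as expected");
}

One caveat, stated as an assumption rather than something the patch guarantees: a metric name containing no underscore at all would panic on the second unwrap. The Prometheus names produced by this exporter appear to always contain at least one underscore, so the patch does not guard against that case.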