Skip to content

Commit

Permalink
queue backlog metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
j-white committed May 9, 2024
1 parent 2a569e9 commit c4047c0
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 15 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ This worker was written to push Cloudflare Analytics data to an OpenTelemetry co
It is inspired by the [cloudflare-exporter](https://github.com/lablabs/cloudflare-exporter), which is unfortunately no longer maintained.
By running it as a worker and pushing metrics, we avoid the need to deploy a dedicated container and allow the worker to be run on [green compute](https://blog.cloudflare.com/announcing-green-compute).

## Metrics currently support
## Metrics currently supported

- [x] Workers
- [x] D1
- [x] Durable Objects
- [ ] Queues
- [x] Queues
- [ ] Zones

## Usage
Expand Down
1 change: 1 addition & 0 deletions features/data/queue_backlog_query_response.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"data":{"viewer":{"accounts":[{"queueBacklogAdaptiveGroups":[]}]}},"errors":null}
3 changes: 3 additions & 0 deletions features/step_definitions/cf_mock_server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export class CloudflareMockServer {
const workerQuery = fs.readFileSync('./features/data/worker_query_response.json').toString();
const d1Query = fs.readFileSync('./features/data/d1_query_response.json').toString();
const durableObjectsQuery = fs.readFileSync('./features/data/durableobjects_query_response.json').toString();
const queueBacklogQuery = fs.readFileSync('./features/data/queue_backlog_query_response.json').toString();
this.server = http.createServer((req, res) => {
var body = "";
req.on('readable', function() {
Expand All @@ -26,6 +27,8 @@ export class CloudflareMockServer {
res.end(d1Query);
} else if (body.indexOf('durableObjectsInvocationsAdaptiveGroups') > -1) {
res.end(durableObjectsQuery);
} else if (body.indexOf('queueBacklogAdaptiveGroups') > -1) {
res.end(queueBacklogQuery);
} else {
res.end(workerQuery);
}
Expand Down
4 changes: 0 additions & 4 deletions gql/queries.graphql
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@




query GetQueueAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) {
viewer {
accounts(filter: {accountTag: $accountTag}) {
Expand Down
21 changes: 21 additions & 0 deletions gql/queue_backlog_query.graphql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Fetches average queue backlog statistics (bytes, messages, sample interval)
# from Cloudflare's `queueBacklogAdaptiveGroups` dataset, grouped per queue
# and per minute, for the given account and time window.
# NOTE: the operation name is consumed by graphql_client codegen — do not rename.
query GetQueueBacklogAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) {
  viewer {
    accounts(filter: {accountTag: $accountTag}) {
      queueBacklogAdaptiveGroups(limit: $limit, filter: {
        datetimeMinute_geq: $datetimeStart,
        datetimeMinute_lt: $datetimeEnd
      }) {
        # Group key: which queue, and which minute bucket the sample falls in.
        dimensions {
          queueId
          datetimeMinute
        }

        # Averages across the adaptive sample interval.
        avg {
          bytes
          messages
          sampleInterval
        }
      }
    }
  }
}
74 changes: 66 additions & 8 deletions src/gql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ pub struct GetD1AnalyticsQuery;
)]
pub struct GetDurableObjectsAnalyticsQuery;

// #[derive(GraphQLQuery)]
// #[graphql(
// schema_path = "gql/schema.graphql",
// query_path = "gql/queries.graphql",
// variables_derives = "Debug",
// response_derives = "Debug,Clone"
// )]
// pub struct GetQueueAnalyticsQuery;
/// Generated GraphQL query type for Cloudflare queue backlog analytics
/// (`queueBacklogAdaptiveGroups`). The `graphql_client` derive generates the
/// `get_queue_backlog_analytics_query` module (Variables / ResponseData) at
/// build time from the query file referenced below.
#[derive(GraphQLQuery)]
#[graphql(
    schema_path = "gql/schema.graphql",
    query_path = "gql/queue_backlog_query.graphql",
    variables_derives = "Debug",
    response_derives = "Debug,Clone"
)]
pub struct GetQueueBacklogAnalyticsQuery;

#[allow(non_camel_case_types)]
type float32 = f32;
Expand Down Expand Up @@ -283,6 +283,64 @@ pub async fn do_get_durableobjects_analytics_query(cloudflare_api_url: &String,
Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp))
}

pub async fn do_get_queue_backlog_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_queue_backlog_analytics_query::Variables) -> Result<Vec<Metric>, Box<dyn Error>> {
let request_body = GetQueueBacklogAnalyticsQuery::build_query(variables);
//console_log!("request_body: {:?}", request_body);
let client = reqwest::Client::new();
let res = client.post(cloudflare_api_url)
.bearer_auth(cloudflare_api_key)
.json(&request_body).send().await?;

if !res.status().is_success() {
console_log!("GraphQL query failed: {:?}", res.status());
return Err(Box::new(res.error_for_status().unwrap_err()));
}

let response_body: Response<get_queue_backlog_analytics_query::ResponseData> = res.json().await?;
if response_body.errors.is_some() {
console_log!("GraphQL query failed: {:?}", response_body.errors);
return Err(Box::new(worker::Error::JsError("graphql".parse().unwrap())));
}
let response_data: get_queue_backlog_analytics_query::ResponseData = response_body.data.expect("missing response data");

let registry = Registry::new();
let queue_backlog_bytes_opts = Opts::new("cloudflare_queue_backlog_bytes", "The average size of the backlog in bytes for sample interval");
let queue_backlog_bytes = GaugeVec::new(queue_backlog_bytes_opts, &["queue_id"]).unwrap();
registry.register(Box::new(queue_backlog_bytes.clone())).unwrap();

let queue_backlog_messages_opts = Opts::new("cloudflare_queue_backlog_messages", "The average number of messages in the backlog for sample interval");
let queue_backlog_messages = GaugeVec::new(queue_backlog_messages_opts, &["queue_id"]).unwrap();
registry.register(Box::new(queue_backlog_messages.clone())).unwrap();

let queue_backlog_sample_interval_opts = Opts::new("cloudflare_queue_backlog_sample_interval", "The average value used for sample interval");
let queue_backlog_sample_interval = GaugeVec::new(queue_backlog_sample_interval_opts, &["queue_id"]).unwrap();
registry.register(Box::new(queue_backlog_sample_interval.clone())).unwrap();

let mut last_datetime: Option<Time> = None;
for account in response_data.clone().viewer.unwrap().accounts.iter() {
for group in account.queue_backlog_adaptive_groups.iter() {
let dimensions = group.dimensions.as_ref().unwrap();
last_datetime = Some(dimensions.datetime_minute.clone());
let queue_id = dimensions.queue_id.clone();
let avg = group.avg.as_ref().unwrap();

queue_backlog_bytes.with_label_values(&[queue_id.as_str()]).set(avg.bytes as f64);
queue_backlog_messages.with_label_values(&[queue_id.as_str()]).set(avg.messages as f64);
queue_backlog_sample_interval.with_label_values(&[queue_id.as_str()]).set(avg.sample_interval as f64);
}
}

let timestamp: std::time::SystemTime = last_datetime.map(|datetime| {
let datetime: NaiveDateTime = NaiveDateTime::parse_from_str(&*datetime, "%+").unwrap();
datetime.and_utc().into()
}).unwrap_or_else(|| {
to_std_systemtime(SystemTime::now())
});

Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp))
}


fn to_std_systemtime(time: web_time::SystemTime) -> std::time::SystemTime {
let duration = time.duration_since(web_time::SystemTime::UNIX_EPOCH).unwrap();
std::time::SystemTime::UNIX_EPOCH + duration
Expand Down
20 changes: 19 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use prost::Message;
use worker::*;
use worker::js_sys::Uint8Array;
use worker::wasm_bindgen::JsValue;
use crate::gql::{get_workers_analytics_query, do_get_workers_analytics_query, do_get_d1_analytics_query, get_d1_analytics_query, do_get_durableobjects_analytics_query, get_durable_objects_analytics_query};
use crate::gql::{get_workers_analytics_query, do_get_workers_analytics_query, do_get_d1_analytics_query, get_d1_analytics_query, do_get_durableobjects_analytics_query, get_durable_objects_analytics_query, do_get_queue_backlog_analytics_query, get_queue_backlog_analytics_query};

mod gql;
mod metrics;
Expand Down Expand Up @@ -121,6 +121,24 @@ async fn do_trigger(env: Env) -> Result<()> {
return Err(Error::JsError(e.to_string()));
}
};

let result = do_get_queue_backlog_analytics_query(&cloudflare_api_url, &cloudflare_api_key, get_queue_backlog_analytics_query::Variables {
account_tag: cloudflare_account_id.clone(),
datetime_start: Some(start.to_rfc3339()),
datetime_end: Some(end.to_rfc3339()),
limit: 9999,
}).await;
match result {
Ok(metrics) => {
for metric in metrics {
all_metrics.push(metric);
}
},
Err(e) => {
console_log!("Querying Cloudflare API failed: {:?}", e);
return Err(Error::JsError(e.to_string()));
}
};
console_log!("Done fetching!");

do_push_metrics(env, all_metrics).await
Expand Down

0 comments on commit c4047c0

Please sign in to comment.