diff --git a/README.md b/README.md index 8d98840..4c07ba1 100644 --- a/README.md +++ b/README.md @@ -7,12 +7,12 @@ This worker was written to push Cloudflare Analytics data to an OpenTelemetry co It is inspired by the [cloudflare-exporter](https://github.com/lablabs/cloudflare-exporter), which is unfortunately no longer maintained. By running it as a worker and pushing metrics, we avoid the need to deploy a dedicated container and allow the worker to be run on [green compute](https://blog.cloudflare.com/announcing-green-compute). -## Metrics currently support +## Metrics currently supported - [x] Workers - [x] D1 - [x] Durable Objects -- [ ] Queues +- [x] Queues - [ ] Zones ## Usage diff --git a/features/data/queue_backlog_query_response.json b/features/data/queue_backlog_query_response.json new file mode 100644 index 0000000..a261c19 --- /dev/null +++ b/features/data/queue_backlog_query_response.json @@ -0,0 +1 @@ +{"data":{"viewer":{"accounts":[{"queueBacklogAdaptiveGroups":[]}]}},"errors":null} \ No newline at end of file diff --git a/features/step_definitions/cf_mock_server.ts b/features/step_definitions/cf_mock_server.ts index 4c8c24a..57cd653 100644 --- a/features/step_definitions/cf_mock_server.ts +++ b/features/step_definitions/cf_mock_server.ts @@ -10,6 +10,7 @@ export class CloudflareMockServer { const workerQuery = fs.readFileSync('./features/data/worker_query_response.json').toString(); const d1Query = fs.readFileSync('./features/data/d1_query_response.json').toString(); const durableObjectsQuery = fs.readFileSync('./features/data/durableobjects_query_response.json').toString(); + const queueBacklogQuery = fs.readFileSync('./features/data/queue_backlog_query_response.json').toString(); this.server = http.createServer((req, res) => { var body = ""; req.on('readable', function() { @@ -26,6 +27,8 @@ export class CloudflareMockServer { res.end(d1Query); } else if (body.indexOf('durableObjectsInvocationsAdaptiveGroups') > -1) { res.end(durableObjectsQuery); 
+ } else if (body.indexOf('queueBacklogAdaptiveGroups') > -1) { + res.end(queueBacklogQuery); } else { res.end(workerQuery); } diff --git a/gql/queries.graphql b/gql/queries.graphql index 7fc1abe..60fa633 100644 --- a/gql/queries.graphql +++ b/gql/queries.graphql @@ -1,7 +1,3 @@ - - - - query GetQueueAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) { viewer { accounts(filter: {accountTag: $accountTag}) { diff --git a/gql/queue_backlog_query.graphql b/gql/queue_backlog_query.graphql new file mode 100644 index 0000000..abe27c9 --- /dev/null +++ b/gql/queue_backlog_query.graphql @@ -0,0 +1,21 @@ +query GetQueueBacklogAnalyticsQuery($accountTag: string!, $datetimeStart: Time, $datetimeEnd: Time, $limit: Int!) { + viewer { + accounts(filter: {accountTag: $accountTag}) { + queueBacklogAdaptiveGroups(limit: $limit, filter: { + datetimeMinute_geq: $datetimeStart, + datetimeMinute_lt: $datetimeEnd + }) { + dimensions { + queueId + datetimeMinute + } + + avg { + bytes + messages + sampleInterval + } + } + } + } +} \ No newline at end of file diff --git a/src/gql.rs b/src/gql.rs index 1d5ba6c..30029b9 100644 --- a/src/gql.rs +++ b/src/gql.rs @@ -36,14 +36,14 @@ pub struct GetD1AnalyticsQuery; )] pub struct GetDurableObjectsAnalyticsQuery; -// #[derive(GraphQLQuery)] -// #[graphql( -// schema_path = "gql/schema.graphql", -// query_path = "gql/queries.graphql", -// variables_derives = "Debug", -// response_derives = "Debug,Clone" -// )] -// pub struct GetQueueAnalyticsQuery; +#[derive(GraphQLQuery)] +#[graphql( + schema_path = "gql/schema.graphql", + query_path = "gql/queue_backlog_query.graphql", + variables_derives = "Debug", + response_derives = "Debug,Clone" +)] +pub struct GetQueueBacklogAnalyticsQuery; #[allow(non_camel_case_types)] type float32 = f32; @@ -283,6 +283,64 @@ pub async fn do_get_durableobjects_analytics_query(cloudflare_api_url: &String, Ok(prometheus_registry_to_opentelemetry_metrics(registry, timestamp)) } +pub 
async fn do_get_queue_backlog_analytics_query(cloudflare_api_url: &String, cloudflare_api_key: &String, variables: get_queue_backlog_analytics_query::Variables) -> Result<Vec<Metric>, Box<dyn std::error::Error>> { + let request_body = GetQueueBacklogAnalyticsQuery::build_query(variables); + //console_log!("request_body: {:?}", request_body); + let client = reqwest::Client::new(); + let res = client.post(cloudflare_api_url) + .bearer_auth(cloudflare_api_key) + .json(&request_body).send().await?; + + if !res.status().is_success() { + console_log!("GraphQL query failed: {:?}", res.status()); + return Err(Box::new(res.error_for_status().unwrap_err())); + } + + let response_body: Response<get_queue_backlog_analytics_query::ResponseData> = res.json().await?; + if response_body.errors.is_some() { + console_log!("GraphQL query failed: {:?}", response_body.errors); + return Err(Box::new(worker::Error::JsError("graphql".parse().unwrap()))); + } + let response_data: get_queue_backlog_analytics_query::ResponseData = response_body.data.expect("missing response data"); + + let registry = Registry::new(); + let queue_backlog_bytes_opts = Opts::new("cloudflare_queue_backlog_bytes", "The average size of the backlog in bytes for sample interval"); + let queue_backlog_bytes = GaugeVec::new(queue_backlog_bytes_opts, &["queue_id"]).unwrap(); + registry.register(Box::new(queue_backlog_bytes.clone())).unwrap(); + + let queue_backlog_messages_opts = Opts::new("cloudflare_queue_backlog_messages", "The average number of messages in the backlog for sample interval"); + let queue_backlog_messages = GaugeVec::new(queue_backlog_messages_opts, &["queue_id"]).unwrap(); + registry.register(Box::new(queue_backlog_messages.clone())).unwrap(); + + let queue_backlog_sample_interval_opts = Opts::new("cloudflare_queue_backlog_sample_interval", "The average value used for sample interval"); + let queue_backlog_sample_interval = GaugeVec::new(queue_backlog_sample_interval_opts, &["queue_id"]).unwrap(); + registry.register(Box::new(queue_backlog_sample_interval.clone())).unwrap(); + + let
mut last_datetime: Option