From 1f0ed62c9c52e4562aa8ba3c3b640884d89429b9 Mon Sep 17 00:00:00 2001 From: janosbabik <143906591+janosbabik@users.noreply.github.com> Date: Wed, 23 Oct 2024 15:40:42 +0200 Subject: [PATCH] feat: add Prometheus client with default metrics and custom metrics for RabbitMQ message processing (#460) --- package-lock.json | 61 ++++++++++++++++++++++++ package.json | 7 +-- src/index.spec.ts | 1 + src/index.ts | 3 +- src/middlewares/healthCheck.ts | 4 +- src/middlewares/metrics/customMetrics.ts | 18 +++++++ src/middlewares/metrics/metrics.ts | 13 +++++ src/middlewares/readinessCheck.ts | 4 +- src/queue/consumers/QueueConsumer.ts | 20 ++++++++ 9 files changed, 121 insertions(+), 10 deletions(-) create mode 100644 src/middlewares/metrics/customMetrics.ts create mode 100644 src/middlewares/metrics/metrics.ts diff --git a/package-lock.json b/package-lock.json index ef318b9..87d3125 100644 --- a/package-lock.json +++ b/package-lock.json @@ -19,6 +19,7 @@ "knex": "^3.1.0", "matrix-js-sdk": "24.1.0", "pg": "^8.13.0", + "prom-client": "^15.1.3", "reflect-metadata": "^0.2.2", "ts-node-dev": "^2.0.0", "tsyringe": "^4.8.0" @@ -1355,6 +1356,14 @@ "node": ">= 8" } }, + "node_modules/@opentelemetry/api": { + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.9.0.tgz", + "integrity": "sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==", + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/@pkgr/core": { "version": "0.1.1", "resolved": "https://registry.npmjs.org/@pkgr/core/-/core-0.1.1.tgz", @@ -2418,6 +2427,11 @@ "node": ">=8" } }, + "node_modules/bintrees": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/bintrees/-/bintrees-1.0.2.tgz", + "integrity": "sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw==" + }, "node_modules/body-parser": { "version": "1.20.3", "resolved": "https://registry.npmjs.org/body-parser/-/body-parser-1.20.3.tgz", @@ -7182,6 +7196,18 @@ "url": "https://github.com/chalk/ansi-styles?sponsor=1" } }, + "node_modules/prom-client": { + "version": "15.1.3", + "resolved": "https://registry.npmjs.org/prom-client/-/prom-client-15.1.3.tgz", + "integrity": "sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g==", + "dependencies": { + "@opentelemetry/api": "^1.4.0", + "tdigest": "^0.1.1" + }, + "engines": { + "node": "^16 || ^18 || >=20" + } + }, "node_modules/prompts": { "version": "2.4.2", "resolved": "https://registry.npmjs.org/prompts/-/prompts-2.4.2.tgz", @@ -8047,6 +8073,14 @@ "node": ">=8.0.0" } }, + "node_modules/tdigest": { + "version": "0.1.2", + "resolved": "https://registry.npmjs.org/tdigest/-/tdigest-0.1.2.tgz", + "integrity": "sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==", + "dependencies": { + "bintrees": "1.0.2" + } + }, "node_modules/test-exclude": { "version": "6.0.0", "resolved": "https://registry.npmjs.org/test-exclude/-/test-exclude-6.0.0.tgz", @@ -9781,6 +9815,11 @@ "fastq": "^1.6.0" } }, + "@opentelemetry/api": { + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.9.0.tgz", + "integrity": "sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==" + }, "@pkgr/core": { "version": "0.1.1", "resolved": "https://registry.npmjs.org/@pkgr/core/-/core-0.1.1.tgz", @@ -10587,6 +10626,11 @@ "resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.2.0.tgz", "integrity": "sha512-jDctJ/IVQbZoJykoeHbhXpOlNBqGNcwXJKJog42E5HDPUwQTSdjCHdihjj0DlnheQ7blbT6dHOafNAiS8ooQKA==" }, + "bintrees": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/bintrees/-/bintrees-1.0.2.tgz", + "integrity": "sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw==" + }, "body-parser": { "version": "1.20.3", "resolved": "https://registry.npmjs.org/body-parser/-/body-parser-1.20.3.tgz", @@ -13946,6 +13990,15 @@ } } }, + "prom-client": { + "version": "15.1.3", + "resolved": "https://registry.npmjs.org/prom-client/-/prom-client-15.1.3.tgz", + "integrity": "sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g==", + "requires": { + "@opentelemetry/api": "^1.4.0", + "tdigest": "^0.1.1" + } + }, "prompts": { "version": "2.4.2", "resolved": "https://registry.npmjs.org/prompts/-/prompts-2.4.2.tgz", @@ -14561,6 +14614,14 @@ "resolved": "https://registry.npmjs.org/tarn/-/tarn-3.0.2.tgz", "integrity": "sha512-51LAVKUSZSVfI05vjPESNc5vwqqZpbXCsU+/+wxlOrUjk2SnFTt97v9ZgQrD4YmxYW1Px6w2KjaDitCfkvgxMQ==" }, + "tdigest": { + "version": "0.1.2", + "resolved": "https://registry.npmjs.org/tdigest/-/tdigest-0.1.2.tgz", + "integrity": "sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==", + "requires": { + "bintrees": "1.0.2" + } + }, "test-exclude": { "version": "6.0.0", "resolved": "https://registry.npmjs.org/test-exclude/-/test-exclude-6.0.0.tgz", diff --git a/package.json b/package.json index bf80970..99a924c 100644 --- a/package.json +++ b/package.json @@ -25,10 +25,10 @@ "license": "MIT", "devDependencies": { "@types/express": "^4.17.21", - "@types/knex": "^0.16.1", - "@types/pg": "^8.11.10", "@types/jest": "^29.5.13", + "@types/knex": "^0.16.1", "@types/node": "^22.7.7", + "@types/pg": "^8.11.10", "@typescript-eslint/eslint-plugin": "^7.16.1", "@typescript-eslint/parser": "^7.18.0", "eslint": "^8.57.1", @@ -53,8 +53,9 @@ "express": "^4.21.1", "kafkajs": "^2.2.3", "knex": "^3.1.0", - "pg": "^8.13.0", "matrix-js-sdk": "24.1.0", + "pg": "^8.13.0", + "prom-client": "^15.1.3", "reflect-metadata": "^0.2.2", "ts-node-dev": "^2.0.0", "tsyringe": "^4.8.0" diff --git a/src/index.spec.ts b/src/index.spec.ts index 6d984ed..99a5538 100644 --- a/src/index.spec.ts +++ b/src/index.spec.ts @@ -8,6 +8,7 @@ jest.mock('express', () => { }); jest.mock('@user-office-software/duo-logger'); +jest.mock('./middlewares/metrics/metrics', () => jest.fn()); jest.mock('./middlewares/healthCheck', () => jest.fn()); jest.mock('./middlewares/readinessCheck', () => jest.fn()); jest.mock('tsyringe'); diff --git a/src/index.ts b/src/index.ts index f86bd99..f3f5635 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,6 +9,7 @@ import { Tokens } from './config/Tokens'; import { str2Bool } from './config/utils'; import validateEnv from './config/validateEnv'; import healthCheck from './middlewares/healthCheck'; +import metrics from './middlewares/metrics/metrics'; import readinessCheck from './middlewares/readinessCheck'; import startKafkaTopicHandling from './queue/kafkaTopicHandling'; import startRabbitMQHandling from './queue/queueHandling'; @@ -63,7 +64,7 @@ async function bootstrap() { Sync_Visa_Proposals: enableSyncVisaProposals, }); - app.use(healthCheck()).use(readinessCheck()); + app.use(metrics).use(healthCheck).use(readinessCheck); app.listen(PORT); diff --git a/src/middlewares/healthCheck.ts b/src/middlewares/healthCheck.ts index 05b98d5..401df52 100644 --- a/src/middlewares/healthCheck.ts +++ b/src/middlewares/healthCheck.ts @@ -9,6 +9,4 @@ router.get('/health', (req: Request, res: Response) => { res.status(200).send(JSON.stringify(body)); }); -export default function () { - return router; -} +export default router; diff --git a/src/middlewares/metrics/customMetrics.ts b/src/middlewares/metrics/customMetrics.ts new file mode 100644 index 0000000..e14002b --- /dev/null +++ b/src/middlewares/metrics/customMetrics.ts @@ -0,0 +1,18 @@ +import { Counter, Histogram } from 'prom-client'; + +// Metrics for rabbitmq message processing + +// This counter tracks the total number of processed messages, with labels for the queue and status (success or failure) +export const processedMessagesCounter = new Counter({ + name: 'rabbitmq_processed_messages_total', + help: 'Total number of processed messages from RabbitMQ', + labelNames: ['queue', 'status'], +}); + +// This histogram tracks the duration of message processing. It helps monitor how long it takes to process each message. +export const processingDurationHistogram = new Histogram({ + name: 'rabbitmq_message_processing_duration_seconds', + help: 'Duration of message processing in seconds', + labelNames: ['queue'], + buckets: [0.1, 0.5, 1, 5, 10, 30, 60], +}); diff --git a/src/middlewares/metrics/metrics.ts b/src/middlewares/metrics/metrics.ts new file mode 100644 index 0000000..47802e1 --- /dev/null +++ b/src/middlewares/metrics/metrics.ts @@ -0,0 +1,13 @@ +import express, { Request, Response } from 'express'; +import { collectDefaultMetrics, register } from 'prom-client'; + +collectDefaultMetrics(); + +const router = express.Router(); + +router.get('/metrics', async (req: Request, res: Response) => { + res.set('Content-Type', register.contentType); + res.end(await register.metrics()); +}); + +export default router; diff --git a/src/middlewares/readinessCheck.ts b/src/middlewares/readinessCheck.ts index eec3279..5b3a21c 100644 --- a/src/middlewares/readinessCheck.ts +++ b/src/middlewares/readinessCheck.ts @@ -9,6 +9,4 @@ router.get('/readiness', (req: Request, res: Response) => { res.status(200).send(JSON.stringify(body)); }); -export default function () { - return router; -} +export default router; diff --git a/src/queue/consumers/QueueConsumer.ts b/src/queue/consumers/QueueConsumer.ts index e12c5b4..c98c079 100644 --- a/src/queue/consumers/QueueConsumer.ts +++ b/src/queue/consumers/QueueConsumer.ts @@ -5,6 +5,11 @@ import { Queue, } from '@user-office-software/duo-message-broker'; +import { + processedMessagesCounter, + processingDurationHistogram, +} from '../../middlewares/metrics/customMetrics'; + export abstract class QueueConsumer { private messageBroker: MessageBroker; @@ -45,8 +50,17 @@ export abstract class QueueConsumer { this.messageBroker.listenOn(queueName as Queue, async (...args) => { logger.logInfo('Received message on queue', { queueName }); + + // Start tracking processing time + const endTimer = processingDurationHistogram.startTimer({ + queue: queueName, + }); + try { await this.onMessage(...args); + + // Increment the success counter + processedMessagesCounter.inc({ queue: queueName, status: 'success' }); } catch (error) { logger.logException('Error while handling QueueConsumer callback: ', { error: (error as Error).message, @@ -55,8 +69,14 @@ export abstract class QueueConsumer { args, }); + // Increment the failure counter + processedMessagesCounter.inc({ queue: queueName, status: 'failure' }); + // Re-throw the error to make sure the message is not acknowledged throw error; + } finally { + // Stop the timer + endTimer(); } }); logger.logInfo('Listening on queue', { queueName });