Skip to content

Commit

Permalink
feat: add Prometheus client with default metrics and custom metrics f…
Browse files Browse the repository at this point in the history
…or RabbitMQ message processing (#460)
  • Loading branch information
janosbabik authored Oct 23, 2024
1 parent 489eaa3 commit 1f0ed62
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 10 deletions.
61 changes: 61 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
"license": "MIT",
"devDependencies": {
"@types/express": "^4.17.21",
"@types/knex": "^0.16.1",
"@types/pg": "^8.11.10",
"@types/jest": "^29.5.13",
"@types/knex": "^0.16.1",
"@types/node": "^22.7.7",
"@types/pg": "^8.11.10",
"@typescript-eslint/eslint-plugin": "^7.16.1",
"@typescript-eslint/parser": "^7.18.0",
"eslint": "^8.57.1",
Expand All @@ -53,8 +53,9 @@
"express": "^4.21.1",
"kafkajs": "^2.2.3",
"knex": "^3.1.0",
"pg": "^8.13.0",
"matrix-js-sdk": "24.1.0",
"pg": "^8.13.0",
"prom-client": "^15.1.3",
"reflect-metadata": "^0.2.2",
"ts-node-dev": "^2.0.0",
"tsyringe": "^4.8.0"
Expand Down
1 change: 1 addition & 0 deletions src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ jest.mock('express', () => {
});

jest.mock('@user-office-software/duo-logger');
jest.mock('./middlewares/metrics/metrics', () => jest.fn());
jest.mock('./middlewares/healthCheck', () => jest.fn());
jest.mock('./middlewares/readinessCheck', () => jest.fn());
jest.mock('tsyringe');
Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { Tokens } from './config/Tokens';
import { str2Bool } from './config/utils';
import validateEnv from './config/validateEnv';
import healthCheck from './middlewares/healthCheck';
import metrics from './middlewares/metrics/metrics';
import readinessCheck from './middlewares/readinessCheck';
import startKafkaTopicHandling from './queue/kafkaTopicHandling';
import startRabbitMQHandling from './queue/queueHandling';
Expand Down Expand Up @@ -63,7 +64,7 @@ async function bootstrap() {
Sync_Visa_Proposals: enableSyncVisaProposals,
});

app.use(healthCheck()).use(readinessCheck());
app.use(metrics).use(healthCheck).use(readinessCheck);

app.listen(PORT);

Expand Down
4 changes: 1 addition & 3 deletions src/middlewares/healthCheck.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,4 @@ router.get('/health', (req: Request, res: Response) => {
res.status(200).send(JSON.stringify(body));
});

export default function () {
return router;
}
export default router;
18 changes: 18 additions & 0 deletions src/middlewares/metrics/customMetrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import { Counter, Histogram } from 'prom-client';

// Metrics for rabbitmq message processing

// This counter tracks the total number of processed messages, with labels for the queue and status (success or failure)
export const processedMessagesCounter = new Counter({
name: 'rabbitmq_processed_messages_total',
help: 'Total number of processed messages from RabbitMQ',
labelNames: ['queue', 'status'],
});

// This histogram tracks the duration of message processing. It helps monitor how long it takes to process each message.
export const processingDurationHistogram = new Histogram({
name: 'rabbitmq_message_processing_duration_seconds',
help: 'Duration of message processing in seconds',
labelNames: ['queue'],
buckets: [0.1, 0.5, 1, 5, 10, 30, 60],
});
13 changes: 13 additions & 0 deletions src/middlewares/metrics/metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import express, { Request, Response } from 'express';
import { collectDefaultMetrics, register } from 'prom-client';

collectDefaultMetrics();

const router = express.Router();

router.get('/metrics', async (req: Request, res: Response) => {
res.set('Content-Type', register.contentType);
res.end(await register.metrics());
});

export default router;
4 changes: 1 addition & 3 deletions src/middlewares/readinessCheck.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,4 @@ router.get('/readiness', (req: Request, res: Response) => {
res.status(200).send(JSON.stringify(body));
});

export default function () {
return router;
}
export default router;
20 changes: 20 additions & 0 deletions src/queue/consumers/QueueConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import {
Queue,
} from '@user-office-software/duo-message-broker';

import {
processedMessagesCounter,
processingDurationHistogram,
} from '../../middlewares/metrics/customMetrics';

export abstract class QueueConsumer {
private messageBroker: MessageBroker;

Expand Down Expand Up @@ -45,8 +50,17 @@ export abstract class QueueConsumer {

this.messageBroker.listenOn(queueName as Queue, async (...args) => {
logger.logInfo('Received message on queue', { queueName });

// Start tracking processing time
const endTimer = processingDurationHistogram.startTimer({
queue: queueName,
});

try {
await this.onMessage(...args);

// Increment the success counter
processedMessagesCounter.inc({ queue: queueName, status: 'success' });
} catch (error) {
logger.logException('Error while handling QueueConsumer callback: ', {
error: (error as Error).message,
Expand All @@ -55,8 +69,14 @@ export abstract class QueueConsumer {
args,
});

// Increment the failure counter
processedMessagesCounter.inc({ queue: queueName, status: 'failure' });

// Re-throw the error to make sure the message is not acknowledged
throw error;
} finally {
// Stop the timer
endTimer();
}
});
logger.logInfo('Listening on queue', { queueName });
Expand Down

0 comments on commit 1f0ed62

Please sign in to comment.