From 587ce2c55ec161994dc8d15375c27a2b1bfa2187 Mon Sep 17 00:00:00 2001 From: Onur Temizkan Date: Thu, 3 Oct 2024 12:52:49 +0300 Subject: [PATCH] fix(test): Fix flaky amqplib test --- .../tracing/amqplib/scenario-message.ts | 9 ++++++- .../suites/tracing/amqplib/utils.ts | 25 ++++++++++++------- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/dev-packages/node-integration-tests/suites/tracing/amqplib/scenario-message.ts b/dev-packages/node-integration-tests/suites/tracing/amqplib/scenario-message.ts index 2fa0d0feaa89..f8d2727a6e6b 100644 --- a/dev-packages/node-integration-tests/suites/tracing/amqplib/scenario-message.ts +++ b/dev-packages/node-integration-tests/suites/tracing/amqplib/scenario-message.ts @@ -4,16 +4,23 @@ import { connectToRabbitMQ, consumeMessageFromQueue, createQueue, sendMessageToQ const queueName = 'queue1'; +// Stop the process from exiting before the transaction is sent +// eslint-disable-next-line @typescript-eslint/no-empty-function +setInterval(() => {}, 1000); + // eslint-disable-next-line @typescript-eslint/no-floating-promises (async () => { const { connection, channel } = await connectToRabbitMQ(); await createQueue(queueName, channel); + const consumeMessagePromise = consumeMessageFromQueue(queueName, channel); + await Sentry.startSpan({ name: 'root span' }, async () => { sendMessageToQueue(queueName, channel, JSON.stringify({ foo: 'bar01' })); }); - await consumeMessageFromQueue(queueName, channel); + await consumeMessagePromise; + await channel.close(); await connection.close(); })(); diff --git a/dev-packages/node-integration-tests/suites/tracing/amqplib/utils.ts b/dev-packages/node-integration-tests/suites/tracing/amqplib/utils.ts index cf6f452365f2..db8af3e932fb 100644 --- a/dev-packages/node-integration-tests/suites/tracing/amqplib/utils.ts +++ b/dev-packages/node-integration-tests/suites/tracing/amqplib/utils.ts @@ -22,15 +22,22 @@ export function sendMessageToQueue(queueName: string, channel: Channel, message: } async function consumer(queueName: string, channel: Channel): Promise { - await channel.consume( - queueName, - message => { - if (message) { - channel.ack(message); - } - }, - ACKNOWLEDGEMENT, - ); + return new Promise((resolve, reject) => { + channel + .consume( + queueName, + message => { + if (message) { + channel.ack(message); + resolve(); + } else { + reject(new Error('No message received')); + } + }, + ACKNOWLEDGEMENT, + ) + .catch(reject); + }); } export async function consumeMessageFromQueue(queueName: string, channel: Channel): Promise {