diff --git a/cli/src/__tests__/cli/sub.test.ts b/cli/src/__tests__/cli/sub.test.ts index 41e7a4db0..a08538d88 100644 --- a/cli/src/__tests__/cli/sub.test.ts +++ b/cli/src/__tests__/cli/sub.test.ts @@ -1,5 +1,5 @@ import { exec, ChildProcess } from 'child_process' -import { expect, jest, afterAll } from '@jest/globals' +import { expect, jest, afterAll, describe, it } from '@jest/globals' import util from 'util' const execAsync = util.promisify(exec) @@ -20,99 +20,46 @@ describe('sub', () => { expect(stdout.trim()).toContain('Usage: mqttx sub [options]') }) - it('can subscribe to a topic', (done) => { - const topic = `test/mqttx/cli/${Date.now()}` + it('can subscribe to a topic and receive messages', (done) => { + const topic = `testtopic/#` const childProcess = exec(`node ./bin/index.js sub -h broker.emqx.io -p 1883 -u mqttx_test_sub -t ${topic}`) childProcesses.push(childProcess) let isSubscribing = false let isSubscribed = false + let messageReceived = false - const checkSubscription = (data: string) => { + const checkOutput = (data: string) => { if (data.includes('Subscribing')) { isSubscribing = true - isSubscribing = true } if (data.includes(`Subscribed to ${topic}`)) { isSubscribed = true isSubscribing = false - expect(isSubscribing).toBe(false) - expect(isSubscribed).toBe(true) - clearTimeout(timeoutId) - done() - } - } - - childProcess.stdout?.on('data', checkSubscription) - childProcess.stderr?.on('data', (data) => { - checkSubscription(data) - }) - - // Set a timeout in case the subscription takes too long - const timeoutId = setTimeout(() => { - if (!isSubscribed) { - done(new Error('Subscription timed out')) } - }, 25000) - }) - - const topic = `test/mqttx/cli/${Date.now()}` - const message = 'Hello MQTT' - - it('can receive messages after subscribing', (done) => { - console.log(`Starting subscription to topic: ${topic}`) - const subProcess = exec(`node ./bin/index.js sub -h broker.emqx.io -p 1883 -u mqttx_test_sub -t ${topic}`) - childProcesses.push(subProcess) - let isSubscribed = false - let messageReceived = false - - const checkMessageReceived = () => { - if (!isSubscribed) { - subProcess.stdout?.emit('data', 'Checking subscription status') - } else if (!messageReceived) { - subProcess.stdout?.emit('data', 'Checking for received message') - } - } - - const dataHandler = (data: string) => { - console.log(`Received data: ${data}`) - if (data.includes(`Subscribed to ${topic}`)) { - isSubscribed = true - console.log('Subscription confirmed, publishing message') - const pubProcess = exec( - `node ./bin/index.js pub -h broker.emqx.io -p 1883 -u mqttx_test_pub -t ${topic} -m "${message}"`, - ) - console.log('Published message with message:', message, 'to topic:', topic) - childProcesses.push(pubProcess) + if (data.includes('topic:') && data.includes('qos:')) { + messageReceived = true } - if (data.includes(message)) { - messageReceived = true - console.log('Message received') + if (isSubscribed && messageReceived) { + expect(isSubscribing).toBe(false) expect(isSubscribed).toBe(true) expect(messageReceived).toBe(true) - clearInterval(pollInterval) clearTimeout(timeoutId) done() } } - subProcess.stdout?.on('data', dataHandler) - subProcess.stderr?.on('data', dataHandler) - - // Set up polling interval - const pollInterval = setInterval(checkMessageReceived, 1000) + childProcess.stdout?.on('data', checkOutput) + childProcess.stderr?.on('data', checkOutput) - // Set a timeout in case the message is not received + // Set a timeout in case the subscription or message reception takes too long const timeoutId = setTimeout(() => { - console.log('Timeout reached, test failed') - clearInterval(pollInterval) - done(new Error('Message not received within the timeout period')) - }, 60000) // 1 minutes - - // Ensure the timer is cleaned up - timeoutId.unref() + if (!isSubscribed || !messageReceived) { + done(new Error('Subscription or message reception timed out')) + } + }, 25000) }) })