diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9333103..54378da 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,17 +11,17 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - node-version: [14.x, 16.x, 18.x] + node-version: ['20', '22'] steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Start Redis working-directory: ./docker - run: docker-compose up -d + run: docker compose up -d - name: Use Node.js - uses: actions/setup-node@v3 + uses: actions/setup-node@v4 with: node-version: ${{ matrix.node-version }} - name: Install @@ -32,4 +32,4 @@ jobs: npm run test - name: Run tests - clusters run: | - node test-clusters.js + npm run test:clusters diff --git a/README.md b/README.md index afee9d8..2d1e233 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,6 @@ # aedes-persistence-redis ![.github/workflows/ci.yml](https://github.com/moscajs/aedes-persistence-redis/workflows/.github/workflows/ci.yml/badge.svg) -[![Dependencies Status](https://david-dm.org/moscajs/aedes-persistence-redis/status.svg)](https://david-dm.org/moscajs/aedes-persistence-redis) -[![devDependencies Status](https://david-dm.org/moscajs/aedes-persistence-redis/dev-status.svg)](https://david-dm.org/moscajs/aedes-persistence-redis?type=dev) \ [![Known Vulnerabilities](https://snyk.io/test/github/moscajs/aedes-persistence-redis/badge.svg)](https://snyk.io/test/github/moscajs/aedes-persistence-redis) [![Coverage Status](https://coveralls.io/repos/moscajs/aedes-persistence-redis/badge.svg?branch=master&service=github)](https://coveralls.io/github/moscajs/aedes-persistence-redis?branch=master) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 8c1a80b..e21aead 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -1,5 +1,3 @@ -version: '3' - services: redis-default: image: redis:6.2.5-alpine diff --git a/eslint.config.js b/eslint.config.js new file mode 100644 index 0000000..1071df9 --- /dev/null +++ b/eslint.config.js @@ -0,0 +1 @@ +module.exports = require('neostandard')({}) diff --git a/example.js b/example.js index 9dd8f06..3699dd9 100644 --- a/example.js +++ b/example.js @@ -4,6 +4,6 @@ const aedes = require('aedes')({ mq, persistence }) -const server = require('net').createServer(aedes.handle) +const server = require('node:net').createServer(aedes.handle) server.listen(1883) diff --git a/migrations.js b/migrations.js index 70c4533..ed123a1 100644 --- a/migrations.js +++ b/migrations.js @@ -6,7 +6,7 @@ async function from9to10 (db, cb) { } // get all topics - db.hkeys(RETAINEDKEY, function (err, topics) { + db.hkeys(RETAINEDKEY, (err, topics) => { if (err) { return cb(err) } @@ -14,17 +14,17 @@ async function from9to10 (db, cb) { Promise.all(topics.map(t => { return new Promise((resolve, reject) => { // get packet payload - db.hgetBuffer(RETAINEDKEY, t, function (err, payload) { + db.hgetBuffer(RETAINEDKEY, t, (err, payload) => { if (err) { return reject(err) } // set packet with new format - db.set(retainedKey(t), payload, function (err) { + db.set(retainedKey(t), payload, (err) => { if (err) { return reject(err) } // remove old packet - db.hdel(RETAINEDKEY, t, function (err) { + db.hdel(RETAINEDKEY, t, (err) => { if (err) { return reject(err) } diff --git a/package.json b/package.json index 816de45..bbcbc71 100644 --- a/package.json +++ b/package.json @@ -3,12 +3,19 @@ "version": "10.0.0", "description": "Aedes persistence, backed by redis", "main": "persistence.js", + "engines": { + "node": ">=20" + }, "scripts": { - "lint": "standard --verbose | snazzy", - "test": "tape test.js | faucet", - "coverage": "nyc --reporter=lcov tape test.js", + "lint": "eslint", + "lint:fix": "eslint --fix", + "unit": "node --test test.js", + "test": "npm run lint && npm run unit", + "test:clusters": "node --test test-clusters.js", + "coverage": "nyc --reporter=lcov node --test test.js", "license-checker": "license-checker --production --onlyAllow='MIT;ISC;BSD-3-Clause;BSD-2-Clause;Apache-2.0;Apache*'", - "release": "read -p 'GITHUB_TOKEN: ' GITHUB_TOKEN && export GITHUB_TOKEN=$GITHUB_TOKEN && release-it --disable-metrics" + "release": "read -p 'GITHUB_TOKEN: ' GITHUB_TOKEN && export GITHUB_TOKEN=$GITHUB_TOKEN && release-it --disable-metrics", + "redis": "cd docker;docker compose up" }, "release-it": { "github": { @@ -27,7 +34,6 @@ } }, "pre-commit": [ - "lint", "test" ], "repository": { @@ -47,27 +53,21 @@ }, "homepage": "https://github.com/moscajs/aedes-persistence-redis#readme", "devDependencies": { - "fastq": "^1.13.0", - "faucet": "0.0.1", + "@fastify/pre-commit": "^2.2.0", + "eslint": "^9.21.0", + "fastq": "^1.19.1", "license-checker": "^25.0.1", - "mqemitter": "^4.5.0", - "mqemitter-redis": "^5.0.0", - "mqtt": "^4.3.7", - "nyc": "^15.1.0", - "pre-commit": "^1.2.2", - "release-it": "^15.0.0", - "snazzy": "^9.0.0", - "standard": "^17.0.0", - "tape": "^5.5.3" + "mqemitter-redis": "^6.1.0", + "mqtt": "^5.10.4", + "neostandard": "^0.12.1", + "nyc": "^17.1.0", + "release-it": "^18.1.2" }, "dependencies": { - "aedes-cached-persistence": "^9.0.0", + "aedes-cached-persistence": "^10.0.0", "hashlru": "^2.3.0", - "ioredis": "^5.0.5", + "ioredis": "^5.5.0", "msgpack-lite": "^0.1.26", - "pump": "^3.0.0", - "qlobber": "^7.0.0", - "through2": "^4.0.2", - "throughv": "^1.0.4" + "qlobber": "^8.0.1" } } diff --git a/persistence.js b/persistence.js index b527c88..2b2ac46 100644 --- a/persistence.js +++ b/persistence.js @@ -1,9 +1,6 @@ const Redis = require('ioredis') -const { Readable } = require('stream') -const through = require('through2') -const throughv = require('throughv') +const { Readable } = require('node:stream') const msgpack = require('msgpack-lite') -const pump = require('pump') const CachedPersistence = require('aedes-cached-persistence') const Packet = CachedPersistence.Packet const HLRU = require('hashlru') @@ -61,10 +58,49 @@ function packetCountKey (brokerId, brokerCounter) { return `${PACKETKEY}${brokerId}:${brokerCounter}:offlineCount` } +async function getDecodedValue (db, listKey, key) { + const value = await db.getBuffer(key) + const decoded = value ? msgpack.decode(value) : undefined + if (!decoded) { + db.lrem(listKey, 0, key) + } + return decoded +} + +async function getRetainedKeys (db, hasClusters) { + if (hasClusters === true) { + // Get keys of all the masters + const masters = db.nodes('master') + const keys = await Promise.all( + masters.flatMap((node) => node.keys(ALL_RETAINEDKEYS)) + ) + return keys + } + return await db.hkeys(RETAINEDKEY) +} + +async function getRetainedValue (db, topic, hasClusters) { + if (hasClusters === true) { + return msgpack.decode(await db.getBuffer(retainedKey(topic))) + } + return msgpack.decode(await db.hgetBuffer(RETAINEDKEY, topic)) +} + +async function * createWillStream (db, brokers, maxWills) { + const results = await db.lrange(WILLSKEY, 0, maxWills) + for (const key of results) { + const result = await getDecodedValue(db, WILLSKEY, key) + if (!brokers || !brokers[result.split(':')[1]]) { + yield result + } + } +} + class RedisPersistence extends CachedPersistence { constructor (opts = {}) { super(opts) this.maxSessionDelivery = opts.maxSessionDelivery || 1000 + this.maxWills = 10000 this.packetTTL = opts.packetTTL || (() => { return 0 }) this.messageIdCache = HLRU(100000) @@ -76,8 +112,6 @@ class RedisPersistence extends CachedPersistence { } this.hasClusters = !!opts.cluster - this._getRetainedChunkBound = (this.hasClusters ? this._getRetainedChunkCluster : this._getRetainedChunk).bind(this) - this._getRetainedKeysBound = (this.hasClusters ? this._getRetainedKeysCluster : this._getRetainedKeys).bind(this) } /** @@ -108,52 +142,13 @@ class RedisPersistence extends CachedPersistence { } } - _getRetainedChunkCluster (topic, enc, cb) { - this._db.getBuffer(retainedKey(topic), cb) - } - - _getRetainedChunk (topic, enc, cb) { - this._db.hgetBuffer(RETAINEDKEY, topic, cb) - } - - _getRetainedKeysCluster (cb) { - // Get keys of all the masters: - const masters = this._db.nodes('master') - Promise.all( - masters - .map((node) => node.keys(ALL_RETAINEDKEYS)) - ).then((keys) => { - // keys: [['key1', 'key2'], ['key3', 'key4']] - // flatten the array - cb(null, keys.reduce((acc, val) => acc.concat(val), [])) - }).catch((err) => { - cb(err) - }) - } - - _getRetainedKeys (cb) { - this._db.hkeys(RETAINEDKEY, cb) - } - createRetainedStreamCombi (patterns) { - const that = this const qlobber = new QlobberTrue(qlobberOpts) for (const pattern of patterns) { qlobber.add(pattern) } - - const stream = through.obj(that._getRetainedChunkBound) - - this._getRetainedKeysBound(function getKeys (err, keys) { - if (err) { - stream.emit('error', err) - } else { - matchRetained(stream, keys, qlobber, that.hasClusters) - } - }) - - return pump(stream, throughv.obj(decodeRetainedPacket)) + return Readable.from(matchRetained(this.db, qlobber, this.hasClusters)) } createRetainedStream (pattern) { @@ -256,7 +251,7 @@ class RedisPersistence extends CachedPersistence { return cb(err) } - cb(null, that._trie.subscriptionsCount, parseInt(count) || 0) + cb(null, that._trie.subscriptionsCount, Number.parseInt(count) || 0) }) } @@ -271,35 +266,22 @@ class RedisPersistence extends CachedPersistence { cb(null, result) } - _setup () { + async _setup () { if (this.ready) { return } - const that = this - - const hgetallStream = throughv.obj(function getStream (clientId, enc, cb) { - that._db.hgetallBuffer(clientSubKey(clientId), function clientHash (err, hash) { - cb(err, { clientHash: hash, clientId }) - }) - }, function emitReady (cb) { - that.ready = true - that.emit('ready') - cb() - }).on('data', function processKeys (data) { - processKeysForClient(data.clientId, data.clientHash, that) - }) - - this._db.smembers(CLIENTSKEY, function smembers (err, clientIds) { - if (err) { - hgetallStream.emit('error', err) - } else { - for (const clientId of clientIds) { - hgetallStream.write(clientId) - } - hgetallStream.end() + try { + const clientIds = await this._db.smembers(CLIENTSKEY) + for await (const clientId of clientIds) { + const clientHash = await this._db.hgetallBuffer(clientSubKey(clientId)) + processKeysForClient(clientId, clientHash, this._trie) } - }) + this.ready = true + this.emit('ready') + } catch (err) { + this.emit('error', err) + } } outgoingEnqueue (sub, packet, cb) { @@ -441,22 +423,17 @@ class RedisPersistence extends CachedPersistence { outgoingStream (client) { const clientListKey = outgoingKey(client.id) - const stream = throughv.obj(this._buildAugment(clientListKey)) - - this._db.lrange(clientListKey, 0, this.maxSessionDelivery, lrangeResult) + const db = this._db + const maxSessionDelivery = this.maxSessionDelivery - function lrangeResult (err, results) { - if (err) { - stream.emit('error', err) - } else { - for (const result of results) { - stream.write(result) - } - stream.end() + async function * lrangeResult () { + const results = await db.lrange(clientListKey, 0, maxSessionDelivery) + for (const key of results) { + yield getDecodedValue(db, clientListKey, key) } } - return stream + return Readable.from(lrangeResult()) } incomingStorePacket (client, packet, cb) { @@ -533,23 +510,7 @@ class RedisPersistence extends CachedPersistence { } streamWill (brokers) { - const stream = throughv.obj(this._buildAugment(WILLSKEY)) - - this._db.lrange(WILLSKEY, 0, 10000, streamWill) - - function streamWill (err, results) { - if (err) { - stream.emit('error', err) - } else { - for (const result of results) { - if (!brokers || !brokers[result.split(':')[1]]) { - stream.write(result) - } - } - stream.end() - } - } - return stream + return Readable.from(createWillStream(this._db, brokers, this.maxWills)) } * #getClientIdFromEntries (entries) { @@ -591,18 +552,13 @@ class RedisPersistence extends CachedPersistence { } } -function matchRetained (stream, topics, qlobber, hasClusters) { - for (let t of topics) { - t = hasClusters ? decodeURIComponent(t.split(':')[1]) : t - if (qlobber.test(t)) { - stream.write(t) +function * matchRetained (db, qlobber, hasClusters) { + for (const key of getRetainedKeys(db, hasClusters)) { + const topic = hasClusters ? decodeURIComponent(key.split(':')[1]) : key + if (qlobber.test(topic)) { + yield getRetainedValue(db, topic, hasClusters) } } - stream.end() -} - -function decodeRetainedPacket (chunk, enc, cb) { - cb(null, msgpack.decode(chunk)) } function toSub (topic) { @@ -624,7 +580,7 @@ function returnSubsForClient (subs) { if (subs[subKey].length === 1) { // version 8x fallback, QoS saved not encoded object toReturn.push({ topic: subKey, - qos: parseInt(subs[subKey]) + qos: Number.parseInt(subs[subKey]) }) } else { toReturn.push(msgpack.decode(subs[subKey])) @@ -634,12 +590,12 @@ function returnSubsForClient (subs) { return toReturn } -function processKeysForClient (clientId, clientHash, that) { +function processKeysForClient (clientId, clientHash, trie) { const topics = Object.keys(clientHash) for (const topic of topics) { const sub = msgpack.decode(clientHash[topic]) sub.clientId = clientId - that._trie.add(topic, sub) + trie.add(topic, sub) } } @@ -653,9 +609,8 @@ function updateWithClientData (that, client, packet, cb) { that.messageIdCache.set(messageIdKey, pktKey) if (ttl > 0) { return that._db.set(pktKey, msgpack.encode(packet), 'EX', ttl, updatePacket) - } else { - return that._db.set(pktKey, msgpack.encode(packet), updatePacket) } + return that._db.set(pktKey, msgpack.encode(packet), updatePacket) } // qos=2 diff --git a/test-clusters.js b/test-clusters.js index 5cac38e..c37e02b 100644 --- a/test-clusters.js +++ b/test-clusters.js @@ -1,4 +1,4 @@ -const test = require('tape').test +const test = require('node:test') const persistence = require('./persistence') const Redis = require('ioredis') const mqemitterRedis = require('mqemitter-redis') @@ -8,6 +8,10 @@ function unref () { this.connector.stream.unref() } +function sleep (sec) { + return new Promise(resolve => setTimeout(resolve, sec * 1000)) +} + const nodes = [ { host: 'localhost', port: 6378 }, { host: 'localhost', port: 6380 }, @@ -23,41 +27,43 @@ db.on('error', e => { console.trace(e) }) -db.on('ready', function () { +function buildEmitter () { + const emitter = mqemitterRedis() + emitter.subConn.on('connect', unref) + emitter.pubConn.on('connect', unref) + + return emitter +} + +function clusterPersistence (cb) { + const slaves = db.nodes('master') + Promise.all(slaves.map((node) => { + return node.flushdb().catch(err => { + console.error('flushRedisKeys-error:', err) + }) + })).then(() => { + const conn = new Redis.Cluster(nodes) + + conn.on('error', e => { + console.trace(e) + }) + + conn.on('ready', () => { + cb(null, persistence({ + conn, + cluster: true + })) + }) + }) +} + +db.on('ready', () => { abs({ test, - buildEmitter () { - const emitter = mqemitterRedis() - emitter.subConn.on('connect', unref) - emitter.pubConn.on('connect', unref) - - return emitter - }, - persistence (cb) { - const slaves = db.nodes('master') - Promise.all(slaves.map(function (node) { - return node.flushdb().catch(err => { - console.error('flushRedisKeys-error:', err) - }) - })).then(() => { - const conn = new Redis.Cluster(nodes) - - conn.on('error', e => { - console.trace(e) - }) - - conn.on('ready', function () { - cb(null, persistence({ - conn, - cluster: true - })) - }) - }) - }, + buildEmitter, + persistence: clusterPersistence, waitForReady: true }) - - test.onFinish(() => { - process.exit(0) - }) }) + +sleep(10).then(() => process.exit(0)) diff --git a/test.js b/test.js index e912ba9..985a26b 100644 --- a/test.js +++ b/test.js @@ -1,45 +1,69 @@ -const test = require('tape').test +const test = require('node:test') const persistence = require('./') const Redis = require('ioredis') const mqemitterRedis = require('mqemitter-redis') const abs = require('aedes-cached-persistence/abstract') -const db = new Redis() -db.on('error', e => { - console.trace(e) -}) +function sleep (sec) { + return new Promise(resolve => setTimeout(resolve, sec * 1000)) +} -db.on('connect', unref) +function waitForEvent (obj, resolveEvt) { + return new Promise((resolve, reject) => { + obj.once(resolveEvt, () => { + resolve() + }) + obj.once('error', reject) + }) +} +function setUpPersistence (t, id, persistenceOpts) { + const emitter = mqemitterRedis() + const instance = persistence(persistenceOpts) + instance.broker = toBroker(id, emitter) + t.diagnostic(`instance ${id} created`) + return { instance, emitter, id } +} + +function cleanUpPersistence (t, { instance, emitter, id }) { + instance.destroy() + emitter.close() + t.diagnostic(`instance ${id} destroyed`) +} + +function toBroker (id, emitter) { + return { + id, + publish: emitter.emit.bind(emitter), + subscribe: emitter.on.bind(emitter), + unsubscribe: emitter.removeListener.bind(emitter) + } +} function unref () { this.connector.stream.unref() } -test('external Redis conn', t => { - t.plan(2) +// testing starts here +const db = new Redis() +db.on('error', e => { + console.trace(e) +}) +db.on('connect', unref) +test('external Redis conn', async t => { + t.plan(2) const externalRedis = new Redis() - const emitter = mqemitterRedis() - - db.on('error', e => { - t.notOk(e) - }) - - db.on('connect', () => { - t.pass('redis connected') - }) - const instance = persistence({ + await waitForEvent(externalRedis, 'connect') + t.diagnostic('redis connected') + t.assert.ok(true, 'redis connected') + const p = setUpPersistence(t, '1', { conn: externalRedis }) - - instance.broker = toBroker('1', emitter) - - instance.on('ready', () => { - t.pass('instance ready') - externalRedis.disconnect() - instance.destroy() - emitter.close() - }) + await waitForEvent(p.instance, 'ready') + t.assert.ok(true, 'instance ready') + t.diagnostic('instance ready') + externalRedis.disconnect() + cleanUpPersistence(t, p) }) abs({ @@ -51,73 +75,64 @@ abs({ return emitter }, - persistence () { + persistence: () => { db.flushall() return persistence() }, waitForReady: true }) -function toBroker (id, emitter) { - return { - id, - publish: emitter.emit.bind(emitter), - subscribe: emitter.on.bind(emitter), - unsubscribe: emitter.removeListener.bind(emitter) - } -} +test('packet ttl', async t => { + t.plan(3) + // the promise is required for the test to wait for the end event + const executeTest = new Promise((resolve, reject) => { + db.flushall() -test('packet ttl', t => { - t.plan(4) - db.flushall() - const emitter = mqemitterRedis() - const instance = persistence({ - packetTTL () { - return 1 + const p = setUpPersistence(t, '1', { + packetTTL () { + return 1 + } + }) + const instance = p.instance + + const subs = [{ + clientId: 'ttlTest', + topic: 'hello', + qos: 1 + }] + const packet = { + cmd: 'publish', + topic: 'hello', + payload: 'ttl test', + qos: 1, + retain: false, + brokerId: instance.broker.id, + brokerCounter: 42 } - }) - instance.broker = toBroker('1', emitter) - - const subs = [{ - clientId: 'ttlTest', - topic: 'hello', - qos: 1 - }] - const packet = { - cmd: 'publish', - topic: 'hello', - payload: 'ttl test', - qos: 1, - retain: false, - brokerId: instance.broker.id, - brokerCounter: 42 - } - instance.outgoingEnqueueCombi(subs, packet, function enqueued (err, saved) { - t.notOk(err) - t.deepEqual(saved, packet) - setTimeout(() => { + instance.outgoingEnqueueCombi(subs, packet, async function enqueued (err, saved) { + t.assert.ifError(err) + t.assert.deepEqual(saved, packet) + await sleep(1) const offlineStream = instance.outgoingStream({ id: 'ttlTest' }) - offlineStream.on('data', offlinePacket => { - t.notOk(offlinePacket) - }) - offlineStream.on('end', () => { - instance.destroy(t.pass.bind(t, 'stop instance')) - emitter.close(t.pass.bind(t, 'stop emitter')) - }) - }, 1100) + for await (const offlinePacket of offlineStream) { + t.assert.ok(!offlinePacket) + } + cleanUpPersistence(t, p) + resolve() + }) }) + await executeTest }) -test('outgoingUpdate doesn\'t clear packet ttl', t => { - t.plan(5) +test('outgoingUpdate doesn\'t clear packet ttl', async t => { + t.plan(3) db.flushall() - const emitter = mqemitterRedis() - const instance = persistence({ + const p = setUpPersistence(t, '1', { packetTTL () { return 1 } }) - instance.broker = toBroker('1', emitter) + const instance = p.instance const client = { id: 'ttlTest' @@ -137,173 +152,172 @@ test('outgoingUpdate doesn\'t clear packet ttl', t => { brokerCounter: 42, messageId: 123 } - instance.outgoingEnqueueCombi(subs, packet, function enqueued (err, saved) { - t.notOk(err) - t.deepEqual(saved, packet) - instance.outgoingUpdate(client, packet, function updated () { - setTimeout(() => { + + await new Promise((resolve, reject) => { + instance.outgoingEnqueueCombi(subs, packet, function enqueued (err, saved) { + t.assert.ifError(err) + t.assert.deepEqual(saved, packet) + instance.outgoingUpdate(client, packet, async function updated () { + await sleep(2) db.exists('packet:1:42', (_, exists) => { - t.notOk(exists, 'packet key should have expired') + t.assert.ok(!exists, 'packet key should have expired') + cleanUpPersistence(t, p) + resolve() }) - instance.destroy(t.pass.bind(t, 'instance dies')) - emitter.close(t.pass.bind(t, 'emitter dies')) - }, 1100) + }) }) }) }) -test('multiple persistences', t => { - t.plan(7) - t.timeoutAfter(60 * 1000) - db.flushall() - const emitter = mqemitterRedis() - const emitter2 = mqemitterRedis() - const instance = persistence() - const instance2 = persistence() - instance.broker = toBroker('1', emitter) - instance2.broker = toBroker('2', emitter2) - - const client = { id: 'multipleTest' } - const subs = [{ - topic: 'hello', - qos: 1 - }, { - topic: 'hello/#', - qos: 1 - }, { - topic: 'matteo', - qos: 1 - }] - - let gotSubs = false - let addedSubs = false - - function close () { - if (gotSubs && addedSubs) { - instance.destroy(t.pass.bind(t, 'first dies')) - instance2.destroy(t.pass.bind(t, 'second dies')) - emitter.close(t.pass.bind(t, 'first emitter dies')) - emitter2.close(t.pass.bind(t, 'second emitter dies')) +test('multiple persistences', { + timeout: 60 * 1000 +}, async t => { + t.plan(3) + const executeTest = new Promise((resolve, reject) => { + db.flushall() + const p1 = setUpPersistence(t, '1') + const p2 = setUpPersistence(t, '2') + const instance = p1.instance + const instance2 = p2.instance + + const client = { id: 'multipleTest' } + const subs = [{ + topic: 'hello', + qos: 1 + }, { + topic: 'hello/#', + qos: 1 + }, { + topic: 'matteo', + qos: 1 + }] + + let gotSubs = false + let addedSubs = false + + function close () { + if (gotSubs && addedSubs) { + cleanUpPersistence(t, p1) + cleanUpPersistence(t, p2) + resolve() + } } - } - instance2._waitFor(client, true, 'hello', () => { - instance2.subscriptionsByTopic('hello', (err, resubs) => { - t.notOk(err, 'subs by topic no error') - t.deepEqual(resubs, [{ - clientId: client.id, - topic: 'hello/#', - qos: 1, - rh: undefined, - rap: undefined, - nl: undefined - }, { - clientId: client.id, - topic: 'hello', - qos: 1, - rh: undefined, - rap: undefined, - nl: undefined - }], 'received correct subs') - gotSubs = true - close() + instance2._waitFor(client, true, 'hello', () => { + instance2.subscriptionsByTopic('hello', (err, resubs) => { + t.assert.ok(!err, 'subs by topic no error') + t.assert.deepEqual(resubs, [{ + clientId: client.id, + topic: 'hello/#', + qos: 1, + rh: undefined, + rap: undefined, + nl: undefined + }, { + clientId: client.id, + topic: 'hello', + qos: 1, + rh: undefined, + rap: undefined, + nl: undefined + }], 'received correct subs') + gotSubs = true + close() + }) }) - }) - let ready = false - let ready2 = false + let ready = false + let ready2 = false - function addSubs () { - if (ready && ready2) { - instance.addSubscriptions(client, subs, err => { - t.notOk(err, 'add subs no error') - addedSubs = true - close() - }) + function addSubs () { + if (ready && ready2) { + instance.addSubscriptions(client, subs, err => { + t.assert.ok(!err, 'add subs no error') + addedSubs = true + close() + }) + } } - } - instance.on('ready', () => { - ready = true - addSubs() - }) + instance.on('ready', () => { + ready = true + addSubs() + }) - instance2.on('ready', () => { - ready2 = true - addSubs() + instance2.on('ready', () => { + ready2 = true + addSubs() + }) }) + await executeTest }) -test('unknown cache key', t => { - t.plan(3) - db.flushall() - const emitter = mqemitterRedis() - const instance = persistence() - const client = { id: 'unknown_pubrec' } - - instance.broker = toBroker('1', emitter) - - // packet with no brokerId - const packet = { - cmd: 'pubrec', - topic: 'hello', - qos: 2, - retain: false - } - - function close () { - instance.destroy(t.pass.bind(t, 'instance dies')) - emitter.close(t.pass.bind(t, 'emitter dies')) - } +test('unknown cache key', async t => { + t.plan(2) + const executeTest = new Promise((resolve, reject) => { + db.flushall() + const p = setUpPersistence(t, '1') + const instance = p.instance + const client = { id: 'unknown_pubrec' } + + // packet with no brokerId + const packet = { + cmd: 'pubrec', + topic: 'hello', + qos: 2, + retain: false + } - instance.outgoingUpdate(client, packet, (err, client, packet) => { - t.equal(err.message, 'unknown key', 'Received unknown PUBREC') - close() + instance.on('ready', () => { + instance.outgoingUpdate(client, packet, (err, client, packet) => { + t.assert.ok(err, 'error received') + t.assert.equal(err.message, 'unknown key', 'Received unknown PUBREC') + cleanUpPersistence(t, p) + resolve() + }) + }) }) + await executeTest }) -test('wills table de-duplicate', t => { - t.plan(5) - db.flushall() - const emitter = mqemitterRedis() - const instance = persistence() - const client = { id: 'willsTest' } - - instance.broker = toBroker('1', emitter) - - const packet = { - cmd: 'publish', - topic: 'hello', - payload: 'willsTest', - qos: 1, - retain: false, - brokerId: instance.broker.id, - brokerCounter: 42, - messageId: 123 - } +test('wills table de-duplicate', async t => { + t.plan(3) + const executeTest = new Promise((resolve, reject) => { + db.flushall() + const p = setUpPersistence(t, '1') + const instance = p.instance + const client = { id: 'willsTest' } + + const packet = { + cmd: 'publish', + topic: 'hello', + payload: 'willsTest', + qos: 1, + retain: false, + brokerId: instance.broker.id, + brokerCounter: 42, + messageId: 123 + } - instance.putWill(client, packet, err => { - t.notOk(err, 'putWill #1 no error') instance.putWill(client, packet, err => { - t.notOk(err, 'putWill #2 no error') - let willCount = 0 - const wills = instance.streamWill() - wills.on('data', function (chunk) { - willCount++ - }) - wills.on('end', function () { - t.equal(willCount, 1, 'should only be one will') - close() + t.assert.ok(!err, 'putWill #1 no error') + instance.putWill(client, packet, err => { + t.assert.ok(!err, 'putWill #2 no error') + let willCount = 0 + const wills = instance.streamWill() + wills.on('data', (chunk) => { + willCount++ + }) + wills.on('end', () => { + t.assert.equal(willCount, 1, 'should only be one will') + cleanUpPersistence(t, p) + resolve() + }) }) }) }) - - function close () { - instance.destroy(t.pass.bind(t, 'instance dies')) - emitter.close(t.pass.bind(t, 'emitter dies')) - } + await executeTest }) -test.onFinish(() => { - process.exit(0) -}) +// clients will keep on running after the test +sleep(10).then(() => process.exit(0)) diff --git a/tester.js b/tester.js index 2a50db4..66c9d69 100644 --- a/tester.js +++ b/tester.js @@ -1,4 +1,3 @@ -// npm install mqtt fastq // command to run : node fastbench 25000 const queue = require('fastq')(worker, 1) const mqtt = require('mqtt')