diff --git a/persistence.js b/persistence.js index b4bdd3b..d430959 100644 --- a/persistence.js +++ b/persistence.js @@ -424,6 +424,7 @@ class RedisPersistence extends CachedPersistence { const key = willKey(this.broker.id, client.id) packet.clientId = client.id packet.brokerId = this.broker.id + this._db.lrem(WILLSKEY, 0, key) // Remove duplicates this._db.rpush(WILLSKEY, key) this._db.setBuffer(key, msgpack.encode(packet), encodeBuffer) diff --git a/test.js b/test.js index 1f91cc2..e912ba9 100644 --- a/test.js +++ b/test.js @@ -262,6 +262,48 @@ test('unknown cache key', t => { }) }) +test('wills table de-duplicate', t => { + t.plan(5) + db.flushall() + const emitter = mqemitterRedis() + const instance = persistence() + const client = { id: 'willsTest' } + + instance.broker = toBroker('1', emitter) + + const packet = { + cmd: 'publish', + topic: 'hello', + payload: 'willsTest', + qos: 1, + retain: false, + brokerId: instance.broker.id, + brokerCounter: 42, + messageId: 123 + } + + instance.putWill(client, packet, err => { + t.notOk(err, 'putWill #1 no error') + instance.putWill(client, packet, err => { + t.notOk(err, 'putWill #2 no error') + let willCount = 0 + const wills = instance.streamWill() + wills.on('data', function (chunk) { + willCount++ + }) + wills.on('end', function () { + t.equal(willCount, 1, 'should only be one will') + close() + }) + }) + }) + + function close () { + instance.destroy(t.pass.bind(t, 'instance dies')) + emitter.close(t.pass.bind(t, 'emitter dies')) + } +}) + test.onFinish(() => { process.exit(0) })