From 2bf91d2043ac4f47d1bae541e3ba41a820fd39dd Mon Sep 17 00:00:00 2001 From: Erik Friesen Date: Sat, 16 Jul 2022 14:47:21 -0400 Subject: [PATCH 1/3] Wills table de duplicate, streamWill on startup support --- persistence.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/persistence.js b/persistence.js index b4bdd3b..4ee13f7 100644 --- a/persistence.js +++ b/persistence.js @@ -424,6 +424,7 @@ class RedisPersistence extends CachedPersistence { const key = willKey(this.broker.id, client.id) packet.clientId = client.id packet.brokerId = this.broker.id + this._db.lrem(WILLSKEY, 0, key) // Remove duplicates this._db.rpush(WILLSKEY, key) this._db.setBuffer(key, msgpack.encode(packet), encodeBuffer) @@ -475,7 +476,7 @@ class RedisPersistence extends CachedPersistence { stream.emit('error', err) } else { for (const result of results) { - if (!brokers || !brokers[result.split(':')[1]]) { + if (!brokers || !brokers[result.split(':')[1]] || brokers.length === 1) { stream.write(result) } } From aa2bca842a7ba21bcf552e805525dee1222ac8e7 Mon Sep 17 00:00:00 2001 From: Erik Friesen Date: Sat, 23 Jul 2022 11:14:31 -0400 Subject: [PATCH 2/3] Add test for duplicates, removed streamWill addition --- persistence.js | 2 +- test.js | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/persistence.js b/persistence.js index 4ee13f7..d430959 100644 --- a/persistence.js +++ b/persistence.js @@ -476,7 +476,7 @@ class RedisPersistence extends CachedPersistence { stream.emit('error', err) } else { for (const result of results) { - if (!brokers || !brokers[result.split(':')[1]] || brokers.length === 1) { + if (!brokers || !brokers[result.split(':')[1]]) { stream.write(result) } } diff --git a/test.js b/test.js index 1f91cc2..ba7b53f 100644 --- a/test.js +++ b/test.js @@ -262,6 +262,43 @@ test('unknown cache key', t => { }) }) +test('wills table de-duplicate', t => { + t.plan(3) + db.flushall() + const emitter = mqemitterRedis() + const instance = persistence() + const client = { id: 'willsTest' } + + instance.broker = toBroker('1', emitter) + + const packet = { + cmd: 'publish', + topic: 'hello', + payload: 'willsTest', + qos: 1, + retain: false, + brokerId: instance.broker.id, + brokerCounter: 42, + messageId: 123 + } + + instance.putWill(client, packet, err => { + instance.putWill(client, packet, err => { + const key = `will:${instance.broker.id}:${encodeURIComponent(client.id)}` + instance._db.lrem('will', 0, key, function (err, value) { + t.equal(value, 1, 'should only be one will') + close() + }) + }) + }) + + function close () { + instance.destroy(t.pass.bind(t, 'instance dies')) + emitter.close(t.pass.bind(t, 'emitter dies')) + } + +}) + test.onFinish(() => { process.exit(0) }) From d99bb2c11e28f1459416cd636c591b45f402d11f Mon Sep 17 00:00:00 2001 From: Erik Friesen Date: Sat, 30 Jul 2022 12:57:03 -0400 Subject: [PATCH 3/3] wills dedupe test switch to streamWill --- test.js | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/test.js b/test.js index ba7b53f..e912ba9 100644 --- a/test.js +++ b/test.js @@ -263,7 +263,7 @@ test('unknown cache key', t => { }) test('wills table de-duplicate', t => { - t.plan(3) + t.plan(5) db.flushall() const emitter = mqemitterRedis() const instance = persistence() @@ -283,10 +283,16 @@ test('wills table de-duplicate', t => { } instance.putWill(client, packet, err => { + t.notOk(err, 'putWill #1 no error') instance.putWill(client, packet, err => { - const key = `will:${instance.broker.id}:${encodeURIComponent(client.id)}` - instance._db.lrem('will', 0, key, function (err, value) { - t.equal(value, 1, 'should only be one will') + t.notOk(err, 'putWill #2 no error') + let willCount = 0 + const wills = instance.streamWill() + wills.on('data', function (chunk) { + willCount++ + }) + wills.on('end', function () { + t.equal(willCount, 1, 'should only be one will') close() }) }) @@ -296,7 +302,6 @@ test('wills table de-duplicate', t => { instance.destroy(t.pass.bind(t, 'instance dies')) emitter.close(t.pass.bind(t, 'emitter dies')) } - }) test.onFinish(() => {