diff --git a/package.json b/package.json index e050b9d..4bfef86 100644 --- a/package.json +++ b/package.json @@ -39,6 +39,7 @@ "process-nextick-args": "^1.0.6", "pump": "^1.0.1", "qlobber": "^0.7.0", + "syncthrough": "^0.3.1", "through2": "^2.0.0", "throughv": "^1.0.3" } diff --git a/persistence.js b/persistence.js index 4d662fa..0f2112a 100644 --- a/persistence.js +++ b/persistence.js @@ -3,6 +3,7 @@ var Redis = require('ioredis') var through = require('through2') var throughv = require('throughv') +var syncthrough = require('syncthrough') var fs = require('fs') var path = require('path') var lua = fs.readFileSync(path.join(__dirname, 'lib/cursor.lua')) @@ -21,6 +22,7 @@ var qlobberOpts = { } var offlineClientsCountKey = 'counter:offline:clients' var offlineSubscriptionsCountKey = 'counter:offline:subscriptions' +var retainedRegexp = /[#+]/ function RedisPersistence (opts) { if (!(this instanceof RedisPersistence)) { @@ -35,7 +37,7 @@ function RedisPersistence (opts) { var that = this this._decodeAndAugment = function decodeAndAugment (chunk, enc, cb) { - that._getPipeline().getBuffer(chunk, function decodeMessage (err, result) { + that._db.getBuffer(chunk, function decodeMessage (err, result) { var decoded if (result) { decoded = msgpack.decode(result) @@ -75,8 +77,7 @@ function checkAndSplit (prefix, pattern) { var qlobber = new Qlobber(qlobberOpts) qlobber.add(pattern, true) - // TODO use ctor - var instance = through.obj(splitArray) + var instance = syncthrough(splitArray) instance._qlobber = qlobber instance._prefix = prefix @@ -84,7 +85,7 @@ function checkAndSplit (prefix, pattern) { return instance } -function splitArray (keys, enc, cb) { +function splitArray (keys, enc) { var prefix = this._prefix.length for (var i = 0, l = keys.length; i < l; i++) { var key = keys[i].slice(prefix) @@ -92,14 +93,13 @@ function splitArray (keys, enc, cb) { this.push(keys[i]) } } - cb() } RedisPersistence.prototype.createRetainedStream = function (pattern) { return new MatchStream({ objectMode: true, redis: this._db, - match: 'retained:' + pattern.split(/[#+]/)[0] + '*' + match: 'retained:' + pattern.split(retainedRegexp)[0] + '*' }).pipe(checkAndSplit('retained:', pattern)) .pipe(throughv.obj(this._decodeAndAugment)) }