diff --git a/lib/cmd/class/cached-prepare-result-packet.js b/lib/cmd/class/cached-prepare-result-packet.js deleted file mode 100644 index 5eaf00a2..00000000 --- a/lib/cmd/class/cached-prepare-result-packet.js +++ /dev/null @@ -1,35 +0,0 @@ -'use strict'; - -const PrepareResultPacket = require('./prepare-result-packet'); - -/** - * Prepare result - * see https://mariadb.com/kb/en/com_stmt_prepare/#com_stmt_prepare_ok - */ -class CachedPrepareResultPacket extends PrepareResultPacket { - constructor(statementId, parameterCount, columns, database, sql, placeHolderIndex, conn) { - super(statementId, parameterCount, columns, database, sql, placeHolderIndex, conn); - this.cached = true; - this.use = 1; - } - - incrementUse() { - this.use += 1; - } - - unCache() { - this.cached = false; - if (this.use <= 0) { - super.close(); - } - } - - close() { - this.use -= 1; - if (this.use <= 0 && !this.cached) { - super.close(); - } - } -} - -module.exports = CachedPrepareResultPacket; diff --git a/lib/cmd/class/prepare-cache-wrapper.js b/lib/cmd/class/prepare-cache-wrapper.js new file mode 100644 index 00000000..15eec4e3 --- /dev/null +++ b/lib/cmd/class/prepare-cache-wrapper.js @@ -0,0 +1,43 @@ +'use strict'; + +const PrepareWrapper = require('./prepare-wrapper'); + +/** + * Prepare cache wrapper + * see https://mariadb.com/kb/en/com_stmt_prepare/#com_stmt_prepare_ok + */ +class PrepareCacheWrapper { + #use = 0; + #cached; + #prepare; + + constructor(prepare) { + this.#prepare = prepare; + this.#cached = true; + } + + incrementUse() { + this.#use += 1; + return new PrepareWrapper(this, this.#prepare); + } + + unCache() { + this.#cached = false; + if (this.#use === 0) { + this.#prepare.close(); + } + } + + decrementUse() { + this.#use -= 1; + if (this.#use === 0 && !this.#cached) { + this.#prepare.close(); + } + } + + toString() { + return 'Prepare{use:' + this.#use + ',cached:' + this.#cached + '}'; + } +} + +module.exports = PrepareCacheWrapper; diff --git a/lib/cmd/class/prepare-result-packet.js b/lib/cmd/class/prepare-result-packet.js index 70e705d0..1af97595 100644 --- a/lib/cmd/class/prepare-result-packet.js +++ b/lib/cmd/class/prepare-result-packet.js @@ -20,6 +20,10 @@ class PrepareResultPacket { this.#conn = conn; } + get conn() { + return this.#conn; + } + execute(values, opts, cb, stack) { let _opts = opts, _cb = cb; @@ -29,7 +33,7 @@ class PrepareResultPacket { _opts = undefined; } - if (this.closed) { + if (this.isClose()) { const error = Errors.createError( `Execute fails, prepare command as already been closed`, Errors.ER_PREPARE_CLOSED, @@ -46,9 +50,9 @@ class PrepareResultPacket { } } - const cmdParam = new CommandParameter(this.query, values, _opts, cb); + const cmdParam = new CommandParameter(this.query, values, _opts, _cb); if (stack) cmdParam.stack = stack; - const conn = this.#conn; + const conn = this.conn; const promise = new Promise((resolve, reject) => conn.executePromise.call(conn, cmdParam, this, resolve, reject)); if (!_cb) { return promise; @@ -70,7 +74,7 @@ class PrepareResultPacket { _opts = undefined; } - if (this.closed) { + if (this.isClose()) { const error = Errors.createError( `Execute fails, prepare command as already been closed`, Errors.ER_PREPARE_CLOSED, @@ -90,18 +94,25 @@ class PrepareResultPacket { const cmdParam = new CommandParameter(this.query, values, _opts, cb); if (stack) cmdParam.stack = stack; - const cmd = new ExecuteStream(cmdParam, this.#conn.opts, this, this.#conn.socket); - if (this.#conn.opts.logger.error) cmd.on('error', this.#conn.opts.logger.error); - this.#conn.addCommand(cmd); + const cmd = new ExecuteStream(cmdParam, this.conn.opts, this, this.conn.socket); + if (this.conn.opts.logger.error) cmd.on('error', this.conn.opts.logger.error); + this.conn.addCommand(cmd); return cmd.inStream; } + isClose() { + return this.closed; + } + close() { if (!this.closed) { this.closed = true; this.#conn.emit('close_prepare', this); } } + toString() { + return 'Prepare{closed:' + this.closed + '}'; + } } module.exports = PrepareResultPacket; diff --git a/lib/cmd/class/prepare-wrapper.js b/lib/cmd/class/prepare-wrapper.js new file mode 100644 index 00000000..1e9eb8cb --- /dev/null +++ b/lib/cmd/class/prepare-wrapper.js @@ -0,0 +1,67 @@ +'use strict'; + +/** + * Prepare result wrapper + * This permit to ensure that cache can be close only one time cache. + */ +class PrepareWrapper { + #closed = false; + #cacheWrapper; + #prepare; + #conn; + + constructor(cacheWrapper, prepare) { + this.#cacheWrapper = cacheWrapper; + this.#prepare = prepare; + this.#conn = prepare.conn; + this.execute = this.#prepare.execute; + this.executeStream = this.#prepare.executeStream; + } + get conn() { + return this.#conn; + } + + get id() { + return this.#prepare.id; + } + + get parameterCount() { + return this.#prepare.parameterCount; + } + + get _placeHolderIndex() { + return this.#prepare._placeHolderIndex; + } + + get columns() { + return this.#prepare.columns; + } + + set columns(columns) { + this.#prepare.columns = columns; + } + get database() { + return this.#prepare.database; + } + + get query() { + return this.#prepare.query; + } + + isClose() { + return this.#closed; + } + + close() { + if (!this.#closed) { + this.#closed = true; + this.#cacheWrapper.decrementUse(); + } + } + + toString() { + return 'PrepareWrapper{closed:' + this.#closed + ',cache:' + this.#cacheWrapper + '}'; + } +} + +module.exports = PrepareWrapper; diff --git a/lib/cmd/prepare.js b/lib/cmd/prepare.js index c76af735..e6b9a4be 100644 --- a/lib/cmd/prepare.js +++ b/lib/cmd/prepare.js @@ -2,7 +2,7 @@ const Parser = require('./parser'); const Parse = require('../misc/parse'); const BinaryEncoder = require('./encoder/binary-encoder'); -const CachedPrepareResultPacket = require('./class/cached-prepare-result-packet'); +const PrepareCacheWrapper = require('./class/prepare-cache-wrapper'); const PrepareResult = require('./class/prepare-result-packet'); const ServerStatus = require('../const/server-status'); const Errors = require('../misc/errors'); @@ -29,13 +29,11 @@ class Prepare extends Parser { */ start(out, opts, info) { // check in cache if enabled - if (info._prepareCache) { - const key = info.database + '|' + this.sql; - const cachedItem = info._prepareCache.get(key); - if (cachedItem) { - cachedItem.incrementUse(); + if (this.conn.prepareCache) { + let cachedPrepare = this.conn.prepareCache.get(this.sql); + if (cachedPrepare) { this.emit('send_end'); - return this.successEnd(cachedItem); + return this.successEnd(cachedPrepare); } } if (opts.logger.query) opts.logger.query(`PREPARE: ${this.sql}`); @@ -55,32 +53,22 @@ class Prepare extends Parser { } successPrepare(info, opts) { - let prepare; - if (info._prepareCache) { - const key = info.database + '|' + this.sql; - prepare = new CachedPrepareResultPacket( - this.statementId, - this.parameterCount, - this._columnsPrepare, - info.database, - this.sql, - this.placeHolderIndex, - this.conn - ); - info._prepareCache.set(key, prepare); - } else { - prepare = new PrepareResult( - this.statementId, - this.parameterCount, - this._columnsPrepare, - info.database, - this.sql, - this.placeHolderIndex, - this.conn - ); + let prepare = new PrepareResult( + this.statementId, + this.parameterCount, + this._columns, + info.database, + this.sql, + this.placeHolderIndex, + this.conn + ); + + if (this.conn.prepareCache) { + let cached = new PrepareCacheWrapper(prepare); + this.conn.prepareCache.set(this.sql, cached); + return this.successEnd(cached.incrementUse()); } - this._columnsPrepare = null; - return this.success(prepare); + return this.successEnd(prepare); } /** @@ -104,7 +92,7 @@ class Prepare extends Parser { this.columnNo = packet.readUInt16(); this.parameterCount = packet.readUInt16(); this._parameterNo = this.parameterCount; - this._columnsPrepare = []; + this._columns = []; if (this._parameterNo > 0) return (this.onPacketReceive = this.skipPrepareParameterPacket); if (this.columnNo > 0) return (this.onPacketReceive = this.readPrepareColumnsPacket); return this.successPrepare(info, opts); @@ -132,7 +120,7 @@ class Prepare extends Parser { readPrepareColumnsPacket(packet, out, opts, info) { this.columnNo--; - this._columnsPrepare.push(new ColumnDefinition(packet, info, opts.rowsAsArray)); + this._columns.push(new ColumnDefinition(packet, info, opts.rowsAsArray)); if (this.columnNo === 0) { if (info.eofDeprecated) { return this.successPrepare(info, opts); diff --git a/lib/connection-promise.js b/lib/connection-promise.js index 300bc991..1534b702 100644 --- a/lib/connection-promise.js +++ b/lib/connection-promise.js @@ -31,6 +31,10 @@ class ConnectionPromise { return this.#conn.info; } + get prepareCache() { + return this.#conn.prepareCache; + } + /** * Permit to change user during connection. * All user variables will be reset, Prepare commands will be released. @@ -107,12 +111,6 @@ class ConnectionPromise { } static _EXECUTE_CMD(conn, cmdParam) { - let prepareFromCache; - if ((prepareFromCache = conn.info.prepareFromCache(cmdParam.sql)) != null) { - return prepareFromCache - .execute(cmdParam.values, cmdParam.opts, null, cmdParam.stack) - .finally(() => prepareFromCache.close()); - } return new Promise(conn.prepare.bind(conn, cmdParam)) .then((prepare) => { return prepare.execute(cmdParam.values, cmdParam.opts, null, cmdParam.stack).finally(() => prepare.close()); diff --git a/lib/connection.js b/lib/connection.js index b56beec4..69e0d4d0 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -28,6 +28,7 @@ const BatchBulk = require('./cmd/batch-bulk'); const ChangeUser = require('./cmd/change-user'); const { Status } = require('./const/connection_status'); const CommandParameter = require('./command-parameter'); +const LruPrepareCache = require('./lru-prepare-cache'); const convertFixedTime = function (tz) { if (tz === 'Etc/UTC' || tz === 'Z') { @@ -80,12 +81,15 @@ class Connection extends EventEmitter { streamOut; streamIn; info; + prepareCache; constructor(options) { super(); this.opts = Object.assign(new EventEmitter(), options); this.info = new ConnectionInformation(this.opts); + this.prepareCache = + this.opts.prepareCacheLength > 0 ? new LruPrepareCache(this.info, this.opts.prepareCacheLength) : null; this.addCommand = this.addCommandQueue; this.streamOut = new PacketOutputStream(this.opts, this.info); this.streamIn = new PacketInputStream( @@ -204,36 +208,37 @@ class Connection extends EventEmitter { for (let i = 0; i < vals.length; i++) { executes.push(prepare.execute(vals[i], cmdParam.opts, null, cmdParam.stack)); } - return Promise.all(executes).then( - function (res) { - prepare.close(); - if (cmdParam.opts && cmdParam.opts.fullResult) { - return Promise.resolve(res); - } else { - // aggregate results - const firstResult = res[0]; - if (firstResult instanceof OkPacket) { - let affectedRows = 0; - const insertId = firstResult.insertId; - const warningStatus = firstResult.warningStatus; - for (let i = 0; i < res.length; i++) { - affectedRows += res[i].affectedRows; - } - return Promise.resolve(new OkPacket(affectedRows, insertId, warningStatus)); + return Promise.all(executes) + .then( + function (res) { + if (cmdParam.opts && cmdParam.opts.fullResult) { + return Promise.resolve(res); } else { - // results have result-set. example :'INSERT ... RETURNING' // aggregate results - const rs = []; - rs.meta = res.meta; - res.forEach((row) => { - Array.prototype.push.apply(rs, row); - }); - rs.meta = res.meta; - return Promise.resolve(rs); + const firstResult = res[0]; + if (firstResult instanceof OkPacket) { + let affectedRows = 0; + const insertId = firstResult.insertId; + const warningStatus = firstResult.warningStatus; + for (let i = 0; i < res.length; i++) { + affectedRows += res[i].affectedRows; + } + return Promise.resolve(new OkPacket(affectedRows, insertId, warningStatus)); + } else { + // results have result-set. example :'INSERT ... RETURNING' + // aggregate results + const rs = []; + rs.meta = res.meta; + res.forEach((row) => { + Array.prototype.push.apply(rs, row); + }); + rs.meta = res.meta; + return Promise.resolve(rs); + } } - } - }.bind(this) - ); + }.bind(this) + ) + .finally(() => prepare.close()); } }); } @@ -1257,11 +1262,11 @@ class Connection extends EventEmitter { prepare(cmdParam, resolve, reject) { if (!cmdParam.sql) return reject(Errors.createError('sql parameter is mandatory', Errors.ER_UNDEFINED_SQL, this.info, 'HY000')); - if (this.sendQueue.isEmpty() || !this.receiveQueue.peekFront()) { + if (this.prepareCache && (this.sendQueue.isEmpty() || !this.receiveQueue.peekFront())) { // no command in queue, database is then considered ok, and cache can be search right now - const cache = this.info.prepareFromCache(cmdParam.sql); - if (cache) { - return resolve(cache); + const cachedPrepare = this.prepareCache.get(cmdParam.sql); + if (cachedPrepare) { + return resolve(cachedPrepare); } } diff --git a/lib/lru-prepare-cache.js b/lib/lru-prepare-cache.js new file mode 100644 index 00000000..5b683d54 --- /dev/null +++ b/lib/lru-prepare-cache.js @@ -0,0 +1,43 @@ +'use strict'; +const LRU = require('lru-cache'); + +/** + * LRU prepare cache + * + */ +class LruPrepareCache { + #lruCache; + #info; + constructor(info, prepareCacheLength) { + this.#info = info; + this.#lruCache = new LRU({ + max: prepareCacheLength, + dispose: (value, key) => value.unCache() + }); + } + + get(sql) { + const key = this.#info.database + '|' + sql; + const cachedItem = this.#lruCache.get(key); + if (cachedItem) { + return cachedItem.incrementUse(); + } + return null; + } + + set(sql, cache) { + const key = this.#info.database + '|' + sql; + this.#lruCache.set(key, cache); + } + + toString() { + let keyStr = ''; + for (const value of this.#lruCache.keys()) { + keyStr += '[' + value + '],'; + } + if (keyStr.length > 1) keyStr = keyStr.substring(0, keyStr.length - 1); + return 'info{cache:' + keyStr + '}'; + } +} + +module.exports = LruPrepareCache; diff --git a/lib/misc/connection-information.js b/lib/misc/connection-information.js index 5f6e36aa..11e40dbe 100644 --- a/lib/misc/connection-information.js +++ b/lib/misc/connection-information.js @@ -1,5 +1,4 @@ 'use strict'; -const LRU = require('lru-cache'); class ConnectionInformation { constructor(opts) { @@ -8,27 +7,6 @@ class ConnectionInformation { this.serverVersion = null; this.serverCapabilities = null; this.database = opts.database; - if (opts.prepareCacheLength > 0) { - this._prepareCache = new LRU({ - max: opts.prepareCacheLength, - dispose: (value, key) => value.unCache() - }); - this.prepareFromCache = this.prepareFromCacheEnable; - } else { - this.prepareFromCache = (sql) => { - return null; - }; - } - } - - prepareFromCacheEnable(sql) { - const key = this.database + '|' + sql; - const cachedItem = this._prepareCache.get(key); - if (cachedItem) { - cachedItem.incrementUse(); - return cachedItem; - } - return null; } hasMinVersion(major, minor, patch) { diff --git a/test/integration/test-execute-callback.js b/test/integration/test-execute-callback.js index d2bc60e5..81ff9a9a 100644 --- a/test/integration/test-execute-callback.js +++ b/test/integration/test-execute-callback.js @@ -163,34 +163,35 @@ describe('prepare and execute callback', () => { prepare.close(); prepare.execute('1', (err, res) => { if (err) { - done(err); + assert.isTrue(err.message.includes('Execute fails, prepare command as already been closed')); } else { - //remove from cache - conn.execute('select 1, ?', ['2']); - conn.execute('select 2, ?', ['2']); - conn.execute('select 3, ?', ['2']); - conn.execute('select 4, ?', ['2'], (err, res) => { - //removed from cache, must really be closed - prepare.execute('1', (err, res) => { - if (!err) { - done(new Error('must have thrown error')); - } else { - assert.isTrue(err.message.includes('Execute fails, prepare command as already been closed')); - conn.prepare('select ?', (err, prepare2) => { - if (err) { - done(err); - } else { - prepare2.execute('1', (res) => { - prepare2.close(); - conn.end(); - done(); - }); - } - }); - } - }); - }); + done(new Error('expect to have thrown an error')); } + //remove from cache + conn.execute('select 1, ?', ['2']); + conn.execute('select 2, ?', ['2']); + conn.execute('select 3, ?', ['2']); + conn.execute('select 4, ?', ['2'], (err, res) => { + //removed from cache, must really be closed + prepare.execute('1', (err, res) => { + if (!err) { + done(new Error('must have thrown error')); + } else { + assert.isTrue(err.message.includes('Execute fails, prepare command as already been closed')); + conn.prepare('select ?', (err, prepare2) => { + if (err) { + done(err); + } else { + prepare2.execute('1', (res) => { + prepare2.close(); + conn.end(); + done(); + }); + } + }); + } + }); + }); }); }); }); diff --git a/test/integration/test-execute.js b/test/integration/test-execute.js index ae93c626..374e2c3a 100644 --- a/test/integration/test-execute.js +++ b/test/integration/test-execute.js @@ -98,8 +98,12 @@ describe('prepare and execute', () => { await prepare.execute('1'); await prepare.close(); - //in cache, so must still work - await prepare.execute('1'); + try { + await prepare.execute('1'); + throw new Error('must have thrown error'); + } catch (e) { + assert.isTrue(e.message.includes('Execute fails, prepare command as already been closed')); + } await conn.execute('select 1, ?', ['2']); await conn.execute('select 2, ?', ['2']); @@ -125,25 +129,89 @@ describe('prepare and execute', () => { const conn = await base.createConnection({ prepareCacheLength: 2 }); let prepare = await conn.prepare('select ?', [1]); const initialPrepareId = prepare.id; + assert.equal(prepare.toString(), 'PrepareWrapper{closed:false,cache:Prepare{use:1,cached:true}}'); + assert.equal(conn.prepareCache.toString(), 'info{cache:[testn|select ?]}'); + + prepare.close(); + assert.equal(prepare.toString(), 'PrepareWrapper{closed:true,cache:Prepare{use:0,cached:true}}'); + assert.equal(conn.prepareCache.toString(), 'info{cache:[testn|select ?]}'); + + prepare.close(); + assert.equal(prepare.toString(), 'PrepareWrapper{closed:true,cache:Prepare{use:0,cached:true}}'); + assert.equal(conn.prepareCache.toString(), 'info{cache:[testn|select ?]}'); + + prepare = await conn.prepare('select ?', [1]); + assert.equal(prepare.toString(), 'PrepareWrapper{closed:false,cache:Prepare{use:1,cached:true}}'); + assert.equal(conn.prepareCache.toString(), 'info{cache:[testn|select ?]}'); + + let prepare_2 = await conn.prepare('select ?', [1]); + assert.equal(prepare_2.toString(), 'PrepareWrapper{closed:false,cache:Prepare{use:2,cached:true}}'); + assert.equal(conn.prepareCache.toString(), 'info{cache:[testn|select ?]}'); prepare.close(); - await conn.prepare('select ? + 1', [1]); - await conn.prepare('select ? + 2', [1]); - await conn.prepare('select ? + 3', [1]); - await conn.prepare({ sql: 'select ? + 3' }, [1]); - await conn.prepare({ sql: 'select 4' }); + assert.equal(prepare.toString(), 'PrepareWrapper{closed:true,cache:Prepare{use:1,cached:true}}'); + assert.equal(conn.prepareCache.toString(), 'info{cache:[testn|select ?]}'); + + prepare_2.close(); + assert.equal(prepare.toString(), 'PrepareWrapper{closed:true,cache:Prepare{use:0,cached:true}}'); + assert.equal(conn.prepareCache.toString(), 'info{cache:[testn|select ?]}'); + + prepare = await conn.prepare('select ? + 1', [1]); + assert.equal(prepare.toString(), 'PrepareWrapper{closed:false,cache:Prepare{use:1,cached:true}}'); + assert.equal(conn.prepareCache.toString(), 'info{cache:[testn|select ? + 1],[testn|select ?]}'); + + let preparePlus2 = await conn.prepare('select ? + 2', [1]); + assert.equal(preparePlus2.toString(), 'PrepareWrapper{closed:false,cache:Prepare{use:1,cached:true}}'); + assert.equal('info{cache:[testn|select ? + 2],[testn|select ? + 1]}', conn.prepareCache.toString()); + + let prepare3 = await conn.prepare('select ? + 3', [1]); + assert.equal(prepare3.toString(), 'PrepareWrapper{closed:false,cache:Prepare{use:1,cached:true}}'); + assert.equal('info{cache:[testn|select ? + 3],[testn|select ? + 2]}', conn.prepareCache.toString()); + + let prepare2 = await conn.prepare({ sql: 'select ? + 2' }, [1]); + assert.equal(prepare2.toString(), 'PrepareWrapper{closed:false,cache:Prepare{use:2,cached:true}}'); + assert.equal('info{cache:[testn|select ? + 2],[testn|select ? + 3]}', conn.prepareCache.toString()); + + prepare = await conn.prepare({ sql: 'select 4' }); + assert.equal(prepare.toString(), 'PrepareWrapper{closed:false,cache:Prepare{use:1,cached:true}}'); + assert.equal(conn.prepareCache.toString(), 'info{cache:[testn|select 4],[testn|select ? + 2]}'); + assert.equal(prepare2.toString(), 'PrepareWrapper{closed:false,cache:Prepare{use:2,cached:true}}'); + assert.equal(prepare3.toString(), 'PrepareWrapper{closed:false,cache:Prepare{use:1,cached:false}}'); prepare = await conn.prepare('select ?', [1]); + assert.equal(conn.prepareCache.toString(), 'info{cache:[testn|select ?],[testn|select 4]}'); + assert.equal(prepare2.toString(), 'PrepareWrapper{closed:false,cache:Prepare{use:2,cached:false}}'); + prepare2.close(); + assert.equal(prepare2.toString(), 'PrepareWrapper{closed:true,cache:Prepare{use:1,cached:false}}'); + preparePlus2.close(); + assert.equal(preparePlus2.toString(), 'PrepareWrapper{closed:true,cache:Prepare{use:0,cached:false}}'); + assert.notEqual(prepare.id, initialPrepareId); const secondPrepareId = prepare.id; for (let i = 0; i < 10; i++) { const prepare2 = await conn.prepare('select ?', [i]); + assert.equal(conn.prepareCache.toString(), 'info{cache:[testn|select ?],[testn|select 4]}'); + assert.equal(prepare2.toString(), 'PrepareWrapper{closed:false,cache:Prepare{use:2,cached:true}}'); assert.equal(prepare2.id, secondPrepareId); prepare2.close(); + assert.equal(prepare2.toString(), 'PrepareWrapper{closed:true,cache:Prepare{use:1,cached:true}}'); + prepare2.close(); + assert.equal(prepare2.toString(), 'PrepareWrapper{closed:true,cache:Prepare{use:1,cached:true}}'); } conn.end(); }); + it('prepare no cache', async () => { + const conn = await base.createConnection({ prepareCacheLength: 0 }); + let prepare = await conn.prepare('select ?', [1]); + assert.equal(prepare.toString(), 'Prepare{closed:false}'); + prepare.close(); + assert.equal(prepare.toString(), 'Prepare{closed:true}'); + prepare.close(); + assert.equal(prepare.toString(), 'Prepare{closed:true}'); + await conn.end(); + }); + it('basic prepare and execute', async () => { const conn = await base.createConnection({ prepareCacheLength: 0 }); // https://jira.mariadb.org/browse/XPT-266