diff --git a/CHANGES.md b/CHANGES.md
index 6f1c8948..c78fa9e7 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -4,6 +4,8 @@
 - [IMPROVED] Added quiet option to backup and restore to suppress batch messages.
 - [IMPROVED] Added a preflight check for restore function to make sure that a target database is new and empty.
 - [IMPROVED] Added handling for errors reading log file.
+- [IMPROVED] Split changes spooling to improve reliability on databases with
+  millions of documents.
 
 # 2.7.0 (2021-09-14)
 - [UPGRADED] Cloudant client dependency from `@cloudant/cloudant` to `@ibm-cloud/cloudant`.
diff --git a/includes/spoolchanges.js b/includes/spoolchanges.js
index e4171de7..ce923713 100644
--- a/includes/spoolchanges.js
+++ b/includes/spoolchanges.js
@@ -34,6 +34,9 @@ module.exports = function(db, log, bufferSize, ee, callback) {
   let batch = 0;
   let lastSeq = null;
   const logStream = fs.createWriteStream(log);
+  let pending = 0;
+  // The number of changes to fetch per request
+  const limit = 100000;
 
   // send documents ids to the queue in batches of bufferSize + the last batch
   const processBuffer = function(lastOne) {
@@ -57,36 +60,52 @@
         processBuffer(false);
       } else if (c.last_seq) {
         lastSeq = c.last_seq;
+        pending = c.pending;
       }
     }
   };
 
-  // stream the changes feed to disk
-  db.service.postChangesAsStream({ db: db.db, seq_interval: 10000 }).then(response => {
-    response.result.pipe(liner())
-      .on('error', function(err) {
-        callback(err);
+  function getChanges(since = 0) {
+    debug('making changes request since ' + since);
+    return db.service.postChangesAsStream({ db: db.db, since: since, limit: limit, seq_interval: limit })
+      .then(response => {
+        response.result.pipe(liner())
+          .on('error', function(err) {
+            logStream.end();
+            callback(err);
+          })
+          .pipe(change(onChange))
+          .on('error', function(err) {
+            logStream.end();
+            callback(err);
+          })
+          .on('finish', function() {
+            processBuffer(true);
+            if (!lastSeq) {
+              logStream.end();
+              debug('changes request terminated before last_seq was sent');
+              callback(new error.BackupError('SpoolChangesError', 'Changes request terminated before last_seq was sent'));
+            } else {
+              debug(`changes request completed with last_seq: ${lastSeq} and ${pending} changes pending.`);
+              if (pending > 0) {
+                // Return the next promise
+                return getChanges(lastSeq);
+              } else {
+                debug('finished streaming database changes');
+                logStream.end(':changes_complete ' + lastSeq + '\n', 'utf8', callback);
+              }
+            }
+          });
       })
-      .pipe(change(onChange))
-      .on('error', function(err) {
-        callback(err);
-      })
-      .on('finish', function() {
-        processBuffer(true);
-        if (!lastSeq) {
-          logStream.end();
-          debug('changes request terminated before last_seq was sent');
-          callback(new error.BackupError('SpoolChangesError', 'Changes request terminated before last_seq was sent'));
-        } else {
-          debug('finished streaming database changes');
-          logStream.end(':changes_complete ' + lastSeq + '\n', 'utf8', callback);
+      .catch(err => {
+        logStream.end();
+        if (err.status && err.status >= 400) {
+          callback(error.convertResponseError(err));
+        } else if (err.name !== 'SpoolChangesError') {
+          callback(new error.BackupError('SpoolChangesError', `Failed changes request - ${err.message}`));
         }
       });
-  }).catch(err => {
-    if (err.status && err.status >= 400) {
-      callback(error.convertResponseError(err));
-    } else {
-      callback(new error.BackupError('SpoolChangesError', `Failed changes request - ${err.message}`));
-    }
-  });
+  }
+
+  getChanges();
 };
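Reviewer note: the rewrite above replaces the single streaming _changes request with a recursive getChanges(since) that pages through the feed limit changes at a time, using the pending count CouchDB returns alongside last_seq to decide whether another request is needed. A minimal sketch of that pagination pattern follows; fetchBatch is a hypothetical helper (not part of this patch) standing in for the postChangesAsStream/liner/change pipeline.

// Sketch only: fetchBatch is assumed to perform one _changes request and
// resolve with { lastSeq, pending } extracted from the response, the way
// onChange captures last_seq and pending in the patch above.
const limit = 100000; // changes fetched per request, matching the patch

function spool(fetchBatch, since = 0) {
  return fetchBatch(since, limit).then(({ lastSeq, pending }) => {
    if (!lastSeq) {
      // The feed ended before last_seq arrived; mirrors SpoolChangesError.
      throw new Error('Changes request terminated before last_seq was sent');
    }
    if (pending > 0) {
      // More changes remain: chain the next request from last_seq.
      return spool(fetchBatch, lastSeq);
    }
    return lastSeq; // complete; the caller can record :changes_complete
  });
}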
diff --git a/test/spoolchanges.js b/test/spoolchanges.js
index eb36d74f..999b0c34 100644
--- a/test/spoolchanges.js
+++ b/test/spoolchanges.js
@@ -26,6 +26,38 @@ const longTestTimeout = 3000;
 
 const db = request.client(`${url}/${dbName}`, { parallelism: 1 });
 
+const seqSuffix = Buffer.alloc(124, 'abc123').toString('base64');
+function provideChanges(batchSize, totalChanges, fullResponse = false) {
+  let pending = totalChanges;
+  const sparseResultsArray = Array(batchSize).fill({
+    seq: null,
+    id: 'doc',
+    changes: [{ rev: '1-abcdef0123456789abcdef0123456789' }]
+  });
+  nock(url)
+    .post(`/${dbName}/_changes`)
+    .query(true)
+    .times(totalChanges / batchSize + (totalChanges % batchSize > 0 ? 1 : 0))
+    .reply(200, (uri, requestBody) => {
+      pending -= batchSize;
+      const lastSeq = (totalChanges - pending);
+      const seq = lastSeq - batchSize;
+      return {
+        results: fullResponse
+          ? Array.from(Array(batchSize), (_, i) => {
+            return {
+              seq: `${seq + i}-${seqSuffix}`,
+              id: `doc${seq + i}`,
+              changes: [{ rev: '1-abcdef0123456789abcdef0123456789' }]
+            };
+          })
+          : sparseResultsArray,
+        pending: pending,
+        last_seq: `${lastSeq}-abc`
+      };
+    });
+}
+
 describe('#unit Check spool changes', function() {
   it('should terminate on request error', function(done) {
     nock(url)
@@ -59,4 +91,66 @@ describe('#unit Check spool changes', function() {
       done();
     });
   }).timeout(longTestTimeout);
+
+  it('should keep collecting changes', function(done) {
+    // This test validates that spooling correctly continues
+    // across multiple requests
+    // (4 batches of 100000 changes, to be precise).
+    // This test might take up to 10 seconds
+    this.timeout(10 * 1000);
+
+    // Use full changes for this test
+    provideChanges(100000, 400000, true);
+    changes(db, '/dev/null', 500, null, function(err) {
+      assert.ok(!err);
+      assert.ok(nock.isDone());
+      done();
+    });
+  });
+
+  it('should keep collecting sparse changes', function(done) {
+    // This test checks that spooling remains stable when it must make
+    // thousands of consecutive requests (2500 batches of one change).
+    // This test might take up to 25 seconds
+    this.timeout(25 * 1000);
+    // Use sparse changes for this test and a batch size of 1
+    provideChanges(1, 2500);
+    changes(db, '/dev/null', 500, null, function(err) {
+      assert.ok(!err);
+      assert.ok(nock.isDone());
+      done();
+    });
+  });
+});
+
+describe('Longer spool changes checks', function() {
+  it('#slow should keep collecting changes (25M)', function(done) {
+    // This test might take up to 2 minutes
+    this.timeout(2 * 60 * 1000);
+    // Note: changes spooling uses a constant batch size; we pick a
+    // test value here and set the buffer size to match
+    const batch = 100000;
+    // Use sparse changes for this test
+    provideChanges(batch, 25000000);
+    changes(db, '/dev/null', batch, null, function(err) {
+      assert.ok(!err);
+      assert.ok(nock.isDone());
+      done();
+    });
+  });
+
+  it('#slower should keep collecting changes (500M)', function(done) {
+    // This test might take up to 90 minutes
+    this.timeout(90 * 60 * 1000);
+    // Note: changes spooling uses a constant batch size; we pick a
+    // test value here and set the buffer size to match
+    const batch = 1000000;
+    // Use full changes for this test to exercise load
+    provideChanges(batch, 500000000, true);
+    changes(db, '/dev/null', batch, null, function(err) {
+      assert.ok(!err);
+      assert.ok(nock.isDone());
+      done();
+    });
+  });
 });
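Reviewer note: provideChanges mocks CouchDB's paging contract, with each reply decrementing pending by batchSize and fabricating a matching last_seq, so the .times() count must equal the number of batched requests the spooler will make. A quick check of that arithmetic (illustrative values only; the tests always use totalChanges values that divide evenly by batchSize, so the expression reduces to an exact batch count):

// The request count expression from provideChanges' .times() call:
const times = (total, batch) => total / batch + (total % batch > 0 ? 1 : 0);

console.log(times(400000, 100000));   // 4    - 'keep collecting changes'
console.log(times(2500, 1));          // 2500 - sparse changes test
console.log(times(25000000, 100000)); // 250  - #slow 25M test
// For inputs that do not divide evenly, Math.ceil(total / batch) would be
// the safer general form: times(5, 2) yields 3.5, Math.ceil(5 / 2) yields 3.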