Skip to content

Commit

Permalink
Merge pull request #412 from cloudant/408-split-changes
Browse files Browse the repository at this point in the history
Split changes spooling
  • Loading branch information
ricellis authored Nov 24, 2021
2 parents 8125569 + fe18c76 commit 0e787c2
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 25 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
- [IMPROVED] Added quiet option to backup and restore to suppress batch messages.
- [IMPROVED] Added a preflight check for restore function to make sure that a target database is new and empty.
- [IMPROVED] Added handling for errors reading log file.
- [IMPROVED] Split changes spooling to improve reliability on databases with
millions of documents.

# 2.7.0 (2021-09-14)
- [UPGRADED] Cloudant client dependency from `@cloudant/cloudant` to `@ibm-cloud/cloudant`.
Expand Down
69 changes: 44 additions & 25 deletions includes/spoolchanges.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ module.exports = function(db, log, bufferSize, ee, callback) {
let batch = 0;
let lastSeq = null;
const logStream = fs.createWriteStream(log);
let pending = 0;
// The number of changes to fetch per request
const limit = 100000;

// send documents ids to the queue in batches of bufferSize + the last batch
const processBuffer = function(lastOne) {
Expand All @@ -57,36 +60,52 @@ module.exports = function(db, log, bufferSize, ee, callback) {
processBuffer(false);
} else if (c.last_seq) {
lastSeq = c.last_seq;
pending = c.pending;
}
}
};

// stream the changes feed to disk
// Repeatedly requests the changes feed in batches of `limit` documents,
// spooling each batch's doc ids to the log file, and follows the
// server-reported `pending` count until no changes remain.
// NOTE(review): this region of the rendered diff interleaved the removed
// pre-change code with the added code; this is the coherent post-change
// implementation with the duplicated old chain and old catch removed.
function getChanges(since = 0) {
  debug('making changes request since ' + since);
  return db.service.postChangesAsStream({ db: db.db, since: since, limit: limit, seq_interval: limit })
    .then(response => {
      response.result.pipe(liner())
        .on('error', function(err) {
          // close the log before reporting the stream failure
          logStream.end();
          callback(err);
        })
        .pipe(change(onChange))
        .on('error', function(err) {
          logStream.end();
          callback(err);
        })
        .on('finish', function() {
          // flush any remaining buffered ids as the final batch
          processBuffer(true);
          if (!lastSeq) {
            // the feed ended without a last_seq line: treat as a failed spool
            logStream.end();
            debug('changes request terminated before last_seq was sent');
            callback(new error.BackupError('SpoolChangesError', 'Changes request terminated before last_seq was sent'));
          } else {
            debug(`changes request completed with last_seq: ${lastSeq} and ${pending} changes pending.`);
            if (pending > 0) {
              // Return the next promise
              return getChanges(lastSeq);
            } else {
              // all changes spooled; record completion marker then call back
              debug('finished streaming database changes');
              logStream.end(':changes_complete ' + lastSeq + '\n', 'utf8', callback);
            }
          }
        });
    })
    .catch(err => {
      logStream.end();
      if (err.status && err.status >= 400) {
        // HTTP-level failure from the changes request
        callback(error.convertResponseError(err));
      } else if (err.name !== 'SpoolChangesError') {
        // SpoolChangesError was already reported via callback above;
        // avoid calling back a second time for the same failure
        callback(new error.BackupError('SpoolChangesError', `Failed changes request - ${err.message}`));
      }
    });
}

getChanges();
};
94 changes: 94 additions & 0 deletions test/spoolchanges.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,38 @@ const longTestTimeout = 3000;

const db = request.client(`${url}/${dbName}`, { parallelism: 1 });

const seqSuffix = Buffer.alloc(124, 'abc123').toString('base64');
// Mock the _changes endpoint with nock, serving `totalChanges` changes in
// replies of `batchSize` entries each. When `fullResponse` is true each
// entry carries a unique seq/id; otherwise a shared sparse entry is reused.
function provideChanges(batchSize, totalChanges, fullResponse = false) {
  // changes still to be served across the remaining mocked replies
  let remaining = totalChanges;
  // single reusable sparse entry (no per-change seq, as with seq_interval)
  const sparseResultsArray = Array(batchSize).fill({
    seq: null,
    id: 'doc',
    changes: [{ rev: '1-abcdef0123456789abcdef0123456789' }]
  });
  // one reply per full batch plus one for any partial final batch
  const requestCount = Math.ceil(totalChanges / batchSize);
  nock(url)
    .post(`/${dbName}/_changes`)
    .query(true)
    .times(requestCount)
    .reply(200, (uri, requestBody) => {
      remaining -= batchSize;
      const lastSeq = totalChanges - remaining;
      const firstSeq = lastSeq - batchSize;
      const results = fullResponse
        ? Array.from(Array(batchSize), (_, i) => {
          return {
            seq: `${firstSeq + i}-${seqSuffix}`,
            id: `doc${firstSeq + i}`,
            changes: [{ rev: '1-abcdef0123456789abcdef0123456789' }]
          };
        })
        : sparseResultsArray;
      return {
        results: results,
        pending: remaining,
        last_seq: `${lastSeq}-abc`
      };
    });
}

describe('#unit Check spool changes', function() {
it('should terminate on request error', function(done) {
nock(url)
Expand Down Expand Up @@ -59,4 +91,66 @@ describe('#unit Check spool changes', function() {
done();
});
}).timeout(longTestTimeout);

it('should keep collecting changes', function(done) {
  // Exercises spooling continuation across multiple requests
  // (4 batches of 100000 changes each).
  // Allow up to 10 seconds for this test.
  this.timeout(10 * 1000);

  // Mock full change entries for this test
  provideChanges(100000, 400000, true);
  changes(db, '/dev/null', 500, null, function(backupError) {
    assert.ok(!backupError);
    assert.ok(nock.isDone());
    done();
  });
});

it('should keep collecting sparse changes', function(done) {
  // Verifies that making thousands of tiny requests completes cleanly.
  // Allow up to 25 seconds for this test.
  this.timeout(25 * 1000);
  // Mock sparse change entries with a batch size of 1
  provideChanges(1, 2500);
  changes(db, '/dev/null', 500, null, function(backupError) {
    assert.ok(!backupError);
    assert.ok(nock.isDone());
    done();
  });
});
});

describe('Longer spool changes checks', function() {
  it('#slow should keep collecting changes (25M)', function(done) {
    // Allow up to 2 minutes for this test.
    this.timeout(2 * 60 * 1000);
    // Changes spooling uses a constant internal batch size; use a test
    // value here and set the buffer to match.
    const batchSize = 100000;
    // Mock sparse change entries for this test
    provideChanges(batchSize, 25000000);
    changes(db, '/dev/null', batchSize, null, function(backupError) {
      assert.ok(!backupError);
      assert.ok(nock.isDone());
      done();
    });
  });

  it('#slower should keep collecting changes (500M)', function(done) {
    // Allow up to 90 minutes for this test.
    this.timeout(90 * 60 * 1000);
    // Changes spooling uses a constant internal batch size; use a test
    // value here and set the buffer to match.
    const batchSize = 1000000;
    // Mock full change entries for this test to exercise load
    provideChanges(batchSize, 500000000, true);
    changes(db, '/dev/null', batchSize, null, function(backupError) {
      assert.ok(!backupError);
      assert.ok(nock.isDone());
      done();
    });
  });
});

0 comments on commit 0e787c2

Please sign in to comment.