Skip to content

Commit

Permalink
Track timers + display in status server to try to help diagnose stalls
Browse files Browse the repository at this point in the history
Ref #99
  • Loading branch information
animetosho committed Dec 26, 2022
1 parent bb94c33 commit 92f5751
Show file tree
Hide file tree
Showing 8 changed files with 116 additions and 36 deletions.
6 changes: 1 addition & 5 deletions bin/nyuu.js
Original file line number Diff line number Diff line change
Expand Up @@ -1293,11 +1293,7 @@ var filesToUpload = argv._;
setTimeout(function() {
var handles = cliUtil.activeHandleCounts();
if(handles) {
var handleStr = '';
for(var hn in handles[0]) {
handleStr += ', ' + hn + (handles[0][hn] > 1 ? ' (' + handles[0][hn] + ')' : '');
}
Nyuu.log.warn('Process did not terminate cleanly; active handles: ' + handleStr.substr(2));
Nyuu.log.warn('Process did not terminate cleanly; active handles: ' + cliUtil.activeHandlesStr(handles[0]));
if(verbosity >= 4 && handles[1]) {
process.stderr.write(require('util').inspect(handles[1], {colors: argv.colorize}) + '\n');
}
Expand Down
32 changes: 28 additions & 4 deletions cli/progressmgr.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ var toPercent = function(n) {
return (Math.round(n*10000)/100).toFixed(2) + '%';
};

var writeState = function(uploader, startTime, conn) {
var procman = require('./procman');
var timerMgr = require('../lib/timeoutwrap');

var writeState = function(uploader, startTime, conn, debug) {
var now = Date.now();

// TODO: JSON output etc
Expand All @@ -17,7 +20,6 @@ var writeState = function(uploader, startTime, conn) {
'',
'Post queue size: ' + uploader.queue.queue.length + ' (' + toPercent(Math.min(uploader.queue.queue.length/uploader.queue.size, 1)) + ' full)' + (uploader.queue.hasFinished ? ' - finished' : ''),
'Check queue size: ' + uploader.checkQueue.queue.length + ' + ' + uploader.checkQueue.pendingAdds + ' delayed' + ' (' + toPercent(Math.min((uploader.checkQueue.queue.length+uploader.checkQueue.pendingAdds)/uploader.checkQueue.size, 1)) + ' full)' + (uploader.checkQueue.hasFinished ? ' - finished' : ''),
'Check cache size: ' + uploader.checkCache.cacheSize + ' (' + toPercent(Math.min(uploader.checkCache.cacheSize/uploader.checkCache.size, 1)) + ' full)',
'Re-read queue size: ' + uploader.reloadQueue.queue.length,
'',
'Article activity: ' + uploader.postActive + ' posting, ' + uploader.checkActive + ' checking',
Expand Down Expand Up @@ -51,6 +53,28 @@ var writeState = function(uploader, startTime, conn) {
conn.write('===== Check Connections\' Status =====\r\n');
dumpConnections(uploader.checkConnections);
}

if(debug) {
conn.write('===== Active Timers =====\r\n');
var timers = timerMgr.all();
if(timers.length) {
conn.write(cliUtil.repeatChar(' ', 20) + '| Remaining / Delay (ms)\r\n');
timers.forEach(function(timer) {
conn.write(cliUtil.rpad(timer.label, 20, ' ') + '| ' + cliUtil.lpad(''+(now-timer.start), 9, ' ') + ' ' + cliUtil.lpad(''+timer.delay, 9, ' ') + '\r\n');
});
conn.write('\r\n');
} else {
conn.write('[none]\r\n\r\n');
}

// TODO: consider listing managed processes, opened NZB files, buffer pool state, file reader state

conn.write('===== Other =====\r\n');
conn.write('Check cache size: ' + uploader.checkCache.cacheSize + ' (' + toPercent(Math.min(uploader.checkCache.cacheSize/uploader.checkCache.size, 1)) + ' full)\r\n');
var handles = cliUtil.activeHandleCounts();
if(handles)
conn.write('Active Handles: ' + cliUtil.activeHandlesStr(handles[0]) + '\r\n');
}
};

var progressReport = function(uploader, totalPieces, totalSize, startTime, now) {
Expand Down Expand Up @@ -251,12 +275,12 @@ module.exports = {
resp.write('Specified post not found in queue');
}
resp.end();
} else if(!path || path == '/') {
} else if(!path || path == '/' || path == '/debug') {
// dump overall status
resp.writeHead(200, {
'Content-Type': 'text/plain'
});
writeState(uploader, startTime, resp);
writeState(uploader, startTime, resp, path == '/debug');
resp.end();
} else {
resp.writeHead(404, {
Expand Down
7 changes: 7 additions & 0 deletions cli/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ module.exports = {
// TODO: is there any way to exclude stdout/stderr?
}
return [hTypes, ah];
},
activeHandlesStr: function(hTypes) {
var handleStr = '';
for(var hn in hTypes) {
handleStr += ', ' + hn + (hTypes[hn] > 1 ? ' (' + hTypes[hn] + ')' : '');
}
return handleStr.substr(2);
}

};
21 changes: 11 additions & 10 deletions lib/nntp.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
var tls, net, netStub;
var async = require('async');
var util = require('./util');
var Timer = require('./timeoutwrap');

var ENCODING = null; // 'utf8'
var RE_LINE = /^(\d\d\d) (.*)\r\n$/;
Expand Down Expand Up @@ -182,7 +183,7 @@ NNTP.prototype = {
if(this._timer) this._clearTimer(); // if request timer is active, clear it

if(this.opts.connTimeout)
this._setTimer(this._onConnectFail.bind(this, new NNTPError('connect_timeout', 'Connect timed out')), this.opts.connTimeout);
this._setTimer('connect', this._onConnectFail.bind(this, new NNTPError('connect_timeout', 'Connect timed out')), this.opts.connTimeout);

var self = this;
async.waterfall([
Expand Down Expand Up @@ -335,7 +336,7 @@ NNTP.prototype = {

this._connectCb = cb; // set this back for .destroy() calls to work properly (.connect() will reset this anyway)
this._respFunc = cb;
this._setTimer(this.connect.bind(this, cb), this.opts.reconnectDelay);
this._setTimer('reconnect', this.connect.bind(this, cb), this.opts.reconnectDelay);
}
).bind(this));
},
Expand Down Expand Up @@ -653,23 +654,23 @@ NNTP.prototype = {
var self = this;
var timer = null;
if(this.opts.closeTimeout)
timer = setTimeout(function() {
timer = Timer('nntp-disconnect', function() {
self.warn('Disconnect timed out, forcefully dropping connection...');
self._destroy();
self._callMulti(self._closeWaitCb);
self._closeWaitCb = [];
}, this.opts.closeTimeout);
this.socket.once('close', function() {
if(!self.socket) return; // node 10.x issue
if(timer) clearTimeout(timer);
if(timer) timer.cancel();
self._boundOnClose();
self._callMulti(self._closeWaitCb);
self._closeWaitCb = [];
});
this.socket.removeListener('close', this._boundOnClose); // need to manage the ordering of this one
this.socket.once('error', function(err) {
if(!self.socket) return; // node 10.x issue
if(timer) clearTimeout(timer);
if(timer) timer.cancel();
self._destroy();
self._callMulti(self._closeWaitCb);
self._closeWaitCb = [];
Expand Down Expand Up @@ -834,7 +835,7 @@ NNTP.prototype = {
if(req.postRetries++ < self.opts.postRetries) {
self.warn('Got "' + (code + ' ' + info).trim() + '" response when posting article ' + msgId + '; will retry (attempt ' + req.postRetries + '/' + self.opts.postRetries + ')');
self.numErrors++;
return self._setTimer(doPost, self.opts.postRetryDelay);
return self._setTimer('repost', doPost, self.opts.postRetryDelay);
}
return cb(new NNTPError('post_denied', 'Server could not accept post '+ msgId + ', returned: ' + code + ' ' + info), msgId);
}
Expand Down Expand Up @@ -964,7 +965,7 @@ NNTP.prototype = {
_requestSetTimer: function(type, time) {
if(!time) return;
var self = this;
this._setTimer(function() {
this._setTimer(type, function() {
// timed out - retry
// we destroy the connection because this one probably isn't reliable
// since NNTP doesn't have request/response identifiers, this is the safest approach
Expand Down Expand Up @@ -1000,15 +1001,15 @@ NNTP.prototype = {
}
return rtnLen ? len : writ;
},
_setTimer: function(func, time) {
_setTimer: function(label, func, time) {
if(this._timer) throw new Error('Timer already set');
this._timer = setTimeout(function() {
this._timer = Timer('nntp-'+label, function() {
this._timer = null;
func();
}, time);
},
_clearTimer: function() {
clearTimeout(this._timer);
this._timer.cancel();
this._timer = null;
},
warn: function(msg) {
Expand Down
7 changes: 4 additions & 3 deletions lib/throttlequeue.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"use strict";
var Timer = require('./timeoutwrap');

function ThrottleCancelToken(parent, id) {
this.cancel = parent._cancelItem.bind(parent, id);
Expand Down Expand Up @@ -87,7 +88,7 @@ module.exports.prototype = {
if(waitTime <= 0) // in case this happens
setImmediate(this.onTimeout);
else
this.timer = setTimeout(this.onTimeout, waitTime);
this.timer = Timer('thottle', this.onTimeout, waitTime);
},

// TODO: support dynamically adjusting limits
Expand All @@ -99,7 +100,7 @@ module.exports.prototype = {
this.queue[idx].cb(true);
this.queue.splice(idx, 1);
if(!this.queue.length && this.timer) {
clearTimeout(this.timer);
this.timer.cancel();
this.timer = null;
}
return true;
Expand All @@ -114,7 +115,7 @@ module.exports.prototype = {
this._finish(false);
},
_finish: function(cancelled) {
if(this.timer) clearTimeout(this.timer);
if(this.timer) this.timer.cancel();
this.timer = null;
this.queue.forEach(function(item) {
item.cb(cancelled);
Expand Down
54 changes: 54 additions & 0 deletions lib/timeoutwrap.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"use strict";
// wrapper around setTimeout to allow all timers to be tracked
var timers = {};
var timerPos = 0;

function TimerWrapper(label, callback, delay) {
this.label = label;
this.cb = callback;
this._id = ++timerPos;
this.delay = delay;
this.start = Date.now();
this.timer = setTimeout(this._onTimeout.bind(this), delay);
timers[this._id] = this;
}
TimerWrapper.prototype = {
label: null,
timer: null,
start: 0,
delay: 0,
cb: null,
onCancel: null,
_id: 0,
_onTimeout: function() {
this._remove();
this.cb();
},
cancel: function() {
if(this.timer) {
clearTimeout(this.timer);
this._remove();
if(this.onCancel) this.onCancel();
}
},
_remove: function() {
delete timers[this._id];
this.timer = null;
}
};

module.exports = function(label, callback, delay) {
return new TimerWrapper(label, callback, delay);
};

module.exports.all = function() {
var ret = [];
for(var id in timers)
ret.push(timers[id]);
return ret;
};

module.exports.None = {
label: null,
cancel: function(){}
};
6 changes: 4 additions & 2 deletions lib/timerqueue.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"use strict";
var Timer = require('./timeoutwrap');

module.exports = function(size, takeStack) {
this.queue = [];
Expand All @@ -15,12 +16,13 @@ module.exports.prototype = {
pendingAdds: 0,
_pendingId: 0,
hasFinished: false,
timerLabel: 'queue',
add: function(time, data, cb) {
if(time <= 0) // NOTE: result is undefined for time < 0
this._add(data);
else {
var id = this._pendingId++;
var t = setTimeout(function() {
var t = Timer(this.timerLabel, function() {
delete this.queuePending[id];
this.pendingAdds--;
this._add(data);
Expand Down Expand Up @@ -103,7 +105,7 @@ module.exports.prototype = {
this.pendingAdds = 0;
for(var id in this.queuePending) {
var item = this.queuePending[id];
clearTimeout(item.timer);
item.timer.cancel();
if(cancel)
this._shiftAdd();
else
Expand Down
19 changes: 7 additions & 12 deletions lib/uploader.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ var TimerQueue = require('./timerqueue');
var ThrottleQueue = require('./throttlequeue');
var CacheMgr = require('./cachehelper');
var config = require('../config');
var Timer = require('./timeoutwrap');

function UploaderError(message) {
var r = Error.call(this, message);
Expand Down Expand Up @@ -77,6 +78,7 @@ function Uploader(opts, cb) {
if(!this.opts.check.tries) this.numCheckConns = 0;
this.queue = new Queue(util.optSel(this.opts.articleQueueBuffer, Math.min(Math.round(this.numPostConns*0.5)+2, 25)), this.opts.useLazyConnect);
this.checkQueue = new TimerQueue(this.opts.check.queueBuffer, this.opts.useLazyConnect);
this.checkQueue.timerLabel = 'check-delay';
this.checkCache = new CacheMgr(function(post) {
post.releaseData();
}, util.optSel(this.opts.check.queueCache, Math.min(this.numPostConns*8, 100)));
Expand All @@ -85,7 +87,6 @@ function Uploader(opts, cb) {
this.hasFinished = false;
this.postConnections = [];
this.checkConnections = [];
this._checkStartTimer = [];

this.throttlers = [];
this.rawSpeedTime = new RawSpeedTimeTracker();
Expand Down Expand Up @@ -144,7 +145,6 @@ Uploader.prototype = {
ended: false,
numPostConns: 0,
numCheckConns: 0,
_checkStartTimer: null,

addPost: function(post, continueCb, completeCb) {
this.articlesRead++;
Expand Down Expand Up @@ -313,18 +313,12 @@ Uploader.prototype = {
if(self.opts.useLazyConnect)
self._checkLoop(c, cb)();
else {
var timer = self._checkStartTimer[c.connNum] = setTimeout(function() {
self._checkStartTimer[c.connNum] = null;
Timer('check-init', function() {
c.connect(function(err) {
if(err && err.code == 'cancelled' && self.ended) return cb();
self._checkLoop(c, cb)(err);
});
}, chkOpts.delay);
timer.cancel = function() {
clearTimeout(timer);
self._checkStartTimer[c.connNum] = null;
cb();
};
}, chkOpts.delay).onCancel = cb;
}
})
], cb);
Expand Down Expand Up @@ -472,8 +466,9 @@ Uploader.prototype = {
});
},
_cancelCheckTimers: function() {
this._checkStartTimer.forEach(function(timer) {
if(timer) timer.cancel();
Timer.all().forEach(function(timer) {
if(timer.label == 'check-init')
timer.cancel();
});
},
_postComplete: function(post, err, cb) {
Expand Down

0 comments on commit 92f5751

Please sign in to comment.