style: more async/await (#3358)
- es6(async/promise): pre_send_trans_email_respond, process_delivery
- server:
  - es6(async) _graceful, get_smtp_server, setup_smtp_listeners
  - replace async.eachLimit with Promise.all batches
  - status: replace async.map with Promise.allSettled
msimerson authored May 15, 2024
1 parent 027b1ba commit aba0638
Showing 10 changed files with 249 additions and 241 deletions.
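
The server.js changes summarized in the commit message (es6 _graceful, get_smtp_server, setup_smtp_listeners, and the async.eachLimit → Promise.all batches swap) are among the files not rendered below. A minimal sketch of that batching pattern, assuming an array of listener endpoints and a hypothetical setupListener() that returns a Promise — illustrative names, not Haraka's actual identifiers:

// Replacing async.eachLimit(items, limit, iterator, done) with fixed-size
// batches of Promise.all. BATCH_SIZE and setupListener are hypothetical.
const BATCH_SIZE = 10;

async function setup_all_listeners (endpoints, setupListener) {
    for (let i = 0; i < endpoints.length; i += BATCH_SIZE) {
        const batch = endpoints.slice(i, i + BATCH_SIZE);
        // each batch runs concurrently; batches run one after another
        await Promise.all(batch.map(ep => setupListener(ep)));
    }
}

Unlike eachLimit's sliding window of `limit` in-flight tasks, each batch here waits for its slowest member before the next batch starts — a simpler, slightly less parallel approximation.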
4 changes: 4 additions & 0 deletions Changes.md
@@ -58,6 +58,7 @@
- remove config setting ipv6_enabled #3322
- remove undocumented use of send_email with arity of 2. #3322
- encapsulate force_tls logic into get_force_tls #3322
- es6(async/promise): pre_send_trans_email_respond, process_delivery
- queue/lmtp: refactored for DRY and improved readability #3322
- smtp_client: pass connect_timeout, maybe fixes #3281
- spamassassin: repackaged as NPM module #3348
@@ -66,6 +67,9 @@
- doc(Plugins.md): update registry
- remove last vestiges of header_hide_version (long ago renamed)
- server.js: use the local logger methods
- es6(async): _graceful, get_smtp_server, setup_smtp_listeners
- replace async.eachLimit with Promise.all batches
- status: replace async.map with Promise.allSettled
- get Haraka version from utils.getVersion (which includes git id if running from repo)
- tls_socket: remove secureConnection. Fixes #2743
- getSocketOpts is now async
2 changes: 1 addition & 1 deletion outbound/hmail.js
@@ -262,7 +262,7 @@ class HMailItem extends events.EventEmitter {
return this.bounce(`Domain ${this.todo.domain} sends and receives no email (NULL MX)`);
}

// resolves the MX hostnames into IPs
// resolves the MX hostnames to IPs
this.mxlist = await net_utils.resolve_mx_hosts(mxs);

this.try_deliver();
104 changes: 52 additions & 52 deletions outbound/index.js
@@ -3,7 +3,6 @@
const fs = require('node:fs');
const path = require('node:path');

const async = require('async');
const { Address } = require('address-rfc2821');
const config = require('haraka-config');
const constants = require('haraka-constants');
@@ -232,75 +231,76 @@ exports.send_trans_email = function (transaction, next) {
transaction.results = new ResultStore(connection);
}

connection.pre_send_trans_email_respond = retval => {
connection.pre_send_trans_email_respond = async (retval) => {
const deliveries = get_deliveries(transaction);
const hmails = [];
const ok_paths = [];

let todo_index = 1;

async.forEachSeries(deliveries, (deliv, cb) => {
const todo = new TODOItem(deliv.domain, deliv.rcpts, transaction);
todo.uuid = `${todo.uuid}.${todo_index}`;
todo_index++;
this.process_delivery(ok_paths, todo, hmails, cb);
},
(err) => {
if (err) {
for (let i=0, l=ok_paths.length; i<l; i++) {
fs.unlink(ok_paths[i], () => {});
}
transaction.results.add({ name: 'outbound'}, { err });
if (next) next(constants.denysoft, err);
return;
try {
for (const deliv of deliveries) {
const todo = new TODOItem(deliv.domain, deliv.rcpts, transaction);
todo.uuid = `${todo.uuid}.${todo_index}`;
todo_index++;
await this.process_delivery(ok_paths, todo, hmails);
}

for (const hmail of hmails) {
delivery_queue.push(hmail);
}
catch (err) {
for (let i=0, l=ok_paths.length; i<l; i++) {
fs.unlink(ok_paths[i], () => {});
}
transaction.results.add({ name: 'outbound'}, { err });
if (next) next(constants.denysoft, err);
return;
}

transaction.results.add({ name: 'outbound'}, { pass: "queued" });
if (next) {
next(constants.ok, `Message Queued (${transaction.uuid})`);
}
});
for (const hmail of hmails) {
delivery_queue.push(hmail);
}

transaction.results.add({ name: 'outbound'}, { pass: "queued" });
if (next) next(constants.ok, `Message Queued (${transaction.uuid})`);
}

plugins.run_hooks('pre_send_trans_email', connection);
}

exports.process_delivery = function (ok_paths, todo, hmails, cb) {
logger.info(exports, `Transaction delivery for domain: ${todo.domain}`);
const fname = _qfile.name();
const tmp_path = path.join(queue_dir, `${_qfile.platformDOT}${fname}`);
const ws = new FsyncWriteStream(tmp_path, { flags: constants.WRITE_EXCL });
exports.process_delivery = function (ok_paths, todo, hmails) {
return new Promise((resolve, reject) => {

logger.info(exports, `Transaction delivery for domain: ${todo.domain}`);
const fname = _qfile.name();
const tmp_path = path.join(queue_dir, `${_qfile.platformDOT}${fname}`);
const ws = new FsyncWriteStream(tmp_path, { flags: constants.WRITE_EXCL });

ws.on('close', () => {
const dest_path = path.join(queue_dir, fname);
fs.rename(tmp_path, dest_path, err => {
if (err) {
logger.error(exports, `Unable to rename tmp file!: ${err}`);
fs.unlink(tmp_path, () => {});
reject("Queue error");
}
else {
hmails.push(new HMailItem (fname, dest_path, todo.notes));
ok_paths.push(dest_path);
resolve();
}
})
})

ws.on('close', () => {
const dest_path = path.join(queue_dir, fname);
fs.rename(tmp_path, dest_path, err => {
if (err) {
logger.error(exports, `Unable to rename tmp file!: ${err}`);
fs.unlink(tmp_path, () => {});
cb("Queue error");
}
else {
hmails.push(new HMailItem (fname, dest_path, todo.notes));
ok_paths.push(dest_path);
cb();
}
ws.on('error', err => {
logger.error(exports, `Unable to write queue file (${fname}): ${err}`);
ws.destroy();
fs.unlink(tmp_path, () => {});
reject("Queueing failed");
})
})

ws.on('error', err => {
logger.error(exports, `Unable to write queue file (${fname}): ${err}`);
ws.destroy();
fs.unlink(tmp_path, () => {});
cb("Queueing failed");
this.build_todo(todo, ws, () => {
todo.message_stream.pipe(ws, { dot_stuffing: true });
});
})

this.build_todo(todo, ws, () => {
todo.message_stream.pipe(ws, { dot_stuffing: true });
});
}

exports.build_todo = (todo, ws, write_more) => {
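
For readers following the outbound/index.js conversion: process_delivery no longer takes a cb and instead returns a Promise, so a reject(...) in the rename or write-error handlers surfaces as the err caught by the caller's try/catch in pre_send_trans_email_respond. A self-contained sketch of the same callback-to-Promise wrapping, using hypothetical names and node:fs rather than Haraka's queue helpers:

const fs = require('node:fs');
const path = require('node:path');

// Hypothetical stand-in for process_delivery: wrap a callback-style write in a
// Promise so the caller can use await and a single try/catch.
function write_queue_file (dir, fname, body) {
    return new Promise((resolve, reject) => {
        const dest = path.join(dir, fname);
        // 'wx' fails if the file already exists, similar in spirit to WRITE_EXCL above
        fs.writeFile(dest, body, { flag: 'wx' }, (err) => {
            if (err) return reject(err);   // becomes the caught err in the caller
            resolve(dest);
        });
    });
}

async function write_all (dir, items) {
    for (const [fname, body] of items) {
        await write_queue_file(dir, fname, body);   // serial, like the for...of loop above
    }
}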
5 changes: 2 additions & 3 deletions outbound/queue.js
@@ -86,16 +86,15 @@ exports.load_queue = pid => {
}

exports._load_cur_queue = (pid, iteratee, cb) => {
const self = exports;
logger.info(exports, "Loading outbound queue from ", queue_dir);
fs.readdir(queue_dir, (err, files) => {
if (err) {
return logger.error(exports, `Failed to load queue directory (${queue_dir}): ${err}`);
}

self.cur_time = new Date(); // set once so we're not calling it a lot
this.cur_time = new Date(); // set once so we're not calling it a lot

self.load_queue_files(pid, files, iteratee, cb);
this.load_queue_files(pid, files, iteratee, cb);
});
}

3 changes: 2 additions & 1 deletion outbound/timer_queue.js
@@ -19,6 +19,7 @@ class TQTimer {
class TimerQueue {

constructor (interval = 1000) {
this.name = 'outbound/timer_queue'
this.queue = [];
this.interval_timer = setInterval(() => { this.fire(); }, interval);
this.interval_timer.unref() // allow server to exit
@@ -72,7 +73,7 @@ class TimerQueue {
}

drain () {
logger.debug({ name: 'outbound/timer_queue'}, `Draining ${this.queue.length} items from the queue`);
logger.debug(this, `Draining ${this.queue.length} items from the queue`);
while (this.queue.length) {
const to_run = this.queue.shift();
if (to_run.cb) to_run.cb();
67 changes: 35 additions & 32 deletions plugins/status.js
@@ -3,8 +3,6 @@
const fs = require('node:fs');
const path = require('node:path');

const async = require('async');

exports.register = function () {
this.outbound = require('../outbound');
this.queue_dir = require('../outbound/queue').queue_dir;
@@ -82,38 +80,36 @@ exports.pool_list = cb => {
const result = {};

if (server.notes.pool) {
Object.keys(server.notes.pool).forEach(name => {
for (const name of Object.keys(server.notes.pool)) {
const instance = server.notes.pool[name];

result[name] = {
inUse: instance.inUseObjectsCount(),
size: instance.getPoolSize()
};
});
}
}

cb(null, result);
}

exports.queue_list = function (cb) {
this.outbound.list_queue((err, qlist) => {
if (err) cb(err);

this.outbound.list_queue((err, qlist = []) => {
const result = [];

if (qlist) {
qlist.forEach((todo) => result.push({
for (const todo of qlist) {
result.push({
file: todo.file,
uuid: todo.uuid,
queue_time: todo.queue_time,
domain: todo.domain,
from: todo.mail_from.toString(),
to: todo.rcpt_to.map((r) => r.toString())
}));
})
}

cb(err, result);
});
})
}

exports.queue_stats = function (cb) {
@@ -166,14 +162,14 @@ exports.queue_push = function (file, cb) {
// cluster IPC

exports.hook_init_master = function (next) {
const self = this;
const plugin = this;

if (!server.cluster) return next();

function message_handler (sender, msg) {
if (msg.event !== 'status.request') return;

self.call_workers(msg, (err, response) => {
plugin.call_workers(msg, (response) => {
msg.result = response.filter((el) => el != null);
msg.event = 'status.result';
sender.send(msg);
@@ -215,32 +211,39 @@ exports.call_master = (cmd, cb) => {
}

exports.call_workers = function (cmd, cb) {

async.map(server.cluster.workers, (w, done) => {
this.call_worker(w, cmd, done);
}, cb);
Promise.allSettled(
Object.values(server.cluster.workers).map(w => this.call_worker(w, cmd))
)
.then(r => {
cb(
// r.filter(s => s.status === 'rejected').flatMap(s => s.reason),
r.filter(s => s.status === 'fulfilled').flatMap(s => s.value),
)
})
}

// sends command to worker and then waits for a response or timeout
exports.call_worker = (worker, cmd, cb) => {
let timeout;
exports.call_worker = (worker, cmd) => {
return new Promise((resolve) => {
let timeout;

function message_handler (sender, msg) {
if (sender.id !== worker.id) return;
if (msg.event !== 'status.response') return;
function message_handler (sender, msg) {
if (sender.id !== worker.id) return;
if (msg.event !== 'status.response') return;

clearTimeout(timeout);
server.cluster.removeListener('message', message_handler);
clearTimeout(timeout);
server.cluster.removeListener('message', message_handler);

cb(null, msg.result);
}
resolve(msg.result);
}

timeout = setTimeout(() => {
server.cluster.removeListener('message', message_handler);
cb();
}, 1000);
timeout = setTimeout(() => {
server.cluster.removeListener('message', message_handler);
resolve();
}, 1000);


server.cluster.on('message', message_handler);
worker.send(cmd);
server.cluster.on('message', message_handler);
worker.send(cmd);
})
}
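
A note on the call_workers rewrite: call_worker now always resolves — with msg.result when a worker replies, or with undefined after the one-second timeout — so Promise.allSettled's rejected branch is effectively unused (hence the commented-out rejected filter). The fulfilled values are flattened, and the master's message_handler then drops the undefined entries with its el != null filter. A small self-contained sketch (hypothetical data) of how those pieces compose:

// Hypothetical data: one worker that answers and one that "times out"
// (resolves with undefined), mirroring call_worker's two outcomes.
async function demo () {
    const settled = await Promise.allSettled([
        Promise.resolve([{ file: 'q1', domain: 'example.com' }]),   // worker replied
        new Promise(resolve => setTimeout(() => resolve(), 50)),    // timed out
    ]);

    // as in call_workers: keep fulfilled results and flatten them
    const response = settled
        .filter(s => s.status === 'fulfilled')
        .flatMap(s => s.value);

    // as in hook_init_master: drop undefined entries from timed-out workers
    console.log(response.filter(el => el != null));
    // => [ { file: 'q1', domain: 'example.com' } ]
}

demo();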