Skip to content

Commit

Permalink
Merge pull request #778 from amqp-node/configure-channel-highwatermark
Browse files Browse the repository at this point in the history
Support channel options like highwatermark
  • Loading branch information
cressie176 authored Dec 6, 2024
2 parents d2af467 + 6add58c commit c8ae9cd
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 7 deletions.
14 changes: 12 additions & 2 deletions lib/callback_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ class CallbackModel extends EventEmitter {
this.connection._updateSecret(newSecret, reason, cb);
}

createChannel (cb) {
createChannel (options, cb) {
if (arguments.length === 1) {
cb = options;
options = undefined;
}
var ch = new Channel(this.connection);
ch.setOptions(options);
ch.open(function (err, ok) {
if (err === null)
cb && cb(null, ch);
Expand All @@ -39,8 +44,13 @@ class CallbackModel extends EventEmitter {
return ch;
}

createConfirmChannel (cb) {
createConfirmChannel (options, cb) {
if (arguments.length === 1) {
cb = options;
options = undefined;
}
var ch = new ConfirmChannel(this.connection);
ch.setOptions(options);
ch.open(function (err) {
if (err !== null)
return cb && cb(err);
Expand Down
6 changes: 5 additions & 1 deletion lib/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ class Channel extends EventEmitter {
this.handleMessage = acceptDeliveryOrReturn;
}

setOptions(options) {
this.options = options;
}

allocate () {
this.ch = this.connection.freshChannel(this);
this.ch = this.connection.freshChannel(this, this.options);
return this;
}

Expand Down
6 changes: 4 additions & 2 deletions lib/channel_model.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ class ChannelModel extends EventEmitter {
return promisify(this.connection._updateSecret.bind(this.connection))(newSecret, reason);
}

async createChannel() {
async createChannel(options) {
const channel = new Channel(this.connection);
channel.setOptions(options);
await channel.open();
return channel;
}

async createConfirmChannel() {
async createConfirmChannel(options) {
const channel = new ConfirmChannel(this.connection);
channel.setOptions(options);
await channel.open();
await channel.rpc(defs.ConfirmSelect, {nowait: false}, defs.ConfirmSelectOk);
return channel;
Expand Down
63 changes: 61 additions & 2 deletions test/callback_api.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,14 @@ suite('updateSecret', function() {
});

function channel_test_fn(method) {
return function(name, chfun) {
return function(name, options, chfun) {
if (arguments.length === 2) {
chfun = options;
options = {};
}
test(name, function(done) {
connect(kCallback(function(c) {
c[method](kCallback(function(ch) {
c[method](options, kCallback(function(ch) {
chfun(ch, done);
}, done));
}, done));
Expand Down Expand Up @@ -210,6 +214,33 @@ suite('sending messages', function() {
});
});

var channelOptions = {};

channel_test('find high watermark', function(ch, done) {
var msg = randomString();
var baseline = 0;
ch.assertQueue('', {exclusive: true}, function(e, q) {
if (e !== null) return done(e);
while (ch.sendToQueue(q.queue, Buffer.from(msg))) {
baseline++;
};
channelOptions.highWaterMark = baseline * 2;
done();
})
});

channel_test('set high watermark', channelOptions, function(ch, done) {
var msg = randomString();
ch.assertQueue('', {exclusive: true}, function(e, q) {
if (e !== null) return done(e);
var ok;
for (var i = 0; i < channelOptions.highWaterMark; i++) {
ok = ch.sendToQueue(q.queue, Buffer.from(msg));
assert.equal(ok, true);
}
done();
});
});
});

suite('ConfirmChannel', function() {
Expand All @@ -228,6 +259,34 @@ suite('ConfirmChannel', function() {
ch.waitForConfirms(done);
});

var channelOptions = {};

confirm_channel_test('find high watermark', function(ch, done) {
var msg = randomString();
var baseline = 0;
ch.assertQueue('', {exclusive: true}, function(e, q) {
if (e !== null) return done(e);
while (ch.sendToQueue(q.queue, Buffer.from(msg))) {
baseline++;
};
channelOptions.highWaterMark = baseline * 2;
done();
})
});

confirm_channel_test('set high watermark', channelOptions, function(ch, done) {
var msg = randomString();
ch.assertQueue('', {exclusive: true}, function(e, q) {
if (e !== null) return done(e);
var ok;
for (var i = 0; i < channelOptions.highWaterMark; i++) {
ok = ch.sendToQueue(q.queue, Buffer.from(msg));
assert.equal(ok, true);
}
done();
});
});

});

suite("Error handling", function() {
Expand Down

0 comments on commit c8ae9cd

Please sign in to comment.