
AMQP Transporter #72

Merged 5 commits on Aug 16, 2017

Conversation

Nathan-Schwartz
Member

Resolves #1

This Transporter is a bit more involved than some of the others because it needs to treat some packet types specially. Broadcasted packets need to use AMQP exchanges, but targeted packets only need a queue.

This implementation also deals with Requests a bit differently than other Transporters do. Each action has its own work queue that multiple nodes can pull messages from. This means that messages are less likely to be lost (due to crashes, etc.). Each node will consume more messages when it has availability. The number of messages that a node can handle concurrently can be specified with the amqp.prefetch option.

Requests and Responses don’t have a set time-to-live, but all other packet types do. Currently events’ time-to-live can be configured using the amqp.eventTimeToLive option, but the other types are hardcoded at 5 seconds.
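The TTL rules just described boil down to a small per-packet-type lookup. A rough sketch (the packet-type strings and option shape here are illustrative assumptions, not the transporter's actual constants):

```javascript
// Sketch of the TTL rules described above; packet-type names are
// illustrative assumptions, not the transporter's actual constants.
function getMessageTtl(packetType, amqpOptions) {
  if (packetType === "REQ" || packetType === "RES")
    return undefined; // Requests and Responses never expire
  if (packetType === "EVENT")
    return amqpOptions.eventTimeToLive; // configurable for events
  return 5000; // all other packet types are hardcoded at 5 seconds
}
```

The returned value would then feed the message expiration (or the queue's messageTtl) when publishing.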

Transporter Options:

new AmqpTransporter({
  amqp: {
    url: "amqp://guest:guest@localhost:5672",
    eventTimeToLive: 5000,
    prefetch: 1
  }
});

(Diagram: AMQP Transport behaviour by topic)

(Diagram: AMQP Transporter)

I’ve included unit tests and some integration tests. The integration tests are skipped right now because they require an amqp server to be running at amqp://guest:guest@localhost:5672. Let me know if I should remove them. 👍


Other notes:

  • I don’t have a ton of experience with JSDoc, don’t hesitate to let me know if I can improve them.
  • I tried to to use istanbul ignore the same way as in other files, but let me know if I should change anything.


@coveralls

coveralls commented Aug 13, 2017

Coverage Status

Coverage decreased (-0.3%) to 99.135% when pulling 730b537 on Nathan-Schwartz:amqp-transporter into 603267f on ice-services:master.

@benchmark-bot

🏁 Benchmark results

📝 Moleculer common benchmarks

⚡ Suite: Local call

| Test | Master | PR | Diff | % |
| --- | --- | --- | --- | --- |
| broker.call (normal) | 531,344 | 659,923 | +128,580 | +24% |
| broker.call (with params) | 541,749 | 624,819 | +83,070 | +15% |

⚡ Suite: Call with middlewares

| Test | Master | PR | Diff | % |
| --- | --- | --- | --- | --- |
| No middlewares | 557,942 | 655,098 | +97,155 | +17% |
| 5 middlewares | 520,845 | 665,945 | +145,100 | +28% |

⚡ Suite: Call with statistics & metrics

| Test | Master | PR | Diff | % |
| --- | --- | --- | --- | --- |
| No statistics | 512,963 | 669,031 | +156,068 | +30% |
| With metrics | 86,164 | 84,331 | -1,834 | -2% |
| With statistics | 114,760 | 155,096 | +40,336 | +35% |
| With metrics & statistics | 64,645 | 60,426 | -4,219 | -7% |

⚡ Suite: Remote call with FakeTransporter

| Test | Master | PR | Diff | % |
| --- | --- | --- | --- | --- |
| Remote call echo.reply | 13,767 | 11,727 | -2,040 | -15% |
Master detailed results
{
  "name": "Moleculer common benchmarks",
  "suites": [
    {
      "name": "Local call",
      "tests": [
        {
          "name": "broker.call (normal)",
          "reference": true,
          "stat": {
            "duration": 5.000530275,
            "cycle": 0,
            "count": 2657000,
            "avg": 0.0000018820211799021454,
            "rps": 531343.6483493743,
            "percent": 0
          }
        },
        {
          "name": "broker.call (with params)",
          "fastest": true,
          "stat": {
            "duration": 5.000466823,
            "cycle": 0,
            "count": 2709000,
            "avg": 0.0000018458718431155408,
            "rps": 541749.4197821218,
            "percent": 1.9583882229651408
          }
        }
      ]
    },
    {
      "name": "Call with middlewares",
      "tests": [
        {
          "name": "No middlewares",
          "fastest": true,
          "reference": true,
          "stat": {
            "duration": 5.000517228,
            "cycle": 0,
            "count": 2790000,
            "avg": 0.00000179230008172043,
            "rps": 557942.2833257361,
            "percent": 0
          }
        },
        {
          "name": "5 middlewares",
          "stat": {
            "duration": 4.999572709,
            "cycle": 0,
            "count": 2604000,
            "avg": 0.0000019199587976190474,
            "rps": 520844.5104343416,
            "percent": -6.649034138489213
          }
        }
      ]
    },
    {
      "name": "Call with statistics & metrics",
      "tests": [
        {
          "name": "No statistics",
          "fastest": true,
          "reference": true,
          "stat": {
            "duration": 5.000355966,
            "cycle": 0,
            "count": 2565000,
            "avg": 0.0000019494565169590644,
            "rps": 512963.4804883409,
            "percent": 0
          }
        },
        {
          "name": "With metrics",
          "stat": {
            "duration": 5.013681836,
            "cycle": 0,
            "count": 432000,
            "avg": 0.00001160574499074074,
            "rps": 86164.22304624278,
            "percent": -83.20265938537875
          }
        },
        {
          "name": "With statistics",
          "stat": {
            "duration": 5.001726296,
            "cycle": 0,
            "count": 574000,
            "avg": 0.000008713808878048781,
            "rps": 114760.37792372635,
            "percent": -77.62796333679066
          }
        },
        {
          "name": "With metrics & statistics",
          "stat": {
            "duration": 5.089317055,
            "cycle": 0,
            "count": 329000,
            "avg": 0.0000154690487993921,
            "rps": 64645.2159385067,
            "percent": -87.39769624984521
          }
        }
      ]
    },
    {
      "name": "Remote call with FakeTransporter",
      "tests": [
        {
          "name": "Remote call echo.reply",
          "fastest": true,
          "stat": {
            "duration": 5.01190814,
            "cycle": 0,
            "count": 69000,
            "avg": 0.00007263634985507247,
            "rps": 13767.211623315985,
            "percent": 0
          }
        }
      ]
    }
  ],
  "timestamp": 1502592737122,
  "generated": "Sun Aug 13 2017 02:52:17 GMT+0000 (UTC)",
  "elapsedMs": 46936
}
  
PR detailed results
{
  "name": "Moleculer common benchmarks",
  "suites": [
    {
      "name": "Local call",
      "tests": [
        {
          "name": "broker.call (normal)",
          "fastest": true,
          "reference": true,
          "stat": {
            "duration": 5.00058052,
            "cycle": 0,
            "count": 3300000,
            "avg": 0.0000015153274303030302,
            "rps": 659923.3802558588,
            "percent": 0
          }
        },
        {
          "name": "broker.call (with params)",
          "stat": {
            "duration": 4.999847396,
            "cycle": 0,
            "count": 3124000,
            "avg": 0.0000016004633149807937,
            "rps": 624819.069977871,
            "percent": -5.319452428610333
          }
        }
      ]
    },
    {
      "name": "Call with middlewares",
      "tests": [
        {
          "name": "No middlewares",
          "reference": true,
          "stat": {
            "duration": 5.00078164,
            "cycle": 0,
            "count": 3276000,
            "avg": 0.000001526490122100122,
            "rps": 655097.5899039656,
            "percent": 0
          }
        },
        {
          "name": "5 middlewares",
          "fastest": true,
          "stat": {
            "duration": 5.000413262,
            "cycle": 0,
            "count": 3330000,
            "avg": 0.0000015016256042042043,
            "rps": 665944.9580509491,
            "percent": 1.655840032715389
          }
        }
      ]
    },
    {
      "name": "Call with statistics & metrics",
      "tests": [
        {
          "name": "No statistics",
          "fastest": true,
          "reference": true,
          "stat": {
            "duration": 4.999767815,
            "cycle": 0,
            "count": 3345000,
            "avg": 0.000001494698898355755,
            "rps": 669031.0677956952,
            "percent": 0
          }
        },
        {
          "name": "With metrics",
          "stat": {
            "duration": 5.004111904,
            "cycle": 0,
            "count": 422000,
            "avg": 0.00001185808508056872,
            "rps": 84330.64809415581,
            "percent": -87.39510731960385
          }
        },
        {
          "name": "With statistics",
          "stat": {
            "duration": 5.003340497,
            "cycle": 0,
            "count": 776000,
            "avg": 0.000006447603733247422,
            "rps": 155096.38020144525,
            "percent": -76.81776113739345
          }
        },
        {
          "name": "With metrics & statistics",
          "stat": {
            "duration": 5.146806354,
            "cycle": 0,
            "count": 311000,
            "avg": 0.000016549216572347265,
            "rps": 60425.82110327441,
            "percent": -90.96815917646938
          }
        }
      ]
    },
    {
      "name": "Remote call with FakeTransporter",
      "tests": [
        {
          "name": "Remote call echo.reply",
          "fastest": true,
          "stat": {
            "duration": 5.116487725,
            "cycle": 0,
            "count": 60000,
            "avg": 0.00008527479541666667,
            "rps": 11726.79447794434,
            "percent": 0
          }
        }
      ]
    }
  ],
  "timestamp": 1502592847877,
  "generated": "Sun Aug 13 2017 02:54:07 GMT+0000 (UTC)",
  "elapsedMs": 47090
}
  

opts = { amqp: { url: opts } };

// Number of requests a broker will handle concurrently
if (typeof opts.amqp.prefetch !== "number")
Contributor

This will break if opts is a string.

Member Author

@WoLfulus If opts was a string it should have already been reassigned to an object by this point. The first if-statement in the constructor should handle this case, I think.

Contributor

I'm sorry, I didn't see you reassigning opts at line 41, so yeah, it won't fail.

My bad :(

Member Author

No worries 🙂
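For reference, the normalization the thread is talking about looks roughly like this (a sketch of the pattern, not the PR's exact code; the empty-object guard is an added assumption):

```javascript
// Sketch of the constructor's opts handling discussed above: a bare URL
// string is reassigned to the object form before any property access,
// so reads like opts.amqp.prefetch are always safe afterwards.
function normalizeOpts(opts) {
  if (typeof opts === "string") opts = { amqp: { url: opts } };
  if (!opts.amqp) opts.amqp = {}; // assumed guard for plain objects
  // Number of requests a broker will handle concurrently
  if (typeof opts.amqp.prefetch !== "number") opts.amqp.prefetch = 1;
  // Number of milliseconds before an event expires
  if (typeof opts.amqp.eventTimeToLive !== "number") opts.amqp.eventTimeToLive = 5000;
  return opts;
}
```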

opts.amqp.prefetch = 1;

// Number of milliseconds before an event expires
if (typeof opts.amqp.eventTimeToLive !== "number")
Contributor

This too

@icebob
Member

icebob commented Aug 13, 2017

Thanks @Nathan-Schwartz. Great job! I'm on holiday now, but I'm going to review it next week.

@@ -498,6 +501,13 @@ class Transit {
* @memberOf Transit
*/
publish(packet) {
if (this.subscribing) {
Member

Does this solve the issue of publishing a packet before the subscriptions are ready?

Member Author

Yes, that's correct.
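A minimal sketch of that guard: while subscriptions are still being made, outgoing packets are held back and flushed once they are ready. The class and method names below are illustrative, not Transit's actual API:

```javascript
// Illustrative sketch: defer packets published while subscriptions are
// still being set up, then flush them once subscribing completes.
class PendingPublisher {
  constructor(send) {
    this.send = send;        // the real transport send function
    this.subscribing = true; // true until subscriptions are ready
    this.pending = [];
  }
  publish(packet) {
    if (this.subscribing) {
      this.pending.push(packet); // defer until subscriptions exist
      return;
    }
    this.send(packet);
  }
  subscriptionsReady() {
    this.subscribing = false;
    this.pending.splice(0).forEach(p => this.send(p)); // flush in order
  }
}
```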

this.logger.error("AMQP connection closed!", crashWorthy && err || "");
})
.on("blocked", (reason) => {
this.logger.warn("AMQP connection blocked!", reason);
Member

What do blocked & unblocked mean?

Member Author

From the docs I read that a RabbitMQ server (after version 3.2.0) can decide to block a connection.

> Typically it will do this if there is some resource shortage, e.g., memory, and messages are published on the connection. (from the amqp.node docs)

I don't know how common this error is, but I thought logging would help with debugging.

})
.filter(a => a);

xdescribe("Test AMQPTransporter events", () => {
Member

What is xdescribe?

Member Author

xdescribe is the same as describe.skip, I did this for the integration tests I wrote so the describe block won't run. These tests require an AMQP server to be running at amqp://guest:guest@localhost:5672, so I don't think they would pass in the current CI environment.

Member

Thanks, I didn't know xdescribe :)

@icebob
Member

icebob commented Aug 15, 2017

The integration test is perfect, but because of the external dependency we can't run it in npm test. Maybe we need to move it out of Jest's scope and create another npm script that can run it. I'm going to think it over.
Tomorrow I'm going to test it with a RabbitMQ server.

By the way, very nice PR. Congratulations 😉 👍 (I like that you followed my coding style)

@Nathan-Schwartz
Member Author

I think another script makes sense 👍

Thank you! I really enjoyed working on Moleculer, it is an awesome project!

@icebob
Member

icebob commented Aug 16, 2017

@Nathan-Schwartz I'm trying to connect to a remote RabbitMQ, but I get ECONNREFUSED from 127.0.0.1.

// Create broker
let broker = new ServiceBroker({
	namespace: "multi",
	nodeID: process.argv[2] || "server-" + process.pid,
	transporter: "amqp://guest:[email protected]:5672",
	logger: console
});

Error:

[2017-08-16T12:36:15.502Z] ERROR server-8120/TRANSPORTER: AMQP failed to connect! { Error: connect ECONNREFUSED 127.0.0.1:5672
    at Object.exports._errnoException (util.js:1018:11)
    at exports._exceptionWithHostPort (util.js:1041:20)
    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1090:14)
  cause:
   { Error: connect ECONNREFUSED 127.0.0.1:5672
       at Object.exports._errnoException (util.js:1018:11)
       at exports._exceptionWithHostPort (util.js:1041:20)
       at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1090:14)
     code: 'ECONNREFUSED',
     errno: 'ECONNREFUSED',
     syscall: 'connect',
     address: '127.0.0.1',
     port: 5672 },
  isOperational: true,
  code: 'ECONNREFUSED',
  errno: 'ECONNREFUSED',
  syscall: 'connect',
  address: '127.0.0.1',
  port: 5672 }

@icebob
Member

icebob commented Aug 16, 2017

I tried the dev/amqp.js with let open = require("amqplib").connect("amqp://192.168.51.29"); and it's working, no connection error.

I think (per the API docs) the first parameter of amqp.connect is always a URI, but here it always received an Object.

@Nathan-Schwartz
Member Author

@icebob You are correct, I should have passed in this.opts.amqp.url, not this.opts.amqp. I am not quite sure how it was connecting successfully earlier 🤔. Anyways, I just made the change and added it to the first commit.
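In other words, amqp.connect wants the URI string, not the whole options object. A tiny sketch of the corrected lookup (the actual connect call is shown only in a comment, since it needs a live server):

```javascript
// Sketch of the bug discussed above: amqp.connect expects a URI string,
// but the transporter was passing the whole options object.
function getConnectionUri(opts) {
  // Correct usage would be:
  //   require("amqplib").connect(getConnectionUri(this.opts))
  // i.e. hand amqplib the string at opts.amqp.url, not opts.amqp itself.
  return opts.amqp.url;
}
```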

@coveralls

Coverage Status

Coverage decreased (-0.3%) to 99.135% when pulling fcae3b0 on Nathan-Schwartz:amqp-transporter into 9d59ed3 on ice-services:master.

@icebob
Member

icebob commented Aug 16, 2017

Thanks. Btw, how could you modify an old commit? :)

@icebob
Member

icebob commented Aug 16, 2017

I tested. It seems it's working properly ;)

@icebob
Member

icebob commented Aug 16, 2017

A minor difference: in every other transporter, if the connection is lost it tries to reconnect every 5 seconds. But in AMQP:

NATS:

[2017-08-16T16:31:15.442Z] INFO  server-4064/BROKER: 'file' service is registered!
[2017-08-16T16:31:15.451Z] INFO  server-4064/BROKER: 'math' service is registered!
[2017-08-16T16:31:15.459Z] INFO  server-4064/TRANSIT: Connecting to the transporter...
[2017-08-16T16:31:15.509Z] INFO  server-4064/TRANSPORTER: NATS client is connected!
[2017-08-16T16:31:15.532Z] INFO  server-4064/BROKER: Broker started.
[2017-08-16T16:31:24.121Z] WARN  server-4064/TRANSPORTER: NATS client is disconnected!
[2017-08-16T16:31:26.122Z] WARN  server-4064/TRANSPORTER: NATS client is reconnecting...
[2017-08-16T16:31:29.129Z] WARN  server-4064/TRANSPORTER: NATS client is reconnecting...
[2017-08-16T16:31:32.155Z] WARN  server-4064/TRANSPORTER: NATS client is reconnecting...
[2017-08-16T16:31:35.158Z] WARN  server-4064/TRANSPORTER: NATS client is reconnecting...
[2017-08-16T16:31:38.181Z] WARN  server-4064/TRANSPORTER: NATS client is reconnecting...
[2017-08-16T16:31:38.184Z] INFO  server-4064/TRANSPORTER: NATS client is reconnected!

AMQP:

[2017-08-16T16:33:30.806Z] INFO  server-3328/BROKER: 'file' service is registered!
[2017-08-16T16:33:30.814Z] INFO  server-3328/BROKER: 'math' service is registered!
[2017-08-16T16:33:30.822Z] INFO  server-3328/TRANSIT: Connecting to the transporter...
[2017-08-16T16:33:30.946Z] INFO  server-3328/TRANSPORTER: AMQP connected!
[2017-08-16T16:33:30.956Z] INFO  server-3328/TRANSPORTER: AMQP channel created!
[2017-08-16T16:33:31.100Z] INFO  server-3328/BROKER: Broker started.
[2017-08-16T16:33:53.759Z] ERROR server-3328/TRANSPORTER: AMQP connection error! { Error: write ECONNRESET
    at exports._errnoException (util.js:1018:11)
    at Socket._writeGeneric (net.js:715:26)
    at Socket._write (net.js:734:8)
    at doWrite (_stream_writable.js:334:12)
    at writeOrBuffer (_stream_writable.js:320:5)
    at Socket.Writable.write (_stream_writable.js:247:11)
    at Socket.write (net.js:661:40)
    at Connection.C.sendMethod (d:\Work\JS\ice-services\moleculer\node_modules\amqplib\lib\connection.js:531:17)
    at Object.accept (d:\Work\JS\ice-services\moleculer\node_modules\amqplib\lib\connection.js:87:18)
    at Connection.mainAccept [as accept] (d:\Work\JS\ice-services\moleculer\node_modules\amqplib\lib\connection.js:63:33)
    at Socket.go (d:\Work\JS\ice-services\moleculer\node_modules\amqplib\lib\connection.js:476:48)
    at emitNone (events.js:86:13)
    at Socket.emit (events.js:185:7)
    at emitReadable_ (_stream_readable.js:432:10)
    at emitReadable (_stream_readable.js:426:7)
    at readableAddChunk (_stream_readable.js:187:13) code: 'ECONNRESET', errno: 'ECONNRESET', syscall: 'write' }
[2017-08-16T16:33:53.761Z] WARN  server-3328/TRANSPORTER: AMQP channel closed!
[2017-08-16T16:33:53.763Z] ERROR server-3328/TRANSPORTER: AMQP connection closed! { Error: write ECONNRESET
    at exports._errnoException (util.js:1018:11)
    at Socket._writeGeneric (net.js:715:26)
    at Socket._write (net.js:734:8)
    at doWrite (_stream_writable.js:334:12)
    at writeOrBuffer (_stream_writable.js:320:5)
    at Socket.Writable.write (_stream_writable.js:247:11)
    at Socket.write (net.js:661:40)
    at Connection.C.sendMethod (d:\Work\JS\ice-services\moleculer\node_modules\amqplib\lib\connection.js:531:17)
    at Object.accept (d:\Work\JS\ice-services\moleculer\node_modules\amqplib\lib\connection.js:87:18)
    at Connection.mainAccept [as accept] (d:\Work\JS\ice-services\moleculer\node_modules\amqplib\lib\connection.js:63:33)
    at Socket.go (d:\Work\JS\ice-services\moleculer\node_modules\amqplib\lib\connection.js:476:48)
    at emitNone (events.js:86:13)
    at Socket.emit (events.js:185:7)
    at emitReadable_ (_stream_readable.js:432:10)
    at emitReadable (_stream_readable.js:426:7)
    at readableAddChunk (_stream_readable.js:187:13) code: 'ECONNRESET', errno: 'ECONNRESET', syscall: 'write' }
[2017-08-16T16:33:53.764Z] ERROR server-3328/TRANSPORTER: AMQP connection closed!

d:\Work\JS\ice-services\moleculer\node_modules\amqplib\lib\channel.js:149
    throw new IllegalOperationError(msg, stack);
    ^
IllegalOperationError: Channel closed
    at Channel.<anonymous> (d:\Work\JS\ice-services\moleculer\node_modules\amqplib\lib\channel.js:149:11)
    at Channel.C.publish (d:\Work\JS\ice-services\moleculer\node_modules\amqplib\lib\channel_model.js:158:15)
    at AmqpTransporter.publish (d:\Work\JS\ice-services\moleculer\src\transporters\amqp.js:319:17)
    at Transit.publish (d:\Work\JS\ice-services\moleculer\src\transit.js:512:18)
    at Transit.emit (d:\Work\JS\ice-services\moleculer\src\transit.js:216:8)
    at ServiceBroker.emit (d:\Work\JS\ice-services\moleculer\src\service-broker.js:1122:17)
    at Timeout.setInterval [as _onTimeout] (d:\Work\JS\ice-services\moleculer\examples\multi-server\server.js:35:9)
    at ontimeout (timers.js:380:14)
    at tryOnTimeout (timers.js:244:5)
    at Timer.listOnTimeout (timers.js:214:5)

And the process exited.
Could you modify it to use similar logic?

@Nathan-Schwartz
Member Author

Nathan-Schwartz commented Aug 16, 2017

@icebob I do it using git commit --amend if I want to make changes to the last commit. Otherwise, I use git rebase -i <commit> to squash, rename commits, and change old commits.

It can create problems if you change a commit that is already merged though :)
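For anyone following along, the two workflows can be sketched as shell commands; run them in a throwaway repository, since both rewrite history:

```shell
# Demonstrate `git commit --amend` in a throwaway repository.
set -e
repo=$(mktemp -d)
cd "$repo"
git init -q
git config user.email "demo@example.com"
git config user.name "demo"

echo "first draft" > transporter.js
git add transporter.js
git commit -qm "Add AMQP transporter"

# Fold a follow-up fix into the previous commit instead of adding a new one.
echo "fixed draft" > transporter.js
git add transporter.js
git commit -q --amend -m "Add AMQP transporter (fixed)"

git rev-list --count HEAD   # still one commit: history was rewritten
# For commits further back, `git rebase -i <commit>` opens an interactive
# todo list to squash, reword, or edit them (not run here).
```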

@Nathan-Schwartz
Member Author

I wanted to implement reconnection, but it's a bit tricky.

Unfortunately, amqp.node doesn't currently handle connection recovery itself.

When the connection is lost, we could still try something like

setTimeout(() => this.connect(), 5000);

But the problem is that we don't know if all of the queues and exchanges were created successfully, and the exchanges might not be bound.

To make sure that all exchanges, queues, and bindings are ready we would need Transit to call makeSubscriptions again, and also send another <prefix>.INFO packet so we can call _makeServiceSpecificSubscriptions to initialize queues for requests.

If we assume that all queues, bindings, and exchanges are fine, reconnecting could work in some cases. I think there would be some edge-cases we couldn't handle well, though.

I tend to think that the transporter will behave more predictably if we don't try to reconnect.

What do you think?

@icebob
Member

icebob commented Aug 16, 2017

The reconnecting logic is implemented in transit, not in the transporters. We just need to reject the connect promise correctly. I'm trying to fix it.

@Nathan-Schwartz
Member Author

Great! I think it is ready 🙂

@icebob icebob merged commit 5c58752 into moleculerjs:master Aug 16, 2017
@icebob
Member

icebob commented Aug 16, 2017

Merged. Very good job. Thanks again!

Further task: it would be good to separate the amqp integration tests and skip them in Jest when we call npm test or npm run ci. Do you have any idea how we can do it?

Brainstorming: what do you think, since RabbitMQ has a load balancing solution, could this new AMQP transporter work without Moleculer's built-in load balancing? I think the transporter would disable load balancing in the broker. But I have no experience with AMQP, so I can't fully evaluate it :(

@icebob icebob mentioned this pull request Aug 17, 2017
@icebob
Member

icebob commented Aug 19, 2017

@Nathan-Schwartz What do you think?

@Nathan-Schwartz
Member Author

@icebob For the tests we could try a folder structure like this:

test
  |- unit
  `- integration
    |- internal
      |- broker-transit.spec.js
      |- serializer.spec.js
      `- (etc)
    |- thirdParty
      |- amqp
      `- (redis / nats / bull / etc)

and then we would ignore the thirdParty directory for the ci and test scripts. I'm not sure what the best solution is, I will keep thinking about it.

I think the AMQP transporter would work without Moleculer's load balancing. Are you thinking about making the transporter work with Moleculer load balancing, or disabling Moleculer's load balancing when the AMQP Transporter is used?

@icebob
Member

icebob commented Aug 20, 2017

Thanks @Nathan-Schwartz!

  1. I solved this issue. I renamed the *.spec.js files to *.spc.js and added a new NPM script, "test:amqp", which runs only the *.spc.js files. You can check it in master.

  2. I think some transporters could disable Moleculer's built-in load balancer if they have their own load balancer, like RabbitMQ or NATS Streaming.
    It seems I have to implement load balancing for EVENT packets too, to support an event-driven architecture. And if I have to rewrite the service registry, maybe I will implement a switchable load balancer.

By the way, within a few days I will open a PR with an AMQP optimization: it doesn't need to call makeServiceSpecificSubscriptions in publish when sending an INFO packet, because it can access the services by calling this.transit.getNodeInfo(). This method returns the same service list that an INFO packet contains, so it can also be called in the subscribe method. Another benefit is that the INFO packet will be able to be sent multiple times (in case of hot reload or dynamic service loading); currently, in that case it subscribes multiple times to the same queues.

@Nathan-Schwartz
Member Author

@icebob Sounds good!

I didn't know about this.transit.getNodeInfo, that sounds like a great solution 🙂

Currently it re-asserts the REQ queues for every INFO packet, but if a service is loaded dynamically, none of the new actions will have a REQ queue yet. So I think makeServiceSpecificSubscriptions needs to be called every time a service is loaded.

One way to avoid re-asserting existing queues would be to track them in this.queues (similarly to this.bindings).
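That tracking idea could be sketched like this (the Set-based bookkeeping is an illustrative assumption; the PR itself only tracks bindings in this.bindings):

```javascript
// Sketch: skip re-asserting queues that were already asserted, mirroring
// how this.bindings avoids duplicate exchange bindings. The Set-based
// bookkeeping and names here are illustrative, not the PR's code.
class QueueTracker {
  constructor(assertQueue) {
    this.assertQueue = assertQueue; // e.g. a wrapper over channel.assertQueue
    this.queues = new Set();
  }
  ensureQueue(name, opts) {
    if (this.queues.has(name)) return false; // already asserted; skip
    this.queues.add(name);
    this.assertQueue(name, opts);
    return true;
  }
}
```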

@icebob
Member

icebob commented Aug 20, 2017

The other problem is when I destroy a service: how can I remove the asserted REQ queues? Could you show an example?

@Nathan-Schwartz
Member Author

@icebob I don't think the REQ queues or any exchanges should be destroyed. They could be in use by other services and destroying them would break the other nodes.

Note:

  • If no Moleculer services were running with the AMQP transporter, the exchanges could be safely destroyed.
  • Technically, all of the node-specific queues can be safely destroyed (for all packet types besides REQ).

If there was only one node for a service, a REQ queue could be destroyed without breaking things.
If there were multiple nodes, you would need to wait until all of them went offline before deleting the queue. (Otherwise it would break the other nodes of this service.)

Even if it is possible to delete a queue without crashing nodes, I don't think we should do it. AMQP queues are often used as a Work Queue, so multiple nodes could be pulling from the same queue and there may be many messages waiting to be processed. If we delete the queue, we would delete all of the messages and this could be very problematic for a user.

@icebob
Member

icebob commented Aug 21, 2017

Thanks Nathan, that's good to know!

@icebob
Member

icebob commented Aug 21, 2017

I tried deleteQueue in disconnect but I got "Channel ended" exceptions and I didn't know the reason. So now I set autoDelete: true on the queues; a queue is removed when it has no consumers. It's working properly, but I don't know whether it is a good/best solution. @Nathan-Schwartz please review my changes in the amqp-impr branch.

https://github.com/ice-services/moleculer/compare/amqp-impr?expand=1#diff-b2879eb0ba697fd1aae416c88481d346

Thanks in advance!

@Nathan-Schwartz
Member Author

@icebob It looks good 👍

My only comment would be that I don't think we should use autoDelete for PACKET_REQUEST and PACKET_RESPONSE. I haven't tested this, but I think that it will delete the queue even if it still has messages.

It should be fine for the other packets since the messages will be removed after 5 seconds anyways.
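Put together, the suggested rule is: autoDelete everywhere except REQ and RES. A sketch (packet-type names are illustrative, and other queue options are omitted):

```javascript
// Sketch of the rule suggested above: REQ and RES queues must survive
// having no consumers, since they may still hold unprocessed messages.
// Other per-node queues can auto-delete safely, because their messages
// expire after 5 seconds anyway. Packet-type names are illustrative.
function queueOptionsFor(packetType) {
  if (packetType === "REQ" || packetType === "RES") {
    return {}; // no autoDelete: keep queued work alive
  }
  return { autoDelete: true }; // removed once it has no consumers
}
```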

@icebob
Member

icebob commented Aug 21, 2017

Ohh, I thought it would be deleted only when it's empty and has no consumers. :(
In that case I'll remove this attribute from the REQ and RES queues. Thanks!

@rhzs
Contributor

rhzs commented Aug 21, 2017

@Nathan-Schwartz next time, please consider using Buffer.from instead of new Buffer; the latter is being deprecated for security reasons, as noted in the Node.js 6 LTS docs: https://nodejs.org/dist/latest-v6.x/docs/api/buffer.html#buffer_new_buffer_array
cc @icebob
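For reference, the suggested replacement. Both lines below produce the same bytes, but new Buffer(arg) behaves differently depending on the argument's type, which is the security concern behind the deprecation:

```javascript
// Deprecated: `new Buffer(...)` is polymorphic (string vs number vs
// array), which led to bugs with security implications.
const legacy = new Buffer("hello");

// Preferred: explicit, single-purpose factories (Node.js >= 4.5 / 6 LTS).
const safe = Buffer.from("hello"); // copy bytes from a string
const sized = Buffer.alloc(5);     // zero-filled buffer of length 5

console.log(legacy.equals(safe));  // true: identical contents
```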

@icebob
Member

icebob commented Aug 21, 2017

Thanks. I changed it in another branch.

@Nathan-Schwartz
Member Author

@icebob I just found out about the ifEmpty and ifUnused options for channel#deleteQueue. We could try to use this in disconnect for REQ and RES queues if you still want to.
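A sketch of what that disconnect cleanup could look like, with a stub channel standing in for a real amqplib channel. With the real API, channel.deleteQueue(name, { ifUnused: true, ifEmpty: true }) deletes the queue only if it has no consumers and no messages; note that if either condition fails, RabbitMQ raises a channel-level error, so this is safest on a channel that is about to be discarded anyway:

```javascript
// Sketch: conditional queue cleanup during disconnect. A real amqplib
// channel accepts { ifUnused, ifEmpty } on deleteQueue; the stub used
// in the test below just records the calls it receives.
function cleanupQueues(channel, queueNames) {
  return Promise.all(
    queueNames.map(name =>
      channel.deleteQueue(name, { ifUnused: true, ifEmpty: true })
    )
  );
}
```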

@icebob
Member

icebob commented Aug 30, 2017

@Nathan-Schwartz It would be good. Could you create a PR?

@icebob icebob mentioned this pull request Sep 8, 2017
@icebob
Member

icebob commented Sep 24, 2017

Hi @Nathan-Schwartz, I rewrote the AMQP integration tests. I used multiple brokers & transporters in single files instead of multiple processes. Here are my tests: https://github.com/ice-services/moleculer/tree/next/test/integration/amqp

What do you think, does it cover all the test cases from your original tests?

@Nathan-Schwartz
Member Author

@icebob Good idea! It looks good, but there are a few small things I'm noticing. Would you be willing to open a PR so it is easier to review / comment on?

@icebob
Member

icebob commented Sep 24, 2017

@Nathan-Schwartz Thanks. I already merged it, but I think you can add comments to the PR.

#106

@Nathan-Schwartz
Member Author

@icebob Sorry it took me so long to do a proper review, I'll try to be faster in the future.

I started looking into the new AMQP Transporter and I think using the AMQP load-balancer and grouped events are cool ideas.

I wanted to ping you because I'm noticing some issues.

Two of the new tests for disabled load balancer fail:

  • should retry unacked requests to other node
  • Should use availability-based load balancing

The new integration tests don't include the following tests, and the old tests for these cases fail:

  • Should not have a request or response dequeued until it has been received
  • Should use availability-based load balancing
  • Nodes should only receive one request at a time by default.

I tried to figure out why the tests fail and I found that we are no longer waiting for the action handler to complete before acknowledging messages. This means that prefetch is ineffective and AMQP can no longer be used to implement work queues. What was the reason for this change?

I am happy to help resolve the above issues and write more tests in the new style, if you are open to it.

Also, I want to suggest that we add a RabbitMQ server to Travis so that we can run integration tests before merging changes. Do you think that would be possible?

Thanks!
Nathan

@icebob
Member

icebob commented Oct 3, 2017

Hi @Nathan-Schwartz,

thank you for your response.
The main change is that I removed the promise returns from messageHandler, so the ack is called as soon as the message reaches transit, not after the message is processed and the response is sent. It seems it wasn't a good idea, but I can fix it.

I would be happy if you could write the mentioned test cases (which would fail at first). After that, I can start fixing the messageHandler code so that the tests pass.

Could you help me to write the test cases?
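To make the behaviour difference concrete, here is a callback-style sketch with a stub channel: acknowledging only after the handler finishes is what makes prefetch act as a concurrency limit and lets unacked messages be redelivered if a node crashes. The names here are illustrative, not the transporter's actual code:

```javascript
// Sketch of the fix being discussed: acknowledge a message only after
// its handler has finished, so in-flight (unacked) work counts against
// the prefetch limit and can be redelivered on a crash.
function makeConsumer(channel, handler) {
  return function onMessage(msg) {
    // Wrong (what the rewrite did): channel.ack(msg) here, before handling.
    handler(msg, function done() {
      channel.ack(msg); // right: ack only once processing completes
    });
  };
}
```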

@Nathan-Schwartz
Member Author

@icebob I see, thanks for the explanation.

Sounds like a plan! I will start working on the test cases today 👍

What do you think about adding a RabbitMQ server to TravisCI?

@icebob
Member

icebob commented Oct 3, 2017

Great!

RabbitMQ on Travis would be good, but I want to exclude the amqp integration tests from npm test. So we could do it if we change the script in travis.yml to npm test && npm run test:amqp. OK?

@Nathan-Schwartz
Member Author

@icebob Sounds good to me!

@Nathan-Schwartz
Member Author

@icebob Sometimes I am getting an error in the test case Messages that haven't finished processing should be retryable by other nodes. The error is IllegalOperationError: Channel closing.

I looked into it for a while and it seems to happen when publishing the DISCONNECT packet from worker2 and worker3.

I tried turning off autoDelete for DISCONNECT packets and not using the purge function in case it was deleting the queues before the test finished, but I wasn't able to make the test pass consistently.

It doesn't always fail but it could cause some inconvenience in CI, so I would be okay marking it as pending for now until we figure it out.
