Skip to content

Commit

Permalink
found axon cannot with our project
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-kelly committed Jul 28, 2015
1 parent 6f412df commit 46264e2
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 41 deletions.
3 changes: 1 addition & 2 deletions lib/actor/actorManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ ActorManager.prototype.init = function () {
return new Promise(function (resolve, reject) {
var randomPort = require('random-port');
randomPort(function(port) {

$this.rpcPort = port;

logger.debug("Get random port for rpcServer: %s", $this.rpcPort);

// get a randome port for RpcServer
logger.info('=== ActorManager inited! ===');
logger.info('=== ActorManager inited!, rpcPort: %s ===', $this.rpcPort);
resolve();
})

Expand Down
2 changes: 1 addition & 1 deletion lib/actor/etcManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ EtcManager.prototype.getActors = function (forceRetry) {
} else {
var infos = value;
resolve(infos);
logger.info('================== emit onActorsChanged');
logger.trace('================== emit onActorsChanged : %s', _.keys(infos).length);
$this.emit('onActorsChanged', infos);

$this.actorsEtc = infos;
Expand Down
4 changes: 2 additions & 2 deletions lib/actor/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ var CosmosActor = function CosmosActor(appConfig, actorConfig) {

CosmosActor.prototype.start = function () {
var $this = this;
return $this.etc.init().then(function () {
return $this.actorMgr.init();
return $this.actorMgr.init().then(function () {
return $this.etc.init();
}).then(function () {
return $this.rpc.init();
}).then(function () {
Expand Down
58 changes: 30 additions & 28 deletions lib/actor/rpc/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@

var EtcManager = require('../etcManager');
var ActorManager = require('../actorManager');
// var zerorpc = require('axon-rpc');
var rpc = require('axon-rpc')
, axon = require('axon')
, req = axon.socket('req');

var util = require('util');
var RpcClient = require('./rpcClient');
Expand Down Expand Up @@ -50,15 +46,17 @@ Rpc.prototype.init = function () {
};

Rpc.prototype.call = function(actorName, funcName) {
var client = this.rpcClients[actorName];
var $this = this;

var client = $this.rpcClients[actorName];
if (!client) {
logger.error("[Rpc:call]Not found actor: %s", actorName);
return undefined;
return Promise.reject(new Error("[Rpc:call]Not found actor: " + actorName));
}

var funcArgs = Array.prototype.slice.call(arguments, 2, arguments.length - 1);

logger.debug("[Rpc:call] actor: %s, funcName: %s, args: %s", actorName, funcName, funcArgs);
logger.debug("[Rpc:call] to actor: %s, funcName: %s, args: %s", actorName, funcName, funcArgs);

return client.call(funcName, funcArgs);
}
Expand All @@ -68,9 +66,7 @@ Rpc.prototype.register = function () {
return this.etcManager.registerActor(this.actor).then(function (isSuccess) {
if (!isSuccess) {
logger.error('Error registerActor!');
return new Promise(function (resolve) {
resolve({});
});
return Promise.reject(new Error('Error registerActor!'));
} else {
logger.info('Success register rpc actor!');
}
Expand All @@ -82,35 +78,41 @@ Rpc.prototype.onActorsChanged = function (actors) {

// get actors, function init
for (var key in actors) {
var actor = actors[key];
var actorName = actor.name;
var funcs = actor.functions;

// create the Rpc client
var client = new rpc.Client(req);
var rpcHost = actor.host;
var rpcPort = actor.rpcPort;
var serverUri = 'tcp://' + rpcHost + ':' + rpcPort;

var otherActor = actors[key];
// except self
if (rpcHost == this.actor.config.host && rpcPort == this.actor.rpcPort) {
logger.info('Ignore self when rpc creatation');
if (otherActor.host == this.actor.config.host &&
otherActor.rpcPort == this.actor.rpcPort) {
logger.debug('[IGNORE]Ignore self when rpc creatation');
continue;
}
logger.info('Connect to RPC Server: ' + serverUri);

req.connect(serverUri);
var rpcClient = this.rpcClients[otherActor.name];

if (rpcClient) {
if (rpcClient.host == otherActor.host &&
rpcClient.rpcPort == otherActor.rpcPort) {
logger.debug(
"[EXIST]exist rpc client of %s-%s:%s, no need to create again",
otherActor.name, otherActor.host, otherActor.rpcPort);
continue;
} else {
logger.error("Diff actor: %s", otherActor.name);
rpcClient.close();
}
// TODO: close old

}
rpcClient = new RpcClient(otherActor);

var rpcClient = new RpcClient(client);

this.rpcClients[actor.name] = rpcClient;
this.rpcClients[otherActor.name] = rpcClient;

// proxy to rpc
// var proxyFuncs = {};

// define the Actor into this rpc class
// define the otherActor into this rpc class
// this[actorName] = proxyFuncs;
logger.info('[Bind RPC] bind %s to self:%s', actorName, this.actor.name);
logger.info('[Bind RPC] bind %s to self:%s', otherActor.name, this.actor.name);
// rpc function create
// funcs.forEach(function (funcName) {
// logger.info(util.format('Create Function: %s, From actor: %s', funcName, actor.host + ':' + actor.rpcPort));
Expand Down
25 changes: 24 additions & 1 deletion lib/actor/rpc/rpcClient.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,35 @@
'use strict';
// var zerorpc = require('axon-rpc');
var rpc = require('axon-rpc')
, axon = require('axon');

var logger = require('../../logger').getLogger('cosmos');
var Promise = require('bluebird');
var _ = require('underscore');

var RpcClient = function RpcClient(client) {
var RpcClient = function RpcClient(actor) {
this.actor = actor;
var actorName = actor.name;
var funcs = actor.functions;

this.req = axon.socket('req');

// create the Rpc client
var client = new rpc.Client(this.req);
this.host = actor.host;
this.rpcPort = actor.rpcPort;
this.serverUri = 'tcp://' + this.host + ':' + this.rpcPort;

logger.debug('Connect to RPC Server: ' + this.serverUri);

this.req.connect(this.serverUri);

this.client = client;
};

RpcClient.prototype.close = function() {
this.req.close();
}
RpcClient.prototype.call = function (funcName, funcArgs) {
var _this = this;

Expand All @@ -23,6 +45,7 @@ RpcClient.prototype.call = function (funcName, funcArgs) {
}
applyArgs.push(newFuncArgs);
applyArgs.push(function (error, res) {

if (error) {
console.error(error);
reject(error);
Expand Down
12 changes: 8 additions & 4 deletions lib/actor/rpc/rpcServer.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var logger = require('../../logger').getLogger('cosmos');

var RpcServer = function RpcServer(actor) {

var $this = this;
this.actor = actor;
// Self Actor Rpc Server
var server =
Expand All @@ -23,7 +24,10 @@ var RpcServer = function RpcServer(actor) {
var reply = arguments[arguments.length - 1];

actor.onRpcCall(funcName, funcArgs).then(function (ret) {
console.error(ret);
logger.error(ret);


logger.error("======= i am " + $this.actor.name);
reply(null, ret);
});
}
Expand All @@ -34,11 +38,11 @@ var RpcServer = function RpcServer(actor) {
// }
});

// var uri = util.format('tcp://0.0.0.0:%s', actor.rpcPort);
var uri = util.format('tcp://0.0.0.0:%s', actor.rpcPort);
// this.rpcServer.bind(actor.rpcPort);
rep.bind(actor.rpcPort);
rep.bind(uri);

logger.debug('Create Rpc Server: ' + actor.rpcPort);
logger.debug('Create Rpc Server: ' + uri);

// this.rpcServer.on('error', function (error) {
// logger.error('RPC server error:', error);
Expand Down
1 change: 0 additions & 1 deletion lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ Cosmos.prototype.start = function() {

var appConfig = $this.config.app;
var actorConfig = $this.config.actors[$this.actorName];
console.dir($this.actorTypes);
actorConfig.handler = $this.actorTypes[actorConfig.type].handler;
actorConfig.remote = $this.actorTypes[actorConfig.type].remote;
if (actorConfig.handler == null) {
Expand Down
3 changes: 1 addition & 2 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,11 @@ describe('rpc', function() {
.then(function() {
return actor2.rpc.call('actor1', 'rpcTestFromActor2')
.then(function(ret) {
logger.info("RPC from node 2");
assert.equal(ret, "from node 2")

done();


}).catch(function(err) {
logger.error(err);
assert.equal(false);
Expand Down

0 comments on commit 46264e2

Please sign in to comment.