Skip to content

Commit

Permalink
feat: Support multiple wssER's in same network (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
rentallect authored Dec 6, 2023
1 parent b166817 commit 8957238
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 47 deletions.
37 changes: 20 additions & 17 deletions src/channel/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,15 @@ class ZitiChannel {
*/
async hello() {

this._zitiContext.logger.trace('ZitiChannel.hello() ch[%d] entered', this._id);
this._zitiContext.logger.trace('ZitiChannel.hello() ch[%d] wssER[%s] entered', this._id, this._edgeRouterHost);

await this._zws.open();

this._zitiContext.logger.trace('ZitiChannel.hello() ch[%d] _zws.open completed', this._id);
this._zitiContext.logger.trace('ZitiChannel.hello() ch[%d] wssER[%s] _zws.open completed', this._id, this._edgeRouterHost);

if (this.isHelloCompleted) {
this._zitiContext.logger.trace('ZitiChannel.hello() ch[%d] Hello handshake was previously completed', this._id);
return new Promise( async (resolve) => {
resolve( {channel: this, data: null});
});
this._zitiContext.logger.trace('ZitiChannel.hello() ch[%d] wssER[%s] Hello handshake was previously completed', this._id, this._edgeRouterHost);
return( {channel: this, data: null, helloCompletedDuration: this._helloCompletedDuration, edgeRouterHost: this._edgeRouterHost} );
}

if (isEqual(this._callerId, "ws:")) {
Expand All @@ -248,17 +246,17 @@ class ZitiChannel {

await this._tlsConn.create();

this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] initiating TLS handshake', this._id);
this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] wssER[%s] initiating TLS handshake', this._id, this._edgeRouterHost);

await this._tlsConn.handshake();

await this.awaitTLSHandshakeComplete();

this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] TLS handshake complete', this._id);
this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] wssER[%s] TLS handshake complete', this._id, this._edgeRouterHost);

}

this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] initiating message: ZitiEdgeProtocol.content_type.HelloType: ', this._id, ZitiEdgeProtocol.header_type.StringType);
this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] wssER[%s] initiating message: ZitiEdgeProtocol.content_type.HelloType: ', this._id, this._edgeRouterHost, ZitiEdgeProtocol.header_type.StringType);
let uuid = uuidv4();

let headers = [
Expand All @@ -278,18 +276,19 @@ class ZitiChannel {

let sequence = this.getAndIncrementSequence();

this._helloStartedTimestamp = Date.now();

let msg = await this.sendMessage( ZitiEdgeProtocol.content_type.HelloType, headers, null, {
sequence: sequence,
});

this._helloCompletedTimestamp = Date.now();
this._helloCompletedDuration = this._helloCompletedTimestamp - this._helloStartedTimestamp; //in ms
this._helloCompleted = true;
this.state = (ZitiEdgeProtocol.conn_state.Connected);
this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] Hello handshake to Edge Router [%s] completed at timestamp[%o]', this._id, this._edgeRouterHost, this._helloCompletedTimestamp);
this._zitiContext.logger.debug('ZitiChannel.hello() ch[%d] wssER[%s] Hello handshake completed at timestamp[%o]', this._id, this._edgeRouterHost, this._helloCompletedTimestamp);

return new Promise( async (resolve) => {
resolve( {channel: this, data: null});
});
return( {channel: this, data: null, helloCompletedDuration: this._helloCompletedDuration, edgeRouterHost: this._edgeRouterHost} );

}

Expand All @@ -301,7 +300,7 @@ class ZitiChannel {

const self = this;

self._zitiContext.logger.debug('initiating Connect to Edge Router [%s] for conn[%d]', this._edgeRouterHost, conn.id);
self._zitiContext.logger.debug('initiating Connect to wssER[%s] for conn[%d]', this._edgeRouterHost, conn.id);

await sodium.ready;

Expand Down Expand Up @@ -357,7 +356,7 @@ class ZitiChannel {

conn.state = (ZitiEdgeProtocol.conn_state.Connecting);

self._zitiContext.logger.debug('about to send Connect to Edge Router [%s] for conn[%d]', conn.channel.edgeRouterHost, conn.id);
self._zitiContext.logger.debug('about to send Connect to wssER[%s] for conn[%d]', conn.channel.edgeRouterHost, conn.id);

let msg = await self.sendMessage( ZitiEdgeProtocol.content_type.Connect, headers, self._network_session_token, {
conn: conn,
Expand All @@ -380,7 +379,7 @@ class ZitiChannel {
const self = this;
return new Promise( async (resolve, reject) => {

self._zitiContext.logger.debug('initiating Close to Edge Router [%s] for conn[%d]', this._edgeRouterHost, conn.id);
self._zitiContext.logger.debug('initiating Close to wssER[%s] for conn[%d]', this._edgeRouterHost, conn.id);

let sequence = conn.getAndIncrementSequence();
let uuid = uuidv4();
Expand All @@ -404,7 +403,7 @@ class ZitiChannel {

];

self._zitiContext.logger.debug('about to send Close to Edge Router [%s] for conn[%d]', conn.channel.edgeRouterHost, conn.id);
self._zitiContext.logger.debug('about to send Close to wssER[%s] for conn[%d]', conn.channel.edgeRouterHost, conn.id);

self.sendMessageNoWait( ZitiEdgeProtocol.content_type.StateClosed, headers, self._network_session_token, {
conn: conn,
Expand Down Expand Up @@ -960,6 +959,10 @@ class ZitiChannel {
*/
async _recvFromWireAfterDecrypt(ch, data) {

if (isEqual(ch._state, ZitiEdgeProtocol.conn_state.Closed)) {
return;
}

let buffer = data;

if (!isUndefined(ch._partialMessage)) { // if we are awaiting rest of a partial msg to arrive, append this chunk onto the end, then proceed
Expand Down
93 changes: 64 additions & 29 deletions src/context/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ class ZitiContext extends EventEmitter {

this._timeout = ZITI_CONSTANTS.ZITI_DEFAULT_TIMEOUT;

this._didInitialGetPendingChannelConnects = false;

}

get libCrypto () {
Expand Down Expand Up @@ -1404,31 +1406,28 @@ class ZitiContext extends EventEmitter {
*/
async _getPendingChannelConnects(conn, edgeRouters) {

return new Promise( async (resolve) => {
this.logger.trace('_getPendingChannelConnects entered');

this.logger.trace('_getPendingChannelConnects entered for edgeRouters [%o]', edgeRouters);
let pendingChannelConnects = new Array();

let pendingChannelConnects = new Array();
let self = this;

// Get a channel connection to each of the Edge Routers that have a WSS binding, initiating a connection if channel is not yet connected
for (var i = 0; i < edgeRouters.length; i++) {

self.logger.trace('calling getChannelByEdgeRouter for wssER [%s]', edgeRouters[i].hostname);
let ch = await self.getChannelByEdgeRouter(conn, edgeRouters[i]).catch((err) => {
self.logger.error( err );
throw new Error( err );
});
self.logger.debug('initiating Hello to [%s] for session[%s]', self.getEdgeRouterURL(edgeRouters[i]), conn.networkSessionToken);
pendingChannelConnects.push(
ch.hello()
);

let self = this;

// Get a channel connection to each of the Edge Routers that have a WS binding, initiating a connection if channel is not yet connected
edgeRouters.forEach(async function(edgeRouter, idx, array) {
self.logger.trace('calling getChannelByEdgeRouter for ER [%o]', edgeRouter);
let ch = await self.getChannelByEdgeRouter(conn, edgeRouter).catch((err) => {
self.logger.error( err );
throw new Error( err );
});
self.logger.debug('initiating Hello to [%s] for session[%s]', self.getEdgeRouterURL(edgeRouter), conn.networkSessionToken);
pendingChannelConnects.push(
ch.hello()
);
};

if (idx === array.length - 1) {
resolve(pendingChannelConnects); // Return to caller only after we have processed all edge routers
}
});
});
return pendingChannelConnects;
}


Expand Down Expand Up @@ -1549,7 +1548,7 @@ class ZitiContext extends EventEmitter {
let result = {};

find(Array.from(this._channels), function(obj) {
if (isEqual( obj[1]._edgeRouterHost, edgeRouter )) {
if (isEqual( obj[1][0]._edgeRouterHost, edgeRouter )) {
result.key = obj[0];
result.ch = obj[1];
return true;
Expand All @@ -1571,6 +1570,8 @@ class ZitiContext extends EventEmitter {
* @param {*} networkSession
*/
async connect(conn, networkSession) {

let self = this;

this.logger.debug('connect() entered for conn[%o] networkSession[%o]', conn.id, networkSession);

Expand All @@ -1597,11 +1598,44 @@ class ZitiContext extends EventEmitter {
this.logger.debug('now own _connectMutex for conn[%o]', conn.id);

let pendingChannelConnects = await this._getPendingChannelConnects(conn, edgeRouters);
this.logger.trace('pendingChannelConnects [%o]', pendingChannelConnects);

let channelWithNearestEdgeRouter = await Promise.race( pendingChannelConnects );
channelWithNearestEdgeRouter = channelWithNearestEdgeRouter.channel;
this.logger.debug('Channel [%d] has nearest Edge Router for conn[%o]', channelWithNearestEdgeRouter.id, conn.id);

let nearestEdgeRouter;

if (!this._didInitialGetPendingChannelConnects) {

// The first time through, we will only wait for one wssER connect to complete, and
// will select it as the "nearest". Other, slower, wssER connects will continue to
// run, and eventually complete in the background, but we will not wait for them here
// since that would impede performance.
nearestEdgeRouter = await Promise.race( pendingChannelConnects );

this.logger.trace(`Promise.race helloCompletedDuration time for wssER[${nearestEdgeRouter.edgeRouterHost}] was [${nearestEdgeRouter.helloCompletedDuration}]`);

this._didInitialGetPendingChannelConnects = true;

} else {

// Subsequently, we will wait for all wssER connects to complete, since they
// will most likely have done so before we get back here. We will then examine
// the helloCompletedDuration values across all wssERs and chose the one with
// the lowest value.

let edgeRouterConnects = await Promise.all( pendingChannelConnects );

let helloCompletedDuration = 999999999999;

find(edgeRouterConnects, function(edgeRouterConnect) {
self.logger.trace(`Promise.all helloCompletedDuration time for wssER[${edgeRouterConnect.edgeRouterHost}] was [${edgeRouterConnect.helloCompletedDuration}]`);
if (edgeRouterConnect.helloCompletedDuration < helloCompletedDuration) {
nearestEdgeRouter = edgeRouterConnect;
helloCompletedDuration = edgeRouterConnect.helloCompletedDuration;
}
});

}

let channelWithNearestEdgeRouter = nearestEdgeRouter.channel;
this.logger.debug('ch[%d] has nearest wssER[%s] for conn[%o]', channelWithNearestEdgeRouter.id, nearestEdgeRouter.edgeRouterHost, conn.id);
channelWithNearestEdgeRouter._connections._saveConnection(conn);
conn.channel = channelWithNearestEdgeRouter;

Expand Down Expand Up @@ -1956,9 +1990,10 @@ class ZitiContext extends EventEmitter {
closeChannelByEdgeRouter( edgeRouter ) {
let result = this.findChannelByEdgeRouter(edgeRouter);
if (result.key && result.ch) {
result.ch[0]._state = ZitiEdgeProtocol.conn_state.Closed;
this._channels.delete( result.key );
this._channelsById.delete( result.ch.id );
this.logger.warn('channel [%s] id[%d] deleted', result.key, result.ch.id);
this._channelsById.delete( result.ch[0].id );
this.logger.warn('channel [%s] id[%d] deleted', result.key, result.ch[0].id);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/websocket/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class ZitiWebSocket {
self._zitiContext.logger.debug("zws: waitForWSConnection: connection is now open");
callback();
} else {
self._zitiContext.logger.debug("zws: waitForWSConnection: wait...for %o", self);
self._zitiContext.logger.debug("zws: waitForWSConnection: wait...for %o", self.url);
self.waitForWSConnection(callback);
}
},
Expand Down

0 comments on commit 8957238

Please sign in to comment.