Skip to content

Commit

Permalink
fix: multi-MB writes fail under nestedTLS (#131)
Browse files Browse the repository at this point in the history
  • Loading branch information
rentallect authored Jan 11, 2024
1 parent 1421759 commit d966d09
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 71 deletions.
17 changes: 3 additions & 14 deletions src/channel/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -678,20 +678,9 @@ class ZitiChannel {
sendMessageNoWait(contentType, headers, body, options = {}) {
const timeout = options.timeout !== undefined ? options.timeout : this._timeout;
const messageId = options.sequence || this._sequence;
// this._zitiContext.logger.debug("send (no wait) -> conn[%o] seq[%o] contentType[%o]",
// (options.conn ? options.conn.id : 'n/a'),
// messageId,
// contentType,
// (body ? body.toString() : 'n/a'));

this._zitiContext.logger.debug(`send (no wait) -> ch[${this._id}] conn[${(options.conn ? options.conn.id : 'n/a')}] seq[${messageId}] contentType[${contentType}] bodyLen[${(body ? body.length : 'n/a')}] body[${(body ? body.toString() : 'n/a')}]`,
this._id,
(options.conn ? options.conn.id : 'n/a'),
messageId, contentType,
(body ? body.length : 'n/a')
,
(body ? body.toString() : 'n/a')
);

// this._zitiContext.logger.debug(`send (no wait) -> ch[${this._id}] conn[${(options.conn ? options.conn.id : 'n/a')}] seq[${messageId}] contentType[${contentType}] bodyLen[${(body ? body.length : 'n/a')}] body[${(body ? body.toString() : 'n/a')}]`);
this._zitiContext.logger.debug(`send (no wait) -> ch[${this._id}] conn[${(options.conn ? options.conn.id : 'n/a')}] seq[${messageId}] contentType[${contentType}] byteLength[${(body ? body.byteLength : 'n/a')}]`);

this._sendMarshaled(contentType, headers, body, options, messageId);
}
Expand Down
2 changes: 1 addition & 1 deletion src/context/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class ZitiContext extends EventEmitter {
//TEMP: we constrain HTTP requests that travel over nestedTLS to one-at-a-time for the moment.
// This will be removed as soon as I fix the TLS protocol collision issue that manifests
// when multiple HTTP requests are initiated simultaneously :(
this._fetchSemaphoreHTTPS = new Semaphore( 1 );
this._fetchSemaphoreHTTPS = new Semaphore( 8 );

this._pkey = null;
this._privateKeyPEM = null;
Expand Down
67 changes: 11 additions & 56 deletions src/http/ziti-inner-tls-socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,55 +21,6 @@ import { ZITI_CONSTANTS } from '../constants';

// import { Buffer } from 'buffer/'; // note: the trailing slash is important!

// class Queue {
// constructor(logger) {
// this.logger = logger;
// this.elements = {};
// this.head = 0;
// this.tail = 0;
// this.mutex = withTimeout(new Mutex(), 1 * 1000, new Error('timeout on Queue mutex'));
// }
// async enqueue(element) {
// this.logger.trace(`Queue.enqueue() entered: `, element);
// await this.mutex.runExclusive( () => {
// this.elements[this.tail] = element;
// this.tail++;
// });
// this.logger.trace(`Queue.enqueue() exiting: `, element);
// }
// async dequeue() {
// this.logger.trace(`Queue.dequeue() entered`);
// let item;
// await this.mutex.runExclusive( () => {
// item = this.elements[this.head];
// delete this.elements[this.head];
// this.head++;
// });
// this.logger.trace(`Queue.dequeue() exiting: `, item);
// return item;
// }
// peek() {
// return this.elements[this.head];
// }
// headNdx() {
// return this.head;
// }
// peekNdx(ndx) {
// return this.elements[ndx];
// }
// async acquireMutex() {
// this.logger.trace(`Queue.acquireMutex() waiting for mutex`);
// const release = await this.mutex.acquire();
// this.logger.trace(`Queue.acquireMutex() now own mutex`);
// return release;
// }
// get length() {
// return this.tail - this.head;
// }
// get isEmpty() {
// return this.length === 0;
// }
// }

class ZitiInnerTLSSocket extends EventEmitter {

Expand Down Expand Up @@ -242,7 +193,7 @@ class ZitiInnerTLSSocket extends EventEmitter {

if (success) {
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.create() wasmFD[${this.getWASMFD()}] TLS handshake completed pause start`);
await this._zitiContext.delay(500); // allow the 'SSL negotiation finished successfully' work to complete
// await this._zitiContext.delay(500); // allow the 'SSL negotiation finished successfully' work to complete
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.create() wasmFD[${this.getWASMFD()}] TLS handshake completed pause end`);
this._zitiContext.releaseTLSHandshakeLock(this.getWASMFD());
}
Expand Down Expand Up @@ -348,20 +299,24 @@ class ZitiInnerTLSSocket extends EventEmitter {
* @param {*} wireData (already TLS-encrypted)
*/
async fd_write(wireData) {
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}]: encrypted data is ready`);

// clone the data coming in from the WASM. We do this because it is possible for teh WASM memory to detach before we actually do the channel.write operation below
let clonedWireData = new Uint8Array(wireData);

this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}] byteLength[${clonedWireData.byteLength}] encrypted data is ready`);
const conn = await this.outerSocket.getZitiConnection();
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}]: outerSocket[${this.outerSocket._id}] outerSocket.conn[${conn._id}]`);
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}] byteLength[${clonedWireData.byteLength}] outerSocket[${this.outerSocket._id}] outerSocket.conn[${conn._id}]`);
let isConnected = await this.isConnected();
if (!isConnected) {
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}]: (handshake data) is being sent to ch[${conn.channel.id}] --->`);
conn.channel.write(conn, wireData);
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}] byteLength[${clonedWireData.byteLength}] (handshake data) is being sent to ch[${conn.channel.id}] --->`);
conn.channel.write(conn, clonedWireData);
} else {
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}]: (encrypted data) is being sent to tlsConn[${conn.channel._tlsConn.wasmFD}] --->`);
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}] byteLength[${clonedWireData.byteLength}] (encrypted data) is being sent to tlsConn[${conn.channel._tlsConn.wasmFD}] --->`);

//
this._sendingEncryptedData = true;

conn.channel.write(conn, wireData);
conn.channel.write(conn, clonedWireData);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/http/ziti-websocket-wrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,8 @@ async function initAsClient(websocket, address, protocols, options) {

websocket._zitiContext.logger.info('WebSocket handshake SUCCESSFUL');

socket.isWebSocket = true;

websocket.setSocket(socket, head, opts.maxPayload);
});
}
Expand Down

0 comments on commit d966d09

Please sign in to comment.