Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi-MB writes fail under nestedTLS #131

Merged
merged 1 commit into from
Jan 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 3 additions & 14 deletions src/channel/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -678,20 +678,9 @@ class ZitiChannel {
sendMessageNoWait(contentType, headers, body, options = {}) {
const timeout = options.timeout !== undefined ? options.timeout : this._timeout;
const messageId = options.sequence || this._sequence;
// this._zitiContext.logger.debug("send (no wait) -> conn[%o] seq[%o] contentType[%o]",
// (options.conn ? options.conn.id : 'n/a'),
// messageId,
// contentType,
// (body ? body.toString() : 'n/a'));

this._zitiContext.logger.debug(`send (no wait) -> ch[${this._id}] conn[${(options.conn ? options.conn.id : 'n/a')}] seq[${messageId}] contentType[${contentType}] bodyLen[${(body ? body.length : 'n/a')}] body[${(body ? body.toString() : 'n/a')}]`,
this._id,
(options.conn ? options.conn.id : 'n/a'),
messageId, contentType,
(body ? body.length : 'n/a')
,
(body ? body.toString() : 'n/a')
);

// this._zitiContext.logger.debug(`send (no wait) -> ch[${this._id}] conn[${(options.conn ? options.conn.id : 'n/a')}] seq[${messageId}] contentType[${contentType}] bodyLen[${(body ? body.length : 'n/a')}] body[${(body ? body.toString() : 'n/a')}]`);
this._zitiContext.logger.debug(`send (no wait) -> ch[${this._id}] conn[${(options.conn ? options.conn.id : 'n/a')}] seq[${messageId}] contentType[${contentType}] byteLength[${(body ? body.byteLength : 'n/a')}]`);

this._sendMarshaled(contentType, headers, body, options, messageId);
}
Expand Down
2 changes: 1 addition & 1 deletion src/context/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class ZitiContext extends EventEmitter {
//TEMP: we constrain HTTP requests that travel over nestedTLS to one-at-a-time for the moment.
// This will be removed as soon as I fix the TLS protocol collision issue that manifests
// when multiple HTTP requests are initiated simultaneously :(
this._fetchSemaphoreHTTPS = new Semaphore( 1 );
this._fetchSemaphoreHTTPS = new Semaphore( 8 );

this._pkey = null;
this._privateKeyPEM = null;
Expand Down
67 changes: 11 additions & 56 deletions src/http/ziti-inner-tls-socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,55 +21,6 @@ import { ZITI_CONSTANTS } from '../constants';

// import { Buffer } from 'buffer/'; // note: the trailing slash is important!

// class Queue {
// constructor(logger) {
// this.logger = logger;
// this.elements = {};
// this.head = 0;
// this.tail = 0;
// this.mutex = withTimeout(new Mutex(), 1 * 1000, new Error('timeout on Queue mutex'));
// }
// async enqueue(element) {
// this.logger.trace(`Queue.enqueue() entered: `, element);
// await this.mutex.runExclusive( () => {
// this.elements[this.tail] = element;
// this.tail++;
// });
// this.logger.trace(`Queue.enqueue() exiting: `, element);
// }
// async dequeue() {
// this.logger.trace(`Queue.dequeue() entered`);
// let item;
// await this.mutex.runExclusive( () => {
// item = this.elements[this.head];
// delete this.elements[this.head];
// this.head++;
// });
// this.logger.trace(`Queue.dequeue() exiting: `, item);
// return item;
// }
// peek() {
// return this.elements[this.head];
// }
// headNdx() {
// return this.head;
// }
// peekNdx(ndx) {
// return this.elements[ndx];
// }
// async acquireMutex() {
// this.logger.trace(`Queue.acquireMutex() waiting for mutex`);
// const release = await this.mutex.acquire();
// this.logger.trace(`Queue.acquireMutex() now own mutex`);
// return release;
// }
// get length() {
// return this.tail - this.head;
// }
// get isEmpty() {
// return this.length === 0;
// }
// }

class ZitiInnerTLSSocket extends EventEmitter {

Expand Down Expand Up @@ -242,7 +193,7 @@ class ZitiInnerTLSSocket extends EventEmitter {

if (success) {
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.create() wasmFD[${this.getWASMFD()}] TLS handshake completed pause start`);
await this._zitiContext.delay(500); // allow the 'SSL negotiation finished successfully' work to complete
// await this._zitiContext.delay(500); // allow the 'SSL negotiation finished successfully' work to complete
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.create() wasmFD[${this.getWASMFD()}] TLS handshake completed pause end`);
this._zitiContext.releaseTLSHandshakeLock(this.getWASMFD());
}
Expand Down Expand Up @@ -348,20 +299,24 @@ class ZitiInnerTLSSocket extends EventEmitter {
* @param {*} wireData (already TLS-encrypted)
*/
async fd_write(wireData) {
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}]: encrypted data is ready`);

// clone the data coming in from the WASM. We do this because it is possible for teh WASM memory to detach before we actually do the channel.write operation below
let clonedWireData = new Uint8Array(wireData);

this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}] byteLength[${clonedWireData.byteLength}] encrypted data is ready`);
const conn = await this.outerSocket.getZitiConnection();
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}]: outerSocket[${this.outerSocket._id}] outerSocket.conn[${conn._id}]`);
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}] byteLength[${clonedWireData.byteLength}] outerSocket[${this.outerSocket._id}] outerSocket.conn[${conn._id}]`);
let isConnected = await this.isConnected();
if (!isConnected) {
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}]: (handshake data) is being sent to ch[${conn.channel.id}] --->`);
conn.channel.write(conn, wireData);
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}] byteLength[${clonedWireData.byteLength}] (handshake data) is being sent to ch[${conn.channel.id}] --->`);
conn.channel.write(conn, clonedWireData);
} else {
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}]: (encrypted data) is being sent to tlsConn[${conn.channel._tlsConn.wasmFD}] --->`);
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.fd_write() fd[${this.wasmFD}] byteLength[${clonedWireData.byteLength}] (encrypted data) is being sent to tlsConn[${conn.channel._tlsConn.wasmFD}] --->`);

//
this._sendingEncryptedData = true;

conn.channel.write(conn, wireData);
conn.channel.write(conn, clonedWireData);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/http/ziti-websocket-wrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,8 @@ async function initAsClient(websocket, address, protocols, options) {

websocket._zitiContext.logger.info('WebSocket handshake SUCCESSFUL');

socket.isWebSocket = true;

websocket.setSocket(socket, head, opts.maxPayload);
});
}
Expand Down
Loading