diff --git a/src/channel/wasm-tls-connection.js b/src/channel/wasm-tls-connection.js index e3d2a51..dfa795c 100644 --- a/src/channel/wasm-tls-connection.js +++ b/src/channel/wasm-tls-connection.js @@ -182,7 +182,7 @@ import {Mutex, withTimeout, Semaphore} from 'async-mutex'; async handshake() { this._zitiContext.logger.trace(`ZitiWASMTLSConnection.handshake(): fd[${this.wasmFD}] calling ssl_do_handshake()` ); - let result = await this._zitiContext.ssl_do_handshake( this._wasmInstance, this._SSL ); + let result = await this._zitiContext.ssl_do_handshake( false, this.wasmFD, this._wasmInstance, this._SSL ); this._zitiContext.logger.trace(`ZitiWASMTLSConnection.handshake(): fd[${this.wasmFD}] conn[${this.wasmFD}] back from ssl_do_handshake() result[${result}] (now awaiting cb)`); } diff --git a/src/context/context.js b/src/context/context.js index f749282..934fb27 100644 --- a/src/context/context.js +++ b/src/context/context.js @@ -118,10 +118,14 @@ class ZitiContext extends EventEmitter { this._connectMutexWithTimeout = withTimeout(new Mutex(), 30 * 1000); - this._tlsHandshakeLock = withTimeout(new Mutex(), 5 * 1000, new Error('timeout on _tlsHandshakeLock')); + this._tlsHandshakeLock = withTimeout(new Mutex(), 30 * 1000, new Error('timeout on _tlsHandshakeLock')); - this._fetchSemaphore = new Semaphore( 8 ); - // this._fetchSemaphore = new Semaphore( 1 ); + this._fetchSemaphoreHTTP = new Semaphore( 8 ); + + //TEMP: we constrain HTTP requests that travel over nestedTLS to one-at-a-time for the moment. + // This will be removed as soon as I fix the TLS protocol collision issue that manifests + // when multiple HTTP requests are initiated simultaneously :( + this._fetchSemaphoreHTTPS = new Semaphore( 1 ); this._pkey = null; this._privateKeyPEM = null; @@ -580,7 +584,11 @@ class ZitiContext extends EventEmitter { /** * */ - async ssl_do_handshake(wasmInstance, ssl) { + async ssl_do_handshake(useLock, fd, wasmInstance, ssl) { + + if (useLock) { + await this.acquireTLSHandshakeLock(fd); + } this.logger.trace('ZitiContext.ssl_do_handshake() entered'); @@ -2038,7 +2046,13 @@ class ZitiContext extends EventEmitter { let self = this; - const [value, release] = await self._fetchSemaphore.acquire(); + let value, release; + + if (isEqual(opts.serviceScheme, 'https')) { + [value, release] = await self._fetchSemaphoreHTTPS.acquire(); + } else { + [value, release] = await self._fetchSemaphoreHTTP.acquire(); + } let ret; diff --git a/src/http/_http_client.js b/src/http/_http_client.js index 553d109..f9ceb3e 100644 --- a/src/http/_http_client.js +++ b/src/http/_http_client.js @@ -534,7 +534,6 @@ function statusIsInformational(status) { function parserOnIncomingClient(res, shouldKeepAlive) { const socket = this.socket; const req = socket._httpMessage; - // console.log(`parserOnIncomingClient() entered req.path[${req.path}] res: `, res); // debug('AGENT incoming response!'); @@ -599,6 +598,15 @@ function parserOnIncomingClient(res, shouldKeepAlive) { if (req.aborted || !req.emit('response', res)) res._dump(); + if (socket.innerTLSSocket) { + socket.innerTLSSocket._zitiContext.logger.trace(`parserOnIncomingClient() fd[${socket.innerTLSSocket.wasmFD}] req.path[${req.path}] _closeEventPending[${socket.innerTLSSocket._closeEventPending}]`); + if (socket.innerTLSSocket._closeEventPending) { + setTimeout((socket) => { + socket.innerTLSSocket.emit('close', undefined); + }, 10, socket) + } + } + if (method === 'HEAD') return 1; // Skip body but don't treat as Upgrade. diff --git a/src/http/ziti-inner-tls-socket.js b/src/http/ziti-inner-tls-socket.js index 8d0dbcd..609e73e 100644 --- a/src/http/ziti-inner-tls-socket.js +++ b/src/http/ziti-inner-tls-socket.js @@ -155,6 +155,8 @@ class ZitiInnerTLSSocket extends EventEmitter { this._tlsReadActive = false; + this._closeEventPending = false; + this.pendingWriteArray = new Uint8Array(0) } @@ -200,6 +202,8 @@ class ZitiInnerTLSSocket extends EventEmitter { */ async create() { + await this._zitiContext.acquireTLSHandshakeLock(this.getWASMFD()); + this._wasmInstance = await this._zitiContext.getWASMInstance(); this._sslContext = await this._zitiContext.ssl_CTX_new( this._wasmInstance ); @@ -220,7 +224,7 @@ class ZitiInnerTLSSocket extends EventEmitter { this.handshake(); - let success = await this.awaitTLSHandshakeComplete( 5000 ).catch((error) => { + let success = await this.awaitTLSHandshakeComplete( 30000 ).catch((error) => { // Let any listeners know the attempt to complete a nestedTLS handshake has timed out, // which is possibly a condition where the Service is misconfigured, and/or is not really @@ -237,7 +241,10 @@ class ZitiInnerTLSSocket extends EventEmitter { }); if (success) { - this._zitiContext.logger.trace(`ZitiInnerTLSSocket.create() wasmFD[${this.getWASMFD()}] TLS handshake completed`); + this._zitiContext.logger.trace(`ZitiInnerTLSSocket.create() wasmFD[${this.getWASMFD()}] TLS handshake completed pause start`); + await this._zitiContext.delay(500); // allow the 'SSL negotiation finished successfully' work to complete + this._zitiContext.logger.trace(`ZitiInnerTLSSocket.create() wasmFD[${this.getWASMFD()}] TLS handshake completed pause end`); + this._zitiContext.releaseTLSHandshakeLock(this.getWASMFD()); } } @@ -277,7 +284,7 @@ class ZitiInnerTLSSocket extends EventEmitter { */ async handshake() { this._zitiContext.logger.trace(`ZitiInnerTLSSocket.handshake(): fd[${this.wasmFD}] calling ssl_do_handshake()` ); - let result = await this._zitiContext.ssl_do_handshake( this._wasmInstance, this._SSL ); + let result = await this._zitiContext.ssl_do_handshake( false, this.wasmFD, this._wasmInstance, this._SSL ); this._zitiContext.logger.trace(`ZitiInnerTLSSocket.handshake(): fd[${this.wasmFD}] back from ssl_do_handshake(): result[${result}] (now awaiting cb)`); } @@ -286,7 +293,7 @@ class ZitiInnerTLSSocket extends EventEmitter { */ async isConnected() { - this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() entered: fd[${this.wasmFD}] connected[${this._connected}]`); + // this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() entered: fd[${this.wasmFD}] connected[${this._connected}]`); await this._isConnectedMutex.runExclusive( async () => { @@ -295,14 +302,14 @@ class ZitiInnerTLSSocket extends EventEmitter { // Ask the SSL if its handshake has completed yet let _connected = this._zitiContext.ssl_is_init_finished(this._wasmInstance, this._SSL); - this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() ssl_is_init_finished() result: SSL[${this._SSL}] fd[${this.wasmFD}] connected[${_connected}]`); + // this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() ssl_is_init_finished() result: SSL[${this._SSL}] fd[${this.wasmFD}] connected[${_connected}]`); // If SSL indicates handshake has completed, let's delay a smidge, and allow the WASM mTLS ciphersuite-exchange to complete, // before we turn loose any writes to the connection if (_connected) { - this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() pausing fd[${this.wasmFD}]`); + // this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() pausing fd[${this.wasmFD}]`); await this._zitiContext.delay(500); - this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() resuming fd[${this.wasmFD}]`); + // this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() resuming fd[${this.wasmFD}]`); this._connected = true; } } @@ -424,16 +431,13 @@ class ZitiInnerTLSSocket extends EventEmitter { // If the TLS handshake has completed, and we get a zero-length buffer... if (isConnected) { - // ...then emit the 'close' event (...after slight delay) - - this._zitiContext.logger.trace(`ZitiInnerTLSSocket.process() fd[${this.wasmFD}] pausing before emitting 'close' event`); + // ...then indicate we have a pending 'close' event. The 'close' event will be emitted by the ClientRequest + // when it runs and sees the need to emit the event. We need to defer/delay the emission of the 'close' + // event because it is possible that the data for this connection hasn't completed the decrypt flow yet, + // and thus the HTTP Response pqrsing logic hasn't run yet, and if we issue a 'close' before then, the + // HTTP Response parsing logic will interpret the close as a 'socket hang up' error. - setTimeout((self) => { - - self._zitiContext.logger.trace(`ZitiInnerTLSSocket.process() fd[${self.wasmFD}] emitting 'close' event after pause`); - self.emit('close', undefined); - - }, 1000, this) + this._closeEventPending = true; }