Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

'socket hang up' errors (again) #130

Merged
merged 1 commit into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/channel/wasm-tls-connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ import {Mutex, withTimeout, Semaphore} from 'async-mutex';
async handshake() {

this._zitiContext.logger.trace(`ZitiWASMTLSConnection.handshake(): fd[${this.wasmFD}] calling ssl_do_handshake()` );
let result = await this._zitiContext.ssl_do_handshake( this._wasmInstance, this._SSL );
let result = await this._zitiContext.ssl_do_handshake( false, this.wasmFD, this._wasmInstance, this._SSL );
this._zitiContext.logger.trace(`ZitiWASMTLSConnection.handshake(): fd[${this.wasmFD}] conn[${this.wasmFD}] back from ssl_do_handshake() result[${result}] (now awaiting cb)`);
}

Expand Down
24 changes: 19 additions & 5 deletions src/context/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,14 @@ class ZitiContext extends EventEmitter {

this._connectMutexWithTimeout = withTimeout(new Mutex(), 30 * 1000);

this._tlsHandshakeLock = withTimeout(new Mutex(), 5 * 1000, new Error('timeout on _tlsHandshakeLock'));
this._tlsHandshakeLock = withTimeout(new Mutex(), 30 * 1000, new Error('timeout on _tlsHandshakeLock'));

this._fetchSemaphore = new Semaphore( 8 );
// this._fetchSemaphore = new Semaphore( 1 );
this._fetchSemaphoreHTTP = new Semaphore( 8 );

//TEMP: we constrain HTTP requests that travel over nestedTLS to one-at-a-time for the moment.
// This will be removed as soon as I fix the TLS protocol collision issue that manifests
// when multiple HTTP requests are initiated simultaneously :(
this._fetchSemaphoreHTTPS = new Semaphore( 1 );

this._pkey = null;
this._privateKeyPEM = null;
Expand Down Expand Up @@ -580,7 +584,11 @@ class ZitiContext extends EventEmitter {
/**
*
*/
async ssl_do_handshake(wasmInstance, ssl) {
async ssl_do_handshake(useLock, fd, wasmInstance, ssl) {

if (useLock) {
await this.acquireTLSHandshakeLock(fd);
}

this.logger.trace('ZitiContext.ssl_do_handshake() entered');

Expand Down Expand Up @@ -2038,7 +2046,13 @@ class ZitiContext extends EventEmitter {

let self = this;

const [value, release] = await self._fetchSemaphore.acquire();
let value, release;

if (isEqual(opts.serviceScheme, 'https')) {
[value, release] = await self._fetchSemaphoreHTTPS.acquire();
} else {
[value, release] = await self._fetchSemaphoreHTTP.acquire();
}

let ret;

Expand Down
10 changes: 9 additions & 1 deletion src/http/_http_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,6 @@ function statusIsInformational(status) {
function parserOnIncomingClient(res, shouldKeepAlive) {
const socket = this.socket;
const req = socket._httpMessage;
// console.log(`parserOnIncomingClient() entered req.path[${req.path}] res: `, res);

// debug('AGENT incoming response!');

Expand Down Expand Up @@ -599,6 +598,15 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
if (req.aborted || !req.emit('response', res))
res._dump();

if (socket.innerTLSSocket) {
socket.innerTLSSocket._zitiContext.logger.trace(`parserOnIncomingClient() fd[${socket.innerTLSSocket.wasmFD}] req.path[${req.path}] _closeEventPending[${socket.innerTLSSocket._closeEventPending}]`);
if (socket.innerTLSSocket._closeEventPending) {
setTimeout((socket) => {
socket.innerTLSSocket.emit('close', undefined);
}, 10, socket)
}
}

if (method === 'HEAD')
return 1; // Skip body but don't treat as Upgrade.

Expand Down
36 changes: 20 additions & 16 deletions src/http/ziti-inner-tls-socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ class ZitiInnerTLSSocket extends EventEmitter {

this._tlsReadActive = false;

this._closeEventPending = false;

this.pendingWriteArray = new Uint8Array(0)

}
Expand Down Expand Up @@ -200,6 +202,8 @@ class ZitiInnerTLSSocket extends EventEmitter {
*/
async create() {

await this._zitiContext.acquireTLSHandshakeLock(this.getWASMFD());

this._wasmInstance = await this._zitiContext.getWASMInstance();

this._sslContext = await this._zitiContext.ssl_CTX_new( this._wasmInstance );
Expand All @@ -220,7 +224,7 @@ class ZitiInnerTLSSocket extends EventEmitter {

this.handshake();

let success = await this.awaitTLSHandshakeComplete( 5000 ).catch((error) => {
let success = await this.awaitTLSHandshakeComplete( 30000 ).catch((error) => {

// Let any listeners know the attempt to complete a nestedTLS handshake has timed out,
// which is possibly a condition where the Service is misconfigured, and/or is not really
Expand All @@ -237,7 +241,10 @@ class ZitiInnerTLSSocket extends EventEmitter {
});

if (success) {
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.create() wasmFD[${this.getWASMFD()}] TLS handshake completed`);
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.create() wasmFD[${this.getWASMFD()}] TLS handshake completed pause start`);
await this._zitiContext.delay(500); // allow the 'SSL negotiation finished successfully' work to complete
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.create() wasmFD[${this.getWASMFD()}] TLS handshake completed pause end`);
this._zitiContext.releaseTLSHandshakeLock(this.getWASMFD());
}
}

Expand Down Expand Up @@ -277,7 +284,7 @@ class ZitiInnerTLSSocket extends EventEmitter {
*/
async handshake() {
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.handshake(): fd[${this.wasmFD}] calling ssl_do_handshake()` );
let result = await this._zitiContext.ssl_do_handshake( this._wasmInstance, this._SSL );
let result = await this._zitiContext.ssl_do_handshake( false, this.wasmFD, this._wasmInstance, this._SSL );
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.handshake(): fd[${this.wasmFD}] back from ssl_do_handshake(): result[${result}] (now awaiting cb)`);
}

Expand All @@ -286,7 +293,7 @@ class ZitiInnerTLSSocket extends EventEmitter {
*/
async isConnected() {

this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() entered: fd[${this.wasmFD}] connected[${this._connected}]`);
// this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() entered: fd[${this.wasmFD}] connected[${this._connected}]`);

await this._isConnectedMutex.runExclusive( async () => {

Expand All @@ -295,14 +302,14 @@ class ZitiInnerTLSSocket extends EventEmitter {
// Ask the SSL if its handshake has completed yet
let _connected = this._zitiContext.ssl_is_init_finished(this._wasmInstance, this._SSL);

this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() ssl_is_init_finished() result: SSL[${this._SSL}] fd[${this.wasmFD}] connected[${_connected}]`);
// this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() ssl_is_init_finished() result: SSL[${this._SSL}] fd[${this.wasmFD}] connected[${_connected}]`);

// If SSL indicates handshake has completed, let's delay a smidge, and allow the WASM mTLS ciphersuite-exchange to complete,
// before we turn loose any writes to the connection
if (_connected) {
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() pausing fd[${this.wasmFD}]`);
// this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() pausing fd[${this.wasmFD}]`);
await this._zitiContext.delay(500);
this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() resuming fd[${this.wasmFD}]`);
// this._zitiContext.logger.trace(`ZitiInnerTLSSocket.isConnected() resuming fd[${this.wasmFD}]`);
this._connected = true;
}
}
Expand Down Expand Up @@ -424,16 +431,13 @@ class ZitiInnerTLSSocket extends EventEmitter {

// If the TLS handshake has completed, and we get a zero-length buffer...
if (isConnected) {
// ...then emit the 'close' event (...after slight delay)

this._zitiContext.logger.trace(`ZitiInnerTLSSocket.process() fd[${this.wasmFD}] pausing before emitting 'close' event`);
// ...then indicate we have a pending 'close' event. The 'close' event will be emitted by the ClientRequest
// when it runs and sees the need to emit the event. We need to defer/delay the emission of the 'close'
// event because it is possible that the data for this connection hasn't completed the decrypt flow yet,
// and thus the HTTP Response pqrsing logic hasn't run yet, and if we issue a 'close' before then, the
// HTTP Response parsing logic will interpret the close as a 'socket hang up' error.

setTimeout((self) => {

self._zitiContext.logger.trace(`ZitiInnerTLSSocket.process() fd[${self.wasmFD}] emitting 'close' event after pause`);
self.emit('close', undefined);

}, 1000, this)
this._closeEventPending = true;

}

Expand Down
Loading