From b81def464ed402a624ab8a5c668d5dbb6cd576b4 Mon Sep 17 00:00:00 2001 From: Jacky Zhao Date: Mon, 28 Oct 2024 13:51:48 -0700 Subject: [PATCH] poll for TIOCINQ and TIOCOUTQ instead of POLLIN, bump repeats (#51) we noticed a few cases where if a program exited really quickly but dumped a lot of output (e.g. `env` in the replit shell) we could sometimes miss output we used to poll the controller side fd for POLLIN but that actually isn't _fully_ correct. the tty pipes look something like: ``` controlling program <> controller fd <> user fd <> user program |---user space------||--------kernel space------||-user space-| ``` we can sometimes enter cases where the controller side _thinks_ it has no more data to give to the controlling program (nodejs when using @replit/ruspty) but the kernel has decided to block on passing some user fd data to the controller fd (on linux for example, the pipe in the user fd -> controller fd direction has about 4kb of capacity) for example, if node isnt processing data events quickly enough, the controller-side queue can fill up and the user fd write will block until it has space again, we could rarely enter a race if nodejs decides to read an entire 4kb block completely emptying the controller fd queue and then the POLLIN logic could return with no more data left (even though the user side still has a pending write!) this would drop data :( a few changes: - rust-side - poll TIOCINQ and TIOCOUTQ on controller and user instead of just POLLIN on controller - wrapper level - trigger 'end' event on the read stream on EIO instead of just calling the cb (that way, other things with handles to the socket can also handle end appropriately) - exit condition is now - fd closed && fd fully read && program actually exited - test level - bump repeats from 50 -> 500 - rewrite it to a more standard async format so errors during the test actually are associated with that test - added a test for fast output (>4kb output and fast exit) --- npm/darwin-arm64/package.json | 2 +- npm/darwin-x64/package.json | 2 +- npm/linux-x64-gnu/package.json | 2 +- package-lock.json | 4 +- package.json | 4 +- src/lib.rs | 68 +-- tests/index.test.ts | 728 ++++++++++++++++----------------- wrapper.ts | 31 +- 8 files changed, 404 insertions(+), 437 deletions(-) diff --git a/npm/darwin-arm64/package.json b/npm/darwin-arm64/package.json index 725d4b1..75de279 100644 --- a/npm/darwin-arm64/package.json +++ b/npm/darwin-arm64/package.json @@ -1,6 +1,6 @@ { "name": "@replit/ruspty-darwin-arm64", - "version": "3.3.0", + "version": "3.4.0", "os": [ "darwin" ], diff --git a/npm/darwin-x64/package.json b/npm/darwin-x64/package.json index 4cc36e7..886abe1 100644 --- a/npm/darwin-x64/package.json +++ b/npm/darwin-x64/package.json @@ -1,6 +1,6 @@ { "name": "@replit/ruspty-darwin-x64", - "version": "3.3.0", + "version": "3.4.0", "os": [ "darwin" ], diff --git a/npm/linux-x64-gnu/package.json b/npm/linux-x64-gnu/package.json index cf1560f..122c20d 100644 --- a/npm/linux-x64-gnu/package.json +++ b/npm/linux-x64-gnu/package.json @@ -1,6 +1,6 @@ { "name": "@replit/ruspty-linux-x64-gnu", - "version": "3.3.0", + "version": "3.4.0", "os": [ "linux" ], diff --git a/package-lock.json b/package-lock.json index a968647..6cea6de 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@replit/ruspty", - "version": "3.3.0", + "version": "3.4.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@replit/ruspty", - "version": "3.3.0", + "version": "3.4.0", "license": "MIT", "devDependencies": { "@napi-rs/cli": "^2.18.2", diff --git a/package.json b/package.json index 2f1b96c..124f422 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@replit/ruspty", - "version": "3.3.0", + "version": "3.4.0", "main": "dist/wrapper.js", "types": "dist/wrapper.d.ts", "author": "Szymon Kaliski ", @@ -36,7 +36,7 @@ }, "scripts": { "artifacts": "napi artifacts", - "build": "napi build --platform --release && npm run build:wrapper", + "build": "napi build --platform --release && npm run build:wrapper && npm run format", "build:wrapper": "tsup", "prepublishOnly": "napi prepublish -t npm", "test": "vitest run", diff --git a/src/lib.rs b/src/lib.rs index beb2ab2..59ca4ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,9 @@ use std::collections::HashMap; use std::fs::File; -use std::io::Error; use std::io::ErrorKind; -use std::io::Write; +use std::io::{Error, Write}; use std::os::fd::{AsRawFd, OwnedFd}; -use std::os::fd::{BorrowedFd, FromRawFd, IntoRawFd, RawFd}; +use std::os::fd::{FromRawFd, IntoRawFd, RawFd}; use std::os::unix::process::CommandExt; use std::process::{Command, Stdio}; use std::thread; @@ -12,14 +11,13 @@ use std::time::Duration; use backoff::backoff::Backoff; use backoff::ExponentialBackoffBuilder; -use libc::{self, c_int}; use napi::bindgen_prelude::JsFunction; use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode}; use napi::Status::GenericFailure; use napi::{self, Env}; use nix::errno::Errno; use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag}; -use nix::poll::{poll, PollFd, PollFlags, PollTimeout}; +use nix::libc::{self, c_int, ioctl, FIONREAD, TIOCOUTQ, TIOCSCTTY, TIOCSWINSZ}; use nix::pty::{openpty, Winsize}; use nix::sys::termios::{self, SetArg}; @@ -59,14 +57,11 @@ fn cast_to_napi_error(err: Errno) -> napi::Error { napi::Error::new(GenericFailure, err) } -// if the child process exits before the controller fd is fully read, we might accidentally -// end in a case where onExit is called but js hasn't had the chance to fully read the controller fd +// if the child process exits before the controller fd is fully read or the user fd is fully +// flushed, we might accidentally end in a case where onExit is called but js hasn't had +// the chance to fully read the controller fd // let's wait until the controller fd is fully read before we call onExit -fn poll_controller_fd_until_read(raw_fd: RawFd) { - // wait until fd is fully read (i.e. POLLIN no longer set) - let borrowed_fd = unsafe { BorrowedFd::borrow_raw(raw_fd) }; - let poll_fd = PollFd::new(borrowed_fd, PollFlags::POLLIN); - +fn poll_pty_fds_until_read(controller_fd: RawFd, user_fd: RawFd) { let mut backoff = ExponentialBackoffBuilder::default() .with_initial_interval(Duration::from_millis(1)) .with_max_interval(Duration::from_millis(100)) @@ -74,30 +69,42 @@ fn poll_controller_fd_until_read(raw_fd: RawFd) { .build(); loop { - if let Err(err) = poll(&mut [poll_fd], PollTimeout::ZERO) { - if err == Errno::EINTR || err == Errno::EAGAIN { - // we were interrupted, so we should just try again - continue; - } + // check both input and output queues for both FDs + let mut controller_inq: i32 = 0; + let mut controller_outq: i32 = 0; + let mut user_inq: i32 = 0; + let mut user_outq: i32 = 0; - // we should almost never hit this, but if we do, we should just break out of the loop. this - // can happen if Node destroys the terminal before waiting for the child process to go away. - break; - } + // safe because we're passing valid file descriptors and properly sized integers + unsafe { + // check bytes waiting to be read (FIONREAD, equivalent to TIOCINQ on Linux) + if ioctl(controller_fd, FIONREAD, &mut controller_inq) == -1 + || ioctl(user_fd, FIONREAD, &mut user_inq) == -1 + { + // break if we can't read + break; + } - // check if POLLIN is no longer set (i.e. there is no more data to read) - if let Some(flags) = poll_fd.revents() { - if !flags.contains(PollFlags::POLLIN) { + // check bytes waiting to be written (TIOCOUTQ) + if ioctl(controller_fd, TIOCOUTQ, &mut controller_outq) == -1 + || ioctl(user_fd, TIOCOUTQ, &mut user_outq) == -1 + { + // break if we can't read break; } } - // wait for a bit before trying again + // if all queues are empty, we're done + if controller_inq == 0 && controller_outq == 0 && user_inq == 0 && user_outq == 0 { + break; + } + + // apply backoff strategy if let Some(d) = backoff.next_backoff() { thread::sleep(d); continue; } else { - // we have exhausted our attempts, its joever + // we have exhausted our attempts break; } } @@ -181,7 +188,7 @@ impl Pty { } // become the controlling tty for the program - let err = libc::ioctl(raw_user_fd, libc::TIOCSCTTY.into(), 0); + let err = libc::ioctl(raw_user_fd, TIOCSCTTY.into(), 0); if err == -1 { return Err(Error::new(ErrorKind::Other, "ioctl-TIOCSCTTY")); } @@ -244,10 +251,7 @@ impl Pty { let wait_result = child.wait(); // try to wait for the controller fd to be fully read - poll_controller_fd_until_read(raw_controller_fd); - - // we don't drop the controller fd immediately - // let pty.close() be responsible for closing it + poll_pty_fds_until_read(raw_controller_fd, raw_user_fd); drop(user_fd); match wait_result { @@ -310,7 +314,7 @@ fn pty_resize(fd: i32, size: Size) -> Result<(), napi::Error> { ws_ypixel: 0, }; - let res = unsafe { libc::ioctl(fd, libc::TIOCSWINSZ, &window_size as *const _) }; + let res = unsafe { libc::ioctl(fd, TIOCSWINSZ, &window_size as *const _) }; if res == -1 { return Err(napi::Error::new( napi::Status::GenericFailure, diff --git a/tests/index.test.ts b/tests/index.test.ts index 3fde299..6ad01be 100644 --- a/tests/index.test.ts +++ b/tests/index.test.ts @@ -1,7 +1,7 @@ import { Pty, getCloseOnExec, setCloseOnExec } from '../wrapper'; import { type Writable } from 'stream'; import { readdirSync, readlinkSync } from 'fs'; -import { describe, test, expect, beforeEach, afterEach } from 'vitest'; +import { describe, test, expect, beforeEach, afterEach, vi, type Mock } from 'vitest'; import { exec as execAsync } from 'child_process'; import { promisify } from 'util'; const exec = promisify(execAsync); @@ -47,138 +47,123 @@ function getOpenFds(): FdRecord { describe( 'PTY', - { repeats: 50 }, + { repeats: 500 }, () => { - test('spawns and exits', () => - new Promise((done) => { - const oldFds = getOpenFds(); - const message = 'hello from a pty'; - let buffer = ''; + test('spawns and exits', async () => { + const oldFds = getOpenFds(); + const message = 'hello from a pty'; + let buffer = ''; - const pty = new Pty({ - command: '/bin/echo', - args: [message], - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - expect(buffer.trim()).toBe(message); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); + const onExit = vi.fn(); + const pty = new Pty({ + command: '/bin/echo', + args: [message], + onExit, + }); - const readStream = pty.read; - readStream.on('data', (chunk) => { - buffer = chunk.toString(); - }); - })); + const readStream = pty.read; + readStream.on('data', (chunk) => { + buffer = chunk.toString(); + }); - test('captures an exit code', () => - new Promise((done) => { - const oldFds = getOpenFds(); - const pty = new Pty({ - command: '/bin/sh', - args: ['-c', 'exit 17'], - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(17); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + expect(buffer.trim()).toBe(message); + expect(getOpenFds()).toStrictEqual(oldFds); + }); - // set a pty reader so it can flow - pty.read.on('data', () => { }); - })); + test('captures an exit code', async () => { + const oldFds = getOpenFds(); + const onExit = vi.fn(); + const pty = new Pty({ + command: '/bin/sh', + args: ['-c', 'exit 17'], + onExit, + }); - test('can be written to', () => - new Promise((done) => { - const oldFds = getOpenFds(); + // set a pty reader so it can flow + pty.read.on('data', () => {}); - // The message should end in newline so that the EOT can signal that the input has ended and not - // just the line. - const message = 'hello cat\n'; - let buffer = ''; + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 17); + expect(getOpenFds()).toStrictEqual(oldFds); + }); - // We have local echo enabled, so we'll read the message twice. - const expectedResult = 'hello cat\r\nhello cat\r\n'; + test('can be written to', async () => { + const oldFds = getOpenFds(); + const message = 'hello cat\n'; + let buffer = ''; + const onExit = vi.fn(); - const pty = new Pty({ - command: '/bin/cat', - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - let result = buffer.toString(); - if (IS_DARWIN) { - // Darwin adds the visible EOT to the stream. - result = result.replace('^D\b\b', ''); - } - expect(result.trim()).toStrictEqual(expectedResult.trim()); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); + const pty = new Pty({ + command: '/bin/cat', + onExit, + }); - const writeStream = pty.write; - const readStream = pty.read; + const writeStream = pty.write; + const readStream = pty.read; - readStream.on('data', (data) => { - buffer += data.toString(); - }); - writeStream.write(message); - writeStream.end(EOT); - })); + readStream.on('data', (data) => { + buffer += data.toString(); + }); + + writeStream.write(message); + writeStream.end(EOT); + + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + + let result = buffer.toString(); + if (IS_DARWIN) { + // Darwin adds the visible EOT to the stream. + result = result.replace('^D\b\b', ''); + } - test('can be started in non-interactive fashion', () => - new Promise((done) => { - const oldFds = getOpenFds(); + const expectedResult = 'hello cat\r\nhello cat\r\n'; + expect(result.trim()).toStrictEqual(expectedResult.trim()); + expect(getOpenFds()).toStrictEqual(oldFds); + }); - let buffer = ''; + test('can be started in non-interactive fashion', async () => { + const oldFds = getOpenFds(); + let buffer = ''; + const onExit = vi.fn(); - const expectedResult = '\r\n'; + const pty = new Pty({ + command: '/bin/cat', + interactive: false, + onExit, + }); - const pty = new Pty({ - command: '/bin/cat', - interactive: false, - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - let result = buffer.toString(); - expect(result.trim()).toStrictEqual(expectedResult.trim()); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); + const readStream = pty.read; + readStream.on('data', (data) => { + buffer += data.toString(); + }); - const readStream = pty.read; + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + let result = buffer.toString(); + const expectedResult = '\r\n'; + expect(result.trim()).toStrictEqual(expectedResult.trim()); + expect(getOpenFds()).toStrictEqual(oldFds); + }); - readStream.on('data', (data) => { - buffer += data.toString(); - }); - })); + test('can be resized', async () => { + const oldFds = getOpenFds(); + let buffer = ''; + let state: 'expectPrompt' | 'expectDone1' | 'expectDone2' | 'done' = 'expectPrompt'; + const onExit = vi.fn(); - test('can be resized', () => - new Promise((done) => { - const oldFds = getOpenFds(); - let buffer = ''; - let state: 'expectPrompt' | 'expectDone1' | 'expectDone2' | 'done' = - 'expectPrompt'; - const pty = new Pty({ - command: '/bin/sh', - size: { rows: 24, cols: 80 }, - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - - expect(state).toBe('done'); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); + const pty = new Pty({ + command: '/bin/sh', + size: { rows: 24, cols: 80 }, + onExit, + }); - const writeStream = pty.write; - const readStream = pty.read; + const writeStream = pty.write; + const readStream = pty.read; + const statePromise = new Promise((resolve) => { readStream.on('data', (data) => { buffer += data.toString(); @@ -200,291 +185,271 @@ describe( if (state === 'expectDone2' && buffer.includes('done2\r\n')) { expect(buffer).toContain('60 100'); state = 'done'; - writeStream.write(EOT); - return; + resolve(); } }); - })); + }); - test('respects working directory', () => - new Promise((done) => { - const oldFds = getOpenFds(); - const cwd = process.cwd(); - let buffer = ''; + await statePromise; + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + expect(state).toBe('done'); + expect(getOpenFds()).toStrictEqual(oldFds); + }); - const pty = new Pty({ - command: '/bin/pwd', - dir: cwd, - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - expect(buffer.trim()).toBe(cwd); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); + test('respects working directory', async () => { + const oldFds = getOpenFds(); + const cwd = process.cwd(); + let buffer = ''; + const onExit = vi.fn(); - const readStream = pty.read; - readStream.on('data', (data) => { - buffer += data.toString(); - }); - })); + const pty = new Pty({ + command: '/bin/pwd', + dir: cwd, + onExit, + }); - test('respects env', () => - new Promise((done) => { - const oldFds = getOpenFds(); - const message = 'hello from env'; - let buffer = ''; + const readStream = pty.read; + readStream.on('data', (data) => { + buffer += data.toString(); + }); - const pty = new Pty({ - command: '/bin/sh', - args: ['-c', 'echo $ENV_VARIABLE && exit'], - envs: { - ENV_VARIABLE: message, - }, - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - expect(buffer.trim()).toBe(message); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + expect(buffer.trim()).toBe(cwd); + expect(getOpenFds()).toStrictEqual(oldFds); + }); - const readStream = pty.read; - readStream.on('data', (data) => { - buffer += data.toString(); - }); - })); + test('respects env', async () => { + const oldFds = getOpenFds(); + const message = 'hello from env'; + let buffer = ''; + const onExit = vi.fn(); - test('resize after exit shouldn\'t throw', () => new Promise((done, reject) => { + const pty = new Pty({ + command: '/bin/sh', + args: ['-c', 'echo $ENV_VARIABLE && exit'], + envs: { + ENV_VARIABLE: message, + }, + onExit, + }); + + const readStream = pty.read; + readStream.on('data', (data) => { + buffer += data.toString(); + }); + + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + expect(buffer.trim()).toBe(message); + expect(getOpenFds()).toStrictEqual(oldFds); + }); + + test('resize after exit shouldn\'t throw', async () => { + const onExit = vi.fn(); const pty = new Pty({ command: '/bin/echo', args: ['hello'], - onExit: (err, exitCode) => { - try { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - expect(() => { - pty.resize({ rows: 60, cols: 100 }); - }).not.toThrow(); - done(); - } catch (e) { - reject(e) - } - }, + onExit, }); - pty.read.on('data', () => { }); - })); + pty.read.on('data', () => {}); - test('resize after close shouldn\'t throw', () => new Promise((done, reject) => { + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + expect(() => { + pty.resize({ rows: 60, cols: 100 }); + }).not.toThrow(); + }); + + test('resize after close shouldn\'t throw', async () => { + const onExit = vi.fn(); const pty = new Pty({ command: '/bin/sh', - onExit: (err, exitCode) => { - try { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - } catch (e) { - reject(e) - } - }, + onExit, }); - pty.read.on('data', () => { }); + pty.read.on('data', () => {}); pty.close(); expect(() => { pty.resize({ rows: 60, cols: 100 }); }).not.toThrow(); - done(); - })); + + process.kill(pty.pid, 'SIGKILL'); + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, -1); + }); test( 'ordering is correct', - () => - new Promise((done) => { - const oldFds = getOpenFds(); - let buffer = Buffer.from(''); - const n = 1024; - const pty = new Pty({ - command: '/bin/sh', - args: [ - '-c', - 'seq 0 1024' - ], - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - expect(buffer.toString().trim().split('\n').map(Number)).toStrictEqual( - Array.from({ length: n + 1 }, (_, i) => i), - ); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); - - const readStream = pty.read; - readStream.on('data', (data) => { - buffer = Buffer.concat([buffer, data]); - }); - }), - ); + async () => { + const oldFds = getOpenFds(); + let buffer = Buffer.from(''); + const n = 1024; + const onExit = vi.fn(); - test('doesnt miss large output from fast commands', - () => - new Promise((done) => { - const payload = `hello`.repeat(4096); - let buffer = Buffer.from(''); - const pty = new Pty({ - command: '/bin/echo', - args: [ - '-n', - payload - ], - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - // account for the newline - expect(buffer.toString().length).toBe(payload.length); - done(); - }, - }); - - const readStream = pty.read; - readStream.on('data', (data) => { - buffer = Buffer.concat([buffer, data]); - }); - }) - ); + const pty = new Pty({ + command: '/bin/sh', + args: [ + '-c', + `seq 0 ${n}` + ], + onExit, + }); - testSkipOnDarwin( - 'does not leak files', - () => - new Promise((done) => { - const oldFds = getOpenFds(); - const promises = []; - for (let i = 0; i < 10; i++) { - promises.push( - new Promise((accept) => { - let buffer = Buffer.from(''); - const pty = new Pty({ - command: '/bin/sh', - args: ['-c', 'sleep 0.1 ; ls /proc/$$/fd'], - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - expect( - buffer - .toString() - .trim() - .split(/\s+/) - .filter((fd) => { - // Some shells dup stdio to fd 255 for reasons. - return fd !== '255'; - }) - .toSorted(), - ).toStrictEqual(['0', '1', '2']); - accept(); - }, - }); - - const readStream = pty.read; - readStream.on('data', (data) => { - buffer = Buffer.concat([buffer, data]); - }); - }), - ); - } - Promise.allSettled(promises).then(() => { - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }); - }), - ); + const readStream = pty.read; + readStream.on('data', (data) => { + buffer = Buffer.concat([buffer, data]); + }); - test( - 'can run concurrent shells', - () => - new Promise((done) => { - const oldFds = getOpenFds(); - const donePromises: Array> = []; - const readyPromises: Array> = []; - const writeStreams: Array = []; - - // We have local echo enabled, so we'll read the message twice. - const expectedResult = 'ready\r\nhello cat\r\nhello cat\r\n'; - - for (let i = 0; i < 10; i++) { - donePromises.push( - new Promise((accept) => { - let buffer = Buffer.from(''); - const pty = new Pty({ - command: '/bin/sh', - args: ['-c', 'echo ready ; exec cat'], - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - let result = buffer.toString(); - if (IS_DARWIN) { - // Darwin adds the visible EOT to the stream. - result = result.replace('^D\b\b', ''); - } - expect(result).toStrictEqual(expectedResult); - accept(); - }, - }); - - readyPromises.push( - new Promise((ready) => { - let readyMessageReceived = false; - const readStream = pty.read; - readStream.on('data', (data) => { - buffer = Buffer.concat([buffer, data]); - if (!readyMessageReceived) { - readyMessageReceived = true; - ready(); - } - }); - }), - ); - writeStreams.push(pty.write); - }), - ); - } - Promise.allSettled(readyPromises).then(() => { - // The message should end in newline so that the EOT can signal that the input has ended and not - // just the line. - const message = 'hello cat\n'; - for (const writeStream of writeStreams) { - writeStream.write(message); - writeStream.end(EOT); - } - }); - Promise.allSettled(donePromises).then(() => { - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }); - }), + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + + const lines = buffer.toString().trim().split('\n'); + expect(lines.length).toBe(n + 1); + for (let i = 0; i < n + 1; i++) { + expect(Number(lines[i]), `expected line ${i} to contain ${i} but got ${lines[i]}`).toBe(i); + } + + expect(getOpenFds()).toStrictEqual(oldFds); + } ); - test("doesn't break when executing non-existing binary", () => - new Promise((done) => { - const oldFds = getOpenFds(); - try { - new Pty({ - command: '/bin/this-does-not-exist', - onExit: () => { - expect(getOpenFds()).toStrictEqual(oldFds); - }, - }); - } catch (e: any) { - expect(e.message).toContain('No such file or directory'); - - done(); + test('doesnt miss large output from fast commands', async () => { + const payload = `hello`.repeat(4096); + let buffer = Buffer.from(''); + const onExit = vi.fn(); + + const pty = new Pty({ + command: '/bin/echo', + args: [ + '-n', + payload + ], + onExit, + }); + + const readStream = pty.read; + readStream.on('data', (data) => { + buffer = Buffer.concat([buffer, data]); + }); + + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + expect(buffer.toString().length).toBe(payload.length); + }); + + testSkipOnDarwin('does not leak files', async () => { + const oldFds = getOpenFds(); + const promises = []; + + for (let i = 0; i < 10; i++) { + const onExit = vi.fn(); + let buffer = Buffer.from(''); + + const pty = new Pty({ + command: '/bin/sh', + args: ['-c', 'sleep 0.1 ; ls /proc/$$/fd'], + onExit, + }); + + const readStream = pty.read; + readStream.on('data', (data) => { + buffer = Buffer.concat([buffer, data]); + }); + + promises.push( + vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)).then(() => { + expect(onExit).toHaveBeenCalledWith(null, 0); + expect( + buffer + .toString() + .trim() + .split(/\s+/) + .filter((fd) => { + // Some shells dup stdio to fd 255 for reasons. + return fd !== '255'; + }) + .toSorted(), + ).toStrictEqual(['0', '1', '2']); + }) + ); + } + + await Promise.all(promises); + expect(getOpenFds()).toStrictEqual(oldFds); + }); + + test('can run concurrent shells', async () => { + const oldFds = getOpenFds(); + const writeStreams: Array = []; + const buffers: Array = []; + const onExits: Array = []; + const expectedResult = 'hello cat\r\nhello cat\r\n'; + + // Create 10 concurrent shells + for (let i = 0; i < 10; i++) { + const onExit = vi.fn(); + onExits.push(onExit); + buffers[i] = Buffer.from(''); + + const pty = new Pty({ + command: '/bin/cat', + onExit, + }); + + const readStream = pty.read; + readStream.on('data', (data) => { + buffers[i] = Buffer.concat([buffers[i], data]); + }); + + writeStreams.push(pty.write); + pty.write.write('hello cat\n'); + } + + // Wait for initial output + await vi.waitFor(() => + buffers.every(buffer => buffer.toString().includes('hello cat\r\n')) + ); + + // Send EOT to all shells + for (const writeStream of writeStreams) { + writeStream.end(EOT); + } + + // Wait for all shells to exit + await Promise.all(onExits.map(onExit => + vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)) + )); + + // Verify results + for (let i = 0; i < 10; i++) { + expect(onExits[i]).toHaveBeenCalledWith(null, 0); + let result = buffers[i].toString(); + if (IS_DARWIN) { + result = result.replace('^D\b\b', ''); } - })); + expect(result).toStrictEqual(expectedResult); + } + + expect(getOpenFds()).toStrictEqual(oldFds); + }); + + test("doesn't break when executing non-existing binary", async () => { + const oldFds = getOpenFds(); + + await expect(async () => { + new Pty({ + command: '/bin/this-does-not-exist', + onExit: () => {}, + }); + }).rejects.toThrow('No such file or directory'); + + expect(getOpenFds()).toStrictEqual(oldFds); + }); }, ); @@ -492,52 +457,49 @@ describe('cgroup opts', () => { beforeEach(async () => { if (!IS_DARWIN) { // create a new cgroup with the right permissions - await exec("sudo cgcreate -g 'cpu:/test.slice'") - await exec("sudo chown -R $(id -u):$(id -g) /sys/fs/cgroup/cpu/test.slice") + await exec("sudo cgcreate -g 'cpu:/test.slice'"); + await exec("sudo chown -R $(id -u):$(id -g) /sys/fs/cgroup/cpu/test.slice"); } }); afterEach(async () => { if (!IS_DARWIN) { // remove the cgroup - await exec("sudo cgdelete cpu:/test.slice") + await exec("sudo cgdelete cpu:/test.slice"); } }); - testSkipOnDarwin('basic cgroup', () => new Promise((done) => { + testSkipOnDarwin('basic cgroup', async () => { const oldFds = getOpenFds(); let buffer = ''; + const onExit = vi.fn(); + const pty = new Pty({ command: '/bin/cat', args: ['/proc/self/cgroup'], cgroupPath: '/sys/fs/cgroup/cpu/test.slice', - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - expect(buffer).toContain('/test.slice'); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, + onExit, }); const readStream = pty.read; readStream.on('data', (data) => { buffer = data.toString(); }); - }) - ); - testOnlyOnDarwin('cgroup is not supported on darwin', () => { + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + expect(buffer).toContain('/test.slice'); + expect(getOpenFds()).toStrictEqual(oldFds); + }); + + testOnlyOnDarwin('cgroup is not supported on darwin', async () => { expect(() => { new Pty({ command: '/bin/cat', args: ['/proc/self/cgroup'], cgroupPath: '/sys/fs/cgroup/cpu/test.slice', - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - }, - }) + onExit: vi.fn(), + }); }).toThrowError(); }); }); diff --git a/wrapper.ts b/wrapper.ts index e47f63f..6babfc5 100644 --- a/wrapper.ts +++ b/wrapper.ts @@ -47,7 +47,7 @@ export class Pty { #fd: number; #handledClose: boolean = false; - #handledEndOfData: boolean = false; + #fdClosed: boolean = false; #socket: ReadStream; #writable: Writable; @@ -63,13 +63,14 @@ export class Pty { constructor(options: PtyOptions) { const realExit = options.onExit; - let markExited: (value: ExitResult) => void; + let markExited!: (value: ExitResult) => void; let exitResult: Promise = new Promise((resolve) => { markExited = resolve; }); - let markFdClosed: () => void; - let fdClosed = new Promise((resolve) => { - markFdClosed = resolve; + + let markReadFinished!: () => void; + let readFinished = new Promise((resolve) => { + markReadFinished = resolve; }); const mockedExit = (error: NodeJS.ErrnoException | null, code: number) => { markExited({ error, code }); @@ -89,23 +90,21 @@ export class Pty { }); // catch end events - const handleEnd = async () => { - if (this.#handledEndOfData) { + const handleClose = async () => { + if (this.#fdClosed) { return; } - this.#handledEndOfData = true; + this.#fdClosed = true; // must wait for fd close and exit result before calling real exit - await fdClosed; + await readFinished; const result = await exitResult; realExit(result.error, result.code); }; - this.read.on('end', handleEnd); - this.read.on('close', () => { - markFdClosed(); - }); + this.read.once('end', markReadFinished); + this.read.once('close', handleClose); // PTYs signal their done-ness with an EIO error. we therefore need to filter them out (as well as // cleaning up other spurious errors) so that the user doesn't need to handle them and be in @@ -123,7 +122,9 @@ export class Pty { // is nothing left to read and we can start tearing things down. If we hadn't received an // error so far, we are considered to be in good standing. this.read.off('error', handleError); - handleEnd(); + // emit 'end' to signal no more data + // this will trigger our 'end' handler which marks readFinished + this.read.emit('end'); return; } } @@ -144,7 +145,7 @@ export class Pty { } resize(size: Size) { - if (this.#handledClose || this.#handledEndOfData) { + if (this.#handledClose || this.#fdClosed) { return; }