Skip to content

Commit

Permalink
Avoid the Bun 1.1.7 bug (#9)
Browse files Browse the repository at this point in the history
Bun currently has a bug where raw FDs don't interact well with
`fs.createReadStream`, and the result is that not all data is seen by
the `'data'` callback: oven-sh/bun#9907

This change is an egregious workaround that makes the Rust side read from
the PTY file descriptor until the child process closes it. It is suboptimal
because of all the data copying that happens, and crossing the FFI barrier
also takes some time, but at least this should keep us from being completely
blocked. `Pty` now takes one more (optional) argument: a callback that
replaces the read stream's `'data'` callback.

```shell
~/ruspty$ cat test.js 
// Manual repro script: spawns `sh` in a PTY, sends it `ls` followed by `exit`
// once the first chunk of output arrives, and prints everything delivered to
// the Pty data callback.
const { Pty } = require('./index');
const fs = require('fs');

const ENV = process.env;
const CWD = '.';

// True until the first chunk has been seen, so the commands are sent only once.
let firstread = true;

const pty = new Pty(
  'sh',
  [],
  ENV,
  CWD,
  { rows: 24, cols: 80 },
  // Exit callback: logs whatever arguments it was called with, then closes the
  // pty.
  (...result) => {
    console.log({ result });
    pty.close();
  },
  // Data callback: rethrows any reported error, otherwise prints the chunk.
  (err, chunk) => {
    if (err !== null) {
      throw err;
    }
    console.log(chunk.toString());
    if (firstread) {
      // NOTE(review): `write` is declared further down, so this relies on the
      // callback first firing after the script's top-level code has run.
      write.write('ls\n', () => {
        console.log('ls written');
        // Ends the stream with a final `exit` so the shell terminates.
        write.end('exit\n', () => {
          console.log('end done');
        });
      });
      firstread = false;
    }
  },
);

// Writes go to the PTY through its raw file descriptor; the empty path is a
// placeholder because `fd` is supplied.
const write = fs.createWriteStream('', {
  fd: pty.fd(),
  autoClose: true,
});

write.on('close', () => {
  console.log('write close');
});

// Errors whose code contains 'EIO' are reported as 'YAY' (presumably the
// expected outcome once the child has gone away -- confirm); anything else is
// logged as a genuine write error.
write.on('error', (err) => {
  if (err.code && err.code.indexOf('EIO') !== -1) {
    console.log('YAY');
  } else {
    console.log('write error', { err });
  }
});

console.log('gonna wait');
// Logs a final marker after 3 seconds; presumably also keeps the process alive
// long enough for the callbacks above to run.
setTimeout(() => {
  console.log('done, hopefully');
}, 3000);
~/ruspty$ ./node_modules/.bin/bun --version 
1.1.7
~/ruspty$ ./node_modules/.bin/bun test.js 
gonna wait
sh-5.2$ 
ls written
end done
ls

exit

write close
1.0.26.txt  build.rs    index.js       package-lock.json          src
1.1.7.txt   bun.lockb   index.test.ts  package.json               target
Cargo.lock  flake.lock  node.txt       replit.nix                 test.js
Cargo.toml  flake.nix   node_modules   ruspty.linux-x64-gnu.node
README.md   index.d.ts  npm            rustfmt.toml

sh-5.2$ 
exit

{
  result: [ null, 0 ],
}
done, hopefully
```

---------

Co-authored-by: Szymon Kaliski <[email protected]>
  • Loading branch information
lhchavez and szymonkaliski authored May 6, 2024
1 parent 443152a commit d83a57a
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ crate-type = ["cdylib"]
# Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix
napi = { version = "2.12.2", default-features = false, features = ["napi4"] }
napi-derive = "2.12.2"
rustix = "0.38.30"
rustix = { version = "0.38.30", features = ["event"] }
rustix-openpty = "0.1.1"
libc = "0.2.152"

Expand Down
4 changes: 4 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export interface PtyOptions {
dir?: string
size?: Size
onExit: (err: null | Error, exitCode: number) => void
onData?: (err: null | Error, data: Buffer) => void
}
/** A size struct to pass to resize. */
export interface Size {
Expand Down Expand Up @@ -62,6 +63,9 @@ export interface Size {
* // TODO: Handle the error.
* });
* ```
*
* The last option, `onData` (a callback that receives the chunks of data read from the PTY), is
* optional and is only there for compatibility with bun 1.1.7.
*/
export class Pty {
/** The pid of the forked process. */
Expand Down
25 changes: 25 additions & 0 deletions index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,31 @@ describe('PTY', () => {
Bun.write(pty.fd(), message + EOT + EOT);
});

// The data callback is not supported on Darwin at all, so the test is skipped there.
(os.type() === 'Darwin' ? test.skip : test)('works with data callback', (done) => {
  const message = 'hello bun\n';
  // Accumulates every chunk handed to `onData`.
  let received = '';

  const pty = new Pty({
    command: '/bin/cat',
    onExit: () => {
      // Checked only once the child has exited, against everything received so far.
      expect(received).toBe('hello bun\r\nhello bun\r\n');
      pty.close();

      done();
    },
    onData: (err, chunk) => {
      expect(err).toBeNull();
      received = received + chunk.toString();
    },
  });

  Bun.write(pty.fd(), `${message}${EOT}${EOT}`);
});

test("doesn't break when executing non-existing binary", (done) => {
try {
new Pty({
Expand Down
153 changes: 133 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ use std::process::{Command, Stdio};
use std::thread;

use libc::{self, c_int};
use napi::bindgen_prelude::JsFunction;
use napi::bindgen_prelude::{Buffer, JsFunction};
use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode};
use napi::Status::GenericFailure;
use napi::{self, Env};
use rustix_openpty::openpty;
use rustix_openpty::rustix::termios::{self, InputModes, OptionalActions, Winsize};

#[cfg(target_os = "linux")]
use rustix::event::{poll, PollFd, PollFlags};

#[macro_use]
extern crate napi_derive;

Expand Down Expand Up @@ -61,6 +64,9 @@ extern crate napi_derive;
/// // TODO: Handle the error.
/// });
/// ```
///
/// The last option, `on_data` (a callback that receives the chunks of data read from the PTY), is
/// optional and is only there for compatibility with bun 1.1.7.
#[napi]
#[allow(dead_code)]
struct Pty {
Expand All @@ -81,6 +87,8 @@ struct PtyOptions {
pub size: Option<Size>,
#[napi(ts_type = "(err: null | Error, exitCode: number) => void")]
pub on_exit: JsFunction,
#[napi(ts_type = "(err: null | Error, data: Buffer) => void")]
pub on_data: Option<JsFunction>,
}

/// A size struct to pass to resize.
Expand Down Expand Up @@ -219,35 +227,140 @@ impl Pty {
// analysis to ensure that every single call goes through the wrapper to avoid double `wait`'s
// on a child.
// - Have a single thread loop where other entities can register children (by sending the pid
// over a channel) and this loop can use `epoll` to listen for each child's `pidfd` for when
// over a channel) and this loop can use `poll` to listen for each child's `pidfd` for when
// they are ready to be `wait`'ed. This has the inconvenience that it consumes one FD per child.
//
// For discussion check out: https://github.com/replit/ruspty/pull/1#discussion_r1463672548
let ts_on_exit: ThreadsafeFunction<i32, ErrorStrategy::CalleeHandled> = opts
.on_exit
.create_threadsafe_function(0, |ctx| ctx.env.create_int32(ctx.value).map(|v| vec![v]))?;
thread::spawn(move || match child.wait() {
Ok(status) => {
if status.success() {
ts_on_exit.call(Ok(0), ThreadsafeFunctionCallMode::Blocking);
} else {
ts_on_exit.call(
Ok(status.code().unwrap_or(-1)),
let ts_on_data = opts
.on_data
.map(|on_data| {
Ok::<
(
ThreadsafeFunction<Buffer, ErrorStrategy::CalleeHandled>,
OwnedFd,
),
napi::Error,
>((
on_data.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?,
match controller_fd.try_clone() {
Ok(fd) => Ok(fd),
Err(err) => Err(napi::Error::new(
GenericFailure,
format!(
"OS error when setting up child process wait: {}",
err.raw_os_error().unwrap_or(-1)
),
)),
}?,
))
})
.transpose()?;
thread::spawn(move || {
#[cfg(target_os = "linux")]
{
// The following code only works on Linux due to the reliance on pidfd.
use rustix::process::{pidfd_open, Pid, PidfdFlags};

if let Some((ts_on_data, controller_fd)) = ts_on_data {
if let Err(err) = || -> Result<(), napi::Error> {
let pidfd = pidfd_open(
unsafe { Pid::from_raw_unchecked(child.id() as i32) },
PidfdFlags::empty(),
)
.map_err(|err| napi::Error::new(GenericFailure, format!("pidfd_open: {:#?}", err)))?;
let mut poll_fds = [
PollFd::new(&controller_fd, PollFlags::IN),
PollFd::new(&pidfd, PollFlags::IN),
];
let mut buf = [0u8; 16 * 1024];
loop {
for poll_fd in &mut poll_fds[..] {
poll_fd.clear_revents();
}
poll(&mut poll_fds, -1).map_err(|err| {
napi::Error::new(
GenericFailure,
format!("OS error when waiting for child read: {:#?}", err),
)
})?;
// Always check the controller FD first to see if it has any events.
if poll_fds[0].revents().contains(PollFlags::IN) {
match rustix::io::read(&controller_fd, &mut buf) {
Ok(n) => {
ts_on_data.call(
Ok(buf[..n as usize].into()),
ThreadsafeFunctionCallMode::Blocking,
);
}
Err(errno) => {
if errno == rustix::io::Errno::AGAIN || errno == rustix::io::Errno::INTR {
// These two errors are safe to retry.
continue;
}
if errno == rustix::io::Errno::IO {
// This error happens when the child closes. We can simply break the loop.
return Ok(());
}
return Err(napi::Error::new(
GenericFailure,
format!("OS error when reading from child: {:#?}", errno,),
));
}
}
// If there was data, keep trying to read this FD.
continue;
}

// Now that we're sure that the controller FD doesn't have any events, we have
// successfully drained the child's output, so we can now check if the child has
// exited.
if poll_fds[1].revents().contains(PollFlags::IN) {
return Ok(());
}
}
}() {
ts_on_data.call(Err(err), ThreadsafeFunctionCallMode::Blocking);
}
}
}
#[cfg(not(target_os = "linux"))]
{
if let Some((ts_on_data, _controller_fd)) = ts_on_data {
ts_on_data.call(
Err(napi::Error::new(
GenericFailure,
"the data callback is only implemented in Linux",
)),
ThreadsafeFunctionCallMode::Blocking,
);
}
}
Err(err) => {
ts_on_exit.call(
Err(napi::Error::new(
GenericFailure,
format!(
"OS error when waiting for child process to exit: {}",
err.raw_os_error().unwrap_or(-1)
),
)),
ThreadsafeFunctionCallMode::Blocking,
);
match child.wait() {
Ok(status) => {
if status.success() {
ts_on_exit.call(Ok(0), ThreadsafeFunctionCallMode::Blocking);
} else {
ts_on_exit.call(
Ok(status.code().unwrap_or(-1)),
ThreadsafeFunctionCallMode::Blocking,
);
}
}
Err(err) => {
ts_on_exit.call(
Err(napi::Error::new(
GenericFailure,
format!(
"OS error when waiting for child process to exit: {}",
err.raw_os_error().unwrap_or(-1)
),
)),
ThreadsafeFunctionCallMode::Blocking,
);
}
}
});

Expand Down

0 comments on commit d83a57a

Please sign in to comment.