Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a ring buffer to pipe data between IO objects #819

Merged
merged 8 commits into from
Jan 23, 2024
327 changes: 266 additions & 61 deletions src/exec/use_pty/pipe.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
io::{self, Read, Write},
io::{self, IoSlice, IoSliceMut, Read, Write},
marker::PhantomData,
os::fd::AsRawFd,
};
Expand Down Expand Up @@ -100,16 +100,100 @@
}
}

/// The size of the internal buffer of the pipe.
const BUFSIZE: usize = 6 * 1024;
struct RingBuffer {
storage: [u8; Self::LEN],
// The start index of the non-empty section of the buffer.
start: usize,
// The length of the non-empty section of the buffer.
len: usize,
}

impl RingBuffer {
/// The size of the internal storage of the ring buffer.
const LEN: usize = 8 * 1024;

/// Create a new, empty buffer.
fn new() -> Self {
Self {
storage: [0; Self::LEN],
start: 0,
len: 0,
}
}

fn is_full(&self) -> bool {
self.len == self.storage.len()
}

fn insert<R: Read>(&mut self, read: &mut R) -> io::Result<usize> {
let inserted_len = if self.is_empty() {
// Case 1.1. The buffer is empty, meaning that there are two empty slices in `storage`:
// `start..` and `..start`.
let (second_slice, first_slice) = self.storage.split_at_mut(self.start);
read.read_vectored(&mut [first_slice, second_slice].map(IoSliceMut::new))?
} else {
let &mut Self { start, len, .. } = self;
let end = start + len;
if end >= self.storage.len() {
// Case 1.2. The buffer is not empty and the non-empty section wraps around
// `storage`. Meaning that there is only one empty slice in `storage`: `end..start`.
let end = end % self.storage.len();
read.read(&mut self.storage[end..start])?

Check warning on line 141 in src/exec/use_pty/pipe.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/use_pty/pipe.rs#L140-L141

Added lines #L140 - L141 were not covered by tests
} else {
// Case 1.3. The buffer is non empty and the non-empty section is a contiguous
// slice of `storage`. Meaning that there are two empty slices in `storage`:
// `..start` and `end..`.
let (mid, first_slice) = self.storage.split_at_mut(end);
let second_slice = &mut mid[..start];
read.read_vectored(&mut [first_slice, second_slice].map(IoSliceMut::new))?
}
};

self.len += inserted_len;

Ok(inserted_len)
}

fn is_empty(&self) -> bool {
self.len == 0
}

fn remove<W: Write>(&mut self, write: &mut W) -> io::Result<usize> {
let removed_len = if self.is_full() {
// Case 2.1. The buffer is full, meaning that there are two non-empty slices in
// `storage`: `start..` and `..start`.
let (second_slice, first_slice) = self.storage.split_at(self.start);
write.write_vectored(&[first_slice, second_slice].map(IoSlice::new))?
} else {
let end = self.start + self.len;
if end >= self.storage.len() {
// Case 2.2. The buffer is not full and the non-empty section wraps around
// `storage`. Meanning that there are two non-empty slices in `storage`: `start..`
// and `..end`.
let end = end % self.storage.len();
let first_slice = &self.storage[self.start..];
let second_slice = &self.storage[..end];
write.write_vectored(&[first_slice, second_slice].map(IoSlice::new))?
} else {
// Case 2.3. The buffer is not full and the non-empty section is a contiguous slice
// of `storage.` Meaning that there is only one non-empty slice in `storage`:
// `start..end`.
write.write(&self.storage[self.start..end])?
}
};

self.start += removed_len;
self.start %= self.storage.len();

self.len -= removed_len;

Ok(removed_len)
}
}

/// A buffer that stores the bytes read from `R` before they are written to `W`.
struct Buffer<R, W> {
buffer: [u8; BUFSIZE],
/// The start of the busy section of the buffer.
start: usize,
/// The end of the busy section of the buffer.
end: usize,
internal: RingBuffer,
/// The handle for the event of the reader.
read_handle: EventHandle,
/// The handle for the event of the writer.
Expand All @@ -128,27 +212,13 @@
write_handle.ignore(registry);

Self {
buffer: [0; BUFSIZE],
start: 0,
end: 0,
internal: RingBuffer::new(),

Check warning on line 215 in src/exec/use_pty/pipe.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/use_pty/pipe.rs#L215

Added line #L215 was not covered by tests
read_handle,
write_handle,
marker: PhantomData,
}
}

/// Return true if the buffer is empty.
fn is_empty(&self) -> bool {
self.start == self.end
}

/// Return true if the buffer is full.
fn is_full(&self) -> bool {
// FIXME: This doesn't really mean that the buffer is full but it cannot be used for writes
// anyway.
self.end == BUFSIZE
}

/// Read bytes into the buffer.
///
/// Calling this function will block until `read` is ready to be read.
Expand All @@ -157,23 +227,17 @@
read: &mut R,
registry: &mut EventRegistry<T>,
) -> io::Result<()> {
// Don't read if the buffer is full.
if self.is_full() {
// If the buffer is full, there is nothing to be read.
if self.internal.is_full() {

Check warning on line 231 in src/exec/use_pty/pipe.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/use_pty/pipe.rs#L230-L231

Added lines #L230 - L231 were not covered by tests
self.read_handle.ignore(registry);
return Ok(());
}

// This is the remaining free section that follows the busy section of the buffer.
let buffer = &mut self.buffer[self.end..];

// Read `len` bytes from `read` into the buffer.
let len = read.read(buffer)?;
// Read bytes and insert them into the buffer.
let inserted_len = self.internal.insert(read)?;

Check warning on line 237 in src/exec/use_pty/pipe.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/use_pty/pipe.rs#L237

Added line #L237 was not covered by tests

// Mark the `len` bytes after the busy section as busy too.
self.end += len;

// If we read something, the buffer is not empty anymore and we can resume writing.
if len > 0 {
// If we inserted something, the buffer is not empty anymore and we can resume writing.
if inserted_len > 0 {

Check warning on line 240 in src/exec/use_pty/pipe.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/use_pty/pipe.rs#L240

Added line #L240 was not covered by tests
self.write_handle.resume(registry);
}

Expand All @@ -188,29 +252,17 @@
write: &mut W,
registry: &mut EventRegistry<T>,
) -> io::Result<()> {
// Don't write if the buffer is empty.
if self.is_empty() {
// If the buffer is empty, there is nothing to be written.
if self.internal.is_empty() {

Check warning on line 256 in src/exec/use_pty/pipe.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/use_pty/pipe.rs#L255-L256

Added lines #L255 - L256 were not covered by tests
self.write_handle.ignore(registry);
return Ok(());
}

// This is the busy section of the buffer.
let buffer = &self.buffer[self.start..self.end];

// Write the first `len` bytes of the busy section to `write`.
let len = write.write(buffer)?;

if len == buffer.len() {
// If we were able to write all the busy section, we can mark the whole buffer as free.
self.start = 0;
self.end = 0;
} else {
// Otherwise we just free the first `len` bytes of the busy section.
self.start += len;
}
// Remove bytes from the buffer and write them.
let removed_len = self.internal.remove(write)?;

Check warning on line 262 in src/exec/use_pty/pipe.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/use_pty/pipe.rs#L262

Added line #L262 was not covered by tests

// If we wrote something, the buffer is not full anymore and we can resume reading.
if len > 0 {
// If we removed something, the buffer is not full anymore and we can resume reading.
if removed_len > 0 {

Check warning on line 265 in src/exec/use_pty/pipe.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/use_pty/pipe.rs#L265

Added line #L265 was not covered by tests
self.read_handle.resume(registry);
}

Expand All @@ -219,16 +271,169 @@

/// Flush this buffer, ensuring that all the contents of its internal buffer are written.
fn flush(&mut self, write: &mut W) -> io::Result<()> {
// This is the busy section of the buffer.
let buffer = &self.buffer[self.start..self.end];
// Remove bytes from the buffer and write them.
self.internal.remove(write)?;

Check warning on line 275 in src/exec/use_pty/pipe.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/use_pty/pipe.rs#L274-L275

Added lines #L274 - L275 were not covered by tests

// Write the complete busy section to `write`.
write.write_all(buffer)?;
write.flush()
}

Check warning on line 278 in src/exec/use_pty/pipe.rs

View check run for this annotation

Codecov / codecov/patch

src/exec/use_pty/pipe.rs#L277-L278

Added lines #L277 - L278 were not covered by tests
}

// If we were able to write all the busy section, we can mark the whole buffer as free.
self.start = 0;
self.end = 0;
#[cfg(test)]
mod tests {
use super::RingBuffer;

write.flush()
#[test]
fn empty_buffer_is_empty() {
let buf = RingBuffer::new();

assert!(buf.is_empty());
}

#[test]
fn full_buffer_is_full() {
let mut buf = RingBuffer::new();

let inserted_len = buf.insert(&mut [0x45; RingBuffer::LEN].as_slice()).unwrap();
assert_eq!(inserted_len, RingBuffer::LEN);

assert!(buf.is_full());
}

#[test]
fn buffer_is_fifo() {
let mut buf = RingBuffer::new();

let expected = (0..=u8::MAX).collect::<Vec<u8>>();
let inserted_len = buf.insert(&mut expected.as_slice()).unwrap();
assert_eq!(inserted_len, expected.len());

let mut found = vec![];
let removed_len = buf.remove(&mut found).unwrap();
assert_eq!(removed_len, expected.len());

assert_eq!(expected, found);
}

#[test]
fn insert_into_empty_buffer_with_offset() {
const HALF_LEN: usize = RingBuffer::LEN / 2;
let mut buf = RingBuffer::new();

// This should leave the buffer empty but with the start field pointing to the middle of
// the buffer.
// ┌───────────────────┐
// │ │
// └───────────────────┘
// ▲
// │
// start
buf.insert(&mut [0u8; HALF_LEN].as_slice()).unwrap();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this particular case you could set the initial condition by directly setting the value of start to HALF_LEN. that way this unit test does not rely on insert and remove correctly working in cases 1.1 and 2.1.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I thought about it but I think I'd rather leave those tests as implementation agnostic as possible.

buf.remove(&mut vec![]).unwrap();

// Then we fill the first half of the buffer with ones and the second one with twos in a
// single insertion. This tests case 1.1.
// ┌─────────┬─────────┐
// │ 2 │ 1 │
// └─────────┴─────────┘
// ▲
// │
// start
let mut expected = vec![1; HALF_LEN];
expected.extend_from_slice(&[2; HALF_LEN]);
buf.insert(&mut expected.as_slice()).unwrap();

// When we remove all the elements of the buffer we should find them in the same order we
// inserted them. This tests case 2.1.
let mut found = vec![];
let removed_len = buf.remove(&mut found).unwrap();
assert_eq!(removed_len, expected.len());

assert_eq!(expected, found);
}

#[test]
fn insert_into_non_empty_wrapping_buffer() {
const QUARTER_LEN: usize = RingBuffer::LEN / 4;
let mut buf = RingBuffer::new();

// This should leave the buffer empty but with the start field pointing to the middle of
// the buffer.
// ┌───────────────────────┐
// │ │
// └───────────────────────┘
// ▲
// │
// start
buf.insert(&mut [0; 2 * QUARTER_LEN].as_slice()).unwrap();
buf.remove(&mut vec![]).unwrap();

// Then we fill one quarter of the buffer with ones. This gives us a non-empty buffer whose
// empty section is not contiguous.
// ┌───────────┬─────┬─────┐
// │ │ 1 │ │
// └───────────┴─────┴─────┘
// ▲
// │
// start
let mut expected = vec![1; QUARTER_LEN];
buf.insert(&mut expected.as_slice()).unwrap();
// Then we fill one quarter of the buffer with twos and another quarter of the buffer with
// threes in a single insertion. This tests case 1.2.
// ┌─────┬─────┬─────┬─────┐
// │ 3 │ │ 1 │ 2 │
// └─────┴─────┴─────┴─────┘
// ▲
// │
// start
let mut second_half = vec![2; QUARTER_LEN];
second_half.extend_from_slice(&[3; QUARTER_LEN]);
buf.insert(&mut second_half.as_slice()).unwrap();

expected.extend(second_half);

// When we remove all the elements of the buffer we should find them in the same order we
// inserted them. This tests case 2.2.
let mut found = vec![];
let removed_len = buf.remove(&mut found).unwrap();
assert_eq!(removed_len, expected.len());

assert_eq!(expected, found);
}

#[test]
fn insert_into_non_empty_non_wrapping_buffer() {
const QUARTER_LEN: usize = RingBuffer::LEN / 4;
let mut buf = RingBuffer::new();

// We fill one quarter of the buffer with ones. This gives us a non-empty buffer whose
// empty section is contiguous.
// ┌─────┬────────────────┐
// │ 1 │ │
// └─────┴────────────────┘
// ▲
// │
// └ start
let mut expected = vec![1; QUARTER_LEN];
buf.insert(&mut expected.as_slice()).unwrap();

// Then we fill one quarter of the buffer with twos. This tests case 1.3.
// ┌─────┬─────┬──────────┐
// │ 1 │ 2 │ │
// └─────┴─────┴──────────┘
// ▲
// │
// └ start
let second_half = vec![2; QUARTER_LEN];
buf.insert(&mut second_half.as_slice()).unwrap();

expected.extend(second_half);

// When we remove all the elements of the buffer we should find them in the same order we
// inserted them. This tests case 2.3.
let mut found = vec![];
let removed_len = buf.remove(&mut found).unwrap();
assert_eq!(removed_len, expected.len());

assert_eq!(expected, found);
}
}
Loading