Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

External printer API rework #737

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 0 additions & 57 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ chrono = { version = "0.4.19", default-features = false, features = [
"clock",
"serde",
] }
crossbeam = { version = "0.8.2", optional = true }
crossterm = { version = "0.27.0", features = ["serde"] }
fd-lock = "3.0.3"
itertools = "0.12.0"
Expand All @@ -41,7 +40,7 @@ tempfile = "3.3.0"

[features]
bashisms = []
external_printer = ["crossbeam"]
external_printer = []
sqlite = ["rusqlite/bundled", "serde_json"]
sqlite-dynlib = ["rusqlite", "serde_json"]
system_clipboard = ["arboard"]
Expand Down
24 changes: 14 additions & 10 deletions examples/external_printer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,45 @@
// cargo run --example external_printer --features=external_printer

use {
reedline::ExternalPrinter,
reedline::{DefaultPrompt, Reedline, Signal},
reedline::{DefaultPrompt, ExternalPrinter, Reedline, Signal},
std::thread,
std::thread::sleep,
std::time::Duration,
};

fn main() {
let printer = ExternalPrinter::default();
// make a clone to use it in a different thread
let p_clone = printer.clone();
// get the Sender<String> to have full sending control
let p_sender = printer.sender();
let printer = ExternalPrinter::new();

// Get a clone of the sender to use in a separate thread.
let sender = printer.sender();

// Note that the senders can also be cloned.
// let sender_clone = sender.clone();

// external printer that prints a message every second
thread::spawn(move || {
let mut i = 1;
loop {
sleep(Duration::from_secs(1));
assert!(p_clone
.print(format!("Message {i} delivered.\nWith two lines!"))
assert!(sender
.send(format!("Message {i} delivered.\nWith two lines!").into())
.is_ok());
i += 1;
}
});

let sender = printer.sender();

// external printer that prints a bunch of messages after 3 seconds
thread::spawn(move || {
sleep(Duration::from_secs(3));
for _ in 0..10 {
sleep(Duration::from_millis(1));
assert!(p_sender.send("Fast Hello !".to_string()).is_ok());
assert!(sender.send("Hello!".into()).is_ok());
}
});

// create a `Reedline` struct and assign the external printer
let mut line_editor = Reedline::create().with_external_printer(printer);
let prompt = DefaultPrompt::default();

Expand Down
76 changes: 26 additions & 50 deletions src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,14 @@ use std::path::PathBuf;
use itertools::Itertools;
use nu_ansi_term::{Color, Style};

#[cfg(feature = "external_printer")]
use crate::ExternalPrinter;
use crate::{enums::ReedlineRawEvent, CursorConfig};
#[cfg(feature = "bashisms")]
use crate::{
history::SearchFilter,
menu_functions::{parse_selection_char, ParseAction},
};
#[cfg(feature = "external_printer")]
use {
crate::external_printer::ExternalPrinter,
crossbeam::channel::TryRecvError,
std::io::{Error, ErrorKind},
};
use {
crate::{
completion::{Completer, DefaultCompleter},
Expand Down Expand Up @@ -154,7 +150,7 @@ pub struct Reedline {
kitty_protocol: KittyProtocolGuard,

#[cfg(feature = "external_printer")]
external_printer: Option<ExternalPrinter<String>>,
external_printer: Option<ExternalPrinter>,
}

struct BufferEditor {
Expand Down Expand Up @@ -679,22 +675,24 @@ impl Reedline {
let mut paste_enter_state = false;

#[cfg(feature = "external_printer")]
if let Some(ref external_printer) = self.external_printer {
// get messages from printer as crlf separated "lines"
let messages = Self::external_messages(external_printer)?;
if !messages.is_empty() {
// print the message(s)
self.painter.print_external_message(
messages,
self.editor.line_buffer(),
prompt,
)?;
if let Some(printer) = &self.external_printer {
let any_messages = self.painter.print_external_messages(
printer.receiver(),
self.editor.line_buffer(),
prompt,
)?;

if any_messages {
self.repaint(prompt)?;
}
}

let mut latest_resize = None;
loop {

// There could be multiple events queued up!
// pasting text, resizes, blocking this thread (e.g. during debugging)
// We should be able to handle all of them as quickly as possible without causing unnecessary output steps.
while event::poll(Duration::from_millis(POLL_WAIT))? {
Comment on lines +691 to +695
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There was some subtlety around when we crossterm::event::poll and crossterm::event::read to avoid idling in a hotter than necessary loop #651

So I am not sure if this do ... while to while transformation is correct.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, the current loop structure will block on the first read and later external messages will not get printed until the user types which is confusing. I wonder if there's a better solution though...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Damn you asynchronous concurrency....

Not blocking by polling certainly makes reading that channel possible.
Could we shove both kinds of events into one channel and block on that? (that would be the no async just threads everywhere path)

With how we dealt with crossterm and stdout so far I am certainly not screaming: cargo add tokio

Copy link
Member Author

@IanManske IanManske Feb 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the channel + threads should work, but I think that means there will have to be 3 threads in total:

  • One to forward messages from event::read
  • One to forward messages from the external printer
  • The main thread, to pull messages off the channel

It is possible to remove the external printer thread if we wrap the external printer sender in a new type. This way, we can allow the external thread to send messages directly to the merged channel (Reciever<ReedlineMessage>), instead of having to move data from the external channel Reciever<Vec<u8>> to the merged channel. E.g.,:

enum ReedlineMessage {
    Event(crossterm::Event),
    External(Vec<u8>),
}

struct ExternalPrinterSender(SyncSender<ReedlineMessage>);

impl ExternalPrinterSender {
    pub fn send(&self, data: Vec<u8>) -> SendError<Vec<u8> {
        self.0.send(ReedlineMessage::External(data)).map_err(...)
    }
}

The potential problem with this is that nu-system, or wherever job controls ends up, would need to take reedline as a dependency.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Crossterm mentions a restriction on the threads their blocking functions can be run from and that it is incompatible with their async oriented EventStream

https://docs.rs/crossterm/latest/crossterm/event/index.html

Maybe we need to bite the Future bullet at some point? (providing an executor would suck for library users, but we may want to unlock things like async prompt or completion updates.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Crossterm mentions a restriction on the threads their blocking functions can be run from

This should be fine, one thread can both poll and read.

Maybe we need to bite the Future bullet at some point?

In the future (ha), it might be necessary. So far, it looks like we can probably work around it?

Copy link
Member Author

@IanManske IanManske Feb 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two other possible solutions:

  1. Use a higher poll wait time like it was suggested in external_printer does not print until I press some key #757. We could use the higher wait time for the first poll only and then switch back to the current wait time after the first read.

  2. Have only two threads:

    1. the main thread: has the old poll + read loop and prints the prompt, etc.
    2. the external printer thread: takes messages off the channel receiver and prints these messages.

    Then, we synchronize the two threads using a Mutex or something similar to ensure that only one of them prints to the terminal at a time. Compared to the three thread approach before, this has the benefit that it should be easier to isolate external_printer handling, so users that do not use the external_printer feature should not have to pay any extra performance cost.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how 1) would play out both in terms of external latency and syscall churn. (would need to see it)

  1. sounds more involved. Question would be if the crossterm cursor or terminal size polling operations may collide with streaming output from another thread. But yeah with the appropriate locking this may be workable.

match event::read()? {
Event::Resize(x, y) => {
latest_resize = Some((x, y));
Expand Down Expand Up @@ -723,13 +721,6 @@ impl Reedline {
}
}
}

// There could be multiple events queued up!
// pasting text, resizes, blocking this thread (e.g. during debugging)
// We should be able to handle all of them as quickly as possible without causing unnecessary output steps.
if !event::poll(Duration::from_millis(POLL_WAIT))? {
break;
}
}

if let Some((x, y)) = latest_resize {
Expand Down Expand Up @@ -1730,38 +1721,23 @@ impl Reedline {
)
}

/// Adds an external printer
/// Returns a reference to the external printer, or none if one was not set
///
/// ## Required feature:
/// `external_printer`
#[cfg(feature = "external_printer")]
pub fn with_external_printer(mut self, printer: ExternalPrinter<String>) -> Self {
self.external_printer = Some(printer);
self
pub fn external_printer(&self) -> Option<&ExternalPrinter> {
self.external_printer.as_ref()
}

/// Add a new external printer, or overwrite the existing one
///
/// ## Required feature:
/// `external_printer`
#[cfg(feature = "external_printer")]
fn external_messages(external_printer: &ExternalPrinter<String>) -> Result<Vec<String>> {
let mut messages = Vec::new();
loop {
let result = external_printer.receiver().try_recv();
match result {
Ok(line) => {
let lines = line.lines().map(String::from).collect::<Vec<_>>();
messages.extend(lines);
}
Err(TryRecvError::Empty) => {
break;
}
Err(TryRecvError::Disconnected) => {
return Err(Error::new(
ErrorKind::NotConnected,
TryRecvError::Disconnected,
));
}
}
}
Ok(messages)
pub fn with_external_printer(mut self, printer: ExternalPrinter) -> Self {
self.external_printer = Some(printer);
self
}

fn submit_buffer(&mut self, prompt: &dyn Prompt) -> io::Result<EventStatus> {
Expand Down
104 changes: 51 additions & 53 deletions src/external_printer.rs
Original file line number Diff line number Diff line change
@@ -1,72 +1,70 @@
//! To print messages while editing a line
//!
//! See example:
//!
//! ``` shell
//! cargo run --example external_printer --features=external_printer
//! ```
#[cfg(feature = "external_printer")]
use {
crossbeam::channel::{bounded, Receiver, SendError, Sender},
std::fmt::Display,
};
use std::sync::mpsc::{self, Receiver, SyncSender};

#[cfg(feature = "external_printer")]
pub const EXTERNAL_PRINTER_DEFAULT_CAPACITY: usize = 20;

/// An ExternalPrinter allows to print messages of text while editing a line.
/// The message is printed as a new line, the line-edit will continue below the
/// output.
/// An external printer allows one to print messages of text while editing a line.
/// The message is printed as a new line, and the line-edit will continue below the output.
///
/// Use [`sender`](Self::sender) to receive a [`SyncSender`] for use in other threads.
///
/// ## Required feature:
/// `external_printer`
#[cfg(feature = "external_printer")]
#[derive(Debug, Clone)]
pub struct ExternalPrinter<T>
where
T: Display,
{
sender: Sender<T>,
receiver: Receiver<T>,
#[derive(Debug)]
pub struct ExternalPrinter {
sender: SyncSender<Vec<u8>>,
receiver: Receiver<Vec<u8>>,
}
sholderbach marked this conversation as resolved.
Show resolved Hide resolved

#[cfg(feature = "external_printer")]
impl<T> ExternalPrinter<T>
where
T: Display,
{
/// Creates an ExternalPrinter to store lines with a max_cap
pub fn new(max_cap: usize) -> Self {
let (sender, receiver) = bounded::<T>(max_cap);
impl ExternalPrinter {
/// The default maximum number of lines that can be queued up for printing
pub const DEFAULT_CAPACITY: usize = 20;

/// Create a new `ExternalPrinter` with the [default capacity](Self::DEFAULT_CAPACITY)
pub fn new() -> Self {
Self::default()
}

/// Create a new `ExternalPrinter` with the given capacity
///
/// The capacity determines the maximum number of lines that can be queued up for printing
/// before subsequent [`send`](SyncSender::send) calls on the [`sender`](Self::sender) will block.
pub fn with_capacity(capacity: usize) -> Self {
let (sender, receiver) = mpsc::sync_channel(capacity);
Self { sender, receiver }
}
/// Gets a Sender to use the printer externally by sending lines to it
pub fn sender(&self) -> Sender<T> {

/// Returns a new [`SyncSender`] which can be used in other threads to queue messages to print
pub fn sender(&self) -> SyncSender<Vec<u8>> {
self.sender.clone()
}
/// Receiver to get messages if any
pub fn receiver(&self) -> &Receiver<T> {

pub(crate) fn receiver(&self) -> &Receiver<Vec<u8>> {
&self.receiver
sholderbach marked this conversation as resolved.
Show resolved Hide resolved
}
}

/// Convenience method if the whole Printer is cloned, blocks if max_cap is reached.
///
pub fn print(&self, line: T) -> Result<(), SendError<T>> {
self.sender.send(line)
impl Default for ExternalPrinter {
fn default() -> Self {
Self::with_capacity(Self::DEFAULT_CAPACITY)
}
}

/// Convenience method to get a line if any, doesn´t block.
pub fn get_line(&self) -> Option<T> {
self.receiver.try_recv().ok()
#[cfg(test)]
mod tests {
use super::*;

#[test]
fn impls_send() {
fn impls_send<T: Send>(_: &T) {}

let printer = ExternalPrinter::new();
impls_send(&printer);
impls_send(&printer.sender())
}
}

#[cfg(feature = "external_printer")]
impl<T> Default for ExternalPrinter<T>
where
T: Display,
{
fn default() -> Self {
Self::new(EXTERNAL_PRINTER_DEFAULT_CAPACITY)
#[test]
fn receives_message() {
let printer = ExternalPrinter::new();
let sender = printer.sender();
assert!(sender.send(b"some text".into()).is_ok());
assert_eq!(printer.receiver().recv(), Ok(b"some text".into()))
}
}
Loading
Loading