From dfc2153f11e8e7d74913e8a942d22b0bd0b2cb74 Mon Sep 17 00:00:00 2001 From: leoshimo <56844000+leoshimo@users.noreply.github.com> Date: Thu, 7 Dec 2023 15:06:31 -0700 Subject: [PATCH] fix: Move rustyline to separate thread to detect client disconnecting --- vrsctl/src/main.rs | 36 ++++++++++++++++++-------- vrsctl/src/repl.rs | 64 +++++++++++++++++++++++++++------------------- 2 files changed, 63 insertions(+), 37 deletions(-) diff --git a/vrsctl/src/main.rs b/vrsctl/src/main.rs index c68e696..5536941 100644 --- a/vrsctl/src/main.rs +++ b/vrsctl/src/main.rs @@ -24,15 +24,31 @@ async fn main() -> Result<()> { debug!("Connected to runtime: {:?}", conn); let conn = Connection::new(conn); - let mut client = Client::new(conn); - - if let Some(cmd) = args.get_one::("command") { - run_cmd(&mut client, cmd).await - } else if let Some(file) = args.get_one::("file") { - run_file(&mut client, file).await - } else { - repl::run(&mut client).await + let client = Client::new(conn); + + let run = async { + if let Some(cmd) = args.get_one::("command") { + run_cmd(&client, cmd).await + } else if let Some(file) = args.get_one::("file") { + run_file(&client, file).await + } else { + repl::run(&client).await + } + }; + + tokio::select! { + biased; + res = run => { + if let Err(e) = res { + eprintln!("Terminated with error: {e}"); + } + }, + _ = client.closed() => { + eprintln!("Connection closed"); + } } + + Ok(()) } /// The clap CLI interface @@ -43,7 +59,7 @@ fn cli() -> clap::Command { } /// Run a single request -async fn run_cmd(client: &mut Client, cmd: &str) -> Result<()> { +async fn run_cmd(client: &Client, cmd: &str) -> Result<()> { let f = lyric::parse(cmd)?; let resp = client.request(f).await?; match resp.contents { @@ -54,7 +70,7 @@ async fn run_cmd(client: &mut Client, cmd: &str) -> Result<()> { } /// Run a script file -async fn run_file(client: &mut Client, file: &str) -> Result<()> { +async fn run_file(client: &Client, file: &str) -> Result<()> { let f = File::open(file).with_context(|| format!("Failed to open {}", file))?; let mut f = BufReader::new(f); let mut line = String::new(); diff --git a/vrsctl/src/repl.rs b/vrsctl/src/repl.rs index e5cf8e5..318d5a8 100644 --- a/vrsctl/src/repl.rs +++ b/vrsctl/src/repl.rs @@ -2,43 +2,28 @@ use anyhow::Result; use lyric::Form; -use std::path::PathBuf; +use std::{path::PathBuf, thread}; +use tokio::sync::mpsc; use vrs::Client; use crate::editor::{self, Editor}; -use rustyline::error::ReadlineError; +use rustyline::{error::ReadlineError, ExternalPrinter}; /// Entrypoint for running REPL. /// Returns Err if REPL terminated with error -pub(crate) async fn run(client: &mut Client) -> Result<()> { +pub(crate) async fn run(client: &Client) -> Result<()> { let mut rl = editor::editor()?; - // let mut printer = rl.create_external_printer()?; + let mut printer = rl.create_external_printer()?; let history = history_file(); + let (line_tx, mut line_rx) = mpsc::channel(32); - load_history(&mut rl, &history); - - loop { + // Uses separate thread for rustyline - rustyline is not async + thread::spawn(move || loop { + load_history(&mut rl, &history); match rl.readline("vrs> ") { Ok(line) => { let _ = rl.add_history_entry(line.as_str()); - let f = match lyric::parse(&line) { - Ok(f) => f, - Err(e) => { - eprintln!("{}", e); - continue; - } - }; - match client.request(f).await { - Ok(resp) => match resp.contents { - // TODO: Bringup different formats for clients - e.g. REPL should use text format only - Ok(Form::RawString(s)) => println!("{}", s), - Ok(c) => println!("{}", c), - Err(e) => eprintln!("{}", e), - }, - Err(e) => { - eprintln!("{}", e); - } - } + let _ = line_tx.blocking_send(line); } Err(ReadlineError::Interrupted) | Err(ReadlineError::Eof) => break, Err(err) => { @@ -46,9 +31,34 @@ pub(crate) async fn run(client: &mut Client) -> Result<()> { break; } } - } + save_history(&mut rl, &history); + }); - save_history(&mut rl, &history); + loop { + let line = match line_rx.recv().await { + Some(l) => l, + None => break, // rustyline exited + }; + let f = match lyric::parse(&line) { + Ok(f) => f, + Err(e) => { + eprintln!("{}", e); + continue; + } + }; + match client.request(f).await { + Ok(resp) => match resp.contents { + // TODO: Bringup different formats for clients - e.g. REPL should use text format only + Ok(Form::RawString(s)) => printer.print(format!("{s}"))?, + Ok(c) => printer.print(format!("{c}"))?, + Err(e) => eprintln!("{}", e), + }, + Err(e) => { + eprintln!("{}", e); + break; + } + } + } client.shutdown().await; Ok(())