Skip to content

Commit

Permalink
fix: relay Signals reset to plugins (nushell#13510)
Browse files Browse the repository at this point in the history
This PR will close nushell#13501

# Description

This PR expands on [the relay of signals to running plugin
processes](nushell#13181). The Ctrlc
relay has been generalized to SignalAction::Interrupt and when
reset_signal is called on the main EngineState, a SignalAction::Reset is
now relayed to running plugins.

# User-Facing Changes

The signal handler closure now takes a `signals::SignalAction`, while
previously it took no arguments. The handler will now be called on both
interrupt and reset. The method to register a handler on the plugin side
is now called `register_signal_handler` instead of
`register_ctrlc_handler`
[example](https://github.com/nushell/nushell/pull/13510/files#diff-3e04dff88fd0780a49778a3d1eede092ec729a1264b4ef07ca0d2baa859dad05L38).
This will only affect plugin authors who have started making use of
nushell#13181, which isn't currently
part of an official release.

The change will also require all of user's plugins to be recompiled in
order that they don't error when a signal is received on the
PluginInterface.

# Testing

```
: example ctrlc
interrupt status: false
waiting for interrupt signal...
^Cinterrupt status: true
peace.
Error:   × Operation interrupted
   ╭─[display_output hook:1:1]
 1 │ if (term size).columns >= 100 { table -e } else { table }
   · ─┬
   ·  ╰── This operation was interrupted
   ╰────

: example ctrlc
interrupt status: false   <-- NOTE status is false
waiting for interrupt signal...
^Cinterrupt status: true
peace.
Error:   × Operation interrupted
   ╭─[display_output hook:1:1]
 1 │ if (term size).columns >= 100 { table -e } else { table }
   · ─┬
   ·  ╰── This operation was interrupted
   ╰────
   ```
  • Loading branch information
cablehead authored Aug 6, 2024
1 parent 73e8de9 commit 1cd0544
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 79 deletions.
8 changes: 4 additions & 4 deletions crates/nu-plugin-engine/src/interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use nu_plugin_protocol::{
};
use nu_protocol::{
ast::Operator, engine::Sequence, CustomValue, IntoSpanned, PipelineData, PluginMetadata,
PluginSignature, ShellError, Signals, Span, Spanned, Value,
PluginSignature, ShellError, SignalAction, Signals, Span, Spanned, Value,
};
use nu_utils::SharedCow;
use std::{
Expand Down Expand Up @@ -664,9 +664,9 @@ impl PluginInterface {
self.flush()
}

/// Send the plugin a ctrl-c signal.
pub fn ctrlc(&self) -> Result<(), ShellError> {
self.write(PluginInput::Ctrlc)?;
/// Send the plugin a signal.
pub fn signal(&self, action: SignalAction) -> Result<(), ShellError> {
self.write(PluginInput::Signal(action))?;
self.flush()
}

Expand Down
24 changes: 11 additions & 13 deletions crates/nu-plugin-engine/src/persistent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::{
use super::{PluginInterface, PluginSource};
use nu_plugin_core::CommunicationMode;
use nu_protocol::{
engine::{ctrlc, EngineState, Stack},
PluginGcConfig, PluginIdentity, PluginMetadata, RegisteredPlugin, ShellError,
engine::{EngineState, Stack},
HandlerGuard, Handlers, PluginGcConfig, PluginIdentity, PluginMetadata, RegisteredPlugin,
ShellError,
};
use std::{
collections::HashMap,
Expand Down Expand Up @@ -37,8 +38,8 @@ struct MutableState {
preferred_mode: Option<PreferredCommunicationMode>,
/// Garbage collector config
gc_config: PluginGcConfig,
/// RAII guard for this plugin's ctrl-c handler
ctrlc_guard: Option<ctrlc::Guard>,
/// RAII guard for this plugin's signal handler
signal_guard: Option<HandlerGuard>,
}

#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -66,7 +67,7 @@ impl PersistentPlugin {
metadata: None,
preferred_mode: None,
gc_config,
ctrlc_guard: None,
signal_guard: None,
}),
}
}
Expand Down Expand Up @@ -303,29 +304,26 @@ impl RegisteredPlugin for PersistentPlugin {
self
}

fn configure_ctrlc_handler(
self: Arc<Self>,
handlers: &ctrlc::Handlers,
) -> Result<(), ShellError> {
fn configure_signal_handler(self: Arc<Self>, handlers: &Handlers) -> Result<(), ShellError> {
let guard = {
// We take a weakref to the plugin so that we don't create a cycle to the
// RAII guard that will be stored on the plugin.
let plugin = Arc::downgrade(&self);
handlers.register(Box::new(move || {
// write a Ctrl-C packet through the PluginInterface if the plugin is alive and
handlers.register(Box::new(move |action| {
// write a signal packet through the PluginInterface if the plugin is alive and
// running
if let Some(plugin) = plugin.upgrade() {
if let Ok(mutable) = plugin.mutable.lock() {
if let Some(ref running) = mutable.running {
let _ = running.interface.ctrlc();
let _ = running.interface.signal(action);
}
}
}
}))?
};

if let Ok(mut mutable) = self.mutable.lock() {
mutable.ctrlc_guard = Some(guard);
mutable.signal_guard = Some(guard);
}

Ok(())
Expand Down
7 changes: 4 additions & 3 deletions crates/nu-plugin-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ pub mod test_util;

use nu_protocol::{
ast::Operator, engine::Closure, ByteStreamType, Config, DeclId, LabeledError, PipelineData,
PipelineMetadata, PluginMetadata, PluginSignature, ShellError, Span, Spanned, Value,
PipelineMetadata, PluginMetadata, PluginSignature, ShellError, SignalAction, Span, Spanned,
Value,
};
use nu_utils::SharedCow;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -245,8 +246,8 @@ pub enum PluginInput {
Drop(StreamId),
/// See [`StreamMessage::Ack`].
Ack(StreamId),
/// Signal a ctrlc event
Ctrlc,
/// Relay signals to the plugin
Signal(SignalAction),
}

impl TryFrom<PluginInput> for StreamMessage {
Expand Down
35 changes: 18 additions & 17 deletions crates/nu-plugin/src/plugin/interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use nu_plugin_protocol::{
PluginOutput, ProtocolInfo,
};
use nu_protocol::{
engine::{ctrlc, Closure, Sequence},
Config, DeclId, LabeledError, PipelineData, PluginMetadata, PluginSignature, ShellError,
Signals, Span, Spanned, Value,
engine::{Closure, Sequence},
Config, DeclId, Handler, HandlerGuard, Handlers, LabeledError, PipelineData, PluginMetadata,
PluginSignature, ShellError, SignalAction, Signals, Span, Spanned, Value,
};
use nu_utils::SharedCow;
use std::{
Expand Down Expand Up @@ -64,10 +64,11 @@ struct EngineInterfaceState {
mpsc::Sender<(EngineCallId, mpsc::Sender<EngineCallResponse<PipelineData>>)>,
/// The synchronized output writer
writer: Box<dyn PluginWrite<PluginOutput>>,
// Mirror signals from `EngineState`
/// Mirror signals from `EngineState`. You can make use of this with
/// `engine_interface.signals()` when constructing a Stream that requires signals
signals: Signals,
/// Registered Ctrl-C handlers
ctrlc_handlers: ctrlc::Handlers,
/// Registered signal handlers
signal_handlers: Handlers,
}

impl std::fmt::Debug for EngineInterfaceState {
Expand Down Expand Up @@ -122,7 +123,7 @@ impl EngineInterfaceManager {
engine_call_subscription_sender: subscription_tx,
writer: Box::new(writer),
signals: Signals::new(Arc::new(AtomicBool::new(false))),
ctrlc_handlers: ctrlc::Handlers::new(),
signal_handlers: Handlers::new(),
}),
protocol_info_mut,
plugin_call_sender: Some(plug_tx),
Expand Down Expand Up @@ -337,9 +338,12 @@ impl InterfaceManager for EngineInterfaceManager {
});
self.send_engine_call_response(id, response)
}
PluginInput::Ctrlc => {
self.state.signals.trigger();
self.state.ctrlc_handlers.run();
PluginInput::Signal(action) => {
match action {
SignalAction::Interrupt => self.state.signals.trigger(),
SignalAction::Reset => self.state.signals.reset(),
}
self.state.signal_handlers.run(action);
Ok(())
}
}
Expand Down Expand Up @@ -521,13 +525,10 @@ impl EngineInterface {
self.state.writer.is_stdout()
}

/// Register a closure which will be called when the engine receives a Ctrl-C signal. Returns a
/// RAII guard that will keep the closure alive until it is dropped.
pub fn register_ctrlc_handler(
&self,
handler: ctrlc::Handler,
) -> Result<ctrlc::Guard, ShellError> {
self.state.ctrlc_handlers.register(handler)
/// Register a closure which will be called when the engine receives an interrupt signal.
/// Returns a RAII guard that will keep the closure alive until it is dropped.
pub fn register_signal_handler(&self, handler: Handler) -> Result<HandlerGuard, ShellError> {
self.state.signal_handlers.register(handler)
}

/// Get the full shell configuration from the engine. As this is quite a large object, it is
Expand Down
21 changes: 12 additions & 9 deletions crates/nu-protocol/src/engine/engine_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use crate::{
ast::Block,
debugger::{Debugger, NoopDebugger},
engine::{
ctrlc,
usage::{build_usage, Usage},
CachedFile, Command, CommandType, EnvVars, OverlayFrame, ScopeFrame, Stack, StateDelta,
Variable, Visibility, DEFAULT_OVERLAY_NAME,
},
eval_const::create_nu_constant,
BlockId, Category, Config, DeclId, FileId, GetSpan, HistoryConfig, Module, ModuleId, OverlayId,
ShellError, Signals, Signature, Span, SpanId, Type, Value, VarId, VirtualPathId,
BlockId, Category, Config, DeclId, FileId, GetSpan, Handlers, HistoryConfig, Module, ModuleId,
OverlayId, ShellError, SignalAction, Signals, Signature, Span, SpanId, Type, Value, VarId,
VirtualPathId,
};
use fancy_regex::Regex;
use lru::LruCache;
Expand Down Expand Up @@ -86,8 +86,8 @@ pub struct EngineState {
pub spans: Vec<Span>,
usage: Usage,
pub scope: ScopeFrame,
pub ctrlc_handlers: Option<ctrlc::Handlers>,
signals: Signals,
pub signal_handlers: Option<Handlers>,
pub env_vars: Arc<EnvVars>,
pub previous_env_vars: Arc<HashMap<String, Value>>,
pub config: Arc<Config>,
Expand Down Expand Up @@ -147,7 +147,7 @@ impl EngineState {
0,
false,
),
ctrlc_handlers: None,
signal_handlers: None,
signals: Signals::empty(),
env_vars: Arc::new(
[(DEFAULT_OVERLAY_NAME.to_string(), HashMap::new())]
Expand Down Expand Up @@ -186,7 +186,10 @@ impl EngineState {
}

pub fn reset_signals(&mut self) {
self.signals.reset()
self.signals.reset();
if let Some(ref handlers) = self.signal_handlers {
handlers.run(SignalAction::Reset);
}
}

pub fn set_signals(&mut self, signals: Signals) {
Expand Down Expand Up @@ -272,9 +275,9 @@ impl EngineState {
#[cfg(feature = "plugin")]
if !delta.plugins.is_empty() {
for plugin in std::mem::take(&mut delta.plugins) {
// Connect plugins to the ctrlc handlers
if let Some(handlers) = &self.ctrlc_handlers {
plugin.clone().configure_ctrlc_handler(handlers)?;
// Connect plugins to the signal handlers
if let Some(handlers) = &self.signal_handlers {
plugin.clone().configure_signal_handler(handlers)?;
}

// Replace plugins that overlap in identity.
Expand Down
2 changes: 0 additions & 2 deletions crates/nu-protocol/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,3 @@ pub use stack_out_dest::*;
pub use state_delta::*;
pub use state_working_set::*;
pub use variable::*;

pub mod ctrlc;
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::fmt::Debug;
use std::sync::{Arc, Mutex};

use crate::{engine::Sequence, ShellError};
use crate::{engine::Sequence, ShellError, SignalAction};

/// Handler is a closure that can be sent across threads and shared.
pub type Handler = Box<dyn Fn() + Send + Sync>;
pub type Handler = Box<dyn Fn(SignalAction) + Send + Sync>;

/// Manages a collection of handlers.
#[derive(Clone)]
Expand All @@ -23,16 +23,16 @@ impl Debug for Handlers {
}
}

/// Guard that unregisters a handler when dropped.
/// HandlerGuard that unregisters a handler when dropped.
#[derive(Clone)]
pub struct Guard {
pub struct HandlerGuard {
/// Unique ID of the handler.
id: usize,
/// Reference to the handlers list.
handlers: Arc<Mutex<Vec<(usize, Handler)>>>,
}

impl Drop for Guard {
impl Drop for HandlerGuard {
/// Drops the `Guard`, removing the associated handler from the list.
fn drop(&mut self) {
if let Ok(mut handlers) = self.handlers.lock() {
Expand All @@ -41,7 +41,7 @@ impl Drop for Guard {
}
}

impl Debug for Guard {
impl Debug for HandlerGuard {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Guard").field("id", &self.id).finish()
}
Expand All @@ -56,23 +56,23 @@ impl Handlers {

/// Registers a new handler and returns an RAII guard which will unregister the handler when
/// dropped.
pub fn register(&self, handler: Handler) -> Result<Guard, ShellError> {
pub fn register(&self, handler: Handler) -> Result<HandlerGuard, ShellError> {
let id = self.next_id.next()?;
if let Ok(mut handlers) = self.handlers.lock() {
handlers.push((id, handler));
}

Ok(Guard {
Ok(HandlerGuard {
id,
handlers: Arc::clone(&self.handlers),
})
}

/// Runs all registered handlers.
pub fn run(&self) {
pub fn run(&self, action: SignalAction) {
if let Ok(handlers) = self.handlers.lock() {
for (_, handler) in handlers.iter() {
handler();
handler(action);
}
}
}
Expand All @@ -99,14 +99,14 @@ mod tests {
let called1_clone = Arc::clone(&called1);
let called2_clone = Arc::clone(&called2);

let _guard1 = handlers.register(Box::new(move || {
let _guard1 = handlers.register(Box::new(move |_| {
called1_clone.store(true, Ordering::SeqCst);
}));
let _guard2 = handlers.register(Box::new(move || {
let _guard2 = handlers.register(Box::new(move |_| {
called2_clone.store(true, Ordering::SeqCst);
}));

handlers.run();
handlers.run(SignalAction::Interrupt);

assert!(called1.load(Ordering::SeqCst));
assert!(called2.load(Ordering::SeqCst));
Expand All @@ -119,7 +119,7 @@ mod tests {
let called = Arc::new(AtomicBool::new(false));
let called_clone = Arc::clone(&called);

let guard = handlers.register(Box::new(move || {
let guard = handlers.register(Box::new(move |_| {
called_clone.store(true, Ordering::Relaxed);
}));

Expand All @@ -131,7 +131,7 @@ mod tests {
// Ensure the handler is removed after dropping the guard
assert_eq!(handlers.handlers.lock().unwrap().len(), 0);

handlers.run();
handlers.run(SignalAction::Interrupt);

// Ensure the handler is not called after being dropped
assert!(!called.load(Ordering::Relaxed));
Expand Down
2 changes: 2 additions & 0 deletions crates/nu-protocol/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
pub mod byte_stream;
mod handlers;
pub mod list_stream;
mod metadata;
mod out_dest;
mod pipeline_data;
mod signals;

pub use byte_stream::*;
pub use handlers::*;
pub use list_stream::*;
pub use metadata::*;
pub use out_dest::*;
Expand Down
12 changes: 11 additions & 1 deletion crates/nu-protocol/src/pipeline/signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::sync::{
Arc,
};

use serde::{Deserialize, Serialize};

/// Used to check for signals to suspend or terminate the execution of Nushell code.
///
/// For now, this struct only supports interruption (ctrl+c or SIGINT).
Expand Down Expand Up @@ -75,9 +77,17 @@ impl Signals {
self.signals.is_none()
}

pub(crate) fn reset(&self) {
pub fn reset(&self) {
if let Some(signals) = &self.signals {
signals.store(false, Ordering::Relaxed);
}
}
}

/// The types of things that can be signaled. It's anticipated this will change as we learn more
/// about how we'd like signals to be handled.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum SignalAction {
Interrupt,
Reset,
}
Loading

0 comments on commit 1cd0544

Please sign in to comment.