diff --git a/src/tokio/core.rs b/src/tokio/core.rs index c8f6240..76df7d7 100644 --- a/src/tokio/core.rs +++ b/src/tokio/core.rs @@ -20,6 +20,7 @@ use tokio::{ }; use tracing::debug; +#[derive(Debug)] pub struct TokioCommandWrap { command: Command, wrappers: IndexMap>, diff --git a/src/tokio/job_object.rs b/src/tokio/job_object.rs index 6bcfd68..d6a30bd 100644 --- a/src/tokio/job_object.rs +++ b/src/tokio/job_object.rs @@ -4,6 +4,7 @@ use tokio::{ process::{Child, Command}, task::spawn_blocking, }; +use tracing::{debug, instrument}; use windows::Win32::{ Foundation::{CloseHandle, HANDLE}, System::Threading::CREATE_SUSPENDED, @@ -20,10 +21,11 @@ use super::CreationFlags; use super::KillOnDrop; use super::{TokioChildWrapper, TokioCommandWrap, TokioCommandWrapper}; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct JobObject; impl TokioCommandWrapper for JobObject { + #[instrument(level = "debug", skip(self))] fn pre_spawn(&mut self, command: &mut Command, core: &TokioCommandWrap) -> Result<()> { let mut flags = CREATE_SUSPENDED; #[cfg(feature = "creation-flags")] @@ -35,6 +37,7 @@ impl TokioCommandWrapper for JobObject { Ok(()) } + #[instrument(level = "debug", skip(self))] fn wrap_child( &mut self, inner: Box, @@ -45,6 +48,15 @@ impl TokioCommandWrapper for JobObject { #[cfg(not(feature = "kill-on-drop"))] let kill_on_drop = false; + #[cfg(feature = "creation-flags")] + let create_suspended = core + .get_wrap::() + .map_or(false, |flags| flags.0.contains(CREATE_SUSPENDED)); + #[cfg(not(feature = "creation-flags"))] + let create_suspended = false; + + debug!(?kill_on_drop, ?create_suspended, "options from other wrappers"); + let handle = HANDLE( inner .inner() @@ -55,14 +67,7 @@ impl TokioCommandWrapper for JobObject { let job_port = make_job_object(handle, kill_on_drop)?; // only resume if the user didn't specify CREATE_SUSPENDED - #[cfg(feature = "creation-flags")] - let resume = core - .get_wrap::() - .map_or(false, |flags| !flags.0.contains(CREATE_SUSPENDED)); - #[cfg(not(feature = "creation-flags"))] - let resume = true; - - if resume { + if !create_suspended { resume_threads(handle)?; } @@ -78,6 +83,7 @@ pub struct JobObjectChild { } impl JobObjectChild { + #[instrument(level = "debug", skip(job_port))] pub(crate) fn new(inner: Box, job_port: JobPort) -> Self { Self { inner, @@ -104,10 +110,12 @@ impl TokioChildWrapper for JobObjectChild { self.inner.into_inner() } + #[instrument(level = "debug", skip(self))] fn start_kill(&mut self) -> Result<()> { terminate_job(self.job_port.job, 1) } + #[instrument(level = "debug", skip(self))] fn wait(&mut self) -> Box> + '_> { Box::new(async { if let ChildExitStatus::Exited(status) = &self.exit_status { @@ -138,6 +146,7 @@ impl TokioChildWrapper for JobObjectChild { }) } + #[instrument(level = "debug", skip(self))] fn try_wait(&mut self) -> Result> { wait_on_job(self.job_port.completion_port, Some(Duration::ZERO))?; self.inner.try_wait() diff --git a/src/tokio/kill_on_drop.rs b/src/tokio/kill_on_drop.rs index 0fcfa57..f9256a1 100644 --- a/src/tokio/kill_on_drop.rs +++ b/src/tokio/kill_on_drop.rs @@ -4,7 +4,7 @@ use tokio::process::Command; use super::{TokioCommandWrap, TokioCommandWrapper}; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct KillOnDrop; impl TokioCommandWrapper for KillOnDrop { diff --git a/src/tokio/process_group.rs b/src/tokio/process_group.rs index 5330067..5589a0a 100644 --- a/src/tokio/process_group.rs +++ b/src/tokio/process_group.rs @@ -19,6 +19,7 @@ use tokio::{ process::{Child, Command}, task::spawn_blocking, }; +use tracing::instrument; use crate::ChildExitStatus; @@ -49,6 +50,7 @@ pub struct ProcessGroupChild { } impl ProcessGroupChild { + #[instrument(level = "debug")] pub(crate) fn new(inner: Box, pgid: Pid) -> Self { Self { inner, @@ -59,6 +61,7 @@ impl ProcessGroupChild { } impl TokioCommandWrapper for ProcessGroup { + #[instrument(level = "debug", skip(self))] fn pre_spawn(&mut self, command: &mut Command, _core: &TokioCommandWrap) -> Result<()> { #[cfg(tokio_unstable)] { @@ -78,6 +81,7 @@ impl TokioCommandWrapper for ProcessGroup { Ok(()) } + #[instrument(level = "debug", skip(self))] fn wrap_child( &mut self, inner: Box, @@ -97,10 +101,12 @@ impl TokioCommandWrapper for ProcessGroup { } impl ProcessGroupChild { + #[instrument(level = "debug", skip(self))] fn signal_imp(&self, sig: Signal) -> Result<()> { killpg(self.pgid, sig).map_err(Error::from) } + #[instrument(level = "debug")] fn wait_imp(pgid: Pid, flag: WaitPidFlag) -> Result>> { // wait for processes in a loop until every process in this group has // exited (this ensures that we reap any zombies that may have been @@ -156,10 +162,12 @@ impl TokioChildWrapper for ProcessGroupChild { self.inner.into_inner() } + #[instrument(level = "debug", skip(self))] fn start_kill(&mut self) -> Result<()> { self.signal_imp(Signal::SIGKILL) } + #[instrument(level = "debug", skip(self))] fn wait(&mut self) -> Box> + '_> { Box::new(async { if let ChildExitStatus::Exited(status) = &self.exit_status { @@ -188,6 +196,7 @@ impl TokioChildWrapper for ProcessGroupChild { }) } + #[instrument(level = "debug", skip(self))] fn try_wait(&mut self) -> Result> { if let ChildExitStatus::Exited(status) = &self.exit_status { return Ok(Some(*status)); diff --git a/src/tokio/process_session.rs b/src/tokio/process_session.rs index 4b6812e..b525ce2 100644 --- a/src/tokio/process_session.rs +++ b/src/tokio/process_session.rs @@ -2,6 +2,7 @@ use std::io::{Error, Result}; use nix::unistd::{setsid, Pid}; use tokio::process::Command; +use tracing::instrument; use super::{TokioCommandWrap, TokioCommandWrapper}; @@ -9,6 +10,7 @@ use super::{TokioCommandWrap, TokioCommandWrapper}; pub struct ProcessSession; impl TokioCommandWrapper for ProcessSession { + #[instrument(level = "debug", skip(self))] fn pre_spawn(&mut self, command: &mut Command, _core: &TokioCommandWrap) -> Result<()> { unsafe { command.pre_exec(move || setsid().map_err(Error::from).map(|_| ())); @@ -17,6 +19,7 @@ impl TokioCommandWrapper for ProcessSession { Ok(()) } + #[instrument(level = "debug", skip(self))] fn wrap_child( &mut self, inner: Box, diff --git a/src/windows.rs b/src/windows.rs index f548d86..5c75648 100644 --- a/src/windows.rs +++ b/src/windows.rs @@ -6,6 +6,7 @@ use std::{ time::Duration, }; +use tracing::instrument; use windows::Win32::{ Foundation::{CloseHandle, HANDLE}, System::{ @@ -46,6 +47,7 @@ unsafe impl Sync for JobPort {} /// /// If `kill_on_drop` is true, we opt into the `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE` flag, which /// essentially implements the "reap children" feature of Unix systems directly in Win32. +#[instrument(level = "debug")] pub(crate) fn make_job_object(handle: HANDLE, kill_on_drop: bool) -> Result { let job = unsafe { CreateJobObjectW(None, None) }.map_err(Error::other)?; @@ -95,6 +97,7 @@ pub(crate) fn make_job_object(handle: HANDLE, kill_on_drop: bool) -> Result Result<()> { #[inline] unsafe fn inner(pid: u32, tool_handle: HANDLE) -> Result<()> { @@ -134,10 +137,13 @@ pub(crate) fn resume_threads(child_process: HANDLE) -> Result<()> { } /// Terminate a job object without waiting for the processes to exit. +#[instrument(level = "debug")] pub(crate) fn terminate_job(job: HANDLE, exit_code: u32) -> Result<()> { unsafe { TerminateJobObject(job, exit_code) }.map_err(Error::other) } +/// Wait for a job to complete. +#[instrument(level = "debug")] pub(crate) fn wait_on_job( completion_port: HANDLE, timeout: Option,