Skip to content

Commit

Permalink
Refactor root device unplug handling
Browse files Browse the repository at this point in the history
Currently, when the root device is unplugged, we send a TERM signal and
immediately ask docker to force remove the device. This is effectively
sending a KILL signal since in almost all cases the container does not have
enough time to perform graceful cleanup between TERM and force removal,
which will send a KILL signal. So we could instead just send a KILL signal
directly and wait for exit.

With this change, now for all `Detach` and `Stopped` scenario, we know
the container is stopped, and since we passed in `--rm` to docker, we know
that docker will perform container removal itself. This means that we do
not need to explicitly invoke remove anymore and this allows massive cleanup.
  • Loading branch information
nbdd0121 committed Apr 21, 2024
1 parent 72f250c commit c98e2b4
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 145 deletions.
33 changes: 0 additions & 33 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ log = "0.4"
env_logger = "0.11"
clap = { version = "4", features = ["derive"] }
clap-verbosity-flag = "2"
humantime = "2"
bytes = "1"
thiserror = "1"
tokio = { version = "1", features = ["full"] }
Expand All @@ -21,7 +20,6 @@ tokio-util = { version = "0.7", features = ["full"] }
async-stream = "0.3"
udev = "0.8"
bollard = "0.16"
futures = "0.3"
rustix = { version = "0.38", features = ["fs", "stdio", "termios", "process"] }
bitflags = "2"
aya = { git = "https://github.com/aya-rs/aya.git" }
Expand Down
13 changes: 0 additions & 13 deletions src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,13 @@ pub mod device;
pub mod logfmt;
pub mod symlink;

use std::time::Duration;

use clap::{Parser, Subcommand};
use clap_verbosity_flag::{InfoLevel, Verbosity};

pub use device::DeviceRef;
pub use logfmt::LogFormat;
pub use symlink::Symlink;

fn parse_timeout(s: &str) -> Result<Option<Duration>, humantime::DurationError> {
Ok(match s {
"inf" | "infinite" | "none" | "forever" => None,
_ => Some(humantime::parse_duration(s)?),
})
}

#[derive(Parser)]
pub struct Args {
#[command(flatten)]
Expand Down Expand Up @@ -72,10 +63,6 @@ pub struct Run {
/// Exit code to return when the root device is unplugged
pub root_unplugged_exit_code: u8,

#[arg(short = 't', long, default_value = "20s", id = "TIMEOUT", value_parser = parse_timeout)]
/// Timeout when waiting for the container to be removed
pub remove_timeout: core::option::Option<Duration>, // needs to be `core::option::Option` because `Option` is treated specially by clap.

#[arg(trailing_var_arg = true, id = "ARGS")]
/// Arguments to pass to `docker run`
pub docker_args: Vec<String>,
Expand Down
60 changes: 0 additions & 60 deletions src/docker/container.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
use std::path::Path;
use std::pin::pin;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{anyhow, Context, Error, Result};
use bollard::service::EventMessage;
use futures::future::{BoxFuture, Shared};
use futures::FutureExt;
use rustix::process::Signal;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::Mutex;
Expand All @@ -22,28 +18,11 @@ pub struct Container {
docker: bollard::Docker,
id: String,
user: String,
remove_event: Shared<BoxFuture<'static, Option<EventMessage>>>,
cgroup_device_filter: Mutex<Option<Box<dyn DeviceAccessController + Send>>>,
}

impl Container {
pub(super) fn new(docker: &bollard::Docker, id: String, user: String) -> Result<Self> {
let mut remove_events = docker.events(Some(bollard::system::EventsOptions {
filters: [
("container".to_owned(), vec![id.to_owned()]),
("type".to_owned(), vec!["container".to_owned()]),
("event".to_owned(), vec!["destroy".to_owned()]),
]
.into(),
..Default::default()
}));

// Spawn the future to start listening event.
let remove_evevnt = tokio::spawn(async move { remove_events.next().await?.ok() })
.map(|x| x.ok().flatten())
.boxed()
.shared();

// Dropping the device filter will cause the container to have arbitrary device access.
// So keep it alive until we're sure that the container is stopped.
let cgroup_device_filter: Option<Box<dyn DeviceAccessController + Send>> =
Expand Down Expand Up @@ -73,7 +52,6 @@ impl Container {
} else {
user
},
remove_event: remove_evevnt,
cgroup_device_filter: Mutex::new(cgroup_device_filter),
})
}
Expand All @@ -82,44 +60,6 @@ impl Container {
&self.id
}

pub async fn remove(&self, timeout: Option<Duration>) -> Result<()> {
log::info!("Removing container {}", self.id);

// Since we passed "--rm" flag, docker will automatically start removing the container.
// Ignore any error for manual removal.
let _: Result<()> = async {
self.rename(&format!("removing-{}", self.id)).await?;
let options = bollard::container::RemoveContainerOptions {
force: true,
..Default::default()
};
self.docker
.remove_container(&self.id, Some(options))
.await?;
Ok(())
}
.await;

if let Some(duration) = timeout {
tokio::time::timeout(duration, self.remove_event.clone())
.await?
.context("no destroy event")?;
} else {
self.remove_event
.clone()
.await
.context("no destroy event")?;
}

Ok(())
}

pub async fn rename(&self, name: &str) -> Result<()> {
let required = bollard::container::RenameContainerOptions { name };
self.docker.rename_container(&self.id, required).await?;
Ok(())
}

pub async fn exec_as_root<T: ToString>(&self, cmd: &[T]) -> Result<IoStream> {
let cmd = cmd.iter().map(|s| s.to_string()).collect();
let options = bollard::exec::CreateExecOptions {
Expand Down
65 changes: 28 additions & 37 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::mem::ManuallyDrop;
use std::pin::pin;
use std::sync::Arc;

use anyhow::Result;
use anyhow::{Context, Result};
use clap::Parser;
use clap_verbosity_flag::{InfoLevel, Verbosity};
use log::info;
Expand Down Expand Up @@ -51,8 +51,6 @@ impl Display for Event {
async fn run(param: cli::Run, verbosity: Verbosity<InfoLevel>) -> Result<u8> {
let hub_path = param.root_device.device()?.syspath().to_owned();

let mut status = 0;

let docker = Docker::connect_with_defaults()?;
let container = Arc::new(docker.run(param.docker_args).await?);
// Dropping the `Container` will detach the device cgroup filter.
Expand All @@ -78,48 +76,41 @@ async fn run(param: cli::Run, verbosity: Verbosity<InfoLevel>) -> Result<u8> {
}
};

let stream = pin!(tokio_stream::empty()
let mut stream = pin!(tokio_stream::empty()
.merge(hotplug_stream)
.merge(container_stream));

let result: Result<()> = async {
tokio::pin!(stream);
while let Some(event) = stream.next().await {
let event = event?;
info!("{}", event);
match event {
Event::Initialized => {
if !verbosity.is_silent() {
container.attach().await?.pipe_std();
}
}
Event::Detach(dev) if dev.syspath() == hub_path => {
info!("Hub device detached. Stopping container.");
status = param.root_unplugged_exit_code;
container.kill(Signal::Term).await?;
break;
let status = loop {
let event = stream.try_next().await?.context("No more events")?;
info!("{}", event);
match event {
Event::Initialized => {
if !verbosity.is_silent() {
container.attach().await?.pipe_std();
}
Event::Stopped(code) => {
// Use the container exit code, but only if it won't be confused
// with the pre-defined root_unplugged_exit_code.
if code != param.root_unplugged_exit_code {
status = code;
} else {
status = 1;
}
break;
}
Event::Detach(dev) if dev.syspath() == hub_path => {
info!("Hub device detached. Stopping container.");
container.kill(Signal::Kill).await?;

let _ = container.wait().await?;
break param.root_unplugged_exit_code;
}
Event::Stopped(code) => {
// Use the container exit code, but only if it won't be confused
// with the pre-defined root_unplugged_exit_code.
if code != param.root_unplugged_exit_code {
break code;
} else {
break 1;
}
_ => {}
}
_ => {}
}
Ok(())
}
.await;
};

drop(ManuallyDrop::into_inner(container_keep));

if container.remove(param.remove_timeout).await.is_ok() {
drop(ManuallyDrop::into_inner(container_keep));
}
result?;
Ok(status)
}

Expand Down

0 comments on commit c98e2b4

Please sign in to comment.