Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform I/O in mnt namespace directly instead of calling into docker #5

Merged
merged 4 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ tokio-util = { version = "0.7", features = ["full"] }
async-stream = "0.3"
udev = "0.8"
bollard = "0.16"
rustix = { version = "0.38", features = ["fs", "stdio", "termios", "process"] }
rustix = { version = "0.38", features = ["fs", "stdio", "termios", "process", "thread"] }
bitflags = "2"
aya = { git = "https://github.com/aya-rs/aya.git" }

Expand Down
141 changes: 65 additions & 76 deletions src/docker/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,30 @@ use std::path::Path;
use std::pin::pin;
use std::sync::Arc;

use anyhow::{anyhow, Context, Error, Result};
use rustix::process::Signal;
use anyhow::{anyhow, ensure, Context, Error, Result};
use rustix::fs::{FileType, Gid, Mode, Uid};
use rustix::process::{Pid, Signal};
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;

use super::{IoStream, IoStreamSource};
use super::IoStream;
use crate::cgroup::{
Access, DeviceAccessController, DeviceAccessControllerV1, DeviceAccessControllerV2, DeviceType,
};

pub struct Container {
docker: bollard::Docker,
id: String,
user: String,
pid: Pid,
uid: Uid,
gid: Gid,
cgroup_device_filter: Mutex<Option<Box<dyn DeviceAccessController + Send>>>,
}

impl Container {
pub(super) fn new(docker: &bollard::Docker, id: String, user: String) -> Result<Self> {
pub(super) async fn new(docker: &bollard::Docker, id: String, pid: u32) -> Result<Self> {
// Dropping the device filter will cause the container to have arbitrary device access.
// So keep it alive until we're sure that the container is stopped.
let cgroup_device_filter: Option<Box<dyn DeviceAccessController + Send>> =
Expand All @@ -43,24 +46,31 @@ impl Container {
},
};

Ok(Self {
let mut this = Self {
docker: docker.clone(),
id,
user: if user.is_empty() {
// If user is not specified, use root.
"root".to_owned()
} else {
user
},
pid: Pid::from_raw(pid.try_into()?).context("Invalid PID")?,
uid: Uid::ROOT,
gid: Gid::ROOT,
cgroup_device_filter: Mutex::new(cgroup_device_filter),
})
};

let uid = this.exec(&["id", "-u"]).await?.trim().parse()?;
let gid = this.exec(&["id", "-g"]).await?.trim().parse()?;
// Only invalid uid/gid for Linux is negative one.
ensure!(uid != u32::MAX && gid != u32::MAX);
// SAFETY: We just checked that the uid/gid is not -1.
this.uid = unsafe { Uid::from_raw(uid) };
this.gid = unsafe { Gid::from_raw(gid) };

Ok(this)
}

pub fn id(&self) -> &str {
&self.id
}

pub async fn exec_as_root<T: ToString>(&self, cmd: &[T]) -> Result<IoStream> {
pub async fn exec<T: ToString>(&self, cmd: &[T]) -> Result<String> {
let cmd = cmd.iter().map(|s| s.to_string()).collect();
let options = bollard::exec::CreateExecOptions {
cmd: Some(cmd),
Expand All @@ -69,7 +79,6 @@ impl Container {
attach_stderr: Some(true),
tty: Some(true),
detach_keys: Some("ctrl-c".to_string()),
user: Some("root".to_string()),
..Default::default()
};
let response = self.docker.create_exec::<String>(&self.id, options).await?;
Expand All @@ -80,16 +89,19 @@ impl Container {
..Default::default()
};
let response = self.docker.start_exec(&id, Some(options)).await?;
let bollard::exec::StartExecResults::Attached { input, output } = response else {
let bollard::exec::StartExecResults::Attached {
input: _,
mut output,
} = response
else {
unreachable!("we asked for attached IO streams");
};

Ok(IoStream {
output,
input,
source: IoStreamSource::Exec(id),
docker: self.docker.clone(),
})
let mut result = String::new();
while let Some(output) = output.next().await {
result.push_str(&output?.to_string());
}
Ok(result)
}

pub async fn attach(&self) -> Result<IoStream> {
Expand All @@ -110,7 +122,7 @@ impl Container {
Ok(IoStream {
output: response.output,
input: response.input,
source: IoStreamSource::Container(self.id.clone()),
id: self.id.clone(),
docker: self.docker.clone(),
})
}
Expand All @@ -125,10 +137,7 @@ impl Container {
}

pub async fn kill(&self, signal: Signal) -> Result<()> {
let options = bollard::container::KillContainerOptions {
signal: format!("{}", signal as i32),
};
self.docker.kill_container(&self.id, Some(options)).await?;
rustix::process::kill_process(self.pid, signal).context("Failed to kill container init")?;
Ok(())
}

Expand All @@ -155,62 +164,42 @@ impl Container {
Ok(u8::try_from(code).unwrap_or(1))
}

pub async fn chown_to_user(&self, path: &str) -> Result<()> {
// Use `-h` to not follow symlink, and `user:` will use user's login group.
self.exec_as_root(&["chown", "-h", &format!("{}:", self.user), path])
.await?
.collect()
.await?;
Ok(())
}

// Note: we use `&str` here instead of `Path` because docker API expects string instead `OsStr`.
pub async fn mkdir(&self, path: &str) -> Result<()> {
self.exec_as_root(&["mkdir", "-p", path])
.await?
.collect()
.await?;
Ok(())
}

pub async fn mkdir_for(&self, path: &str) -> Result<()> {
if let Some(path) = std::path::Path::new(path).parent() {
self.mkdir(path.to_str().unwrap()).await?;
}
Ok(())
}

pub async fn mknod(&self, node: &Path, (major, minor): (u32, u32)) -> Result<()> {
self.rm(node).await?;
let node = node.to_str().context("node is not UTF-8")?;
self.mkdir_for(node).await?;
self.exec_as_root(&["mknod", node, "c", &major.to_string(), &minor.to_string()])
.await?
.collect()
.await?;
self.chown_to_user(node).await?;
Ok(())
crate::util::namespace::MntNamespace::of_pid(self.pid)?.enter(|| {
if let Some(parent) = node.parent() {
let _ = std::fs::create_dir_all(parent);
}
let _ = std::fs::remove_file(node);
rustix::fs::mknodat(
rustix::fs::CWD,
node,
FileType::CharacterDevice,
Mode::from(0o644),
rustix::fs::makedev(major, minor),
)?;
if !self.uid.is_root() {
rustix::fs::chown(node, Some(self.uid), Some(self.gid))?;
}
Ok(())
})?
}

pub async fn symlink(&self, source: &Path, link: &Path) -> Result<()> {
let source = source.to_str().context("node is not UTF-8")?;
let link = link.to_str().context("symlink is not UTF-8")?;
self.mkdir_for(link).await?;
self.exec_as_root(&["ln", "-sf", source, link])
.await?
.collect()
.await?;
self.chown_to_user(link).await?;
Ok(())
crate::util::namespace::MntNamespace::of_pid(self.pid)?.enter(|| {
if let Some(parent) = link.parent() {
let _ = std::fs::create_dir_all(parent);
}
let _ = std::fs::remove_file(link);
std::os::unix::fs::symlink(source, link)?;
// No need to chown symlink. Permission is determined by the target.
Ok(())
})?
}

pub async fn rm(&self, node: &Path) -> Result<()> {
let node = node.to_str().context("node is not UTF-8")?;
self.exec_as_root(&["rm", "-f", node])
.await?
.collect()
.await?;
Ok(())
crate::util::namespace::MntNamespace::of_pid(self.pid)?.enter(|| {
let _ = std::fs::remove_file(node);
})
}

pub async fn device(&self, (major, minor): (u32, u32), access: Access) -> Result<()> {
Expand Down
11 changes: 6 additions & 5 deletions src/docker/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ impl Docker {
pub async fn get<T: AsRef<str>>(&self, name: T) -> Result<Container> {
let response = self.0.inspect_container(name.as_ref(), None).await?;
let id = response.id.context("Failed to obtain container ID")?;
let config = response
.config
.context("Failed to obtain container config")?;
let user = config.user.context("Failed to obtain container user")?;
Container::new(&self.0, id, user)
let pid = response
.state
.context("Failed to obtain container state")?
.pid
.context("Failed to obtain container pid")?;
Container::new(&self.0, id, pid.try_into()?).await
}

pub async fn run<U: AsRef<str>, T: AsRef<[U]>>(&self, args: T) -> Result<Container> {
Expand Down
44 changes: 8 additions & 36 deletions src/docker/iostream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,10 @@ use tokio::task::JoinHandle;
use tokio_stream::{Stream, StreamExt};
use tokio_util::io::ReaderStream;

pub(super) enum IoStreamSource {
Container(String),
Exec(String),
}

pub struct IoStream {
pub output: std::pin::Pin<Box<dyn Stream<Item = Result<LogOutput, Error>> + Send>>,
pub input: Pin<Box<dyn AsyncWrite + Send>>,
pub(super) source: IoStreamSource,
pub(super) id: String,
pub(super) docker: bollard::Docker,
}

Expand All @@ -30,14 +25,6 @@ enum StreamData {
}

impl IoStream {
pub async fn collect(mut self) -> Result<String> {
let mut result = String::default();
while let Some(output) = self.output.next().await {
result.push_str(&output?.to_string());
}
Ok(result)
}

pub fn pipe_std(self) -> JoinHandle<Result<()>> {
let stdin = crate::util::tty_mode_guard::TtyModeGuard::new(tokio::io::stdin(), |mode| {
// Switch input to raw mode, but don't touch output modes (as it can also be connected
Expand Down Expand Up @@ -72,7 +59,7 @@ impl IoStream {
) -> JoinHandle<Result<()>> {
let mut input = self.input;
let docker = self.docker;
let source = self.source;
let id = self.id;

let resize_stream = resize_stream.map(|data| {
let (rows, cols) = data.context("Listening for tty resize")?;
Expand Down Expand Up @@ -101,7 +88,7 @@ impl IoStream {
while let Some(data) = streams.next().await {
match data? {
StreamData::Resize(rows, cols) => {
resize_tty(&docker, &source, (rows, cols)).await?;
resize_tty(&docker, &id, (rows, cols)).await?;
}
StreamData::StdIn(mut buf) => {
input.write_all_buf(&mut buf).await?;
Expand All @@ -123,26 +110,11 @@ impl IoStream {
}
}

async fn resize_tty(
docker: &bollard::Docker,
source: &IoStreamSource,
(rows, cols): (u16, u16),
) -> Result<()> {
match source {
IoStreamSource::Container(id) => {
let options = bollard::container::ResizeContainerTtyOptions {
height: rows,
width: cols,
};
docker.resize_container_tty(id, options).await?;
}
IoStreamSource::Exec(id) => {
let options = bollard::exec::ResizeExecOptions {
height: rows,
width: cols,
};
docker.resize_exec(id, options).await?;
}
async fn resize_tty(docker: &bollard::Docker, id: &str, (rows, cols): (u16, u16)) -> Result<()> {
let options = bollard::container::ResizeContainerTtyOptions {
height: rows,
width: cols,
};
docker.resize_container_tty(id, options).await?;
Ok(())
}
2 changes: 0 additions & 2 deletions src/docker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,3 @@ mod iostream;
pub use container::Container;
pub use docker::Docker;
pub use iostream::IoStream;

use iostream::IoStreamSource;
1 change: 1 addition & 0 deletions src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod escape;
pub mod namespace;
pub mod tty_mode_guard;
42 changes: 42 additions & 0 deletions src/util/namespace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::fs::File;
use std::os::fd::AsFd;

use anyhow::Result;
use rustix::process::Pid;
use rustix::thread::{LinkNameSpaceType, UnshareFlags};

pub struct MntNamespace {
fd: File,
}

impl MntNamespace {
/// Open the mount namespace of a process.
pub fn of_pid(pid: Pid) -> Result<MntNamespace> {
let path = format!("/proc/{}/ns/mnt", pid.as_raw_nonzero());
let fd = File::open(path)?;
Ok(MntNamespace { fd })
}

/// Enter the mount namespace.
pub fn enter<T: Send, F: FnOnce() -> T + Send>(&self, f: F) -> Result<T> {
// To avoid messing with rest of the process, we do everything in a new thread.
// Use scoped thread to avoid 'static bound (we need to access fd).
std::thread::scope(|scope| {
scope
.spawn(|| -> Result<T> {
// Unshare FS for this specific thread so we can switch to another namespace.
// Not doing this will cause EINVAL when switching to namespaces.
rustix::thread::unshare(UnshareFlags::FS)?;

// Switch this particular thread to the container's mount namespace.
rustix::thread::move_into_link_name_space(
self.fd.as_fd(),
Some(LinkNameSpaceType::Mount),
)?;
Ok(f())
})
.join()
.map_err(|_| anyhow::anyhow!("work thread panicked"))?
})
}
}