Skip to content

Commit

Permalink
Simplify hotplug code by removing an intermediate layer
Browse files Browse the repository at this point in the history
  • Loading branch information
nbdd0121 committed Apr 19, 2024
1 parent 1d6f2ea commit 2bbfb43
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 135 deletions.
134 changes: 55 additions & 79 deletions src/hotplug/mod.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
mod attached_device;
pub use attached_device::AttachedDevice;

use crate::cgroup::Access;
use crate::cli;
use crate::dev::DeviceMonitor;
use crate::docker::Container;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Result;
use async_stream::try_stream;
use std::path::PathBuf;
use std::{collections::HashMap, sync::Arc};
use tokio_stream::StreamExt;

pub use crate::dev::{Device, DeviceEvent};
pub use attached_device::AttachedDevice;

#[derive(Clone)]
pub enum Event {
Attach(AttachedDevice),
Detach(AttachedDevice),
}
use super::Event;
use crate::cgroup::Access;
use crate::cli;
use crate::dev::{DeviceEvent, DeviceMonitor};
use crate::docker::Container;

pub struct HotPlug {
pub container: Arc<Container>,
Expand All @@ -44,84 +39,65 @@ impl HotPlug {
})
}

fn find_symlinks(&self, device: &Device) -> Vec<PathBuf> {
self.symlinks
.iter()
.filter_map(|dev| dev.matches(device))
.collect()
}

pub fn start(&mut self) -> impl tokio_stream::Stream<Item = Result<Event>> + '_ {
pub fn run(&mut self) -> impl tokio_stream::Stream<Item = Result<Event>> + '_ {
try_stream! {
while let Some(event) = self.monitor.try_read()? {
match event {
DeviceEvent::Add(device) => {
if device.devnode().is_none() {
continue;
}
let device = self.allow_device(&device).await?;
yield Event::Attach(device);
}
DeviceEvent::Remove(device) => {
if let Some(plugged) = self.deny_device(device.udev()).await? {
yield Event::Detach(plugged);
}
}
if let Some(event) = self.process(event).await? {
yield event;
}
}
}
}

pub fn run(&mut self) -> impl tokio_stream::Stream<Item = Result<Event>> + '_ {
try_stream! {
yield Event::Initialized;

while let Some(event) = self.monitor.try_next().await? {
match event {
DeviceEvent::Add(device) => {
if device.devnode().is_none() {
continue;
}
let device = self.allow_device(&device).await?;
yield Event::Attach(device);
}
DeviceEvent::Remove(device) => {
if let Some(plugged) = self.deny_device(device.udev()).await? {
yield Event::Detach(plugged);
}
}
if let Some(event) = self.process(event).await? {
yield event;
}
}
}
}

async fn allow_device(&mut self, device: &Device) -> Result<AttachedDevice> {
let device = device.clone();
let symlinks = self.find_symlinks(&device);
let device = AttachedDevice { device, symlinks };
let devnode = device.devnode().unwrap();
self.container.device(devnode.devnum, Access::all()).await?;
self.container.mknod(&devnode.path, devnode.devnum).await?;
for symlink in &device.symlinks {
self.container.symlink(&devnode.path, symlink).await?;
}
let syspath = device.syspath().to_owned();
self.devices.insert(syspath, device.clone());
Ok(device)
}
async fn process(&mut self, event: DeviceEvent) -> Result<Option<Event>> {
match event {
DeviceEvent::Add(device) => {
let Some(devnode) = device.devnode() else {
return Ok(None);
};

let symlinks: Vec<_> = self
.symlinks
.iter()
.filter_map(|dev| dev.matches(&device))
.collect();

self.container.device(devnode.devnum, Access::all()).await?;
self.container.mknod(&devnode.path, devnode.devnum).await?;
for symlink in &symlinks {
self.container.symlink(&devnode.path, symlink).await?;
}

let syspath = device.syspath().to_owned();
let device = AttachedDevice { device, symlinks };
self.devices.insert(syspath, device.clone());

Ok(Some(Event::Attach(device)))
}
DeviceEvent::Remove(device) => {
let Some(device) = self.devices.remove(device.syspath()) else {
return Ok(None);
};

let devnode = device.devnode().unwrap();
self.container
.device(devnode.devnum, Access::empty())
.await?;
self.container.rm(&devnode.path).await?;
for symlink in &device.symlinks {
self.container.rm(symlink).await?;
}

async fn deny_device(&mut self, device: &udev::Device) -> Result<Option<AttachedDevice>> {
let syspath = device.syspath().to_owned();
if let Some(device) = self.devices.remove(&syspath) {
let devnode = device.devnode().unwrap();
self.container
.device(devnode.devnum, Access::empty())
.await?;
self.container.rm(&devnode.path).await?;
for symlink in &device.symlinks {
self.container.rm(symlink).await?;
Ok(Some(Event::Detach(device)))
}
Ok(Some(device))
} else {
Ok(None)
}
}
}
73 changes: 17 additions & 56 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ mod docker;
mod hotplug;
mod util;

use cli::{Action, DeviceRef, Symlink};
use docker::{Container, Docker};
use hotplug::{AttachedDevice, Event as HotPlugEvent, HotPlug};
use cli::Action;
use docker::Docker;
use hotplug::{AttachedDevice, HotPlug};

use std::fmt::Display;
use std::pin::pin;
Expand All @@ -16,7 +16,7 @@ use tokio_stream::StreamExt;

use anyhow::Result;
use clap::Parser;
use clap_verbosity_flag::{InfoLevel, LogLevel, Verbosity};
use clap_verbosity_flag::{InfoLevel, Verbosity};
use log::info;

#[derive(Clone)]
Expand All @@ -27,15 +27,6 @@ enum Event {
Stopped(i64),
}

impl From<HotPlugEvent> for Event {
fn from(evt: HotPlugEvent) -> Self {
match evt {
HotPlugEvent::Attach(dev) => Self::Attach(dev),
HotPlugEvent::Detach(dev) => Self::Detach(dev),
}
}
}

impl Display for Event {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand All @@ -55,44 +46,6 @@ impl Display for Event {
}
}

fn run_hotplug(
device: DeviceRef,
symlinks: Vec<Symlink>,
container: Arc<Container>,
verbosity: Verbosity<impl LogLevel>,
) -> impl tokio_stream::Stream<Item = Result<Event>> {
async_stream::try_stream! {
let name = container.name().await?;
let id = container.id();
info!("Attaching to container {name} ({id})");

let hub_path = device.device()?.syspath().to_owned();
let mut hotplug = HotPlug::new(container.clone(), hub_path.clone(), symlinks)?;

{
let events = hotplug.start();
tokio::pin!(events);
while let Some(event) = events.next().await {
yield Event::from(event?);
}
}

yield Event::Initialized;

if !verbosity.is_silent() {
container.attach().await?.pipe_std();
}

{
let events = hotplug.run();
tokio::pin!(events);
while let Some(event) = events.next().await {
yield Event::from(event?);
}
}
}
}

async fn run(param: cli::Run, verbosity: Verbosity<InfoLevel>) -> Result<u8> {
let hub_path = param.root_device.device()?.syspath().to_owned();

Expand All @@ -102,12 +55,15 @@ async fn run(param: cli::Run, verbosity: Verbosity<InfoLevel>) -> Result<u8> {
let container = Arc::new(docker.run(param.docker_args).await?);
drop(container.clone().pipe_signals());

let hotplug_stream = run_hotplug(
param.root_device,
param.symlink,
container.clone(),
verbosity,
info!(
"Attaching to container {} ({})",
container.name().await?,
container.id()
);

let mut hotplug = HotPlug::new(container.clone(), hub_path.clone(), param.symlink)?;
let hotplug_stream = hotplug.run();

let container_stream = {
let container = container.clone();
async_stream::try_stream! {
Expand All @@ -126,6 +82,11 @@ async fn run(param: cli::Run, verbosity: Verbosity<InfoLevel>) -> Result<u8> {
let event = event?;
info!("{}", event);
match event {
Event::Initialized => {
if !verbosity.is_silent() {
container.attach().await?.pipe_std();
}
}
Event::Detach(dev) if dev.syspath() == hub_path => {
info!("Hub device detached. Stopping container.");
status = param.root_unplugged_exit_code;
Expand Down

0 comments on commit 2bbfb43

Please sign in to comment.