Merge pull request #11 from pop-os/master
[pull] master from pop-os:master
KyleGospo authored May 3, 2023
2 parents ca7c9ba + 837da2c commit 62ffaba
Showing 16 changed files with 364 additions and 100 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion README.md
@@ -62,7 +62,7 @@ In addition to `config.kdl`, additional process scheduling profiles are stored in:
 - User-config: `/etc/system76-scheduler/process-scheduler/`
 - Distribution: `/usr/share/system76-scheduler/process-scheduler/`
 
-An [example configuration is provided here](./data/pop-os.kdl). It is parsed the same as the assignments and exceptions nodes in the main config, and profiles can inherit values from the previous assignment of the same name.
+An [example configuration is provided here](./data/pop_os.kdl). It is parsed the same as the assignments and exceptions nodes in the main config, and profiles can inherit values from the previous assignment of the same name.
 
 ### Profile
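The linked `pop_os.kdl` is the authoritative example. As a rough illustration of the format described above, a profile file might look like the following sketch (only the `assignments` and `exceptions` node names come from the README text; the profile name, matcher attribute, and values shown are hypothetical):

```kdl
// Hypothetical sketch of a process-scheduler profile file.
assignments {
    background nice=5 {
        include cmdline="example-backup-daemon"
    }
}

exceptions {
    include cmdline="example-critical-service"
}
```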
2 changes: 1 addition & 1 deletion config/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "system76-scheduler-config"
-version = "2.0.0"
+version = "2.0.1"
 edition = "2021"
 
 [dependencies]
2 changes: 1 addition & 1 deletion daemon/Cargo.toml
@@ -2,7 +2,7 @@
 name = "system76-scheduler"
 description = "Linux service that manages process priorities and CFS scheduler latencies for improved responsiveness on the desktop"
 repository = "https://github.com/pop-os/system76-scheduler"
-version = "2.0.0"
+version = "2.0.1"
 edition = "2021"
 license = "MPL-2.0"
 publish = false
100 changes: 63 additions & 37 deletions daemon/src/main.rs
@@ -20,10 +20,12 @@ mod pw;
 mod service;
 mod utils;
 
-use cfs::paths::SchedPaths;
 use clap::ArgMatches;
 use dbus::{CpuMode, Server};
-use std::time::{Duration, Instant};
+use std::{
+    path::Path,
+    time::{Duration, Instant},
+};
 use tokio::sync::mpsc::Sender;
 use upower_dbus::UPowerProxy;
 use zbus::{Connection, PropertyStream};
@@ -158,7 +160,7 @@ async fn daemon(
         return reload(connection).await;
     }
 
-    let service = &mut service::Service::new(owner, SchedPaths::new()?);
+    let service = &mut service::Service::new(owner);
     service.reload_configuration();
 
     let (tx, mut rx) = tokio::sync::mpsc::channel(4);
@@ -194,39 +196,14 @@ async fn daemon(
 
     // Use execsnoop-bpfcc to watch for new processes being created.
    if service.config.process_scheduler.execsnoop {
-        tracing::debug!("monitoring process IDs in realtime with execsnoop");
-        let tx = tx.clone();
-        let (scheduled_tx, mut scheduled_rx) = tokio::sync::mpsc::unbounded_channel();
-        std::thread::spawn(move || {
-            if let Ok(mut watcher) = execsnoop::watch() {
-                // Listen for spawned process, scheduling them to be handled with a delay of 1 second after creation.
-                // The delay is to ensure that a process has been added to a cgroup
-                while let Some(process) = watcher.next() {
-                    let Ok(cmdline) = std::str::from_utf8(process.cmd) else {
-                        continue
-                    };
-
-                    let name = process::name(cmdline);
-
-                    let _res = scheduled_tx.send((
-                        Instant::now() + Duration::from_secs(2),
-                        ExecCreate {
-                            pid: process.pid,
-                            parent_pid: process.parent_pid,
-                            name: name.to_owned(),
-                            cmdline: cmdline.to_owned(),
-                        },
-                    ));
-                }
-            }
-        });
-
-        tokio::task::spawn_local(async move {
-            while let Some((delay, process)) = scheduled_rx.recv().await {
-                tokio::time::sleep_until(delay.into()).await;
-                let _res = tx.send(Event::ExecCreate(process)).await;
-            }
-        });
+        if Path::new(execsnoop::EXECSNOOP_PATH).exists() {
+            integrate_execsnoop(tx.clone());
+        } else {
+            tracing::warn!(
+                "install {} to monitor processes in realtime",
+                execsnoop::EXECSNOOP_PATH
+            );
+        }
     }
 
     // Monitors pipewire-connected processes.
@@ -266,6 +243,7 @@ async fn daemon(
             }) => {
                 service.assign_new_process(&mut buffer, pid, parent_pid, name, cmdline);
                 service.assign_children(&mut buffer, pid);
+                service.garbage_clean(&mut buffer);
             }
 
             Event::RefreshProcessMap => {
@@ -275,10 +253,12 @@ async fn daemon(
             Event::SetForegroundProcess(pid) => {
                 tracing::debug!("setting {pid} as foreground process");
                 service.set_foreground_process(&mut buffer, pid);
+                service.garbage_clean(&mut buffer);
             }
 
             Event::Pipewire(scheduler_pipewire::ProcessEvent::Add(process)) => {
                 service.set_pipewire_process(&mut buffer, process);
+                service.garbage_clean(&mut buffer);
             }
 
             Event::Pipewire(scheduler_pipewire::ProcessEvent::Remove(process)) => {
@@ -363,8 +343,54 @@ fn autogroup_set(enable: bool) {
     let _res = std::fs::write(PATH, if enable { b"1" } else { b"0" });
 }
 
+/// Listens to exec events from the kernel to get process IDs in realtime.
+fn integrate_execsnoop(tx: Sender<Event>) {
+    tracing::info!("monitoring process IDs in realtime with execsnoop");
+    let (scheduled_tx, mut scheduled_rx) = tokio::sync::mpsc::unbounded_channel();
+    std::thread::spawn(move || {
+        match execsnoop::watch() {
+            Ok(mut watcher) => {
+                // Listen for spawned processes, scheduling them to be handled with a delay of 2 seconds after creation.
+                // The delay is to ensure that a process has been added to a cgroup.
+                while let Some(process) = watcher.next() {
+                    let Ok(cmdline) = std::str::from_utf8(process.cmd) else {
+                        continue
+                    };
+
+                    let name = process::name(cmdline);
+
+                    tracing::debug!(
+                        "{:?} created by {:?} ({name})",
+                        process.pid,
+                        process.parent_pid
+                    );
+                    let _res = scheduled_tx.send((
+                        Instant::now() + Duration::from_secs(2),
+                        ExecCreate {
+                            pid: process.pid,
+                            parent_pid: process.parent_pid,
+                            name: name.to_owned(),
+                            cmdline: cmdline.to_owned(),
+                        },
+                    ));
+                }
+            }
+            Err(error) => {
+                tracing::error!("failed to start execsnoop: {error}");
+            }
+        }
+    });
+
+    tokio::task::spawn_local(async move {
+        while let Some((delay, process)) = scheduled_rx.recv().await {
+            tokio::time::sleep_until(delay.into()).await;
+            let _res = tx.send(Event::ExecCreate(process)).await;
+        }
+    });
+}
+
 fn uptime() -> Option<u64> {
     let uptime = std::fs::read_to_string("/proc/uptime").ok()?;
     let seconds = uptime.split('.').next()?;
     seconds.parse::<u64>().ok()
 }
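The `integrate_execsnoop` function above bridges a blocking producer thread into the async runtime through an unbounded channel of (deadline, event) pairs, then dispatches each event once its deadline has passed. A stripped-down, runnable sketch of that pattern, with a hypothetical `Event` type standing in for `ExecCreate` (assumes the `tokio` crate with its `full` feature set):

```rust
use std::time::{Duration, Instant};
use tokio::sync::mpsc;

#[derive(Debug)]
struct Event(u32); // hypothetical stand-in for the daemon's ExecCreate

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    // Blocking producer thread: stamp each event with a future deadline.
    std::thread::spawn(move || {
        for pid in 0..3u32 {
            let _ = tx.send((Instant::now() + Duration::from_secs(2), Event(pid)));
            std::thread::sleep(Duration::from_millis(100));
        }
        // Dropping `tx` here closes the channel and ends the consumer loop.
    });

    // Async consumer: sleep until each deadline, then handle the event.
    while let Some((deadline, event)) = rx.recv().await {
        tokio::time::sleep_until(deadline.into()).await;
        println!("handling {event:?}");
    }
}
```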
21 changes: 11 additions & 10 deletions daemon/src/process.rs
@@ -63,15 +63,20 @@ impl<'owner> Process<'owner> {
 #[derive(Default)]
 pub struct Map<'owner> {
     pub map: HashMap<u64, Arc<LCell<'owner, Process<'owner>>>>,
+    pub pid_map: HashMap<u32, Arc<LCell<'owner, Process<'owner>>>>,
     pub drain: HashSet<u64>,
 }
 
 impl<'owner> Map<'owner> {
     /// Removes processes that remain in the drain filter.
-    pub fn drain_filter(&mut self) {
+    pub fn drain_filter(&mut self, owner: &LCellOwner<'owner>) {
         for hash in self.drain.drain() {
-            self.map.remove(&hash);
+            if let Some(process) = self.map.remove(&hash) {
+                self.pid_map.remove(&process.ro(owner).id);
+            }
         }
+
+        self.map.shrink_to(1024);
     }
 
     /// This will be used to keep track of what processes were destroyed since the last refresh.
@@ -82,14 +87,8 @@
         }
     }
 
-    pub fn get_pid(
-        &self,
-        token: &LCellOwner<'owner>,
-        pid: u32,
-    ) -> Option<&Arc<LCell<'owner, Process<'owner>>>> {
-        self.map
-            .values()
-            .find(|&process| process.ro(token).id == pid)
+    pub fn get_pid(&self, pid: u32) -> Option<&Arc<LCell<'owner, Process<'owner>>>> {
+        self.pid_map.get(&pid)
     }
 
     pub fn insert(
@@ -117,9 +116,11 @@
                 entry.get().clone()
             }
             Entry::Vacant(entry) => {
+                let pid = process.id;
                 let process = Arc::new(LCell::new(process));
 
                 entry.insert(process.clone());
+                self.pid_map.insert(pid, process.clone());
                 process
             }
         }
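The new `pid_map` field turns `get_pid` from a linear scan over every tracked process into a direct hash lookup, at the cost of keeping a second index in sync on insert and removal. A minimal sketch of that two-index pattern, with plain `String` records standing in for the `Arc<LCell<Process>>` cells of the real code:

```rust
use std::collections::{HashMap, HashSet};

/// Simplified stand-in for the daemon's process map: records are reachable
/// both by a stable hash and by kernel PID.
#[derive(Default)]
struct ProcessMap {
    map: HashMap<u64, String>,  // hash -> process record
    pid_map: HashMap<u32, u64>, // PID -> hash of the owning record
    drain: HashSet<u64>,        // hashes not seen since the last refresh
}

impl ProcessMap {
    fn insert(&mut self, hash: u64, pid: u32, record: String) {
        self.map.insert(hash, record);
        self.pid_map.insert(pid, hash); // keep the secondary index in sync
    }

    /// O(1) hash lookup instead of scanning every value for a matching PID.
    fn get_pid(&self, pid: u32) -> Option<&String> {
        self.pid_map.get(&pid).and_then(|hash| self.map.get(hash))
    }

    /// Both indexes are pruned together, otherwise `pid_map` would keep
    /// entries for processes that no longer exist.
    fn drain_filter(&mut self, pid_of: impl Fn(&str) -> u32) {
        for hash in self.drain.drain() {
            if let Some(record) = self.map.remove(&hash) {
                self.pid_map.remove(&pid_of(&record));
            }
        }
    }
}

fn main() {
    let mut processes = ProcessMap::default();
    processes.insert(42, 1000, "bash".into());
    assert_eq!(processes.get_pid(1000).map(String::as_str), Some("bash"));
}
```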
24 changes: 20 additions & 4 deletions daemon/src/pw.rs
@@ -74,6 +74,7 @@ async fn pipewire_service(tx: Sender<ProcessEvent>) {
             SocketEvent::Add(socket) => {
                 if !active_sessions.contains(&socket) {
                     if let Ok(stream) = UnixStream::connect(&socket) {
+                        active_sessions.insert(socket.clone());
                         let tx = tx.clone();
                         let pw_tx = pw_tx.clone();
                         std::thread::spawn(move || {
@@ -106,23 +107,34 @@ pub(crate) async fn monitor(tx: Sender<Event>) {
     loop {
         tokio::time::sleep(Duration::from_secs(3)).await;
 
-        let result = std::process::Command::new("system76-scheduler")
+        let exe_link_target = std::fs::read_link("/proc/self/exe");
+        let Ok(exe) = exe_link_target else {
+            tracing::error!("failed to determine the daemon exe name: {:?}", exe_link_target.err());
+            break;
+        };
+
+        tracing::debug!("connected to pipewire");
+
+        let result = std::process::Command::new(exe)
             .arg("pipewire")
             .stdin(std::process::Stdio::null())
             .stderr(std::process::Stdio::null())
             .stdout(std::process::Stdio::piped())
             .spawn();
 
         let Ok(mut child) = result else {
-            continue;
+            tracing::error!("failed to spawn pipewire watcher: {:?}", result.err());
+            break;
         };
 
         let Some(stdout) = child.stdout.take() else {
-            continue;
+            tracing::error!("pipewire process is missing the stdout pipe");
+            break;
        };
 
         let Ok(stdout) = tokio::process::ChildStdout::from_std(stdout) else {
-            continue;
+            tracing::error!("failed to create tokio stdout from pipewire process");
+            break;
        };
 
         let mut stdout = tokio::io::BufReader::new(stdout);
@@ -142,16 +154,20 @@ pub(crate) async fn monitor(tx: Sender<Event>) {
                         if !managed.insert(pid) {
                             continue;
                         }
+                        tracing::debug!("{pid} started using pipewire");
                     }
                     ProcessEvent::Remove(pid) => {
                         if !managed.remove(&pid) {
                             continue;
                         }
+                        tracing::debug!("{pid} stopped using pipewire");
                     }
                 }
 
                 let _res = tx.send(Event::Pipewire(event)).await;
             }
         }
     }
+
+    tracing::info!("stopped listening to pipewire");
 }
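The switch from the hard-coded `system76-scheduler` name to `/proc/self/exe` means the daemon re-executes whatever binary it was actually launched from, which keeps working for renamed binaries and non-PATH install locations. A self-contained, Linux-only sketch of the same re-exec trick (the `child` subcommand and messages are hypothetical):

```rust
use std::process::{Command, Stdio};

fn main() -> std::io::Result<()> {
    // The re-executed copy runs with the "child" argument and exits at once,
    // so the example cannot fork-bomb itself.
    if std::env::args().nth(1).as_deref() == Some("child") {
        println!("hello from the re-executed child");
        return Ok(());
    }

    // Resolve the running binary's real path instead of assuming its name;
    // /proc/self/exe is Linux-specific.
    let exe = std::fs::read_link("/proc/self/exe")?;

    let child = Command::new(&exe)
        .arg("child")
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .spawn()?;

    let output = child.wait_with_output()?;
    print!("{}", String::from_utf8_lossy(&output.stdout));
    Ok(())
}
```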