Skip to content

Commit

Permalink
fix: drop pipes runtime (#1391)
Browse files Browse the repository at this point in the history
* fix: drop pipes runtime, remove watchdog of pipes, fix auto destruction of pipe on windows

* fix: auto destruction of pipes on macos

* fix: pipe killing on macos (try with id first)

* fix: update comps

* fix: purging of pipes on windows

---------

Co-authored-by: tribhuwan-kumar <[email protected]>
  • Loading branch information
tribhuwan-kumar and tribhuwan-kumar authored Feb 20, 2025
1 parent f950b6d commit d4fd975
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 321 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,20 @@ export function BreakingChangesInstructionsDialog() {
const handleResetAllPipes = async () => {
setIsDeleting(true);
try {
const cmd = Command.sidecar("screenpipe", ["pipe", "purge", "-y"]);
await cmd.execute();
const response = await fetch(`http://localhost:3030/pipes/purge`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
}),
});
if(!response.ok){
toast({
title: "failed to purge pipes",
description: "failed to purge pipes, please try again",
variant: "destructive"
});
return;
}
toast({
title: "all pipes deleted",
description: "you can now reinstall the updated pipes from the store",
Expand Down
16 changes: 14 additions & 2 deletions screenpipe-app-tauri/components/pipe-store.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,20 @@ export const PipeStore: React.FC = () => {
duration: 100000,
});

const cmd = Command.sidecar("screenpipe", ["pipe", "purge", "-y"]);
await cmd.execute();
const response = await fetch(`http://localhost:3030/pipes/purge`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
}),
});
if(!response.ok){
toast({
title: "failed to purge pipes",
description: "failed to purge pipes, please try again",
variant: "destructive"
});
return;
}
await fetchInstalledPipes();

t.update({
Expand Down
26 changes: 17 additions & 9 deletions screenpipe-app-tauri/src-tauri/src/sidecar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,14 @@ pub async fn stop_screenpipe(
) -> Result<(), String> {
debug!("Killing screenpipe");

let mut manager = state.0.lock().await;
if let Some(manager) = manager.as_mut() {
if let Some(child) = manager.child.take() {
if let Err(e) = child.kill() {
error!("Failed to kill child process: {}", e);
#[cfg(not(target_os = "windows"))]
{
let mut manager = state.0.lock().await;
if let Some(manager) = manager.as_mut() {
if let Some(child) = manager.child.take() {
if let Err(e) = child.kill() {
error!("Failed to kill child process: {}", e);
}
}
}
}
Expand All @@ -102,11 +105,16 @@ pub async fn stop_screenpipe(
}
#[cfg(target_os = "windows")]
{
use std::os::windows::process::CommandExt;

const CREATE_NO_WINDOW: u32 = 0x08000000;
tokio::process::Command::new("taskkill")
.args(&["/F", "/T", "/IM", "screenpipe.exe"])
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
tokio::process::Command::new("powershell")
.arg("-NoProfile")
.arg("-WindowStyle")
.arg("hidden")
.arg("-Command")
.arg(format!(
r#"taskkill.exe /F /T /IM screenpipe.exe"#,
))
.creation_flags(CREATE_NO_WINDOW)
.output()
.await
Expand Down
260 changes: 0 additions & 260 deletions screenpipe-core/src/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,248 +84,6 @@ mod pipes {
.to_string()
}

async fn create_watchdog_script(parent_pid: u32, child_pid: u32) -> Result<PathBuf> {
let script_content = if cfg!(windows) {
format!(
r#"
$parentPid = {parent_pid}
$childPid = {child_pid}
function Get-ChildProcesses($ProcessId) {{
Get-WmiObject Win32_Process | Where-Object {{ $_.ParentProcessId -eq $ProcessId }} | ForEach-Object {{
$_.ProcessId
Get-ChildProcesses $_.ProcessId
}}
}}
while ($true) {{
try {{
$parent = Get-Process -Id $parentPid -ErrorAction Stop
Start-Sleep -Seconds 1
}} catch {{
Write-Host "Parent process ($parentPid) not found, terminating child processes"
# Get all child processes recursively
$children = Get-ChildProcesses $childPid
# Add the main process to the list
$allProcesses = @($childPid) + $children
foreach ($pid in $allProcesses) {{
try {{
Stop-Process -Id $pid -Force -ErrorAction SilentlyContinue
Write-Host "Stopped process: $pid"
}} catch {{
Write-Host "Process $pid already terminated"
}}
}}
exit
}}
}}
"#
)
} else {
format!(
r#"#!/bin/bash
set -x # Enable debug mode
parent_pid={parent_pid}
child_pid={child_pid}
echo "[$(date)] Watchdog started - monitoring parent PID: $parent_pid, child PID: $child_pid"
find_all_children() {{
local parent=$1
local children=$(pgrep -P $parent)
echo $children
for child in $children; do
find_all_children $child
done
}}
cleanup() {{
echo "[$(date)] Starting cleanup..."
# Get all child processes recursively
all_children=$(find_all_children $child_pid)
echo "[$(date)] Found child processes: $all_children"
# Try to kill by process group first
child_pgid=$(ps -o pgid= -p $child_pid 2>/dev/null | tr -d ' ')
if [ ! -z "$child_pgid" ]; then
echo "[$(date)] Killing process group $child_pgid"
pkill -TERM -g $child_pgid 2>/dev/null || true
sleep 1
pkill -KILL -g $child_pgid 2>/dev/null || true
fi
# Kill all children individually if they still exist
if [ ! -z "$all_children" ]; then
echo "[$(date)] Killing all child processes: $all_children"
kill -TERM $all_children 2>/dev/null || true
sleep 1
kill -KILL $all_children 2>/dev/null || true
fi
# Kill the main process if it still exists
echo "[$(date)] Killing main process $child_pid"
kill -TERM $child_pid 2>/dev/null || true
sleep 1
kill -KILL $child_pid 2>/dev/null || true
# Final verification
sleep 1
remaining=$(ps -o pid= -g $child_pgid 2>/dev/null || true)
if [ ! -z "$remaining" ]; then
echo "[$(date)] WARNING: Some processes might still be running: $remaining"
pkill -KILL -g $child_pgid 2>/dev/null || true
fi
exit 0
}}
trap cleanup SIGTERM SIGINT
while true; do
if ! ps -p $parent_pid > /dev/null 2>&1; then
echo "[$(date)] Parent process ($parent_pid) not found, terminating child processes"
cleanup
exit
fi
sleep 1
done
"#
)
};

let temp_dir = std::env::temp_dir();
let script_name = if cfg!(windows) {
format!("watchdog_{parent_pid}_{child_pid}.ps1")
} else {
format!("watchdog_{parent_pid}_{child_pid}.sh")
};
let script_path = temp_dir.join(script_name);

tokio::fs::write(&script_path, script_content).await?;

// Set executable permissions on Unix systems only
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = tokio::fs::metadata(&script_path).await?.permissions();
perms.set_mode(0o755);
tokio::fs::set_permissions(&script_path, perms).await?;
}

Ok(script_path)
}

async fn spawn_watchdog(parent_pid: u32, child_pid: u32) -> Result<tokio::process::Child> {
let script_path = create_watchdog_script(parent_pid, child_pid).await?;

info!(
"Spawning watchdog process for parent_pid={}, child_pid={}",
parent_pid, child_pid
);
info!("Watchdog script path: {:?}", script_path);

// Create a log file for the watchdog
let log_path =
std::env::temp_dir().join(format!("watchdog_{}_{}.log", parent_pid, child_pid));

#[cfg(windows)]
let child = {
Command::new("powershell")
.arg("-ExecutionPolicy")
.arg("Bypass")
.arg("-NoProfile")
.arg("-NonInteractive")
.arg("-WindowStyle")
.arg("Hidden")
.arg("-File")
.arg(&script_path)
.creation_flags(0x08000000)
.stdout(std::fs::File::create(&log_path)?)
.stderr(std::fs::File::create(&log_path)?)
.spawn()?
};

#[cfg(not(windows))]
let child = {
Command::new("bash")
.arg(&script_path)
.stdout(std::fs::File::create(&log_path)?)
.stderr(std::fs::File::create(&log_path)?)
.spawn()?
};

if let Some(id) = child.id() {
info!("Watchdog process spawned with PID: {}", id);

// Modify verification for Windows
#[cfg(windows)]
{
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
let output = Command::new("powershell")
.arg("-NoProfile")
.arg("-NonInteractive")
.arg("-Command")
.arg(format!(
"Get-Process -Id {} -ErrorAction SilentlyContinue",
id
))
.output()
.await;

match output {
Ok(output) => {
if output.status.success() {
info!("Watchdog process verified running with PID: {}", id);
} else {
error!("Watchdog process not found after spawn! PID: {}", id);
}
}
Err(e) => error!("Failed to verify watchdog process: {}", e),
}
});
}

#[cfg(not(windows))]
{
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(1)).await;
let output = Command::new("ps")
.args(["-p", &id.to_string()])
.output()
.await;

match output {
Ok(output) => {
if output.status.success() {
info!("Watchdog process verified running with PID: {}", id);
} else {
error!("Watchdog process not found after spawn! PID: {}", id);
}
}
Err(e) => error!("Failed to verify watchdog process: {}", e),
}
});
}
}

// Clean up script after a delay
let script_path_clone = script_path.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(30)).await;
if let Err(e) = tokio::fs::remove_file(&script_path_clone).await {
error!("Failed to remove watchdog script: {}", e);
}
});

Ok(child)
}

pub async fn run_pipe(
pipe: &str,
screenpipe_dir: PathBuf,
Expand Down Expand Up @@ -564,24 +322,6 @@ done
debug!("[{}] streaming logs for next.js pipe", pipe);
stream_logs(pipe, &mut child).await?;

let child_pid = child.id().expect("Failed to get child PID") as u32;
let parent_pid = std::process::id();

info!("Spawned bun process with PID: {}", child_pid);

// Spawn watchdog process
match spawn_watchdog(parent_pid, child_pid).await {
Ok(watchdog) => {
debug!(
"Watchdog process spawned successfully with PID: {:?}",
watchdog.id()
);
}
Err(e) => {
warn!("Failed to spawn watchdog process: {}", e);
}
}

return Ok((child, PipeState::Port(port)));
}

Expand Down
5 changes: 4 additions & 1 deletion screenpipe-server/src/bin/screenpipe-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,9 +527,11 @@ async fn main() -> anyhow::Result<()> {

let audio_runtime = Runtime::new().unwrap();
let vision_runtime = Runtime::new().unwrap();
let pipes_runtime = Runtime::new().unwrap();

let audio_handle = audio_runtime.handle().clone();
let vision_handle = vision_runtime.handle().clone();
let pipes_handle = pipes_runtime.handle().clone();

let db_clone = Arc::clone(&db);
let output_path_clone = Arc::new(local_data_dir.join("data").to_string_lossy().into_owned());
Expand Down Expand Up @@ -951,7 +953,7 @@ async fn main() -> anyhow::Result<()> {
}
match pipe_manager.start_pipe_task(pipe.id.clone()).await {
Ok(future) => {
tokio::spawn(future);
pipes_handle.spawn(future);
}
Err(e) => {
error!("failed to start pipe {}: {}", pipe.id, e);
Expand Down Expand Up @@ -1073,6 +1075,7 @@ async fn main() -> anyhow::Result<()> {
}

tokio::task::block_in_place(|| {
drop(pipes_runtime);
drop(vision_runtime);
drop(audio_runtime);
});
Expand Down
Loading

0 comments on commit d4fd975

Please sign in to comment.