Skip to content

Commit

Permalink
feat: restore stream vision API
Browse files Browse the repository at this point in the history
  • Loading branch information
louis030195 committed Feb 20, 2025
1 parent fb6115a commit f950b6d
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 36 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/build-core-pipes.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,5 @@ jobs:
fi
done
echo "listing all zip files:"
ls -la *.zip || echo "no zip files found"
8 changes: 1 addition & 7 deletions screenpipe-server/src/bin/screenpipe-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,14 +548,10 @@ async fn main() -> anyhow::Result<()> {
};

let audio_chunk_duration = Duration::from_secs(cli.audio_chunk_duration);
let (realtime_vision_sender, _) = tokio::sync::broadcast::channel(1000);
let realtime_vision_sender = Arc::new(realtime_vision_sender.clone());
let realtime_vision_sender_clone = realtime_vision_sender.clone();
let handle = {
let runtime = &tokio::runtime::Handle::current();
runtime.spawn(async move {
loop {
let realtime_vision_sender_clone = realtime_vision_sender.clone();
let vad_engine_clone = vad_engine.clone(); // Clone it here for each iteration
let mut shutdown_rx = shutdown_tx_clone.subscribe();
let recording_future = start_continuous_recording(
Expand Down Expand Up @@ -583,7 +579,6 @@ async fn main() -> anyhow::Result<()> {
cli.capture_unfocused_windows,
realtime_audio_devices.clone(),
cli.enable_realtime_audio_transcription,
realtime_vision_sender_clone,
);

let result = tokio::select! {
Expand Down Expand Up @@ -626,7 +621,6 @@ async fn main() -> anyhow::Result<()> {

let (audio_devices_tx, _) = broadcast::channel(100);

let _realtime_vision_sender_clone = realtime_vision_sender_clone.clone();
// TODO: Add SSE stream for realtime audio transcription
let server = Server::new(
db_server,
Expand Down Expand Up @@ -1045,7 +1039,7 @@ async fn main() -> anyhow::Result<()> {

loop {
tokio::select! {
result = run_ui(realtime_vision_sender_clone.clone()) => {
result = run_ui() => {
match result {
Ok(_) => break,
Err(e) => {
Expand Down
35 changes: 19 additions & 16 deletions screenpipe-server/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::{DatabaseManager, VideoCapture};
use anyhow::Result;
use dashmap::DashMap;
use futures::future::join_all;
use tracing::{debug, error, info, warn};
use screenpipe_audio::vad_engine::VadSensitivity;
use screenpipe_audio::{
create_whisper_channel, record_and_transcribe, vad_engine::VadEngineEnum, AudioDevice,
Expand All @@ -13,6 +12,7 @@ use screenpipe_audio::{
use screenpipe_audio::{start_realtime_recording, AudioStream};
use screenpipe_core::pii_removal::remove_pii;
use screenpipe_core::Language;
use screenpipe_events::send_event;
use screenpipe_vision::core::{RealtimeVisionEvent, WindowOcr};
use screenpipe_vision::OcrEngine;
use std::collections::HashMap;
Expand All @@ -22,6 +22,7 @@ use std::sync::Arc;
use std::time::Duration;
use tokio::runtime::Handle;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};

#[allow(clippy::too_many_arguments)]
pub async fn start_continuous_recording(
Expand Down Expand Up @@ -49,7 +50,6 @@ pub async fn start_continuous_recording(
capture_unfocused_windows: bool,
realtime_audio_devices: Vec<Arc<AudioDevice>>,
realtime_audio_enabled: bool,
realtime_vision_sender: Arc<tokio::sync::broadcast::Sender<RealtimeVisionEvent>>,
) -> Result<()> {
debug!("Starting video recording for monitor {:?}", monitor_ids);
let video_tasks = if !vision_disabled {
Expand All @@ -62,7 +62,6 @@ pub async fn start_continuous_recording(
let ocr_engine = Arc::clone(&ocr_engine);
let ignored_windows_video = ignored_windows.to_vec();
let include_windows_video = include_windows.to_vec();
let realtime_vision_sender_clone = realtime_vision_sender.clone();

let languages = languages.clone();

Expand All @@ -81,7 +80,6 @@ pub async fn start_continuous_recording(
video_chunk_duration,
languages.clone(),
capture_unfocused_windows,
realtime_vision_sender_clone,
)
.await
})
Expand Down Expand Up @@ -186,7 +184,6 @@ async fn record_video(
video_chunk_duration: Duration,
languages: Vec<Language>,
capture_unfocused_windows: bool,
realtime_vision_sender: Arc<tokio::sync::broadcast::Sender<RealtimeVisionEvent>>,
) -> Result<()> {
debug!("record_video: Starting");
let db_chunk_callback = Arc::clone(&db);
Expand Down Expand Up @@ -228,7 +225,10 @@ async fn record_video(
while is_running.load(Ordering::SeqCst) {
if let Some(frame) = video_capture.ocr_frame_queue.pop() {
for window_result in &frame.window_ocr_results {
match db.insert_frame(&device_name, None, window_result.browser_url.as_deref()).await {
match db
.insert_frame(&device_name, None, window_result.browser_url.as_deref())
.await
{
Ok(frame_id) => {
let text_json =
serde_json::to_string(&window_result.text_json).unwrap_or_default();
Expand All @@ -239,16 +239,19 @@ async fn record_video(
&window_result.text
};

let _ = realtime_vision_sender.send(RealtimeVisionEvent::Ocr(WindowOcr {
image: Some(frame.image.clone()),
text: text.clone(),
text_json: window_result.text_json.clone(),
app_name: window_result.app_name.clone(),
window_name: window_result.window_name.clone(),
focused: window_result.focused,
confidence: window_result.confidence,
timestamp: frame.timestamp,
}));
let _ = send_event(
"ocr_result",
RealtimeVisionEvent::Ocr(WindowOcr {
image: Some(frame.image.clone()),
text: text.clone(),
text_json: window_result.text_json.clone(),
app_name: window_result.app_name.clone(),
window_name: window_result.window_name.clone(),
focused: window_result.focused,
confidence: window_result.confidence,
timestamp: frame.timestamp,
}),
);
if let Err(e) = db
.insert_ocr_text(
frame_id,
Expand Down
3 changes: 2 additions & 1 deletion screenpipe-vision/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ clap = { version = "4.0", features = ["derive"] }

# Integrations
screenpipe-integrations = { path = "../screenpipe-integrations" }
screenpipe-events = { path = "../screenpipe-events" }

# Language specification
screenpipe-core = { path = "../screenpipe-core" }
Expand Down Expand Up @@ -109,4 +110,4 @@ core-foundation = "0.9.4"
url = "2.5.0"

[target.'cfg(target_os = "linux")'.dependencies]
libc = "=0.2.164"
libc = "=0.2.164"
Binary file modified screenpipe-vision/bin/ui_monitor
Binary file not shown.
Binary file modified screenpipe-vision/bin/ui_monitor-aarch64-apple-darwin
Binary file not shown.
12 changes: 5 additions & 7 deletions screenpipe-vision/src/run_ui_monitoring_macos.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::core::RealtimeVisionEvent;
use crate::UIFrame;
use anyhow::Result;
use log::{debug, error, info, warn};
use screenpipe_events::send_event;
use std::fs;
use std::io;
use std::path::PathBuf;
Expand All @@ -13,12 +16,7 @@ use tokio::signal;
use tokio::time::{sleep, timeout, Duration};
use which::which;

use crate::core::RealtimeVisionEvent;
use crate::UIFrame;

pub async fn run_ui(
realtime_vision_sender: Arc<tokio::sync::broadcast::Sender<RealtimeVisionEvent>>,
) -> Result<()> {
pub async fn run_ui() -> Result<()> {
info!("starting ui monitoring service...");

let binary_name = "ui_monitor";
Expand Down Expand Up @@ -133,7 +131,7 @@ pub async fn run_ui(
frame = UIFrame::read_from_pipe(&mut reader) => {
match frame {
Ok(frame) => {
let _ = realtime_vision_sender.send(RealtimeVisionEvent::Ui(frame));
let _ = send_event("ui_frame", RealtimeVisionEvent::Ui(frame));
}
Err(e) => {
if let Some(io_err) = e.downcast_ref::<io::Error>() {
Expand Down
7 changes: 4 additions & 3 deletions screenpipe-vision/src/ui_monitoring_macos.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1302,9 +1302,10 @@ func saveToDatabase(windowId: WindowIdentifier, newTextOutput: String, timestamp
sqlite3_bind_text(updateStmt, 3, sanitizedApp, -1, SQLITE_TRANSIENT)
sqlite3_bind_text(updateStmt, 4, sanitizedWindow, -1, SQLITE_TRANSIENT)

if sqlite3_step(updateStmt) != SQLITE_DONE {
print("error updating row")
}
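// NOTE(review): disabled, presumably to stop the "error updating row" print from spamming the logs.
// Commenting out the whole `if` also removes the sqlite3_step call, so the prepared UPDATE is never executed — confirm this is intended.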
// if sqlite3_step(updateStmt) != SQLITE_DONE {
// print("error updating row")
// }
sqlite3_finalize(updateStmt)
}
} else {
Expand Down

0 comments on commit f950b6d

Please sign in to comment.