Sdk realtime (#1421)
* speaker type

* speaker type

* speaker type

* speaker type

* similar string dep

* deepgram speaker diarization and nova3

* enigo

* sdk

* sdk

* returned to proper sdk

* sdk build

* deepgram with mic echo check and speaker info

* restoring
m13v authored Feb 20, 2025
1 parent 8c85048 commit c6a5fab
Showing 7 changed files with 2,802 additions and 14 deletions.
38 changes: 35 additions & 3 deletions screenpipe-audio/src/deepgram/realtime.rs
@@ -15,10 +15,14 @@ use futures::{SinkExt, TryStreamExt};
use screenpipe_core::Language;
use screenpipe_events::send_event;
use std::sync::{atomic::AtomicBool, Arc};
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::broadcast::Receiver;
use tokio::sync::oneshot;
use tracing::info;
use std::sync::atomic::{AtomicI64, Ordering};

// Unix-epoch milliseconds of the last audible output (display) audio; used below to skip mic frames that are likely echo
static LAST_DISPLAY_AUDIO_ACTIVITY: AtomicI64 = AtomicI64::new(0);

pub async fn stream_transcription_deepgram(
stream: Arc<AudioStream>,
@@ -81,6 +85,7 @@ pub async fn start_deepgram_stream(
deepgram::common::options::OptionsBuilder::new()
.model(deepgram::common::options::Model::Nova2)
.smart_format(true)
.diarize(true)
.build(),
)
.keep_alive()
@@ -89,7 +94,7 @@
.encoding(Encoding::Linear16);

let mut handle = req.clone().handle().await?;
let mut results = req.stream(get_stream(stream)).await?;
let mut results = req.stream(get_stream(stream, device.device_type.clone())).await?;
let device_clone = device.clone();

loop {
@@ -116,11 +121,33 @@
Ok(())
}

fn get_stream(mut stream: Receiver<Vec<f32>>) -> FuturesReceiver<Result<Bytes, RecvError>> {
fn get_stream(mut stream: Receiver<Vec<f32>>, device_type: DeviceType) -> FuturesReceiver<Result<Bytes, RecvError>> {
let (mut tx, rx) = mpsc::channel(1);

tokio::spawn(async move {
while let Ok(data) = stream.recv().await {
if device_type == DeviceType::Output {
let sum_squares: f32 = data.iter().map(|&x| x * x).sum();
let rms = (sum_squares / data.len() as f32).sqrt();

if rms > 0.01 {
LAST_DISPLAY_AUDIO_ACTIVITY.store(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64,
Ordering::SeqCst
);
}
} else if SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64
- LAST_DISPLAY_AUDIO_ACTIVITY.load(Ordering::SeqCst) < 100
{
continue;
}

let mut bytes = BytesMut::with_capacity(data.len() * 2);
for sample in data {
bytes.put_i16_le((sample * i16::MAX as f32) as i16);
@@ -142,6 +169,10 @@ async fn handle_transcription(result: StreamResponse, device: Arc<AudioDevice>)
let res = channel.alternatives.first().unwrap();
let text = res.transcript.clone();
let is_input = device.device_type == DeviceType::Input;

let speaker = res.words.first()
.and_then(|w| w.speaker)
.map(|s| s.to_string());

if !text.is_empty() {
let _ = send_event(
Expand All @@ -152,6 +183,7 @@ async fn handle_transcription(result: StreamResponse, device: Arc<AudioDevice>)
transcription: text.to_string(),
is_final,
is_input,
speaker,
},
);
}
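The new gate in `get_stream` is the "mic echo check" from the commit message: frames from the output (display) device whose RMS level exceeds a threshold refresh a shared timestamp, and microphone frames arriving within 100 ms of that timestamp are skipped, so audio the machine is playing back does not get transcribed a second time through the mic. A minimal, self-contained sketch of the same idea, with hypothetical helper names (`note_output_frame`, `should_forward_input`) standing in for the device plumbing above:

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};

/// Last time the output (display/speaker) stream carried audible audio,
/// in milliseconds since the Unix epoch.
static LAST_DISPLAY_AUDIO_ACTIVITY: AtomicI64 = AtomicI64::new(0);

fn now_ms() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("clock before Unix epoch")
        .as_millis() as i64
}

/// Root-mean-square level of one frame of f32 samples.
fn rms(samples: &[f32]) -> f32 {
    let sum_squares: f32 = samples.iter().map(|&x| x * x).sum();
    (sum_squares / samples.len() as f32).sqrt()
}

/// Call for every output frame: remember when the speakers were last active.
fn note_output_frame(samples: &[f32]) {
    if rms(samples) > 0.01 {
        LAST_DISPLAY_AUDIO_ACTIVITY.store(now_ms(), Ordering::SeqCst);
    }
}

/// Call for every microphone frame: forward it only if the speakers have
/// been quiet for at least 100 ms, otherwise treat it as likely echo.
fn should_forward_input() -> bool {
    now_ms() - LAST_DISPLAY_AUDIO_ACTIVITY.load(Ordering::SeqCst) >= 100
}

fn main() {
    note_output_frame(&[0.2, -0.3, 0.25, -0.1]); // loud output frame
    assert!(!should_forward_input()); // a mic frame right afterwards is dropped
}
```

Keeping the timestamp in an `AtomicI64` avoids taking a lock on the hot audio path; `SeqCst` is stricter ordering than this single counter strictly needs, but the cost is negligible at audio frame rates.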
2 changes: 2 additions & 0 deletions screenpipe-audio/src/realtime.rs
@@ -24,4 +24,6 @@ pub struct RealtimeTranscriptionEvent {
pub transcription: String,
pub is_final: bool,
pub is_input: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub speaker: Option<String>,
}
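Because of `skip_serializing_if = "Option::is_none"`, the new `speaker` field disappears from the serialized JSON whenever diarization did not attach a speaker to the first word, so anything consuming these events should treat it as optional. A small sketch of handling both shapes with serde; the struct below is a local mirror of the fields shown above for illustration (assuming the `serde` and `serde_json` crates), not the crate's own type:

```rust
use serde::Deserialize;

// Local mirror of the event fields shown above, for illustration only.
#[derive(Debug, Deserialize)]
struct RealtimeTranscriptionEvent {
    transcription: String,
    is_final: bool,
    is_input: bool,
    // Missing from the JSON when no speaker was detected.
    #[serde(default)]
    speaker: Option<String>,
}

fn main() -> Result<(), serde_json::Error> {
    // Deepgram's speaker index is converted to a string upstream ("0", "1", ...).
    let with_speaker =
        r#"{"transcription":"hello there","is_final":true,"is_input":true,"speaker":"0"}"#;
    let without_speaker =
        r#"{"transcription":"hello there","is_final":false,"is_input":false}"#;

    let a: RealtimeTranscriptionEvent = serde_json::from_str(with_speaker)?;
    let b: RealtimeTranscriptionEvent = serde_json::from_str(without_speaker)?;

    assert_eq!(a.speaker.as_deref(), Some("0"));
    assert!(b.speaker.is_none());
    Ok(())
}
```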
