Skip to content

Commit

Permalink
the on_audio_data event handler instead of adding the service to the …
Browse files Browse the repository at this point in the history
…pipeline
  • Loading branch information
adriancowham committed Feb 5, 2025
1 parent cc54255 commit c954ba4
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 28 deletions.
9 changes: 7 additions & 2 deletions examples/canonical-metrics/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async def main():
call completion, CanonicalMetrics will send the audio buffer to Canonical for
analysis. Visit https://voice.canonical.chat to learn more.
"""
audio_buffer_processor = AudioBufferProcessor(num_channels=2)
audio_buffer_processor = AudioBufferProcessor(num_channels=2, buffer_size=1000000)
canonical = CanonicalMetricsService(
audio_buffer_processor=audio_buffer_processor,
aiohttp_session=session,
Expand All @@ -115,13 +115,18 @@ async def main():
tts,
transport.output(),
audio_buffer_processor, # captures audio into a buffer
canonical, # uploads audio buffer to Canonical AI for metrics
context_aggregator.assistant(),
]
)

task = PipelineTask(pipeline, PipelineParams(allow_interruptions=True))

@audio_buffer_processor.event_handler("on_audio_data")
async def on_audio_data(
    audio_buffer: bytes, sample_rate: int, num_channels: int, end_of_audio: bool
):
    """Forward a captured audio chunk to the Canonical metrics service.

    Registered for the AudioBufferProcessor's "on_audio_data" event.

    Args:
        audio_buffer: Merged audio bytes emitted by the buffer processor.
        sample_rate: Sample rate reported alongside the audio.
        num_channels: Channel count reported alongside the audio.
        end_of_audio: Flag passed through to the service unchanged; the
            service uploads the recording when it is true.
    """
    # NOTE(review): on_first_participant_joined receives the emitting object
    # (`transport`) as its first argument. Confirm whether "on_audio_data"
    # handlers are likewise called with the processor as a leading argument;
    # if so, this signature needs an extra first parameter.

    # process_audio_buffer is declared `async def`, so it must be awaited.
    # Calling it bare only creates a coroutine object that is never run,
    # which means nothing would be written to disk or uploaded.
    await canonical.process_audio_buffer(audio_buffer, sample_rate, num_channels, end_of_audio)

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
await audio_buffer_processor.start_recording()
Expand Down
8 changes: 4 additions & 4 deletions src/pipecat/processors/audio/audio_buffer_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ async def start_recording(self):
self._reset_recording()

async def stop_recording(self):
await self._call_on_audio_data_handler()
await self._call_on_audio_data_handler(end_of_audio=True)
self._recording = False

async def process_frame(self, frame: Frame, direction: FrameDirection):
Expand All @@ -106,20 +106,20 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
self._last_bot_frame_at = time.time()

if self._buffer_size > 0 and len(self._user_audio_buffer) > self._buffer_size:
await self._call_on_audio_data_handler()
await self._call_on_audio_data_handler(end_of_audio=False)

if isinstance(frame, (CancelFrame, EndFrame)):
await self.stop_recording()

await self.push_frame(frame, direction)

async def _call_on_audio_data_handler(self):
async def _call_on_audio_data_handler(self, end_of_audio: bool):
if not self.has_audio() or not self._recording:
return

merged_audio = self.merge_audio_buffers()
await self._call_event_handler(
"on_audio_data", merged_audio, self._sample_rate, self._num_channels
"on_audio_data", merged_audio, self._sample_rate, self._num_channels, end_of_audio
)
self._reset_audio_buffers()

Expand Down
54 changes: 32 additions & 22 deletions src/pipecat/services/canonical.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ def __init__(
self._assistant_speaks_first = assistant_speaks_first
self._output_dir = output_dir
self._context = context
self._audio_filename = self._get_output_filename()

async def stop(self, frame: EndFrame):
await super().stop(frame)
Expand All @@ -95,32 +96,41 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)
await self.push_frame(frame, direction)

async def _process_audio(self):
audio_buffer_processor = self._audio_buffer_processor

if not audio_buffer_processor.has_audio():
return

async def process_audio_buffer(
self, audio_buffer: bytes, sample_rate: int, num_channels: int, end_of_audio: bool
):
# Create output directory if it doesn't exist
os.makedirs(self._output_dir, exist_ok=True)
filename = self._get_output_filename()
audio = audio_buffer_processor.merge_audio_buffers()

with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2)
wf.setnchannels(audio_buffer_processor.num_channels)
wf.setframerate(audio_buffer_processor.sample_rate)
wf.writeframes(audio)
async with aiofiles.open(filename, "wb") as file:
await file.write(buffer.getvalue())

# Write audio buffer to file with proper WAV format
try:
await self._multipart_upload(filename)
await aiofiles.os.remove(filename)
except FileNotFoundError:
pass
# First write: create file with WAV header
if not os.path.exists(self._audio_filename):
with io.BytesIO() as buffer:
with wave.open(buffer, "wb") as wf:
wf.setsampwidth(2) # 16-bit audio
wf.setnchannels(num_channels)
wf.setframerate(sample_rate)
wf.writeframes(audio_buffer)
async with aiofiles.open(self._audio_filename, "wb") as file:
await file.write(buffer.getvalue())
# Subsequent writes: append raw audio data
else:
async with aiofiles.open(self._audio_filename, "ab") as file:
await file.write(audio_buffer)
except Exception as e:
logger.error(f"Failed to upload recording: {e}")
logger.error(f"Failed to write audio buffer: {e}")
return

# Handle end of audio
if end_of_audio:
try:
await self._multipart_upload(self._audio_filename)
await aiofiles.os.remove(self._audio_filename)
except FileNotFoundError:
pass
except Exception as e:
logger.error(f"Failed to upload recording: {e}")

def _get_output_filename(self):
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
Expand Down

0 comments on commit c954ba4

Please sign in to comment.