Skip to content

Commit

Permalink
AudioBufferProcessor: add start_recording()/stop_recording()
Browse files Browse the repository at this point in the history
  • Loading branch information
aconchillo committed Feb 1, 2025
1 parent 3432727 commit 07504f9
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 17 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `resample_audio()` is now deprecated, use `create_default_resampler()`
instead.

### Removed

- `AudioBufferProcessor.reset_audio_buffers()` has been removed, use
  `AudioBufferProcessor.start_recording()` and
  `AudioBufferProcessor.stop_recording()` instead.

### Fixed

- Fixed an issue in `AudioBufferProcessor` that would cause crackling in some recordings.
Expand Down
1 change: 1 addition & 0 deletions examples/canonical-metrics/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ async def main():

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
    # Recording no longer starts implicitly: the buffer processor must be
    # told to start before it will capture any audio.
    await audio_buffer_processor.start_recording()
    await transport.capture_participant_transcription(participant["id"])
    # Kick off the conversation by pushing the current LLM context.
    await task.queue_frames([context_aggregator.user().get_context_frame()])

Expand Down
6 changes: 4 additions & 2 deletions examples/chatbot-audio-recording/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ async def main():
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)

# Save audio every 10 seconds.
audiobuffer = AudioBufferProcessor(buffer_size=480000)
# NOTE: Watch out! This will save all the conversation in memory. You
# can pass `buffer_size` to get periodic callbacks.
audiobuffer = AudioBufferProcessor()

pipeline = Pipeline(
[
Expand All @@ -132,6 +133,7 @@ async def on_audio_data(buffer, audio, sample_rate, num_channels):

@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
    # Start buffering conversation audio as soon as someone joins; the
    # processor only records between start_recording()/stop_recording().
    await audiobuffer.start_recording()
    await transport.capture_participant_transcription(participant["id"])
    # Kick off the conversation by pushing the current LLM context.
    await task.queue_frames([context_aggregator.user().get_context_frame()])

Expand Down
37 changes: 22 additions & 15 deletions src/pipecat/processors/audio/audio_buffer_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
Frame,
InputAudioRawFrame,
OutputAudioRawFrame,
StartFrame,
)
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor

Expand Down Expand Up @@ -47,6 +46,8 @@ def __init__(
self._last_user_frame_at = 0
self._last_bot_frame_at = 0

self._recording = False

self._resampler = create_default_resampler()

self._register_event_handler("on_audio_data")
Expand Down Expand Up @@ -74,21 +75,18 @@ def merge_audio_buffers(self) -> bytes:
else:
return b""

def reset_audio_buffers(self):
self._user_audio_buffer = bytearray()
self._bot_audio_buffer = bytearray()
async def start_recording(self):
    """Enable audio buffering.

    Clears any previously buffered audio and resets the silence-tracking
    timestamps so gaps are computed from "now", then turns recording on.
    """
    self._recording = True
    self._reset_recording()

self._last_user_frame_at = time.time()
self._last_bot_frame_at = time.time()
async def stop_recording(self):
    """Disable audio buffering, flushing buffered audio first.

    The "on_audio_data" handler must be invoked *before* clearing the
    flag, because _call_on_audio_data_handler() is a no-op when not
    recording.
    """
    await self._call_on_audio_data_handler()
    self._recording = False

async def process_frame(self, frame: Frame, direction: FrameDirection):
await super().process_frame(frame, direction)

if isinstance(frame, StartFrame):
self._last_user_frame_at = time.time()
self._last_bot_frame_at = time.time()

if isinstance(frame, InputAudioRawFrame):
if self._recording and isinstance(frame, InputAudioRawFrame):
# Add silence if we need to.
silence = self._compute_silence(self._last_user_frame_at)
self._user_audio_buffer.extend(silence)
Expand All @@ -97,7 +95,7 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
self._user_audio_buffer.extend(resampled)
# Save time of frame so we can compute silence.
self._last_user_frame_at = time.time()
elif isinstance(frame, OutputAudioRawFrame):
elif self._recording and isinstance(frame, OutputAudioRawFrame):
# Add silence if we need to.
silence = self._compute_silence(self._last_bot_frame_at)
self._bot_audio_buffer.extend(silence)
Expand All @@ -111,23 +109,32 @@ async def process_frame(self, frame: Frame, direction: FrameDirection):
await self._call_on_audio_data_handler()

if isinstance(frame, (CancelFrame, EndFrame)):
await self._call_on_audio_data_handler()
await self.stop_recording()

await self.push_frame(frame, direction)

async def _call_on_audio_data_handler(self):
if not self.has_audio():
if not self.has_audio() or not self._recording:
return

merged_audio = self.merge_audio_buffers()
await self._call_event_handler(
"on_audio_data", merged_audio, self._sample_rate, self._num_channels
)
self.reset_audio_buffers()
self._reset_audio_buffers()

def _buffer_has_audio(self, buffer: bytearray) -> bool:
return buffer is not None and len(buffer) > 0

def _reset_recording(self):
    """Reset recording state: empty both buffers and restart the
    silence-tracking timestamps from the current time.

    The timestamps are what _compute_silence() uses to pad gaps, so they
    must be re-anchored to "now" whenever a new recording starts.
    """
    self._reset_audio_buffers()
    self._last_user_frame_at = time.time()
    self._last_bot_frame_at = time.time()

def _reset_audio_buffers(self):
self._user_audio_buffer = bytearray()
self._bot_audio_buffer = bytearray()

async def _resample_audio(self, frame: AudioRawFrame) -> bytes:
    """Resample the frame's audio from its own rate to our target rate."""
    resampled = await self._resampler.resample(
        frame.audio, frame.sample_rate, self._sample_rate
    )
    return resampled

Expand Down

0 comments on commit 07504f9

Please sign in to comment.