Skip to content

Commit

Permalink
Merge pull request #1100 from pipecat-ai/aleix/use-task-cancel-on-lef…
Browse files Browse the repository at this point in the history
…t-disconnected

use `task.cancel()` when participant leaves/disconnects
  • Loading branch information
aconchillo authored Jan 29, 2025
2 parents 79ef8c9 + 0547a15 commit 8cd23c4
Show file tree
Hide file tree
Showing 49 changed files with 90 additions and 125 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Fixed a `DailyTransport` issue that would cause events to be triggered before
join finished.

- Fixed a `PipelineTask` issue that was preventing processors to be cleaned up
after cancelling the task.

- Fixed an issue where queuing a `CancelFrame` to a pipeline task would not
cause the task to finish. However, using `PipelineTask.cancel()` is still the
recommended way to cancel a task.

### Other

- Updated all examples to use `task.cancel()` instead of pushing an `EndFrame`
when a participant leaves/disconnects. If you push an `EndFrame` this will
cause the bot to run through everything that is internally queued (which could
take seconds). Instead, if a participant disconnects there is nothing else to
be sent and therefore we should stop immediately.

## [0.0.54] - 2025-01-27

### Added
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Here is a very basic Pipecat bot that greets a user when they join a real-time s
```python
import asyncio

from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.task import PipelineTask
from pipecat.pipeline.runner import PipelineRunner
Expand Down Expand Up @@ -122,7 +122,7 @@ async def main():
# Register an event handler to exit the application when the user leaves.
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

# Run the pipeline task
await runner.run(task)
Expand Down
4 changes: 3 additions & 1 deletion examples/canonical-metrics/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,13 @@ async def on_first_participant_joined(transport, participant):
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
print(f"Participant left: {participant}")
await task.queue_frame(EndFrame())
await task.cancel()

@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
if state == "left":
# Here we don't want to cancel, we just want to finish sending
# whatever is queued, so we use an EndFrame().
await task.queue_frame(EndFrame())

runner = PipelineRunner()
Expand Down
3 changes: 1 addition & 2 deletions examples/chatbot-audio-recording/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -139,7 +138,7 @@ async def on_first_participant_joined(transport, participant):
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
print(f"Participant left: {participant}")
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
4 changes: 3 additions & 1 deletion examples/deployment/flyio-example/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

@transport.event_handler("on_call_state_updated")
async def on_call_state_updated(transport, state):
if state == "left":
# Here we don't want to cancel, we just want to finish sending
# whatever is queued, so we use an EndFrame().
await task.queue_frame(EndFrame())

runner = PipelineRunner()
Expand Down
21 changes: 10 additions & 11 deletions examples/deployment/modal-example/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,22 @@
from dotenv import load_dotenv
from loguru import logger

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")


async def main(room_url: str, token: str):
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.cartesia import CartesiaTTSService
from pipecat.services.openai import OpenAILLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport

transport = DailyTransport(
room_url,
token,
Expand Down Expand Up @@ -79,7 +78,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
4 changes: 2 additions & 2 deletions examples/foundational/03-still-frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from loguru import logger
from runner import configure

from pipecat.frames.frames import EndFrame, TextFrame
from pipecat.frames.frames import TextFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
Expand Down Expand Up @@ -53,7 +53,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

await runner.run(task)

Expand Down
4 changes: 2 additions & 2 deletions examples/foundational/06-listen-and-respond.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, Frame, MetricsFrame
from pipecat.frames.frames import Frame, MetricsFrame
from pipecat.metrics.metrics import (
LLMUsageMetricsData,
ProcessingMetricsData,
Expand Down Expand Up @@ -115,7 +115,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
3 changes: 1 addition & 2 deletions examples/foundational/06a-image-sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
EndFrame,
Frame,
OutputImageRawFrame,
TextFrame,
Expand Down Expand Up @@ -144,7 +143,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
3 changes: 1 addition & 2 deletions examples/foundational/07-interruptible-vad.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from loguru import logger
from runner import configure

from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -94,7 +93,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
3 changes: 1 addition & 2 deletions examples/foundational/07-interruptible.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -92,7 +91,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
3 changes: 1 addition & 2 deletions examples/foundational/07a-interruptible-anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -96,7 +95,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
4 changes: 2 additions & 2 deletions examples/foundational/07b-interruptible-langchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame, LLMMessagesFrame
from pipecat.frames.frames import LLMMessagesFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -124,7 +124,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
3 changes: 1 addition & 2 deletions examples/foundational/07c-interruptible-deepgram-vad.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

from pipecat.frames.frames import (
BotInterruptionFrame,
EndFrame,
StopInterruptionFrame,
UserStartedSpeakingFrame,
UserStoppedSpeakingFrame,
Expand Down Expand Up @@ -106,7 +105,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
3 changes: 1 addition & 2 deletions examples/foundational/07c-interruptible-deepgram.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -91,7 +90,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
3 changes: 1 addition & 2 deletions examples/foundational/07d-interruptible-elevenlabs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -92,7 +91,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
4 changes: 1 addition & 3 deletions examples/foundational/07e-interruptible-playht-http.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,12 @@
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.services.openai import OpenAILLMService
from pipecat.services.playht import PlayHTHttpTTSService
from pipecat.transcriptions.language import Language
from pipecat.transports.services.daily import DailyParams, DailyTransport

load_dotenv(override=True)
Expand Down Expand Up @@ -94,7 +92,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
3 changes: 1 addition & 2 deletions examples/foundational/07e-interruptible-playht.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -95,7 +94,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
3 changes: 1 addition & 2 deletions examples/foundational/07f-interruptible-azure.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -101,7 +100,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
3 changes: 1 addition & 2 deletions examples/foundational/07g-interruptible-openai-tts.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -89,7 +88,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
3 changes: 1 addition & 2 deletions examples/foundational/07h-interruptible-openpipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from runner import configure

from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.frames.frames import EndFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
Expand Down Expand Up @@ -99,7 +98,7 @@ async def on_first_participant_joined(transport, participant):

@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
await task.queue_frame(EndFrame())
await task.cancel()

runner = PipelineRunner()

Expand Down
Loading

0 comments on commit 8cd23c4

Please sign in to comment.