Skip to content

Commit

Permalink
Update unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
markbackman committed Feb 4, 2025
1 parent ac23408 commit 0dc1d3b
Showing 1 changed file with 87 additions and 27 deletions.
114 changes: 87 additions & 27 deletions tests/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"""

import unittest
from typing import Any, Dict
from unittest.mock import AsyncMock, MagicMock, patch

from pipecat.frames.frames import LLMMessagesAppendFrame, LLMMessagesUpdateFrame, TTSSpeakFrame
Expand Down Expand Up @@ -165,16 +166,14 @@ async def test_static_flow_transitions(self):
append_frames = [f for f in first_frames if isinstance(f, LLMMessagesAppendFrame)]
self.assertEqual(len(append_frames), 1, "Should have exactly one AppendFrame")

async def test_dynamic_flow_transitions(self):
"""Test transitions in dynamic flow.
async def test_dynamic_flow_old_style_transition(self):
"""Test transitions with old-style (args, flow_manager) callback."""

Verifies that:
1. Transition callback is called with correct arguments
2. Dynamic node transitions work properly
3. State is updated correctly
"""
# Create mock transition callback
mock_transition_handler = AsyncMock()
# Create mock transition handler
async def old_style_handler(args: Dict, flow_manager: FlowManager):
pass

mock_transition_handler = AsyncMock(side_effect=old_style_handler)

# Initialize flow manager
flow_manager = FlowManager(
Expand All @@ -200,42 +199,103 @@ async def test_dynamic_flow_transitions(self):
],
}

# Set node and get registered function
# Set node and trigger transition
await flow_manager.set_node("test", test_node)
self.assertEqual(flow_manager.current_node, "test")
test_args = {"test": "value"}
transition_func = flow_manager.llm.register_function.call_args[0][1]

# Reset frame tracking
self.mock_task.queue_frames.reset_mock()
# Track callback
context_updated_callback = None

# Get the registered function and call it
transition_func = flow_manager.llm.register_function.call_args[0][1]
async def mock_result_callback(result, properties=None):
nonlocal context_updated_callback
if properties and properties.on_context_updated:
context_updated_callback = properties.on_context_updated

# Execute transition
await transition_func(
"test_function", "test_id", test_args, self.mock_llm, {}, mock_result_callback
)
await context_updated_callback()

# Verify handler was called correctly
mock_transition_handler.assert_called_once_with(test_args, flow_manager)

async def test_dynamic_flow_new_style_transition(self):
"""Test transitions with new-style (args, result, flow_manager) callback.
Verifies that:
1. FlowManager calls transition callback with (args, flow_manager)
2. Our wrapper transforms this to (args, result, flow_manager)
3. Final handler receives all three arguments correctly
"""
calls = []

# Create handler that records the transformed calls
async def new_style_handler(args: Dict, result: Any, flow_manager: FlowManager):
"""Records calls with new-style signature."""
calls.append((args, result, flow_manager))

# Create wrapper that adapts old style calls to new style
async def wrapper(args, flow_manager):
"""Adapts (args, flow_manager) to (args, result, flow_manager)."""
await new_style_handler(args, {"status": "acknowledged"}, flow_manager)

mock_transition_handler = AsyncMock(side_effect=wrapper)

# Initialize flow manager
flow_manager = FlowManager(
task=self.mock_task,
llm=self.mock_llm,
context_aggregator=self.mock_context_aggregator,
)
await flow_manager.initialize()

# Create test node with transition callback
test_node: NodeConfig = {
"task_messages": [{"role": "system", "content": "Test message"}],
"functions": [
{
"type": "function",
"function": {
"name": "test_function",
"description": "Test function",
"parameters": {},
"transition_callback": mock_transition_handler,
},
}
],
}

# Set node and trigger transition
await flow_manager.set_node("test", test_node)
test_args = {"test": "value"}
transition_func = flow_manager.llm.register_function.call_args[0][1]

# Track the on_context_updated callback
# Track callback
context_updated_callback = None

async def mock_result_callback(result, properties=None):
nonlocal context_updated_callback
if properties and properties.on_context_updated:
context_updated_callback = properties.on_context_updated

# Call the transition function
# Execute transition
await transition_func(
"test_function",
"test_id",
test_args,
self.mock_llm,
{},
mock_result_callback,
"test_function", "test_id", test_args, self.mock_llm, {}, mock_result_callback
)

# Execute the context updated callback
self.assertIsNotNone(context_updated_callback, "Context updated callback not set")
await context_updated_callback()

# Verify handler was called with correct arguments
# Verify FlowManager called the wrapper with old-style args
mock_transition_handler.assert_called_once_with(test_args, flow_manager)

# Verify handler received transformed args
self.assertEqual(len(calls), 1, "Handler should be called exactly once")
actual_args, actual_result, actual_flow_manager = calls[0]
self.assertEqual(actual_args, test_args)
self.assertEqual(actual_result, {"status": "acknowledged"})
self.assertEqual(actual_flow_manager, flow_manager)

async def test_node_validation(self):
"""Test node configuration validation."""
flow_manager = FlowManager(
Expand Down

0 comments on commit 0dc1d3b

Please sign in to comment.