Commit: add text 2 structured output sample code
Corneliu Croitoru committed Nov 7, 2024
1 parent 9867905 commit 25c2967
Showing 7 changed files with 665 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -12,6 +12,7 @@ examples/chat-demo-app/bin/*.js
!examples/lambda/url_rewrite/*.js
examples/resources/ui/public/aws-exports.json
examples/resources/ui/dist
examples/text-2-structured-output/venv


.DS_Store
2 changes: 2 additions & 0 deletions README.md
@@ -82,6 +82,8 @@ Get hands-on experience with the Multi-Agent Orchestrator through our diverse se
- [`ecommerce-support-simulator`](https://github.com/awslabs/multi-agent-orchestrator/tree/main/examples/ecommerce-support-simulator): AI-powered customer support system
- [`chat-chainlit-app`](https://github.com/awslabs/multi-agent-orchestrator/tree/main/examples/chat-chainlit-app): Chat application built with Chainlit
- [`fast-api-streaming`](https://github.com/awslabs/multi-agent-orchestrator/tree/main/examples/fast-api-streaming): FastAPI implementation with streaming support
- [`text-2-structured-output`](https://github.com/awslabs/multi-agent-orchestrator/tree/main/examples/text-2-structured-output): Natural Language to Structured Data


All examples are available in both Python and TypeScript implementations. Check out our [documentation](https://awslabs.github.io/multi-agent-orchestrator/) for comprehensive guides on setting up and using the Multi-Agent Orchestrator!

177 changes: 177 additions & 0 deletions examples/text-2-structured-output/README.md
@@ -0,0 +1,177 @@
# Natural Language to Structured Data

A demonstration of how to transform free-text queries into structured, actionable data using a multi-agent architecture.

## Overview

This project implements a proof-of-concept system that:
1. Takes natural language input from users
2. Routes queries to specialized agents using an orchestrator
3. Transforms free text into structured formats (JSON for product searches, contextual responses for returns)

Perfect for teams looking to build systems that need to:
- Convert customer queries into structured database searches
- Transform natural language into API-ready parameters
- Handle multiple types of queries with different output structures
- Maintain context and provide real-time responses

## Components

This project implements a multi-agent orchestration system designed to handle various e-commerce-related queries. It features:

- **Product Search Agent**: Processes natural language queries about products and converts them into structured search parameters
- **Returns & Terms Assistant**: Handles inquiries about return policies, refunds, and terms & conditions
- **Greeting Agent**: Welcomes users and helps them navigate the available services

The system uses Amazon Bedrock with Claude 3 Sonnet as the underlying language model and implements streaming responses for real-time interaction.

## Prerequisites

- Python 3.12
- AWS Account with Bedrock access

## Setup

Install dependencies:
```bash
pip install -r requirements.txt
```
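The repository's `.gitignore` ignores a `venv` directory inside this example, so a local virtual environment is a natural setup. The following commands are a suggested sketch; adjust the Python binary name and activation command for your system:

```bash
# Create and activate a virtual environment (Python 3.12)
python3.12 -m venv venv
source venv/bin/activate

# Install the example's dependencies
pip install -r requirements.txt
```

You will also need AWS credentials with Bedrock access configured in your environment, for example via `aws configure`.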

## Usage

Run the script using:

```bash
python main.py
```

The system will start in interactive mode, allowing you to input queries and receive responses in real-time.

## Sample Interactions

### 1. Product Search

**Input:**
```
Show me Prime-eligible headphones under $100 with good reviews
```
**Output:**
```json
{
  "department": "Electronics",
  "categories": ["Headphones"],
  "priceRange": {
    "max": 100,
    "currency": "USD"
  },
  "customerReview": {
    "stars": 4,
    "operator": "gte"
  },
  "shippingOptions": {
    "prime": true
  },
  "condition": "New"
}
```
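Downstream code can validate this structured output before querying a catalog. The sketch below is illustrative and not part of the project: the field names follow the sample above, but `parse_search_query` and its required-field rule are assumptions.

```python
import json

# Fields we assume every product-search query must carry (illustrative rule).
REQUIRED_FIELDS = {"department", "categories", "priceRange"}

def parse_search_query(raw: str) -> dict:
    """Parse the agent's JSON output and check that required fields exist."""
    query = json.loads(raw)
    missing = REQUIRED_FIELDS - query.keys()
    if missing:
        raise ValueError(f"missing fields: {sorted(missing)}")
    return query

raw = '''{
  "department": "Electronics",
  "categories": ["Headphones"],
  "priceRange": {"max": 100, "currency": "USD"}
}'''
query = parse_search_query(raw)
print(query["priceRange"]["max"])  # → 100
```

A check like this gives the downstream search code a clear failure mode when the model emits incomplete JSON, instead of a `KeyError` deep in the catalog query.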

### 2. Returns Policy

**Input:**
```
How do I return a damaged item?
```

**Output:**
```
For damaged items, here's our return process:
1. Document the damage with photos
2. Contact customer service within 48 hours of receipt
3. You'll receive a prepaid return label
4. Package the item securely with all original materials
5. Ship within 14 days of receiving the return label
The refund will be processed to your original payment method within 3-5 business days after we receive the item. Shipping costs are covered for damaged items.
```

### 3. General Greeting

**Input:**
```
hello
```

**Output:**
```markdown
## Welcome! 👋

I'm the greeting agent, here to help you navigate our services. We have several specialized agents available:

- **Product Search Agent**: Find products, compare prices, and discover deals
- **Returns and Terms Assistant**: Get help with returns, refunds, and policies

How can we assist you today? Feel free to ask about:
- Product searches and recommendations
- Return policies and procedures
- General assistance and guidance
```

## Agents

The system is built on three main components:

1. **MultiAgentOrchestrator**: Routes queries to appropriate agents
2. **Agents**: Specialized handlers for different types of queries
3. **Streaming Handler**: Manages real-time response generation
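The streaming handler follows a classic producer-consumer pattern: the LLM callback puts tokens on a queue from a worker thread, and an async consumer drains the queue until a `None` sentinel marks the end. A minimal, framework-free sketch of that pattern (the function names here are illustrative, not from the library):

```python
import asyncio
from queue import Queue
from threading import Thread

def produce_tokens(tokens, queue):
    # Producer: stands in for the LLM callback pushing tokens as they arrive.
    for token in tokens:
        queue.put(token)
    queue.put(None)  # Sentinel: no more tokens.

async def consume_tokens(queue):
    # Consumer: drain the blocking queue via an executor so the
    # event loop is not blocked while waiting for the next token.
    loop = asyncio.get_event_loop()
    collected = []
    while True:
        token = await loop.run_in_executor(None, queue.get)
        if token is None:
            break
        collected.append(token)
    return collected

queue = Queue()
Thread(target=produce_tokens, args=(["Hel", "lo", "!"], queue)).start()
result = asyncio.run(consume_tokens(queue))
print("".join(result))  # → Hello!
```

`response_generator` in `multi_agent_query_analyzer.py` uses the same shape, with the orchestrator's callbacks acting as the producer.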


### Product Search Agent
The current implementation demonstrates the agent's capability to convert natural language queries into structured JSON output. This is only the first step; in a production environment, you would:

1. Implement the TODO section in the `process_request` method
2. Add calls to your internal APIs, databases, or search engines
3. Use the structured JSON to query your product catalog
4. Return actual product results instead of just the parsed query

Example implementation in the TODO section:
```python
# After getting parsed_response:
products = await your_product_service.search(
    department=parsed_response['department'],
    price_range=parsed_response['priceRange'],
    # ... other parameters
)
return ConversationMessage(
    role=ParticipantRole.ASSISTANT.value,
    content=[{"text": format_product_results(products)}]
)
```

### Returns and Terms Assistant
The current implementation uses a static prompt. To make it more powerful and easier to maintain:

1. Integrate with a vector storage solution like [Amazon Bedrock Knowledge Base](https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base.html) or other vector databases
2. Set up a retrieval system to fetch relevant policy documents
3. Update the agent's prompt with retrieved context

Example enhancement:
```python
retriever = BedrockKnowledgeBaseRetriever(
    kb_id="your-kb-id",
    region_name="your-region"
)
# Add to the agent's configuration
```

### Greeting Agent
The greeting agent has been implemented as a crucial component for chat-based interfaces. Its primary purposes are:

1. Providing a friendly entry point to the system
2. Helping users understand available capabilities
3. Guiding users toward the most appropriate agent
4. Reducing user confusion and improving engagement

This pattern is especially useful in chat interfaces where users might not initially know what kinds of questions they can ask or which agent would best serve their needs.

164 changes: 164 additions & 0 deletions examples/text-2-structured-output/multi_agent_query_analyzer.py
@@ -0,0 +1,164 @@
import uuid
import asyncio
from queue import Queue
from threading import Thread
from typing import Dict, List, Any

from multi_agent_orchestrator.orchestrator import MultiAgentOrchestrator, AgentResponse, OrchestratorConfig
from multi_agent_orchestrator.types import ConversationMessage
from multi_agent_orchestrator.classifiers import BedrockClassifier, BedrockClassifierOptions
from multi_agent_orchestrator.agents import (
    BedrockLLMAgent,
    AgentCallbacks,
    BedrockLLMAgentOptions,
)

from product_search_agent import ProductSearchAgent, ProductSearchAgentOptions
from prompts import RETURNS_PROMPT, GREETING_AGENT_PROMPT


class MyCustomHandler(AgentCallbacks):
    def __init__(self, queue) -> None:
        super().__init__()
        self._queue = queue
        self._stop_signal = None

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self._queue.put(token)

    def on_llm_start(self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> None:
        print("generation started")

    def on_llm_end(self, response: Any, **kwargs: Any) -> None:
        print("\n\ngeneration concluded")
        self._queue.put(self._stop_signal)

def setup_orchestrator(streamer_queue):
    classifier = BedrockClassifier(BedrockClassifierOptions(
        model_id='anthropic.claude-3-sonnet-20240229-v1:0',
    ))

    orchestrator = MultiAgentOrchestrator(
        options=OrchestratorConfig(
            LOG_AGENT_CHAT=True,
            LOG_CLASSIFIER_CHAT=True,
            LOG_CLASSIFIER_RAW_OUTPUT=True,
            LOG_CLASSIFIER_OUTPUT=True,
            LOG_EXECUTION_TIMES=True,
            MAX_RETRIES=3,
            USE_DEFAULT_AGENT_IF_NONE_IDENTIFIED=False,
            NO_SELECTED_AGENT_MESSAGE="""
I'm not quite sure how to help with that. Could you please:
- Provide more details, or
- Rephrase your question?
If you're unsure where to start, try saying **"hello"** to see:
- A list of available agents
- Their specific roles and capabilities
This will help you understand the kinds of questions and topics our system can assist you with.
""",
            MAX_MESSAGE_PAIRS_PER_AGENT=10
        ),
        classifier=classifier
    )

    product_search_agent = ProductSearchAgent(ProductSearchAgentOptions(
        name="Product Search Agent",
        description="Specializes in e-commerce product searches and listings. Handles queries about finding specific products, product rankings, specifications, price comparisons within an online shopping context. Use this agent for shopping-related queries and product discovery in a retail environment.",
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
        save_chat=True,
    ))

    my_handler = MyCustomHandler(streamer_queue)
    returns_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
        name="Returns and Terms Assistant",
        streaming=True,
        description="Specializes in explaining return policies, refund processes, and terms & conditions. Provides clear guidance on customer rights, warranty claims, and special cases while maintaining up-to-date knowledge of consumer protection regulations and e-commerce best practices.",
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
        # TODO: set a retriever to fetch data from a knowledge base
        callbacks=my_handler
    ))

    returns_agent.set_system_prompt(RETURNS_PROMPT)

    orchestrator.add_agent(product_search_agent)
    orchestrator.add_agent(returns_agent)

    agents = orchestrator.get_all_agents()

    greeting_agent = BedrockLLMAgent(BedrockLLMAgentOptions(
        name="Greeting agent",
        streaming=True,
        description="Says hello and lists the available agents",
        model_id="anthropic.claude-3-sonnet-20240229-v1:0",
        save_chat=False,
        callbacks=my_handler
    ))

    agent_list = "\n".join(
        f"{i}-{info['name']}: {info['description']}"
        for i, (_, info) in enumerate(agents.items(), 1)
    )

    greeting_prompt = GREETING_AGENT_PROMPT(agent_list)
    greeting_agent.set_system_prompt(greeting_prompt)

    orchestrator.add_agent(greeting_agent)
    return orchestrator

async def start_generation(query, user_id, session_id, streamer_queue):
    try:
        # Create a new orchestrator for this query
        orchestrator = setup_orchestrator(streamer_queue)

        response = await orchestrator.route_request(query, user_id, session_id)
        if isinstance(response, AgentResponse) and response.streaming is False:
            if isinstance(response.output, str):
                streamer_queue.put(response.output)
            elif isinstance(response.output, ConversationMessage):
                streamer_queue.put(response.output.content[0].get('text'))
    except Exception as e:
        print(f"Error in start_generation: {e}")
    finally:
        streamer_queue.put(None)  # Signal the end of the response

async def response_generator(query, user_id, session_id):
    streamer_queue = Queue()

    # Start the generation process in a separate thread
    Thread(target=lambda: asyncio.run(start_generation(query, user_id, session_id, streamer_queue))).start()

    while True:
        try:
            value = await asyncio.get_event_loop().run_in_executor(None, streamer_queue.get)
            if value is None:
                break
            yield value
            streamer_queue.task_done()
        except Exception as e:
            print(f"Error in response_generator: {e}")
            break

async def run_chatbot():
    user_id = str(uuid.uuid4())
    session_id = str(uuid.uuid4())

    while True:
        query = input("\nEnter your query (or 'quit' to exit): ").strip()
        if query.lower() == 'quit':
            break
        try:
            async for token in response_generator(query, user_id, session_id):
                print(token, end='', flush=True)
            print()  # New line after response
        except Exception as error:
            print("Error:", error)

if __name__ == "__main__":
    asyncio.run(run_chatbot())