diff --git a/.gitignore b/.gitignore index 2997eaa..b9ca696 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,6 @@ app/.env app/__pycache__/ app/agents/__pycache__/ app/services/__pycache__/ -app/utils/__pycache__/ \ No newline at end of file +app/utils/__pycache__/ + +app/.pytest_cache \ No newline at end of file diff --git a/Makefile b/Makefile index 4e412d6..1076674 100644 --- a/Makefile +++ b/Makefile @@ -4,6 +4,8 @@ prep: cd app && python3 prep_data.py run: cd app && PYTHONPATH=../.. python3 -m main +test: + cd app && pytest evals.py -v # dev dev: diff --git a/app/agents/base_agents.py b/app/agents/base_agents.py index 4278b79..253b688 100644 --- a/app/agents/base_agents.py +++ b/app/agents/base_agents.py @@ -1,64 +1,98 @@ # agents/base_agents.py from swarm import Agent -def create_user_interface_agent(transfer_functions): - return Agent( - name="User Interface Agent", - instructions=""" - You are a user interface agent that handles all interactions with the user. - Call this agent for general questions and when no other agent is correct for the user query. - When you welcome user only, show all your agents and capabilities. - Users can go back to you when they write words like: menu, go back, go user center - """, - functions=transfer_functions - ) - -def create_help_center_agent(transfer_functions): - return Agent( - name="Help Center Agent", - instructions="You are a Teslamate and Myteslamate agent who deals with questions about these products.", - functions=transfer_functions - ) - -def create_stripe_agent(transfer_functions): - return Agent( - name="Stripe Agent", - instructions=""" - You are a Stripe help center agent who deals with questions about customers, subscription status, payment, billing etc. - - Important: Always ask to confirm and display id used when you run stripe_cancel and stripe_refund! - - You must always pass the task the user center agent. - """, - functions=transfer_functions, - ) - -def create_grafana_agent(transfer_functions): - return Agent( - name="Grafana Agent", - instructions="You are a Grafana help center agent who deals with questions about users, dashboards etc. You must always pass the task the user center agent.", - functions=transfer_functions, - ) - -def create_argocd_agent(transfer_functions): - return Agent( - name="ArgoCD Agent", - instructions="You are a ArgoCD help center agent who deals with questions about applications and pods etc. You must always pass the task the user center agent.", - functions=transfer_functions, - ) - -def create_kubectl_agent(transfer_functions): - return Agent( - name="Kubectl Agent", - instructions="""You are a kubectl reading bot that can answer users' questions using information from a kubernetes. \n - In the previous step, a plan has been prepared. Use the plan to create a kubectl command and call the command. - The command ALWAYS begin by "kubectl ". +# Import utilities +from utils.db import * +from utils.print import * +from utils.kubernetes import * + +# Import services +from services.stripe_service import * +from services.grafana_service import * +from services.docs_service import * +from services.notification_service import * + +# Import agent functions and create transfer functions +def transfer_to_user_center_agent(): + return user_interface_agent + +def transfer_to_help_center(): + return help_center_agent + +def transfer_to_stripe_agent(): + return stripe_agent + +def transfer_to_grafana_agent(): + return grafana_agent + +def transfer_to_db_agent(): + return db_agent + +def transfer_to_kube_agent(): + return kubectl_agent + +def transfer_to_query(): + return query_agent + +def transfer_to_apologize(): + return apologize_agent + + +user_interface_agent = Agent( + name="User Interface Agent", + instructions=""" + You are a user interface agent that handles all interactions with the user. + Call this agent for general questions and when no other agent is correct for the user query. + When you welcome user only, show all your agents and capabilities. + Users can go back to you when they write words like: menu, go back, go user center + """, + functions=[ + transfer_to_help_center, + transfer_to_stripe_agent, + transfer_to_db_agent, + transfer_to_grafana_agent, + transfer_to_kube_agent + ] +) + +help_center_agent = Agent( + name="Help Center Agent", + instructions="You are a Teslamate and Myteslamate agent who deals with questions about these products.", + functions=[query_docs, submit_ticket, send_email, transfer_to_user_center_agent] +) + +stripe_agent = Agent( + name="Stripe Agent", + instructions=""" + You are a Stripe help center agent who deals with questions about customers, subscription status, payment, billing etc. - Please send is_sync parameter as boolean only for update_sync_policy. - Please always confirm before run restore_db and update_grafana. - """, - functions=transfer_functions - ) + Important for stripe_cancel and stripe_refund: Always ask to confirm and display id used before run! + + Always ask if you need to pass the task to user center agent or stripe agent. + """, + functions=[stripe_query, stripe_cancel, stripe_payments_list, stripe_refund], +) + +grafana_agent = Agent( + name="Grafana Agent", + instructions="""You are a Grafana help center agent who deals with questions about users, dashboards etc. + + Always ask if you need to pass the task to user center agent or stripe agent. + """, + functions=[grafana_query], +) + +kubectl_agent = Agent( + name="Kubectl Agent", + instructions="""You are a kubectl reading bot that can answer users' questions using information from a kubernetes. \n + In the previous step, a plan has been prepared. Use the plan to create a kubectl command and call the command. + The command ALWAYS begin by "kubectl ". + + Please send is_sync parameter as boolean only for update_sync_policy. + Please always confirm before run restore_db and update_grafana. + """, + functions=[kubectl, get_app, update_sync_policy, restore_db, update_grafana, transfer_to_user_center_agent] +) DATA_DESCRIPTION = """ @@ -71,41 +105,42 @@ def create_kubectl_agent(transfer_functions): 2. Table: services: This table provides services owned by users including id, user_id and status. Columns: id, name, created_at, user_id, status. + +Created_at columns are usefull to date filters. +Customers are users with a grafana_org_id. +Tesla users are users with a tesla_id. """ -def create_db_agent(transfer_functions): - return Agent( - name="Db Agent", - instructions=f"""You are a database reading bot that can answer users' questions using information from a database. \n - {DATA_DESCRIPTION} \n - Given the user's question, decide whether the question can be answered using the information in the database. \n - Return a JSON with two keys, 'reasoning' and 'can_answer', and no preamble or explanation. - Return one of the following JSON: - {{"reasoning": "I can find a user based solely on an email address because email column on user table contains it.", "can_answer":true}} - {{"reasoning": "I can find the services of customer because user_id column is linked with id column of customer", "can_answer":true}} - {{"reasoning": "I can find active service because status column is 1", "can_answer":true}} - - If the question can be answered, hand it over to the Query agent. - If it's not possible, pass the task to the Apologize agent and explain why you can't. - You must always pass the task to another agent.""", - functions=transfer_functions - ) - -def create_query_agent(transfer_functions): - return Agent( - name="Query Agent", - instructions=f"""You are a database reading bot that can answer users' questions using information from a database. \n - {DATA_DESCRIPTION} \n - In the previous step, a plan has been prepared. Use the plan to create a SQL query and call the database. - Show the result of this call. - You must always pass the task the user center agent. - """, - functions=transfer_functions - ) - -def create_apologize_agent(transfer_functions): - return Agent( - name="Apologize Agent", - instructions="Apologize and explain to the user why you cannot complete the task and retrieve the data.", - functions=transfer_functions - ) \ No newline at end of file +db_agent = Agent( + name="Db Agent", + instructions=f"""You are a database reading bot that can answer users' questions using information from a database. \n + {DATA_DESCRIPTION} \n + Given the user's question, decide whether the question can be answered using the information in the database. \n + Return a JSON with two keys, 'reasoning' and 'can_answer', and no preamble or explanation. + Return one of the following JSON: + {{"reasoning": "I can find a user based solely on an email address because email column on user table contains it.", "can_answer":true}} + {{"reasoning": "I can find the services of customer because user_id column is linked with id column of customer", "can_answer":true}} + {{"reasoning": "I can find active service because status column is 1", "can_answer":true}} + + If the question can be answered, hand it over to the Query agent. + If it's not possible, pass the task to the Apologize agent and explain why you can't. + You must always pass the task to another agent.""", + functions=[transfer_to_query, transfer_to_apologize] +) + +query_agent = Agent( + name="Query Agent", + instructions=f"""You are a database reading bot that can answer users' questions using information from a database. \n + {DATA_DESCRIPTION} \n + In the previous step, a plan has been prepared. Use the plan to create a SQL query and call the database. + Show the result of this call. + You must always pass the task the user center agent. + """, + functions=[query_db, transfer_to_stripe_agent, transfer_to_user_center_agent] +) + +apologize_agent = Agent( + name="Apologize Agent", + instructions="Apologize and explain to the user why you cannot complete the task and retrieve the data.", + functions=[transfer_to_query] +) diff --git a/app/evals.py b/app/evals.py new file mode 100644 index 0000000..a683ee5 --- /dev/null +++ b/app/evals.py @@ -0,0 +1,69 @@ +# main.py +import sys +from pathlib import Path +import pytest + + +# Ajoute le répertoire racine du projet au chemin Python +project_root = Path(__file__).parent +sys.path.append(str(project_root)) + +from swarm import Swarm +from agents.base_agents import kubectl_agent, stripe_agent, grafana_agent, db_agent + +client = Swarm() + +def run_and_get_tool_calls(agent, query, get="tool_calls"): + """Helper function to run a query with an agent and return the specified attribute from response messages.""" + message = {"role": "user", "content": query} + response = client.run(agent=agent, messages=[message], execute_tools=False) + return response.messages[-1].get(get) + +# Configuration de tests +test_cases = [ + {"agent": kubectl_agent, "query": "get pods count", "expected_function": "kubectl"}, + {"agent": kubectl_agent, "query": "get app chatbot", "expected_function": "get_app", "expected_arguments": '{"app_name":"chatbot"}'}, + {"agent": kubectl_agent, "query": "unsync app chatbot", "expected_function": "update_sync_policy", "expected_arguments": '{"app_name":"chatbot","is_sync":"false"}'}, + {"agent": stripe_agent, "query": "get user test@test.com", "expected_function": "stripe_query", "expected_arguments": '{"email":"test@test.com"}'}, + {"agent": stripe_agent, "query": "get payments of customer id 1234RTY78", "expected_function": "stripe_payments_list", "expected_arguments": '{"customer_id":"1234RTY78"}'}, + {"agent": grafana_agent, "query": "show user test@test.com in grafana", "expected_function": "grafana_query", "expected_arguments": '{"email":"test@test.com"}'}, + {"agent": db_agent, "query": "find user test@test.com", "expected_function": "transfer_to_query"}, +] + +@pytest.mark.parametrize("case", test_cases) +def test_tool_calls(case): + """Test tool calls based on provided cases in `test_cases`.""" + tool_calls = run_and_get_tool_calls(case["agent"], case["query"]) + + assert tool_calls and len(tool_calls) == 1 + assert tool_calls[0]["function"]["name"] == case["expected_function"] + + if "expected_arguments" in case: + assert tool_calls[0]["function"]["arguments"] == case["expected_arguments"] + +@pytest.mark.parametrize( + "query", + [ + "cancel the subscription xxxxxx", + "refund payment with charge id xxxxxx", + ], +) +def test_confirm_content(query): + """Test to confirm specific content in response messages.""" + content = run_and_get_tool_calls(stripe_agent, query, "content") + assert "confirm" in content.lower() or "xxxxxx" in content + +@pytest.mark.parametrize( + "query", + [ + "select all users since yesterday", + "count all customers since today", + #"count all tesla users since today", + "find services from test@test.com", + ], +) +def test_db_transfer_queries(query): + """Test db_agent calls to ensure the 'transfer_to_query' function is used.""" + tool_calls = run_and_get_tool_calls(db_agent, query) + assert tool_calls and len(tool_calls) == 1 + assert tool_calls[0]["function"]["name"] == "transfer_to_query" diff --git a/app/main.py b/app/main.py index f039572..d53aaeb 100644 --- a/app/main.py +++ b/app/main.py @@ -9,82 +9,9 @@ from fastapi import FastAPI, WebSocket from swarm import Swarm from swarm.repl import run_demo_loop -import json - -# Import utilities -from utils.db import * -from utils.print import * -from utils.kubernetes import * - -# Import services -from services.stripe_service import * -from services.grafana_service import * -from services.docs_service import * -from services.notification_service import * - -# Import agent functions and create transfer functions -def transfer_to_user_center_agent(): - return user_interface_agent - -def transfer_to_help_center(): - return help_center_agent - -def transfer_to_stripe_agent(): - return stripe_agent - -def transfer_to_grafana_agent(): - return grafana_agent - -def transfer_to_db_agent(): - return db_agent - -def transfer_to_argocd_agent(): - return argocd_agent - -def transfer_to_kube_agent(): - return kubectl_agent - -def transfer_to_query(): - return query_agent - -def transfer_to_apologize(): - return apologize_agent - -# Create list of transfer functions -transfer_to_all = [ - transfer_to_help_center, - transfer_to_stripe_agent, - transfer_to_db_agent, - transfer_to_grafana_agent, - transfer_to_argocd_agent, - transfer_to_kube_agent -] # Import and initialize agents -from agents.base_agents import ( - create_user_interface_agent, - create_help_center_agent, - create_stripe_agent, - create_grafana_agent, - create_argocd_agent, - create_kubectl_agent, - create_db_agent, - create_query_agent, - create_apologize_agent -) - -# Initialize agents -user_interface_agent = create_user_interface_agent(transfer_to_all) -transfer_to_all.append(transfer_to_user_center_agent) - -help_center_agent = create_help_center_agent([query_docs, submit_ticket, send_email, transfer_to_user_center_agent]) -stripe_agent = create_stripe_agent([stripe_query, stripe_cancel, stripe_payments_list, stripe_refund, transfer_to_user_center_agent]) -grafana_agent = create_grafana_agent([grafana_query, transfer_to_stripe_agent, transfer_to_user_center_agent]) -argocd_agent = create_argocd_agent([argocd, transfer_to_user_center_agent]) -kubectl_agent = create_kubectl_agent([kubectl, get_app, update_sync_policy, restore_db, update_grafana, transfer_to_user_center_agent]) -db_agent = create_db_agent([transfer_to_query, transfer_to_apologize]) -query_agent = create_query_agent([query_db, transfer_to_stripe_agent, transfer_to_user_center_agent]) -apologize_agent = create_apologize_agent([transfer_to_query]) +from agents.base_agents import * # Initialize FastAPI and Swarm app = FastAPI() diff --git a/app/services/stripe_service.py b/app/services/stripe_service.py index a650c89..3f421fb 100644 --- a/app/services/stripe_service.py +++ b/app/services/stripe_service.py @@ -2,6 +2,7 @@ from stripe import StripeClient def stripe_query(email): + """Get stripe customer and related subscriptions""" client = StripeClient(os.getenv('STRIPE_KEY')) customers = client.customers.list({'email': email}) @@ -15,6 +16,7 @@ def stripe_query(email): return {"response": "No customer found."} def stripe_cancel(subscription_id): + """Cancel a subscription""" client = StripeClient(os.getenv('STRIPE_KEY')) subscription = client.subscriptions.cancel( subscription_exposed_id=subscription_id, @@ -26,6 +28,7 @@ def stripe_cancel(subscription_id): return {"response": "No results found."} def stripe_payments_list(customer_id): + """List payments from a customer id""" client = StripeClient(os.getenv('STRIPE_KEY')) charges = client.charges.list(params={'customer': customer_id}) if charges: @@ -33,6 +36,7 @@ def stripe_payments_list(customer_id): return {"response": "No payments found."} def stripe_refund(charge_id): + """Refund a charge with charge id""" client = StripeClient(os.getenv('STRIPE_KEY')) refund = client.refunds.create(params={'charge': charge_id}) if refund: