From bde9e5d4d1d8a3e99f7b19726a963e4f6eeeb15c Mon Sep 17 00:00:00 2001 From: Kye Gomez Date: Sat, 4 Jan 2025 17:18:57 -0500 Subject: [PATCH] [CLEANUP] --- evalops/__init__.py | 10 +- evalops/function_eval.py | 307 ++++++++++++++ auto_eval.py => examples/auto_eval.py | 0 .../huggingface_eval_example.py | 0 .../huggingface_simple_example.py | 0 experimental/test.py | 4 +- function_call_eval.py | 243 +++++++++++ new.py | 395 ++++++++++++++++++ queen_swarm.py | 224 ---------- 9 files changed, 954 insertions(+), 229 deletions(-) create mode 100644 evalops/function_eval.py rename auto_eval.py => examples/auto_eval.py (100%) rename huggingface_eval_example.py => examples/huggingface_eval_example.py (100%) rename huggingface_simple_example.py => examples/huggingface_simple_example.py (100%) create mode 100644 function_call_eval.py create mode 100644 new.py delete mode 100644 queen_swarm.py diff --git a/evalops/__init__.py b/evalops/__init__.py index cd60a1f..7bb7ec2 100644 --- a/evalops/__init__.py +++ b/evalops/__init__.py @@ -1,5 +1,11 @@ -from evalops.main import StatisticalModelEvaluator +from evalops.function_eval import FunctionCallEvaluator from evalops.huggingface_loader import EvalDatasetLoader +from evalops.main import StatisticalModelEvaluator from evalops.wrapper import eval -__all__ = ["StatisticalModelEvaluator", "EvalDatasetLoader", "eval"] +__all__ = [ + "StatisticalModelEvaluator", + "EvalDatasetLoader", + "eval", + "FunctionCallEvaluator", +] diff --git a/evalops/function_eval.py b/evalops/function_eval.py new file mode 100644 index 0000000..6a332e1 --- /dev/null +++ b/evalops/function_eval.py @@ -0,0 +1,307 @@ +from typing import Any, Dict, List +from pydantic import BaseModel, ValidationError +import jsonschema + +from evalops.main import StatisticalModelEvaluator + + +class FunctionCallResult(BaseModel): + """ + Stores the evaluation results for function calling tests. + + Attributes: + schema_valid: Whether the function schema is valid JSON Schema + execution_valid: Whether the function execution was successful + schema_errors: List of schema validation errors if any + execution_errors: List of execution errors if any + matching_score: Score for how well the execution matched expected output + metadata: Additional metadata about the evaluation + """ + + schema_valid: bool + execution_valid: bool + schema_errors: List[str] + execution_errors: List[str] + matching_score: float + metadata: Dict[str, Any] + + +class FunctionCallEvaluator: + """ + Evaluator for testing function calling capabilities and schema correctness. + + This evaluator extends the base StatisticalModelEvaluator to add specific + function calling evaluation capabilities. + """ + + def __init__(self, base_evaluator: StatisticalModelEvaluator): + self.base_evaluator = base_evaluator + + def validate_function_schema( + self, schema: Dict[str, Any] + ) -> Dict[str, Any]: + """ + Validates if a given function schema is correct and follows JSON Schema specification. 
+ + Args: + schema: The function schema to validate + + Returns: + Dictionary containing validation results and any errors found + """ + errors = [] + try: + # Validate basic JSON Schema structure + jsonschema.Draft7Validator.check_schema(schema) + + # Check for required function schema elements + required_fields = ["name", "description", "parameters"] + missing_fields = [ + field + for field in required_fields + if field not in schema + ] + if missing_fields: + errors.append( + f"Missing required fields: {', '.join(missing_fields)}" + ) + + # Validate parameters object + if "parameters" in schema: + if not isinstance(schema["parameters"], dict): + errors.append("Parameters must be an object") + else: + if "properties" not in schema["parameters"]: + errors.append( + "Parameters object must contain 'properties'" + ) + if "required" not in schema["parameters"]: + errors.append( + "Parameters object must specify 'required' fields" + ) + + schema_valid = len(errors) == 0 + + except jsonschema.exceptions.SchemaError as e: + schema_valid = False + errors.append(f"Schema validation error: {str(e)}") + + return {"valid": schema_valid, "errors": errors} + + def evaluate_function_call( + self, + function_schema: Dict[str, Any], + test_cases: List[Dict[str, Any]], + expected_outputs: List[Any], + ) -> FunctionCallResult: + """ + Evaluates function calling implementation against test cases. + + Args: + function_schema: The function schema to test + test_cases: List of input test cases + expected_outputs: List of expected outputs for each test case + + Returns: + FunctionCallResult containing evaluation metrics + """ + # First validate the schema + schema_validation = self.validate_function_schema( + function_schema + ) + + execution_errors = [] + execution_scores = [] + + # If schema is valid, test execution + if schema_validation["valid"]: + for test_case, expected in zip( + test_cases, expected_outputs + ): + try: + # Validate test case against schema + jsonschema.validate( + test_case, function_schema["parameters"] + ) + + # Compare output structure + execution_score = self._compare_outputs( + test_case, expected + ) + execution_scores.append(execution_score) + + except ValidationError as e: + execution_errors.append( + f"Test case validation error: {str(e)}" + ) + except Exception as e: + execution_errors.append( + f"Execution error: {str(e)}" + ) + + # Calculate average execution score + avg_execution_score = ( + sum(execution_scores) / len(execution_scores) + if execution_scores + else 0.0 + ) + + return FunctionCallResult( + schema_valid=schema_validation["valid"], + execution_valid=len(execution_errors) == 0, + schema_errors=schema_validation["errors"], + execution_errors=execution_errors, + matching_score=avg_execution_score, + metadata={ + "num_test_cases": len(test_cases), + "test_coverage": len(execution_scores) + / len(test_cases), + }, + ) + + def _compare_outputs(self, actual: Any, expected: Any) -> float: + """ + Compares actual output with expected output and returns a similarity score. 
+ + Args: + actual: The actual output + expected: The expected output + + Returns: + Float between 0 and 1 indicating similarity + """ + if isinstance(actual, dict) and isinstance(expected, dict): + # Compare dictionary structures + actual_keys = set(actual.keys()) + expected_keys = set(expected.keys()) + + # Calculate key overlap + key_similarity = len(actual_keys & expected_keys) / len( + expected_keys + ) + + # Calculate value similarity for overlapping keys + value_scores = [] + for key in actual_keys & expected_keys: + value_scores.append( + self._compare_outputs(actual[key], expected[key]) + ) + + value_similarity = ( + sum(value_scores) / len(value_scores) + if value_scores + else 0 + ) + + return (key_similarity + value_similarity) / 2 + + elif isinstance(actual, (list, tuple)) and isinstance( + expected, (list, tuple) + ): + # Compare sequence structures + if len(actual) != len(expected): + return 0.5 # Partial match for different lengths + + element_scores = [ + self._compare_outputs(a, e) + for a, e in zip(actual, expected) + ] + return sum(element_scores) / len(element_scores) + + else: + # Direct comparison for primitive types + return float(actual == expected) + + +def create_test_suite( + function_schema: Dict[str, Any], num_cases: int = 10 +) -> List[Dict[str, Any]]: + """ + Creates a test suite for a given function schema. + + Args: + function_schema: The function schema to create tests for + num_cases: Number of test cases to generate + + Returns: + List of test cases + """ + test_cases = [] + properties = function_schema["parameters"]["properties"] + + for _ in range(num_cases): + test_case = {} + for prop_name, prop_schema in properties.items(): + test_case[prop_name] = _generate_test_value(prop_schema) + test_cases.append(test_case) + + return test_cases + + +def _generate_test_value(property_schema: Dict[str, Any]) -> Any: + """Helper function to generate test values based on property schema""" + schema_type = property_schema.get("type", "string") + + if schema_type == "string": + return "test_string" + elif schema_type == "number": + return 42.0 + elif schema_type == "integer": + return 42 + elif schema_type == "boolean": + return True + elif schema_type == "array": + items_schema = property_schema.get( + "items", {"type": "string"} + ) + return [_generate_test_value(items_schema) for _ in range(2)] + elif schema_type == "object": + obj = {} + for prop_name, prop_schema in property_schema.get( + "properties", {} + ).items(): + obj[prop_name] = _generate_test_value(prop_schema) + return obj + else: + return None + + +# # Create base evaluator +# base_evaluator = StatisticalModelEvaluator() + +# # Create function call evaluator +# func_evaluator = FunctionCallEvaluator(base_evaluator) + +# # Example function schema +# schema = { +# "name": "calculate_total", +# "description": "Calculates total with tax", +# "parameters": { +# "properties": { +# "amount": {"type": "number"}, +# "tax_rate": {"type": "number"} +# }, +# "required": ["amount", "tax_rate"] +# } +# } + +# # Create test cases +# test_cases = create_test_suite(schema) + +# # Expected outputs for test cases +# expected_outputs = [ +# {"total": 110.0}, +# {"total": 220.0}, +# # ... 
more expected outputs +# ] + +# # Evaluate the function +# result = func_evaluator.evaluate_function_call( +# function_schema=schema, +# test_cases=test_cases, +# expected_outputs=expected_outputs +# ) + +# print(f"Schema valid: {result.schema_valid}") +# print(f"Execution valid: {result.execution_valid}") +# print(f"Matching score: {result.matching_score}") diff --git a/auto_eval.py b/examples/auto_eval.py similarity index 100% rename from auto_eval.py rename to examples/auto_eval.py diff --git a/huggingface_eval_example.py b/examples/huggingface_eval_example.py similarity index 100% rename from huggingface_eval_example.py rename to examples/huggingface_eval_example.py diff --git a/huggingface_simple_example.py b/examples/huggingface_simple_example.py similarity index 100% rename from huggingface_simple_example.py rename to examples/huggingface_simple_example.py diff --git a/experimental/test.py b/experimental/test.py index 9327b10..bfcaa52 100644 --- a/experimental/test.py +++ b/experimental/test.py @@ -195,9 +195,7 @@ def optimized_matrix_multiply_with_addition( + 1 ) - torch.tensor( - [2**i for i in range(max_bits)], dtype=torch.int32 - ) + torch.tensor([2**i for i in range(max_bits)], dtype=torch.int32) # Process matrices in blocks for better cache utilization for i in range(0, m, chunk_size): diff --git a/function_call_eval.py b/function_call_eval.py new file mode 100644 index 0000000..f0cb788 --- /dev/null +++ b/function_call_eval.py @@ -0,0 +1,243 @@ +import json +from typing import Any, Dict, List + +from evalops.function_eval import FunctionCallEvaluator +from evalops.main import StatisticalModelEvaluator + + +# First, let's create a mock model class that implements function calling +class MockLanguageModel: + """ + Mock language model that implements function calling capabilities. + This simulates how a real LLM might handle function calls. 
+ """ + + def run( + self, task: str, functions: List[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """Simulates model inference with function calling""" + # This is a mock implementation - in reality, this would be an LLM + if "weather" in task.lower(): + return { + "function_call": { + "name": "get_weather", + "arguments": { + "location": "New York", + "date": "2024-01-02", + }, + } + } + elif "calculate" in task.lower(): + return { + "function_call": { + "name": "calculate_total", + "arguments": {"amount": 100, "tax_rate": 0.1}, + } + } + return {"content": "I cannot help with that task."} + + +# Define some example functions that we want to test +def get_weather(location: str, date: str) -> Dict[str, Any]: + """Mock weather function""" + return { + "temperature": 72, + "conditions": "sunny", + "location": location, + "date": date, + } + + +def calculate_total(amount: float, tax_rate: float) -> Dict[str, Any]: + """Calculate total with tax""" + total = amount * (1 + tax_rate) + return { + "total": total, + "breakdown": { + "base_amount": amount, + "tax_amount": amount * tax_rate, + }, + } + + +# Define the function schemas +function_schemas = [ + { + "name": "get_weather", + "description": "Get weather information for a location and date", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "City name", + }, + "date": { + "type": "string", + "description": "Date in YYYY-MM-DD format", + }, + }, + "required": ["location", "date"], + }, + }, + { + "name": "calculate_total", + "description": "Calculate total amount including tax", + "parameters": { + "type": "object", + "properties": { + "amount": { + "type": "number", + "description": "Base amount", + }, + "tax_rate": { + "type": "number", + "description": "Tax rate as decimal", + }, + }, + "required": ["amount", "tax_rate"], + }, + }, +] + + +def main(): + # Initialize evaluators + base_evaluator = StatisticalModelEvaluator() + func_evaluator = FunctionCallEvaluator(base_evaluator) + + # Initialize model + model = MockLanguageModel() + + # Test cases for weather function + weather_test_cases = [ + { + "task": "What's the weather like in New York today?", + "expected_schema": function_schemas[0], + "expected_output": { + "temperature": 72, + "conditions": "sunny", + "location": "New York", + "date": "2024-01-02", + }, + }, + { + "task": "Tell me the weather forecast for New York", + "expected_schema": function_schemas[0], + "expected_output": { + "temperature": 72, + "conditions": "sunny", + "location": "New York", + "date": "2024-01-02", + }, + }, + ] + + # Test cases for calculation function + calculation_test_cases = [ + { + "task": "Calculate the total for $100 with 10% tax", + "expected_schema": function_schemas[1], + "expected_output": { + "total": 110.0, + "breakdown": { + "base_amount": 100.0, + "tax_amount": 10.0, + }, + }, + } + ] + + # Evaluate each function schema + for schema in function_schemas: + print(f"\nEvaluating schema for {schema['name']}:") + schema_result = func_evaluator.validate_function_schema( + schema + ) + print( + f"Schema validation result: {json.dumps(schema_result, indent=2)}" + ) + + # Run test cases for weather function + print("\nTesting weather function:") + weather_results = [] + for test_case in weather_test_cases: + # Get model response + model_response = model.run( + test_case["task"], functions=function_schemas + ) + + if "function_call" in model_response: + # Execute the function with model's arguments + func_name = 
model_response["function_call"]["name"] + func_args = model_response["function_call"]["arguments"] + + if func_name == "get_weather": + actual_output = get_weather(**func_args) + else: + actual_output = {"error": "Wrong function called"} + + # Evaluate the execution + result = func_evaluator.evaluate_function_call( + function_schema=test_case["expected_schema"], + test_cases=[func_args], + expected_outputs=[test_case["expected_output"]], + ) + weather_results.append(result) + + print(f"\nTest case: {test_case['task']}") + print(f"Model called: {func_name}") + print(f"Arguments: {json.dumps(func_args, indent=2)}") + print(f"Output: {json.dumps(actual_output, indent=2)}") + print( + f"Evaluation result: {json.dumps(result.model_dump(), indent=2)}" + ) + + # Run test cases for calculation function + print("\nTesting calculation function:") + calc_results = [] + for test_case in calculation_test_cases: + model_response = model.run( + test_case["task"], functions=function_schemas + ) + + if "function_call" in model_response: + func_name = model_response["function_call"]["name"] + func_args = model_response["function_call"]["arguments"] + + if func_name == "calculate_total": + actual_output = calculate_total(**func_args) + else: + actual_output = {"error": "Wrong function called"} + + result = func_evaluator.evaluate_function_call( + function_schema=test_case["expected_schema"], + test_cases=[func_args], + expected_outputs=[test_case["expected_output"]], + ) + calc_results.append(result) + + print(f"\nTest case: {test_case['task']}") + print(f"Model called: {func_name}") + print(f"Arguments: {json.dumps(func_args, indent=2)}") + print(f"Output: {json.dumps(actual_output, indent=2)}") + print( + f"Evaluation result: {json.dumps(result.model_dump(), indent=2)}" + ) + + # Calculate aggregate statistics + all_results = weather_results + calc_results + avg_matching_score = sum( + r.matching_score for r in all_results + ) / len(all_results) + schema_validity = all(r.schema_valid for r in all_results) + execution_validity = all(r.execution_valid for r in all_results) + + print("\nAggregate Results:") + print(f"Average matching score: {avg_matching_score:.2f}") + print(f"All schemas valid: {schema_validity}") + print(f"All executions valid: {execution_validity}") + + +if __name__ == "__main__": + main() diff --git a/new.py b/new.py new file mode 100644 index 0000000..a8f4104 --- /dev/null +++ b/new.py @@ -0,0 +1,395 @@ +import json +import time +from concurrent.futures import ThreadPoolExecutor +from difflib import SequenceMatcher +from functools import partial +from pathlib import Path +from typing import Any, Dict, List, Optional, Protocol + +import numpy as np +from loguru import logger +from pydantic import BaseModel +from scipy import stats + +from evalops.function_eval import FunctionCallEvaluator +from evalops.main import StatisticalModelEvaluator + + +class ModelInterface(Protocol): + """Protocol defining the required interface for model classes.""" + + def run(self, task: str, img: str = None) -> str: + """Run the model on a given task.""" + ... 
+ + +class EvalResult(BaseModel): + """Stores evaluation results for a single model run.""" + + mean_score: float + sem: float + ci_lower: float + ci_upper: float + raw_scores: List[float] + metadata: Dict[str, Any] + function_call_results: Optional[Dict[str, Any]] = None + sentiment_score: Optional[float] = None + + +class FunctionCallResult(BaseModel): + """Stores the evaluation results for function calling tests.""" + + schema_valid: bool + execution_valid: bool + schema_errors: List[str] + execution_errors: List[str] + matching_score: float + metadata: Dict[str, Any] + + +class IntegratedModelEvaluator: + """ + Enhanced model evaluator that combines statistical evaluation, + function calling assessment, and sentiment analysis. + """ + + def __init__( + self, + cache_dir: Optional[str] = None, + log_level: str = "INFO", + random_seed: Optional[int] = None, + ): + # Initialize base statistical evaluator + self.statistical_evaluator = StatisticalModelEvaluator( + cache_dir=cache_dir, + log_level=log_level, + random_seed=random_seed, + ) + + # Initialize function call evaluator + self.function_evaluator = FunctionCallEvaluator( + self.statistical_evaluator + ) + + self.cache_dir = Path(cache_dir) if cache_dir else None + if self.cache_dir: + self.cache_dir.mkdir(parents=True, exist_ok=True) + + if random_seed is not None: + np.random.seed(random_seed) + + logger.add( + lambda msg: print(msg), + level=log_level, + format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", + ) + + def _calculate_score( + self, prediction: str, correct_answer: str + ) -> float: + """Calculate similarity score between prediction and correct answer.""" + prediction = prediction.strip().lower() + correct_answer = correct_answer.strip().lower() + + if correct_answer in prediction: + return 1.0 + + similarity = SequenceMatcher( + None, prediction, correct_answer + ).ratio() + return similarity if similarity > 0.8 else 0.0 + + def _analyze_sentiment(self, text: str) -> float: + """ + Analyze sentiment in text and return score between 0.1 and 1.0. + Basic implementation - could be enhanced with more sophisticated NLP. 
+ """ + # List of positive and negative sentiment words + positive_words = { + "good", + "great", + "excellent", + "amazing", + "wonderful", + "fantastic", + "helpful", + "perfect", + "thank", + "thanks", + "appreciated", + "love", + "nice", + } + negative_words = { + "bad", + "poor", + "terrible", + "horrible", + "useless", + "waste", + "unhelpful", + "wrong", + "fail", + "failed", + "confused", + "disappointing", + } + + words = text.lower().split() + pos_count = sum(1 for word in words if word in positive_words) + neg_count = sum(1 for word in words if word in negative_words) + + total_count = pos_count + neg_count + if total_count == 0: + return 0.5 # Neutral sentiment + + sentiment = ( + (pos_count / (pos_count + neg_count)) + if total_count > 0 + else 0.5 + ) + # Scale to 0.1-1.0 range + return max(0.1, min(1.0, 0.1 + sentiment * 0.9)) + + def validate_function_schema( + self, schema: Dict[str, Any] + ) -> Dict[str, Any]: + """Validates if a given function schema follows JSON Schema specification.""" + return self.function_evaluator.validate_function_schema( + schema + ) + + def evaluate_function_call( + self, + function_schema: Dict[str, Any], + test_cases: List[Dict[str, Any]], + expected_outputs: List[Any], + ) -> FunctionCallResult: + """Evaluates function calling implementation against test cases.""" + return self.function_evaluator.evaluate_function_call( + function_schema=function_schema, + test_cases=test_cases, + expected_outputs=expected_outputs, + ) + + def _compare_outputs(self, actual: Any, expected: Any) -> float: + """Compares actual output with expected output and returns similarity score.""" + if isinstance(actual, dict) and isinstance(expected, dict): + actual_keys = set(actual.keys()) + expected_keys = set(expected.keys()) + + key_similarity = len(actual_keys & expected_keys) / len( + expected_keys + ) + + value_scores = [] + for key in actual_keys & expected_keys: + value_scores.append( + self._compare_outputs(actual[key], expected[key]) + ) + + value_similarity = ( + sum(value_scores) / len(value_scores) + if value_scores + else 0 + ) + return (key_similarity + value_similarity) / 2 + + elif isinstance(actual, (list, tuple)) and isinstance( + expected, (list, tuple) + ): + if len(actual) != len(expected): + return 0.5 + element_scores = [ + self._compare_outputs(a, e) + for a, e in zip(actual, expected) + ] + return sum(element_scores) / len(element_scores) + else: + return float(actual == expected) + + def evaluate_model( + self, + model: ModelInterface, + questions: List[str], + correct_answers: List[str], + imgs: Optional[List[str]] = None, + cluster_ids: Optional[List[str]] = None, + num_samples: int = 1, + batch_size: int = 32, + cache_key: Optional[str] = None, + function_schema: Optional[Dict[str, Any]] = None, + function_test_cases: Optional[List[Dict[str, Any]]] = None, + function_expected_outputs: Optional[List[Any]] = None, + analyze_sentiment: bool = False, + ) -> EvalResult: + """ + Enhanced evaluation that includes statistical analysis, function calling, + and optional sentiment analysis. 
+ """ + start_time = time.time() + + # Check cache + if cache_key and self.cache_dir: + cache_path = self.cache_dir / f"{cache_key}.json" + if cache_path.exists(): + with open(cache_path) as f: + return EvalResult(**json.load(f)) + + # Validate inputs + assert len(questions) == len( + correct_answers + ), "Questions and answers must have same length" + if cluster_ids: + assert len(cluster_ids) == len( + questions + ), "Cluster IDs must match question length" + + # Run model predictions + all_scores = [] + sentiment_scores = [] if analyze_sentiment else None + + with ThreadPoolExecutor() as executor: + for i in range(0, len(questions), batch_size): + batch_questions = questions[i : i + batch_size] + batch_answers = correct_answers[i : i + batch_size] + + tasks = [ + partial( + self._evaluate_single_question, + model, + q, + a, + num_samples, + ) + for q, a in zip(batch_questions, batch_answers) + ] + + batch_scores = list( + executor.map(lambda f: f(), tasks) + ) + all_scores.extend(batch_scores) + + if analyze_sentiment: + batch_predictions = [ + model.run(q) for q in batch_questions + ] + sentiment_scores.extend( + [ + self._analyze_sentiment(p) + for p in batch_predictions + ] + ) + + # Calculate statistics + scores_array = np.array(all_scores) + mean_score = np.mean(scores_array) + + if cluster_ids: + sem = self._calculate_clustered_sem( + scores_array, cluster_ids + ) + else: + sem = stats.sem(scores_array) + + ci_lower, ci_upper = stats.norm.interval( + 0.95, loc=mean_score, scale=sem + ) + + # Evaluate function calling if provided + function_results = None + if ( + function_schema + and function_test_cases + and function_expected_outputs + ): + function_results = self.evaluate_function_call( + function_schema, + function_test_cases, + function_expected_outputs, + ).__dict__ + + # Create result + result = EvalResult( + mean_score=float(mean_score), + sem=float(sem), + ci_lower=float(ci_lower), + ci_upper=float(ci_upper), + raw_scores=all_scores, + metadata={ + "num_questions": len(questions), + "num_samples": num_samples, + "has_clusters": cluster_ids is not None, + "evaluation_time": time.time() - start_time, + }, + function_call_results=function_results, + sentiment_score=( + np.mean(sentiment_scores) + if sentiment_scores + else None + ), + ) + + # Cache results + if cache_key and self.cache_dir: + cache_path = self.cache_dir / f"{cache_key}.json" + with open(cache_path, "w") as f: + json.dump(result.__dict__, f) + + return result + + def _calculate_clustered_sem( + self, scores: np.ndarray, cluster_ids: List[str] + ) -> float: + """Calculate clustered standard error of the mean.""" + import pandas as pd + + df = pd.DataFrame({"score": scores, "cluster": cluster_ids}) + cluster_means = df.groupby("cluster")["score"].mean() + n_clusters = len(cluster_means) + cluster_variance = cluster_means.var() + return np.sqrt(cluster_variance / n_clusters) + + +def create_test_suite( + function_schema: Dict[str, Any], num_cases: int = 10 +) -> List[Dict[str, Any]]: + """Creates a test suite for a given function schema.""" + test_cases = [] + properties = function_schema["parameters"]["properties"] + + for _ in range(num_cases): + test_case = {} + for prop_name, prop_schema in properties.items(): + test_case[prop_name] = _generate_test_value(prop_schema) + test_cases.append(test_case) + + return test_cases + + +def _generate_test_value(property_schema: Dict[str, Any]) -> Any: + """Helper function to generate test values based on property schema.""" + schema_type = property_schema.get("type", 
"string") + + if schema_type == "string": + return "test_string" + elif schema_type == "number": + return 42.0 + elif schema_type == "integer": + return 42 + elif schema_type == "boolean": + return True + elif schema_type == "array": + items_schema = property_schema.get( + "items", {"type": "string"} + ) + return [_generate_test_value(items_schema) for _ in range(2)] + elif schema_type == "object": + obj = {} + for prop_name, prop_schema in property_schema.get( + "properties", {} + ).items(): + obj[prop_name] = _generate_test_value(prop_schema) + return obj + else: + return None diff --git a/queen_swarm.py b/queen_swarm.py deleted file mode 100644 index 6a8c3bc..0000000 --- a/queen_swarm.py +++ /dev/null @@ -1,224 +0,0 @@ -import asyncio -from dataclasses import dataclass -from decimal import Decimal -from typing import Any, Dict - -import ccxt -import pandas as pd -from loguru import logger -from swarms import Agent - -# Configure logging -logger.add("swarm.log", rotation="500 MB", level="INFO") - - -@dataclass -class SharedMemory: - """Shared memory system for all agents""" - - market_data: Dict = None - trading_signals: Dict = None - risk_metrics: Dict = None - positions: Dict = None - - def update(self, key: str, value: Any): - """Update any attribute in shared memory""" - if hasattr(self, key): - setattr(self, key, value) - logger.info(f"Updated shared memory: {key}") - - -class MarketDataAgent(Agent): - def __init__(self, shared_memory: SharedMemory): - super().__init__( - agent_name="Market-Data-Agent", - system_prompt="You are a market data specialist. Monitor and analyze crypto market data.", - ) - self.exchange = ccxt.kraken({"enableRateLimit": True}) - self.shared_memory = shared_memory - - async def run(self, symbol: str) -> None: - try: - data = await self.exchange.fetch_ohlcv( - symbol, "1h", limit=100 - ) - df = pd.DataFrame( - data, - columns=[ - "timestamp", - "open", - "high", - "low", - "close", - "volume", - ], - ) - self.shared_memory.update("market_data", {symbol: df}) - - except Exception as e: - logger.error(f"Market data error: {e}") - return None - - -class SignalAgent(Agent): - def __init__(self, shared_memory: SharedMemory): - super().__init__( - agent_name="Signal-Agent", - system_prompt="You are a trading signal specialist. Generate trading signals based on market data.", - ) - self.shared_memory = shared_memory - - async def run(self, symbol: str) -> None: - try: - if not self.shared_memory.market_data: - return - - df = self.shared_memory.market_data[symbol] - - # Get AI analysis - analysis = await self.run( - f"Analyze price action for {symbol}: Current price {df['close'].iloc[-1]}, " - f"Volume: {df['volume'].iloc[-1]}" - ) - - signal = { - "symbol": symbol, - "action": analysis.get("recommendation", "HOLD"), - "confidence": analysis.get("confidence", 0), - "timestamp": pd.Timestamp.now(), - } - - self.shared_memory.update( - "trading_signals", {symbol: signal} - ) - - except Exception as e: - logger.error(f"Signal generation error: {e}") - - -class RiskAgent(Agent): - def __init__(self, shared_memory: SharedMemory): - super().__init__( - agent_name="Risk-Agent", - system_prompt="You are a risk management specialist. 
Evaluate trading risks and set position sizes.", - ) - self.shared_memory = shared_memory - - async def run(self, symbol: str) -> None: - try: - if not ( - self.shared_memory.market_data - and self.shared_memory.trading_signals - ): - return - - signal = self.shared_memory.trading_signals[symbol] - - risk_metrics = { - "symbol": symbol, - "position_size": Decimal( - "0.01" - ), # Default conservative size - "risk_score": 0.5, - "timestamp": pd.Timestamp.now(), - } - - # Get AI risk assessment - assessment = await self.run( - f"Evaluate risk for {symbol} trade with signal confidence {signal['confidence']}" - ) - - risk_metrics.update(assessment) - self.shared_memory.update( - "risk_metrics", {symbol: risk_metrics} - ) - - except Exception as e: - logger.error(f"Risk assessment error: {e}") - - -class QueenAgent(Agent): - def __init__(self, shared_memory: SharedMemory): - super().__init__( - agent_name="Queen-Agent", - system_prompt="You are the queen bee coordinator. Make final trading decisions based on all available information.", - ) - self.shared_memory = shared_memory - self.market_agent = MarketDataAgent(shared_memory) - self.signal_agent = SignalAgent(shared_memory) - self.risk_agent = RiskAgent(shared_memory) - - async def run(self, symbol: str) -> Dict: - """Coordinate the swarm and make final decisions""" - try: - # Run all agents sequentially - await self.market_agent.run(symbol) - await self.signal_agent.run(symbol) - await self.risk_agent.run(symbol) - - # Make final decision - if all( - [ - self.shared_memory.market_data, - self.shared_memory.trading_signals, - self.shared_memory.risk_metrics, - ] - ): - signal = self.shared_memory.trading_signals[symbol] - risk = self.shared_memory.risk_metrics[symbol] - - # Get AI final decision - decision = await self.run( - f"Make final trading decision for {symbol}:\n" - f"Signal: {signal['action']}\n" - f"Confidence: {signal['confidence']}\n" - f"Risk Score: {risk['risk_score']}" - ) - - # Update positions if trade is approved - if decision.get("execute", False): - self.shared_memory.update( - "positions", - { - symbol: { - "action": signal["action"], - "size": risk["position_size"], - "timestamp": pd.Timestamp.now(), - } - }, - ) - - return decision - - except Exception as e: - logger.error(f"Queen agent error: {e}") - return {"execute": False, "reason": str(e)} - - -async def main(): - # Initialize shared memory - shared_memory = SharedMemory() - - # Initialize queen agent - queen = QueenAgent(shared_memory) - - symbols = ["BTC/USD", "ETH/USD"] - - try: - while True: - for symbol in symbols: - decision = await queen.run(symbol) - logger.info(f"Decision for {symbol}: {decision}") - await asyncio.sleep(1) # Rate limiting - - await asyncio.sleep(60) # Main loop interval - - except KeyboardInterrupt: - logger.info("Shutting down swarm...") - except Exception as e: - logger.error(f"Critical error: {e}") - raise - - -if __name__ == "__main__": - asyncio.run(main())
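
Reviewer sketch (not applied by the patch): a minimal usage example for the FunctionCallEvaluator introduced in evalops/function_eval.py. It assumes the package is importable as `evalops` with `jsonschema` and `pydantic` installed; the calculate_total schema mirrors the one used in function_call_eval.py, and the expected_outputs values are chosen to match the synthetic arguments that create_test_suite generates (42.0 for "number" properties), so the run should report a perfect matching score.

from evalops.function_eval import FunctionCallEvaluator, create_test_suite
from evalops.main import StatisticalModelEvaluator

# Schema taken from the calculate_total example in this patch (illustrative values).
schema = {
    "name": "calculate_total",
    "description": "Calculate total amount including tax",
    "parameters": {
        "type": "object",
        "properties": {
            "amount": {"type": "number", "description": "Base amount"},
            "tax_rate": {"type": "number", "description": "Tax rate as decimal"},
        },
        "required": ["amount", "tax_rate"],
    },
}

evaluator = FunctionCallEvaluator(StatisticalModelEvaluator())

# 1. Validate the schema structure (name/description/parameters with properties and required).
print(evaluator.validate_function_schema(schema))

# 2. Generate synthetic argument sets and score them against expected outputs.
test_cases = create_test_suite(schema, num_cases=2)
expected_outputs = [{"amount": 42.0, "tax_rate": 42.0}] * len(test_cases)

result = evaluator.evaluate_function_call(
    function_schema=schema,
    test_cases=test_cases,
    expected_outputs=expected_outputs,
)
print(result.schema_valid, result.execution_valid, result.matching_score)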
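
The matching_score above comes from the internal _compare_outputs helper, which averages key overlap with recursive value similarity for dictionaries. A small worked example, calling that helper directly with illustrative values, shows how a missing nested key is penalized:

from evalops.function_eval import FunctionCallEvaluator
from evalops.main import StatisticalModelEvaluator

evaluator = FunctionCallEvaluator(StatisticalModelEvaluator())

expected = {"total": 110.0, "breakdown": {"base_amount": 100.0, "tax_amount": 10.0}}
actual_exact = {"total": 110.0, "breakdown": {"base_amount": 100.0, "tax_amount": 10.0}}
actual_partial = {"total": 110.0}

# Exact structural match: key overlap 2/2 and all values equal -> 1.0
print(evaluator._compare_outputs(actual_exact, expected))

# Missing "breakdown": key overlap 1/2 = 0.5, value similarity 1.0 on "total",
# so the score is (0.5 + 1.0) / 2 = 0.75
print(evaluator._compare_outputs(actual_partial, expected))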