diff --git a/evalops/__init__.py b/evalops/__init__.py
index cd60a1f..7bb7ec2 100644
--- a/evalops/__init__.py
+++ b/evalops/__init__.py
@@ -1,5 +1,11 @@
-from evalops.main import StatisticalModelEvaluator
+from evalops.function_eval import FunctionCallEvaluator
from evalops.huggingface_loader import EvalDatasetLoader
+from evalops.main import StatisticalModelEvaluator
from evalops.wrapper import eval
-__all__ = ["StatisticalModelEvaluator", "EvalDatasetLoader", "eval"]
+__all__ = [
+ "StatisticalModelEvaluator",
+ "EvalDatasetLoader",
+ "eval",
+ "FunctionCallEvaluator",
+]
diff --git a/evalops/function_eval.py b/evalops/function_eval.py
new file mode 100644
index 0000000..6a332e1
--- /dev/null
+++ b/evalops/function_eval.py
@@ -0,0 +1,307 @@
+from typing import Any, Dict, List
+
+import jsonschema
+from pydantic import BaseModel
+
+from evalops.main import StatisticalModelEvaluator
+
+
+class FunctionCallResult(BaseModel):
+ """
+ Stores the evaluation results for function calling tests.
+
+ Attributes:
+ schema_valid: Whether the function schema is valid JSON Schema
+ execution_valid: Whether the function execution was successful
+ schema_errors: List of schema validation errors if any
+ execution_errors: List of execution errors if any
+ matching_score: Score for how well the execution matched expected output
+ metadata: Additional metadata about the evaluation
+ """
+
+ schema_valid: bool
+ execution_valid: bool
+ schema_errors: List[str]
+ execution_errors: List[str]
+ matching_score: float
+ metadata: Dict[str, Any]
+
+
+class FunctionCallEvaluator:
+ """
+ Evaluator for testing function calling capabilities and schema correctness.
+
+ This evaluator extends the base StatisticalModelEvaluator to add specific
+ function calling evaluation capabilities.
+ """
+
+ def __init__(self, base_evaluator: StatisticalModelEvaluator):
+ self.base_evaluator = base_evaluator
+
+ def validate_function_schema(
+ self, schema: Dict[str, Any]
+ ) -> Dict[str, Any]:
+ """
+        Validates whether a given function schema is correct and follows the JSON Schema specification.
+
+ Args:
+ schema: The function schema to validate
+
+ Returns:
+ Dictionary containing validation results and any errors found
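+
+        Example (illustrative): a schema missing "description" comes back as
+        {"valid": False, "errors": ["Missing required fields: description"]},
+        while a complete schema yields {"valid": True, "errors": []}.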
+ """
+ errors = []
+ try:
+ # Validate basic JSON Schema structure
+ jsonschema.Draft7Validator.check_schema(schema)
+
+ # Check for required function schema elements
+ required_fields = ["name", "description", "parameters"]
+ missing_fields = [
+ field
+ for field in required_fields
+ if field not in schema
+ ]
+ if missing_fields:
+ errors.append(
+ f"Missing required fields: {', '.join(missing_fields)}"
+ )
+
+ # Validate parameters object
+ if "parameters" in schema:
+ if not isinstance(schema["parameters"], dict):
+ errors.append("Parameters must be an object")
+ else:
+ if "properties" not in schema["parameters"]:
+ errors.append(
+ "Parameters object must contain 'properties'"
+ )
+ if "required" not in schema["parameters"]:
+ errors.append(
+ "Parameters object must specify 'required' fields"
+ )
+
+ schema_valid = len(errors) == 0
+
+ except jsonschema.exceptions.SchemaError as e:
+ schema_valid = False
+ errors.append(f"Schema validation error: {str(e)}")
+
+ return {"valid": schema_valid, "errors": errors}
+
+ def evaluate_function_call(
+ self,
+ function_schema: Dict[str, Any],
+ test_cases: List[Dict[str, Any]],
+ expected_outputs: List[Any],
+ ) -> FunctionCallResult:
+ """
+ Evaluates function calling implementation against test cases.
+
+ Args:
+ function_schema: The function schema to test
+ test_cases: List of input test cases
+ expected_outputs: List of expected outputs for each test case
+
+ Returns:
+ FunctionCallResult containing evaluation metrics
+ """
+ # First validate the schema
+ schema_validation = self.validate_function_schema(
+ function_schema
+ )
+
+ execution_errors = []
+ execution_scores = []
+
+ # If schema is valid, test execution
+ if schema_validation["valid"]:
+ for test_case, expected in zip(
+ test_cases, expected_outputs
+ ):
+ try:
+ # Validate test case against schema
+ jsonschema.validate(
+ test_case, function_schema["parameters"]
+ )
+
+ # Compare output structure
+ execution_score = self._compare_outputs(
+ test_case, expected
+ )
+ execution_scores.append(execution_score)
+
+                except jsonschema.exceptions.ValidationError as e:
+ execution_errors.append(
+ f"Test case validation error: {str(e)}"
+ )
+ except Exception as e:
+ execution_errors.append(
+ f"Execution error: {str(e)}"
+ )
+
+ # Calculate average execution score
+ avg_execution_score = (
+ sum(execution_scores) / len(execution_scores)
+ if execution_scores
+ else 0.0
+ )
+
+ return FunctionCallResult(
+ schema_valid=schema_validation["valid"],
+            execution_valid=schema_validation["valid"]
+            and not execution_errors,
+ schema_errors=schema_validation["errors"],
+ execution_errors=execution_errors,
+ matching_score=avg_execution_score,
+ metadata={
+ "num_test_cases": len(test_cases),
+ "test_coverage": len(execution_scores)
+ / len(test_cases),
+ },
+ )
+
+ def _compare_outputs(self, actual: Any, expected: Any) -> float:
+ """
+ Compares actual output with expected output and returns a similarity score.
+
+ Args:
+ actual: The actual output
+ expected: The expected output
+
+ Returns:
+ Float between 0 and 1 indicating similarity
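+
+        Example (illustrative): comparing {"a": 1, "b": 2} against an expected
+        {"a": 1, "c": 3} shares one of two expected keys (0.5) and that key's
+        values match exactly (1.0), giving (0.5 + 1.0) / 2 = 0.75.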
+ """
+ if isinstance(actual, dict) and isinstance(expected, dict):
+ # Compare dictionary structures
+ actual_keys = set(actual.keys())
+ expected_keys = set(expected.keys())
+
+            # Calculate key overlap; an empty expected dict only matches an
+            # empty actual dict
+            if not expected_keys:
+                return 1.0 if not actual_keys else 0.0
+            key_similarity = len(actual_keys & expected_keys) / len(
+                expected_keys
+            )
+
+ # Calculate value similarity for overlapping keys
+ value_scores = []
+ for key in actual_keys & expected_keys:
+ value_scores.append(
+ self._compare_outputs(actual[key], expected[key])
+ )
+
+ value_similarity = (
+ sum(value_scores) / len(value_scores)
+ if value_scores
+ else 0
+ )
+
+ return (key_similarity + value_similarity) / 2
+
+ elif isinstance(actual, (list, tuple)) and isinstance(
+ expected, (list, tuple)
+ ):
+ # Compare sequence structures
+ if len(actual) != len(expected):
+ return 0.5 # Partial match for different lengths
+
+ element_scores = [
+ self._compare_outputs(a, e)
+ for a, e in zip(actual, expected)
+ ]
+ return sum(element_scores) / len(element_scores)
+
+ else:
+ # Direct comparison for primitive types
+ return float(actual == expected)
+
+
+def create_test_suite(
+ function_schema: Dict[str, Any], num_cases: int = 10
+) -> List[Dict[str, Any]]:
+ """
+ Creates a test suite for a given function schema.
+
+ Args:
+ function_schema: The function schema to create tests for
+ num_cases: Number of test cases to generate
+
+ Returns:
+ List of test cases
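+
+    Example (illustrative): a schema whose parameters declare
+    {"amount": {"type": "number"}, "flag": {"type": "boolean"}} yields test
+    cases of the form {"amount": 42.0, "flag": True}.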
+ """
+ test_cases = []
+ properties = function_schema["parameters"]["properties"]
+
+ for _ in range(num_cases):
+ test_case = {}
+ for prop_name, prop_schema in properties.items():
+ test_case[prop_name] = _generate_test_value(prop_schema)
+ test_cases.append(test_case)
+
+ return test_cases
+
+
+def _generate_test_value(property_schema: Dict[str, Any]) -> Any:
+ """Helper function to generate test values based on property schema"""
+ schema_type = property_schema.get("type", "string")
+
+ if schema_type == "string":
+ return "test_string"
+ elif schema_type == "number":
+ return 42.0
+ elif schema_type == "integer":
+ return 42
+ elif schema_type == "boolean":
+ return True
+ elif schema_type == "array":
+ items_schema = property_schema.get(
+ "items", {"type": "string"}
+ )
+ return [_generate_test_value(items_schema) for _ in range(2)]
+ elif schema_type == "object":
+ obj = {}
+ for prop_name, prop_schema in property_schema.get(
+ "properties", {}
+ ).items():
+ obj[prop_name] = _generate_test_value(prop_schema)
+ return obj
+ else:
+ return None
+
+
+# # Create base evaluator
+# base_evaluator = StatisticalModelEvaluator()
+
+# # Create function call evaluator
+# func_evaluator = FunctionCallEvaluator(base_evaluator)
+
+# # Example function schema
+# schema = {
+# "name": "calculate_total",
+# "description": "Calculates total with tax",
+# "parameters": {
+# "properties": {
+# "amount": {"type": "number"},
+# "tax_rate": {"type": "number"}
+# },
+# "required": ["amount", "tax_rate"]
+# }
+# }
+
+# # Create test cases
+# test_cases = create_test_suite(schema)
+
+# # Expected outputs for test cases
+# expected_outputs = [
+# {"total": 110.0},
+# {"total": 220.0},
+# # ... more expected outputs
+# ]
+
+# # Evaluate the function
+# result = func_evaluator.evaluate_function_call(
+# function_schema=schema,
+# test_cases=test_cases,
+# expected_outputs=expected_outputs
+# )
+
+# print(f"Schema valid: {result.schema_valid}")
+# print(f"Execution valid: {result.execution_valid}")
+# print(f"Matching score: {result.matching_score}")
diff --git a/auto_eval.py b/examples/auto_eval.py
similarity index 100%
rename from auto_eval.py
rename to examples/auto_eval.py
diff --git a/huggingface_eval_example.py b/examples/huggingface_eval_example.py
similarity index 100%
rename from huggingface_eval_example.py
rename to examples/huggingface_eval_example.py
diff --git a/huggingface_simple_example.py b/examples/huggingface_simple_example.py
similarity index 100%
rename from huggingface_simple_example.py
rename to examples/huggingface_simple_example.py
diff --git a/experimental/test.py b/experimental/test.py
index 9327b10..bfcaa52 100644
--- a/experimental/test.py
+++ b/experimental/test.py
@@ -195,9 +195,7 @@ def optimized_matrix_multiply_with_addition(
+ 1
)
- torch.tensor(
- [2**i for i in range(max_bits)], dtype=torch.int32
- )
+ torch.tensor([2**i for i in range(max_bits)], dtype=torch.int32)
# Process matrices in blocks for better cache utilization
for i in range(0, m, chunk_size):
diff --git a/function_call_eval.py b/function_call_eval.py
new file mode 100644
index 0000000..f0cb788
--- /dev/null
+++ b/function_call_eval.py
@@ -0,0 +1,243 @@
+import json
+from typing import Any, Dict, List, Optional
+
+from evalops.function_eval import FunctionCallEvaluator
+from evalops.main import StatisticalModelEvaluator
+
+
+# First, let's create a mock model class that implements function calling
+class MockLanguageModel:
+ """
+ Mock language model that implements function calling capabilities.
+ This simulates how a real LLM might handle function calls.
+ """
+
+ def run(
+        self, task: str, functions: Optional[List[Dict[str, Any]]] = None
+ ) -> Dict[str, Any]:
+ """Simulates model inference with function calling"""
+ # This is a mock implementation - in reality, this would be an LLM
+ if "weather" in task.lower():
+ return {
+ "function_call": {
+ "name": "get_weather",
+ "arguments": {
+ "location": "New York",
+ "date": "2024-01-02",
+ },
+ }
+ }
+ elif "calculate" in task.lower():
+ return {
+ "function_call": {
+ "name": "calculate_total",
+ "arguments": {"amount": 100, "tax_rate": 0.1},
+ }
+ }
+ return {"content": "I cannot help with that task."}
+
+
+# Define some example functions that we want to test
+def get_weather(location: str, date: str) -> Dict[str, Any]:
+ """Mock weather function"""
+ return {
+ "temperature": 72,
+ "conditions": "sunny",
+ "location": location,
+ "date": date,
+ }
+
+
+def calculate_total(amount: float, tax_rate: float) -> Dict[str, Any]:
+ """Calculate total with tax"""
+ total = amount * (1 + tax_rate)
+ return {
+ "total": total,
+ "breakdown": {
+ "base_amount": amount,
+ "tax_amount": amount * tax_rate,
+ },
+ }
+
+
+# Define the function schemas
+function_schemas = [
+ {
+ "name": "get_weather",
+ "description": "Get weather information for a location and date",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "location": {
+ "type": "string",
+ "description": "City name",
+ },
+ "date": {
+ "type": "string",
+ "description": "Date in YYYY-MM-DD format",
+ },
+ },
+ "required": ["location", "date"],
+ },
+ },
+ {
+ "name": "calculate_total",
+ "description": "Calculate total amount including tax",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "amount": {
+ "type": "number",
+ "description": "Base amount",
+ },
+ "tax_rate": {
+ "type": "number",
+ "description": "Tax rate as decimal",
+ },
+ },
+ "required": ["amount", "tax_rate"],
+ },
+ },
+]
+
+
+def main():
+ # Initialize evaluators
+ base_evaluator = StatisticalModelEvaluator()
+ func_evaluator = FunctionCallEvaluator(base_evaluator)
+
+ # Initialize model
+ model = MockLanguageModel()
+
+ # Test cases for weather function
+ weather_test_cases = [
+ {
+ "task": "What's the weather like in New York today?",
+ "expected_schema": function_schemas[0],
+ "expected_output": {
+ "temperature": 72,
+ "conditions": "sunny",
+ "location": "New York",
+ "date": "2024-01-02",
+ },
+ },
+ {
+ "task": "Tell me the weather forecast for New York",
+ "expected_schema": function_schemas[0],
+ "expected_output": {
+ "temperature": 72,
+ "conditions": "sunny",
+ "location": "New York",
+ "date": "2024-01-02",
+ },
+ },
+ ]
+
+ # Test cases for calculation function
+ calculation_test_cases = [
+ {
+ "task": "Calculate the total for $100 with 10% tax",
+ "expected_schema": function_schemas[1],
+ "expected_output": {
+ "total": 110.0,
+ "breakdown": {
+ "base_amount": 100.0,
+ "tax_amount": 10.0,
+ },
+ },
+ }
+ ]
+
+ # Evaluate each function schema
+ for schema in function_schemas:
+ print(f"\nEvaluating schema for {schema['name']}:")
+ schema_result = func_evaluator.validate_function_schema(
+ schema
+ )
+ print(
+ f"Schema validation result: {json.dumps(schema_result, indent=2)}"
+ )
+
+ # Run test cases for weather function
+ print("\nTesting weather function:")
+ weather_results = []
+ for test_case in weather_test_cases:
+ # Get model response
+ model_response = model.run(
+ test_case["task"], functions=function_schemas
+ )
+
+ if "function_call" in model_response:
+ # Execute the function with model's arguments
+ func_name = model_response["function_call"]["name"]
+ func_args = model_response["function_call"]["arguments"]
+
+ if func_name == "get_weather":
+ actual_output = get_weather(**func_args)
+ else:
+ actual_output = {"error": "Wrong function called"}
+
+ # Evaluate the execution
+ result = func_evaluator.evaluate_function_call(
+ function_schema=test_case["expected_schema"],
+ test_cases=[func_args],
+ expected_outputs=[test_case["expected_output"]],
+ )
+ weather_results.append(result)
+
+ print(f"\nTest case: {test_case['task']}")
+ print(f"Model called: {func_name}")
+ print(f"Arguments: {json.dumps(func_args, indent=2)}")
+ print(f"Output: {json.dumps(actual_output, indent=2)}")
+ print(
+ f"Evaluation result: {json.dumps(result.model_dump(), indent=2)}"
+ )
+
+ # Run test cases for calculation function
+ print("\nTesting calculation function:")
+ calc_results = []
+ for test_case in calculation_test_cases:
+ model_response = model.run(
+ test_case["task"], functions=function_schemas
+ )
+
+ if "function_call" in model_response:
+ func_name = model_response["function_call"]["name"]
+ func_args = model_response["function_call"]["arguments"]
+
+ if func_name == "calculate_total":
+ actual_output = calculate_total(**func_args)
+ else:
+ actual_output = {"error": "Wrong function called"}
+
+ result = func_evaluator.evaluate_function_call(
+ function_schema=test_case["expected_schema"],
+ test_cases=[func_args],
+ expected_outputs=[test_case["expected_output"]],
+ )
+ calc_results.append(result)
+
+ print(f"\nTest case: {test_case['task']}")
+ print(f"Model called: {func_name}")
+ print(f"Arguments: {json.dumps(func_args, indent=2)}")
+ print(f"Output: {json.dumps(actual_output, indent=2)}")
+ print(
+ f"Evaluation result: {json.dumps(result.model_dump(), indent=2)}"
+ )
+
+ # Calculate aggregate statistics
+ all_results = weather_results + calc_results
+ avg_matching_score = sum(
+ r.matching_score for r in all_results
+ ) / len(all_results)
+ schema_validity = all(r.schema_valid for r in all_results)
+ execution_validity = all(r.execution_valid for r in all_results)
+
+ print("\nAggregate Results:")
+ print(f"Average matching score: {avg_matching_score:.2f}")
+ print(f"All schemas valid: {schema_validity}")
+ print(f"All executions valid: {execution_validity}")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/new.py b/new.py
new file mode 100644
index 0000000..a8f4104
--- /dev/null
+++ b/new.py
@@ -0,0 +1,395 @@
+import json
+import time
+from concurrent.futures import ThreadPoolExecutor
+from difflib import SequenceMatcher
+from functools import partial
+from pathlib import Path
+from typing import Any, Dict, List, Optional, Protocol
+
+import numpy as np
+from loguru import logger
+from pydantic import BaseModel
+from scipy import stats
+
+from evalops.function_eval import FunctionCallEvaluator
+from evalops.main import StatisticalModelEvaluator
+
+
+class ModelInterface(Protocol):
+ """Protocol defining the required interface for model classes."""
+
+    def run(self, task: str, img: Optional[str] = None) -> str:
+ """Run the model on a given task."""
+ ...
+
+
+class EvalResult(BaseModel):
+ """Stores evaluation results for a single model run."""
+
+ mean_score: float
+ sem: float
+ ci_lower: float
+ ci_upper: float
+ raw_scores: List[float]
+ metadata: Dict[str, Any]
+ function_call_results: Optional[Dict[str, Any]] = None
+ sentiment_score: Optional[float] = None
+
+
+class FunctionCallResult(BaseModel):
+ """Stores the evaluation results for function calling tests."""
+
+ schema_valid: bool
+ execution_valid: bool
+ schema_errors: List[str]
+ execution_errors: List[str]
+ matching_score: float
+ metadata: Dict[str, Any]
+
+
+class IntegratedModelEvaluator:
+ """
+ Enhanced model evaluator that combines statistical evaluation,
+ function calling assessment, and sentiment analysis.
+ """
+
+ def __init__(
+ self,
+ cache_dir: Optional[str] = None,
+ log_level: str = "INFO",
+ random_seed: Optional[int] = None,
+ ):
+ # Initialize base statistical evaluator
+ self.statistical_evaluator = StatisticalModelEvaluator(
+ cache_dir=cache_dir,
+ log_level=log_level,
+ random_seed=random_seed,
+ )
+
+ # Initialize function call evaluator
+ self.function_evaluator = FunctionCallEvaluator(
+ self.statistical_evaluator
+ )
+
+ self.cache_dir = Path(cache_dir) if cache_dir else None
+ if self.cache_dir:
+ self.cache_dir.mkdir(parents=True, exist_ok=True)
+
+ if random_seed is not None:
+ np.random.seed(random_seed)
+
+ logger.add(
+ lambda msg: print(msg),
+ level=log_level,
+ format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
+ )
+
+ def _calculate_score(
+ self, prediction: str, correct_answer: str
+ ) -> float:
+ """Calculate similarity score between prediction and correct answer."""
+ prediction = prediction.strip().lower()
+ correct_answer = correct_answer.strip().lower()
+
+ if correct_answer in prediction:
+ return 1.0
+
+ similarity = SequenceMatcher(
+ None, prediction, correct_answer
+ ).ratio()
+ return similarity if similarity > 0.8 else 0.0
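+
+    def _evaluate_single_question(
+        self,
+        model: ModelInterface,
+        question: str,
+        correct_answer: str,
+        num_samples: int,
+    ) -> float:
+        """
+        Minimal sketch of the per-question scorer that evaluate_model expects:
+        the model is sampled num_samples times, each response is scored with
+        _calculate_score, and the scores are averaged. Swap in a richer scorer
+        (e.g. the base evaluator's own routine) if one is available.
+        """
+        scores = [
+            self._calculate_score(model.run(question), correct_answer)
+            for _ in range(num_samples)
+        ]
+        return sum(scores) / len(scores)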
+
+ def _analyze_sentiment(self, text: str) -> float:
+ """
+ Analyze sentiment in text and return score between 0.1 and 1.0.
+ Basic implementation - could be enhanced with more sophisticated NLP.
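+
+        Example (illustrative): "thanks this was great and helpful" hits three
+        positive words and no negative ones, so sentiment = 3/3 = 1.0 and the
+        scaled score is 0.1 + 1.0 * 0.9 = 1.0; an all-negative reply scores 0.1.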
+ """
+ # List of positive and negative sentiment words
+ positive_words = {
+ "good",
+ "great",
+ "excellent",
+ "amazing",
+ "wonderful",
+ "fantastic",
+ "helpful",
+ "perfect",
+ "thank",
+ "thanks",
+ "appreciated",
+ "love",
+ "nice",
+ }
+ negative_words = {
+ "bad",
+ "poor",
+ "terrible",
+ "horrible",
+ "useless",
+ "waste",
+ "unhelpful",
+ "wrong",
+ "fail",
+ "failed",
+ "confused",
+ "disappointing",
+ }
+
+ words = text.lower().split()
+ pos_count = sum(1 for word in words if word in positive_words)
+ neg_count = sum(1 for word in words if word in negative_words)
+
+ total_count = pos_count + neg_count
+ if total_count == 0:
+ return 0.5 # Neutral sentiment
+
+        sentiment = pos_count / total_count
+ # Scale to 0.1-1.0 range
+ return max(0.1, min(1.0, 0.1 + sentiment * 0.9))
+
+ def validate_function_schema(
+ self, schema: Dict[str, Any]
+ ) -> Dict[str, Any]:
+ """Validates if a given function schema follows JSON Schema specification."""
+ return self.function_evaluator.validate_function_schema(
+ schema
+ )
+
+ def evaluate_function_call(
+ self,
+ function_schema: Dict[str, Any],
+ test_cases: List[Dict[str, Any]],
+ expected_outputs: List[Any],
+ ) -> FunctionCallResult:
+ """Evaluates function calling implementation against test cases."""
+ return self.function_evaluator.evaluate_function_call(
+ function_schema=function_schema,
+ test_cases=test_cases,
+ expected_outputs=expected_outputs,
+ )
+
+ def _compare_outputs(self, actual: Any, expected: Any) -> float:
+ """Compares actual output with expected output and returns similarity score."""
+ if isinstance(actual, dict) and isinstance(expected, dict):
+ actual_keys = set(actual.keys())
+ expected_keys = set(expected.keys())
+
+            # An empty expected dict only matches an empty actual dict
+            if not expected_keys:
+                return 1.0 if not actual_keys else 0.0
+            key_similarity = len(actual_keys & expected_keys) / len(
+                expected_keys
+            )
+
+ value_scores = []
+ for key in actual_keys & expected_keys:
+ value_scores.append(
+ self._compare_outputs(actual[key], expected[key])
+ )
+
+ value_similarity = (
+ sum(value_scores) / len(value_scores)
+ if value_scores
+ else 0
+ )
+ return (key_similarity + value_similarity) / 2
+
+ elif isinstance(actual, (list, tuple)) and isinstance(
+ expected, (list, tuple)
+ ):
+ if len(actual) != len(expected):
+ return 0.5
+ element_scores = [
+ self._compare_outputs(a, e)
+ for a, e in zip(actual, expected)
+ ]
+ return sum(element_scores) / len(element_scores)
+ else:
+ return float(actual == expected)
+
+ def evaluate_model(
+ self,
+ model: ModelInterface,
+ questions: List[str],
+ correct_answers: List[str],
+ imgs: Optional[List[str]] = None,
+ cluster_ids: Optional[List[str]] = None,
+ num_samples: int = 1,
+ batch_size: int = 32,
+ cache_key: Optional[str] = None,
+ function_schema: Optional[Dict[str, Any]] = None,
+ function_test_cases: Optional[List[Dict[str, Any]]] = None,
+ function_expected_outputs: Optional[List[Any]] = None,
+ analyze_sentiment: bool = False,
+ ) -> EvalResult:
+ """
+ Enhanced evaluation that includes statistical analysis, function calling,
+ and optional sentiment analysis.
+ """
+ start_time = time.time()
+
+ # Check cache
+ if cache_key and self.cache_dir:
+ cache_path = self.cache_dir / f"{cache_key}.json"
+ if cache_path.exists():
+ with open(cache_path) as f:
+ return EvalResult(**json.load(f))
+
+ # Validate inputs
+ assert len(questions) == len(
+ correct_answers
+ ), "Questions and answers must have same length"
+ if cluster_ids:
+ assert len(cluster_ids) == len(
+ questions
+ ), "Cluster IDs must match question length"
+
+ # Run model predictions
+ all_scores = []
+ sentiment_scores = [] if analyze_sentiment else None
+
+ with ThreadPoolExecutor() as executor:
+ for i in range(0, len(questions), batch_size):
+ batch_questions = questions[i : i + batch_size]
+ batch_answers = correct_answers[i : i + batch_size]
+
+ tasks = [
+ partial(
+ self._evaluate_single_question,
+ model,
+ q,
+ a,
+ num_samples,
+ )
+ for q, a in zip(batch_questions, batch_answers)
+ ]
+
+ batch_scores = list(
+ executor.map(lambda f: f(), tasks)
+ )
+ all_scores.extend(batch_scores)
+
+ if analyze_sentiment:
+ batch_predictions = [
+ model.run(q) for q in batch_questions
+ ]
+ sentiment_scores.extend(
+ [
+ self._analyze_sentiment(p)
+ for p in batch_predictions
+ ]
+ )
+
+ # Calculate statistics
+ scores_array = np.array(all_scores)
+ mean_score = np.mean(scores_array)
+
+ if cluster_ids:
+ sem = self._calculate_clustered_sem(
+ scores_array, cluster_ids
+ )
+ else:
+ sem = stats.sem(scores_array)
+
+ ci_lower, ci_upper = stats.norm.interval(
+ 0.95, loc=mean_score, scale=sem
+ )
+
+ # Evaluate function calling if provided
+ function_results = None
+ if (
+ function_schema
+ and function_test_cases
+ and function_expected_outputs
+ ):
+ function_results = self.evaluate_function_call(
+ function_schema,
+ function_test_cases,
+ function_expected_outputs,
+            ).model_dump()
+
+ # Create result
+ result = EvalResult(
+ mean_score=float(mean_score),
+ sem=float(sem),
+ ci_lower=float(ci_lower),
+ ci_upper=float(ci_upper),
+ raw_scores=all_scores,
+ metadata={
+ "num_questions": len(questions),
+ "num_samples": num_samples,
+ "has_clusters": cluster_ids is not None,
+ "evaluation_time": time.time() - start_time,
+ },
+ function_call_results=function_results,
+ sentiment_score=(
+ np.mean(sentiment_scores)
+ if sentiment_scores
+ else None
+ ),
+ )
+
+ # Cache results
+ if cache_key and self.cache_dir:
+ cache_path = self.cache_dir / f"{cache_key}.json"
+ with open(cache_path, "w") as f:
+                json.dump(result.model_dump(), f)
+
+ return result
+
+ def _calculate_clustered_sem(
+ self, scores: np.ndarray, cluster_ids: List[str]
+ ) -> float:
+ """Calculate clustered standard error of the mean."""
+ import pandas as pd
+
+ df = pd.DataFrame({"score": scores, "cluster": cluster_ids})
+ cluster_means = df.groupby("cluster")["score"].mean()
+ n_clusters = len(cluster_means)
+ cluster_variance = cluster_means.var()
+ return np.sqrt(cluster_variance / n_clusters)
+
+
+def create_test_suite(
+ function_schema: Dict[str, Any], num_cases: int = 10
+) -> List[Dict[str, Any]]:
+ """Creates a test suite for a given function schema."""
+ test_cases = []
+ properties = function_schema["parameters"]["properties"]
+
+ for _ in range(num_cases):
+ test_case = {}
+ for prop_name, prop_schema in properties.items():
+ test_case[prop_name] = _generate_test_value(prop_schema)
+ test_cases.append(test_case)
+
+ return test_cases
+
+
+def _generate_test_value(property_schema: Dict[str, Any]) -> Any:
+ """Helper function to generate test values based on property schema."""
+ schema_type = property_schema.get("type", "string")
+
+ if schema_type == "string":
+ return "test_string"
+ elif schema_type == "number":
+ return 42.0
+ elif schema_type == "integer":
+ return 42
+ elif schema_type == "boolean":
+ return True
+ elif schema_type == "array":
+ items_schema = property_schema.get(
+ "items", {"type": "string"}
+ )
+ return [_generate_test_value(items_schema) for _ in range(2)]
+ elif schema_type == "object":
+ obj = {}
+ for prop_name, prop_schema in property_schema.get(
+ "properties", {}
+ ).items():
+ obj[prop_name] = _generate_test_value(prop_schema)
+ return obj
+ else:
+ return None
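+
+
+# Illustrative usage (hypothetical `my_model` object exposing a .run(task)
+# method), mirroring the commented example at the bottom of
+# evalops/function_eval.py:
+#
+# evaluator = IntegratedModelEvaluator(cache_dir="cache", random_seed=0)
+# result = evaluator.evaluate_model(
+#     model=my_model,
+#     questions=["What is 2 + 2?", "What is the capital of France?"],
+#     correct_answers=["4", "paris"],
+#     analyze_sentiment=True,
+# )
+# print(result.mean_score, result.sentiment_score)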
diff --git a/queen_swarm.py b/queen_swarm.py
deleted file mode 100644
index 6a8c3bc..0000000
--- a/queen_swarm.py
+++ /dev/null
@@ -1,224 +0,0 @@
-import asyncio
-from dataclasses import dataclass
-from decimal import Decimal
-from typing import Any, Dict
-
-import ccxt
-import pandas as pd
-from loguru import logger
-from swarms import Agent
-
-# Configure logging
-logger.add("swarm.log", rotation="500 MB", level="INFO")
-
-
-@dataclass
-class SharedMemory:
- """Shared memory system for all agents"""
-
- market_data: Dict = None
- trading_signals: Dict = None
- risk_metrics: Dict = None
- positions: Dict = None
-
- def update(self, key: str, value: Any):
- """Update any attribute in shared memory"""
- if hasattr(self, key):
- setattr(self, key, value)
- logger.info(f"Updated shared memory: {key}")
-
-
-class MarketDataAgent(Agent):
- def __init__(self, shared_memory: SharedMemory):
- super().__init__(
- agent_name="Market-Data-Agent",
- system_prompt="You are a market data specialist. Monitor and analyze crypto market data.",
- )
- self.exchange = ccxt.kraken({"enableRateLimit": True})
- self.shared_memory = shared_memory
-
- async def run(self, symbol: str) -> None:
- try:
- data = await self.exchange.fetch_ohlcv(
- symbol, "1h", limit=100
- )
- df = pd.DataFrame(
- data,
- columns=[
- "timestamp",
- "open",
- "high",
- "low",
- "close",
- "volume",
- ],
- )
- self.shared_memory.update("market_data", {symbol: df})
-
- except Exception as e:
- logger.error(f"Market data error: {e}")
- return None
-
-
-class SignalAgent(Agent):
- def __init__(self, shared_memory: SharedMemory):
- super().__init__(
- agent_name="Signal-Agent",
- system_prompt="You are a trading signal specialist. Generate trading signals based on market data.",
- )
- self.shared_memory = shared_memory
-
- async def run(self, symbol: str) -> None:
- try:
- if not self.shared_memory.market_data:
- return
-
- df = self.shared_memory.market_data[symbol]
-
- # Get AI analysis
- analysis = await self.run(
- f"Analyze price action for {symbol}: Current price {df['close'].iloc[-1]}, "
- f"Volume: {df['volume'].iloc[-1]}"
- )
-
- signal = {
- "symbol": symbol,
- "action": analysis.get("recommendation", "HOLD"),
- "confidence": analysis.get("confidence", 0),
- "timestamp": pd.Timestamp.now(),
- }
-
- self.shared_memory.update(
- "trading_signals", {symbol: signal}
- )
-
- except Exception as e:
- logger.error(f"Signal generation error: {e}")
-
-
-class RiskAgent(Agent):
- def __init__(self, shared_memory: SharedMemory):
- super().__init__(
- agent_name="Risk-Agent",
- system_prompt="You are a risk management specialist. Evaluate trading risks and set position sizes.",
- )
- self.shared_memory = shared_memory
-
- async def run(self, symbol: str) -> None:
- try:
- if not (
- self.shared_memory.market_data
- and self.shared_memory.trading_signals
- ):
- return
-
- signal = self.shared_memory.trading_signals[symbol]
-
- risk_metrics = {
- "symbol": symbol,
- "position_size": Decimal(
- "0.01"
- ), # Default conservative size
- "risk_score": 0.5,
- "timestamp": pd.Timestamp.now(),
- }
-
- # Get AI risk assessment
- assessment = await self.run(
- f"Evaluate risk for {symbol} trade with signal confidence {signal['confidence']}"
- )
-
- risk_metrics.update(assessment)
- self.shared_memory.update(
- "risk_metrics", {symbol: risk_metrics}
- )
-
- except Exception as e:
- logger.error(f"Risk assessment error: {e}")
-
-
-class QueenAgent(Agent):
- def __init__(self, shared_memory: SharedMemory):
- super().__init__(
- agent_name="Queen-Agent",
- system_prompt="You are the queen bee coordinator. Make final trading decisions based on all available information.",
- )
- self.shared_memory = shared_memory
- self.market_agent = MarketDataAgent(shared_memory)
- self.signal_agent = SignalAgent(shared_memory)
- self.risk_agent = RiskAgent(shared_memory)
-
- async def run(self, symbol: str) -> Dict:
- """Coordinate the swarm and make final decisions"""
- try:
- # Run all agents sequentially
- await self.market_agent.run(symbol)
- await self.signal_agent.run(symbol)
- await self.risk_agent.run(symbol)
-
- # Make final decision
- if all(
- [
- self.shared_memory.market_data,
- self.shared_memory.trading_signals,
- self.shared_memory.risk_metrics,
- ]
- ):
- signal = self.shared_memory.trading_signals[symbol]
- risk = self.shared_memory.risk_metrics[symbol]
-
- # Get AI final decision
- decision = await self.run(
- f"Make final trading decision for {symbol}:\n"
- f"Signal: {signal['action']}\n"
- f"Confidence: {signal['confidence']}\n"
- f"Risk Score: {risk['risk_score']}"
- )
-
- # Update positions if trade is approved
- if decision.get("execute", False):
- self.shared_memory.update(
- "positions",
- {
- symbol: {
- "action": signal["action"],
- "size": risk["position_size"],
- "timestamp": pd.Timestamp.now(),
- }
- },
- )
-
- return decision
-
- except Exception as e:
- logger.error(f"Queen agent error: {e}")
- return {"execute": False, "reason": str(e)}
-
-
-async def main():
- # Initialize shared memory
- shared_memory = SharedMemory()
-
- # Initialize queen agent
- queen = QueenAgent(shared_memory)
-
- symbols = ["BTC/USD", "ETH/USD"]
-
- try:
- while True:
- for symbol in symbols:
- decision = await queen.run(symbol)
- logger.info(f"Decision for {symbol}: {decision}")
- await asyncio.sleep(1) # Rate limiting
-
- await asyncio.sleep(60) # Main loop interval
-
- except KeyboardInterrupt:
- logger.info("Shutting down swarm...")
- except Exception as e:
- logger.error(f"Critical error: {e}")
- raise
-
-
-if __name__ == "__main__":
- asyncio.run(main())