| """LLM Cost Benchmarking Script | |
| Measures token usage and calculates costs for cancer risk assessments | |
| across different LLM backends. | |
| """ | |
import argparse
import csv
import functools
import os
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any

import requests
import yaml
from dotenv import load_dotenv
from langchain_community.callbacks.manager import get_openai_callback
from loguru import logger
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.units import inch
from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle

from sentinel.config import AppConfig, ModelConfig, ResourcePaths
from sentinel.factory import SentinelFactory
from sentinel.utils import load_user_file

load_dotenv()
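# Environment variables read by this script (typically supplied via .env):
# GOOGLE_API_KEY, OPENAI_API_KEY, and optionally OLLAMA_BASE_URL for the
# "local" (Ollama) backend.
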
@dataclass
class ModelPricing:
    """Pricing per 1 million tokens in USD.

    Attributes:
        input_per_million: Cost per 1M input tokens (USD)
        output_per_million: Cost per 1M output tokens (USD)
    """

    input_per_million: float
    output_per_million: float


@dataclass
class BenchmarkModelConfig:
    """Model configuration for benchmarking.

    Attributes:
        provider: Provider key (google, openai, local)
        model_name: Model identifier used by the provider
        pricing: Pricing information per 1M tokens
    """

    provider: str
    model_name: str
    pricing: ModelPricing

# Sources:
# - https://ai.google.dev/pricing
# - https://openai.com/api/pricing/
BENCHMARK_MODELS = [
    BenchmarkModelConfig(
        provider="google",
        model_name="gemini-2.5-pro",
        pricing=ModelPricing(input_per_million=1.25, output_per_million=10.00),
    ),
    BenchmarkModelConfig(
        provider="google",
        model_name="gemini-2.5-flash-lite",
        pricing=ModelPricing(input_per_million=0.1, output_per_million=0.4),
    ),
]
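# To benchmark another backend, append an entry above. A hypothetical local
# (Ollama) entry, for example, would use zero per-token pricing:
#
#   BenchmarkModelConfig(
#       provider="local",
#       model_name="llama3.1",  # assumed model tag; pull it first with `ollama pull llama3.1`
#       pricing=ModelPricing(input_per_million=0.0, output_per_million=0.0),
#   )
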
@dataclass
class TokenUsage:
    """Token usage statistics for a single assessment.

    Attributes:
        input_tokens: Tokens in the prompt/input
        output_tokens: Tokens in the model's response
    """

    input_tokens: int
    output_tokens: int

    @property
    def total_tokens(self) -> int:
        """Total tokens used.

        Returns:
            Sum of input and output tokens
        """
        return self.input_tokens + self.output_tokens

@dataclass
class BenchmarkResult:
    """Results from a single model/profile benchmark run.

    Attributes:
        model_name: Name of the model
        provider: Provider key (openai, google, local)
        profile_name: Name of the profile
        token_usage: Token usage statistics
        cost: Cost in USD
    """

    model_name: str
    provider: str
    profile_name: str
    token_usage: TokenUsage
    cost: float

def calculate_cost(token_usage: TokenUsage, pricing: ModelPricing) -> float:
    """Calculate cost based on token usage and model pricing.

    Args:
        token_usage: Token usage statistics
        pricing: Model pricing per 1M tokens

    Returns:
        Cost in USD
    """
    input_cost = (token_usage.input_tokens / 1_000_000) * pricing.input_per_million
    output_cost = (token_usage.output_tokens / 1_000_000) * pricing.output_per_million
    return input_cost + output_cost
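# Worked example for calculate_cost: 12,000 input tokens and 1,500 output tokens
# on gemini-2.5-pro ($1.25 / $10.00 per 1M tokens) cost
#     (12_000 / 1_000_000) * 1.25 + (1_500 / 1_000_000) * 10.00 = $0.03
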
def validate_directory_input(func: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator to validate directory argument.

    Args:
        func: Function to decorate

    Returns:
        Decorated function that validates directory input
    """

    @functools.wraps(func)  # preserve the wrapped function's name and docstring
    def wrapper(directory: Path, *args: Any, **kwargs: Any) -> Any:
        """Wrapper function to validate directory input.

        Args:
            directory: Path to directory to validate
            *args: Additional positional arguments
            **kwargs: Additional keyword arguments

        Returns:
            Result of the wrapped function

        Raises:
            FileNotFoundError: If the directory does not exist
            NotADirectoryError: If the path is not a directory
            ValueError: If the directory is empty
        """
        if not directory.exists():
            raise FileNotFoundError(f"Directory not found: {directory}")
        if not directory.is_dir():
            raise NotADirectoryError(f"Not a directory: {directory}")
        if not any(directory.iterdir()):
            raise ValueError(f"Directory is empty: {directory}")
        return func(directory, *args, **kwargs)

    return wrapper

def get_available_models() -> list[BenchmarkModelConfig]:
    """Get list of available models for benchmarking.

    Returns:
        List of configured benchmark models
    """
    return BENCHMARK_MODELS

@validate_directory_input
def load_benchmark_profiles(benchmark_dir: Path) -> list[dict[str, Any]]:
    """Load benchmark profiles.

    Args:
        benchmark_dir: Directory containing benchmark YAML files

    Returns:
        List of dicts with 'name' and 'path' keys
    """
    profiles = []
    for yaml_file in sorted(benchmark_dir.glob("*.yaml")):
        profiles.append({"name": yaml_file.stem, "path": yaml_file})
    return profiles

def create_knowledge_base_paths(workspace_root: Path) -> ResourcePaths:
    """Build resource path configuration from workspace root.

    Args:
        workspace_root: Path to workspace root directory

    Returns:
        ResourcePaths configuration object
    """
    return ResourcePaths(
        persona=workspace_root / "prompts/persona/default.md",
        instruction_assessment=workspace_root / "prompts/instruction/assessment.md",
        instruction_conversation=workspace_root / "prompts/instruction/conversation.md",
        output_format_assessment=workspace_root
        / "configs/output_format/assessment.yaml",
        output_format_conversation=workspace_root
        / "configs/output_format/conversation.yaml",
        cancer_modules_dir=workspace_root / "configs/knowledge_base/cancer_modules",
        dx_protocols_dir=workspace_root / "configs/knowledge_base/dx_protocols",
    )

def validate_backend(provider: str, model_name: str) -> None:
    """Validate that backend is accessible.

    Args:
        provider: Provider key (e.g. "openai", "google", "local")
        model_name: Model identifier

    Raises:
        ValueError: If the backend is not accessible
    """
    if provider == "openai":
        if not os.getenv("OPENAI_API_KEY"):
            raise ValueError("OPENAI_API_KEY not set")
    elif provider == "google":
        if not os.getenv("GOOGLE_API_KEY"):
            raise ValueError("GOOGLE_API_KEY not set")
    elif provider == "local":
        ollama_base_url = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
        try:
            response = requests.get(f"{ollama_base_url}/api/tags", timeout=2)
        except requests.RequestException as exc:
            raise ValueError("Ollama server not responding") from exc
        if response.status_code != 200:
            raise ValueError("Ollama server not responding")
        models = response.json().get("models", [])
        model_names = [m.get("name") for m in models]
        if model_name not in model_names:
            raise ValueError(f"Model not found. Run: ollama pull {model_name}")

def run_assessment(
    model_config: BenchmarkModelConfig, profile_path: Path
) -> BenchmarkResult:
    """Run a single assessment and capture token usage.

    Args:
        model_config: Model configuration with pricing
        profile_path: Path to profile YAML file

    Returns:
        BenchmarkResult with cost and token usage
    """
    validate_backend(model_config.provider, model_config.model_name)
    workspace_root = Path(__file__).parent.parent
    with open(workspace_root / "configs/config.yaml") as f:
        default_config = yaml.safe_load(f)
    app_config = AppConfig(
        model=ModelConfig(
            provider=model_config.provider,
            model_name=model_config.model_name,
        ),
        knowledge_base_paths=create_knowledge_base_paths(workspace_root),
        selected_cancer_modules=default_config["knowledge_base"]["cancer_modules"],
        selected_dx_protocols=default_config["knowledge_base"]["dx_protocols"],
    )
    factory = SentinelFactory(app_config)
    conversation = factory.create_conversation_manager()
    user = load_user_file(str(profile_path))
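    # Note (assumption): token counts come from LangChain's OpenAI callback;
    # depending on the LangChain version and provider integration, non-OpenAI
    # backends may report their usage as zero here.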
    with get_openai_callback() as cb:
        conversation.initial_assessment(user)
        input_tokens = cb.prompt_tokens
        output_tokens = cb.completion_tokens
    token_usage = TokenUsage(input_tokens, output_tokens)
    cost = calculate_cost(token_usage, model_config.pricing)
    return BenchmarkResult(
        model_name=model_config.model_name,
        provider=model_config.provider,
        profile_name=profile_path.stem,
        token_usage=token_usage,
        cost=cost,
    )

def print_results(results: list[BenchmarkResult]) -> None:
    """Print formatted results to console.

    Args:
        results: List of benchmark results
    """
    by_model = defaultdict(list)
    for result in results:
        by_model[result.model_name].append(result)

    lines = []
    lines.append("\n╔══════════════════════════════════════════════════════════════╗")
    lines.append("║                  LLM Cost Benchmark Results                   ║")
    lines.append("╚══════════════════════════════════════════════════════════════╝\n")
    for model_name, model_results in sorted(by_model.items()):
        provider = model_results[0].provider
        lines.append(f"Model: {model_name} ({provider})")
        num_results = len(model_results)
        avg_cost = sum(result.cost for result in model_results) / num_results
        avg_input = (
            sum(result.token_usage.input_tokens for result in model_results)
            / num_results
        )
        avg_output = (
            sum(result.token_usage.output_tokens for result in model_results)
            / num_results
        )
        for result_index, result in enumerate(model_results):
            is_last = result_index == num_results - 1
            prefix = "└─" if is_last else "├─"
            indent = "   " if is_last else "│  "
            lines.append(f"{prefix} Profile: {result.profile_name}")
            lines.append(f"{indent}├─ Input:  {result.token_usage.input_tokens:,}")
            lines.append(f"{indent}├─ Output: {result.token_usage.output_tokens:,}")
            lines.append(f"{indent}└─ Cost:   ${result.cost:.4f}")
        lines.append(f"└─ Average: ${avg_cost:.4f}")
        lines.append(
            f"   └─ Tokens: {avg_input:,.0f} input, {avg_output:,.0f} output\n"
        )
    lines.append("═══════════════════════════════════════════════════════════════")
    lines.append("Summary - Model Ranking (Cheapest to Most Expensive)")
    lines.append("───────────────────────────────────────────────────────────────")
    model_averages = sorted(
        (
            (
                model_name,
                sum(result.cost for result in model_results) / len(model_results),
            )
            for model_name, model_results in by_model.items()
        ),
        key=lambda model_avg_tuple: model_avg_tuple[1],
    )
    for rank, (model_name, avg_cost) in enumerate(model_averages, 1):
        prefix = (
            "🥇"
            if rank == 1
            else "🥈"
            if rank == 2
            else "🥉"
            if rank == 3
            else f"{rank}."
        )
        lines.append(f"{prefix:<4} {model_name:<25} ${avg_cost:.4f}")
    lines.append(f"\nTotal: {len(results)} assessments across {len(by_model)} models")
    lines.append("═══════════════════════════════════════════════════════════════\n")
    logger.info("\n".join(lines))

def export_to_csv(results: list[BenchmarkResult], output_path: Path) -> None:
    """Export results to CSV file.

    Args:
        results: List of benchmark results
        output_path: Path to output CSV file
    """
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(
            [
                "model_name",
                "provider",
                "profile_name",
                "input_tokens",
                "output_tokens",
                "total_tokens",
                "cost_usd",
            ]
        )
        for result in results:
            writer.writerow(
                [
                    result.model_name,
                    result.provider,
                    result.profile_name,
                    result.token_usage.input_tokens,
                    result.token_usage.output_tokens,
                    result.token_usage.total_tokens,
                    f"{result.cost:.6f}",
                ]
            )
    logger.success(f"Results exported to: {output_path}")

def export_to_pdf(
    results: list[BenchmarkResult],
    output_path: Path,
) -> None:
    """Export results to PDF file with formatted table.

    Args:
        results: List of benchmark results
        output_path: Path to output PDF file
    """
    doc = SimpleDocTemplate(
        str(output_path),
        pagesize=letter,
        leftMargin=0.75 * inch,
        rightMargin=0.75 * inch,
        topMargin=0.75 * inch,
        bottomMargin=0.75 * inch,
    )
    elements = []
    styles = getSampleStyleSheet()
    title = Paragraph(
        "<b>LLM Cost Benchmark Report</b>",
        styles["Title"],
    )
    elements.append(title)
    elements.append(Spacer(1, 0.2 * inch))
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    timestamp_text = Paragraph(
        f"Generated: {timestamp}",
        styles["Normal"],
    )
    elements.append(timestamp_text)
    elements.append(Spacer(1, 0.3 * inch))
    by_model = defaultdict(list)
    for result in results:
        by_model[result.model_name].append(result)
    pricing_lookup = {model.model_name: model.pricing for model in BENCHMARK_MODELS}
    results_desc = Paragraph(
        "Average cost of running a single cancer risk assessment given a completed patient questionnaire.",
        styles["Normal"],
    )
    elements.append(results_desc)
    elements.append(Spacer(1, 0.2 * inch))
    table_data = [
        [
            "Model",
            "Provider",
            "Avg Cost\nper Report",
            "Input Price\n(per 1M)",
            "Output Price\n(per 1M)",
            "Avg Input\nTokens",
            "Avg Output\nTokens",
        ]
    ]
    # Sort by average cost (cheapest first)
    sorted_models = sorted(
        by_model.items(),
        key=lambda model_tuple: sum(result.cost for result in model_tuple[1])
        / len(model_tuple[1]),
    )
    for model_name, model_results in sorted_models:
        provider = model_results[0].provider
        num_results = len(model_results)
        avg_cost = sum(result.cost for result in model_results) / num_results
        avg_input = (
            sum(result.token_usage.input_tokens for result in model_results)
            / num_results
        )
        avg_output = (
            sum(result.token_usage.output_tokens for result in model_results)
            / num_results
        )
        pricing = pricing_lookup.get(model_name)
        input_price = f"${pricing.input_per_million:.2f}" if pricing else "N/A"
        output_price = f"${pricing.output_per_million:.2f}" if pricing else "N/A"
        table_data.append(
            [
                model_name,
                provider,
                f"${avg_cost:.4f}",
                input_price,
                output_price,
                f"{avg_input:,.0f}",
                f"{avg_output:,.0f}",
            ]
        )
    table = Table(
        table_data,
        colWidths=[
            1.6 * inch,
            0.9 * inch,
            1.0 * inch,
            1.0 * inch,
            1.0 * inch,
            0.9 * inch,
            0.9 * inch,
        ],
    )
    table_style = TableStyle(
        [
            # Header styling
            ("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#4A90E2")),
            ("TEXTCOLOR", (0, 0), (-1, 0), colors.whitesmoke),
            ("ALIGN", (0, 0), (-1, 0), "CENTER"),
            ("VALIGN", (0, 0), (-1, 0), "MIDDLE"),
            ("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
            ("FONTSIZE", (0, 0), (-1, 0), 9),
            ("BOTTOMPADDING", (0, 0), (-1, 0), 12),
            ("TOPPADDING", (0, 0), (-1, 0), 12),
            # Data rows styling
            ("BACKGROUND", (0, 1), (-1, -1), colors.beige),
            ("TEXTCOLOR", (0, 1), (-1, -1), colors.black),
            ("ALIGN", (0, 1), (1, -1), "LEFT"),
            ("ALIGN", (2, 1), (-1, -1), "CENTER"),
            ("VALIGN", (0, 1), (-1, -1), "MIDDLE"),
            ("FONTNAME", (0, 1), (-1, -1), "Helvetica"),
            ("FONTSIZE", (0, 1), (-1, -1), 9),
            ("TOPPADDING", (0, 1), (-1, -1), 8),
            ("BOTTOMPADDING", (0, 1), (-1, -1), 8),
            # Alternating row colors
            ("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.beige, colors.lightgrey]),
            # Grid
            ("GRID", (0, 0), (-1, -1), 1, colors.black),
        ]
    )
    table.setStyle(table_style)
    elements.append(table)
    elements.append(Spacer(1, 0.3 * inch))
    doc.build(elements)
    logger.success(f"PDF report generated: {output_path}")

def parse_args() -> argparse.Namespace:
    """Parse command-line arguments.

    Returns:
        Parsed command-line arguments
    """
    workspace_root = Path(__file__).parent.parent
    parser = argparse.ArgumentParser(description="Benchmark LLM costs")
    parser.add_argument(
        "--benchmark-dir",
        type=Path,
        default=workspace_root / "examples/benchmark",
        help="Benchmark profile directory",
    )
    parser.add_argument(
        "--models",
        nargs="+",
        help="Specific models to test (by name)",
    )
    parser.add_argument(
        "--profiles",
        nargs="+",
        help="Specific profiles to test",
    )
    parser.add_argument(
        "--output",
        type=Path,
        help="Path for optional CSV export",
    )
    return parser.parse_args()

def main() -> None:
    """Main entry point.

    Raises:
        ValueError: If no matching models or profiles found
    """
    args = parse_args()
    logger.info("Loading benchmark configuration...")
    all_models = get_available_models()
    logger.info("Loading profiles...")
    all_profiles = load_benchmark_profiles(args.benchmark_dir)
    if args.models:
        all_models = [model for model in all_models if model.model_name in args.models]
        if not all_models:
            raise ValueError(f"No matching models: {args.models}")
    if args.profiles:
        all_profiles = [
            profile for profile in all_profiles if profile["name"] in args.profiles
        ]
        if not all_profiles:
            raise ValueError(f"No matching profiles: {args.profiles}")
    logger.info(
        f"\nRunning {len(all_models)} model(s) x {len(all_profiles)} profile(s)...\n"
    )
    results = []
    for model_index, model in enumerate(all_models, 1):
        for profile in all_profiles:
            logger.info(
                f"[{model_index}/{len(all_models)}] {model.model_name}: {profile['name']}"
            )
            result = run_assessment(model, profile["path"])
            results.append(result)
    print_results(results)
    # Generate PDF report with timestamp
    workspace_root = Path(__file__).parent.parent
    outputs_dir = workspace_root / "outputs"
    outputs_dir.mkdir(exist_ok=True)
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    pdf_path = outputs_dir / f"llm_benchmark_{timestamp}.pdf"
    export_to_pdf(results, pdf_path)
    if args.output:
        export_to_csv(results, args.output)


if __name__ == "__main__":
    main()