sentinel / scripts /benchmark_llm_costs.py
jeuko's picture
Sync from GitHub (main)
8018595 verified
"""LLM Cost Benchmarking Script
Measures token usage and calculates costs for cancer risk assessments
across different LLM backends.
"""
import argparse
import csv
import functools
import os
from collections import defaultdict
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
import requests
import yaml
from dotenv import load_dotenv
from langchain_community.callbacks.manager import get_openai_callback
from loguru import logger
from reportlab.lib import colors
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.units import inch
from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer, Table, TableStyle
from sentinel.config import AppConfig, ModelConfig, ResourcePaths
from sentinel.factory import SentinelFactory
from sentinel.utils import load_user_file
load_dotenv()
@dataclass
class ModelPricing:
"""Pricing per 1 million tokens in USD.
Attributes:
input_per_million: Cost per 1M input tokens (USD)
output_per_million: Cost per 1M output tokens (USD)
"""
input_per_million: float
output_per_million: float
@dataclass
class BenchmarkModelConfig:
"""Model configuration for benchmarking.
Attributes:
provider: Provider key (google, openai, local)
model_name: Model identifier used by the provider
pricing: Pricing information per 1M tokens
"""
provider: str
model_name: str
pricing: ModelPricing
# Sources:
# - https://ai.google.dev/pricing
# - https://openai.com/api/pricing/
BENCHMARK_MODELS = [
BenchmarkModelConfig(
provider="google",
model_name="gemini-2.5-pro",
pricing=ModelPricing(input_per_million=1.25, output_per_million=10.00),
),
BenchmarkModelConfig(
provider="google",
model_name="gemini-2.5-flash-lite",
pricing=ModelPricing(input_per_million=0.1, output_per_million=0.4),
),
]
@dataclass
class TokenUsage:
"""Token usage statistics for a single assessment.
Attributes:
input_tokens: Tokens in the prompt/input
output_tokens: Tokens in the model's response
"""
input_tokens: int
output_tokens: int
@property
def total_tokens(self) -> int:
"""Total tokens used.
Returns:
Sum of input and output tokens
"""
return self.input_tokens + self.output_tokens
@dataclass
class BenchmarkResult:
"""Results from a single model/profile benchmark run.
Attributes:
model_name: Name of the model
provider: Provider key (openai, google, local)
profile_name: Name of the profile
token_usage: Token usage statistics
cost: Cost in USD
"""
model_name: str
provider: str
profile_name: str
token_usage: TokenUsage
cost: float
def calculate_cost(token_usage: TokenUsage, pricing: ModelPricing) -> float:
"""Calculate cost based on token usage and model pricing.
Args:
token_usage: Token usage statistics
pricing: Model pricing per 1M tokens
Returns:
Cost in USD
"""
input_cost = (token_usage.input_tokens / 1_000_000) * pricing.input_per_million
output_cost = (token_usage.output_tokens / 1_000_000) * pricing.output_per_million
return input_cost + output_cost
def validate_directory_input(func: Callable[..., Any]) -> Callable[..., Any]:
"""Decorator to validate directory argument.
Args:
func: Function to decorate
Returns:
Decorated function that validates directory input
"""
@functools.wraps(func)
def wrapper(directory: Path, *args: Any, **kwargs: Any) -> Any:
"""Wrapper function to validate directory input.
Args:
directory: Path to directory to validate
*args: Additional positional arguments
**kwargs: Additional keyword arguments
Returns:
Result of the wrapped function
Raises:
FileNotFoundError: If the directory does not exist
NotADirectoryError: If the path is not a directory
ValueError: If the directory is empty
"""
if not directory.exists():
raise FileNotFoundError(f"Directory not found: {directory}")
if not directory.is_dir():
raise NotADirectoryError(f"Not a directory: {directory}")
if not any(directory.iterdir()):
raise ValueError(f"Directory is empty: {directory}")
return func(directory, *args, **kwargs)
return wrapper
def get_available_models() -> list[BenchmarkModelConfig]:
"""Get list of available models for benchmarking.
Returns:
List of configured benchmark models
"""
return BENCHMARK_MODELS
@validate_directory_input
def load_benchmark_profiles(benchmark_dir: Path) -> list[dict[str, Any]]:
"""Load benchmark profiles.
Args:
benchmark_dir: Directory containing benchmark YAML files
Returns:
List of dicts with 'name' and 'path' keys
"""
profiles = []
for yaml_file in sorted(benchmark_dir.glob("*.yaml")):
profiles.append({"name": yaml_file.stem, "path": yaml_file})
return profiles
def create_knowledge_base_paths(workspace_root: Path) -> ResourcePaths:
"""Build resource path configuration from workspace root.
Args:
workspace_root: Path to workspace root directory
Returns:
ResourcePaths configuration object
"""
return ResourcePaths(
persona=workspace_root / "prompts/persona/default.md",
instruction_assessment=workspace_root / "prompts/instruction/assessment.md",
instruction_conversation=workspace_root / "prompts/instruction/conversation.md",
output_format_assessment=workspace_root
/ "configs/output_format/assessment.yaml",
output_format_conversation=workspace_root
/ "configs/output_format/conversation.yaml",
cancer_modules_dir=workspace_root / "configs/knowledge_base/cancer_modules",
dx_protocols_dir=workspace_root / "configs/knowledge_base/dx_protocols",
)
def validate_backend(provider: str, model_name: str) -> None:
"""Validate that backend is accessible.
Args:
provider: Provider key (e.g. "openai", "google", "local")
model_name: Model identifier
Raises:
ValueError: If the backend is not accessible
"""
if provider == "openai":
if not os.getenv("OPENAI_API_KEY"):
raise ValueError("OPENAI_API_KEY not set")
elif provider == "google":
if not os.getenv("GOOGLE_API_KEY"):
raise ValueError("GOOGLE_API_KEY not set")
elif provider == "local":
ollama_base_url = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434")
response = requests.get(f"{ollama_base_url}/api/tags", timeout=2)
if response.status_code != 200:
raise ValueError("Ollama server not responding")
models = response.json().get("models", [])
model_names = [m.get("name") for m in models]
if model_name not in model_names:
raise ValueError(f"Model not found. Run: ollama pull {model_name}")
def run_assessment(
model_config: BenchmarkModelConfig, profile_path: Path
) -> BenchmarkResult:
"""Run a single assessment and capture token usage.
Args:
model_config: Model configuration with pricing
profile_path: Path to profile YAML file
Returns:
BenchmarkResult with cost and token usage
"""
validate_backend(model_config.provider, model_config.model_name)
workspace_root = Path(__file__).parent.parent
with open(workspace_root / "configs/config.yaml") as f:
default_config = yaml.safe_load(f)
app_config = AppConfig(
model=ModelConfig(
provider=model_config.provider,
model_name=model_config.model_name,
),
knowledge_base_paths=create_knowledge_base_paths(workspace_root),
selected_cancer_modules=default_config["knowledge_base"]["cancer_modules"],
selected_dx_protocols=default_config["knowledge_base"]["dx_protocols"],
)
factory = SentinelFactory(app_config)
conversation = factory.create_conversation_manager()
user = load_user_file(str(profile_path))
with get_openai_callback() as cb:
conversation.initial_assessment(user)
input_tokens = cb.prompt_tokens
output_tokens = cb.completion_tokens
token_usage = TokenUsage(input_tokens, output_tokens)
cost = calculate_cost(token_usage, model_config.pricing)
return BenchmarkResult(
model_name=model_config.model_name,
provider=model_config.provider,
profile_name=profile_path.stem,
token_usage=token_usage,
cost=cost,
)
def print_results(results: list[BenchmarkResult]) -> None:
"""Print formatted results to console.
Args:
results: List of benchmark results
"""
by_model = defaultdict(list)
for result in results:
by_model[result.model_name].append(result)
lines = []
lines.append("\n╔══════════════════════════════════════════════════════════════╗")
lines.append("║ LLM Cost Benchmark Results ║")
lines.append("╚══════════════════════════════════════════════════════════════╝\n")
for model_name, model_results in sorted(by_model.items()):
provider = model_results[0].provider
lines.append(f"Model: {model_name} ({provider})")
num_results = len(model_results)
avg_cost = sum(result.cost for result in model_results) / num_results
avg_input = (
sum(result.token_usage.input_tokens for result in model_results)
/ num_results
)
avg_output = (
sum(result.token_usage.output_tokens for result in model_results)
/ num_results
)
for result_index, result in enumerate(model_results):
is_last = result_index == num_results - 1
prefix = "└─" if is_last else "├─"
indent = " " if is_last else "│ "
lines.append(f"{prefix} Profile: {result.profile_name}")
lines.append(f"{indent}├─ Input: {result.token_usage.input_tokens:,}")
lines.append(f"{indent}├─ Output: {result.token_usage.output_tokens:,}")
lines.append(f"{indent}└─ Cost: ${result.cost:.4f}")
lines.append(f"└─ Average: ${avg_cost:.4f}")
lines.append(
f" └─ Tokens: {avg_input:,.0f} input, {avg_output:,.0f} output\n"
)
lines.append("═══════════════════════════════════════════════════════════════")
lines.append("Summary - Model Ranking (Cheapest to Most Expensive)")
lines.append("───────────────────────────────────────────────────────────────")
model_averages = sorted(
(
(
model_name,
sum(result.cost for result in model_results) / len(model_results),
)
for model_name, model_results in by_model.items()
),
key=lambda model_avg_tuple: model_avg_tuple[1],
)
for rank, (model_name, avg_cost) in enumerate(model_averages, 1):
prefix = (
"🥇"
if rank == 1
else "🥈"
if rank == 2
else "🥉"
if rank == 3
else f"{rank}."
)
lines.append(f"{prefix:<4} {model_name:<25} ${avg_cost:.4f}")
lines.append(f"\nTotal: {len(results)} assessments across {len(by_model)} models")
lines.append("═══════════════════════════════════════════════════════════════\n")
logger.info("\n".join(lines))
def export_to_csv(results: list[BenchmarkResult], output_path: Path) -> None:
"""Export results to CSV file.
Args:
results: List of benchmark results
output_path: Path to output CSV file
"""
with open(output_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(
[
"model_name",
"provider",
"profile_name",
"input_tokens",
"output_tokens",
"total_tokens",
"cost_usd",
]
)
for result in results:
writer.writerow(
[
result.model_name,
result.provider,
result.profile_name,
result.token_usage.input_tokens,
result.token_usage.output_tokens,
result.token_usage.total_tokens,
f"{result.cost:.6f}",
]
)
logger.success(f"Results exported to: {output_path}")
def export_to_pdf(
results: list[BenchmarkResult],
output_path: Path,
) -> None:
"""Export results to PDF file with formatted table.
Args:
results: List of benchmark results
output_path: Path to output PDF file
"""
doc = SimpleDocTemplate(
str(output_path),
pagesize=letter,
leftMargin=0.75 * inch,
rightMargin=0.75 * inch,
topMargin=0.75 * inch,
bottomMargin=0.75 * inch,
)
elements = []
styles = getSampleStyleSheet()
title = Paragraph(
"<b>LLM Cost Benchmark Report</b>",
styles["Title"],
)
elements.append(title)
elements.append(Spacer(1, 0.2 * inch))
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
timestamp_text = Paragraph(
f"Generated: {timestamp}",
styles["Normal"],
)
elements.append(timestamp_text)
elements.append(Spacer(1, 0.3 * inch))
by_model = defaultdict(list)
for result in results:
by_model[result.model_name].append(result)
pricing_lookup = {model.model_name: model.pricing for model in BENCHMARK_MODELS}
results_desc = Paragraph(
"Average cost of running a single cancer risk assessment given a completed patient questionnaire.",
styles["Normal"],
)
elements.append(results_desc)
elements.append(Spacer(1, 0.2 * inch))
table_data = [
[
"Model",
"Provider",
"Avg Cost\nper Report",
"Input Price\n(per 1M)",
"Output Price\n(per 1M)",
"Avg Input\nTokens",
"Avg Output\nTokens",
]
]
# Sort by average cost (cheapest first)
sorted_models = sorted(
by_model.items(),
key=lambda model_tuple: sum(result.cost for result in model_tuple[1])
/ len(model_tuple[1]),
)
for model_name, model_results in sorted_models:
provider = model_results[0].provider
num_results = len(model_results)
avg_cost = sum(result.cost for result in model_results) / num_results
avg_input = (
sum(result.token_usage.input_tokens for result in model_results)
/ num_results
)
avg_output = (
sum(result.token_usage.output_tokens for result in model_results)
/ num_results
)
pricing = pricing_lookup.get(model_name)
input_price = f"${pricing.input_per_million:.2f}" if pricing else "N/A"
output_price = f"${pricing.output_per_million:.2f}" if pricing else "N/A"
table_data.append(
[
model_name,
provider,
f"${avg_cost:.4f}",
input_price,
output_price,
f"{avg_input:,.0f}",
f"{avg_output:,.0f}",
]
)
table = Table(
table_data,
colWidths=[
1.6 * inch,
0.9 * inch,
1.0 * inch,
1.0 * inch,
1.0 * inch,
0.9 * inch,
0.9 * inch,
],
)
table_style = TableStyle(
[
# Header styling
("BACKGROUND", (0, 0), (-1, 0), colors.HexColor("#4A90E2")),
("TEXTCOLOR", (0, 0), (-1, 0), colors.whitesmoke),
("ALIGN", (0, 0), (-1, 0), "CENTER"),
("VALIGN", (0, 0), (-1, 0), "MIDDLE"),
("FONTNAME", (0, 0), (-1, 0), "Helvetica-Bold"),
("FONTSIZE", (0, 0), (-1, 0), 9),
("BOTTOMPADDING", (0, 0), (-1, 0), 12),
("TOPPADDING", (0, 0), (-1, 0), 12),
# Data rows styling
("BACKGROUND", (0, 1), (-1, -1), colors.beige),
("TEXTCOLOR", (0, 1), (-1, -1), colors.black),
("ALIGN", (0, 1), (1, -1), "LEFT"),
("ALIGN", (2, 1), (-1, -1), "CENTER"),
("VALIGN", (0, 1), (-1, -1), "MIDDLE"),
("FONTNAME", (0, 1), (-1, -1), "Helvetica"),
("FONTSIZE", (0, 1), (-1, -1), 9),
("TOPPADDING", (0, 1), (-1, -1), 8),
("BOTTOMPADDING", (0, 1), (-1, -1), 8),
# Alternating row colors
("ROWBACKGROUNDS", (0, 1), (-1, -1), [colors.beige, colors.lightgrey]),
# Grid
("GRID", (0, 0), (-1, -1), 1, colors.black),
]
)
table.setStyle(table_style)
elements.append(table)
elements.append(Spacer(1, 0.3 * inch))
doc.build(elements)
logger.success(f"PDF report generated: {output_path}")
def parse_args() -> argparse.Namespace:
"""Parse command-line arguments.
Returns:
Parsed command-line arguments
"""
workspace_root = Path(__file__).parent.parent
parser = argparse.ArgumentParser(description="Benchmark LLM costs")
parser.add_argument(
"--benchmark-dir",
type=Path,
default=workspace_root / "examples/benchmark",
help="Benchmark profile directory",
)
parser.add_argument(
"--models",
nargs="+",
help="Specific models to test (by name)",
)
parser.add_argument(
"--profiles",
nargs="+",
help="Specific profiles to test",
)
parser.add_argument(
"--output",
type=Path,
help="Export to CSV",
)
return parser.parse_args()
def main() -> None:
"""Main entry point.
Raises:
ValueError: If no matching models or profiles found
"""
args = parse_args()
logger.info("Loading benchmark configuration...")
all_models = get_available_models()
logger.info("Loading profiles...")
all_profiles = load_benchmark_profiles(args.benchmark_dir)
if args.models:
all_models = [model for model in all_models if model.model_name in args.models]
if not all_models:
raise ValueError(f"No matching models: {args.models}")
if args.profiles:
all_profiles = [
profile for profile in all_profiles if profile["name"] in args.profiles
]
if not all_profiles:
raise ValueError(f"No matching profiles: {args.profiles}")
logger.info(
f"\nRunning {len(all_models)} model(s) x {len(all_profiles)} profile(s)...\n"
)
results = []
for model_index, model in enumerate(all_models, 1):
for profile in all_profiles:
logger.info(
f"[{model_index}/{len(all_models)}] {model.model_name}: {profile['name']}"
)
result = run_assessment(model, profile["path"])
results.append(result)
print_results(results)
# Generate PDF report with timestamp
workspace_root = Path(__file__).parent.parent
outputs_dir = workspace_root / "outputs"
outputs_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
pdf_path = outputs_dir / f"llm_benchmark_{timestamp}.pdf"
export_to_pdf(results, pdf_path)
if args.output:
export_to_csv(results, args.output)
if __name__ == "__main__":
main()