ALSARA / shared /utils.py
axegameon's picture
Upload ALSARA app files (#1)
3e435ad verified
# shared/utils.py
"""Shared utilities for MCP servers"""
import asyncio
import time
import logging
from typing import Optional, Callable, Any
from mcp.types import TextContent
import httpx
logger = logging.getLogger(__name__)
class RateLimiter:
"""Rate limiter for API calls"""
def __init__(self, delay: float):
"""
Initialize rate limiter
Args:
delay: Minimum delay between requests in seconds
"""
self.delay = delay
self.last_request_time: Optional[float] = None
async def wait(self) -> None:
"""Wait if necessary to respect rate limit"""
if self.last_request_time is not None:
elapsed = time.time() - self.last_request_time
if elapsed < self.delay:
await asyncio.sleep(self.delay - elapsed)
self.last_request_time = time.time()
async def safe_api_call(
func: Callable,
*args: Any,
timeout: float = 30.0,
error_prefix: str = "API",
**kwargs: Any
) -> list[TextContent]:
"""
Safely execute an API call with comprehensive error handling
Args:
func: Async function to call
*args: Positional arguments for func
timeout: Timeout in seconds
error_prefix: Prefix for error messages
**kwargs: Keyword arguments for func
Returns:
list[TextContent]: Result or error message
"""
try:
return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout)
except asyncio.TimeoutError:
logger.error(f"{error_prefix} request timed out after {timeout}s")
return [TextContent(
type="text",
text=f"Error: {error_prefix} request timed out after {timeout} seconds. Please try again."
)]
except httpx.TimeoutException:
logger.error(f"{error_prefix} request timed out")
return [TextContent(
type="text",
text=f"Error: {error_prefix} request timed out. Please try again."
)]
except httpx.HTTPStatusError as e:
logger.error(f"{error_prefix} error: HTTP {e.response.status_code}")
return [TextContent(
type="text",
text=f"Error: {error_prefix} returned status {e.response.status_code}"
)]
except httpx.RequestError as e:
logger.error(f"{error_prefix} request error: {e}")
return [TextContent(
type="text",
text=f"Error: Failed to connect to {error_prefix}. Please check your connection."
)]
except Exception as e:
logger.error(f"Unexpected error in {error_prefix}: {e}", exc_info=True)
return [TextContent(
type="text",
text=f"Error: {str(e)}"
)]
def truncate_text(text: str, max_chars: int = 8000, suffix: str = "...") -> str:
"""
Truncate text to maximum length with suffix
Args:
text: Text to truncate
max_chars: Maximum character count
suffix: Suffix to add when truncated
Returns:
Truncated text
"""
if len(text) <= max_chars:
return text
return text[:max_chars] + f"\n\n[Content truncated at {max_chars} characters]{suffix}"
def format_authors(authors: str, max_authors: int = 3) -> str:
"""
Format author list with et al. if needed
Args:
authors: Semicolon-separated author list
max_authors: Maximum authors to show
Returns:
Formatted author string
"""
if not authors or authors == "Unknown":
return "Unknown authors"
author_list = [a.strip() for a in authors.split(";")]
if len(author_list) <= max_authors:
return ", ".join(author_list)
return ", ".join(author_list[:max_authors]) + " et al."
def clean_whitespace(text: str) -> str:
"""
Clean up excessive whitespace in text
Args:
text: Text to clean
Returns:
Cleaned text
"""
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
return '\n'.join(chunk for chunk in chunks if chunk)
class ErrorFormatter:
"""Consistent error message formatting"""
@staticmethod
def not_found(resource_type: str, identifier: str) -> str:
"""Format not found error"""
return f"No {resource_type} found with identifier: {identifier}"
@staticmethod
def no_results(query: str, time_period: str = "") -> str:
"""Format no results error"""
time_str = f" {time_period}" if time_period else ""
return f"No results found for query: {query}{time_str}"
@staticmethod
def validation_error(field: str, issue: str) -> str:
"""Format validation error"""
return f"Validation error: {field} - {issue}"
@staticmethod
def api_error(service: str, status_code: int) -> str:
"""Format API error"""
return f"Error: {service} API returned status {status_code}"
def create_citation(
identifier: str,
identifier_type: str,
url: Optional[str] = None
) -> str:
"""
Create a formatted citation string
Args:
identifier: Citation identifier (PMID, DOI, NCT ID)
identifier_type: Type of identifier
url: Optional URL
Returns:
Formatted citation
"""
citation = f"{identifier_type}: {identifier}"
if url:
citation += f" | URL: {url}"
return citation