Spaces:
Running
Running
| # shared/utils.py | |
| """Shared utilities for MCP servers""" | |
| import asyncio | |
| import time | |
| import logging | |
| from typing import Optional, Callable, Any | |
| from mcp.types import TextContent | |
| import httpx | |
| logger = logging.getLogger(__name__) | |
| class RateLimiter: | |
| """Rate limiter for API calls""" | |
| def __init__(self, delay: float): | |
| """ | |
| Initialize rate limiter | |
| Args: | |
| delay: Minimum delay between requests in seconds | |
| """ | |
| self.delay = delay | |
| self.last_request_time: Optional[float] = None | |
| async def wait(self) -> None: | |
| """Wait if necessary to respect rate limit""" | |
| if self.last_request_time is not None: | |
| elapsed = time.time() - self.last_request_time | |
| if elapsed < self.delay: | |
| await asyncio.sleep(self.delay - elapsed) | |
| self.last_request_time = time.time() | |
| async def safe_api_call( | |
| func: Callable, | |
| *args: Any, | |
| timeout: float = 30.0, | |
| error_prefix: str = "API", | |
| **kwargs: Any | |
| ) -> list[TextContent]: | |
| """ | |
| Safely execute an API call with comprehensive error handling | |
| Args: | |
| func: Async function to call | |
| *args: Positional arguments for func | |
| timeout: Timeout in seconds | |
| error_prefix: Prefix for error messages | |
| **kwargs: Keyword arguments for func | |
| Returns: | |
| list[TextContent]: Result or error message | |
| """ | |
| try: | |
| return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout) | |
| except asyncio.TimeoutError: | |
| logger.error(f"{error_prefix} request timed out after {timeout}s") | |
| return [TextContent( | |
| type="text", | |
| text=f"Error: {error_prefix} request timed out after {timeout} seconds. Please try again." | |
| )] | |
| except httpx.TimeoutException: | |
| logger.error(f"{error_prefix} request timed out") | |
| return [TextContent( | |
| type="text", | |
| text=f"Error: {error_prefix} request timed out. Please try again." | |
| )] | |
| except httpx.HTTPStatusError as e: | |
| logger.error(f"{error_prefix} error: HTTP {e.response.status_code}") | |
| return [TextContent( | |
| type="text", | |
| text=f"Error: {error_prefix} returned status {e.response.status_code}" | |
| )] | |
| except httpx.RequestError as e: | |
| logger.error(f"{error_prefix} request error: {e}") | |
| return [TextContent( | |
| type="text", | |
| text=f"Error: Failed to connect to {error_prefix}. Please check your connection." | |
| )] | |
| except Exception as e: | |
| logger.error(f"Unexpected error in {error_prefix}: {e}", exc_info=True) | |
| return [TextContent( | |
| type="text", | |
| text=f"Error: {str(e)}" | |
| )] | |
| def truncate_text(text: str, max_chars: int = 8000, suffix: str = "...") -> str: | |
| """ | |
| Truncate text to maximum length with suffix | |
| Args: | |
| text: Text to truncate | |
| max_chars: Maximum character count | |
| suffix: Suffix to add when truncated | |
| Returns: | |
| Truncated text | |
| """ | |
| if len(text) <= max_chars: | |
| return text | |
| return text[:max_chars] + f"\n\n[Content truncated at {max_chars} characters]{suffix}" | |
| def format_authors(authors: str, max_authors: int = 3) -> str: | |
| """ | |
| Format author list with et al. if needed | |
| Args: | |
| authors: Semicolon-separated author list | |
| max_authors: Maximum authors to show | |
| Returns: | |
| Formatted author string | |
| """ | |
| if not authors or authors == "Unknown": | |
| return "Unknown authors" | |
| author_list = [a.strip() for a in authors.split(";")] | |
| if len(author_list) <= max_authors: | |
| return ", ".join(author_list) | |
| return ", ".join(author_list[:max_authors]) + " et al." | |
| def clean_whitespace(text: str) -> str: | |
| """ | |
| Clean up excessive whitespace in text | |
| Args: | |
| text: Text to clean | |
| Returns: | |
| Cleaned text | |
| """ | |
| lines = (line.strip() for line in text.splitlines()) | |
| chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) | |
| return '\n'.join(chunk for chunk in chunks if chunk) | |
| class ErrorFormatter: | |
| """Consistent error message formatting""" | |
| def not_found(resource_type: str, identifier: str) -> str: | |
| """Format not found error""" | |
| return f"No {resource_type} found with identifier: {identifier}" | |
| def no_results(query: str, time_period: str = "") -> str: | |
| """Format no results error""" | |
| time_str = f" {time_period}" if time_period else "" | |
| return f"No results found for query: {query}{time_str}" | |
| def validation_error(field: str, issue: str) -> str: | |
| """Format validation error""" | |
| return f"Validation error: {field} - {issue}" | |
| def api_error(service: str, status_code: int) -> str: | |
| """Format API error""" | |
| return f"Error: {service} API returned status {status_code}" | |
| def create_citation( | |
| identifier: str, | |
| identifier_type: str, | |
| url: Optional[str] = None | |
| ) -> str: | |
| """ | |
| Create a formatted citation string | |
| Args: | |
| identifier: Citation identifier (PMID, DOI, NCT ID) | |
| identifier_type: Type of identifier | |
| url: Optional URL | |
| Returns: | |
| Formatted citation | |
| """ | |
| citation = f"{identifier_type}: {identifier}" | |
| if url: | |
| citation += f" | URL: {url}" | |
| return citation | |