```python
# fetch_server.py
from mcp.server.fastmcp import FastMCP
import httpx
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import logging
import re
import sys
from pathlib import Path

# Add parent directory to path for shared imports
sys.path.insert(0, str(Path(__file__).parent.parent))

from shared import (
    config,
    clean_whitespace,
    truncate_text
)
from shared.http_client import get_http_client

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

mcp = FastMCP("fetch-server")
def validate_url(url: str) -> tuple[bool, str]:
    """Validate URL for security concerns. Returns (is_valid, error_message)."""
    try:
        parsed = urlparse(url)

        # Check scheme using shared config
        if parsed.scheme not in config.security.allowed_schemes:
            return False, f"Invalid URL scheme. Only {', '.join(config.security.allowed_schemes)} are allowed."

        # Check for blocked hosts (SSRF protection)
        hostname = parsed.hostname
        if not hostname:
            return False, "Invalid URL: no hostname found."

        # Use shared security config for SSRF checks
        if config.security.is_private_ip(hostname):
            return False, "Access to localhost/private IPs is not allowed."

        return True, ""
    except Exception as e:
        return False, f"Invalid URL: {str(e)}"
def parse_clinical_trial_page(soup: BeautifulSoup, url: str) -> str | None:
    """Parse a ClinicalTrials.gov trial detail page for structured data."""
    # Check if this is a ClinicalTrials.gov page
    if "clinicaltrials.gov" not in url.lower():
        return None

    # Extract NCT ID from URL
    nct_match = re.search(r'NCT\d{8}', url)
    nct_id = nct_match.group() if nct_match else "Unknown"

    # Try to extract key trial information
    trial_info = []
    trial_info.append(f"**NCT ID:** {nct_id}")
    trial_info.append(f"**URL:** {url}")

    # Look for title
    title = soup.find('h1')
    if title:
        trial_info.append(f"**Title:** {title.get_text(strip=True)}")

    # Look for status (various patterns)
    status_patterns = [
        soup.find('span', string=re.compile(r'Recruiting|Active|Completed|Enrolling', re.I)),
        soup.find('div', string=re.compile(r'Recruitment Status', re.I))
    ]
    for pattern in status_patterns:
        if pattern:
            status_text = pattern.get_text(strip=True) if hasattr(pattern, 'get_text') else str(pattern)
            trial_info.append(f"**Status:** {status_text}")
            break

    # Look for study description
    desc_section = soup.find('div', {'class': re.compile('description', re.I)})
    if desc_section:
        desc_text = desc_section.get_text(strip=True)[:500]
        trial_info.append(f"**Description:** {desc_text}...")

    # Look for conditions
    conditions = soup.find_all(string=re.compile(r'Condition', re.I))
    if conditions:
        for cond in conditions[:1]:  # Just first mention
            parent = cond.parent
            if parent:
                trial_info.append(f"**Condition:** {parent.get_text(strip=True)[:200]}")
            break

    # Look for interventions
    interventions = soup.find_all(string=re.compile(r'Intervention', re.I))
    if interventions:
        for inter in interventions[:1]:  # Just first mention
            parent = inter.parent
            if parent:
                trial_info.append(f"**Intervention:** {parent.get_text(strip=True)[:200]}")
            break

    # Look for sponsor
    sponsor = soup.find(string=re.compile(r'Sponsor', re.I))
    if sponsor and sponsor.parent:
        trial_info.append(f"**Sponsor:** {sponsor.parent.get_text(strip=True)[:100]}")

    # Locations/sites
    locations = soup.find_all(string=re.compile(r'Location|Site', re.I))
    if locations:
        location_texts = []
        for loc in locations[:3]:  # First 3 locations
            if loc.parent:
                location_texts.append(loc.parent.get_text(strip=True)[:50])
        if location_texts:
            trial_info.append(f"**Locations:** {', '.join(location_texts)}")

    # Only return a summary if we found meaningful data beyond the NCT ID and URL
    if len(trial_info) > 2:
        return "\n\n".join(trial_info) + "\n\n**Note:** This is extracted from the trial webpage. Some details may be incomplete due to page structure variations."
    return None
@mcp.tool()
async def fetch_url(url: str, extract_text_only: bool = True) -> str:
    """Fetch content from a URL (paper abstract page, news article, etc.).

    Args:
        url: URL to fetch
        extract_text_only: Extract only main text content (default: True)
    """
    try:
        logger.info(f"Fetching URL: {url}")

        # Validate URL
        is_valid, error_msg = validate_url(url)
        if not is_valid:
            logger.warning(f"URL validation failed: {error_msg}")
            return f"Error: {error_msg}"

        # Use shared HTTP client for connection pooling
        client = get_http_client(timeout=config.api.timeout)
        response = await client.get(url, headers={
            "User-Agent": config.api.user_agent
        })
        response.raise_for_status()

        # Check content size using shared config
        content_length = response.headers.get('content-length')
        if content_length and int(content_length) > config.content_limits.max_content_size:
            logger.warning(f"Content too large: {content_length} bytes")
            return f"Error: Content size ({content_length} bytes) exceeds maximum allowed size of {config.content_limits.max_content_size} bytes"

        # Check actual content size
        if len(response.content) > config.content_limits.max_content_size:
            logger.warning(f"Content too large: {len(response.content)} bytes")
            return f"Error: Content size exceeds maximum allowed size of {config.content_limits.max_content_size} bytes"

        if extract_text_only:
            soup = BeautifulSoup(response.text, 'html.parser')

            # Check if this is a clinical trial page and try enhanced parsing
            trial_data = parse_clinical_trial_page(soup, url)
            if trial_data:
                logger.info(f"Successfully parsed clinical trial page: {url}")
                return trial_data

            # Otherwise, do standard text extraction
            # Remove script and style elements
            for element in soup(["script", "style", "meta", "link"]):
                element.decompose()

            # Get text
            text = soup.get_text()

            # Clean up whitespace using shared utility
            text = clean_whitespace(text)

            # Limit to reasonable size for LLM context using shared utility
            text = truncate_text(text, max_chars=config.content_limits.max_text_chars)

            logger.info(f"Successfully fetched and extracted text from {url}")
            return text
        else:
            # Return raw HTML, but still limit size using shared utility
            html = truncate_text(response.text, max_chars=config.content_limits.max_text_chars)
            logger.info(f"Successfully fetched raw HTML from {url}")
            return html

    except httpx.TimeoutException:
        logger.error(f"Request to {url} timed out")
        return f"Error: Request timed out after {config.api.timeout} seconds"
    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error fetching {url}: {e}")
        return f"Error: HTTP {e.response.status_code} - {e.response.reason_phrase}"
    except httpx.RequestError as e:
        logger.error(f"Request error fetching {url}: {e}")
        return f"Error: Failed to fetch URL - {str(e)}"
    except Exception as e:
        logger.error(f"Unexpected error fetching {url}: {e}")
        return f"Error: {str(e)}"


if __name__ == "__main__":
    mcp.run(transport="stdio")
```
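To exercise the server end to end, a small stdio client can spawn it as a subprocess and invoke the `fetch_url` tool. The sketch below is illustrative only: it assumes the listing above is saved as `fetch_server.py` next to the `shared` package and that the official `mcp` Python SDK is installed; the file name `try_fetch_server.py` and the example URL are placeholders.

```python
# try_fetch_server.py - minimal MCP stdio client sketch (assumes the server
# above is saved as fetch_server.py and the `mcp` Python SDK is installed)
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def main():
    # Spawn the server as a subprocess and talk to it over stdio
    server_params = StdioServerParameters(command="python", args=["fetch_server.py"])

    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # The server should advertise a single tool: fetch_url
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])

            # Call the tool; the URL here is just a placeholder
            result = await session.call_tool("fetch_url", {"url": "https://example.com"})
            print(result.content)


if __name__ == "__main__":
    asyncio.run(main())
```

Because the server uses the stdio transport, the client owns the server's lifecycle: it starts the process, completes the MCP initialization handshake, and shuts the process down when the `stdio_client` context exits.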