# fetch_server.py
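"""MCP server exposing a single `fetch_url` tool.

The tool fetches a web page over HTTP(S), applies SSRF and size checks from
the shared config, extracts readable text with BeautifulSoup, and uses a
lightweight structured parser for ClinicalTrials.gov trial detail pages.
"""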
from mcp.server.fastmcp import FastMCP
import httpx
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import logging
import re
import sys
from pathlib import Path

# Add parent directory to path for shared imports
sys.path.insert(0, str(Path(__file__).parent.parent))

from shared import (
    config,
    clean_whitespace,
    truncate_text
)
from shared.http_client import get_http_client

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

mcp = FastMCP("fetch-server")


def validate_url(url: str) -> tuple[bool, str]:
    """Validate a URL for security concerns. Returns (is_valid, error_message)."""
    try:
        parsed = urlparse(url)

        # Check scheme using shared config
        if parsed.scheme not in config.security.allowed_schemes:
            return False, f"Invalid URL scheme. Only {', '.join(config.security.allowed_schemes)} are allowed."

        # Check for blocked hosts (SSRF protection)
        hostname = parsed.hostname
        if not hostname:
            return False, "Invalid URL: no hostname found."

        # Use shared security config for SSRF checks
        if config.security.is_private_ip(hostname):
            return False, "Access to localhost/private IPs is not allowed."

        return True, ""
    except Exception as e:
        return False, f"Invalid URL: {str(e)}"
def parse_clinical_trial_page(soup: BeautifulSoup, url: str) -> str | None:
    """Parse a ClinicalTrials.gov trial detail page into a structured summary.

    Returns None for non-ClinicalTrials.gov pages or when too little data is found.
    """
    # Only attempt enhanced parsing for ClinicalTrials.gov pages
    if "clinicaltrials.gov" not in url.lower():
        return None

    # Extract the NCT ID from the URL
    nct_match = re.search(r'NCT\d{8}', url)
    nct_id = nct_match.group() if nct_match else "Unknown"

    # Collect key trial information
    trial_info = []
    trial_info.append(f"**NCT ID:** {nct_id}")
    trial_info.append(f"**URL:** {url}")

    # Look for the title
    title = soup.find('h1')
    if title:
        trial_info.append(f"**Title:** {title.get_text(strip=True)}")

    # Look for the recruitment status (various patterns)
    status_patterns = [
        soup.find('span', string=re.compile(r'Recruiting|Active|Completed|Enrolling', re.I)),
        soup.find('div', string=re.compile(r'Recruitment Status', re.I))
    ]
    for pattern in status_patterns:
        if pattern:
            status_text = pattern.get_text(strip=True) if hasattr(pattern, 'get_text') else str(pattern)
            trial_info.append(f"**Status:** {status_text}")
            break

    # Look for the study description
    desc_section = soup.find('div', {'class': re.compile('description', re.I)})
    if desc_section:
        desc_text = desc_section.get_text(strip=True)[:500]
        trial_info.append(f"**Description:** {desc_text}...")

    # Look for conditions (first mention only)
    conditions = soup.find_all(string=re.compile(r'Condition', re.I))
    for cond in conditions[:1]:
        if cond.parent:
            trial_info.append(f"**Condition:** {cond.parent.get_text(strip=True)[:200]}")
            break

    # Look for interventions (first mention only)
    interventions = soup.find_all(string=re.compile(r'Intervention', re.I))
    for inter in interventions[:1]:
        if inter.parent:
            trial_info.append(f"**Intervention:** {inter.parent.get_text(strip=True)[:200]}")
            break

    # Look for the sponsor
    sponsor = soup.find(string=re.compile(r'Sponsor', re.I))
    if sponsor and sponsor.parent:
        trial_info.append(f"**Sponsor:** {sponsor.parent.get_text(strip=True)[:100]}")

    # Locations/sites (first three only)
    locations = soup.find_all(string=re.compile(r'Location|Site', re.I))
    location_texts = []
    for loc in locations[:3]:
        if loc.parent:
            location_texts.append(loc.parent.get_text(strip=True)[:50])
    if location_texts:
        trial_info.append(f"**Locations:** {', '.join(location_texts)}")

    # Only return a summary if we found more than the NCT ID and URL
    if len(trial_info) > 2:
        return "\n\n".join(trial_info) + "\n\n**Note:** This is extracted from the trial webpage. Some details may be incomplete due to page structure variations."
    return None
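# Illustrative shape of the returned summary (values are made up; the real
# text depends entirely on the fetched page):
#
#   **NCT ID:** NCT01234567
#
#   **URL:** https://clinicaltrials.gov/study/NCT01234567
#
#   **Title:** A Phase 2 Study of ...
#
#   **Status:** Recruiting
#   ...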
@mcp.tool()
async def fetch_url(url: str, extract_text_only: bool = True) -> str:
    """Fetch content from a URL (paper abstract page, news article, etc.).

    Args:
        url: URL to fetch
        extract_text_only: Extract only the main text content (default: True)
    """
    try:
        logger.info(f"Fetching URL: {url}")

        # Validate the URL
        is_valid, error_msg = validate_url(url)
        if not is_valid:
            logger.warning(f"URL validation failed: {error_msg}")
            return f"Error: {error_msg}"

        # Use the shared HTTP client for connection pooling
        client = get_http_client(timeout=config.api.timeout)
        response = await client.get(url, headers={
            "User-Agent": config.api.user_agent
        })
        response.raise_for_status()

        # Check the declared content size using shared config
        content_length = response.headers.get('content-length')
        if content_length and int(content_length) > config.content_limits.max_content_size:
            logger.warning(f"Content too large: {content_length} bytes")
            return f"Error: Content size ({content_length} bytes) exceeds maximum allowed size of {config.content_limits.max_content_size} bytes"

        # Check the actual content size
        if len(response.content) > config.content_limits.max_content_size:
            logger.warning(f"Content too large: {len(response.content)} bytes")
            return f"Error: Content size exceeds maximum allowed size of {config.content_limits.max_content_size} bytes"

        if extract_text_only:
            soup = BeautifulSoup(response.text, 'html.parser')

            # Check whether this is a clinical trial page and try enhanced parsing
            trial_data = parse_clinical_trial_page(soup, url)
            if trial_data:
                logger.info(f"Successfully parsed clinical trial page: {url}")
                return trial_data

            # Otherwise, do standard text extraction:
            # remove script, style, and metadata elements first
            for element in soup(["script", "style", "meta", "link"]):
                element.decompose()

            # Get the text and clean up whitespace using the shared utility
            text = soup.get_text()
            text = clean_whitespace(text)

            # Limit to a reasonable size for LLM context using the shared utility
            text = truncate_text(text, max_chars=config.content_limits.max_text_chars)

            logger.info(f"Successfully fetched and extracted text from {url}")
            return text
        else:
            # Return raw HTML, but still limit the size using the shared utility
            html = truncate_text(response.text, max_chars=config.content_limits.max_text_chars)
            logger.info(f"Successfully fetched raw HTML from {url}")
            return html

    except httpx.TimeoutException:
        logger.error(f"Request to {url} timed out")
        return f"Error: Request timed out after {config.api.timeout} seconds"
    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error fetching {url}: {e}")
        return f"Error: HTTP {e.response.status_code} - {e.response.reason_phrase}"
    except httpx.RequestError as e:
        logger.error(f"Request error fetching {url}: {e}")
        return f"Error: Failed to fetch URL - {str(e)}"
    except Exception as e:
        logger.error(f"Unexpected error fetching {url}: {e}")
        return f"Error: {str(e)}"
if __name__ == "__main__":
    mcp.run(transport="stdio")