# fetch_server.py
from mcp.server.fastmcp import FastMCP
import httpx
from bs4 import BeautifulSoup
from urllib.parse import urlparse
import logging
import re
import sys
from pathlib import Path

# Add parent directory to path for shared imports
sys.path.insert(0, str(Path(__file__).parent.parent))

from shared import (
    config,
    clean_whitespace,
    truncate_text
)
from shared.http_client import get_http_client

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

mcp = FastMCP("fetch-server")

def validate_url(url: str) -> tuple[bool, str]:
    """Validate URL for security concerns. Returns (is_valid, error_message)"""
    try:
        parsed = urlparse(url)

        # Check scheme using shared config
        if parsed.scheme not in config.security.allowed_schemes:
            return False, f"Invalid URL scheme. Only {', '.join(config.security.allowed_schemes)} are allowed."

        # Check for blocked hosts (SSRF protection)
        hostname = parsed.hostname
        if not hostname:
            return False, "Invalid URL: no hostname found."

        # Use shared security config for SSRF checks
        if config.security.is_private_ip(hostname):
            return False, "Access to localhost/private IPs is not allowed."

        return True, ""

    except Exception as e:
        return False, f"Invalid URL: {str(e)}"

def parse_clinical_trial_page(soup: BeautifulSoup, url: str) -> str | None:
    """Parse a ClinicalTrials.gov trial detail page into a structured summary.

    Returns a markdown-style summary string, or None if the URL is not a
    ClinicalTrials.gov page or no meaningful data could be extracted.
    """
    # Check if this is a ClinicalTrials.gov page
    if "clinicaltrials.gov" not in url.lower():
        return None

    # Extract NCT ID from URL
    nct_match = re.search(r'NCT\d{8}', url)
    nct_id = nct_match.group() if nct_match else "Unknown"

    # Try to extract key trial information
    trial_info = []
    trial_info.append(f"**NCT ID:** {nct_id}")
    trial_info.append(f"**URL:** {url}")

    # Look for title
    title = soup.find('h1')
    if title:
        trial_info.append(f"**Title:** {title.get_text(strip=True)}")

    # Look for status (various patterns)
    status_patterns = [
        soup.find('span', string=re.compile(r'Recruiting|Active|Completed|Enrolling', re.I)),
        soup.find('div', string=re.compile(r'Recruitment Status', re.I))
    ]
    for pattern in status_patterns:
        if pattern:
            status_text = pattern.get_text(strip=True) if hasattr(pattern, 'get_text') else str(pattern)
            trial_info.append(f"**Status:** {status_text}")
            break

    # Look for study description
    desc_section = soup.find('div', {'class': re.compile('description', re.I)})
    if desc_section:
        desc_text = desc_section.get_text(strip=True)[:500]
        trial_info.append(f"**Description:** {desc_text}...")

    # Look for conditions (first mention only)
    conditions = soup.find_all(string=re.compile(r'Condition', re.I))
    if conditions and conditions[0].parent:
        trial_info.append(f"**Condition:** {conditions[0].parent.get_text(strip=True)[:200]}")

    # Look for interventions (first mention only)
    interventions = soup.find_all(string=re.compile(r'Intervention', re.I))
    if interventions and interventions[0].parent:
        trial_info.append(f"**Intervention:** {interventions[0].parent.get_text(strip=True)[:200]}")

    # Look for sponsor
    sponsor = soup.find(string=re.compile(r'Sponsor', re.I))
    if sponsor and sponsor.parent:
        trial_info.append(f"**Sponsor:** {sponsor.parent.get_text(strip=True)[:100]}")

    # Locations/Sites
    locations = soup.find_all(string=re.compile(r'Location|Site', re.I))
    if locations:
        location_texts = []
        for loc in locations[:3]:  # First 3 locations
            if loc.parent:
                location_texts.append(loc.parent.get_text(strip=True)[:50])
        if location_texts:
            trial_info.append(f"**Locations:** {', '.join(location_texts)}")

    if len(trial_info) > 2:  # If we found meaningful data
        return "\n\n".join(trial_info) + "\n\n**Note:** This is extracted from the trial webpage. Some details may be incomplete due to page structure variations."

    return None
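
# Shape of the summary returned on success (illustrative only -- the NCT ID and
# field values below are made up; which fields appear depends on the page markup):
#
#   **NCT ID:** NCT00000000
#
#   **URL:** https://clinicaltrials.gov/...
#
#   **Title:** <trial title from the page's <h1> tag>
#
#   **Status:** Recruiting
#
#   **Note:** This is extracted from the trial webpage. ...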

@mcp.tool()
async def fetch_url(url: str, extract_text_only: bool = True) -> str:
    """Fetch content from a URL (paper abstract page, news article, etc.).

    Args:
        url: URL to fetch
        extract_text_only: Extract only main text content (default: True)
    """
    try:
        logger.info(f"Fetching URL: {url}")

        # Validate URL
        is_valid, error_msg = validate_url(url)
        if not is_valid:
            logger.warning(f"URL validation failed: {error_msg}")
            return f"Error: {error_msg}"

        # Use shared HTTP client for connection pooling
        client = get_http_client(timeout=config.api.timeout)
        response = await client.get(url, headers={
            "User-Agent": config.api.user_agent
        })
        response.raise_for_status()

        # Check content size using shared config
        content_length = response.headers.get('content-length')
        if content_length and int(content_length) > config.content_limits.max_content_size:
            logger.warning(f"Content too large: {content_length} bytes")
            return f"Error: Content size ({content_length} bytes) exceeds maximum allowed size of {config.content_limits.max_content_size} bytes"

        # Check actual content size
        if len(response.content) > config.content_limits.max_content_size:
            logger.warning(f"Content too large: {len(response.content)} bytes")
            return f"Error: Content size exceeds maximum allowed size of {config.content_limits.max_content_size} bytes"

        if extract_text_only:
            soup = BeautifulSoup(response.text, 'html.parser')

            # Check if this is a clinical trial page and try enhanced parsing
            trial_data = parse_clinical_trial_page(soup, url)
            if trial_data:
                logger.info(f"Successfully parsed clinical trial page: {url}")
                return trial_data

            # Otherwise, do standard text extraction
            # Remove script, style, and metadata elements
            for tag in soup(["script", "style", "meta", "link"]):
                tag.decompose()

            # Get text
            text = soup.get_text()

            # Clean up whitespace using shared utility
            text = clean_whitespace(text)

            # Limit to reasonable size for LLM context using shared utility
            text = truncate_text(text, max_chars=config.content_limits.max_text_chars)

            logger.info(f"Successfully fetched and extracted text from {url}")
            return text
        else:
            # Return raw HTML, but still limit size using shared utility
            html = truncate_text(response.text, max_chars=config.content_limits.max_text_chars)

            logger.info(f"Successfully fetched raw HTML from {url}")
            return html

    except httpx.TimeoutException:
        logger.error(f"Request to {url} timed out")
        return f"Error: Request timed out after {config.api.timeout} seconds"
    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error fetching {url}: {e}")
        return f"Error: HTTP {e.response.status_code} - {e.response.reason_phrase}"
    except httpx.RequestError as e:
        logger.error(f"Request error fetching {url}: {e}")
        return f"Error: Failed to fetch URL - {str(e)}"
    except Exception as e:
        logger.error(f"Unexpected error fetching {url}: {e}")
        return f"Error: {str(e)}"

if __name__ == "__main__":
    mcp.run(transport="stdio")