# pubmed_server.py
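"""FastMCP server that exposes PubMed literature search for ALS research.

Talks to the NCBI E-utilities API and provides two tools: search_pubmed
(keyword search returning titles, abstracts, PMIDs, and publication dates)
and get_paper_details (full record for a single PMID).
"""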
from mcp.server.fastmcp import FastMCP
import httpx
import logging
import sys
from pathlib import Path
# Add parent directory to path for shared imports
sys.path.insert(0, str(Path(__file__).parent.parent))
from shared import (
    config,
    RateLimiter,
    format_authors,
    ErrorFormatter,
    truncate_text
)
from shared.http_client import get_http_client
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Create FastMCP server
mcp = FastMCP("pubmed-server")
# Rate limiting using shared utility
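# NCBI E-utilities permits roughly 3 requests/second without an API key;
# config.rate_limits.pubmed_delay should be set to stay within that limit.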
rate_limiter = RateLimiter(config.rate_limits.pubmed_delay)
@mcp.tool()
async def search_pubmed(
    query: str,
    max_results: int = 10,
    sort: str = "relevance"
) -> str:
    """Search PubMed for ALS research papers. Returns titles, abstracts, PMIDs, and publication dates.

    Args:
        query: Search query (e.g., 'ALS SOD1 therapy')
        max_results: Maximum number of results (default: 10)
        sort: Sort order - 'relevance' or 'date' (default: 'relevance')
    """
    try:
        logger.info(f"Searching PubMed for: {query}")
        # PubMed E-utilities API (no auth required)
        base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
        # Rate limiting
        await rate_limiter.wait()
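        # Two-step E-utilities flow: esearch returns matching PMIDs as JSON,
        # then efetch is called with those PMIDs to retrieve full records as XML.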
        # Step 1: Search for PMIDs
        search_params = {
            "db": "pubmed",
            "term": query,
            "retmax": max_results,
            "retmode": "json",
            "sort": sort
        }
        # Use shared HTTP client for connection pooling
        client = get_http_client(timeout=config.api.timeout)
        # Get PMIDs
        search_resp = await client.get(f"{base_url}/esearch.fcgi", params=search_params)
        search_resp.raise_for_status()
        search_data = search_resp.json()
        pmids = search_data.get("esearchresult", {}).get("idlist", [])
        if not pmids:
            logger.info(f"No results found for query: {query}")
            return ErrorFormatter.no_results(query)
        # Rate limiting
        await rate_limiter.wait()
        # Step 2: Fetch details for PMIDs
        fetch_params = {
            "db": "pubmed",
            "id": ",".join(pmids),
            "retmode": "xml"
        }
        fetch_resp = await client.get(f"{base_url}/efetch.fcgi", params=fetch_params)
        fetch_resp.raise_for_status()
        # Parse XML and extract key info
        papers = parse_pubmed_xml(fetch_resp.text)
        result = f"Found {len(papers)} papers for query: '{query}'\n\n"
        for i, paper in enumerate(papers, 1):
            result += f"{i}. **{paper['title']}**\n"
            result += f" PMID: {paper['pmid']} | Published: {paper['date']}\n"
            result += f" Authors: {paper['authors']}\n"
            result += f" URL: https://pubmed.ncbi.nlm.nih.gov/{paper['pmid']}/\n"
result += f" Abstract: {truncate_text(paper['abstract'], max_chars=300, suffix='')}...\n\n"
logger.info(f"Successfully retrieved {len(papers)} papers")
return result
except httpx.TimeoutException:
logger.error("PubMed API request timed out")
return "Error: PubMed API request timed out. Please try again."
except httpx.HTTPStatusError as e:
logger.error(f"PubMed API error: {e}")
return f"Error: PubMed API returned status {e.response.status_code}"
except Exception as e:
logger.error(f"Unexpected error in search_pubmed: {e}")
return f"Error: {str(e)}"
@mcp.tool()
async def get_paper_details(pmid: str) -> str:
"""Get full details for a specific PubMed paper by PMID.
Args:
pmid: PubMed ID
"""
try:
logger.info(f"Fetching details for PMID: {pmid}")
base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils"
# Rate limiting
await rate_limiter.wait()
fetch_params = {
"db": "pubmed",
"id": pmid,
"retmode": "xml"
}
# Use shared HTTP client for connection pooling
client = get_http_client(timeout=config.api.timeout)
fetch_resp = await client.get(f"{base_url}/efetch.fcgi", params=fetch_params)
fetch_resp.raise_for_status()
papers = parse_pubmed_xml(fetch_resp.text)
if not papers:
return ErrorFormatter.not_found("paper", pmid)
paper = papers[0]
# Format detailed response
result = f"**{paper['title']}**\n\n"
result += f"**PMID:** {paper['pmid']}\n"
result += f"**Published:** {paper['date']}\n"
result += f"**Authors:** {paper['authors']}\n\n"
result += f"**Abstract:**\n{paper['abstract']}\n\n"
result += f"**Journal:** {paper.get('journal', 'N/A')}\n"
result += f"**DOI:** {paper.get('doi', 'N/A')}\n"
result += f"**PubMed URL:** https://pubmed.ncbi.nlm.nih.gov/{pmid}/\n"
logger.info(f"Successfully retrieved details for PMID: {pmid}")
return result
except httpx.TimeoutException:
logger.error("PubMed API request timed out")
return "Error: PubMed API request timed out. Please try again."
except httpx.HTTPStatusError as e:
logger.error(f"PubMed API error: {e}")
return f"Error: PubMed API returned status {e.response.status_code}"
except Exception as e:
logger.error(f"Unexpected error in get_paper_details: {e}")
return f"Error: {str(e)}"
def parse_pubmed_xml(xml_text: str) -> list[dict]:
"""Parse PubMed XML response into structured data with error handling"""
import xml.etree.ElementTree as ET
papers = []
try:
root = ET.fromstring(xml_text)
except ET.ParseError as e:
logger.error(f"XML parsing error: {e}")
return papers
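    # efetch responses are a <PubmedArticleSet>; each <PubmedArticle> holds a
    # <MedlineCitation>/<Article> element with the fields extracted below.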
    for article in root.findall(".//PubmedArticle"):
        try:
            # Extract title
            title_elem = article.find(".//ArticleTitle")
            title = "".join(title_elem.itertext()) if title_elem is not None else "No title"
            # Extract abstract (may have multiple AbstractText elements)
            abstract_parts = []
            for abstract_elem in article.findall(".//AbstractText"):
                # Use itertext() so abstracts that begin with inline markup are not skipped
                text = "".join(abstract_elem.itertext()).strip()
                if not text:
                    continue
                label = abstract_elem.get("Label", "")
                if label:
                    abstract_parts.append(f"{label}: {text}")
                else:
                    abstract_parts.append(text)
abstract = " ".join(abstract_parts) if abstract_parts else "No abstract available"
# Extract PMID
pmid_elem = article.find(".//PMID")
pmid = pmid_elem.text if pmid_elem is not None else "Unknown"
# Extract date - correct path in MedlineCitation
pub_date = article.find(".//MedlineCitation/Article/Journal/JournalIssue/PubDate")
if pub_date is not None:
year_elem = pub_date.find("Year")
month_elem = pub_date.find("Month")
year = year_elem.text if year_elem is not None else "Unknown"
month = month_elem.text if month_elem is not None else ""
date_str = f"{month} {year}" if month else year
else:
# Try alternative date location
date_completed = article.find(".//DateCompleted")
if date_completed is not None:
year_elem = date_completed.find("Year")
year = year_elem.text if year_elem is not None else "Unknown"
date_str = year
else:
date_str = "Unknown"
# Extract authors
authors = []
for author in article.findall(".//Author"):
last = author.find("LastName")
first = author.find("ForeName")
collective = author.find("CollectiveName")
if collective is not None and collective.text:
authors.append(collective.text)
elif last is not None and first is not None:
authors.append(f"{first.text} {last.text}")
elif last is not None:
authors.append(last.text)
# Format authors using shared utility
authors_str = format_authors("; ".join(authors), max_authors=3) if authors else "Unknown authors"
# Extract journal name
journal_elem = article.find(".//Journal/Title")
journal = journal_elem.text if journal_elem is not None else "Unknown"
# Extract DOI
doi = None
for article_id in article.findall(".//ArticleId"):
if article_id.get("IdType") == "doi":
doi = article_id.text
break
papers.append({
"title": title,
"abstract": abstract,
"pmid": pmid,
"date": date_str,
"authors": authors_str,
"journal": journal,
"doi": doi or "N/A"
})
except Exception as e:
logger.warning(f"Error parsing article: {e}")
continue
return papers
if __name__ == "__main__":
    # Run with stdio transport
    mcp.run(transport="stdio")