"""API client for HuggingFace Inference API and OpenAI."""
import asyncio
import json
import httpx
from typing import List, Dict, Any, Optional
from .config_free import (
OPENAI_API_KEY,
HUGGINGFACE_API_KEY,
DEFAULT_TIMEOUT,
MAX_RETRIES,
RETRY_DELAY
)
async def query_openai_model(
model: str,
messages: List[Dict[str, str]],
timeout: float = DEFAULT_TIMEOUT,
max_retries: int = MAX_RETRIES
) -> Optional[Dict[str, Any]]:
"""
Query an OpenAI model.
Args:
model: OpenAI model name (e.g., "gpt-4o-mini")
messages: List of message dicts with 'role' and 'content'
timeout: Request timeout in seconds
max_retries: Maximum retry attempts
Returns:
Response dict with 'content', or None if failed
"""
headers = {
"Authorization": f"Bearer {OPENAI_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"temperature": 0.7,
}
for attempt in range(max_retries + 1):
try:
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(
"https://api.openai.com/v1/chat/completions",
headers=headers,
json=payload
)
response.raise_for_status()
data = response.json()
content = data["choices"][0]["message"]["content"]
return {"content": content}
        except httpx.TimeoutException:
print(f"⏱️ Timeout querying OpenAI {model} (attempt {attempt + 1}/{max_retries + 1})")
if attempt < max_retries:
await asyncio.sleep(RETRY_DELAY * (attempt + 1))
continue
return None
except httpx.HTTPStatusError as e:
print(f"🚫 HTTP error querying OpenAI {model}: {e.response.status_code}")
            # Don't retry on client errors, except 429 (rate limited)
            if 400 <= e.response.status_code < 500 and e.response.status_code != 429:
                return None
if attempt < max_retries:
await asyncio.sleep(RETRY_DELAY * (attempt + 1))
continue
return None
except Exception as e:
print(f"❌ Error querying OpenAI {model}: {e}")
if attempt < max_retries:
await asyncio.sleep(RETRY_DELAY)
continue
return None
return None
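# Example call (sketch): "gpt-4o-mini" is an illustrative model name and the messages
# follow the standard chat format used throughout this module; on success the helper
# returns {"content": "..."}, and None once retries are exhausted.
#
#     reply = await query_openai_model(
#         "gpt-4o-mini",
#         [{"role": "system", "content": "You are terse."},
#          {"role": "user", "content": "Name one async HTTP client for Python."}],
#     )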
async def query_huggingface_model(
model: str,
messages: List[Dict[str, str]],
timeout: float = DEFAULT_TIMEOUT,
max_retries: int = MAX_RETRIES
) -> Optional[Dict[str, Any]]:
"""
Query a HuggingFace model via Router (FREE).
Args:
model: HuggingFace model ID (e.g., "meta-llama/Llama-3.3-70B-Instruct")
messages: List of message dicts with 'role' and 'content'
timeout: Request timeout in seconds
max_retries: Maximum retry attempts
Returns:
Response dict with 'content', or None if failed
"""
headers = {
"Authorization": f"Bearer {HUGGINGFACE_API_KEY}",
"Content-Type": "application/json",
}
# Use OpenAI-compatible format for HuggingFace Router
payload = {
"model": model,
"messages": messages,
"max_tokens": 2048,
"temperature": 0.7,
"top_p": 0.9,
}
# Updated to use router.huggingface.co (new endpoint)
api_url = "https://router.huggingface.co/v1/chat/completions"
for attempt in range(max_retries + 1):
try:
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(api_url, headers=headers, json=payload)
response.raise_for_status()
data = response.json()
# Parse OpenAI-compatible response format
if "choices" in data and len(data["choices"]) > 0:
content = data["choices"][0]["message"]["content"]
return {"content": content}
else:
print(f"❌ Unexpected response format from HF {model}: {data}")
return None
        except httpx.TimeoutException:
print(f"⏱️ Timeout querying HF {model} (attempt {attempt + 1}/{max_retries + 1})")
if attempt < max_retries:
await asyncio.sleep(RETRY_DELAY * (attempt + 1))
continue
return None
except httpx.HTTPStatusError as e:
error_msg = e.response.text
print(f"🚫 HTTP {e.response.status_code} querying HF {model}: {error_msg[:200]}")
            # Model is loading - retry with a longer delay, skipping the wait on the final attempt
            if "loading" in error_msg.lower() or "warming up" in error_msg.lower():
                if attempt < max_retries:
                    print("⏳ Model is loading, waiting 20s...")
                    await asyncio.sleep(20)
                    continue
                return None
            # Don't retry on other client errors, except 429 (rate limited)
            if 400 <= e.response.status_code < 500 and e.response.status_code != 429:
                return None
if attempt < max_retries:
await asyncio.sleep(RETRY_DELAY * (attempt + 1))
continue
return None
except Exception as e:
print(f"❌ Error querying HF {model}: {e}")
if attempt < max_retries:
await asyncio.sleep(RETRY_DELAY)
continue
return None
return None
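# Example call (sketch): the router takes a plain HF model ID with OpenAI-compatible
# chat messages; the ID below is illustrative and must be a model actually served by
# router.huggingface.co for the request to succeed.
#
#     reply = await query_huggingface_model(
#         "meta-llama/Llama-3.3-70B-Instruct",
#         [{"role": "user", "content": "Summarize the HF router in one sentence."}],
#     )
#     if reply:
#         print(reply["content"])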
async def query_model(
model_config: Dict[str, str],
messages: List[Dict[str, str]],
timeout: float = DEFAULT_TIMEOUT
) -> Optional[Dict[str, Any]]:
"""
Query a model based on its configuration (provider-agnostic).
Args:
model_config: Dict with 'provider' and 'model' keys
messages: List of message dicts
timeout: Request timeout
Returns:
Response dict or None
"""
provider = model_config["provider"]
model = model_config["model"]
if provider == "openai":
return await query_openai_model(model, messages, timeout)
elif provider == "huggingface":
return await query_huggingface_model(model, messages, timeout)
else:
print(f"❌ Unknown provider: {provider}")
return None
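# Expected config shape (sketch): query_model reads 'provider' and 'model', and
# query_models_parallel below also reads 'id'; the values here are illustrative.
#
#     EXAMPLE_CONFIG = {
#         "id": "llama-3.3-70b",
#         "provider": "huggingface",
#         "model": "meta-llama/Llama-3.3-70B-Instruct",
#     }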
async def query_model_stream(
model_config: Dict[str, str],
messages: List[Dict[str, str]],
timeout: float = DEFAULT_TIMEOUT
):
"""
Query a model and stream the response.
Args:
model_config: Dict with 'provider' and 'model' keys
messages: List of message dicts
timeout: Request timeout
Yields:
Content chunks
"""
provider = model_config["provider"]
model = model_config["model"]
if provider == "openai":
async for chunk in stream_openai_model(model, messages, timeout):
yield chunk
elif provider == "huggingface":
        # The HF Inference API doesn't support streaming well, so fall back to the full response
response = await query_huggingface_model(model, messages, timeout)
if response:
yield response["content"]
else:
yield "[Error: Failed to get response]"
else:
yield f"[Error: Unknown provider {provider}]"
async def stream_openai_model(
model: str,
messages: List[Dict[str, str]],
timeout: float = DEFAULT_TIMEOUT
):
"""Stream OpenAI model response."""
headers = {
"Authorization": f"Bearer {OPENAI_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"model": model,
"messages": messages,
"temperature": 0.7,
"stream": True,
}
try:
async with httpx.AsyncClient(timeout=timeout) as client:
async with client.stream(
"POST",
"https://api.openai.com/v1/chat/completions",
headers=headers,
json=payload
) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.startswith("data: "):
data_str = line[6:]
if data_str.strip() == "[DONE]":
break
try:
data = json.loads(data_str)
delta = data["choices"][0]["delta"]
content = delta.get("content")
if content:
yield content
except (json.JSONDecodeError, KeyError):
pass
except Exception as e:
print(f"❌ Error streaming OpenAI {model}: {e}")
yield f"\n[Error: {str(e)}]"
async def query_models_parallel(
model_configs: List[Dict[str, str]],
messages: List[Dict[str, str]],
timeout: float = DEFAULT_TIMEOUT
) -> Dict[str, Optional[Dict[str, Any]]]:
"""
Query multiple models in parallel.
Args:
        model_configs: List of model config dicts (each with 'id', 'provider', and 'model' keys)
messages: Messages to send to each model
timeout: Request timeout
Returns:
Dict mapping model ID to response
"""
print(f"πŸš€ Querying {len(model_configs)} models in parallel...")
tasks = [query_model(config, messages, timeout) for config in model_configs]
responses = await asyncio.gather(*tasks, return_exceptions=True)
result = {}
for config, response in zip(model_configs, responses):
model_id = config["id"]
if isinstance(response, Exception):
print(f"❌ Model {model_id} raised exception: {response}")
result[model_id] = None
else:
result[model_id] = response
status = "βœ…" if response else "❌"
print(f"{status} Model {model_id} completed")
successful = sum(1 for r in result.values() if r is not None)
print(f"πŸ“Š {successful}/{len(model_configs)} models responded successfully")
return result
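if __name__ == "__main__":
    # Minimal smoke test (sketch). The two config entries are illustrative examples,
    # not a model list taken from this project, and the API keys loaded in config_free
    # must be set for the calls to succeed.
    _demo_configs = [
        {"id": "gpt-4o-mini", "provider": "openai", "model": "gpt-4o-mini"},
        {"id": "llama-3.3-70b", "provider": "huggingface",
         "model": "meta-llama/Llama-3.3-70B-Instruct"},
    ]
    _demo_messages = [{"role": "user", "content": "Reply with the single word 'pong'."}]

    async def _demo() -> None:
        # Fan out to both providers and print a truncated view of each reply.
        results = await query_models_parallel(_demo_configs, _demo_messages)
        for model_id, reply in results.items():
            print(model_id, "->", reply["content"][:80] if reply else "no response")

    asyncio.run(_demo())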