"""API client for HuggingFace Inference API and OpenAI.""" import httpx import asyncio from typing import List, Dict, Any, Optional from .config_free import ( OPENAI_API_KEY, HUGGINGFACE_API_KEY, DEFAULT_TIMEOUT, MAX_RETRIES, RETRY_DELAY ) async def query_openai_model( model: str, messages: List[Dict[str, str]], timeout: float = DEFAULT_TIMEOUT, max_retries: int = MAX_RETRIES ) -> Optional[Dict[str, Any]]: """ Query an OpenAI model. Args: model: OpenAI model name (e.g., "gpt-4o-mini") messages: List of message dicts with 'role' and 'content' timeout: Request timeout in seconds max_retries: Maximum retry attempts Returns: Response dict with 'content', or None if failed """ headers = { "Authorization": f"Bearer {OPENAI_API_KEY}", "Content-Type": "application/json", } payload = { "model": model, "messages": messages, "temperature": 0.7, } for attempt in range(max_retries + 1): try: async with httpx.AsyncClient(timeout=timeout) as client: response = await client.post( "https://api.openai.com/v1/chat/completions", headers=headers, json=payload ) response.raise_for_status() data = response.json() content = data["choices"][0]["message"]["content"] return {"content": content} except httpx.TimeoutException as e: print(f"⏱️ Timeout querying OpenAI {model} (attempt {attempt + 1}/{max_retries + 1})") if attempt < max_retries: await asyncio.sleep(RETRY_DELAY * (attempt + 1)) continue return None except httpx.HTTPStatusError as e: print(f"🚫 HTTP error querying OpenAI {model}: {e.response.status_code}") if 400 <= e.response.status_code < 500: return None if attempt < max_retries: await asyncio.sleep(RETRY_DELAY * (attempt + 1)) continue return None except Exception as e: print(f"❌ Error querying OpenAI {model}: {e}") if attempt < max_retries: await asyncio.sleep(RETRY_DELAY) continue return None return None async def query_huggingface_model( model: str, messages: List[Dict[str, str]], timeout: float = DEFAULT_TIMEOUT, max_retries: int = MAX_RETRIES ) -> Optional[Dict[str, Any]]: """ Query a HuggingFace model via Router (FREE). 
    Args:
        model: HuggingFace model ID (e.g., "meta-llama/Llama-3.3-70B-Instruct")
        messages: List of message dicts with 'role' and 'content'
        timeout: Request timeout in seconds
        max_retries: Maximum retry attempts

    Returns:
        Response dict with 'content', or None if failed
    """
    headers = {
        "Authorization": f"Bearer {HUGGINGFACE_API_KEY}",
        "Content-Type": "application/json",
    }
    # Use OpenAI-compatible format for the HuggingFace Router
    payload = {
        "model": model,
        "messages": messages,
        "max_tokens": 2048,
        "temperature": 0.7,
        "top_p": 0.9,
    }
    # Updated to use router.huggingface.co (new endpoint)
    api_url = "https://router.huggingface.co/v1/chat/completions"

    for attempt in range(max_retries + 1):
        try:
            async with httpx.AsyncClient(timeout=timeout) as client:
                response = await client.post(api_url, headers=headers, json=payload)
                response.raise_for_status()
                data = response.json()

                # Parse OpenAI-compatible response format
                if "choices" in data and len(data["choices"]) > 0:
                    content = data["choices"][0]["message"]["content"]
                    return {"content": content}
                else:
                    print(f"❌ Unexpected response format from HF {model}: {data}")
                    return None
        except httpx.TimeoutException:
            print(f"⏱️ Timeout querying HF {model} (attempt {attempt + 1}/{max_retries + 1})")
            if attempt < max_retries:
                await asyncio.sleep(RETRY_DELAY * (attempt + 1))
                continue
            return None
        except httpx.HTTPStatusError as e:
            error_msg = e.response.text
            print(f"🚫 HTTP {e.response.status_code} querying HF {model}: {error_msg[:200]}")

            # Model is loading - retry with a longer delay
            if "loading" in error_msg.lower() or "warming up" in error_msg.lower():
                print("⏳ Model is loading, waiting 20s...")
                await asyncio.sleep(20)
                if attempt < max_retries:
                    continue

            # Don't retry on client errors (except loading)
            if 400 <= e.response.status_code < 500:
                return None
            if attempt < max_retries:
                await asyncio.sleep(RETRY_DELAY * (attempt + 1))
                continue
            return None
        except Exception as e:
            print(f"❌ Error querying HF {model}: {e}")
            if attempt < max_retries:
                await asyncio.sleep(RETRY_DELAY)
                continue
            return None

    return None


async def query_model(
    model_config: Dict[str, str],
    messages: List[Dict[str, str]],
    timeout: float = DEFAULT_TIMEOUT,
) -> Optional[Dict[str, Any]]:
    """
    Query a model based on its configuration (provider-agnostic).

    Args:
        model_config: Dict with 'provider' and 'model' keys
        messages: List of message dicts
        timeout: Request timeout

    Returns:
        Response dict or None
    """
    provider = model_config["provider"]
    model = model_config["model"]

    if provider == "openai":
        return await query_openai_model(model, messages, timeout)
    elif provider == "huggingface":
        return await query_huggingface_model(model, messages, timeout)
    else:
        print(f"❌ Unknown provider: {provider}")
        return None


async def query_model_stream(
    model_config: Dict[str, str],
    messages: List[Dict[str, str]],
    timeout: float = DEFAULT_TIMEOUT,
):
    """
    Query a model and stream the response.
    Args:
        model_config: Dict with 'provider' and 'model' keys
        messages: List of message dicts
        timeout: Request timeout

    Yields:
        Content chunks
    """
    provider = model_config["provider"]
    model = model_config["model"]

    if provider == "openai":
        async for chunk in stream_openai_model(model, messages, timeout):
            yield chunk
    elif provider == "huggingface":
        # The HF Inference API doesn't support streaming well; fall back
        # to yielding the full response as a single chunk.
        response = await query_huggingface_model(model, messages, timeout)
        if response:
            yield response["content"]
        else:
            yield "[Error: Failed to get response]"
    else:
        yield f"[Error: Unknown provider {provider}]"


async def stream_openai_model(
    model: str,
    messages: List[Dict[str, str]],
    timeout: float = DEFAULT_TIMEOUT,
):
    """Stream an OpenAI model response chunk by chunk."""
    headers = {
        "Authorization": f"Bearer {OPENAI_API_KEY}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": messages,
        "temperature": 0.7,
        "stream": True,
    }

    try:
        async with httpx.AsyncClient(timeout=timeout) as client:
            async with client.stream(
                "POST",
                "https://api.openai.com/v1/chat/completions",
                headers=headers,
                json=payload,
            ) as response:
                response.raise_for_status()
                # The streaming endpoint emits server-sent events: each line is
                # "data: {json}", and the stream ends with "data: [DONE]".
                async for line in response.aiter_lines():
                    if line.startswith("data: "):
                        data_str = line[6:]
                        if data_str.strip() == "[DONE]":
                            break
                        try:
                            data = json.loads(data_str)
                            delta = data["choices"][0]["delta"]
                            content = delta.get("content")
                            if content:
                                yield content
                        except (json.JSONDecodeError, KeyError):
                            pass
    except Exception as e:
        print(f"❌ Error streaming OpenAI {model}: {e}")
        yield f"\n[Error: {str(e)}]"


async def query_models_parallel(
    model_configs: List[Dict[str, str]],
    messages: List[Dict[str, str]],
    timeout: float = DEFAULT_TIMEOUT,
) -> Dict[str, Optional[Dict[str, Any]]]:
    """
    Query multiple models in parallel.

    Args:
        model_configs: List of model config dicts (each with 'id',
            'provider', and 'model' keys)
        messages: Messages to send to each model
        timeout: Request timeout

    Returns:
        Dict mapping model ID to response
    """
    print(f"🚀 Querying {len(model_configs)} models in parallel...")

    tasks = [query_model(config, messages, timeout) for config in model_configs]
    responses = await asyncio.gather(*tasks, return_exceptions=True)

    result = {}
    for config, response in zip(model_configs, responses):
        model_id = config["id"]
        if isinstance(response, Exception):
            print(f"❌ Model {model_id} raised exception: {response}")
            result[model_id] = None
        else:
            result[model_id] = response
            status = "✅" if response else "❌"
            print(f"{status} Model {model_id} completed")

    successful = sum(1 for r in result.values() if r is not None)
    print(f"📊 {successful}/{len(model_configs)} models responded successfully")
    return result
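if __name__ == "__main__":
    # Usage sketch (illustrative, not part of the client API): a minimal demo
    # of query_models_parallel. It assumes the API keys imported from
    # config_free are set and that your account can reach the two model IDs
    # named in the docstrings above; the 'id' values are arbitrary labels
    # chosen for this example. Because of the relative import, run this as a
    # module (python -m <package>.<module>) rather than as a plain script.
    async def _demo():
        model_configs = [
            {"id": "gpt-4o-mini", "provider": "openai", "model": "gpt-4o-mini"},
            {
                "id": "llama-3.3-70b",
                "provider": "huggingface",
                "model": "meta-llama/Llama-3.3-70B-Instruct",
            },
        ]
        messages = [{"role": "user", "content": "Say hello in one sentence."}]
        results = await query_models_parallel(model_configs, messages)
        for model_id, response in results.items():
            print(model_id, "->", response["content"] if response else "no response")

    asyncio.run(_demo())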