import subprocess
import signal
import os
import time
from typing import Optional, Dict
from dataclasses import dataclass
from collections import OrderedDict

import requests
from fastapi import FastAPI, HTTPException
from fastapi.openapi.utils import get_openapi
from pydantic import BaseModel, Field
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
app = FastAPI(
    title="AGI Multi-Model API",
    description="""
**Dynamic Multi-Model LLM API with Web Search Capabilities**

This API provides:

* Dynamic model switching between multiple LLM models
* OpenAI-compatible chat completions
* Web-augmented chat with real-time search
* Model management and status monitoring

## Available Models

- **deepseek-chat** (default): General purpose conversational model
- **mistral-7b**: Financial analysis and summarization
- **openhermes-7b**: Advanced instruction following
- **deepseek-coder**: Specialized coding assistance
- **llama-7b**: Lightweight and fast responses

## Quick Start

1. Check available models: `GET /models`
2. Switch model (optional): `POST /switch-model`
3. Chat: `POST /v1/chat/completions`
4. Chat with web search: `POST /v1/web-chat/completions`
""",
    version="0.0.1.2025.12.04",
    contact={
        "name": "API Support",
        "email": "support@example.com",
    },
    license_info={
        "name": "MIT",
    },
    openapi_tags=[
        {
            "name": "status",
            "description": "System status and health checks",
        },
        {
            "name": "models",
            "description": "Model management and switching operations",
        },
        {
            "name": "chat",
            "description": "Chat completion endpoints (OpenAI-compatible)",
        },
        {
            "name": "documentation",
            "description": "API documentation and OpenAPI specification",
        },
    ],
)
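
# Illustrative client sketch for the Quick Start steps above. It is never called by
# the service itself; the function name and the base URL (http://localhost:7860, the
# conventional Hugging Face Spaces port) are assumptions, so adjust for your deployment.
def example_client_walkthrough(base_url: str = "http://localhost:7860") -> None:
    """Walk the Quick Start: list models, switch model, then chat."""
    # 1. Check available models
    models = requests.get(f"{base_url}/models", timeout=30).json()
    print("Available models:", models["available_models"])

    # 2. Switch model (optional); a cold load may take a few minutes
    switched = requests.post(
        f"{base_url}/switch-model",
        json={"model_name": "deepseek-coder"},
        timeout=600,
    ).json()
    print(switched["message"])

    # 3. Chat with the active model (OpenAI-compatible response shape)
    reply = requests.post(
        f"{base_url}/v1/chat/completions",
        json={
            "messages": [{"role": "user", "content": "Write a Python hello world."}],
            "max_tokens": 128,
            "temperature": 0.7,
        },
        timeout=300,
    ).json()
    print(reply["choices"][0]["message"]["content"])
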
# Predefined list of available models (TheBloke only - verified, fits 18GB Space)
AVAILABLE_MODELS = {
    # === General Purpose (Default) ===
    "deepseek-chat": "TheBloke/deepseek-llm-7B-chat-GGUF:deepseek-llm-7b-chat.Q4_K_M.gguf",
    # === Financial & Summarization Models ===
    "mistral-7b": "TheBloke/Mistral-7B-Instruct-v0.2-GGUF:mistral-7b-instruct-v0.2.Q4_K_M.gguf",
    "openhermes-7b": "TheBloke/OpenHermes-2.5-Mistral-7B-GGUF:openhermes-2.5-mistral-7b.Q4_K_M.gguf",
    # === Coding Models ===
    "deepseek-coder": "TheBloke/deepseek-coder-6.7B-instruct-GGUF:deepseek-coder-6.7b-instruct.Q4_K_M.gguf",
    # === Lightweight/Fast ===
    "llama-7b": "TheBloke/Llama-2-7B-Chat-GGUF:llama-2-7b-chat.Q4_K_M.gguf",
}

# Configuration
MAX_CACHED_MODELS = 2  # Maximum number of models to keep in memory
BASE_PORT = 8080       # Starting port for llama-server instances

@dataclass
class CachedModel:
    """Represents a cached model with its process and connection info."""
    name: str
    model_id: str
    process: subprocess.Popen
    port: int
    url: str
    last_used: float

class ModelCache:
    """
    In-memory LRU cache for loaded models.

    Manages multiple llama-server processes, each on a different port.
    Automatically evicts least recently used models when cache is full.
    """

    def __init__(self, max_size: int = MAX_CACHED_MODELS):
        self.max_size = max_size
        self.cache: OrderedDict[str, CachedModel] = OrderedDict()
        self.port_counter = BASE_PORT
        self.used_ports = set()

    def _get_next_port(self) -> int:
        """Get next available port for a model."""
        while self.port_counter in self.used_ports:
            self.port_counter += 1
        port = self.port_counter
        self.used_ports.add(port)
        self.port_counter += 1
        return port

    def _release_port(self, port: int):
        """Release a port back to the pool."""
        self.used_ports.discard(port)

    def _evict_lru(self):
        """Evict the least recently used model."""
        if not self.cache:
            return
        # Get the first (oldest) item
        model_name, cached_model = self.cache.popitem(last=False)
        print(f"Evicting model from cache: {model_name}")
        # Stop the process
        try:
            if os.name != 'nt':
                os.killpg(os.getpgid(cached_model.process.pid), signal.SIGTERM)
            else:
                cached_model.process.terminate()
            cached_model.process.wait(timeout=10)
        except Exception as e:
            print(f"Error stopping model {model_name}: {e}")
            try:
                if os.name != 'nt':
                    os.killpg(os.getpgid(cached_model.process.pid), signal.SIGKILL)
                else:
                    cached_model.process.kill()
            except:
                pass
        # Release the port
        self._release_port(cached_model.port)
        time.sleep(1)

    def get(self, model_name: str) -> Optional[CachedModel]:
        """Get a model from cache, updating its last used time."""
        if model_name in self.cache:
            cached_model = self.cache[model_name]
            cached_model.last_used = time.time()
            # Move to end (most recently used)
            self.cache.move_to_end(model_name)
            print(f"Cache hit for model: {model_name}")
            return cached_model
        print(f"Cache miss for model: {model_name}")
        return None

    def put(self, model_name: str, model_id: str, process: subprocess.Popen, port: int):
        """Add a model to the cache."""
        # Evict if cache is full
        while len(self.cache) >= self.max_size:
            self._evict_lru()
        url = f"http://localhost:{port}"
        cached_model = CachedModel(
            name=model_name,
            model_id=model_id,
            process=process,
            port=port,
            url=url,
            last_used=time.time()
        )
        self.cache[model_name] = cached_model
        print(f"Cached model: {model_name} on port {port}")

    def clear(self):
        """Clear all cached models."""
        print("Clearing model cache...")
        for model_name, cached_model in list(self.cache.items()):
            try:
                if os.name != 'nt':
                    os.killpg(os.getpgid(cached_model.process.pid), signal.SIGTERM)
                else:
                    cached_model.process.terminate()
                cached_model.process.wait(timeout=10)
            except:
                try:
                    if os.name != 'nt':
                        os.killpg(os.getpgid(cached_model.process.pid), signal.SIGKILL)
                    else:
                        cached_model.process.kill()
                except:
                    pass
            self._release_port(cached_model.port)
        self.cache.clear()

    def get_cache_info(self) -> Dict:
        """Get information about cached models."""
        return {
            "max_size": self.max_size,
            "current_size": len(self.cache),
            "cached_models": [
                {
                    "name": name,
                    "port": model.port,
                    "url": model.url,
                    "last_used": model.last_used
                }
                for name, model in self.cache.items()
            ]
        }

# Global state
current_model = "deepseek-chat"  # Default model
model_cache = ModelCache(max_size=MAX_CACHED_MODELS)

class ModelSwitchRequest(BaseModel):
    """Request to switch the active LLM model."""
    model_name: str = Field(
        ...,
        description="Name of the model to switch to",
        examples=["deepseek-chat", "mistral-7b", "deepseek-coder"]
    )

    model_config = {
        "json_schema_extra": {
            "examples": [
                {"model_name": "deepseek-coder"},
                {"model_name": "mistral-7b"}
            ]
        }
    }


class ChatCompletionRequest(BaseModel):
    """OpenAI-compatible chat completion request."""
    messages: list[dict] = Field(
        ...,
        description="Array of message objects with 'role' and 'content' fields",
        examples=[[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Hello!"}
        ]]
    )
    max_tokens: int = Field(
        default=256,
        description="Maximum number of tokens to generate",
        ge=1,
        le=4096
    )
    temperature: float = Field(
        default=0.7,
        description="Sampling temperature (0.0 to 2.0). Higher values make output more random.",
        ge=0.0,
        le=2.0
    )

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "messages": [
                        {"role": "user", "content": "What is the capital of France?"}
                    ],
                    "max_tokens": 100,
                    "temperature": 0.7
                }
            ]
        }
    }


class WebChatRequest(BaseModel):
    """Chat completion request with web search augmentation."""
    messages: list[dict] = Field(
        ...,
        description="Array of message objects. The last user message is used for web search.",
        examples=[[
            {"role": "user", "content": "What are the latest developments in AI?"}
        ]]
    )
    max_tokens: int = Field(
        default=512,
        description="Maximum number of tokens to generate",
        ge=1,
        le=4096
    )
    temperature: float = Field(
        default=0.7,
        description="Sampling temperature (0.0 to 2.0)",
        ge=0.0,
        le=2.0
    )
    max_search_results: int = Field(
        default=5,
        description="Maximum number of web search results to include in context",
        ge=1,
        le=10
    )

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "messages": [
                        {"role": "user", "content": "What's the weather like today in San Francisco?"}
                    ],
                    "max_tokens": 512,
                    "temperature": 0.7,
                    "max_search_results": 5
                }
            ]
        }
    }


class StatusResponse(BaseModel):
    """API status response."""
    status: str = Field(..., description="Current API status")
    current_model: str = Field(..., description="Currently active model")
    available_models: list[str] = Field(..., description="List of available models")


class ModelsResponse(BaseModel):
    """Available models response."""
    current_model: str = Field(..., description="Currently active model")
    available_models: list[str] = Field(..., description="List of all available models")


class ModelSwitchResponse(BaseModel):
    """Model switch response."""
    message: str = Field(..., description="Status message")
    model: str = Field(..., description="New active model name")

def start_llama_server(model_id: str, port: int) -> subprocess.Popen:
    """Start llama-server with specified model on a specific port."""
    cmd = [
        "llama-server",
        "-hf", model_id,
        "--host", "0.0.0.0",
        "--port", str(port),
        "-c", "2048",        # Context size
        "-t", "4",           # CPU threads (adjust based on cores)
        "-ngl", "0",         # GPU layers (0 for CPU-only)
        "--cont-batching",   # Enable continuous batching for speed
        "-b", "512",         # Batch size
    ]
    print(f"Starting llama-server with model: {model_id} on port {port}")
    print("This may take 2-3 minutes to download and load the model...")
    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        preexec_fn=os.setsid if os.name != 'nt' else None,
        text=True,
        bufsize=1
    )
    # Wait for server to be ready (increased timeout for model download)
    max_retries = 300  # 5 minutes
    server_url = f"http://localhost:{port}"
    for i in range(max_retries):
        # Check if process died
        if process.poll() is not None:
            stdout, _ = process.communicate()
            print(f"llama-server exited with code {process.returncode}")
            print(f"Output: {stdout}")
            raise RuntimeError("llama-server process died")
        try:
            # Try root endpoint instead of /health
            response = requests.get(f"{server_url}/", timeout=2)
            if response.status_code in [200, 404]:  # 404 is ok, means server is up
                print(f"llama-server ready after {i+1} seconds")
                return process
        except requests.exceptions.ConnectionError:
            # Server not ready yet
            pass
        except Exception:
            # Other errors, keep waiting
            pass
        time.sleep(1)
    raise RuntimeError("llama-server failed to start within 5 minutes")

@app.on_event("startup")
async def startup_event():
    """Start with default model and cache it."""
    global current_model
    model_id = AVAILABLE_MODELS[current_model]
    port = model_cache._get_next_port()
    process = start_llama_server(model_id, port)
    model_cache.put(current_model, model_id, process, port)
    print(f"Started with default model: {current_model}")


@app.on_event("shutdown")
async def shutdown_event():
    """Clean shutdown - clear all cached models."""
    model_cache.clear()

@app.get("/", response_model=StatusResponse, tags=["status"])
async def root():
    """
    Returns the current status of the AGI Multi-Model API.

    This endpoint provides information about:
    - Current API status
    - Currently active LLM model
    - List of all available models
    """
    return {
        "status": "AGI Multi-Model API with dynamic model switching and web search",
        "current_model": current_model,
        "available_models": list(AVAILABLE_MODELS.keys())
    }


@app.get("/models", response_model=ModelsResponse, tags=["models"])
async def list_models():
    """
    List all available LLM models.

    Returns:
    - current_model: The model currently in use
    - available_models: Array of all available model names

    Use this endpoint to see which models you can switch to.
    """
    return {
        "current_model": current_model,
        "available_models": list(AVAILABLE_MODELS.keys())
    }

@app.post("/switch-model", response_model=ModelSwitchResponse, tags=["models"])
async def switch_model(request: ModelSwitchRequest):
    """
    Switch to a different LLM model with intelligent caching.

    **How it works:**
    1. Checks if the requested model is already active (no switch needed)
    2. Checks the cache for the model (instant switch if cached)
    3. If not cached, loads the model (may take 2-3 minutes)

    **Caching:**
    - Up to 2 models kept in memory
    - LRU (Least Recently Used) eviction policy
    - Each model runs on a separate port
    - Instant switching between cached models
    """
    global current_model
    if request.model_name not in AVAILABLE_MODELS:
        raise HTTPException(
            status_code=400,
            detail=f"Model '{request.model_name}' not found. Available: {list(AVAILABLE_MODELS.keys())}"
        )
    if request.model_name == current_model:
        return {"message": f"Already using model: {current_model}", "model": current_model}
    # Try to get from cache
    cached_model = model_cache.get(request.model_name)
    if cached_model:
        # Model is cached, instant switch
        current_model = request.model_name
        return {
            "message": f"Switched to model: {current_model} (from cache)",
            "model": current_model
        }
    # Model not cached, need to load it
    model_id = AVAILABLE_MODELS[request.model_name]
    port = model_cache._get_next_port()
    try:
        process = start_llama_server(model_id, port)
        model_cache.put(request.model_name, model_id, process, port)
        current_model = request.model_name
        return {
            "message": f"Switched to model: {current_model} (newly loaded)",
            "model": current_model
        }
    except Exception as e:
        # Release port if failed
        model_cache._release_port(port)
        raise HTTPException(status_code=500, detail=f"Failed to load model: {str(e)}")

@app.post("/v1/chat/completions", tags=["chat"])
async def chat_completions(request: ChatCompletionRequest):
    """
    OpenAI-compatible chat completions endpoint.

    This endpoint forwards your request to the currently active LLM model
    and returns the response in OpenAI-compatible format.

    **Message Format:**
    ```json
    {
        "messages": [
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Hello!"}
        ],
        "max_tokens": 256,
        "temperature": 0.7
    }
    ```

    **Supported Roles:**
    - `system`: Sets the behavior of the assistant
    - `user`: User messages
    - `assistant`: Assistant responses (for multi-turn conversations)
    """
    try:
        # Get current model from cache
        cached_model = model_cache.get(current_model)
        if not cached_model:
            raise HTTPException(status_code=500, detail="Current model not loaded")
        # Forward to llama-server
        response = requests.post(
            f"{cached_model.url}/v1/chat/completions",
            json={
                "messages": request.messages,
                "max_tokens": request.max_tokens,
                "temperature": request.temperature,
            },
            timeout=300
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"llama-server error: {str(e)}")

def search_web(query: str, max_results: int = 5) -> list[dict]:
    """Search the web using DuckDuckGo and return results."""
    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=max_results))
            return results
    except Exception as e:
        print(f"Search error: {e}")
        return []


def format_search_context(query: str, search_results: list[dict]) -> str:
    """Format search results into context for the LLM."""
    if not search_results:
        return f"No web results found for: {query}"
    context = f"# Web Search Results for: {query}\n\n"
    for i, result in enumerate(search_results, 1):
        title = result.get("title", "No title")
        body = result.get("body", "No description")
        url = result.get("href", "")
        context += f"## Result {i}: {title}\n"
        context += f"{body}\n"
        if url:
            context += f"Source: {url}\n"
        context += "\n"
    return context

@app.post("/v1/web-chat/completions", tags=["chat"])
async def web_chat_completions(request: WebChatRequest):
    """
    Chat completions with real-time web search augmentation.

    **How it works:**
    1. Extracts the last user message as the search query
    2. Performs a web search using DuckDuckGo
    3. Injects search results into the LLM context
    4. Returns the AI response with source citations

    **Use cases:**
    - Current events and news
    - Recent information beyond the model's training data
    - Fact-checking with web sources
    - Research with live data

    **Example:**
    ```json
    {
        "messages": [
            {"role": "user", "content": "What's the latest news about SpaceX?"}
        ],
        "max_tokens": 512,
        "max_search_results": 5
    }
    ```

    The response includes a `web_search` field with metadata about sources used.
    """
    try:
        # Get the last user message as search query
        user_messages = [msg for msg in request.messages if msg.get("role") == "user"]
        if not user_messages:
            raise HTTPException(status_code=400, detail="No user message found")
        search_query = user_messages[-1].get("content", "")
        # Perform web search
        print(f"Searching web for: {search_query}")
        search_results = search_web(search_query, request.max_search_results)
        # Format search results as context
        web_context = format_search_context(search_query, search_results)
        # Create augmented messages with web context
        augmented_messages = request.messages.copy()
        # Insert web context as a system message before the last user message
        system_prompt = {
            "role": "system",
            "content": f"""You are a helpful assistant with access to current web information.
{web_context}
Use the above search results to provide accurate, up-to-date information in your response.
Always cite sources when using information from the search results."""
        }
        # Insert system message before the last user message
        augmented_messages.insert(-1, system_prompt)
        # Get current model from cache
        cached_model = model_cache.get(current_model)
        if not cached_model:
            raise HTTPException(status_code=500, detail="Current model not loaded")
        # Forward to llama-server with augmented context
        response = requests.post(
            f"{cached_model.url}/v1/chat/completions",
            json={
                "messages": augmented_messages,
                "max_tokens": request.max_tokens,
                "temperature": request.temperature,
            },
            timeout=300
        )
        response.raise_for_status()
        result = response.json()
        # Add metadata about search results
        result["web_search"] = {
            "query": search_query,
            "results_count": len(search_results),
            "sources": [r.get("href", "") for r in search_results if r.get("href")]
        }
        return result
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"llama-server error: {str(e)}")
    except HTTPException:
        # Re-raise HTTP errors (e.g. the 400 above) instead of wrapping them as 500
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error: {str(e)}")

@app.get("/cache", tags=["models"])  # NOTE: "/cache" is an assumed route path for this handler
async def get_cache_info():
    """
    Get information about the in-memory model cache.

    Returns:
    - max_size: Maximum number of models that can be cached
    - current_size: Current number of cached models
    - cached_models: List of currently cached models with their metadata

    **Example Response:**
    ```json
    {
        "max_size": 2,
        "current_size": 2,
        "cached_models": [
            {
                "name": "deepseek-chat",
                "port": 8080,
                "url": "http://localhost:8080",
                "last_used": 1234567890.123
            },
            {
                "name": "mistral-7b",
                "port": 8081,
                "url": "http://localhost:8081",
                "last_used": 1234567895.456
            }
        ]
    }
    ```
    """
    return model_cache.get_cache_info()

@app.get("/openapi-spec", tags=["documentation"])  # NOTE: "/openapi-spec" is an assumed route path
async def get_openapi_spec():
    """
    Export the OpenAPI specification for this API.

    This endpoint returns the complete OpenAPI 3.0 specification that can be used with:
    - API documentation tools (Swagger UI, ReDoc)
    - Code generators (openapi-generator, swagger-codegen)
    - API testing tools (Postman, Insomnia)
    - SDK generation

    Save this to a file and use it with tools like:
    ```bash
    # Generate Python client
    openapi-generator generate -i openapi.json -g python -o ./client

    # Generate TypeScript client
    openapi-generator generate -i openapi.json -g typescript-fetch -o ./client
    ```
    """
    return app.openapi()
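

# Local entry point (illustrative sketch). On Hugging Face Spaces the server is usually
# launched by the Space runner, so this block is only a convenience for running the
# module directly; port 7860 is an assumption, adjust as needed.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)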