import asyncio
import os
import signal
import subprocess
import threading
import time
from collections import OrderedDict, deque
from dataclasses import dataclass
from typing import Dict, Optional

import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from fastapi import FastAPI, HTTPException
from fastapi.openapi.utils import get_openapi
from pydantic import BaseModel, Field

app = FastAPI(
    title="AGI Multi-Model API",
    description="""
**Dynamic Multi-Model LLM API with Web Search Capabilities**

This API provides:
* 🔄 Dynamic model switching between multiple LLM models
* 💬 OpenAI-compatible chat completions
* 🌐 Web-augmented chat with real-time search
* 📊 Model management and status monitoring

## Available Models
- **deepseek-chat** (default): General purpose conversational model
- **mistral-7b**: Financial analysis and summarization
- **openhermes-7b**: Advanced instruction following
- **deepseek-coder**: Specialized coding assistance
- **llama-7b**: Lightweight and fast responses

## Quick Start
1. Check available models: `GET /models`
2. Switch model (optional): `POST /switch-model`
3. Chat: `POST /v1/chat/completions`
4. Chat with web search: `POST /v1/web-chat/completions`
""",
    version="0.0.1.2025.12.04",
    contact={
        "name": "API Support",
        "email": "support@example.com",
    },
    license_info={
        "name": "MIT",
    },
    openapi_tags=[
        {
            "name": "status",
            "description": "System status and health checks",
        },
        {
            "name": "models",
            "description": "Model management and switching operations",
        },
        {
            "name": "chat",
            "description": "Chat completion endpoints (OpenAI-compatible)",
        },
        {
            "name": "documentation",
            "description": "API documentation and OpenAPI specification",
        },
    ],
)

# Predefined list of available models (TheBloke only - verified, fits 18GB Space)
AVAILABLE_MODELS = {
    # === General Purpose (Default) ===
    "deepseek-chat": "TheBloke/deepseek-llm-7B-chat-GGUF:deepseek-llm-7b-chat.Q4_K_M.gguf",
    # === Financial & Summarization Models ===
    "mistral-7b": "TheBloke/Mistral-7B-Instruct-v0.2-GGUF:mistral-7b-instruct-v0.2.Q4_K_M.gguf",
    "openhermes-7b": "TheBloke/OpenHermes-2.5-Mistral-7B-GGUF:openhermes-2.5-mistral-7b.Q4_K_M.gguf",
    # === Coding Models ===
    "deepseek-coder": "TheBloke/deepseek-coder-6.7B-instruct-GGUF:deepseek-coder-6.7b-instruct.Q4_K_M.gguf",
    # === Lightweight/Fast ===
    "llama-7b": "TheBloke/Llama-2-7B-Chat-GGUF:llama-2-7b-chat.Q4_K_M.gguf",
}

# Configuration
MAX_CACHED_MODELS = 2  # Maximum number of models to keep in memory
BASE_PORT = 8080  # Starting port for llama-server instances
OUTPUT_TAIL_LINES = 200  # Lines of llama-server output kept for diagnostics


@dataclass
class CachedModel:
    """Represents a cached model with its process and connection info."""

    name: str  # Short model name (a key of AVAILABLE_MODELS)
    model_id: str  # Model identifier passed to llama-server via -hf
    process: subprocess.Popen  # Running llama-server process
    port: int  # Port the llama-server instance listens on
    url: str  # Base URL built from the port
    last_used: float  # time.time() of the most recent cache access


def _terminate_process(process: subprocess.Popen, label: str, timeout: float = 10) -> None:
    """Stop *process*, escalating from SIGTERM to SIGKILL, and reap it.

    On POSIX the signal is sent to the whole process group (the server is
    started in its own session); on Windows only the process itself is
    signalled. Failures are logged rather than raised so callers can treat
    this as best-effort cleanup.

    Args:
        process: The child process to stop.
        label: Human-readable name used in log messages.
        timeout: Seconds to wait for exit after each signal.
    """
    if process.poll() is not None:
        return  # Already exited and reaped

    posix = os.name != "nt"
    try:
        if posix:
            os.killpg(os.getpgid(process.pid), signal.SIGTERM)
        else:
            process.terminate()
        process.wait(timeout=timeout)
        return
    except (OSError, subprocess.TimeoutExpired) as exc:
        print(f"Error stopping {label}: {exc}")

    # Graceful stop failed or timed out: force-kill, then reap so no zombie
    # process is left behind.
    try:
        if posix:
            os.killpg(os.getpgid(process.pid), signal.SIGKILL)
        else:
            process.kill()
        process.wait(timeout=timeout)
    except (OSError, subprocess.TimeoutExpired) as exc:
        print(f"Error killing {label}: {exc}")


class ModelCache:
    """
    In-memory LRU cache for loaded models.

    Manages multiple llama-server processes, each on a different port.
    Automatically evicts least recently used models when cache is full.
    """

    def __init__(self, max_size: int = MAX_CACHED_MODELS):
        self.max_size = max_size
        # Ordered oldest -> newest; the first entry is the eviction candidate.
        self.cache: OrderedDict[str, CachedModel] = OrderedDict()
        self.port_counter = BASE_PORT
        self.used_ports: set = set()

    def _get_next_port(self) -> int:
        """Reserve and return the next port not currently in use."""
        while self.port_counter in self.used_ports:
            self.port_counter += 1
        port = self.port_counter
        self.used_ports.add(port)
        self.port_counter += 1
        return port

    def _release_port(self, port: int):
        """Mark *port* as no longer in use (no-op if it was not reserved)."""
        self.used_ports.discard(port)

    def _stop(self, cached_model: CachedModel) -> None:
        """Stop a cached model's server process and release its port."""
        _terminate_process(cached_model.process, f"model {cached_model.name}")
        self._release_port(cached_model.port)

    def _evict_lru(self):
        """Evict the least recently used model, stopping its server."""
        if not self.cache:
            return

        # The first item is the oldest one.
        model_name, cached_model = self.cache.popitem(last=False)
        print(f"Evicting model from cache: {model_name}")
        self._stop(cached_model)
        # Pause kept from the original implementation; presumably gives the
        # OS a moment to reclaim resources — TODO confirm it is still needed.
        time.sleep(1)

    def get(self, model_name: str) -> Optional[CachedModel]:
        """Return the cached model and mark it most recently used, or None."""
        cached_model = self.cache.get(model_name)
        if cached_model is None:
            print(f"Cache miss for model: {model_name}")
            return None

        cached_model.last_used = time.time()
        self.cache.move_to_end(model_name)
        print(f"Cache hit for model: {model_name}")
        return cached_model

    def put(self, model_name: str, model_id: str, process: subprocess.Popen, port: int):
        """Add a running model to the cache, evicting old entries if needed.

        If *model_name* is already cached with a different process, that
        process is stopped first so it is not orphaned.
        """
        previous = self.cache.pop(model_name, None)
        if previous is not None and previous.process is not process:
            _terminate_process(previous.process, f"model {model_name}")
            if previous.port != port:
                self._release_port(previous.port)

        # The non-empty check keeps this loop finite when max_size <= 0.
        while self.cache and len(self.cache) >= self.max_size:
            self._evict_lru()

        self.cache[model_name] = CachedModel(
            name=model_name,
            model_id=model_id,
            process=process,
            port=port,
            url=f"http://localhost:{port}",
            last_used=time.time(),
        )
        print(f"Cached model: {model_name} on port {port}")

    def clear(self):
        """Stop every cached model's server and empty the cache."""
        print("Clearing model cache...")
        for cached_model in list(self.cache.values()):
            self._stop(cached_model)
        self.cache.clear()

    def get_cache_info(self) -> Dict:
        """Return the cache size limits and metadata for each cached model."""
        return {
            "max_size": self.max_size,
            "current_size": len(self.cache),
            "cached_models": [
                {
                    "name": name,
                    "port": model.port,
                    "url": model.url,
                    "last_used": model.last_used,
                }
                for name, model in self.cache.items()
            ],
        }


# Global state
current_model = "deepseek-chat"  # Default model
model_cache = ModelCache(max_size=MAX_CACHED_MODELS)

# Serializes model switches. Created lazily from inside a running event loop
# because older Python versions bind asyncio primitives to the loop that is
# current when they are constructed.
_switch_lock: Optional[asyncio.Lock] = None


def _get_switch_lock() -> asyncio.Lock:
    """Return the process-wide model-switch lock, creating it on first use."""
    global _switch_lock
    if _switch_lock is None:
        _switch_lock = asyncio.Lock()
    return _switch_lock


class ModelSwitchRequest(BaseModel):
    """Request to switch the active LLM model."""

    model_name: str = Field(
        ...,
        description="Name of the model to switch to",
        examples=["deepseek-chat", "mistral-7b", "deepseek-coder"],
    )

    model_config = {
        "json_schema_extra": {
            "examples": [
                {"model_name": "deepseek-coder"},
                {"model_name": "mistral-7b"},
            ]
        }
    }


class ChatCompletionRequest(BaseModel):
    """OpenAI-compatible chat completion request."""

    messages: list[dict] = Field(
        ...,
        description="Array of message objects with 'role' and 'content' fields",
        examples=[[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Hello!"},
        ]],
    )
    max_tokens: int = Field(
        default=256,
        description="Maximum number of tokens to generate",
        ge=1,
        le=4096,
    )
    temperature: float = Field(
        default=0.7,
        description="Sampling temperature (0.0 to 2.0). Higher values make output more random.",
        ge=0.0,
        le=2.0,
    )

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "messages": [
                        {"role": "user", "content": "What is the capital of France?"}
                    ],
                    "max_tokens": 100,
                    "temperature": 0.7,
                }
            ]
        }
    }


class WebChatRequest(BaseModel):
    """Chat completion request with web search augmentation."""

    messages: list[dict] = Field(
        ...,
        description="Array of message objects. The last user message is used for web search.",
        examples=[[
            {"role": "user", "content": "What are the latest developments in AI?"}
        ]],
    )
    max_tokens: int = Field(
        default=512,
        description="Maximum number of tokens to generate",
        ge=1,
        le=4096,
    )
    temperature: float = Field(
        default=0.7,
        description="Sampling temperature (0.0 to 2.0)",
        ge=0.0,
        le=2.0,
    )
    max_search_results: int = Field(
        default=5,
        description="Maximum number of web search results to include in context",
        ge=1,
        le=10,
    )

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "messages": [
                        {"role": "user", "content": "What's the weather like today in San Francisco?"}
                    ],
                    "max_tokens": 512,
                    "temperature": 0.7,
                    "max_search_results": 5,
                }
            ]
        }
    }


class StatusResponse(BaseModel):
    """API status response."""

    status: str = Field(..., description="Current API status")
    current_model: str = Field(..., description="Currently active model")
    available_models: list[str] = Field(..., description="List of available models")


class ModelsResponse(BaseModel):
    """Available models response."""

    current_model: str = Field(..., description="Currently active model")
    available_models: list[str] = Field(..., description="List of all available models")


class ModelSwitchResponse(BaseModel):
    """Model switch response."""

    message: str = Field(..., description="Status message")
    model: str = Field(..., description="New active model name")


def _drain_output(stream, tail: deque) -> None:
    """Read *stream* line by line into *tail* until EOF.

    Runs in a background thread so the child process never blocks writing to
    a full stdout pipe; *tail* is bounded, so only recent lines are retained.
    """
    try:
        for line in stream:
            tail.append(line)
    except (OSError, ValueError):
        # The pipe was closed underneath us; nothing more to read.
        pass


def start_llama_server(model_id: str, port: int) -> subprocess.Popen:
    """Start llama-server with the given model and block until it responds.

    Polls the server's root URL once per second for up to 5 minutes. This
    function blocks; call it from a worker thread when inside an event loop.

    Args:
        model_id: Model identifier passed to ``llama-server -hf``.
        port: TCP port the server should listen on.

    Returns:
        The running ``subprocess.Popen`` handle.

    Raises:
        RuntimeError: If the process exits early or is not reachable within
            the timeout (in which case the process is stopped first).
    """
    cmd = [
        "llama-server",
        "-hf", model_id,
        "--host", "0.0.0.0",
        "--port", str(port),
        "-c", "2048",  # Context size
        "-t", "4",  # CPU threads (adjust based on cores)
        "-ngl", "0",  # GPU layers (0 for CPU-only)
        "--cont-batching",  # Enable continuous batching for speed
        "-b", "512",  # Batch size
    ]

    print(f"Starting llama-server with model: {model_id} on port {port}")
    print("This may take 2-3 minutes to download and load the model...")

    process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        # Own session/process group on POSIX so the whole group can be
        # signalled on shutdown.
        start_new_session=os.name != "nt",
        text=True,
        bufsize=1,
    )

    # Keep draining the pipe for the lifetime of the server; an unread PIPE
    # would eventually fill up and stall the child.
    output_tail: deque = deque(maxlen=OUTPUT_TAIL_LINES)
    reader = threading.Thread(
        target=_drain_output,
        args=(process.stdout, output_tail),
        daemon=True,
    )
    reader.start()

    # Wait for server to be ready (increased timeout for model download)
    max_retries = 300  # 5 minutes
    server_url = f"http://localhost:{port}"

    for attempt in range(max_retries):
        # Check if process died
        if process.poll() is not None:
            reader.join(timeout=5)
            captured = "".join(output_tail)
            print(f"llama-server exited with code {process.returncode}")
            print(f"Output: {captured}")
            raise RuntimeError("llama-server process died")

        try:
            # Try root endpoint instead of /health
            response = requests.get(f"{server_url}/", timeout=2)
        except requests.exceptions.RequestException:
            # Server not ready yet; keep waiting.
            pass
        else:
            if response.status_code in (200, 404):  # 404 is ok, means server is up
                print(f"llama-server ready after {attempt + 1} seconds")
                return process

        time.sleep(1)

    # Do not leave a half-started server running after giving up on it.
    _terminate_process(process, f"llama-server on port {port}")
    raise RuntimeError("llama-server failed to start within 5 minutes")


def _post_chat_completion(base_url: str, payload: dict) -> dict:
    """POST *payload* to a llama-server chat endpoint and return its JSON.

    Blocking; raises ``requests.exceptions.RequestException`` subclasses on
    connection failures and non-2xx responses.
    """
    response = requests.post(
        f"{base_url}/v1/chat/completions",
        json=payload,
        timeout=300,
    )
    response.raise_for_status()
    return response.json()


@app.on_event("startup")
async def startup_event():
    """Start with default model and cache it."""
    model_id = AVAILABLE_MODELS[current_model]
    port = model_cache._get_next_port()
    try:
        process = await asyncio.to_thread(start_llama_server, model_id, port)
    except Exception:
        model_cache._release_port(port)
        raise
    model_cache.put(current_model, model_id, process, port)
    print(f"Started with default model: {current_model}")


@app.on_event("shutdown")
async def shutdown_event():
    """Clean shutdown - clear all cached models."""
    model_cache.clear()


@app.get(
    "/",
    response_model=StatusResponse,
    tags=["status"],
    summary="API Status",
    description="Get the current status of the API, including active model and available models.",
)
async def root():
    """
    Returns the current status of the AGI Multi-Model API.

    This endpoint provides information about:
    - Current API status
    - Currently active LLM model
    - List of all available models
    """
    return {
        "status": "AGI Multi-Model API with dynamic model switching and web search",
        "current_model": current_model,
        "available_models": list(AVAILABLE_MODELS.keys()),
    }


@app.get(
    "/models",
    response_model=ModelsResponse,
    tags=["models"],
    summary="List Available Models",
    description="Get a list of all available LLM models and the currently active model.",
)
async def list_models():
    """
    List all available LLM models.

    Returns:
    - current_model: The model currently in use
    - available_models: Array of all available model names

    Use this endpoint to see which models you can switch to.
    """
    return {
        "current_model": current_model,
        "available_models": list(AVAILABLE_MODELS.keys()),
    }


@app.post(
    "/switch-model",
    response_model=ModelSwitchResponse,
    tags=["models"],
    summary="Switch Active Model",
    description="Switch to a different LLM model. Uses caching for instant switching to recently used models.",
    responses={
        200: {
            "description": "Model switched successfully",
            "content": {
                "application/json": {
                    "example": {
                        "message": "Switched to model: deepseek-coder (from cache)",
                        "model": "deepseek-coder",
                    }
                }
            },
        },
        400: {
            "description": "Invalid model name",
            "content": {
                "application/json": {
                    "example": {
                        "detail": "Model 'invalid-model' not found. Available: ['deepseek-chat', 'mistral-7b', ...]"
                    }
                }
            },
        },
    },
)
async def switch_model(request: ModelSwitchRequest):
    """
    Switch to a different LLM model with intelligent caching.

    **How it works:**
    1. Checks if requested model is already active (no switch needed)
    2. Checks cache for the model (instant switch if cached)
    3. If not cached, loads the model (may take 2-3 minutes)

    **Caching:**
    - Up to 2 models kept in memory
    - LRU (Least Recently Used) eviction policy
    - Each model runs on a separate port
    - Instant switching between cached models
    """
    global current_model

    if request.model_name not in AVAILABLE_MODELS:
        raise HTTPException(
            status_code=400,
            detail=f"Model '{request.model_name}' not found. Available: {list(AVAILABLE_MODELS.keys())}",
        )

    # One switch at a time: loading yields to the event loop, so without the
    # lock two concurrent requests could start duplicate servers.
    async with _get_switch_lock():
        if request.model_name == current_model:
            return {"message": f"Already using model: {current_model}", "model": current_model}

        # Try to get from cache
        if model_cache.get(request.model_name):
            # Model is cached, instant switch
            current_model = request.model_name
            return {
                "message": f"Switched to model: {current_model} (from cache)",
                "model": current_model,
            }

        # Model not cached, need to load it
        model_id = AVAILABLE_MODELS[request.model_name]
        port = model_cache._get_next_port()
        try:
            # Loading blocks for up to minutes; keep it off the event loop.
            process = await asyncio.to_thread(start_llama_server, model_id, port)
        except Exception as e:
            # Release port if failed
            model_cache._release_port(port)
            raise HTTPException(status_code=500, detail=f"Failed to load model: {str(e)}") from e

        model_cache.put(request.model_name, model_id, process, port)
        current_model = request.model_name
        return {
            "message": f"Switched to model: {current_model} (newly loaded)",
            "model": current_model,
        }


@app.post(
    "/v1/chat/completions",
    tags=["chat"],
    summary="Chat Completions",
    description="OpenAI-compatible chat completions endpoint. Send messages and get AI-generated responses.",
    responses={
        200: {
            "description": "Successful response",
            "content": {
                "application/json": {
                    "example": {
                        "id": "chatcmpl-123",
                        "object": "chat.completion",
                        "created": 1677652288,
                        "model": "deepseek-chat",
                        "choices": [{
                            "index": 0,
                            "message": {
                                "role": "assistant",
                                "content": "Hello! How can I help you today?",
                            },
                            "finish_reason": "stop",
                        }],
                    }
                }
            },
        },
        500: {
            "description": "LLM server error"
        },
    },
)
async def chat_completions(request: ChatCompletionRequest):
    """
    OpenAI-compatible chat completions endpoint.

    This endpoint forwards your request to the currently active LLM model
    and returns the response in OpenAI-compatible format.

    **Message Format:**
    ```json
    {
      "messages": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Hello!"}
      ],
      "max_tokens": 256,
      "temperature": 0.7
    }
    ```

    **Supported Roles:**
    - `system`: Sets the behavior of the assistant
    - `user`: User messages
    - `assistant`: Assistant responses (for multi-turn conversations)
    """
    # Get current model from cache
    cached_model = model_cache.get(current_model)
    if not cached_model:
        raise HTTPException(status_code=500, detail="Current model not loaded")

    payload = {
        "messages": request.messages,
        "max_tokens": request.max_tokens,
        "temperature": request.temperature,
    }
    try:
        # Forward to llama-server from a worker thread so the event loop
        # stays responsive during generation.
        return await asyncio.to_thread(_post_chat_completion, cached_model.url, payload)
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"llama-server error: {str(e)}") from e


def search_web(query: str, max_results: int = 5) -> list[dict]:
    """Search the web using DuckDuckGo and return results.

    Best-effort: any search failure is logged and an empty list is returned.
    """
    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=max_results))
        return results
    except Exception as e:
        print(f"Search error: {e}")
        return []


def format_search_context(query: str, search_results: list[dict]) -> str:
    """Format search results into a markdown context block for the LLM.

    Each result contributes a heading, its body text, an optional source
    line, and a trailing blank line. Returns a short notice when there are
    no results.
    """
    if not search_results:
        return f"No web results found for: {query}"

    parts = [f"# Web Search Results for: {query}\n\n"]
    for i, result in enumerate(search_results, 1):
        title = result.get("title", "No title")
        body = result.get("body", "No description")
        url = result.get("href", "")

        parts.append(f"## Result {i}: {title}\n")
        parts.append(f"{body}\n")
        if url:
            parts.append(f"Source: {url}\n")
        parts.append("\n")

    return "".join(parts)


@app.post(
    "/v1/web-chat/completions",
    tags=["chat"],
    summary="Web-Augmented Chat Completions",
    description="Chat completions enhanced with real-time web search. The last user message is used as a search query.",
    responses={
        200: {
            "description": "Successful response with web search metadata",
            "content": {
                "application/json": {
                    "example": {
                        "id": "chatcmpl-123",
                        "object": "chat.completion",
                        "created": 1677652288,
                        "model": "deepseek-chat",
                        "choices": [{
                            "index": 0,
                            "message": {
                                "role": "assistant",
                                "content": "Based on recent search results, here's what I found...",
                            },
                            "finish_reason": "stop",
                        }],
                        "web_search": {
                            "query": "latest AI developments",
                            "results_count": 5,
                            "sources": ["https://example.com/1", "https://example.com/2"],
                        },
                    }
                }
            },
        },
        400: {
            "description": "No user message found"
        },
        500: {
            "description": "LLM server or search error"
        },
    },
)
async def web_chat_completions(request: WebChatRequest):
    """
    Chat completions with real-time web search augmentation.

    **How it works:**
    1. Extracts the last user message as the search query
    2. Performs a web search using DuckDuckGo
    3. Injects search results into the LLM context
    4. Returns the AI response with source citations

    **Use cases:**
    - Current events and news
    - Recent information beyond the model's training data
    - Fact-checking with web sources
    - Research with live data

    **Example:**
    ```json
    {
      "messages": [
        {"role": "user", "content": "What's the latest news about SpaceX?"}
      ],
      "max_tokens": 512,
      "max_search_results": 5
    }
    ```

    The response includes a `web_search` field with metadata about sources used.
    """
    # Locate the last user message; its content is the search query.
    user_indices = [
        index for index, msg in enumerate(request.messages)
        if msg.get("role") == "user"
    ]
    if not user_indices:
        raise HTTPException(status_code=400, detail="No user message found")
    last_user_index = user_indices[-1]
    search_query = request.messages[last_user_index].get("content", "")

    try:
        # Perform web search (blocking network call, so run it off-loop)
        print(f"Searching web for: {search_query}")
        search_results = await asyncio.to_thread(
            search_web, search_query, request.max_search_results
        )

        # Format search results as context
        web_context = format_search_context(search_query, search_results)

        system_prompt = {
            "role": "system",
            "content": f"""You are a helpful assistant with access to current web information.

{web_context}

Use the above search results to provide accurate, up-to-date information in your response.
Always cite sources when using information from the search results.""",
        }

        # Insert the context directly before the last *user* message, which
        # is not necessarily the last message in the conversation.
        augmented_messages = list(request.messages)
        augmented_messages.insert(last_user_index, system_prompt)

        # Get current model from cache
        cached_model = model_cache.get(current_model)
        if not cached_model:
            raise HTTPException(status_code=500, detail="Current model not loaded")

        # Forward to llama-server with augmented context
        payload = {
            "messages": augmented_messages,
            "max_tokens": request.max_tokens,
            "temperature": request.temperature,
        }
        result = await asyncio.to_thread(_post_chat_completion, cached_model.url, payload)

        # Add metadata about search results
        result["web_search"] = {
            "query": search_query,
            "results_count": len(search_results),
            "sources": [r["href"] for r in search_results if r.get("href")],
        }
        return result

    except HTTPException:
        # Already a well-formed API error; do not rewrap it as a generic 500.
        raise
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"llama-server error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error: {str(e)}") from e


@app.get(
    "/cache/info",
    tags=["models"],
    summary="Get Cache Information",
    description="Returns information about the model cache, including cached models and cache statistics.",
)
async def get_cache_info():
    """
    Get information about the in-memory model cache.

    Returns:
    - max_size: Maximum number of models that can be cached
    - current_size: Current number of cached models
    - cached_models: List of currently cached models with their metadata

    **Example Response:**
    ```json
    {
      "max_size": 2,
      "current_size": 2,
      "cached_models": [
        {
          "name": "deepseek-chat",
          "port": 8080,
          "url": "http://localhost:8080",
          "last_used": 1234567890.123
        },
        {
          "name": "mistral-7b",
          "port": 8081,
          "url": "http://localhost:8081",
          "last_used": 1234567895.456
        }
      ]
    }
    ```
    """
    return model_cache.get_cache_info()


# NOTE(review): FastAPI serves the schema at its default `openapi_url`
# ("/openapi.json") as well; confirm which handler actually answers this path.
@app.get(
    "/openapi.json",
    tags=["documentation"],
    summary="Get OpenAPI Specification",
    description="Returns the complete OpenAPI 3.0 specification for this API in JSON format.",
    include_in_schema=False,
)
async def get_openapi_spec():
    """
    Export the OpenAPI specification for this API.

    This endpoint returns the complete OpenAPI 3.0 specification that can be used with:
    - API documentation tools (Swagger UI, ReDoc)
    - Code generators (openapi-generator, swagger-codegen)
    - API testing tools (Postman, Insomnia)
    - SDK generation

    Save this to a file and use it with tools like:
    ```bash
    # Generate Python client
    openapi-generator generate -i openapi.json -g python -o ./client

    # Generate TypeScript client
    openapi-generator generate -i openapi.json -g typescript-fetch -o ./client
    ```
    """
    return app.openapi()