AGI / app.py
Dmitry Beresnev
Add automatic API documentation and in-memory model caching
2295174
import subprocess
import signal
import os
import time
from typing import Optional, Dict
from dataclasses import dataclass
from collections import OrderedDict
import requests
from fastapi import FastAPI, HTTPException
from fastapi.openapi.utils import get_openapi
from pydantic import BaseModel, Field
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
# FastAPI application object. The metadata passed here (title, description,
# version, contact, license, tags) is what the generated API documentation shows.
# The emoji bullets in the description were mis-encoded (UTF-8 bytes decoded as a
# single-byte code page); they are restored to the intended characters below.
app = FastAPI(
    title="AGI Multi-Model API",
    description="""
**Dynamic Multi-Model LLM API with Web Search Capabilities**
This API provides:
* 🔄 Dynamic model switching between multiple LLM models
* 💬 OpenAI-compatible chat completions
* 🌐 Web-augmented chat with real-time search
* 📊 Model management and status monitoring
## Available Models
- **deepseek-chat** (default): General purpose conversational model
- **mistral-7b**: Financial analysis and summarization
- **openhermes-7b**: Advanced instruction following
- **deepseek-coder**: Specialized coding assistance
- **llama-7b**: Lightweight and fast responses
## Quick Start
1. Check available models: `GET /models`
2. Switch model (optional): `POST /switch-model`
3. Chat: `POST /v1/chat/completions`
4. Chat with web search: `POST /v1/web-chat/completions`
""",
    version="0.0.1.2025.12.04",
    contact={
        "name": "API Support",
        "email": "support@example.com",
    },
    license_info={
        "name": "MIT",
    },
    # One entry per tag used by the route decorators in this file.
    openapi_tags=[
        {
            "name": "status",
            "description": "System status and health checks",
        },
        {
            "name": "models",
            "description": "Model management and switching operations",
        },
        {
            "name": "chat",
            "description": "Chat completion endpoints (OpenAI-compatible)",
        },
        {
            "name": "documentation",
            "description": "API documentation and OpenAPI specification",
        },
    ]
)
# Predefined list of available models (TheBloke only - verified, fits 18GB Space).
# Keys are the short model names that clients send to /switch-model.
# Values are handed to llama-server through its `-hf` option and are written
# as "<repository>:<GGUF file name>".
AVAILABLE_MODELS = {
# === General Purpose (Default) ===
"deepseek-chat": "TheBloke/deepseek-llm-7B-chat-GGUF:deepseek-llm-7b-chat.Q4_K_M.gguf",
# === Financial & Summarization Models ===
"mistral-7b": "TheBloke/Mistral-7B-Instruct-v0.2-GGUF:mistral-7b-instruct-v0.2.Q4_K_M.gguf",
"openhermes-7b": "TheBloke/OpenHermes-2.5-Mistral-7B-GGUF:openhermes-2.5-mistral-7b.Q4_K_M.gguf",
# === Coding Models ===
"deepseek-coder": "TheBloke/deepseek-coder-6.7B-instruct-GGUF:deepseek-coder-6.7b-instruct.Q4_K_M.gguf",
# === Lightweight/Fast ===
"llama-7b": "TheBloke/Llama-2-7B-Chat-GGUF:llama-2-7b-chat.Q4_K_M.gguf",
}
# Configuration for the model cache: how many llama-server processes may be
# alive at once, and the first port number handed out to them.
MAX_CACHED_MODELS = 2 # Maximum number of models to keep in memory
BASE_PORT = 8080 # Starting port for llama-server instances
@dataclass
class CachedModel:
    """Record describing one running llama-server kept in the model cache."""

    # Short model name used as the cache key.
    name: str
    # Identifier the server was launched with (the value given to `-hf`).
    model_id: str
    # Handle of the llama-server child process.
    process: subprocess.Popen
    # Port the server listens on.
    port: int
    # Base URL used to reach the server.
    url: str
    # time.time() timestamp of the most recent cache access.
    last_used: float
class ModelCache:
    """
    In-memory LRU cache for loaded models.

    Manages multiple llama-server processes, each on a different port.
    Entries live in an ``OrderedDict`` ordered from least to most recently
    used; when the cache is full the oldest entry is evicted and its process
    is stopped.
    """

    def __init__(self, max_size: int = MAX_CACHED_MODELS):
        """Create an empty cache holding at most ``max_size`` models."""
        self.max_size = max_size
        self.cache: OrderedDict[str, CachedModel] = OrderedDict()
        self.port_counter = BASE_PORT
        self.used_ports = set()

    def _get_next_port(self) -> int:
        """Reserve and return the next port number not currently in use.

        The counter only ever moves forward, so a port that was released is
        not handed out again by this method.
        """
        while self.port_counter in self.used_ports:
            self.port_counter += 1
        port = self.port_counter
        self.used_ports.add(port)
        self.port_counter += 1
        return port

    def _release_port(self, port: int):
        """Mark ``port`` as no longer in use (no-op if it was not reserved)."""
        self.used_ports.discard(port)

    def _stop_process(self, model_name: str, cached_model: CachedModel) -> None:
        """Stop the llama-server behind ``cached_model`` (best effort).

        Sends SIGTERM to the process group (``terminate()`` on Windows) and
        waits up to 10 seconds. If that fails or times out, escalates to
        SIGKILL / ``kill()`` and waits again so the child is reaped instead
        of lingering as a zombie. Errors are logged, never raised, so that
        eviction and shutdown always run to completion.
        """
        process = cached_model.process
        # Already exited: nothing to signal (poll() also reaps the child).
        if process.poll() is not None:
            return
        try:
            if os.name != 'nt':
                os.killpg(os.getpgid(process.pid), signal.SIGTERM)
            else:
                process.terminate()
            process.wait(timeout=10)
            return
        except Exception as e:
            print(f"Error stopping model {model_name}: {e}")
        # Graceful stop failed or timed out: force-kill and reap.
        try:
            if os.name != 'nt':
                os.killpg(os.getpgid(process.pid), signal.SIGKILL)
            else:
                process.kill()
            process.wait(timeout=10)
        except Exception as e:
            print(f"Error killing model {model_name}: {e}")

    def _evict_lru(self):
        """Evict the least recently used model and stop its process."""
        if not self.cache:
            return
        # The first item of the OrderedDict is the oldest one.
        model_name, cached_model = self.cache.popitem(last=False)
        print(f"Evicting model from cache: {model_name}")
        self._stop_process(model_name, cached_model)
        self._release_port(cached_model.port)
        # Short pause after stopping the server, kept from the original code.
        # NOTE(review): presumably gives the OS time to reclaim resources - confirm.
        time.sleep(1)

    def get(self, model_name: str) -> Optional[CachedModel]:
        """Return the cached entry for ``model_name`` or ``None`` on a miss.

        A hit refreshes ``last_used`` and moves the entry to the
        most-recently-used end of the cache.
        """
        if model_name in self.cache:
            cached_model = self.cache[model_name]
            cached_model.last_used = time.time()
            # Move to end (most recently used)
            self.cache.move_to_end(model_name)
            print(f"Cache hit for model: {model_name}")
            return cached_model
        print(f"Cache miss for model: {model_name}")
        return None

    def put(self, model_name: str, model_id: str, process: subprocess.Popen, port: int):
        """Add a running model to the cache, evicting old entries if needed.

        Args:
            model_name: Cache key (short model name).
            model_id: Identifier the server was started with.
            process: Handle of the already running llama-server.
            port: Port that server listens on.
        """
        # Replacing an entry: stop the previous server so it is not orphaned,
        # unless the caller is re-registering the very same process.
        previous = self.cache.pop(model_name, None)
        if previous is not None and previous.process is not process:
            self._stop_process(model_name, previous)
            if previous.port != port:
                self._release_port(previous.port)
        # Evict while full. The emptiness check prevents an endless loop when
        # max_size <= 0, because _evict_lru does nothing on an empty cache.
        while self.cache and len(self.cache) >= self.max_size:
            self._evict_lru()
        url = f"http://localhost:{port}"
        cached_model = CachedModel(
            name=model_name,
            model_id=model_id,
            process=process,
            port=port,
            url=url,
            last_used=time.time()
        )
        self.cache[model_name] = cached_model
        print(f"Cached model: {model_name} on port {port}")

    def clear(self):
        """Stop every cached server, release its port and empty the cache."""
        print("Clearing model cache...")
        for model_name, cached_model in list(self.cache.items()):
            self._stop_process(model_name, cached_model)
            self._release_port(cached_model.port)
        self.cache.clear()

    def get_cache_info(self) -> Dict:
        """Return a JSON-serialisable snapshot of the cache.

        The result has ``max_size``, ``current_size`` and ``cached_models``
        (name, port, url and last_used of each entry, oldest first).
        """
        return {
            "max_size": self.max_size,
            "current_size": len(self.cache),
            "cached_models": [
                {
                    "name": name,
                    "port": model.port,
                    "url": model.url,
                    "last_used": model.last_used
                }
                for name, model in self.cache.items()
            ]
        }
# Global state shared by the request handlers.
# current_model is the name (a key of AVAILABLE_MODELS) that chat requests are
# routed to; /switch-model rebinds it. model_cache holds the running servers.
current_model = "deepseek-chat" # Default model
model_cache = ModelCache(max_size=MAX_CACHED_MODELS)
class ModelSwitchRequest(BaseModel):
    """Request to switch the active LLM model."""

    # Required: the short name of the target model.
    model_name: str = Field(
        ...,
        description="Name of the model to switch to",
        examples=["deepseek-chat", "mistral-7b", "deepseek-coder"],
    )

    # Example payloads attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "examples": [
                {"model_name": "deepseek-coder"},
                {"model_name": "mistral-7b"},
            ],
        },
    }
class ChatCompletionRequest(BaseModel):
    """OpenAI-compatible chat completion request."""

    # Required conversation history, forwarded as-is to the model server.
    messages: list[dict] = Field(
        ...,
        description="Array of message objects with 'role' and 'content' fields",
        examples=[[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": "Hello!"},
        ]],
    )

    # Generation length limit, validated to the range 1..4096.
    max_tokens: int = Field(
        default=256,
        description="Maximum number of tokens to generate",
        ge=1,
        le=4096,
    )

    # Sampling temperature, validated to the range 0.0..2.0.
    temperature: float = Field(
        default=0.7,
        description="Sampling temperature (0.0 to 2.0). Higher values make output more random.",
        ge=0.0,
        le=2.0,
    )

    # Example payload attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "messages": [
                        {"role": "user", "content": "What is the capital of France?"},
                    ],
                    "max_tokens": 100,
                    "temperature": 0.7,
                },
            ],
        },
    }
class WebChatRequest(BaseModel):
    """Chat completion request with web search augmentation."""

    # Required conversation history; the web search query is taken from it.
    messages: list[dict] = Field(
        ...,
        description="Array of message objects. The last user message is used for web search.",
        examples=[[
            {"role": "user", "content": "What are the latest developments in AI?"},
        ]],
    )

    # Generation length limit, validated to the range 1..4096.
    max_tokens: int = Field(
        default=512,
        description="Maximum number of tokens to generate",
        ge=1,
        le=4096,
    )

    # Sampling temperature, validated to the range 0.0..2.0.
    temperature: float = Field(
        default=0.7,
        description="Sampling temperature (0.0 to 2.0)",
        ge=0.0,
        le=2.0,
    )

    # How many search hits to request, validated to the range 1..10.
    max_search_results: int = Field(
        default=5,
        description="Maximum number of web search results to include in context",
        ge=1,
        le=10,
    )

    # Example payload attached to the generated JSON schema.
    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "messages": [
                        {"role": "user", "content": "What's the weather like today in San Francisco?"},
                    ],
                    "max_tokens": 512,
                    "temperature": 0.7,
                    "max_search_results": 5,
                },
            ],
        },
    }
class StatusResponse(BaseModel):
    """API status response."""

    # Response body of GET /; all three fields are required.
    status: str = Field(
        ...,
        description="Current API status",
    )
    current_model: str = Field(
        ...,
        description="Currently active model",
    )
    available_models: list[str] = Field(
        ...,
        description="List of available models",
    )
class ModelsResponse(BaseModel):
    """Available models response."""

    # Response body of GET /models; both fields are required.
    current_model: str = Field(
        ...,
        description="Currently active model",
    )
    available_models: list[str] = Field(
        ...,
        description="List of all available models",
    )
class ModelSwitchResponse(BaseModel):
    """Model switch response."""

    # Response body of POST /switch-model; both fields are required.
    message: str = Field(
        ...,
        description="Status message",
    )
    model: str = Field(
        ...,
        description="New active model name",
    )
def start_llama_server(model_id: str, port: int) -> subprocess.Popen:
    """Start llama-server with the specified model and wait until it answers.

    Args:
        model_id: Model identifier passed to llama-server's ``-hf`` option.
        port: TCP port the server should listen on.

    Returns:
        The ``Popen`` handle of the running server.

    Raises:
        RuntimeError: If the process exits during start-up, or if it does not
            answer HTTP requests within 5 minutes (the process is killed in
            that case so it is not left running unattended).
    """
    cmd = [
        "llama-server",
        "-hf", model_id,
        "--host", "0.0.0.0",
        "--port", str(port),
        "-c", "2048",  # Context size
        "-t", "4",  # CPU threads (adjust based on cores)
        "-ngl", "0",  # GPU layers (0 for CPU-only)
        "--cont-batching",  # Enable continuous batching for speed
        "-b", "512",  # Batch size
    ]
    print(f"Starting llama-server with model: {model_id} on port {port}")
    print("This may take 2-3 minutes to download and load the model...")
    process = subprocess.Popen(
        cmd,
        # Let the child inherit this process's stdout/stderr. Capturing into a
        # pipe that nobody reads while the server runs would make the server
        # block as soon as the OS pipe buffer fills up.
        stdout=None,
        stderr=None,
        # Give the server its own session/process group (POSIX) so the whole
        # group can be signalled later; safer than preexec_fn with threads.
        start_new_session=(os.name != 'nt'),
    )

    # Wait for server to be ready (long timeout to allow for model download)
    startup_timeout = 300  # 5 minutes
    server_url = f"http://localhost:{port}"
    started_at = time.monotonic()
    deadline = started_at + startup_timeout
    while time.monotonic() < deadline:
        # Check if process died
        if process.poll() is not None:
            print(f"llama-server exited with code {process.returncode}")
            raise RuntimeError("llama-server process died")
        try:
            # Try root endpoint instead of /health
            response = requests.get(f"{server_url}/", timeout=2)
            if response.status_code in [200, 404]:  # 404 is ok, means server is up
                elapsed = time.monotonic() - started_at
                print(f"llama-server ready after {elapsed:.0f} seconds")
                return process
        except requests.exceptions.RequestException:
            # Server not accepting/answering requests yet; keep waiting.
            pass
        time.sleep(1)

    # Timed out: do not leave a half-started server running in the background.
    try:
        if os.name != 'nt':
            os.killpg(os.getpgid(process.pid), signal.SIGKILL)
        else:
            process.kill()
        process.wait(timeout=10)
    except Exception as e:
        print(f"Error stopping unresponsive llama-server: {e}")
    raise RuntimeError("llama-server failed to start within 5 minutes")
@app.on_event("startup")
async def startup_event():
    """Launch the default model's server at application start and cache it."""
    # current_model is only read here, so no `global` declaration is needed.
    default_model_id = AVAILABLE_MODELS[current_model]
    assigned_port = model_cache._get_next_port()
    server_process = start_llama_server(default_model_id, assigned_port)
    model_cache.put(current_model, default_model_id, server_process, assigned_port)
    print(f"Started with default model: {current_model}")
@app.on_event("shutdown")
async def shutdown_event():
    """On application shutdown, stop every cached model server."""
    model_cache.clear()
@app.get(
    "/",
    response_model=StatusResponse,
    tags=["status"],
    summary="API Status",
    description="Get the current status of the API, including active model and available models."
)
async def root():
    """
    Report the API status.

    The response carries a status string, the name of the currently active
    model and the names of all models that can be selected.
    """
    model_names = list(AVAILABLE_MODELS.keys())
    payload = {
        "status": "AGI Multi-Model API with dynamic model switching and web search",
        "current_model": current_model,
        "available_models": model_names,
    }
    return payload
@app.get(
    "/models",
    response_model=ModelsResponse,
    tags=["models"],
    summary="List Available Models",
    description="Get a list of all available LLM models and the currently active model."
)
async def list_models():
    """
    Enumerate the selectable models.

    Returns a mapping with:
    - current_model: name of the model in use right now
    - available_models: every model name accepted by /switch-model
    """
    model_names = list(AVAILABLE_MODELS.keys())
    return {"current_model": current_model, "available_models": model_names}
@app.post(
    "/switch-model",
    response_model=ModelSwitchResponse,
    tags=["models"],
    summary="Switch Active Model",
    description="Switch to a different LLM model. Uses caching for instant switching to recently used models.",
    responses={
        200: {
            "description": "Model switched successfully",
            "content": {
                "application/json": {
                    "example": {
                        "message": "Switched to model: deepseek-coder (from cache)",
                        "model": "deepseek-coder"
                    }
                }
            }
        },
        400: {
            "description": "Invalid model name",
            "content": {
                "application/json": {
                    "example": {
                        "detail": "Model 'invalid-model' not found. Available: ['deepseek-chat', 'mistral-7b', ...]"
                    }
                }
            }
        }
    }
)
async def switch_model(request: ModelSwitchRequest):
    """
    Make another model the active one, reusing a cached server when possible.

    Steps, in order:
    1. Unknown name -> HTTP 400.
    2. Name equals the active model -> nothing to do.
    3. Model found in the cache -> becomes active immediately.
    4. Otherwise a new llama-server is started on a fresh port, cached and
       made active; any failure releases the port and yields HTTP 500.

    The cache itself decides which older model to evict when it is full.
    """
    global current_model

    requested = request.model_name

    # Guard: reject names that are not in the registry.
    if requested not in AVAILABLE_MODELS:
        raise HTTPException(
            status_code=400,
            detail=f"Model '{request.model_name}' not found. Available: {list(AVAILABLE_MODELS.keys())}"
        )

    # Guard: already active.
    if requested == current_model:
        return {"message": f"Already using model: {current_model}", "model": current_model}

    # Fast path: the server for this model is still running.
    if model_cache.get(requested):
        current_model = requested
        return {
            "message": f"Switched to model: {current_model} (from cache)",
            "model": current_model
        }

    # Slow path: start a new server for the model.
    model_id = AVAILABLE_MODELS[requested]
    port = model_cache._get_next_port()
    try:
        new_process = start_llama_server(model_id, port)
        model_cache.put(requested, model_id, new_process, port)
    except Exception as e:
        # Give the reserved port back before reporting the failure.
        model_cache._release_port(port)
        raise HTTPException(status_code=500, detail=f"Failed to load model: {str(e)}")

    current_model = requested
    return {
        "message": f"Switched to model: {current_model} (newly loaded)",
        "model": current_model
    }
@app.post(
    "/v1/chat/completions",
    tags=["chat"],
    summary="Chat Completions",
    description="OpenAI-compatible chat completions endpoint. Send messages and get AI-generated responses.",
    responses={
        200: {
            "description": "Successful response",
            "content": {
                "application/json": {
                    "example": {
                        "id": "chatcmpl-123",
                        "object": "chat.completion",
                        "created": 1677652288,
                        "model": "deepseek-chat",
                        "choices": [{
                            "index": 0,
                            "message": {
                                "role": "assistant",
                                "content": "Hello! How can I help you today?"
                            },
                            "finish_reason": "stop"
                        }]
                    }
                }
            }
        },
        500: {
            "description": "LLM server error"
        }
    }
)
async def chat_completions(request: ChatCompletionRequest):
    """
    OpenAI-compatible chat completions endpoint.

    Forwards ``messages``, ``max_tokens`` and ``temperature`` to the
    llama-server of the currently active model and returns its JSON reply
    unchanged.

    **Message Format:**
    ```json
    {
      "messages": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Hello!"}
      ],
      "max_tokens": 256,
      "temperature": 0.7
    }
    ```

    **Supported Roles:**
    - `system`: Sets the behavior of the assistant
    - `user`: User messages
    - `assistant`: Assistant responses (for multi-turn conversations)

    Raises:
        HTTPException: 500 when the active model is not in the cache or the
            request to llama-server fails.
    """
    # Imported here so this handler does not depend on a file-level import.
    import asyncio

    # Get current model from cache
    cached_model = model_cache.get(current_model)
    if not cached_model:
        raise HTTPException(status_code=500, detail="Current model not loaded")

    try:
        # requests.post is blocking and may take up to 300 s; run it in a
        # worker thread so the event loop keeps serving other requests.
        response = await asyncio.to_thread(
            requests.post,
            f"{cached_model.url}/v1/chat/completions",
            json={
                "messages": request.messages,
                "max_tokens": request.max_tokens,
                "temperature": request.temperature,
            },
            timeout=300,
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"llama-server error: {str(e)}") from e
def search_web(query: str, max_results: int = 5) -> list[dict]:
    """Run a DuckDuckGo text search and return the hits as a list.

    Any error raised by the search is logged and turned into an empty list,
    so callers never see an exception from this function.
    """
    try:
        with DDGS() as client:
            return list(client.text(query, max_results=max_results))
    except Exception as e:
        print(f"Search error: {e}")
        return []
def format_search_context(query: str, search_results: list[dict]) -> str:
    """Turn search hits into a text block that can be placed in an LLM prompt.

    With no hits a single "No web results found" line is returned. Otherwise
    the text starts with a heading naming the query, followed by one numbered
    section per hit containing its title, body and (when present) source URL.
    Missing titles and bodies fall back to placeholder text.
    """
    if not search_results:
        return f"No web results found for: {query}"

    pieces = [f"# Web Search Results for: {query}\n\n"]
    for position, hit in enumerate(search_results, start=1):
        heading = hit.get("title", "No title")
        snippet = hit.get("body", "No description")
        link = hit.get("href", "")
        pieces.append(f"## Result {position}: {heading}\n")
        pieces.append(f"{snippet}\n")
        # The source line is omitted when the hit has no (or an empty) URL.
        if link:
            pieces.append(f"Source: {link}\n")
        pieces.append("\n")
    return "".join(pieces)
@app.post(
    "/v1/web-chat/completions",
    tags=["chat"],
    summary="Web-Augmented Chat Completions",
    description="Chat completions enhanced with real-time web search. The last user message is used as a search query.",
    responses={
        200: {
            "description": "Successful response with web search metadata",
            "content": {
                "application/json": {
                    "example": {
                        "id": "chatcmpl-123",
                        "object": "chat.completion",
                        "created": 1677652288,
                        "model": "deepseek-chat",
                        "choices": [{
                            "index": 0,
                            "message": {
                                "role": "assistant",
                                "content": "Based on recent search results, here's what I found..."
                            },
                            "finish_reason": "stop"
                        }],
                        "web_search": {
                            "query": "latest AI developments",
                            "results_count": 5,
                            "sources": ["https://example.com/1", "https://example.com/2"]
                        }
                    }
                }
            }
        },
        400: {
            "description": "No user message found"
        },
        500: {
            "description": "LLM server or search error"
        }
    }
)
async def web_chat_completions(request: WebChatRequest):
    """
    Chat completions with real-time web search augmentation.

    **How it works:**
    1. Takes the content of the last user message as the search query
    2. Performs a web search using DuckDuckGo
    3. Inserts the formatted results as a system message directly before
       that user message
    4. Forwards the augmented conversation to the active model and returns
       its reply, extended with a `web_search` metadata field

    **Example:**
    ```json
    {
      "messages": [
        {"role": "user", "content": "What's the latest news about SpaceX?"}
      ],
      "max_tokens": 512,
      "max_search_results": 5
    }
    ```

    Raises:
        HTTPException: 400 when the conversation has no user message;
            500 when the active model is not cached, llama-server fails, or
            any other error occurs.
    """
    # Imported here so this handler does not depend on a file-level import.
    import asyncio

    # Find the position of the last user message. This happens outside the
    # try block so the 400 below is not rewritten into a 500 by the generic
    # exception handler.
    last_user_index = None
    for index, msg in enumerate(request.messages):
        if msg.get("role") == "user":
            last_user_index = index
    if last_user_index is None:
        raise HTTPException(status_code=400, detail="No user message found")

    try:
        search_query = request.messages[last_user_index].get("content", "")

        # Perform web search (blocking network call -> worker thread)
        print(f"Searching web for: {search_query}")
        search_results = await asyncio.to_thread(
            search_web, search_query, request.max_search_results
        )

        # Format search results as context
        web_context = format_search_context(search_query, search_results)
        system_prompt = {
            "role": "system",
            "content": (
                "You are a helpful assistant with access to current web information.\n"
                f"{web_context}\n"
                "Use the above search results to provide accurate, up-to-date information in your response.\n"
                "Always cite sources when using information from the search results."
            ),
        }

        # Work on a copy so the request object is left untouched, and place
        # the context immediately before the last *user* message (which is
        # not necessarily the last message of the conversation).
        augmented_messages = list(request.messages)
        augmented_messages.insert(last_user_index, system_prompt)

        # Get current model from cache
        cached_model = model_cache.get(current_model)
        if not cached_model:
            raise HTTPException(status_code=500, detail="Current model not loaded")

        # Forward to llama-server with augmented context. The call is
        # blocking (up to 300 s), so it runs in a worker thread.
        response = await asyncio.to_thread(
            requests.post,
            f"{cached_model.url}/v1/chat/completions",
            json={
                "messages": augmented_messages,
                "max_tokens": request.max_tokens,
                "temperature": request.temperature,
            },
            timeout=300,
        )
        response.raise_for_status()
        result = response.json()

        # Add metadata about search results
        result["web_search"] = {
            "query": search_query,
            "results_count": len(search_results),
            "sources": [r.get("href", "") for r in search_results if r.get("href")]
        }
        return result
    except HTTPException:
        # Deliberate HTTP errors keep their own status code and detail.
        raise
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"llama-server error: {str(e)}") from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error: {str(e)}") from e
@app.get(
    "/cache/info",
    tags=["models"],
    summary="Get Cache Information",
    description="Returns information about the model cache, including cached models and cache statistics."
)
async def get_cache_info():
    """
    Describe the current contents of the in-memory model cache.

    The response contains:
    - max_size: how many models the cache may hold
    - current_size: how many models it holds right now
    - cached_models: one entry per cached model (name, port, url, last_used)

    **Example Response:**
    ```json
    {
      "max_size": 2,
      "current_size": 1,
      "cached_models": [
        {
          "name": "deepseek-chat",
          "port": 8080,
          "url": "http://localhost:8080",
          "last_used": 1234567890.123
        }
      ]
    }
    ```
    """
    snapshot = model_cache.get_cache_info()
    return snapshot
@app.get(
    "/openapi.json",
    tags=["documentation"],
    summary="Get OpenAPI Specification",
    description="Returns the complete OpenAPI 3.0 specification for this API in JSON format.",
    include_in_schema=False
)
async def get_openapi_spec():
    """
    Return the OpenAPI specification of this application.

    The document is whatever ``app.openapi()`` produces and can be saved to a
    file for use with documentation viewers, client generators or API
    testing tools, for example:

    ```bash
    # Generate Python client
    openapi-generator generate -i openapi.json -g python -o ./client
    # Generate TypeScript client
    openapi-generator generate -i openapi.json -g typescript-fetch -o ./client
    ```

    NOTE(review): FastAPI normally serves its own schema route at the same
    path; confirm whether this handler is ever reached.
    """
    specification = app.openapi()
    return specification