import json
from typing import Any
import aiohttp
import modal
vllm_image = (
modal.Image.from_registry("nvidia/cuda:12.8.0-devel-ubuntu22.04", add_python="3.12")
.entrypoint([])
.uv_pip_install(
"vllm==0.11.2",
"huggingface-hub==0.36.0",
"flashinfer-python==0.5.2",
)
.env({"HF_XET_HIGH_PERFORMANCE": "1"}) # faster model transfers
)
# Lightweight image for the trace storage endpoint (doesn't need CUDA/vllm)
trace_storage_image = modal.Image.debian_slim(python_version="3.12").uv_pip_install(
"fastapi", "uvicorn"
)
MODEL_NAME = "microsoft/Fara-7B"
MODEL_REVISION = None  # pin a specific Hugging Face revision here; None uses the latest
hf_cache_vol = modal.Volume.from_name("huggingface-cache", create_if_missing=True)
vllm_cache_vol = modal.Volume.from_name("vllm-cache", create_if_missing=True)
traces_vol = modal.Volume.from_name("fara-traces", create_if_missing=True)
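# Illustrative note: trace files written by `store_trace` below can be inspected from
# the Modal volume CLI (the remote path layout matches what store_trace generates), e.g.
#   modal volume ls fara-traces
#   modal volume get fara-traces 2024-01/<timestamp>_<trace_id>.json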
FAST_BOOT = True # Set to False for better performance if not cold-starting frequently
app = modal.App("fara-vllm")
MINUTES = 60 # seconds
VLLM_PORT = 5000  # port the vLLM server listens on inside the container
N_GPU = 1
@app.function(
image=vllm_image,
gpu="L40S",
scaledown_window=2 * MINUTES,
timeout=10 * MINUTES,
volumes={
"/root/.cache/huggingface": hf_cache_vol,
"/root/.cache/vllm": vllm_cache_vol,
},
)
@modal.concurrent(max_inputs=32)
@modal.web_server(
port=VLLM_PORT, startup_timeout=10 * MINUTES, requires_proxy_auth=True
)
def serve():
import subprocess
cmd = [
"vllm",
"serve",
"--uvicorn-log-level=info",
MODEL_NAME,
"--served-model-name",
MODEL_NAME,
"--host",
"0.0.0.0",
"--port",
str(VLLM_PORT),
"--dtype",
"auto", # As per user specification
"--max-model-len",
"32768", # Limit context length to fit in GPU memory (default 128000 too large)
]
if MODEL_REVISION:
cmd += ["--revision", MODEL_REVISION]
# enforce-eager disables both Torch compilation and CUDA graph capture
cmd += ["--enforce-eager" if FAST_BOOT else "--no-enforce-eager"]
# assume multiple GPUs are for splitting up large matrix multiplications
cmd += ["--tensor-parallel-size", str(N_GPU)]
print(cmd)
subprocess.Popen(" ".join(cmd), shell=True)
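# Note on auth: `requires_proxy_auth=True` above means every request to this server
# (including the health check in the `test` entrypoint below) must carry Modal
# proxy-auth token headers. A hedged sketch, assuming Modal's standard proxy-auth
# headers and with placeholder values:
#   curl <url from serve.get_web_url()>/v1/models \
#        -H "Modal-Key: <proxy auth token id>" \
#        -H "Modal-Secret: <proxy auth token secret>"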
@app.function(
image=trace_storage_image,
volumes={"/traces": traces_vol},
timeout=2 * MINUTES,
)
@modal.fastapi_endpoint(method="POST", requires_proxy_auth=True)
def store_trace(trace_data: dict) -> dict:
"""
Store a task trace JSON in the Modal volume.
If a trace with the same ID and instruction already exists, it will be overwritten.
Expected trace_data structure:
{
"trace": { id, timestamp, instruction, modelId, isRunning },
"completion": { status, message, finalAnswer },
"metadata": { traceId, inputTokensUsed, outputTokensUsed, ... user_evaluation },
"steps": [...],
"exportedAt": "ISO timestamp"
}
"""
import glob
import os
from datetime import datetime
try:
# Extract trace ID and instruction for duplicate detection
trace_id = trace_data.get("trace", {}).get("id", "unknown")
instruction = trace_data.get("trace", {}).get("instruction", "")
# Create organized directory structure: /traces/YYYY-MM/
date_folder = datetime.now().strftime("%Y-%m")
trace_dir = f"/traces/{date_folder}"
os.makedirs(trace_dir, exist_ok=True)
# Check for existing trace with same ID (in all monthly folders)
existing_file = None
for monthly_dir in glob.glob("/traces/*/"):
for filepath in glob.glob(f"{monthly_dir}*_{trace_id}.json"):
# Found an existing file with this trace ID
# Verify it's the same trace by checking instruction
try:
with open(filepath, "r") as f:
existing_data = json.load(f)
existing_instruction = existing_data.get("trace", {}).get(
"instruction", ""
)
if existing_instruction == instruction:
existing_file = filepath
break
except (json.JSONDecodeError, IOError):
# If we can't read the file, skip it
continue
if existing_file:
break
if existing_file:
# Overwrite the existing file
filepath = existing_file
print(f"Overwriting existing trace: {filepath}")
else:
# Generate new filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{timestamp}_{trace_id}.json"
filepath = f"{trace_dir}/{filename}"
# Write trace to file
with open(filepath, "w") as f:
json.dump(trace_data, f, indent=2, default=str)
# Commit volume changes
traces_vol.commit()
return {
"success": True,
"message": "Trace stored successfully"
if not existing_file
else "Trace updated successfully",
"filepath": filepath,
"trace_id": trace_id,
"was_overwritten": existing_file is not None,
}
except Exception as e:
return {
"success": False,
"error": str(e),
}
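# Illustrative sketch (not part of the deployment itself): how a client might POST a
# trace to `store_trace` once the app is deployed. The payload below uses made-up
# placeholder values that mirror the docstring's structure; proxy-auth headers
# (required because of `requires_proxy_auth=True`) are omitted for brevity.
async def _example_store_trace_call() -> None:
    example_trace = {
        "trace": {"id": "trace-123", "instruction": "summarize this page", "modelId": MODEL_NAME},
        "completion": {"status": "completed", "message": "", "finalAnswer": "done"},
        "metadata": {"traceId": "trace-123", "inputTokensUsed": 0, "outputTokensUsed": 0},
        "steps": [],
        "exportedAt": "2024-01-01T00:00:00Z",
    }
    url = store_trace.get_web_url()  # URL of the deployed FastAPI endpoint
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=example_trace) as resp:
            print(await resp.json())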
@app.local_entrypoint()
async def test(test_timeout=10 * MINUTES, content=None, twice=True):
url = serve.get_web_url()
system_prompt = {
"role": "system",
"content": "You are an AI assistant specialized in computer use tasks.",
}
if content is None:
content = "Hello, what can you do to help with computer tasks?"
messages = [ # OpenAI chat format
system_prompt,
{"role": "user", "content": content},
]
async with aiohttp.ClientSession(base_url=url) as session:
print(f"Running health check for server at {url}")
async with session.get("/health", timeout=test_timeout - 1 * MINUTES) as resp:
up = resp.status == 200
assert up, f"Failed health check for server at {url}"
print(f"Successful health check for server at {url}")
print(f"Sending messages to {url}:", *messages, sep="\n\t")
await _send_request(session, MODEL_NAME, messages)
if twice:
messages[0]["content"] = "You are a helpful assistant."
print(f"Sending messages to {url}:", *messages, sep="\n\t")
await _send_request(session, MODEL_NAME, messages)
async def _send_request(
session: aiohttp.ClientSession, model: str, messages: list
) -> None:
    # `stream=True` tells an OpenAI-compatible backend to stream chunks
payload: dict[str, Any] = {"messages": messages, "model": model, "stream": True}
headers = {"Content-Type": "application/json", "Accept": "text/event-stream"}
async with session.post(
"/v1/chat/completions", json=payload, headers=headers, timeout=1 * MINUTES
) as resp:
        resp.raise_for_status()  # fail fast on a non-2xx response before streaming
        async for raw in resp.content:
# extract new content and stream it
line = raw.decode().strip()
if not line or line == "data: [DONE]":
continue
if line.startswith("data: "): # SSE prefix
line = line[len("data: ") :]
chunk = json.loads(line)
assert (
chunk["object"] == "chat.completion.chunk"
) # or something went horribly wrong
print(chunk["choices"][0]["delta"]["content"], end="")
print()
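# Rough usage sketch (the filename is hypothetical; use whatever this module is saved as):
#   modal deploy fara_vllm.py   # deploy the vLLM server and the trace-storage endpoint
#   modal run fara_vllm.py      # run the `test` local entrypoint against the server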