"""
Realtime LLM-based route decision script for mock sensor data streams.
Designed to run outside the main project. It reads sensor payloads (JSON per line),
formats a prompt for a Llama-family model (e.g., via Ollama), and logs the AI's
route recommendation plus warnings/actions for each update.
"""
import argparse
import json
import sys
import textwrap
import time
from pathlib import Path
from typing import Dict, Any, List, Optional

import requests

PROMPT_TEMPLATE = """You are an emergency evacuation strategist using live sensor and route telemetry.
Evaluate the candidate routes and choose the safest *passable* option.

Decision rules:
1. Reject any route marked passable=false or containing simultaneous fire and oxygen-cylinder hazards.
2. Avoid rooms with biohazard alerts unless respirators are available; penalize heavily if no PPE.
3. Prefer routes with lower average danger and lower max danger; break ties by shorter path_length.
4. Flag any oxygen cylinders near heat (>40°C) or fire for operator intervention.
5. Provide a backup route only if it is passable and within 15 danger points of the best route.

Data:
{data}

Respond in JSON with this schema:
{{
  "recommended_route": "<route_id>",
  "confidence": "high|medium|low",
  "warnings": ["..."],
  "actions": ["..."],
  "backup_route": "<route_id or null>"
}}

If no safe route exists, set recommended_route=null and explain why in warnings.
"""


def load_event_stream(path: Path):
    """Yield parsed JSON objects from a file (one JSON object per line)."""
    with path.open("r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(f"Invalid JSON on line {line_num}: {exc}") from exc
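
# Usage sketch (file name is illustrative):
#   for event in load_event_stream(Path("mock_events.jsonl")):
#       ...  # each event is one dict parsed from a line of the JSONL file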


def format_payload(event: Dict[str, Any]) -> str:
    """Pretty-print the incoming event for the LLM prompt."""
    return json.dumps(event, indent=2, ensure_ascii=False)


def call_llm(model: str, prompt: str, server_url: str) -> str:
    """
    Call a Llama-family model via Ollama (or any compatible endpoint).

    server_url defaults to http://localhost:11434/api/generate.
    """
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
    }
    response = requests.post(server_url, json=payload, timeout=120)
    response.raise_for_status()
    data = response.json()
    # Ollama returns {"response": "...", "done": true, ...}
    if "response" in data:
        return data["response"]
    # Fallback for other APIs
    return data.get("text") or json.dumps(data)
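
# For reference, roughly the raw exchange against Ollama's /api/generate:
#   POST {"model": "llama3", "prompt": "...", "stream": false}
#   ->   {"model": "llama3", "response": "...", "done": true, ...}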


def parse_llm_output(text_output: str) -> Dict[str, Any]:
    """Attempt to parse the LLM JSON; fall back to an error record."""
    text_output = text_output.strip()
    # Models often wrap JSON in markdown fences; strip them before parsing.
    if text_output.startswith("```"):
        text_output = text_output.strip("`").strip()
        if text_output.lower().startswith("json"):
            text_output = text_output[4:].lstrip()
    try:
        return json.loads(text_output)
    except json.JSONDecodeError:
        return {
            "recommended_route": None,
            "confidence": "low",
            "warnings": ["LLM returned non-JSON output", text_output],
            "actions": [],
            "backup_route": None,
        }


def process_event(event: Dict[str, Any], args) -> Dict[str, Any]:
    """Send a single event to the LLM and return the structured decision."""
    payload_str = format_payload(event)
    prompt = PROMPT_TEMPLATE.format(data=payload_str)
    raw_output = call_llm(args.model, prompt, args.server_url)
    decision = parse_llm_output(raw_output)
    return {
        "timestamp": event.get("timestamp_sec"),
        "decision": decision,
        "raw_prompt": prompt if args.debug else None,
        "raw_output": raw_output if args.debug else None,
    }


def log_decision(result: Dict[str, Any]):
    """Print a concise summary to stdout."""
    decision = result["decision"]
    ts = result["timestamp"]
    header = f"[t={ts}s]" if ts is not None else "[t=?]"
    print(f"{header} recommended_route={decision.get('recommended_route')} "
          f"confidence={decision.get('confidence')}")
    if decision.get("warnings"):
        print("  warnings:")
        for warning in decision["warnings"]:
            wrapped = textwrap.fill(warning, width=78, subsequent_indent="      ")
            print(f"    - {wrapped}")
    if decision.get("actions"):
        print("  actions:")
        for action in decision["actions"]:
            wrapped = textwrap.fill(action, width=78, subsequent_indent="      ")
            print(f"    - {wrapped}")
    if decision.get("backup_route"):
        print(f"  backup_route: {decision['backup_route']}")
    print()
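
# Illustrative console output for one event (values are made up):
#   [t=12s] recommended_route=A confidence=high
#     warnings:
#       - Oxygen cylinder near heat source (>40°C) on route B.
#     actions:
#       - Send an operator to isolate the cylinder before routing evacuees.
#     backup_route: C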


def build_arg_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="Stream sensor events to a Llama model for route decisions.")
    parser.add_argument("--input", required=True,
                        help="Path to JSONL file containing mock sensor events.")
    parser.add_argument("--model", default="llama3",
                        help="Model name served by Ollama (default: llama3).")
    parser.add_argument("--server-url", default="http://localhost:11434/api/generate",
                        help="Generation endpoint URL.")
    parser.add_argument("--delay", type=float, default=0.0,
                        help="Seconds to wait between events (simulate realtime).")
    parser.add_argument("--debug", action="store_true",
                        help="Print raw prompt/output for troubleshooting.")
    return parser
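
# Typical invocation (script and file names are illustrative):
#   python llm_route_decider.py --input mock_events.jsonl --delay 0.5 --debug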


def main(argv: Optional[List[str]] = None):
    parser = build_arg_parser()
    args = parser.parse_args(argv)

    input_path = Path(args.input)
    if not input_path.exists():
        parser.error(f"Input file not found: {input_path}")

    try:
        for event in load_event_stream(input_path):
            result = process_event(event, args)
            log_decision(result)
            if args.debug:
                print("----- RAW PROMPT -----")
                print(result["raw_prompt"])
                print("----- RAW OUTPUT -----")
                print(result["raw_output"])
                print("----------------------\n")
            if args.delay > 0:
                time.sleep(args.delay)
    except KeyboardInterrupt:
        print("\nInterrupted by user.", file=sys.stderr)
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()