File size: 6,645 Bytes
faa740f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
"""

Realtime LLM-based route decision script for mock sensor data streams.

Designed to run outside the main project. It reads sensor payloads (one JSON
object per line), formats a prompt for a Llama-family model (e.g., via Ollama),
and logs the AI's route recommendation plus warnings/actions for each update.

"""
import argparse
import json
import sys
import textwrap
import time
from pathlib import Path
from typing import Dict, Any, List, Optional

import requests


# Prompt text sent to the model for each event. ``{data}`` is the only
# placeholder; ``process_event`` fills it through ``str.format``. The doubled
# braces around the JSON schema are ``str.format`` escapes that render as
# literal ``{`` and ``}`` in the final prompt.
PROMPT_TEMPLATE = """You are an emergency evacuation strategist using live sensor and route telemetry.

Evaluate the candidate routes and choose the safest *passable* option.



Decision rules:

1. Reject any route marked passable=false or containing simultaneous fire and oxygen-cylinder hazards.

2. Avoid rooms with biohazard alerts unless respirators are available; penalize heavily if no PPE.

3. Prefer routes with lower average danger and lower max danger; break ties by shorter path_length.

4. Flag any oxygen cylinders near heat (>40°C) or fire for operator intervention.

5. Provide backup route only if it is passable and within 15 danger points of the best route.



Data:

{data}



Respond in JSON with this schema:

{{

  "recommended_route": "<route_id>",

  "confidence": "high|medium|low",

  "warnings": ["..."],

  "actions": ["..."],

  "backup_route": "<route_id or null>"

}}



If no safe route exists, set recommended_route=null and explain why in warnings.

"""


def load_event_stream(path: Path):
    """Lazily read a JSON-lines file and yield each decoded record.

    Lines that are empty after stripping whitespace are skipped. Line numbers
    in error messages are 1-based and count skipped lines too.

    Args:
        path: Location of the UTF-8 encoded file, one JSON value per line.

    Yields:
        The value decoded from each non-blank line.

    Raises:
        ValueError: If a non-blank line is not valid JSON; the original
            ``json.JSONDecodeError`` is chained as the cause.
    """
    with path.open("r", encoding="utf-8") as handle:
        for number, raw in enumerate(handle, start=1):
            stripped = raw.strip()
            if stripped:
                try:
                    record = json.loads(stripped)
                except json.JSONDecodeError as err:
                    raise ValueError(f"Invalid JSON on line {number}: {err}") from err
                yield record


def format_payload(event: Dict[str, Any]) -> str:
    """Serialize an event as indented JSON text for embedding in the prompt.

    Non-ASCII characters are kept as-is rather than escaped.
    """
    rendered = json.dumps(event, ensure_ascii=False, indent=2)
    return rendered


def call_llm(model: str, prompt: str, server_url: str, timeout: float = 120) -> str:
    """Call a Llama-family model via Ollama (or any compatible endpoint).

    Sends a single non-streaming generation request and returns the text of
    the reply.

    Args:
        model: Model name to put in the request body.
        prompt: Full prompt text to send.
        server_url: Generation endpoint URL. This function has no default for
            it; the command-line parser supplies
            ``http://localhost:11434/api/generate`` when the flag is omitted.
        timeout: Seconds passed to ``requests.post`` as its timeout
            (default 120, the previously hard-coded value).

    Returns:
        The ``"response"`` field of the reply when present; otherwise the
        ``"text"`` field if it is truthy; otherwise the whole reply
        re-serialized as a JSON string.

    Raises:
        requests.HTTPError: From ``raise_for_status()`` on an error status.
        Other ``requests`` exceptions (connection errors, timeouts, or a body
        that is not JSON) propagate unchanged.
    """
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
    }
    response = requests.post(server_url, json=payload, timeout=timeout)
    response.raise_for_status()
    data = response.json()

    # A JSON body that is not an object (e.g. a list) has no fields to pick
    # from; hand it back serialized instead of failing on ``.get`` below.
    if not isinstance(data, dict):
        return json.dumps(data)

    # Ollama returns {"response": "...", "done": true, ...}
    if "response" in data:
        return data["response"]
    # Fallback for other APIs
    return data.get("text") or json.dumps(data)


def parse_llm_output(text_output: str) -> Dict[str, Any]:
    """Parse the model's reply into a decision dict, or build an error record.

    The stripped reply is parsed as JSON first. If that fails, the span from
    the first ``{`` to the last ``}`` is tried, which recovers replies wrapped
    in Markdown code fences or surrounded by explanatory prose.

    Args:
        text_output: Raw text returned by the model.

    Returns:
        The decoded JSON object when one can be obtained. Otherwise a fallback
        record with ``recommended_route`` and ``backup_route`` set to ``None``,
        ``confidence`` set to ``"low"``, empty ``actions`` and the stripped raw
        text included in ``warnings``. Valid JSON that is not an object (list,
        string, number, ...) also yields the fallback record, so callers can
        always rely on ``.get``.
    """
    text_output = text_output.strip()

    parsed: Any = None
    try:
        parsed = json.loads(text_output)
    except json.JSONDecodeError:
        # Retry on the outermost {...} span to tolerate code fences and
        # chatter around the JSON object.
        start = text_output.find("{")
        end = text_output.rfind("}")
        if 0 <= start < end:
            try:
                parsed = json.loads(text_output[start:end + 1])
            except json.JSONDecodeError:
                parsed = None

    if isinstance(parsed, dict):
        return parsed

    return {
        "recommended_route": None,
        "confidence": "low",
        "warnings": ["LLM returned non-JSON output", text_output],
        "actions": [],
        "backup_route": None,
    }


def process_event(event: Dict[str, Any], args) -> Dict[str, Any]:
    """Run one sensor event through the model and package the outcome.

    Args:
        event: Decoded sensor event; its ``timestamp_sec`` entry, if any, is
            copied into the result.
        args: Parsed command-line options; ``model``, ``server_url`` and
            ``debug`` are read.

    Returns:
        A dict with keys ``timestamp``, ``decision``, ``raw_prompt`` and
        ``raw_output``. The two ``raw_*`` entries are ``None`` unless
        ``args.debug`` is truthy.
    """
    prompt = PROMPT_TEMPLATE.format(data=format_payload(event))
    raw_output = call_llm(args.model, prompt, args.server_url)

    result: Dict[str, Any] = {}
    result["decision"] = parse_llm_output(raw_output)
    result["raw_prompt"] = prompt if args.debug else None
    result["raw_output"] = raw_output if args.debug else None
    # Rebuild with the timestamp first so key order matches the documented
    # layout.
    return {"timestamp": event.get("timestamp_sec"), **result}


def _print_bullets(label: str, value: Any) -> None:
    """Print ``value`` as an indented bullet list under ``label``.

    Nothing is printed when ``value`` is falsy. A list or tuple is printed one
    entry per bullet; any other value (for example a bare string returned by
    the model instead of a list) is printed as a single bullet. Entries are
    converted with ``str`` before wrapping because ``textwrap.fill`` only
    accepts strings.
    """
    if not value:
        return
    items = list(value) if isinstance(value, (list, tuple)) else [value]
    print(f"  {label}:")
    for item in items:
        wrapped = textwrap.fill(str(item), width=78, subsequent_indent="    ")
        print(f"    - {wrapped}")


def log_decision(result: Dict[str, Any]):
    """Print a concise summary of one decision to stdout.

    Args:
        result: Mapping with a ``"decision"`` dict and a ``"timestamp"`` value
            (``None`` is shown as ``[t=?]``).

    The output is a header line with the recommended route and confidence,
    then optional ``warnings`` and ``actions`` bullet lists, an optional
    ``backup_route`` line, and a trailing blank line. Model output is not
    guaranteed to follow the requested schema, so warning/action entries that
    are not strings, or a single string in place of a list, are tolerated
    rather than raising.
    """
    decision = result["decision"]
    ts = result["timestamp"]
    header = f"[t={ts}s]" if ts is not None else "[t=?]"
    print(f"{header} recommended_route={decision.get('recommended_route')} "
          f"confidence={decision.get('confidence')}")
    _print_bullets("warnings", decision.get("warnings"))
    _print_bullets("actions", decision.get("actions"))
    if decision.get("backup_route"):
        print(f"  backup_route: {decision['backup_route']}")
    print()


def build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the route decision script.

    Options: ``--input`` (required), ``--model``, ``--server-url``,
    ``--delay`` (float seconds) and the ``--debug`` switch.
    """
    # (flag, add_argument keyword settings), in the order shown by --help.
    option_specs = (
        ("--input", {
            "required": True,
            "help": "Path to JSONL file containing mock sensor events.",
        }),
        ("--model", {
            "default": "llama3",
            "help": "Model name served by Ollama (default: llama3).",
        }),
        ("--server-url", {
            "default": "http://localhost:11434/api/generate",
            "help": "Generation endpoint URL.",
        }),
        ("--delay", {
            "type": float,
            "default": 0.0,
            "help": "Seconds to wait between events (simulate realtime).",
        }),
        ("--debug", {
            "action": "store_true",
            "help": "Print raw prompt/output for troubleshooting.",
        }),
    )

    cli = argparse.ArgumentParser(
        description="Stream sensor events to a Llama model for route decisions.")
    for flag, settings in option_specs:
        cli.add_argument(flag, **settings)
    return cli


def main(argv: Optional[List[str]] = None):
    """Command-line entry point: stream events from a file through the model.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    A missing input path is reported through the parser's ``error`` method.
    Each event is processed and summarized on stdout; with ``--debug`` the raw
    prompt and raw model output are printed as well. Ctrl-C stops the loop
    with a message on stderr. Any other exception is printed to stderr and the
    process exits with status 1.
    """
    cli = build_arg_parser()
    args = cli.parse_args(argv)

    source = Path(args.input)
    if not source.exists():
        cli.error(f"Input file not found: {source}")

    debug_sections = (
        ("----- RAW PROMPT -----", "raw_prompt"),
        ("----- RAW OUTPUT -----", "raw_output"),
    )

    try:
        for event in load_event_stream(source):
            outcome = process_event(event, args)
            log_decision(outcome)

            if args.debug:
                for banner, key in debug_sections:
                    print(banner)
                    print(outcome[key])
                print("----------------------\n")

            # Optional pacing between events to imitate a live feed.
            if args.delay > 0:
                time.sleep(args.delay)
    except KeyboardInterrupt:
        print("\nInterrupted by user.", file=sys.stderr)
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()