|
|
""" |
|
|
Sensor System Module - Simulates and manages sensor data for fire, smoke, temperature, oxygen |
|
|
""" |
|
|
import random |
|
|
import numpy as np |
|
|
from typing import Dict, List |
|
|
from .floor_plan import FloorPlan |
|
|
|
|
|
|
|
|
class SensorReading: |
|
|
"""Represents sensor readings for a specific location""" |
|
|
def __init__(self, location_id: str): |
|
|
self.location_id = location_id |
|
|
|
|
|
|
|
|
self.fire_detected = False |
|
|
self.smoke_level = 0.0 |
|
|
self.temperature = 20.0 |
|
|
self.oxygen_level = 21.0 |
|
|
self.visibility = 100.0 |
|
|
self.structural_integrity = 100.0 |
|
|
|
|
|
|
|
|
self.fire_growth_rate = 0.0 |
|
|
self.flashover_risk = 0.0 |
|
|
self.backdraft_risk = 0.0 |
|
|
self.heat_radiation = 0.0 |
|
|
self.fire_type = "none" |
|
|
|
|
|
|
|
|
self.carbon_monoxide = 0.0 |
|
|
self.carbon_dioxide = 400.0 |
|
|
self.hydrogen_cyanide = 0.0 |
|
|
self.hydrogen_chloride = 0.0 |
|
|
|
|
|
|
|
|
self.wind_direction = 0.0 |
|
|
self.wind_speed = 0.0 |
|
|
self.air_pressure = 1013.25 |
|
|
self.humidity = 50.0 |
|
|
|
|
|
|
|
|
self.occupancy_density = 0.0 |
|
|
self.mobility_limitations = 0 |
|
|
self.panic_level = 0.0 |
|
|
self.evacuation_progress = 0.0 |
|
|
|
|
|
|
|
|
self.sprinkler_active = False |
|
|
self.emergency_lighting = True |
|
|
self.elevator_available = True |
|
|
self.stairwell_clear = True |
|
|
self.exit_accessible = True |
|
|
self.exit_capacity = 100 |
|
|
self.ventilation_active = True |
|
|
|
|
|
|
|
|
self.time_since_fire_start = 0 |
|
|
self.estimated_time_to_exit = 0 |
|
|
|
|
|
|
|
|
self.emergency_comm_working = True |
|
|
self.wifi_signal_strength = 100.0 |
|
|
|
|
|
|
|
|
self.weather_temperature = 20.0 |
|
|
self.weather_rain = False |
|
|
self.time_of_day = 12 |
|
|
self.day_of_week = 1 |
|
|
|
|
|
def calculate_danger_score(self) -> float: |
|
|
""" |
|
|
Calculate enhanced danger score with all real-world factors |
|
|
Returns: 0.0 (safe) to 100.0 (extremely dangerous) |
|
|
""" |
|
|
score = 0.0 |
|
|
|
|
|
|
|
|
|
|
|
if self.fire_detected: |
|
|
score += 40.0 |
|
|
|
|
|
|
|
|
score += self.smoke_level * 20.0 |
|
|
|
|
|
|
|
|
if self.temperature > 60: |
|
|
score += min((self.temperature - 60) / 2, 20.0) |
|
|
|
|
|
|
|
|
if self.oxygen_level < 19.5: |
|
|
score += (19.5 - self.oxygen_level) * 5.0 |
|
|
|
|
|
|
|
|
score += (100 - self.visibility) * 0.1 |
|
|
|
|
|
|
|
|
if self.structural_integrity < 80: |
|
|
score += (100 - self.structural_integrity) * 0.2 |
|
|
|
|
|
|
|
|
|
|
|
if self.carbon_monoxide > 50: |
|
|
score += min(self.carbon_monoxide / 5, 30.0) |
|
|
elif self.carbon_monoxide > 9: |
|
|
score += (self.carbon_monoxide - 9) * 0.5 |
|
|
|
|
|
|
|
|
if self.carbon_dioxide > 5000: |
|
|
score += min((self.carbon_dioxide - 5000) / 200, 20.0) |
|
|
|
|
|
|
|
|
if self.hydrogen_cyanide > 20: |
|
|
score += min(self.hydrogen_cyanide * 1.5, 25.0) |
|
|
|
|
|
|
|
|
if self.hydrogen_chloride > 5: |
|
|
score += min(self.hydrogen_chloride * 2, 15.0) |
|
|
|
|
|
|
|
|
if self.flashover_risk > 0.7: |
|
|
score += 25.0 |
|
|
elif self.flashover_risk > 0.5: |
|
|
score += 15.0 |
|
|
elif self.flashover_risk > 0.3: |
|
|
score += 8.0 |
|
|
|
|
|
|
|
|
if self.backdraft_risk > 0.6: |
|
|
score += 20.0 |
|
|
elif self.backdraft_risk > 0.4: |
|
|
score += 10.0 |
|
|
|
|
|
|
|
|
if self.occupancy_density > 0.8: |
|
|
score += (self.occupancy_density - 0.8) * 75.0 |
|
|
elif self.occupancy_density > 0.6: |
|
|
score += (self.occupancy_density - 0.6) * 30.0 |
|
|
|
|
|
|
|
|
if not self.exit_accessible: |
|
|
score += 20.0 |
|
|
|
|
|
if not self.stairwell_clear: |
|
|
score += 15.0 |
|
|
|
|
|
|
|
|
if self.time_since_fire_start > 300: |
|
|
score += min((self.time_since_fire_start - 300) / 20, 15.0) |
|
|
|
|
|
|
|
|
if not self.emergency_lighting: |
|
|
score += 5.0 |
|
|
|
|
|
if not self.emergency_comm_working: |
|
|
score += 3.0 |
|
|
|
|
|
|
|
|
if self.fire_growth_rate > 10: |
|
|
score += min(self.fire_growth_rate / 2, 12.0) |
|
|
|
|
|
|
|
|
if self.heat_radiation > 2.5: |
|
|
score += min((self.heat_radiation - 2.5) * 3, 10.0) |
|
|
|
|
|
return min(score, 100.0) |
|
|
|
|
|
def is_passable(self) -> bool: |
|
|
"""Determine if this location is passable""" |
|
|
danger = self.calculate_danger_score() |
|
|
|
|
|
return danger < 70.0 and self.structural_integrity > 50.0 |
|
|
|
|
|
def __repr__(self): |
|
|
return (f"SensorReading({self.location_id}: " |
|
|
f"Fire={self.fire_detected}, " |
|
|
f"Smoke={self.smoke_level:.2f}, " |
|
|
f"Temp={self.temperature:.1f}°C, " |
|
|
f"Danger={self.calculate_danger_score():.1f})") |
|
|
|
|
|
|
|
|
class SensorSystem: |
|
|
"""Manages all sensors in the building""" |
|
|
def __init__(self, floor_plan: FloorPlan): |
|
|
self.floor_plan = floor_plan |
|
|
self.sensors: Dict[str, SensorReading] = {} |
|
|
self._initialize_sensors() |
|
|
|
|
|
def _initialize_sensors(self): |
|
|
"""Initialize sensors for all rooms with realistic mock data""" |
|
|
import random |
|
|
|
|
|
for room_id in self.floor_plan.rooms: |
|
|
sensor = SensorReading(room_id) |
|
|
|
|
|
|
|
|
sensor.fire_detected = False |
|
|
sensor.smoke_level = random.uniform(0.0, 0.1) |
|
|
sensor.temperature = random.uniform(18, 25) |
|
|
sensor.oxygen_level = random.uniform(20.5, 21.0) |
|
|
sensor.visibility = random.uniform(95, 100) |
|
|
sensor.structural_integrity = 100.0 |
|
|
|
|
|
|
|
|
sensor.fire_growth_rate = 0.0 |
|
|
sensor.flashover_risk = 0.0 |
|
|
sensor.backdraft_risk = 0.0 |
|
|
sensor.heat_radiation = 0.0 |
|
|
sensor.fire_type = "none" |
|
|
|
|
|
|
|
|
sensor.carbon_monoxide = random.uniform(0, 5) |
|
|
sensor.carbon_dioxide = random.uniform(350, 450) |
|
|
sensor.hydrogen_cyanide = 0.0 |
|
|
sensor.hydrogen_chloride = 0.0 |
|
|
|
|
|
|
|
|
sensor.wind_direction = random.uniform(0, 360) |
|
|
sensor.wind_speed = random.uniform(0, 3) |
|
|
sensor.air_pressure = random.uniform(1010, 1020) |
|
|
sensor.humidity = random.uniform(40, 60) |
|
|
|
|
|
|
|
|
if "EXIT" in room_id: |
|
|
sensor.occupancy_density = random.uniform(0.1, 0.3) |
|
|
elif "C" in room_id: |
|
|
sensor.occupancy_density = random.uniform(0.2, 0.5) |
|
|
else: |
|
|
sensor.occupancy_density = random.uniform(0.3, 0.7) |
|
|
|
|
|
sensor.mobility_limitations = random.randint(0, 2) |
|
|
sensor.panic_level = random.uniform(0.0, 0.2) |
|
|
sensor.evacuation_progress = 0.0 |
|
|
|
|
|
|
|
|
sensor.sprinkler_active = True |
|
|
sensor.emergency_lighting = True |
|
|
sensor.elevator_available = True |
|
|
sensor.stairwell_clear = True |
|
|
sensor.exit_accessible = True |
|
|
sensor.exit_capacity = random.randint(80, 120) |
|
|
sensor.ventilation_active = True |
|
|
|
|
|
|
|
|
sensor.time_since_fire_start = 0 |
|
|
sensor.estimated_time_to_exit = random.randint(30, 180) |
|
|
|
|
|
|
|
|
sensor.emergency_comm_working = True |
|
|
sensor.wifi_signal_strength = random.uniform(70, 100) |
|
|
|
|
|
|
|
|
sensor.weather_temperature = random.uniform(15, 25) |
|
|
sensor.weather_rain = random.choice([True, False]) |
|
|
sensor.time_of_day = random.randint(8, 18) |
|
|
sensor.day_of_week = random.randint(0, 6) |
|
|
|
|
|
self.sensors[room_id] = sensor |
|
|
|
|
|
def update_sensor(self, location_id: str, **kwargs): |
|
|
"""Update sensor readings for a specific location""" |
|
|
if location_id in self.sensors: |
|
|
sensor = self.sensors[location_id] |
|
|
for key, value in kwargs.items(): |
|
|
if hasattr(sensor, key): |
|
|
setattr(sensor, key, value) |
|
|
|
|
|
def get_sensor_reading(self, location_id: str) -> SensorReading: |
|
|
"""Get current sensor reading for a location""" |
|
|
return self.sensors.get(location_id) |
|
|
|
|
|
def simulate_fire_scenario(self, fire_locations: List[str], |
|
|
affected_areas: Dict[str, Dict] = None): |
|
|
""" |
|
|
Simulate a fire scenario with specified fire locations and affected areas |
|
|
|
|
|
Args: |
|
|
fire_locations: List of room IDs where fire started |
|
|
affected_areas: Dict of room_id -> sensor values for affected areas |
|
|
""" |
|
|
|
|
|
self._initialize_sensors() |
|
|
|
|
|
|
|
|
for location in fire_locations: |
|
|
if location in self.sensors: |
|
|
self.update_sensor( |
|
|
location, |
|
|
fire_detected=True, |
|
|
smoke_level=0.9, |
|
|
temperature=200.0 + random.uniform(-20, 50), |
|
|
oxygen_level=15.0, |
|
|
visibility=10.0, |
|
|
structural_integrity=70.0 |
|
|
) |
|
|
|
|
|
|
|
|
if affected_areas: |
|
|
for location, values in affected_areas.items(): |
|
|
if location in self.sensors: |
|
|
self.update_sensor(location, **values) |
|
|
|
|
|
|
|
|
self._simulate_smoke_spread(fire_locations) |
|
|
|
|
|
def _simulate_smoke_spread(self, fire_locations: List[str]): |
|
|
"""Simulate smoke spreading to adjacent areas""" |
|
|
for fire_loc in fire_locations: |
|
|
neighbors = self.floor_plan.get_neighbors(fire_loc) |
|
|
for neighbor_id, _ in neighbors: |
|
|
if neighbor_id in self.sensors and not self.sensors[neighbor_id].fire_detected: |
|
|
|
|
|
current = self.sensors[neighbor_id] |
|
|
self.update_sensor( |
|
|
neighbor_id, |
|
|
smoke_level=min(current.smoke_level + random.uniform(0.3, 0.6), 1.0), |
|
|
temperature=current.temperature + random.uniform(20, 40), |
|
|
visibility=max(current.visibility - random.uniform(20, 40), 20.0), |
|
|
oxygen_level=max(current.oxygen_level - random.uniform(1, 3), 16.0) |
|
|
) |
|
|
|
|
|
def get_all_readings(self) -> Dict[str, SensorReading]: |
|
|
"""Get all sensor readings""" |
|
|
return self.sensors |
|
|
|
|
|
def print_status(self): |
|
|
"""Print status of all sensors""" |
|
|
print(f"\n{'='*80}") |
|
|
print(f"SENSOR SYSTEM STATUS") |
|
|
print(f"{'='*80}") |
|
|
for location_id, reading in sorted(self.sensors.items()): |
|
|
danger = reading.calculate_danger_score() |
|
|
status = "SAFE" if danger < 30 else "WARNING" if danger < 70 else "DANGER" |
|
|
print(f"{location_id:10} | {status:7} | {reading}") |
|
|
|
|
|
|
|
|
def create_sample_fire_scenario(floor_plan: FloorPlan) -> SensorSystem: |
|
|
""" |
|
|
Create a sample fire scenario with 3 routes of varying danger |
|
|
|
|
|
Scenario: |
|
|
- Route 1 (via C1, C4 to EXIT1): Has oxygen cylinder (explosion risk) but less fire |
|
|
- Route 2 (via C2, C5, C6 to EXIT2): Has moderate fire |
|
|
- Route 3 (via C7 to EXIT3): Has heavy fire blocking path |
|
|
""" |
|
|
sensor_system = SensorSystem(floor_plan) |
|
|
|
|
|
|
|
|
fire_locations = ["R2", "C5", "C7"] |
|
|
|
|
|
|
|
|
affected_areas = { |
|
|
|
|
|
"C1": { |
|
|
"fire_detected": False, |
|
|
"smoke_level": 0.4, |
|
|
"temperature": 45.0, |
|
|
"oxygen_level": 20.5, |
|
|
"visibility": 60.0, |
|
|
"structural_integrity": 95.0 |
|
|
}, |
|
|
"C4": { |
|
|
"fire_detected": False, |
|
|
"smoke_level": 0.5, |
|
|
"temperature": 50.0, |
|
|
"oxygen_level": 20.0, |
|
|
"visibility": 50.0, |
|
|
"structural_integrity": 90.0 |
|
|
}, |
|
|
|
|
|
|
|
|
"C2": { |
|
|
"fire_detected": False, |
|
|
"smoke_level": 0.6, |
|
|
"temperature": 65.0, |
|
|
"oxygen_level": 18.5, |
|
|
"visibility": 40.0, |
|
|
"structural_integrity": 85.0 |
|
|
}, |
|
|
"C6": { |
|
|
"fire_detected": True, |
|
|
"smoke_level": 0.8, |
|
|
"temperature": 150.0, |
|
|
"oxygen_level": 16.0, |
|
|
"visibility": 20.0, |
|
|
"structural_integrity": 75.0 |
|
|
}, |
|
|
|
|
|
|
|
|
"R3": { |
|
|
"fire_detected": False, |
|
|
"smoke_level": 0.7, |
|
|
"temperature": 80.0, |
|
|
"oxygen_level": 17.5, |
|
|
"visibility": 30.0, |
|
|
"structural_integrity": 80.0 |
|
|
}, |
|
|
|
|
|
|
|
|
"R1": { |
|
|
"fire_detected": False, |
|
|
"smoke_level": 0.2, |
|
|
"temperature": 35.0, |
|
|
"oxygen_level": 20.5, |
|
|
"visibility": 80.0, |
|
|
"structural_integrity": 100.0 |
|
|
}, |
|
|
|
|
|
|
|
|
"EXIT1": { |
|
|
"fire_detected": False, |
|
|
"smoke_level": 0.1, |
|
|
"temperature": 25.0, |
|
|
"oxygen_level": 21.0, |
|
|
"visibility": 100.0, |
|
|
"structural_integrity": 100.0 |
|
|
}, |
|
|
"EXIT2": { |
|
|
"fire_detected": False, |
|
|
"smoke_level": 0.3, |
|
|
"temperature": 40.0, |
|
|
"oxygen_level": 20.0, |
|
|
"visibility": 70.0, |
|
|
"structural_integrity": 100.0 |
|
|
}, |
|
|
"EXIT3": { |
|
|
"fire_detected": False, |
|
|
"smoke_level": 0.4, |
|
|
"temperature": 45.0, |
|
|
"oxygen_level": 19.5, |
|
|
"visibility": 60.0, |
|
|
"structural_integrity": 100.0 |
|
|
} |
|
|
} |
|
|
|
|
|
sensor_system.simulate_fire_scenario(fire_locations, affected_areas) |
|
|
|
|
|
return sensor_system |
|
|
|
|
|
|