Spaces:
Running
on
Zero
Running
on
Zero
| import logging | |
| import traceback | |
| import numpy as np | |
| from typing import Dict, List, Any, Optional | |
| logger = logging.getLogger(__name__) | |
| class PatternAnalyzer: | |
| """ | |
| 負責各種模式分析,包含交通流動、行人穿越、車輛分佈等的辨識 | |
| 專門處理動態區域和移動相關的區域分析 | |
| """ | |
| def __init__(self): | |
| """初始化模式分析器""" | |
| try: | |
| logger.info("PatternAnalyzer initialized successfully") | |
| except Exception as e: | |
| logger.error(f"Failed to initialize PatternAnalyzer: {str(e)}") | |
| logger.error(traceback.format_exc()) | |
| raise | |
| def analyze_crossing_patterns(self, pedestrians: List[Dict], traffic_lights: List[Dict]) -> Dict: | |
| """ | |
| Analyze pedestrian crossing patterns to identify crossing zones. | |
| 若同一 region 中同時有行人與紅綠燈,則將兩者都放入該區域的 objects。 | |
| Args: | |
| pedestrians: 行人物件列表(每個 obj 應包含 'class_id', 'region', 'confidence' 等) | |
| traffic_lights: 紅綠燈物件列表(每個 obj 應包含 'class_id', 'region', 'confidence' 等) | |
| Returns: | |
| crossing_zones: 字典,key 為 zone 名稱,value 包含 'region', 'objects', 'description' | |
| """ | |
| try: | |
| crossing_zones = {} | |
| # 如果沒有任何行人,就不辨識任何 crossing zone | |
| if not pedestrians: | |
| return crossing_zones | |
| # (1) 按照 region 分組行人 | |
| pedestrian_regions = {} | |
| for p in pedestrians: | |
| region = p["region"] | |
| pedestrian_regions.setdefault(region, []).append(p) | |
| # (2) 針對每個 region,看是否同時有紅綠燈 | |
| # 建立一個對照表 mapping: region -> { "pedestrians": [...], "traffic_lights": [...] } | |
| combined_regions = {} | |
| for region, peds in pedestrian_regions.items(): | |
| # 取得該 region 下所有紅綠燈 | |
| tls_in_region = [t for t in traffic_lights if t["region"] == region] | |
| combined_regions[region] = { | |
| "pedestrians": peds, | |
| "traffic_lights": tls_in_region | |
| } | |
| # (3) 按照行人數量排序,找出前兩個需要建立 crossing zone 的 region | |
| sorted_regions = sorted( | |
| combined_regions.items(), | |
| key=lambda x: len(x[1]["pedestrians"]), | |
| reverse=True | |
| ) | |
| # (4) 將前兩個 region 建立 Crossing Zone,objects 同時包含行人與紅綠燈 | |
| for idx, (region, group) in enumerate(sorted_regions[:2]): | |
| peds = group["pedestrians"] | |
| tls = group["traffic_lights"] | |
| has_nearby_signals = len(tls) > 0 | |
| # 生成 zone_name(基於 region 方向 + idx 決定主/次 crossing) | |
| direction = self._get_directional_description_local(region) | |
| if direction and direction != "central": | |
| zone_name = f"{direction} crossing area" | |
| else: | |
| zone_name = "main crossing area" if idx == 0 else "secondary crossing area" | |
| # 組合 description | |
| description = f"Pedestrian crossing area with {len(peds)} " | |
| description += "person" if len(peds) == 1 else "people" | |
| if direction: | |
| description += f" in {direction} direction" | |
| if has_nearby_signals: | |
| description += " near traffic signals" | |
| # 將行人 + 同區紅綠燈一併放入 objects | |
| obj_list = ["pedestrian"] * len(peds) | |
| if has_nearby_signals: | |
| obj_list += ["traffic light"] * len(tls) | |
| crossing_zones[zone_name] = { | |
| "region": region, | |
| "objects": obj_list, | |
| "description": description | |
| } | |
| return crossing_zones | |
| except Exception as e: | |
| logger.error(f"Error in analyze_crossing_patterns: {str(e)}") | |
| logger.error(traceback.format_exc()) | |
| return {} | |
| def analyze_traffic_zones(self, vehicles: List[Dict]) -> Dict: | |
| """ | |
| 分析車輛分布以識別具有方向感知的交通區域 | |
| Args: | |
| vehicles: 車輛物件列表 | |
| Returns: | |
| 識別出的交通區域字典 | |
| """ | |
| try: | |
| traffic_zones = {} | |
| if not vehicles: | |
| return traffic_zones | |
| # 按區域分組車輛 | |
| vehicle_regions = {} | |
| for v in vehicles: | |
| region = v["region"] | |
| if region not in vehicle_regions: | |
| vehicle_regions[region] = [] | |
| vehicle_regions[region].append(v) | |
| # 為有車輛的區域創建交通區域 | |
| main_traffic_region = max(vehicle_regions.items(), key=lambda x: len(x[1]), default=(None, [])) | |
| if main_traffic_region[0] is not None: | |
| region = main_traffic_region[0] | |
| vehicles_in_region = main_traffic_region[1] | |
| # 獲取車輛類型列表用於描述 | |
| vehicle_types = [v["class_name"] for v in vehicles_in_region] | |
| unique_types = list(set(vehicle_types)) | |
| # 獲取方向描述 | |
| direction = self._get_directional_description_local(region) | |
| # 創建描述性區域 | |
| traffic_zones["vehicle_zone"] = { | |
| "region": region, | |
| "objects": vehicle_types, | |
| "description": f"Vehicle traffic area with {', '.join(unique_types[:3])}" + | |
| (f" in {direction} area" if direction else "") | |
| } | |
| # 如果車輛分布在多個區域,創建次要區域 | |
| if len(vehicle_regions) > 1: | |
| # 獲取第二大車輛聚集區域 | |
| sorted_regions = sorted(vehicle_regions.items(), key=lambda x: len(x[1]), reverse=True) | |
| if len(sorted_regions) > 1: | |
| second_region, second_vehicles = sorted_regions[1] | |
| direction = self._get_directional_description_local(second_region) | |
| vehicle_types = [v["class_name"] for v in second_vehicles] | |
| unique_types = list(set(vehicle_types)) | |
| traffic_zones["secondary_vehicle_zone"] = { | |
| "region": second_region, | |
| "objects": vehicle_types, | |
| "description": f"Secondary traffic area with {', '.join(unique_types[:2])}" + | |
| (f" in {direction} direction" if direction else "") | |
| } | |
| return traffic_zones | |
| except Exception as e: | |
| logger.error(f"Error analyzing traffic zones: {str(e)}") | |
| logger.error(traceback.format_exc()) | |
| return {} | |
| def analyze_aerial_traffic_patterns(self, vehicle_objs: List[Dict]) -> Dict: | |
| """ | |
| 分析空中視角的車輛交通模式 | |
| Args: | |
| vehicle_objs: 車輛物件列表 | |
| Returns: | |
| 交通模式區域字典 | |
| """ | |
| try: | |
| zones = {} | |
| if not vehicle_objs: | |
| return zones | |
| # 將位置轉換為數組進行模式分析 | |
| positions = np.array([obj["normalized_center"] for obj in vehicle_objs]) | |
| if len(positions) >= 2: | |
| # 計算分布指標 | |
| x_coords = positions[:, 0] | |
| y_coords = positions[:, 1] | |
| x_mean = np.mean(x_coords) | |
| y_mean = np.mean(y_coords) | |
| x_std = np.std(x_coords) | |
| y_std = np.std(y_coords) | |
| # 判斷車輛是否組織成車道 | |
| if x_std < y_std * 0.5: | |
| # 車輛垂直對齊 - 代表南北交通 | |
| zones["vertical_traffic_flow"] = { | |
| "region": "central_vertical", | |
| "objects": [obj["class_name"] for obj in vehicle_objs[:5]], | |
| "description": "North-south traffic flow visible from aerial view" | |
| } | |
| elif y_std < x_std * 0.5: | |
| # 車輛水平對齊 - 代表東西交通 | |
| zones["horizontal_traffic_flow"] = { | |
| "region": "central_horizontal", | |
| "objects": [obj["class_name"] for obj in vehicle_objs[:5]], | |
| "description": "East-west traffic flow visible from aerial view" | |
| } | |
| else: | |
| # 車輛多方向 - 代表十字路口 | |
| zones["intersection_traffic"] = { | |
| "region": "central", | |
| "objects": [obj["class_name"] for obj in vehicle_objs[:5]], | |
| "description": "Multi-directional traffic at intersection visible from aerial view" | |
| } | |
| return zones | |
| except Exception as e: | |
| logger.error(f"Error analyzing aerial traffic patterns: {str(e)}") | |
| logger.error(traceback.format_exc()) | |
| return {} | |
| def identify_park_recreational_zones(self, detected_objects: List[Dict]) -> Dict: | |
| """ | |
| 識別公園的休閒活動區域 | |
| Args: | |
| detected_objects: 檢測到的物件列表 | |
| Returns: | |
| 休閒區域字典 | |
| """ | |
| try: | |
| zones = {} | |
| # 尋找休閒物件(運動球、風箏等) | |
| rec_items = [] | |
| rec_regions = {} | |
| for obj in detected_objects: | |
| if obj["class_id"] in [32, 33, 34, 35, 38]: # sports ball, kite, baseball bat, glove, tennis racket | |
| region = obj["region"] | |
| if region not in rec_regions: | |
| rec_regions[region] = [] | |
| rec_regions[region].append(obj) | |
| rec_items.append(obj["class_name"]) | |
| if rec_items: | |
| main_rec_region = max(rec_regions.items(), | |
| key=lambda x: len(x[1]), | |
| default=(None, [])) | |
| if main_rec_region[0] is not None: | |
| zones["recreational_zone"] = { | |
| "region": main_rec_region[0], | |
| "objects": list(set(rec_items)), | |
| "description": f"Recreational area with {', '.join(list(set(rec_items)))}" | |
| } | |
| return zones | |
| except Exception as e: | |
| logger.error(f"Error identifying park recreational zones: {str(e)}") | |
| logger.error(traceback.format_exc()) | |
| return {} | |
| def identify_parking_zones(self, detected_objects: List[Dict]) -> Dict: | |
| """ | |
| 停車場的停車區域 | |
| Args: | |
| detected_objects: 檢測到的物件列表 | |
| Returns: | |
| 停車區域字典 | |
| """ | |
| try: | |
| zones = {} | |
| # 尋找停放的汽車 | |
| car_objs = [obj for obj in detected_objects if obj["class_id"] == 2] # cars | |
| if len(car_objs) >= 3: | |
| # 檢查汽車是否按模式排列 | |
| car_positions = [obj["normalized_center"] for obj in car_objs] | |
| # 通過分析垂直位置檢查行模式 | |
| y_coords = [pos[1] for pos in car_positions] | |
| y_clusters = {} | |
| # 按相似y坐標分組汽車 | |
| for i, y in enumerate(y_coords): | |
| assigned = False | |
| for cluster_y in y_clusters.keys(): | |
| if abs(y - cluster_y) < 0.1: # 圖像高度的10%內 | |
| y_clusters[cluster_y].append(i) | |
| assigned = True | |
| break | |
| if not assigned: | |
| y_clusters[y] = [i] | |
| # 如果有行模式 | |
| if max(len(indices) for indices in y_clusters.values()) >= 2: | |
| zones["parking_row"] = { | |
| "region": "central", | |
| "objects": ["car"] * len(car_objs), | |
| "description": f"Organized parking area with vehicles arranged in rows" | |
| } | |
| else: | |
| zones["parking_area"] = { | |
| "region": "wide", | |
| "objects": ["car"] * len(car_objs), | |
| "description": f"Parking area with {len(car_objs)} vehicles" | |
| } | |
| return zones | |
| except Exception as e: | |
| logger.error(f"Error identifying parking zones: {str(e)}") | |
| logger.error(traceback.format_exc()) | |
| return {} | |
| def _get_directional_description_local(self, region: str) -> str: | |
| """ | |
| 本地方向描述方法 | |
| 將區域名稱轉換為方位描述(東西南北) | |
| Args: | |
| region: 區域名稱 | |
| Returns: | |
| 方位描述字串 | |
| """ | |
| try: | |
| region_lower = region.lower() | |
| if "top" in region_lower and "left" in region_lower: | |
| return "northwest" | |
| elif "top" in region_lower and "right" in region_lower: | |
| return "northeast" | |
| elif "bottom" in region_lower and "left" in region_lower: | |
| return "southwest" | |
| elif "bottom" in region_lower and "right" in region_lower: | |
| return "southeast" | |
| elif "top" in region_lower: | |
| return "north" | |
| elif "bottom" in region_lower: | |
| return "south" | |
| elif "left" in region_lower: | |
| return "west" | |
| elif "right" in region_lower: | |
| return "east" | |
| else: | |
| return "central" | |
| except Exception as e: | |
| logger.error(f"Error getting directional description for region '{region}': {str(e)}") | |
| return "central" | |