"""
Master Resource Orchestrator

Orchestrates ALL 86+ resources hierarchically - NO IDLE RESOURCES
(hierarchical management of all 86+ resources; no resource is left idle)
"""

import asyncio
import logging
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Tuple

import httpx

from backend.services.hierarchical_fallback_config import (
    hierarchical_config,
    Priority,
    ResourceConfig
)

logger = logging.getLogger(__name__)


class ResourceStatus(Enum):
    """Status of a resource attempt"""
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"
    TIMEOUT = "timeout"


class MasterResourceOrchestrator:
    """
    Master orchestrator for ALL resources

    Manages all 86+ resources hierarchically.
    """

    def __init__(self):
        self.config = hierarchical_config
        self.timeout = 10.0  # Default timeout (seconds)

        # Aggregate usage statistics across all requests and resources
        self.usage_stats = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "resource_usage": {},
            "priority_distribution": {
                Priority.CRITICAL: 0,
                Priority.HIGH: 0,
                Priority.MEDIUM: 0,
                Priority.LOW: 0,
                Priority.EMERGENCY: 0
            }
        }

    async def fetch_with_hierarchy(
        self,
        resource_list: List[ResourceConfig],
        fetch_function: Callable,
        max_concurrent: int = 3
    ) -> Tuple[Any, Dict[str, Any]]:
        """
        Fetch data using hierarchical fallback.

        Args:
            resource_list: List of resources in priority order
            fetch_function: Async function that fetches data from a single resource
            max_concurrent: Max concurrent attempts within the same priority level

        Returns:
            (data, metadata) - the fetched data and information about which resource succeeded
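
        Example:
            A minimal ``fetch_function`` sketch (assumes the endpoint at
            ``resource.base_url`` returns JSON; adapt it to the real resource APIs)::

                async def fetch_json(resource: ResourceConfig) -> Any:
                    async with httpx.AsyncClient(timeout=10.0) as client:
                        response = await client.get(resource.base_url)
                        response.raise_for_status()
                        return response.json()

                data, meta = await master_orchestrator.fetch_with_hierarchy(resources, fetch_json)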
        """
        self.usage_stats["total_requests"] += 1

        priority_groups = self._group_by_priority(resource_list)

        for priority in [Priority.CRITICAL, Priority.HIGH, Priority.MEDIUM, Priority.LOW, Priority.EMERGENCY]:
            resources_in_priority = priority_groups.get(priority, [])

            if not resources_in_priority:
                continue

            logger.info(f"🔄 Trying {len(resources_in_priority)} resources at {priority.name} priority")

            if max_concurrent > 1 and len(resources_in_priority) > 1:
                # Race resources in batches of max_concurrent so that no resource
                # in this priority level is skipped.
                result = None
                for start in range(0, len(resources_in_priority), max_concurrent):
                    batch = resources_in_priority[start:start + max_concurrent]
                    result = await self._try_concurrent(batch, fetch_function, priority)
                    if result:
                        break
            else:
                result = await self._try_sequential(
                    resources_in_priority,
                    fetch_function,
                    priority
                )

            if result:
                data, metadata = result
                self.usage_stats["successful_requests"] += 1
                self.usage_stats["priority_distribution"][priority] += 1
                logger.info(f"✅ SUCCESS at {priority.name} priority: {metadata['resource_name']}")
                return data, metadata

        self.usage_stats["failed_requests"] += 1
        logger.error(f"❌ ALL {len(resource_list)} resources failed")

        raise Exception(f"All {len(resource_list)} resources failed across all priority levels")

    def _group_by_priority(
        self,
        resources: List[ResourceConfig]
    ) -> Dict[Priority, List[ResourceConfig]]:
        """Group resources by priority level"""
        groups = {
            Priority.CRITICAL: [],
            Priority.HIGH: [],
            Priority.MEDIUM: [],
            Priority.LOW: [],
            Priority.EMERGENCY: []
        }

        for resource in resources:
            groups[resource.priority].append(resource)

        return groups

    async def _try_sequential(
        self,
        resources: List[ResourceConfig],
        fetch_function: Callable,
        priority: Priority
    ) -> Optional[Tuple[Any, Dict[str, Any]]]:
        """Try resources sequentially"""
        for idx, resource in enumerate(resources, 1):
            try:
                logger.info(f" 📡 [{idx}/{len(resources)}] Trying {resource.name}...")

                if resource.name not in self.usage_stats["resource_usage"]:
                    self.usage_stats["resource_usage"][resource.name] = {
                        "attempts": 0,
                        "successes": 0,
                        "failures": 0
                    }

                self.usage_stats["resource_usage"][resource.name]["attempts"] += 1

                start_time = datetime.utcnow()
                data = await fetch_function(resource)
                end_time = datetime.utcnow()

                if data:
                    self.usage_stats["resource_usage"][resource.name]["successes"] += 1

                    metadata = {
                        "resource_name": resource.name,
                        "priority": priority.name,
                        "base_url": resource.base_url,
                        "response_time_ms": int((end_time - start_time).total_seconds() * 1000),
                        "timestamp": int(end_time.timestamp() * 1000)
                    }

                    logger.info(f" ✅ {resource.name} succeeded in {metadata['response_time_ms']}ms")
                    return data, metadata

                logger.warning(f" ⚠️ {resource.name} returned no data")
                self.usage_stats["resource_usage"][resource.name]["failures"] += 1

            except asyncio.TimeoutError:
                logger.warning(f" ⏱️ {resource.name} timeout")
                self.usage_stats["resource_usage"][resource.name]["failures"] += 1
                continue

            except Exception as e:
                logger.warning(f" ❌ {resource.name} failed: {e}")
                self.usage_stats["resource_usage"][resource.name]["failures"] += 1
                continue

        return None

    async def _try_concurrent(
        self,
        resources: List[ResourceConfig],
        fetch_function: Callable,
        priority: Priority
    ) -> Optional[Tuple[Any, Dict[str, Any]]]:
        """Try multiple resources concurrently (race: first success wins)"""
        logger.info(f" 🏁 Racing {len(resources)} resources in parallel...")

        tasks = []
        for resource in resources:
            # Wrap each attempt in a Task so it can be awaited via as_completed
            # and cancelled once a winner is found.
            task = asyncio.create_task(self._try_single_resource(resource, fetch_function, priority))
            tasks.append(task)

        for completed_task in asyncio.as_completed(tasks):
            try:
                result = await completed_task
                if result:
                    # First success wins; cancel the attempts that are still running.
                    for task in tasks:
                        if not task.done():
                            task.cancel()
                    return result
            except Exception:
                continue

        return None

    async def _try_single_resource(
        self,
        resource: ResourceConfig,
        fetch_function: Callable,
        priority: Priority
    ) -> Optional[Tuple[Any, Dict[str, Any]]]:
        """Try a single resource (used in concurrent mode)"""
        try:
            if resource.name not in self.usage_stats["resource_usage"]:
                self.usage_stats["resource_usage"][resource.name] = {
                    "attempts": 0,
                    "successes": 0,
                    "failures": 0
                }

            self.usage_stats["resource_usage"][resource.name]["attempts"] += 1

            start_time = datetime.utcnow()
            data = await fetch_function(resource)
            end_time = datetime.utcnow()

            if data:
                self.usage_stats["resource_usage"][resource.name]["successes"] += 1

                metadata = {
                    "resource_name": resource.name,
                    "priority": priority.name,
                    "base_url": resource.base_url,
                    "response_time_ms": int((end_time - start_time).total_seconds() * 1000),
                    "timestamp": int(end_time.timestamp() * 1000)
                }

                logger.info(f" 🏆 {resource.name} won the race! ({metadata['response_time_ms']}ms)")
                return data, metadata

            self.usage_stats["resource_usage"][resource.name]["failures"] += 1
            return None

        except Exception as e:
            logger.warning(f" ❌ {resource.name} failed: {e}")
            self.usage_stats["resource_usage"][resource.name]["failures"] += 1
            return None

    def get_usage_statistics(self) -> Dict[str, Any]:
        """
        Get comprehensive usage statistics for all resources.
        """
        # Note: resource_usage only contains resources that have been attempted at least once.
        total_resources = len(self.usage_stats["resource_usage"])
        used_resources = sum(
            1 for stats in self.usage_stats["resource_usage"].values()
            if stats["attempts"] > 0
        )
        successful_resources = sum(
            1 for stats in self.usage_stats["resource_usage"].values()
            if stats["successes"] > 0
        )

        priority_success_rates = {}
        total_priority_requests = sum(self.usage_stats["priority_distribution"].values())

        if total_priority_requests > 0:
            for priority, count in self.usage_stats["priority_distribution"].items():
                priority_success_rates[priority.name] = {
                    "count": count,
                    "percentage": round((count / total_priority_requests) * 100, 2)
                }

        most_used = sorted(
            self.usage_stats["resource_usage"].items(),
            key=lambda x: x[1]["attempts"],
            reverse=True
        )[:10]

        most_successful = sorted(
            self.usage_stats["resource_usage"].items(),
            key=lambda x: x[1]["successes"],
            reverse=True
        )[:10]

        return {
            "overview": {
                "total_requests": self.usage_stats["total_requests"],
                "successful_requests": self.usage_stats["successful_requests"],
                "failed_requests": self.usage_stats["failed_requests"],
                "success_rate": round(
                    (self.usage_stats["successful_requests"] / self.usage_stats["total_requests"] * 100)
                    if self.usage_stats["total_requests"] > 0 else 0,
                    2
                )
            },
            "resource_utilization": {
                "total_resources_in_system": total_resources,
                "resources_used": used_resources,
                "resources_successful": successful_resources,
                "utilization_rate": round((used_resources / total_resources * 100) if total_resources > 0 else 0, 2)
            },
            "priority_distribution": priority_success_rates,
            "top_10_most_used": [
                {
                    "resource": name,
                    "attempts": stats["attempts"],
                    "successes": stats["successes"],
                    "failures": stats["failures"],
                    "success_rate": round((stats["successes"] / stats["attempts"] * 100) if stats["attempts"] > 0 else 0, 2)
                }
                for name, stats in most_used
            ],
            "top_10_most_successful": [
                {
                    "resource": name,
                    "successes": stats["successes"],
                    "attempts": stats["attempts"],
                    "success_rate": round((stats["successes"] / stats["attempts"] * 100) if stats["attempts"] > 0 else 0, 2)
                }
                for name, stats in most_successful
            ]
        }

    def get_resource_health_report(self) -> Dict[str, Any]:
        """
        Get a health report for all resources.
        """
        healthy_resources = []
        degraded_resources = []
        failed_resources = []
        unused_resources = []

        for resource_name, stats in self.usage_stats["resource_usage"].items():
            if stats["attempts"] == 0:
                unused_resources.append(resource_name)
            elif stats["successes"] == 0:
                failed_resources.append({
                    "name": resource_name,
                    "attempts": stats["attempts"],
                    "failures": stats["failures"]
                })
            else:
                success_rate = (stats["successes"] / stats["attempts"]) * 100

                if success_rate >= 80:
                    healthy_resources.append({
                        "name": resource_name,
                        "success_rate": round(success_rate, 2),
                        "attempts": stats["attempts"]
                    })
                else:
                    degraded_resources.append({
                        "name": resource_name,
                        "success_rate": round(success_rate, 2),
                        "attempts": stats["attempts"],
                        "failures": stats["failures"]
                    })

        return {
            "healthy_resources": {
                "count": len(healthy_resources),
                "resources": healthy_resources
            },
            "degraded_resources": {
                "count": len(degraded_resources),
                "resources": degraded_resources
            },
            "failed_resources": {
                "count": len(failed_resources),
                "resources": failed_resources
            },
            "unused_resources": {
                "count": len(unused_resources),
                "resources": unused_resources
            },
            "overall_health": "Healthy" if len(healthy_resources) > len(failed_resources) else "Degraded"
        }


master_orchestrator = MasterResourceOrchestrator()

__all__ = ["MasterResourceOrchestrator", "master_orchestrator", "ResourceStatus"]
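

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only; not executed on import). It assumes that
# ResourceConfig can be constructed with name/base_url/priority keyword
# arguments and that the listed endpoints return JSON; adjust both to the
# real hierarchical_fallback_config API before running.
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    async def _demo() -> None:
        async def fetch_json(resource: ResourceConfig) -> Any:
            # Minimal fetch_function: GET the resource's base URL and return parsed JSON.
            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.get(resource.base_url)
                response.raise_for_status()
                return response.json()

        # Hypothetical resource list, highest priority first.
        resources = [
            ResourceConfig(name="primary-api", base_url="https://example.com/api", priority=Priority.CRITICAL),
            ResourceConfig(name="backup-api", base_url="https://example.org/api", priority=Priority.HIGH),
        ]

        data, metadata = await master_orchestrator.fetch_with_hierarchy(resources, fetch_json)
        logger.info("Fetched via %s in %sms", metadata["resource_name"], metadata["response_time_ms"])
        logger.info("Usage statistics: %s", master_orchestrator.get_usage_statistics())

    asyncio.run(_demo())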