|
|
"""
|
|
|
API Fallback Manager

Automatically switches to alternative API providers when the primary fails.
|
|
|
"""
|
|
|
|
|
|
import asyncio
|
|
|
import logging
|
|
|
from typing import Dict, List, Any, Optional, Callable
|
|
|
from datetime import datetime, timedelta
|
|
|
from enum import Enum
|
|
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
class ProviderStatus(Enum):
    """Lifecycle states a provider can be in; each value is its string label."""

    # Healthy: eligible for requests.
    ACTIVE = "active"
    # Has failed recently but is still eligible for requests.
    DEGRADED = "degraded"
    # Declared for completeness; nothing in this module assigns it.
    FAILED = "failed"
    # Temporarily excluded from the fallback chain.
    COOLDOWN = "cooldown"
|
|
|
|
|
|
|
|
|
class APIProvider:
    """One upstream API source together with the counters used to track its health."""

    def __init__(
        self,
        name: str,
        priority: int,
        fetch_function: Callable,
        cooldown_seconds: int = 300,
        max_failures: int = 3
    ):
        """
        Store the provider configuration and start with clean counters.

        Args:
            name: Label used in log messages.
            priority: Ordering value for this provider.
            fetch_function: Callable invoked to fetch data from this provider.
            cooldown_seconds: How long the provider stays in cooldown,
                measured from its last recorded failure.
            max_failures: Number of failures since the last success that
                sends the provider into cooldown.
        """
        # Static configuration.
        self.name = name
        self.priority = priority
        self.fetch_function = fetch_function
        self.cooldown_seconds = cooldown_seconds
        self.max_failures = max_failures

        # Mutable health state.
        self.status = ProviderStatus.ACTIVE
        self.failures = 0  # reset to 0 by every success
        self.total_requests = 0
        self.successful_requests = 0
        self.last_success_time = None
        self.last_failure_time = None

    def record_success(self):
        """Count a successful request and return the provider to ACTIVE."""
        self.total_requests += 1
        self.successful_requests += 1
        self.last_success_time = datetime.now()

        # A success wipes the failure streak.
        self.failures = 0
        self.status = ProviderStatus.ACTIVE

        logger.info(f"✅ {self.name}: Success (total: {self.successful_requests}/{self.total_requests})")

    def record_failure(self, error: Exception):
        """
        Count a failed request and downgrade the provider.

        The provider becomes DEGRADED until the failure streak reaches
        ``max_failures``; from then on it is put into COOLDOWN.

        Args:
            error: The exception raised by the failed request (logged only).
        """
        self.total_requests += 1
        self.failures += 1
        self.last_failure_time = datetime.now()

        if self.failures < self.max_failures:
            self.status = ProviderStatus.DEGRADED
            logger.warning(f"⚠️ {self.name}: Failure {self.failures}/{self.max_failures} - {str(error)}")
            return

        self.status = ProviderStatus.COOLDOWN
        logger.warning(
            f"❌ {self.name}: Entering cooldown after {self.failures} failures. "
            f"Last error: {str(error)}"
        )

    def is_available(self) -> bool:
        """
        Tell whether the provider may be used right now.

        Note that this is not a pure query: when the provider is in COOLDOWN
        and the cooldown window has elapsed, it is switched back to ACTIVE
        and its failure streak is cleared.

        Returns:
            True for ACTIVE or DEGRADED providers and for providers whose
            cooldown has just ended; False otherwise.
        """
        if self.status != ProviderStatus.COOLDOWN:
            return self.status in [ProviderStatus.ACTIVE, ProviderStatus.DEGRADED]

        # In cooldown without a recorded failure time: nothing to measure from.
        if not self.last_failure_time:
            return False

        cooldown_end = self.last_failure_time + timedelta(seconds=self.cooldown_seconds)
        if datetime.now() < cooldown_end:
            return False

        self.status = ProviderStatus.ACTIVE
        self.failures = 0
        logger.info(f"🔄 {self.name}: Cooldown ended, provider reactivated")
        return True

    def get_health_score(self) -> float:
        """
        Return the success rate as a percentage in the range 0-100.

        A provider that has not served any request yet scores 100.0.
        """
        request_count = self.total_requests
        if request_count == 0:
            return 100.0
        return (self.successful_requests / request_count) * 100
|
|
|
|
|
|
|
|
|
class APIFallbackManager:
    """
    Manages API fallback across multiple providers.

    Providers are kept sorted by ascending ``priority`` value and are tried
    in that order until one of them returns without raising.

    Usage:
        manager = APIFallbackManager("OHLCV")
        manager.add_provider("Binance", 1, fetch_binance_ohlcv)
        manager.add_provider("CoinGecko", 2, fetch_coingecko_ohlcv)

        result = await manager.fetch_with_fallback(symbol="BTC", timeframe="1h")
    """

    def __init__(self, service_name: str):
        """
        Create an empty fallback chain.

        Args:
            service_name: Label for this chain, used in logs and status output.
        """
        self.service_name = service_name
        self.providers: List[APIProvider] = []
        logger.info(f"📡 Initialized fallback manager for {service_name}")

    def add_provider(
        self,
        name: str,
        priority: int,
        fetch_function: Callable,
        cooldown_seconds: int = 300,
        max_failures: int = 3
    ):
        """
        Add a provider to the fallback chain.

        Args:
            name: Provider label used in logs and results.
            priority: Sort key; providers with lower values are tried first.
            fetch_function: Callable awaited with the keyword arguments given
                to ``fetch_with_fallback``.
            cooldown_seconds: Cooldown length passed to the provider.
            max_failures: Failure threshold passed to the provider.
        """
        provider = APIProvider(name, priority, fetch_function, cooldown_seconds, max_failures)
        self.providers.append(provider)

        # Keep the chain ordered so iteration always follows priority.
        self.providers.sort(key=lambda p: p.priority)
        logger.info(f"✅ Added provider '{name}' (priority: {priority}) to {self.service_name}")

    async def fetch_with_fallback(self, **kwargs) -> Dict[str, Any]:
        """
        Fetch data with automatic fallback.

        Each available provider is tried in priority order; the first one
        whose fetch function returns without raising wins. Unavailable
        providers are recorded as skipped and not called.

        Args:
            **kwargs: Parameters to pass to fetch functions

        Returns:
            Dict with:
                - success: bool
                - data: Any (None if all failed)
                - provider: str (which provider succeeded, None if all failed)
                - attempts: List of per-provider attempt records
                  ("skipped", "failed" or "success")
                - health_score: float (only when successful)
                - error: str (only when no provider succeeded)
        """
        attempts: List[Dict[str, Any]] = []
        last_error: Optional[Exception] = None

        for provider in self.providers:
            if not provider.is_available():
                attempts.append({
                    "provider": provider.name,
                    "status": "skipped",
                    "reason": f"Provider in {provider.status.value} state"
                })
                continue

            logger.info(f"🔄 {self.service_name}: Trying {provider.name}...")
            start_time = datetime.now()

            # Only the provider call itself is guarded; any exception it raises
            # counts as a provider failure and moves on to the next provider.
            try:
                data = await provider.fetch_function(**kwargs)
            except Exception as e:
                last_error = e
                provider.record_failure(e)

                attempts.append({
                    "provider": provider.name,
                    "status": "failed",
                    "error": str(e),
                    "error_type": type(e).__name__
                })

                logger.warning(
                    f"❌ {self.service_name}: {provider.name} failed - {str(e)}"
                )
                continue

            duration = (datetime.now() - start_time).total_seconds()
            provider.record_success()

            attempts.append({
                "provider": provider.name,
                "status": "success",
                "duration": duration
            })

            logger.info(
                f"✅ {self.service_name}: {provider.name} succeeded in {duration:.2f}s"
            )

            return {
                "success": True,
                "data": data,
                "provider": provider.name,
                "attempts": attempts,
                "health_score": provider.get_health_score()
            }

        # Distinguish providers that were actually called from those skipped,
        # so the log and the error message do not overstate what was tried.
        tried = sum(1 for attempt in attempts if attempt["status"] == "failed")
        skipped = len(attempts) - tried

        logger.error(
            f"🚨 {self.service_name}: ALL PROVIDERS FAILED! "
            f"Tried {tried} provider(s), skipped {skipped}"
        )

        if last_error is not None:
            error_message = f"All providers failed. Last error: {str(last_error)}"
        elif not self.providers:
            error_message = "No providers configured"
        else:
            # Nothing was called, so there is no "last error" to report.
            error_message = f"No providers available ({skipped} skipped)"

        return {
            "success": False,
            "data": None,
            "provider": None,
            "attempts": attempts,
            "error": error_message
        }

    def get_status(self) -> Dict[str, Any]:
        """
        Get status of all providers.

        Returns:
            Dict with the service name and one entry per provider (name,
            priority, status, health score, request counters, availability).
        """
        provider_reports = []
        for p in self.providers:
            # is_available() may reactivate a provider whose cooldown has
            # elapsed (changing status and failures), so evaluate it first to
            # keep every field of the entry consistent with "available".
            available = p.is_available()
            provider_reports.append({
                "name": p.name,
                "priority": p.priority,
                "status": p.status.value,
                "health_score": p.get_health_score(),
                "total_requests": p.total_requests,
                "successful_requests": p.successful_requests,
                "failures": p.failures,
                "available": available
            })

        return {
            "service": self.service_name,
            "providers": provider_reports
        }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def example_ohlcv_binance(symbol: str, timeframe: str, limit: int = 100):
    """
    Example fetch function: OHLCV data from Binance.

    Builds a fresh ``BinanceClient`` on every call and returns whatever its
    ``get_ohlcv`` coroutine returns.

    Args:
        symbol: Passed positionally to ``get_ohlcv``.
        timeframe: Forwarded as the ``timeframe`` keyword.
        limit: Forwarded as the ``limit`` keyword.
    """
    # Imported lazily so this module loads even when the client is not needed.
    from backend.services.binance_client import BinanceClient

    # NOTE(review): the client is never explicitly closed here - confirm
    # BinanceClient needs no cleanup.
    return await BinanceClient().get_ohlcv(symbol, timeframe=timeframe, limit=limit)
|
|
|
|
|
|
|
|
|
async def example_ohlcv_coingecko(symbol: str, timeframe: str, limit: int = 100):
    """
    Example fetch function: placeholder for CoinGecko OHLCV.

    Raises:
        NotImplementedError: Always; the CoinGecko fetch is not written yet.
    """
    reason = "CoinGecko OHLCV not implemented yet"
    raise NotImplementedError(reason)
|
|
|
|
|
|
|
|
|
async def example_news_newsapi(q: str, **kwargs):
    """
    Example fetch function: news articles from NewsAPI.

    The search term is sent as a properly encoded query parameter, and the
    API key is sent in the ``X-Api-Key`` header rather than in the URL, so
    it does not appear in request URLs that get logged.

    Args:
        q: Search query.
        **kwargs: Accepted and ignored.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the response status is 4xx/5xx.
    """
    import os

    import httpx

    # SECURITY: a credential committed to source control should be rotated and
    # removed. The NEWSAPI_KEY environment variable takes precedence; the
    # literal is kept only as a fallback so existing deployments keep working.
    api_key = os.environ.get("NEWSAPI_KEY") or "968a5e25552b4cb5ba3280361d8444ab"

    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://newsapi.org/v2/everything",
            # Let httpx encode the values so characters such as "&" or "#"
            # in q cannot alter or truncate the query string.
            params={"q": q, "sortBy": "publishedAt"},
            headers={"X-Api-Key": api_key},
            timeout=10.0,
        )
        response.raise_for_status()
        return response.json()
|
|
|
|
|
|
|
|
|
async def example_news_cryptocompare(q: str, **kwargs):
    """
    Example fetch function: news from CryptoCompare.

    Args:
        q: Value sent as the ``categories`` query parameter.
        **kwargs: Accepted and ignored.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the response status is 4xx/5xx.
    """
    import httpx

    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://min-api.cryptocompare.com/data/v2/news/",
            # Let httpx encode the value so characters such as "&" or "#"
            # in q cannot alter or truncate the query string.
            params={"categories": q},
            timeout=10.0,
        )
        response.raise_for_status()
        return response.json()
|
|
|
|
|
|
|
|
|
|
|
|
# Registry of fallback managers, keyed by service name.
_managers: Dict[str, APIFallbackManager] = {}


def get_fallback_manager(service_name: str) -> APIFallbackManager:
    """
    Return the fallback manager registered for ``service_name``.

    A new manager is created and registered on first request; later calls
    with the same name return that same instance.
    """
    if service_name in _managers:
        return _managers[service_name]

    manager = APIFallbackManager(service_name)
    _managers[service_name] = manager
    return manager
|
|
|
|
|
|
|
|
|
def get_all_managers_status() -> Dict[str, Any]:
    """Return a mapping of service name to that manager's ``get_status()`` report."""
    report = {}
    for service, fallback_manager in _managers.items():
        report[service] = fallback_manager.get_status()
    return report
|
|
|
|
|
|
|