"""Data aggregator with multi-provider fallback""" from __future__ import annotations from typing import List, Optional from datetime import datetime import time import logging import sys import os sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) from providers import BinanceProvider, CoinGeckoProvider, KrakenProvider, CoinCapProvider from core.models import ( OHLCV, Price, SentimentData, FearGreedIndex, NewsSentiment, OverallSentiment, MarketOverview, ProviderHealth ) from core.config import settings from core.cache import cache, cache_key, get_or_set import httpx logger = logging.getLogger(__name__) class DataAggregator: """Aggregates data from multiple providers with fallback""" def __init__(self): # Initialize providers self.ohlcv_providers = [ BinanceProvider(), KrakenProvider(), ] self.price_providers = [ CoinGeckoProvider(api_key=settings.COINGECKO_API_KEY), CoinCapProvider(), BinanceProvider(), ] self.market_provider = CoinGeckoProvider(api_key=settings.COINGECKO_API_KEY) self.start_time = time.time() async def close(self): """Close all provider connections""" for provider in self.ohlcv_providers + self.price_providers: await provider.close() async def fetch_ohlcv( self, symbol: str, interval: str = "1h", limit: int = 100 ) -> tuple[List[OHLCV], str]: """Fetch OHLCV data with provider fallback""" # Try each provider in order for provider in self.ohlcv_providers: try: logger.info(f"Trying {provider.name} for OHLCV data: {symbol} {interval}") data = await provider.fetch_ohlcv(symbol, interval, limit) if data and len(data) > 0: logger.info(f"Successfully fetched {len(data)} candles from {provider.name}") return data, provider.name except Exception as e: logger.warning(f"Provider {provider.name} failed: {e}") continue raise Exception("All OHLCV providers failed") async def fetch_prices(self, symbols: List[str]) -> tuple[List[Price], str]: """Fetch prices with aggregation from multiple providers""" all_prices = {} sources_used = [] # Collect prices from all available providers for provider in self.price_providers: try: logger.info(f"Fetching prices from {provider.name}") prices = await provider.fetch_prices(symbols) for price in prices: if price.symbol not in all_prices: all_prices[price.symbol] = [] all_prices[price.symbol].append((provider.name, price)) sources_used.append(provider.name) except Exception as e: logger.warning(f"Provider {provider.name} failed for prices: {e}") continue if not all_prices: raise Exception("All price providers failed") # Aggregate prices (use median or first available) aggregated = [] for symbol, price_list in all_prices.items(): if price_list: # Use first available price # Could implement median calculation for better accuracy _, price = price_list[0] aggregated.append(price) source_str = "+".join(sources_used) if sources_used else "multi-provider" return aggregated, source_str async def fetch_fear_greed_index(self) -> FearGreedIndex: """Fetch Fear & Greed Index from Alternative.me""" try: async with httpx.AsyncClient(timeout=10) as client: response = await client.get("https://api.alternative.me/fng/") data = response.json() if "data" in data and len(data["data"]) > 0: fng_data = data["data"][0] return FearGreedIndex( value=int(fng_data["value"]), classification=fng_data["value_classification"], timestamp=datetime.now().isoformat() ) except Exception as e: logger.error(f"Failed to fetch Fear & Greed Index: {e}") # Return neutral value on failure return FearGreedIndex( value=50, classification="Neutral", timestamp=datetime.now().isoformat() ) async def fetch_sentiment(self) -> SentimentData: """Fetch sentiment data""" fear_greed = await self.fetch_fear_greed_index() # Create overall sentiment based on Fear & Greed if fear_greed.value >= 75: sentiment = "extreme_greed" score = fear_greed.value elif fear_greed.value >= 55: sentiment = "bullish" score = fear_greed.value elif fear_greed.value >= 45: sentiment = "neutral" score = fear_greed.value elif fear_greed.value >= 25: sentiment = "bearish" score = fear_greed.value else: sentiment = "extreme_fear" score = fear_greed.value return SentimentData( fearGreed=fear_greed, news=NewsSentiment(total=0), overall=OverallSentiment( sentiment=sentiment, score=score, confidence=0.8 ) ) async def fetch_market_overview(self) -> MarketOverview: """Fetch market overview data""" try: market_data = await self.market_provider.fetch_market_data() return MarketOverview( totalMarketCap=market_data.get("total_market_cap", {}).get("usd", 0), totalVolume24h=market_data.get("total_volume", {}).get("usd", 0), btcDominance=market_data.get("market_cap_percentage", {}).get("btc", 0), ethDominance=market_data.get("market_cap_percentage", {}).get("eth", 0), activeCoins=market_data.get("active_cryptocurrencies", 0) ) except Exception as e: logger.error(f"Failed to fetch market overview: {e}") # Return empty data on failure return MarketOverview( totalMarketCap=0, totalVolume24h=0, btcDominance=0, ethDominance=0, activeCoins=0 ) async def get_all_provider_health(self) -> List[ProviderHealth]: """Get health status of all providers""" all_providers = set(self.ohlcv_providers + self.price_providers + [self.market_provider]) health_list = [] for provider in all_providers: health = await provider.get_health() health_list.append(health) return health_list def get_uptime(self) -> int: """Get service uptime in seconds""" return int(time.time() - self.start_time) # Global aggregator instance aggregator: Optional[DataAggregator] = None def get_aggregator() -> DataAggregator: """Get global aggregator instance""" global aggregator if aggregator is None: aggregator = DataAggregator() return aggregator