"""
Sentiment Data Collectors

Fetches cryptocurrency sentiment data from the Alternative.me Fear & Greed Index.
"""

import asyncio
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any

from utils.api_client import get_client
from utils.logger import setup_logger, log_api_request, log_error
from config import config

logger = setup_logger("sentiment_collector")

# Provider name used for the config lookup and stamped on every result dict
# produced by get_fear_greed_index().
FEAR_GREED_PROVIDER = "AlternativeMe"

# SentimentCollector.collect() used to compare against "Alternative.me", which
# never matched the name above, so the aggregation branch was dead code. Both
# spellings are accepted so results from any older producer still aggregate.
_FEAR_GREED_PROVIDER_NAMES = frozenset({FEAR_GREED_PROVIDER, "Alternative.me"})


def calculate_staleness_minutes(data_timestamp: Optional[datetime]) -> Optional[float]:
    """
    Calculate staleness in minutes from data timestamp to now

    Args:
        data_timestamp: Timestamp of the data. A naive datetime is treated
            as UTC.

    Returns:
        Staleness in minutes or None if timestamp not available
    """
    if data_timestamp is None:
        return None

    if data_timestamp.tzinfo is None:
        # Naive and aware datetimes cannot be subtracted; assume UTC.
        data_timestamp = data_timestamp.replace(tzinfo=timezone.utc)

    delta = datetime.now(timezone.utc) - data_timestamp
    return delta.total_seconds() / 60.0


def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _failure_result(provider: str, category: str, error_msg: str, **extra: Any) -> Dict[str, Any]:
    """Build the standard failed-collection result dict.

    Args:
        provider: Provider name to report.
        category: Data category to report.
        error_msg: Human-readable error description.
        **extra: Additional keys appended after the standard ones
            (e.g. ``error_type``).
    """
    result: Dict[str, Any] = {
        "provider": provider,
        "category": category,
        "data": None,
        "timestamp": _utc_now_iso(),
        "staleness_minutes": None,
        "success": False,
        "error": error_msg,
    }
    result.update(extra)
    return result


def _latest_index_entry(payload: Any) -> Optional[Dict[str, Any]]:
    """Return the first entry of ``payload["data"]`` if it is a dict, else None.

    Tolerates any payload shape (non-dict payload, missing or empty list,
    non-dict entry) so callers never have to guard the lookup themselves.
    """
    if not isinstance(payload, dict):
        return None
    entries = payload.get("data")
    if not isinstance(entries, list) or not entries:
        return None
    first = entries[0]
    return first if isinstance(first, dict) else None


def _parse_unix_timestamp(raw: Any) -> Optional[datetime]:
    """Convert a Unix timestamp (int or numeric string) to an aware UTC datetime.

    Returns None, after logging a warning, when the value cannot be converted.
    """
    try:
        return datetime.fromtimestamp(int(raw), tz=timezone.utc)
    except (TypeError, ValueError, OverflowError, OSError) as exc:
        # Best effort: a bad timestamp must not fail the whole collection,
        # but it should be visible in the logs rather than silently dropped.
        logger.warning(f"Could not parse data timestamp {raw!r}: {exc}")
        return None


def _to_int_or_none(raw: Any) -> Optional[int]:
    """Convert ``raw`` to int, returning None when it is missing or non-numeric."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None


async def get_fear_greed_index() -> Dict[str, Any]:
    """
    Fetch current Fear & Greed Index from Alternative.me

    The Fear & Greed Index is a sentiment indicator for the cryptocurrency market.
    - 0-24: Extreme Fear
    - 25-49: Fear
    - 50-74: Greed
    - 75-100: Extreme Greed

    Never raises: every failure (missing provider config, unsuccessful
    response, unexpected exception) is reported through the returned dict.

    Returns:
        Dict with provider, category, data, timestamp, staleness, success, error.
        Successful results additionally carry data_timestamp, response_time_ms,
        index_value and index_classification; failed results may carry
        error_type.
    """
    provider = FEAR_GREED_PROVIDER
    category = "sentiment"
    endpoint = "/fng/"

    logger.info(f"Fetching Fear & Greed Index from {provider}")

    try:
        client = get_client()
        provider_config = config.get_provider(provider)

        if not provider_config:
            error_msg = f"Provider {provider} not configured"
            log_error(logger, provider, "config_error", error_msg, endpoint)
            return _failure_result(provider, category, error_msg)

        # Build request URL
        url = f"{provider_config.endpoint_url}{endpoint}"
        params = {
            "limit": "1",  # Get only the latest index
            "format": "json"
        }

        # Make request
        # NOTE(review): integer division yields 0 for timeout_ms < 1000 -
        # confirm how the client interprets a zero timeout.
        response = await client.get(url, params=params, timeout=provider_config.timeout_ms // 1000)

        # Log request
        log_api_request(
            logger,
            provider,
            endpoint,
            response.get("response_time_ms", 0),
            "success" if response["success"] else "error",
            response.get("status_code")
        )

        if not response["success"]:
            error_msg = response.get("error_message", "Unknown error")
            log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint)
            return _failure_result(
                provider, category, error_msg, error_type=response.get("error_type")
            )

        # Extract data
        data = response["data"]
        entry = _latest_index_entry(data)

        # Alternative.me returns a Unix timestamp
        data_timestamp = None
        index_value = None
        index_classification = None
        if entry is not None:
            if "timestamp" in entry:
                data_timestamp = _parse_unix_timestamp(entry["timestamp"])
            index_value = entry.get("value")
            index_classification = entry.get("value_classification")

        staleness = calculate_staleness_minutes(data_timestamp)

        # Format staleness separately: a conditional wrapped around the whole
        # message would drop the index part whenever staleness is missing.
        staleness_text = f"{staleness:.2f}m" if staleness is not None else "N/A"
        logger.info(
            f"{provider} - {endpoint} - Fear & Greed Index: {index_value} ({index_classification}), "
            f"staleness: {staleness_text}"
        )

        return {
            "provider": provider,
            "category": category,
            "data": data,
            "timestamp": _utc_now_iso(),
            "data_timestamp": data_timestamp.isoformat() if data_timestamp else None,
            "staleness_minutes": staleness,
            "success": True,
            "error": None,
            "response_time_ms": response.get("response_time_ms", 0),
            "index_value": index_value,
            "index_classification": index_classification
        }

    except Exception as e:
        # Top-level boundary: collectors report failures instead of raising.
        error_msg = f"Unexpected error: {str(e)}"
        log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True)
        return _failure_result(provider, category, error_msg, error_type="exception")


async def collect_sentiment_data() -> List[Dict[str, Any]]:
    """
    Main function to collect sentiment data from all sources

    Currently collects from:
    - Alternative.me Fear & Greed Index

    Returns:
        List of results from all sentiment collectors. A collector that
        raised is represented by a failure dict with provider "Unknown".
    """
    logger.info("Starting sentiment data collection from all sources")

    # Run all collectors concurrently
    results = await asyncio.gather(
        get_fear_greed_index(),
        return_exceptions=True
    )

    # Process results
    processed_results: List[Dict[str, Any]] = []
    for result in results:
        # gather(return_exceptions=True) can also hand back BaseException
        # subclasses such as CancelledError, so check for the base type.
        if isinstance(result, BaseException):
            logger.error(f"Collector failed with exception: {str(result)}")
            processed_results.append(
                _failure_result("Unknown", "sentiment", str(result), error_type="exception")
            )
        else:
            processed_results.append(result)

    # Log summary
    successful = sum(1 for r in processed_results if r.get("success", False))
    logger.info(f"Sentiment data collection complete: {successful}/{len(processed_results)} successful")

    return processed_results


# Alias for backward compatibility
collect_sentiment = collect_sentiment_data


class SentimentCollector:
    """
    Sentiment Collector class for WebSocket streaming interface
    Wraps the standalone sentiment collection functions
    """

    def __init__(self, config: Any = None):
        """
        Initialize the sentiment collector

        Args:
            config: Configuration object (optional, for compatibility)
        """
        self.config = config
        self.logger = logger

    async def collect(self) -> Dict[str, Any]:
        """
        Collect sentiment data from all sources

        Returns:
            Dict with aggregated sentiment data. ``sentiment_score`` and
            ``overall_sentiment`` stay None when no Fear & Greed entry is
            available; ``sentiment_score`` is also None when the reported
            value is not numeric.
        """
        results = await collect_sentiment_data()

        # Aggregate data for WebSocket streaming
        aggregated: Dict[str, Any] = {
            "overall_sentiment": None,
            "sentiment_score": None,
            "social_volume": None,
            "trending_topics": [],
            "by_source": {},
            "social_trends": [],
            "timestamp": _utc_now_iso()
        }

        for result in results:
            if not (result.get("success") and result.get("data")):
                continue

            provider = result.get("provider", "unknown")
            if provider not in _FEAR_GREED_PROVIDER_NAMES:
                continue

            # Parse Fear & Greed Index
            entry = _latest_index_entry(result["data"])
            if entry is None:
                # Do not invent a score: 0 would read as "Extreme Fear".
                continue

            aggregated["sentiment_score"] = _to_int_or_none(entry.get("value"))
            aggregated["overall_sentiment"] = entry.get("value_classification", "neutral")
            aggregated["by_source"][provider] = {
                "value": aggregated["sentiment_score"],
                "classification": aggregated["overall_sentiment"]
            }

        return aggregated


# Example usage
if __name__ == "__main__":
    async def main():
        results = await collect_sentiment_data()

        print("\n=== Sentiment Data Collection Results ===")
        for result in results:
            print(f"\nProvider: {result['provider']}")
            print(f"Success: {result['success']}")
            print(f"Staleness: {result.get('staleness_minutes', 'N/A')} minutes")
            if result['success']:
                print(f"Response Time: {result.get('response_time_ms', 0):.2f}ms")
                if result.get('index_value'):
                    print(f"Fear & Greed Index: {result['index_value']} ({result['index_classification']})")
            else:
                print(f"Error: {result.get('error', 'Unknown')}")

    asyncio.run(main())