Really-amin's picture
Upload 577 files
b190b45 verified
#!/usr/bin/env python3
"""
Data Hub Complete - مدیریت جامع همه منابع داده
=============================================
✅ استفاده از تمام کلیدهای API جدید
✅ پشتیبانی از همه انواع داده‌ها
✅ سیستم Fallback خودکار
✅ Cache Management
✅ Rate Limiting
"""
import httpx
import asyncio
import logging
from typing import Dict, Any, List, Optional, Union
from datetime import datetime, timedelta
import hashlib
import json
import os
from collections import defaultdict
import time
logger = logging.getLogger(__name__)
class DataHubConfiguration:
    """Static configuration for the Data Hub: API keys, base URLs and cache TTLs.

    Every API key can be overridden through an environment variable with the
    same name as the attribute (for example ``ETHERSCAN_API_KEY``). When the
    variable is unset or empty, the value that was previously hard-coded here
    is used so existing deployments keep working.
    """

    # NOTE(security): the literal fallbacks below are credentials committed to
    # source control. They should be rotated and supplied via the environment.

    # ----- Blockchain explorers -----
    TRONSCAN_API_KEY = os.getenv("TRONSCAN_API_KEY") or "7ae72726-bffe-4e74-9c33-97b761eeea21"
    TRONSCAN_BASE_URL = "https://apilist.tronscan.org/api"
    BSCSCAN_API_KEY = os.getenv("BSCSCAN_API_KEY") or "K62RKHGXTDCG53RU4MCG6XABIMJKTN19IT"
    BSCSCAN_BASE_URL = "https://api.bscscan.com/api"
    ETHERSCAN_API_KEY = os.getenv("ETHERSCAN_API_KEY") or "T6IR8VJHX2NE6ZJW2S3FDVN1TYG4PYYI45"
    ETHERSCAN_BASE_URL = "https://api.etherscan.io/api"

    # ----- Market data -----
    COINMARKETCAP_API_KEY = os.getenv("COINMARKETCAP_API_KEY") or "a35ffaec-c66c-4f16-81e3-41a717e4822f"
    COINMARKETCAP_BASE_URL = "https://pro-api.coinmarketcap.com/v1"

    # ----- News -----
    NEWSAPI_API_KEY = os.getenv("NEWSAPI_API_KEY") or "968a5e25552b4cb5ba3280361d8444ab"
    NEWSAPI_BASE_URL = "https://newsapi.org/v2"

    # ----- HuggingFace -----
    HF_API_TOKEN = os.getenv("HF_API_TOKEN", "").strip()
    HF_SPACE_BASE_URL = "https://really-amin-datasourceforcryptocurrency.hf.space"

    # ----- Additional sources (no API key used) -----
    ALTERNATIVE_ME_BASE_URL = "https://api.alternative.me"
    COINGECKO_BASE_URL = "https://api.coingecko.com/api/v3"
    BINANCE_BASE_URL = "https://api.binance.com/api/v3"
    REDDIT_BASE_URL = "https://www.reddit.com/r"

    # Cache time-to-live per data category, in seconds. A category that is
    # missing here (or set to 0) is never cached.
    CACHE_TTL = {
        "market_prices": 30,
        "ohlcv": 60,
        "news": 300,
        "sentiment": 60,
        "blockchain": 60,
        "whale_activity": 30,
        "social_media": 120,
        "trending": 180,
        "fear_greed": 3600,
    }
class RateLimiter:
    """Sliding-window rate limiter for outbound API calls.

    ``limits`` maps a service name to ``{"calls": N, "period": seconds}``.
    ``call_times`` keeps, per service, the timestamps of the calls recorded
    inside the current window.
    """

    def __init__(self):
        self.limits = {
            "coinmarketcap": {"calls": 333, "period": 60},   # 333/min
            "newsapi": {"calls": 500, "period": 3600},       # 500/hour
            "etherscan": {"calls": 5, "period": 1},          # 5/sec
            "bscscan": {"calls": 5, "period": 1},            # 5/sec
            "tronscan": {"calls": 10, "period": 1},          # 10/sec
            "coingecko": {"calls": 50, "period": 60},        # 50/min
            "binance": {"calls": 1200, "period": 60},        # 1200/min
        }
        self.call_times = defaultdict(list)

    async def wait_if_needed(self, service: str):
        """Block until a call to ``service`` fits in its window, then record it.

        Services without a configured limit return immediately and are not
        tracked.

        Args:
            service: Key into ``self.limits``.
        """
        limit = self.limits.get(service)
        if limit is None:
            return

        period = limit["period"]
        max_calls = limit["calls"]

        while True:
            now = time.time()
            # Drop timestamps that have left the window.
            recent = [t for t in self.call_times[service] if now - t < period]
            self.call_times[service] = recent

            # An empty window can never be "full" (also guards calls == 0).
            if len(recent) < max_calls or not recent:
                break

            wait_time = period - (now - recent[0])
            if wait_time <= 0:
                break
            logger.warning(f"⏳ Rate limit reached for {service}, waiting {wait_time:.1f}s")
            await asyncio.sleep(wait_time)
            # Loop again: the window is re-evaluated against the current time
            # rather than trusting the timestamp taken before sleeping.

        # Record the moment the call actually proceeds, not the time we
        # started waiting; otherwise the window under-counts recent calls.
        self.call_times[service].append(time.time())
class DataHubComplete:
"""
Data Hub کامل برای مدیریت همه منابع داده
"""
def __init__(self):
    """Create the configuration, rate limiter, in-memory cache and HTTP timeout."""
    # Cache maps a hashed key to a (payload, stored_at) tuple.
    self.cache = {}
    self.config = DataHubConfiguration()
    self.rate_limiter = RateLimiter()
    # 30 s default for every timeout phase, 10 s for establishing a connection.
    self.timeout = httpx.Timeout(30.0, connect=10.0)
    logger.info("🚀 Data Hub Complete initialized with all new API keys")
# =========================================================================
# Cache Management
# =========================================================================
def _get_cache_key(self, category: str, params: Dict = None) -> str:
"""Generate cache key"""
cache_str = f"{category}:{json.dumps(params or {}, sort_keys=True)}"
return hashlib.md5(cache_str.encode()).hexdigest()
def _get_cached(self, cache_key: str, cache_type: str) -> Optional[Dict]:
"""Get data from cache if not expired"""
if cache_key not in self.cache:
return None
cached_data, cached_time = self.cache[cache_key]
ttl = self.config.CACHE_TTL.get(cache_type, 0)
if ttl == 0:
return None
age = (datetime.now() - cached_time).total_seconds()
if age < ttl:
logger.info(f"📦 Cache HIT: {cache_type} (age: {age:.1f}s)")
return cached_data
del self.cache[cache_key]
return None
def _set_cache(self, cache_key: str, data: Dict, cache_type: str):
"""Store data in cache"""
ttl = self.config.CACHE_TTL.get(cache_type, 0)
if ttl > 0:
self.cache[cache_key] = (data, datetime.now())
# =========================================================================
# 1. Market Price Data - داده‌های قیمت بازار
# =========================================================================
async def get_market_prices(
    self,
    symbols: Optional[List[str]] = None,
    limit: int = 100,
    source: str = "auto"
) -> Dict[str, Any]:
    """Fetch market prices, trying providers in order until one returns data.

    Providers are tried in the order CoinMarketCap, CoinGecko, Binance.
    Binance is only consulted when ``symbols`` is given. A provider that
    raises or returns no rows is recorded in ``errors`` and the next one is
    tried.

    Args:
        symbols: Ticker symbols to look up; ``None`` requests a top listing.
        limit: Maximum number of rows to return.
        source: ``"auto"`` or one of ``"coinmarketcap"``, ``"coingecko"``,
            ``"binance"`` to restrict the lookup to a single provider.

    Returns:
        On success ``{"success": True, "source", "data", "timestamp"}``;
        otherwise ``{"success": False, "error", "errors", "timestamp"}``.
    """
    # The requested source is part of the key so that an explicit
    # source="binance" call is never answered with another provider's payload.
    cache_key = self._get_cache_key(
        "market_prices", {"symbols": symbols, "limit": limit, "source": source}
    )
    cached = self._get_cached(cache_key, "market_prices")
    if cached:
        return cached

    # (source id, display name, fetcher, eligible for this request)
    providers = [
        ("coinmarketcap", "CoinMarketCap", self._fetch_coinmarketcap_prices, True),
        ("coingecko", "CoinGecko", self._fetch_coingecko_prices, True),
        ("binance", "Binance", self._fetch_binance_prices, bool(symbols)),
    ]

    errors = []
    for source_id, label, fetch, eligible in providers:
        if not eligible or source not in ("auto", source_id):
            continue
        try:
            await self.rate_limiter.wait_if_needed(source_id)
            rows = await fetch(symbols, limit)
            if not rows:
                # An empty answer is not useful and must not be cached as a
                # success; let the next provider have a go.
                raise ValueError("no matching data returned")
        except Exception as e:
            errors.append(f"{label}: {e}")
            logger.warning(f"❌ {label} failed: {e}")
            continue

        result = {
            "success": True,
            "source": source_id,
            "data": rows,
            "timestamp": datetime.utcnow().isoformat()
        }
        self._set_cache(cache_key, result, "market_prices")
        logger.info(f"✅ Market prices from {label}: {len(rows)} items")
        return result

    if not errors:
        errors.append(f"No provider available for source={source!r}")
    return {
        "success": False,
        "error": "All market data sources failed",
        "errors": errors,
        "timestamp": datetime.utcnow().isoformat()
    }

async def _fetch_coinmarketcap_prices(
    self,
    symbols: Optional[List[str]],
    limit: int
) -> List[Dict[str, Any]]:
    """Return normalised price rows from CoinMarketCap (raises on HTTP errors)."""
    headers = {"X-CMC_PRO_API_KEY": self.config.COINMARKETCAP_API_KEY}
    params = {"convert": "USD"}
    if symbols:
        # "limit" is only sent to the listings endpoint; the documented
        # parameters of quotes/latest do not include it.
        params["symbol"] = ",".join(symbols)
        endpoint = "/cryptocurrency/quotes/latest"
    else:
        params["limit"] = limit
        endpoint = "/cryptocurrency/listings/latest"

    async with httpx.AsyncClient(timeout=self.timeout) as client:
        response = await client.get(
            f"{self.config.COINMARKETCAP_BASE_URL}{endpoint}",
            headers=headers,
            params=params
        )
        response.raise_for_status()
        data = response.json()

    payload = data.get("data") or []
    # listings/latest yields a list; quotes/latest yields a mapping.
    items = payload if isinstance(payload, list) else payload.values()
    rows = []
    for coin in items:
        quote = coin.get("quote", {}).get("USD", {})
        rows.append({
            "symbol": coin["symbol"],
            "name": coin["name"],
            "price": quote.get("price", 0),
            "change_24h": quote.get("percent_change_24h", 0),
            "volume_24h": quote.get("volume_24h", 0),
            "market_cap": quote.get("market_cap", 0),
            "rank": coin.get("cmc_rank", 0)
        })
    return rows

async def _fetch_coingecko_prices(
    self,
    symbols: Optional[List[str]],
    limit: int
) -> List[Dict[str, Any]]:
    """Return normalised price rows from CoinGecko (raises on HTTP errors)."""
    if symbols:
        # NOTE(review): /simple/price is queried with lower-cased symbols as
        # ids; presumably that only matches coins whose id equals the symbol.
        params = {
            "ids": ",".join(s.lower() for s in symbols),
            "vs_currencies": "usd",
            "include_24hr_change": "true"
        }
        endpoint = "/simple/price"
    else:
        params = {"vs_currency": "usd", "per_page": limit, "page": 1}
        endpoint = "/coins/markets"

    async with httpx.AsyncClient(timeout=self.timeout) as client:
        response = await client.get(
            f"{self.config.COINGECKO_BASE_URL}{endpoint}",
            params=params
        )
        response.raise_for_status()
        data = response.json()

    if isinstance(data, list):
        return [
            {
                "symbol": coin.get("symbol", "").upper(),
                "name": coin.get("name", ""),
                "price": coin.get("current_price", 0),
                "change_24h": coin.get("price_change_percentage_24h", 0),
                "volume_24h": coin.get("total_volume", 0),
                "market_cap": coin.get("market_cap", 0),
                "rank": coin.get("market_cap_rank", 0)
            }
            for coin in data
        ]
    return [
        {
            "symbol": coin_id.upper(),
            "price": info.get("usd", 0),
            "change_24h": info.get("usd_24h_change", 0)
        }
        for coin_id, info in data.items()
    ]

async def _fetch_binance_prices(
    self,
    symbols: Optional[List[str]],
    limit: int
) -> List[Dict[str, Any]]:
    """Return normalised rows for the requested symbols' USDT pairs on Binance."""
    async with httpx.AsyncClient(timeout=self.timeout) as client:
        response = await client.get(f"{self.config.BINANCE_BASE_URL}/ticker/24hr")
        response.raise_for_status()
        tickers = response.json()

    # Ticker symbols are upper-case, so compare against upper-cased requests.
    wanted = {s.upper() for s in symbols or []}
    rows = []
    for ticker in tickers:
        pair = ticker["symbol"]
        if not pair.endswith("USDT"):
            continue
        base = pair[:-4]
        if wanted and base not in wanted:
            continue
        last_price = float(ticker["lastPrice"])
        rows.append({
            "symbol": base,
            "price": last_price,
            "change_24h": float(ticker["priceChangePercent"]),
            # Base-asset volume converted to quote currency at the last price.
            "volume_24h": float(ticker["volume"]) * last_price,
            "high_24h": float(ticker["highPrice"]),
            "low_24h": float(ticker["lowPrice"])
        })
    return rows[:limit]
# =========================================================================
# 2. Historical OHLCV Data - داده‌های تاریخی
# =========================================================================
async def get_ohlcv_data(
    self,
    symbol: str,
    interval: str = "1h",
    limit: int = 100,
    source: str = "auto"
) -> Dict[str, Any]:
    """Fetch OHLCV (candlestick) data for ``symbol`` quoted in USDT.

    Binance is tried first, then the HuggingFace Space as a fallback.

    Args:
        symbol: Base asset ticker, e.g. ``"BTC"`` (case-insensitive).
        interval: Candle interval passed through to the provider.
        limit: Maximum number of candles.
        source: ``"auto"``, ``"binance"`` or ``"huggingface"``.

    Returns:
        On success ``{"success": True, "source", "symbol", "interval",
        "data", "timestamp"}``; otherwise a failure dict carrying ``errors``.
    """
    # Include the source so a forced provider is never served from another
    # provider's cached answer.
    cache_key = self._get_cache_key(
        "ohlcv",
        {"symbol": symbol, "interval": interval, "limit": limit, "source": source}
    )
    cached = self._get_cached(cache_key, "ohlcv")
    if cached:
        return cached

    # Binance rejects lower-case pairs, so normalise once up front.
    pair = f"{symbol.upper()}USDT"
    errors = []

    # Binance first: it returns raw klines that map directly to OHLCV.
    if source in ("auto", "binance"):
        try:
            await self.rate_limiter.wait_if_needed("binance")
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(
                    f"{self.config.BINANCE_BASE_URL}/klines",
                    params={"symbol": pair, "interval": interval, "limit": limit}
                )
                response.raise_for_status()
                klines = response.json()

            # Kline layout used here: [open time, open, high, low, close, volume, ...]
            ohlcv_data = [
                {
                    "timestamp": int(kline[0]),
                    "open": float(kline[1]),
                    "high": float(kline[2]),
                    "low": float(kline[3]),
                    "close": float(kline[4]),
                    "volume": float(kline[5])
                }
                for kline in klines
            ]
            result = {
                "success": True,
                "source": "binance",
                "symbol": symbol,
                "interval": interval,
                "data": ohlcv_data,
                "timestamp": datetime.utcnow().isoformat()
            }
            self._set_cache(cache_key, result, "ohlcv")
            logger.info(f"✅ OHLCV from Binance: {len(ohlcv_data)} candles")
            return result
        except Exception as e:
            errors.append(f"Binance: {e}")
            logger.warning(f"❌ Binance OHLCV failed: {e}")

    # HuggingFace Space fallback.
    if source in ("auto", "huggingface"):
        try:
            token = (
                self.config.HF_API_TOKEN
                or os.getenv("HF_TOKEN")
                or os.getenv("HUGGINGFACE_TOKEN")
            )
            headers = {"Authorization": f"Bearer {token}"} if token else {}
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(
                    f"{self.config.HF_SPACE_BASE_URL}/api/market/history",
                    headers=headers,
                    params={"symbol": pair, "timeframe": interval, "limit": limit}
                )
                response.raise_for_status()
                data = response.json()

            result = {
                "success": True,
                "source": "huggingface",
                "symbol": symbol,
                "interval": interval,
                "data": data.get("data", []),
                "timestamp": datetime.utcnow().isoformat()
            }
            self._set_cache(cache_key, result, "ohlcv")
            logger.info("✅ OHLCV from HuggingFace")
            return result
        except Exception as e:
            errors.append(f"HuggingFace: {e}")
            logger.warning(f"❌ HuggingFace OHLCV failed: {e}")

    return {
        "success": False,
        "error": "Failed to fetch OHLCV data",
        "errors": errors,
        "timestamp": datetime.utcnow().isoformat()
    }
# =========================================================================
# 3. Sentiment Data - داده‌های احساسات
# =========================================================================
async def get_fear_greed_index(self) -> Dict[str, Any]:
    """Fetch the Fear & Greed index history from Alternative.me.

    Requests the last 30 entries; the first entry of the returned list is
    exposed separately as ``"current"``.

    Returns:
        ``{"success": True, "source", "data", "current", "timestamp"}`` on
        success, otherwise ``{"success": False, "error", "timestamp"}``.
    """
    key = self._get_cache_key("fear_greed", {})
    hit = self._get_cached(key, "fear_greed")
    if hit:
        return hit

    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            query = {"limit": 30, "format": "json"}
            response = await client.get(
                f"{self.config.ALTERNATIVE_ME_BASE_URL}/fng/",
                params=query
            )
            response.raise_for_status()
            payload = response.json()

            history = payload.get("data", [])
            if payload.get("data"):
                latest = payload.get("data", [{}])[0]
            else:
                latest = {}

            result = {
                "success": True,
                "source": "alternative.me",
                "data": history,
                "current": latest,
                "timestamp": datetime.utcnow().isoformat()
            }
            self._set_cache(key, result, "fear_greed")
            logger.info("✅ Fear & Greed Index fetched")
            return result
    except Exception as exc:
        logger.error(f"❌ Fear & Greed Index failed: {exc}")
        return {
            "success": False,
            "error": str(exc),
            "timestamp": datetime.utcnow().isoformat()
        }
async def analyze_sentiment(
    self,
    text: str,
    source: str = "huggingface"
) -> Dict[str, Any]:
    """Run sentiment analysis on ``text`` through the HuggingFace Space.

    Args:
        text: Text to analyse.
        source: Accepted for interface compatibility; the visible code
            always uses the HuggingFace Space and does not read this value.

    Returns:
        ``{"success": True, "source", "data", "timestamp"}`` on success,
        otherwise ``{"success": False, "error", "timestamp"}``.
    """
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            # First non-empty token wins: config, then the two env fallbacks.
            token = (
                self.config.HF_API_TOKEN
                or os.getenv("HF_TOKEN")
                or os.getenv("HUGGINGFACE_TOKEN")
            )
            auth_headers = {"Authorization": f"Bearer {token}"} if token else {}
            endpoint = f"{self.config.HF_SPACE_BASE_URL}/api/sentiment/analyze"

            response = await client.post(
                endpoint,
                headers=auth_headers,
                json={"text": text}
            )
            response.raise_for_status()
            payload = response.json()

            logger.info("✅ Sentiment analysis completed")
            return {
                "success": True,
                "source": "huggingface",
                "data": payload.get("data", {}),
                "timestamp": datetime.utcnow().isoformat()
            }
    except Exception as exc:
        logger.error(f"❌ Sentiment analysis failed: {exc}")
        return {
            "success": False,
            "error": str(exc),
            "timestamp": datetime.utcnow().isoformat()
        }
# =========================================================================
# 4. News Data - داده‌های اخبار
# =========================================================================
async def get_crypto_news(
    self,
    query: str = "cryptocurrency",
    limit: int = 20,
    source: str = "auto"
) -> Dict[str, Any]:
    """Collect cryptocurrency news from NewsAPI and Reddit.

    Both providers are queried (subject to ``source``) and their items are
    concatenated, NewsAPI first. Only NewsAPI uses ``query``; the Reddit part
    always reads the hot posts of r/CryptoCurrency.

    Args:
        query: Search term sent to NewsAPI.
        limit: Page size requested from each provider and cap on the
            number of returned articles.
        source: ``"auto"``, ``"newsapi"`` or ``"reddit"``.

    Returns:
        ``{"success": True, "articles", "total", "sources", "timestamp"}``
        where ``sources`` lists the providers that actually contributed, or
        ``{"success": False, "error", "errors", "timestamp"}``.
    """
    # Source is part of the key so "reddit"-only and "auto" do not collide.
    cache_key = self._get_cache_key(
        "news", {"query": query, "limit": limit, "source": source}
    )
    cached = self._get_cached(cache_key, "news")
    if cached:
        return cached

    errors = []
    articles = []
    contributing_sources = []

    # NewsAPI
    if source in ("auto", "newsapi"):
        try:
            await self.rate_limiter.wait_if_needed("newsapi")
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(
                    f"{self.config.NEWSAPI_BASE_URL}/everything",
                    params={
                        "q": query,
                        "apiKey": self.config.NEWSAPI_API_KEY,
                        "language": "en",
                        "sortBy": "publishedAt",
                        "pageSize": limit
                    }
                )
                response.raise_for_status()
                data = response.json()

            news_items = [
                {
                    "title": article["title"],
                    "description": article.get("description"),
                    "url": article["url"],
                    "source": article["source"]["name"],
                    "published_at": article["publishedAt"],
                    "image_url": article.get("urlToImage")
                }
                for article in data.get("articles", [])
            ]
            articles.extend(news_items)
            if news_items:
                contributing_sources.append("newsapi")
            logger.info(f"✅ NewsAPI: {len(news_items)} articles")
        except Exception as e:
            errors.append(f"NewsAPI: {e}")
            logger.warning(f"❌ NewsAPI failed: {e}")

    # Reddit
    if source in ("auto", "reddit"):
        try:
            async with httpx.AsyncClient(timeout=self.timeout) as client:
                response = await client.get(
                    f"{self.config.REDDIT_BASE_URL}/CryptoCurrency/hot.json",
                    params={"limit": limit},
                    headers={"User-Agent": "CryptoDataHub/1.0"}
                )
                response.raise_for_status()
                data = response.json()

            reddit_items = []
            for post in data["data"]["children"]:
                post_data = post["data"]
                reddit_items.append({
                    "title": post_data["title"],
                    "description": post_data.get("selftext", "")[:200],
                    "url": f"https://reddit.com{post_data['permalink']}",
                    "source": "Reddit",
                    # created_utc is a UTC epoch; convert as UTC so it is
                    # comparable with the other UTC timestamps emitted here.
                    "published_at": datetime.utcfromtimestamp(post_data["created_utc"]).isoformat(),
                    "score": post_data["score"],
                    "comments": post_data["num_comments"]
                })
            articles.extend(reddit_items)
            if reddit_items:
                contributing_sources.append("reddit")
            logger.info(f"✅ Reddit: {len(reddit_items)} posts")
        except Exception as e:
            errors.append(f"Reddit: {e}")
            logger.warning(f"❌ Reddit failed: {e}")

    if articles:
        result = {
            "success": True,
            "articles": articles[:limit],
            "total": len(articles),
            "sources": contributing_sources,
            "timestamp": datetime.utcnow().isoformat()
        }
        self._set_cache(cache_key, result, "news")
        return result

    return {
        "success": False,
        "error": "Failed to fetch news",
        "errors": errors,
        "timestamp": datetime.utcnow().isoformat()
    }
# =========================================================================
# 5. Trending Data - داده‌های ترندینگ
# =========================================================================
async def get_trending_coins(self, source: str = "coingecko") -> Dict[str, Any]:
    """Fetch the currently trending coins from CoinGecko.

    Args:
        source: Accepted for interface compatibility; the visible code
            always queries CoinGecko and does not read this value.

    Returns:
        ``{"success": True, "source", "trending", "timestamp"}`` on success,
        otherwise ``{"success": False, "error", "timestamp"}``.
    """
    key = self._get_cache_key("trending", {})
    hit = self._get_cached(key, "trending")
    if hit:
        return hit

    try:
        await self.rate_limiter.wait_if_needed("coingecko")
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            url = f"{self.config.COINGECKO_BASE_URL}/search/trending"
            response = await client.get(url)
            response.raise_for_status()
            payload = response.json()

            # Each entry wraps the coin's fields in an "item" object.
            entries = (coin.get("item", {}) for coin in payload.get("coins", []))
            trending = [
                {
                    "id": entry.get("id"),
                    "symbol": entry.get("symbol"),
                    "name": entry.get("name"),
                    "rank": entry.get("market_cap_rank"),
                    "price_btc": entry.get("price_btc"),
                    "score": entry.get("score", 0)
                }
                for entry in entries
            ]

            result = {
                "success": True,
                "source": "coingecko",
                "trending": trending,
                "timestamp": datetime.utcnow().isoformat()
            }
            self._set_cache(key, result, "trending")
            logger.info(f"✅ Trending coins: {len(trending)} items")
            return result
    except Exception as exc:
        logger.error(f"❌ Trending coins failed: {exc}")
        return {
            "success": False,
            "error": str(exc),
            "timestamp": datetime.utcnow().isoformat()
        }
# =========================================================================
# 6. Blockchain Data - داده‌های بلاکچین
# =========================================================================
async def get_blockchain_data(
    self,
    chain: str,
    data_type: str = "transactions",
    address: Optional[str] = None,
    limit: int = 20
) -> Dict[str, Any]:
    """Fetch on-chain data from the explorer API of the given chain.

    Args:
        chain: ``"ethereum"``, ``"bsc"`` or ``"tron"`` (case-insensitive).
        data_type: ``"transactions"``, ``"balance"`` or (Ethereum only)
            ``"gas"``.
        address: Account address; required for ``balance`` and for
            ``transactions`` on Ethereum/BSC.
        limit: Maximum number of transactions requested.

    Returns:
        ``{"success": True, "source", "chain", "type", "data", "timestamp"}``
        on success, otherwise ``{"success": False, "error", "timestamp"}``.
        Only successful results are cached.
    """
    # "limit" changes the payload, so it has to be part of the cache key.
    cache_key = self._get_cache_key("blockchain", {
        "chain": chain,
        "type": data_type,
        "address": address,
        "limit": limit
    })
    cached = self._get_cached(cache_key, "blockchain")
    if cached:
        return cached

    try:
        network = chain.lower()
        if network in ("ethereum", "bsc"):
            result = await self._fetch_evm_explorer_data(network, data_type, address, limit)
        elif network == "tron":
            result = await self._fetch_tron_data(data_type, address, limit)
        else:
            return {
                "success": False,
                "error": f"Unsupported chain: {chain}",
                "timestamp": datetime.utcnow().isoformat()
            }
    except Exception as e:
        logger.error(f"❌ Blockchain data failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "timestamp": datetime.utcnow().isoformat()
        }

    if result.get("success"):
        self._set_cache(cache_key, result, "blockchain")
    return result

async def _fetch_evm_explorer_data(
    self,
    network: str,
    data_type: str,
    address: Optional[str],
    limit: int
) -> Dict[str, Any]:
    """Query Etherscan or BscScan (same request format) and wrap the result."""
    if network == "ethereum":
        source_id, label = "etherscan", "Ethereum"
        base_url = self.config.ETHERSCAN_BASE_URL
        api_key = self.config.ETHERSCAN_API_KEY
    else:
        source_id, label = "bscscan", "BSC"
        base_url = self.config.BSCSCAN_BASE_URL
        api_key = self.config.BSCSCAN_API_KEY

    if data_type == "gas" and network == "ethereum":
        query = {"module": "gastracker", "action": "gasoracle"}
    elif data_type == "balance" and address:
        query = {"module": "account", "action": "balance", "address": address}
    elif data_type == "transactions" and address:
        query = {
            "module": "account",
            "action": "txlist",
            "address": address,
            "startblock": 0,
            "endblock": 99999999,
            "page": 1,
            "offset": limit,
            "sort": "desc"
        }
    else:
        # Without module/action the explorer only answers with an error
        # payload, so fail fast instead of sending the request.
        return {
            "success": False,
            "error": (
                f"Unsupported request for {network}: data_type={data_type!r}"
                + ("" if address else " (address is required for balance/transactions)")
            ),
            "timestamp": datetime.utcnow().isoformat()
        }

    await self.rate_limiter.wait_if_needed(source_id)
    async with httpx.AsyncClient(timeout=self.timeout) as client:
        response = await client.get(base_url, params={"apikey": api_key, **query})
        response.raise_for_status()
        data = response.json()

    payload = data.get("result", {})
    # These explorers report failures inside the JSON body with status "0".
    # A list result with status "0" is kept (e.g. an empty transaction
    # list); any other shape is treated as an error message.
    if data.get("status") == "0" and not isinstance(payload, list):
        message = payload or data.get("message") or "request rejected"
        logger.warning(f"❌ {label} {data_type} request rejected: {message}")
        return {
            "success": False,
            "error": f"{source_id}: {message}",
            "timestamp": datetime.utcnow().isoformat()
        }

    logger.info(f"✅ {label} {data_type} data fetched")
    return {
        "success": True,
        "source": source_id,
        "chain": network,
        "type": data_type,
        "data": payload,
        "timestamp": datetime.utcnow().isoformat()
    }

async def _fetch_tron_data(
    self,
    data_type: str,
    address: Optional[str],
    limit: int
) -> Dict[str, Any]:
    """Query TronScan for an account or for recent transactions."""
    if data_type == "balance" and address:
        endpoint = f"/account/{address}"
        params = {}
    else:
        # Every other combination falls back to the transaction listing;
        # an address filter is only applied for data_type "transactions".
        endpoint = "/transaction"
        params = {"sort": "-timestamp", "limit": limit}
        if data_type == "transactions" and address:
            params["address"] = address

    await self.rate_limiter.wait_if_needed("tronscan")
    async with httpx.AsyncClient(timeout=self.timeout) as client:
        response = await client.get(
            f"{self.config.TRONSCAN_BASE_URL}{endpoint}",
            headers={"TRON-PRO-API-KEY": self.config.TRONSCAN_API_KEY},
            params=params
        )
        response.raise_for_status()
        data = response.json()

    logger.info(f"✅ Tron {data_type} data fetched")
    return {
        "success": True,
        "source": "tronscan",
        "chain": "tron",
        "type": data_type,
        # Use the "data" field when present, else the whole document.
        "data": data.get("data", data),
        "timestamp": datetime.utcnow().isoformat()
    }
# =========================================================================
# 7. Whale Activity - فعالیت نهنگ‌ها
# =========================================================================
async def get_whale_activity(
    self,
    chain: str = "all",
    min_value_usd: float = 1000000,
    limit: int = 50
) -> Dict[str, Any]:
    """Fetch large ("whale") transactions from the HuggingFace Space.

    Args:
        chain: Chain to filter on, or ``"all"`` for no chain filter.
        min_value_usd: Minimum transaction value in USD.
        limit: Maximum number of transactions.

    Returns:
        ``{"success": True, "source", "data", "timestamp"}`` on success,
        otherwise ``{"success": False, "error", "timestamp"}``.
    """
    try:
        token = (
            self.config.HF_API_TOKEN
            or os.getenv("HF_TOKEN")
            or os.getenv("HUGGINGFACE_TOKEN")
        )
        headers = {"Authorization": f"Bearer {token}"} if token else {}

        params = {"limit": limit, "min_amount_usd": min_value_usd}
        if chain != "all":
            # Only send the filter when one is requested: httpx serialises a
            # None value as an empty "chain=" parameter instead of omitting it.
            params["chain"] = chain

        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                f"{self.config.HF_SPACE_BASE_URL}/api/crypto/whales/transactions",
                headers=headers,
                params=params
            )
            response.raise_for_status()
            data = response.json()

        logger.info("✅ Whale activity fetched")
        return {
            "success": True,
            "source": "huggingface",
            "data": data,
            "timestamp": datetime.utcnow().isoformat()
        }
    except Exception as e:
        logger.error(f"❌ Whale activity failed: {e}")
        # No explorer-based fallback is implemented; report the failure.
        return {
            "success": False,
            "error": str(e),
            "timestamp": datetime.utcnow().isoformat()
        }
# =========================================================================
# 8. Social Media Data - داده‌های شبکه‌های اجتماعی
# =========================================================================
async def get_social_media_data(
    self,
    platform: str = "reddit",
    query: str = "cryptocurrency",
    limit: int = 20
) -> Dict[str, Any]:
    """Collect hot posts from several crypto subreddits.

    Args:
        platform: Only ``"reddit"`` is supported.
        query: Used in the cache key only; the visible code does not send
            it to Reddit.
        limit: Maximum number of posts returned (split across subreddits).

    Returns:
        ``{"success": True, "platform", "posts", "total", "timestamp"}``
        sorted by score, or ``{"success": False, "error", ...}`` when the
        platform is unsupported or every subreddit request failed.
    """
    # "limit" changes the payload, so it has to be part of the cache key.
    cache_key = self._get_cache_key("social_media", {
        "platform": platform,
        "query": query,
        "limit": limit
    })
    cached = self._get_cached(cache_key, "social_media")
    if cached:
        return cached

    if platform != "reddit":
        return {
            "success": False,
            "error": f"Unsupported platform: {platform}",
            "timestamp": datetime.utcnow().isoformat()
        }

    subreddits = ["CryptoCurrency", "Bitcoin", "ethereum", "defi"]
    # Ceiling division, never below 1: plain floor division asks for 0 posts
    # per subreddit whenever limit < len(subreddits).
    per_subreddit = max(1, -(-limit // len(subreddits)))

    try:
        all_posts = []
        errors = []
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            for subreddit in subreddits:
                try:
                    response = await client.get(
                        f"{self.config.REDDIT_BASE_URL}/{subreddit}/hot.json",
                        params={"limit": per_subreddit},
                        headers={"User-Agent": "CryptoDataHub/1.0"}
                    )
                    response.raise_for_status()
                    data = response.json()
                    for post in data["data"]["children"]:
                        post_data = post["data"]
                        all_posts.append({
                            "id": post_data["id"],
                            "title": post_data["title"],
                            "text": post_data.get("selftext", "")[:500],
                            "url": f"https://reddit.com{post_data['permalink']}",
                            "subreddit": subreddit,
                            "score": post_data["score"],
                            "comments": post_data["num_comments"],
                            # created_utc is a UTC epoch; convert as UTC.
                            "created_at": datetime.utcfromtimestamp(post_data["created_utc"]).isoformat(),
                            "author": post_data.get("author", "deleted")
                        })
                except Exception as e:
                    # One failing subreddit must not discard the others.
                    errors.append(f"r/{subreddit}: {e}")
                    logger.warning(f"Failed to fetch from r/{subreddit}: {e}")

        if not all_posts and errors:
            # Every request failed: report it instead of caching an empty
            # "successful" answer.
            return {
                "success": False,
                "error": "Failed to fetch Reddit data",
                "errors": errors,
                "timestamp": datetime.utcnow().isoformat()
            }

        # Highest-scoring posts first.
        all_posts.sort(key=lambda x: x["score"], reverse=True)
        result = {
            "success": True,
            "platform": "reddit",
            "posts": all_posts[:limit],
            "total": len(all_posts),
            "timestamp": datetime.utcnow().isoformat()
        }
        self._set_cache(cache_key, result, "social_media")
        logger.info(f"✅ Reddit data: {len(all_posts)} posts")
        return result
    except Exception as e:
        logger.error(f"❌ Reddit data failed: {e}")
        return {
            "success": False,
            "error": str(e),
            "timestamp": datetime.utcnow().isoformat()
        }
# =========================================================================
# 9. AI Model Predictions - پیش‌بینی‌های مدل‌های AI
# =========================================================================
async def get_ai_prediction(
    self,
    symbol: str,
    model_type: str = "price",
    timeframe: str = "24h"
) -> Dict[str, Any]:
    """Request a model prediction for ``symbol`` from the HuggingFace Space.

    The current price is looked up through ``get_market_prices`` and sent
    along as context; it is 0 when that lookup does not succeed.

    Args:
        symbol: Asset ticker to predict.
        model_type: Prediction kind forwarded as ``"type"``.
        timeframe: Prediction horizon forwarded unchanged.

    Returns:
        ``{"success": True, "source", "symbol", "prediction", "timestamp"}``
        on success, otherwise ``{"success": False, "error", "timestamp"}``.
    """
    try:
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            token = (
                self.config.HF_API_TOKEN
                or os.getenv("HF_TOKEN")
                or os.getenv("HUGGINGFACE_TOKEN")
            )
            auth_headers = {"Authorization": f"Bearer {token}"} if token else {}

            # Recent price gives the model a reference point.
            quote = await self.get_market_prices(symbols=[symbol], limit=1)
            if quote.get("success") and quote.get("data"):
                current_price = quote["data"][0].get("price", 0)
            else:
                current_price = 0

            request_body = {
                "symbol": symbol,
                "type": model_type,
                "timeframe": timeframe,
                "current_price": current_price
            }
            response = await client.post(
                f"{self.config.HF_SPACE_BASE_URL}/api/models/predict",
                headers=auth_headers,
                json=request_body
            )
            response.raise_for_status()
            prediction = response.json()

            logger.info(f"✅ AI prediction for {symbol}")
            return {
                "success": True,
                "source": "huggingface",
                "symbol": symbol,
                "prediction": prediction,
                "timestamp": datetime.utcnow().isoformat()
            }
    except Exception as exc:
        logger.error(f"❌ AI prediction failed: {exc}")
        # No local fallback analysis is implemented; report the failure.
        return {
            "success": False,
            "error": str(exc),
            "timestamp": datetime.utcnow().isoformat()
        }
# =========================================================================
# 10. System Health - سلامت سیستم
# =========================================================================
async def check_all_sources_health(self) -> Dict[str, Any]:
    """Probe the key-authenticated providers and report a status per source.

    CoinMarketCap, NewsAPI, Etherscan and the HuggingFace Space are probed
    concurrently with a 5 second timeout each: HTTP 200 maps to
    ``"operational"``, any other status to ``"degraded"`` and a failed
    request to ``"down"``. CoinGecko, Binance, Alternative.me and Reddit are
    not probed and are always reported as ``"operational"``.

    Returns:
        ``{"success": True, "status", "operational_count", "total_sources",
        "timestamp"}``.
    """
    token = (
        self.config.HF_API_TOKEN
        or os.getenv("HF_TOKEN")
        or os.getenv("HUGGINGFACE_TOKEN")
    )
    hf_headers = {"Authorization": f"Bearer {token}"} if token else {}

    # source name -> (url, keyword arguments for client.get)
    probes = {
        "coinmarketcap": (
            f"{self.config.COINMARKETCAP_BASE_URL}/key/info",
            {"headers": {"X-CMC_PRO_API_KEY": self.config.COINMARKETCAP_API_KEY}},
        ),
        "newsapi": (
            f"{self.config.NEWSAPI_BASE_URL}/top-headlines",
            {"params": {"apiKey": self.config.NEWSAPI_API_KEY, "pageSize": 1, "q": "test"}},
        ),
        "etherscan": (
            self.config.ETHERSCAN_BASE_URL,
            {"params": {
                "module": "stats",
                "action": "ethsupply",
                "apikey": self.config.ETHERSCAN_API_KEY
            }},
        ),
        "huggingface": (
            f"{self.config.HF_SPACE_BASE_URL}/api/health",
            {"headers": hf_headers},
        ),
    }

    async def probe(name: str, url: str, request_kwargs: Dict[str, Any]) -> str:
        """Return the status string for a single source."""
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(url, **request_kwargs)
            return "operational" if response.status_code == 200 else "degraded"
        except Exception as e:
            # Catch Exception rather than everything, so task cancellation
            # and KeyboardInterrupt still propagate.
            logger.warning(f"Health probe failed for {name}: {e}")
            return "down"

    statuses = await asyncio.gather(
        *(probe(name, url, kwargs) for name, (url, kwargs) in probes.items())
    )
    health_status = dict(zip(probes, statuses))

    # Key-less public APIs are assumed healthy; they are not actually probed.
    health_status["coingecko"] = "operational"
    health_status["binance"] = "operational"
    health_status["alternative_me"] = "operational"
    health_status["reddit"] = "operational"

    return {
        "success": True,
        "status": health_status,
        "operational_count": sum(1 for v in health_status.values() if v == "operational"),
        "total_sources": len(health_status),
        "timestamp": datetime.utcnow().isoformat()
    }
# Module-wide Data Hub, created lazily on first request.
_data_hub_instance = None


def get_data_hub() -> DataHubComplete:
    """Return the shared ``DataHubComplete``, creating it on first use."""
    global _data_hub_instance
    hub = _data_hub_instance
    if hub is None:
        hub = DataHubComplete()
        _data_hub_instance = hub
    return hub