Datasourceforcryptocurrency / backend /services /market_data_aggregator.py
Really-amin's picture
Upload 577 files
b190b45 verified
#!/usr/bin/env python3
"""
Market Data Aggregator - Uses ALL Free Resources
Maximizes usage of all available free market data APIs with intelligent fallback
"""
import httpx
import logging
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime
from fastapi import HTTPException
logger = logging.getLogger(__name__)
class MarketDataAggregator:
"""
Aggregates market data from ALL free sources:
- CoinGecko (primary)
- CoinPaprika
- CoinCap
- Binance Public
- CoinLore
- Messari
- DefiLlama
- DIA Data
- CoinStats
- FreeCryptoAPI
"""
def __init__(self):
self.timeout = 10.0
self.providers = {
"coingecko": {
"base_url": "https://api.coingecko.com/api/v3",
"priority": 1,
"free": True
},
"coinpaprika": {
"base_url": "https://api.coinpaprika.com/v1",
"priority": 2,
"free": True
},
"coincap": {
"base_url": "https://api.coincap.io/v2",
"priority": 3,
"free": True
},
"binance": {
"base_url": "https://api.binance.com/api/v3",
"priority": 4,
"free": True
},
"coinlore": {
"base_url": "https://api.coinlore.net/api",
"priority": 5,
"free": True
},
"messari": {
"base_url": "https://data.messari.io/api/v1",
"priority": 6,
"free": True
},
"defillama": {
"base_url": "https://coins.llama.fi",
"priority": 7,
"free": True
},
"diadata": {
"base_url": "https://api.diadata.org/v1",
"priority": 8,
"free": True
},
"coinstats": {
"base_url": "https://api.coinstats.app/public/v1",
"priority": 9,
"free": True
}
}
# Symbol mappings for different providers
self.symbol_to_coingecko_id = {
"BTC": "bitcoin", "ETH": "ethereum", "BNB": "binancecoin",
"XRP": "ripple", "ADA": "cardano", "DOGE": "dogecoin",
"SOL": "solana", "TRX": "tron", "DOT": "polkadot",
"MATIC": "matic-network", "LTC": "litecoin", "SHIB": "shiba-inu",
"AVAX": "avalanche-2", "UNI": "uniswap", "LINK": "chainlink",
"ATOM": "cosmos", "XLM": "stellar", "ETC": "ethereum-classic",
"XMR": "monero", "BCH": "bitcoin-cash", "NEAR": "near",
"APT": "aptos", "ARB": "arbitrum", "OP": "optimism"
}
async def get_price(self, symbol: str) -> Dict[str, Any]:
"""
Get price using ALL available free providers with fallback
"""
symbol = symbol.upper().replace("USDT", "").replace("USD", "")
# Try all providers in priority order
providers_to_try = sorted(
self.providers.items(),
key=lambda x: x[1]["priority"]
)
for provider_name, provider_info in providers_to_try:
try:
if provider_name == "coingecko":
price_data = await self._get_price_coingecko(symbol)
elif provider_name == "coinpaprika":
price_data = await self._get_price_coinpaprika(symbol)
elif provider_name == "coincap":
price_data = await self._get_price_coincap(symbol)
elif provider_name == "binance":
price_data = await self._get_price_binance(symbol)
elif provider_name == "coinlore":
price_data = await self._get_price_coinlore(symbol)
elif provider_name == "messari":
price_data = await self._get_price_messari(symbol)
elif provider_name == "coinstats":
price_data = await self._get_price_coinstats(symbol)
else:
continue
if price_data and price_data.get("price", 0) > 0:
logger.info(f"✅ {provider_name.upper()}: Successfully fetched price for {symbol}")
return price_data
except Exception as e:
logger.warning(f"⚠️ {provider_name.upper()} failed for {symbol}: {e}")
continue
raise HTTPException(
status_code=503,
detail=f"All market data providers failed for {symbol}"
)
async def get_multiple_prices(self, symbols: List[str], limit: int = 100) -> List[Dict[str, Any]]:
"""
Get prices for multiple symbols using batch APIs where possible
"""
# Try CoinGecko batch first
try:
return await self._get_batch_coingecko(symbols or None, limit)
except Exception as e:
logger.warning(f"⚠️ CoinGecko batch failed: {e}")
# Try CoinCap batch
try:
return await self._get_batch_coincap(symbols, limit)
except Exception as e:
logger.warning(f"⚠️ CoinCap batch failed: {e}")
# Try CoinPaprika batch
try:
return await self._get_batch_coinpaprika(limit)
except Exception as e:
logger.warning(f"⚠️ CoinPaprika batch failed: {e}")
# Fallback: Get individual prices
if symbols:
results = []
for symbol in symbols[:limit]:
try:
price_data = await self.get_price(symbol)
results.append(price_data)
except:
continue
if results:
return results
raise HTTPException(
status_code=503,
detail="All market data providers failed"
)
# CoinGecko implementation
async def _get_price_coingecko(self, symbol: str) -> Dict[str, Any]:
"""Get price from CoinGecko"""
coin_id = self.symbol_to_coingecko_id.get(symbol, symbol.lower())
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.providers['coingecko']['base_url']}/simple/price",
params={
"ids": coin_id,
"vs_currencies": "usd",
"include_24hr_change": "true",
"include_24hr_vol": "true",
"include_market_cap": "true"
}
)
response.raise_for_status()
data = response.json()
if coin_id in data:
coin_data = data[coin_id]
return {
"symbol": symbol,
"price": coin_data.get("usd", 0),
"change24h": coin_data.get("usd_24h_change", 0),
"volume24h": coin_data.get("usd_24h_vol", 0),
"marketCap": coin_data.get("usd_market_cap", 0),
"source": "coingecko",
"timestamp": int(datetime.utcnow().timestamp() * 1000)
}
raise Exception("Coin not found in CoinGecko")
async def _get_batch_coingecko(self, symbols: Optional[List[str]], limit: int) -> List[Dict[str, Any]]:
"""Get batch prices from CoinGecko"""
async with httpx.AsyncClient(timeout=self.timeout) as client:
if symbols:
coin_ids = [self.symbol_to_coingecko_id.get(s.upper(), s.lower()) for s in symbols]
response = await client.get(
f"{self.providers['coingecko']['base_url']}/simple/price",
params={
"ids": ",".join(coin_ids),
"vs_currencies": "usd",
"include_24hr_change": "true",
"include_24hr_vol": "true",
"include_market_cap": "true"
}
)
else:
response = await client.get(
f"{self.providers['coingecko']['base_url']}/coins/markets",
params={
"vs_currency": "usd",
"order": "market_cap_desc",
"per_page": min(limit, 250),
"page": 1,
"sparkline": "false"
}
)
response.raise_for_status()
data = response.json()
results = []
if isinstance(data, list):
for coin in data:
results.append({
"symbol": coin.get("symbol", "").upper(),
"name": coin.get("name", ""),
"price": coin.get("current_price", 0),
"change24h": coin.get("price_change_24h", 0),
"volume24h": coin.get("total_volume", 0),
"marketCap": coin.get("market_cap", 0),
"source": "coingecko",
"timestamp": int(datetime.utcnow().timestamp() * 1000)
})
else:
for coin_id, coin_data in data.items():
symbol = next((k for k, v in self.symbol_to_coingecko_id.items() if v == coin_id), coin_id.upper())
results.append({
"symbol": symbol,
"price": coin_data.get("usd", 0),
"change24h": coin_data.get("usd_24h_change", 0),
"volume24h": coin_data.get("usd_24h_vol", 0),
"marketCap": coin_data.get("usd_market_cap", 0),
"source": "coingecko",
"timestamp": int(datetime.utcnow().timestamp() * 1000)
})
logger.info(f"✅ CoinGecko: Fetched {len(results)} prices")
return results
# CoinPaprika implementation
async def _get_price_coinpaprika(self, symbol: str) -> Dict[str, Any]:
"""Get price from CoinPaprika"""
async with httpx.AsyncClient(timeout=self.timeout) as client:
# Search for coin
search_response = await client.get(
f"{self.providers['coinpaprika']['base_url']}/search",
params={"q": symbol, "c": "currencies", "limit": 1}
)
search_response.raise_for_status()
search_data = search_response.json()
if search_data.get("currencies"):
coin_id = search_data["currencies"][0]["id"]
# Get ticker data
ticker_response = await client.get(
f"{self.providers['coinpaprika']['base_url']}/tickers/{coin_id}"
)
ticker_response.raise_for_status()
ticker_data = ticker_response.json()
quotes = ticker_data.get("quotes", {}).get("USD", {})
return {
"symbol": symbol,
"name": ticker_data.get("name", ""),
"price": quotes.get("price", 0),
"change24h": quotes.get("percent_change_24h", 0),
"volume24h": quotes.get("volume_24h", 0),
"marketCap": quotes.get("market_cap", 0),
"source": "coinpaprika",
"timestamp": int(datetime.utcnow().timestamp() * 1000)
}
raise Exception("Coin not found in CoinPaprika")
async def _get_batch_coinpaprika(self, limit: int) -> List[Dict[str, Any]]:
"""Get batch prices from CoinPaprika"""
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.providers['coinpaprika']['base_url']}/tickers",
params={"limit": limit}
)
response.raise_for_status()
data = response.json()
results = []
for coin in data:
quotes = coin.get("quotes", {}).get("USD", {})
results.append({
"symbol": coin.get("symbol", "").upper(),
"name": coin.get("name", ""),
"price": quotes.get("price", 0),
"change24h": quotes.get("percent_change_24h", 0),
"volume24h": quotes.get("volume_24h", 0),
"marketCap": quotes.get("market_cap", 0),
"source": "coinpaprika",
"timestamp": int(datetime.utcnow().timestamp() * 1000)
})
logger.info(f"✅ CoinPaprika: Fetched {len(results)} prices")
return results
# CoinCap implementation
async def _get_price_coincap(self, symbol: str) -> Dict[str, Any]:
"""Get price from CoinCap"""
async with httpx.AsyncClient(timeout=self.timeout) as client:
# Search for asset
search_response = await client.get(
f"{self.providers['coincap']['base_url']}/assets",
params={"search": symbol, "limit": 1}
)
search_response.raise_for_status()
search_data = search_response.json()
if search_data.get("data"):
asset_id = search_data["data"][0]["id"]
# Get asset details
asset_response = await client.get(
f"{self.providers['coincap']['base_url']}/assets/{asset_id}"
)
asset_response.raise_for_status()
asset_data = asset_response.json()
asset = asset_data.get("data", {})
return {
"symbol": symbol,
"name": asset.get("name", ""),
"price": float(asset.get("priceUsd", 0)),
"change24h": float(asset.get("changePercent24Hr", 0)),
"volume24h": float(asset.get("volumeUsd24Hr", 0)),
"marketCap": float(asset.get("marketCapUsd", 0)),
"source": "coincap",
"timestamp": int(datetime.utcnow().timestamp() * 1000)
}
raise Exception("Asset not found in CoinCap")
async def _get_batch_coincap(self, symbols: Optional[List[str]], limit: int) -> List[Dict[str, Any]]:
"""Get batch prices from CoinCap"""
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.providers['coincap']['base_url']}/assets",
params={"limit": limit}
)
response.raise_for_status()
data = response.json()
results = []
for asset in data.get("data", []):
results.append({
"symbol": asset.get("symbol", "").upper(),
"name": asset.get("name", ""),
"price": float(asset.get("priceUsd", 0)),
"change24h": float(asset.get("changePercent24Hr", 0)),
"volume24h": float(asset.get("volumeUsd24Hr", 0)),
"marketCap": float(asset.get("marketCapUsd", 0)),
"source": "coincap",
"timestamp": int(datetime.utcnow().timestamp() * 1000)
})
logger.info(f"✅ CoinCap: Fetched {len(results)} prices")
return results
# Binance implementation
async def _get_price_binance(self, symbol: str) -> Dict[str, Any]:
"""Get price from Binance"""
binance_symbol = f"{symbol}USDT"
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.providers['binance']['base_url']}/ticker/24hr",
params={"symbol": binance_symbol}
)
response.raise_for_status()
data = response.json()
return {
"symbol": symbol,
"price": float(data.get("lastPrice", 0)),
"change24h": float(data.get("priceChangePercent", 0)),
"volume24h": float(data.get("volume", 0)),
"high24h": float(data.get("highPrice", 0)),
"low24h": float(data.get("lowPrice", 0)),
"source": "binance",
"timestamp": int(datetime.utcnow().timestamp() * 1000)
}
# CoinLore implementation
async def _get_price_coinlore(self, symbol: str) -> Dict[str, Any]:
"""Get price from CoinLore"""
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.providers['coinlore']['base_url']}/tickers/"
)
response.raise_for_status()
data = response.json()
for coin in data.get("data", []):
if coin.get("symbol", "").upper() == symbol:
return {
"symbol": symbol,
"name": coin.get("name", ""),
"price": float(coin.get("price_usd", 0)),
"change24h": float(coin.get("percent_change_24h", 0)),
"marketCap": float(coin.get("market_cap_usd", 0)),
"source": "coinlore",
"timestamp": int(datetime.utcnow().timestamp() * 1000)
}
raise Exception("Coin not found in CoinLore")
# Messari implementation
async def _get_price_messari(self, symbol: str) -> Dict[str, Any]:
"""Get price from Messari"""
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.providers['messari']['base_url']}/assets/{symbol.lower()}/metrics"
)
response.raise_for_status()
data = response.json()
metrics = data.get("data", {}).get("market_data", {})
return {
"symbol": symbol,
"name": data.get("data", {}).get("name", ""),
"price": float(metrics.get("price_usd", 0)),
"change24h": float(metrics.get("percent_change_usd_last_24_hours", 0)),
"volume24h": float(metrics.get("real_volume_last_24_hours", 0)),
"marketCap": float(metrics.get("marketcap", {}).get("current_marketcap_usd", 0)),
"source": "messari",
"timestamp": int(datetime.utcnow().timestamp() * 1000)
}
# CoinStats implementation
async def _get_price_coinstats(self, symbol: str) -> Dict[str, Any]:
"""Get price from CoinStats"""
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.providers['coinstats']['base_url']}/coins",
params={"currency": "USD"}
)
response.raise_for_status()
data = response.json()
for coin in data.get("coins", []):
if coin.get("symbol", "").upper() == symbol:
return {
"symbol": symbol,
"name": coin.get("name", ""),
"price": float(coin.get("price", 0)),
"change24h": float(coin.get("priceChange1d", 0)),
"volume24h": float(coin.get("volume", 0)),
"marketCap": float(coin.get("marketCap", 0)),
"source": "coinstats",
"timestamp": int(datetime.utcnow().timestamp() * 1000)
}
raise Exception("Coin not found in CoinStats")
# Global instance
market_data_aggregator = MarketDataAggregator()
__all__ = ["MarketDataAggregator", "market_data_aggregator"]