|
|
""" |
|
|
Extended Market Data Collectors |
|
|
Fetches data from Coinpaprika, DefiLlama, Messari, CoinCap, and other market data sources |
|
|
""" |
|
|
|
|
|
import asyncio |
|
|
from datetime import datetime, timezone |
|
|
from typing import Dict, List, Optional, Any |
|
|
from utils.api_client import get_client |
|
|
from utils.logger import setup_logger, log_api_request, log_error |
|
|
|
|
|
logger = setup_logger("market_data_extended_collector") |
|
|
|
|
|
|
|
|
async def get_coinpaprika_tickers() -> Dict[str, Any]: |
|
|
""" |
|
|
Fetch ticker data from Coinpaprika (free, no key required) |
|
|
|
|
|
Returns: |
|
|
Dict with provider, category, data, timestamp, success, error |
|
|
""" |
|
|
provider = "Coinpaprika" |
|
|
category = "market_data" |
|
|
endpoint = "/tickers" |
|
|
|
|
|
logger.info(f"Fetching tickers from {provider}") |
|
|
|
|
|
try: |
|
|
client = get_client() |
|
|
|
|
|
|
|
|
url = "https://api.coinpaprika.com/v1/tickers" |
|
|
|
|
|
params = { |
|
|
"quotes": "USD", |
|
|
"limit": 100 |
|
|
} |
|
|
|
|
|
|
|
|
response = await client.get(url, params=params, timeout=15) |
|
|
|
|
|
|
|
|
log_api_request( |
|
|
logger, |
|
|
provider, |
|
|
endpoint, |
|
|
response.get("response_time_ms", 0), |
|
|
"success" if response["success"] else "error", |
|
|
response.get("status_code") |
|
|
) |
|
|
|
|
|
if not response["success"]: |
|
|
error_msg = response.get("error_message", "Unknown error") |
|
|
log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": response.get("error_type") |
|
|
} |
|
|
|
|
|
|
|
|
data = response["data"] |
|
|
|
|
|
|
|
|
market_data = None |
|
|
if isinstance(data, list): |
|
|
top_10 = data[:10] |
|
|
total_market_cap = sum(coin.get("quotes", {}).get("USD", {}).get("market_cap", 0) for coin in top_10) |
|
|
|
|
|
market_data = { |
|
|
"total_coins": len(data), |
|
|
"top_10_market_cap": round(total_market_cap, 2), |
|
|
"top_10_coins": [ |
|
|
{ |
|
|
"symbol": coin.get("symbol"), |
|
|
"name": coin.get("name"), |
|
|
"price": coin.get("quotes", {}).get("USD", {}).get("price"), |
|
|
"market_cap": coin.get("quotes", {}).get("USD", {}).get("market_cap"), |
|
|
"volume_24h": coin.get("quotes", {}).get("USD", {}).get("volume_24h"), |
|
|
"percent_change_24h": coin.get("quotes", {}).get("USD", {}).get("percent_change_24h") |
|
|
} |
|
|
for coin in top_10 |
|
|
] |
|
|
} |
|
|
|
|
|
logger.info(f"{provider} - {endpoint} - Retrieved {len(data) if isinstance(data, list) else 0} tickers") |
|
|
|
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": market_data, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": True, |
|
|
"error": None, |
|
|
"response_time_ms": response.get("response_time_ms", 0) |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Unexpected error: {str(e)}" |
|
|
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": "exception" |
|
|
} |
|
|
|
|
|
|
|
|
async def get_defillama_tvl() -> Dict[str, Any]: |
|
|
""" |
|
|
Fetch DeFi Total Value Locked from DefiLlama (free, no key required) |
|
|
|
|
|
Returns: |
|
|
Dict with provider, category, data, timestamp, success, error |
|
|
""" |
|
|
provider = "DefiLlama" |
|
|
category = "defi_data" |
|
|
endpoint = "/tvl" |
|
|
|
|
|
logger.info(f"Fetching TVL data from {provider}") |
|
|
|
|
|
try: |
|
|
client = get_client() |
|
|
|
|
|
|
|
|
url = "https://api.llama.fi/v2/protocols" |
|
|
|
|
|
|
|
|
response = await client.get(url, timeout=15) |
|
|
|
|
|
|
|
|
log_api_request( |
|
|
logger, |
|
|
provider, |
|
|
endpoint, |
|
|
response.get("response_time_ms", 0), |
|
|
"success" if response["success"] else "error", |
|
|
response.get("status_code") |
|
|
) |
|
|
|
|
|
if not response["success"]: |
|
|
error_msg = response.get("error_message", "Unknown error") |
|
|
log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": response.get("error_type") |
|
|
} |
|
|
|
|
|
|
|
|
data = response["data"] |
|
|
|
|
|
|
|
|
tvl_data = None |
|
|
if isinstance(data, list): |
|
|
|
|
|
sorted_protocols = sorted(data, key=lambda x: x.get("tvl", 0), reverse=True) |
|
|
top_20 = sorted_protocols[:20] |
|
|
|
|
|
total_tvl = sum(p.get("tvl", 0) for p in data) |
|
|
|
|
|
tvl_data = { |
|
|
"total_protocols": len(data), |
|
|
"total_tvl": round(total_tvl, 2), |
|
|
"top_20_protocols": [ |
|
|
{ |
|
|
"name": p.get("name"), |
|
|
"symbol": p.get("symbol"), |
|
|
"tvl": round(p.get("tvl", 0), 2), |
|
|
"change_1d": p.get("change_1d"), |
|
|
"change_7d": p.get("change_7d"), |
|
|
"chains": p.get("chains", [])[:3] |
|
|
} |
|
|
for p in top_20 |
|
|
] |
|
|
} |
|
|
|
|
|
logger.info( |
|
|
f"{provider} - {endpoint} - Total TVL: ${tvl_data.get('total_tvl', 0):,.0f}" |
|
|
if tvl_data else f"{provider} - {endpoint} - No data" |
|
|
) |
|
|
|
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": tvl_data, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": True, |
|
|
"error": None, |
|
|
"response_time_ms": response.get("response_time_ms", 0) |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Unexpected error: {str(e)}" |
|
|
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": "exception" |
|
|
} |
|
|
|
|
|
|
|
|
async def get_coincap_assets() -> Dict[str, Any]: |
|
|
""" |
|
|
Fetch asset data from CoinCap (free, no key required) |
|
|
|
|
|
Returns: |
|
|
Dict with provider, category, data, timestamp, success, error |
|
|
""" |
|
|
provider = "CoinCap" |
|
|
category = "market_data" |
|
|
endpoint = "/assets" |
|
|
|
|
|
logger.info(f"Fetching assets from {provider}") |
|
|
|
|
|
try: |
|
|
client = get_client() |
|
|
|
|
|
|
|
|
url = "https://api.coincap.io/v2/assets" |
|
|
|
|
|
params = {"limit": 50} |
|
|
|
|
|
|
|
|
response = await client.get(url, params=params, timeout=10) |
|
|
|
|
|
|
|
|
log_api_request( |
|
|
logger, |
|
|
provider, |
|
|
endpoint, |
|
|
response.get("response_time_ms", 0), |
|
|
"success" if response["success"] else "error", |
|
|
response.get("status_code") |
|
|
) |
|
|
|
|
|
if not response["success"]: |
|
|
error_msg = response.get("error_message", "Unknown error") |
|
|
log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": response.get("error_type") |
|
|
} |
|
|
|
|
|
|
|
|
raw_data = response["data"] |
|
|
|
|
|
|
|
|
asset_data = None |
|
|
if isinstance(raw_data, dict) and "data" in raw_data: |
|
|
assets = raw_data["data"] |
|
|
|
|
|
top_10 = assets[:10] if isinstance(assets, list) else [] |
|
|
|
|
|
asset_data = { |
|
|
"total_assets": len(assets) if isinstance(assets, list) else 0, |
|
|
"top_10_assets": [ |
|
|
{ |
|
|
"symbol": asset.get("symbol"), |
|
|
"name": asset.get("name"), |
|
|
"price_usd": float(asset.get("priceUsd", 0)), |
|
|
"market_cap_usd": float(asset.get("marketCapUsd", 0)), |
|
|
"volume_24h_usd": float(asset.get("volumeUsd24Hr", 0)), |
|
|
"change_percent_24h": float(asset.get("changePercent24Hr", 0)) |
|
|
} |
|
|
for asset in top_10 |
|
|
] |
|
|
} |
|
|
|
|
|
logger.info(f"{provider} - {endpoint} - Retrieved {asset_data.get('total_assets', 0)} assets") |
|
|
|
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": asset_data, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": True, |
|
|
"error": None, |
|
|
"response_time_ms": response.get("response_time_ms", 0) |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Unexpected error: {str(e)}" |
|
|
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": "exception" |
|
|
} |
|
|
|
|
|
|
|
|
async def get_messari_assets(api_key: Optional[str] = None) -> Dict[str, Any]: |
|
|
""" |
|
|
Fetch asset data from Messari |
|
|
|
|
|
Args: |
|
|
api_key: Messari API key (optional, has free tier) |
|
|
|
|
|
Returns: |
|
|
Dict with provider, category, data, timestamp, success, error |
|
|
""" |
|
|
provider = "Messari" |
|
|
category = "market_data" |
|
|
endpoint = "/assets" |
|
|
|
|
|
logger.info(f"Fetching assets from {provider}") |
|
|
|
|
|
try: |
|
|
client = get_client() |
|
|
|
|
|
|
|
|
url = "https://data.messari.io/api/v1/assets" |
|
|
|
|
|
params = {"limit": 20} |
|
|
|
|
|
headers = {} |
|
|
if api_key: |
|
|
headers["x-messari-api-key"] = api_key |
|
|
|
|
|
|
|
|
response = await client.get(url, params=params, headers=headers, timeout=15) |
|
|
|
|
|
|
|
|
log_api_request( |
|
|
logger, |
|
|
provider, |
|
|
endpoint, |
|
|
response.get("response_time_ms", 0), |
|
|
"success" if response["success"] else "error", |
|
|
response.get("status_code") |
|
|
) |
|
|
|
|
|
if not response["success"]: |
|
|
error_msg = response.get("error_message", "Unknown error") |
|
|
log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": response.get("error_type") |
|
|
} |
|
|
|
|
|
|
|
|
raw_data = response["data"] |
|
|
|
|
|
|
|
|
asset_data = None |
|
|
if isinstance(raw_data, dict) and "data" in raw_data: |
|
|
assets = raw_data["data"] |
|
|
|
|
|
asset_data = { |
|
|
"total_assets": len(assets) if isinstance(assets, list) else 0, |
|
|
"assets": [ |
|
|
{ |
|
|
"symbol": asset.get("symbol"), |
|
|
"name": asset.get("name"), |
|
|
"slug": asset.get("slug"), |
|
|
"metrics": { |
|
|
"market_cap": asset.get("metrics", {}).get("marketcap", {}).get("current_marketcap_usd"), |
|
|
"volume_24h": asset.get("metrics", {}).get("market_data", {}).get("volume_last_24_hours"), |
|
|
"price": asset.get("metrics", {}).get("market_data", {}).get("price_usd") |
|
|
} |
|
|
} |
|
|
for asset in assets[:10] |
|
|
] if isinstance(assets, list) else [] |
|
|
} |
|
|
|
|
|
logger.info(f"{provider} - {endpoint} - Retrieved {asset_data.get('total_assets', 0)} assets") |
|
|
|
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": asset_data, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": True, |
|
|
"error": None, |
|
|
"response_time_ms": response.get("response_time_ms", 0) |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Unexpected error: {str(e)}" |
|
|
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": "exception" |
|
|
} |
|
|
|
|
|
|
|
|
async def get_cryptocompare_toplist() -> Dict[str, Any]: |
|
|
""" |
|
|
Fetch top cryptocurrencies from CryptoCompare (free tier available) |
|
|
|
|
|
Returns: |
|
|
Dict with provider, category, data, timestamp, success, error |
|
|
""" |
|
|
provider = "CryptoCompare" |
|
|
category = "market_data" |
|
|
endpoint = "/top/totalvolfull" |
|
|
|
|
|
logger.info(f"Fetching top list from {provider}") |
|
|
|
|
|
try: |
|
|
client = get_client() |
|
|
|
|
|
|
|
|
url = "https://min-api.cryptocompare.com/data/top/totalvolfull" |
|
|
|
|
|
params = { |
|
|
"limit": 20, |
|
|
"tsym": "USD" |
|
|
} |
|
|
|
|
|
|
|
|
response = await client.get(url, params=params, timeout=10) |
|
|
|
|
|
|
|
|
log_api_request( |
|
|
logger, |
|
|
provider, |
|
|
endpoint, |
|
|
response.get("response_time_ms", 0), |
|
|
"success" if response["success"] else "error", |
|
|
response.get("status_code") |
|
|
) |
|
|
|
|
|
if not response["success"]: |
|
|
error_msg = response.get("error_message", "Unknown error") |
|
|
log_error(logger, provider, response.get("error_type", "unknown"), error_msg, endpoint) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": response.get("error_type") |
|
|
} |
|
|
|
|
|
|
|
|
raw_data = response["data"] |
|
|
|
|
|
|
|
|
toplist_data = None |
|
|
if isinstance(raw_data, dict) and "Data" in raw_data: |
|
|
coins = raw_data["Data"] |
|
|
|
|
|
toplist_data = { |
|
|
"total_coins": len(coins) if isinstance(coins, list) else 0, |
|
|
"top_coins": [ |
|
|
{ |
|
|
"symbol": coin.get("CoinInfo", {}).get("Name"), |
|
|
"name": coin.get("CoinInfo", {}).get("FullName"), |
|
|
"price": coin.get("RAW", {}).get("USD", {}).get("PRICE"), |
|
|
"market_cap": coin.get("RAW", {}).get("USD", {}).get("MKTCAP"), |
|
|
"volume_24h": coin.get("RAW", {}).get("USD", {}).get("VOLUME24HOUR"), |
|
|
"change_24h": coin.get("RAW", {}).get("USD", {}).get("CHANGEPCT24HOUR") |
|
|
} |
|
|
for coin in (coins[:10] if isinstance(coins, list) else []) |
|
|
] |
|
|
} |
|
|
|
|
|
logger.info(f"{provider} - {endpoint} - Retrieved {toplist_data.get('total_coins', 0)} coins") |
|
|
|
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": toplist_data, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": True, |
|
|
"error": None, |
|
|
"response_time_ms": response.get("response_time_ms", 0) |
|
|
} |
|
|
|
|
|
except Exception as e: |
|
|
error_msg = f"Unexpected error: {str(e)}" |
|
|
log_error(logger, provider, "exception", error_msg, endpoint, exc_info=True) |
|
|
return { |
|
|
"provider": provider, |
|
|
"category": category, |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": error_msg, |
|
|
"error_type": "exception" |
|
|
} |
|
|
|
|
|
|
|
|
async def collect_extended_market_data(messari_key: Optional[str] = None) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Main function to collect extended market data from all sources |
|
|
|
|
|
Args: |
|
|
messari_key: Optional Messari API key |
|
|
|
|
|
Returns: |
|
|
List of results from all extended market data collectors |
|
|
""" |
|
|
logger.info("Starting extended market data collection from all sources") |
|
|
|
|
|
|
|
|
results = await asyncio.gather( |
|
|
get_coinpaprika_tickers(), |
|
|
get_defillama_tvl(), |
|
|
get_coincap_assets(), |
|
|
get_messari_assets(messari_key), |
|
|
get_cryptocompare_toplist(), |
|
|
return_exceptions=True |
|
|
) |
|
|
|
|
|
|
|
|
processed_results = [] |
|
|
for result in results: |
|
|
if isinstance(result, Exception): |
|
|
logger.error(f"Collector failed with exception: {str(result)}") |
|
|
processed_results.append({ |
|
|
"provider": "Unknown", |
|
|
"category": "market_data", |
|
|
"data": None, |
|
|
"timestamp": datetime.now(timezone.utc).isoformat(), |
|
|
"success": False, |
|
|
"error": str(result), |
|
|
"error_type": "exception" |
|
|
}) |
|
|
else: |
|
|
processed_results.append(result) |
|
|
|
|
|
|
|
|
successful = sum(1 for r in processed_results if r.get("success", False)) |
|
|
logger.info(f"Extended market data collection complete: {successful}/{len(processed_results)} successful") |
|
|
|
|
|
return processed_results |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
async def main(): |
|
|
import os |
|
|
|
|
|
messari_key = os.getenv("MESSARI_API_KEY") |
|
|
|
|
|
results = await collect_extended_market_data(messari_key) |
|
|
|
|
|
print("\n=== Extended Market Data Collection Results ===") |
|
|
for result in results: |
|
|
print(f"\nProvider: {result['provider']}") |
|
|
print(f"Category: {result['category']}") |
|
|
print(f"Success: {result['success']}") |
|
|
|
|
|
if result['success']: |
|
|
print(f"Response Time: {result.get('response_time_ms', 0):.2f}ms") |
|
|
data = result.get('data', {}) |
|
|
if data: |
|
|
if 'total_tvl' in data: |
|
|
print(f"Total TVL: ${data['total_tvl']:,.0f}") |
|
|
elif 'total_assets' in data: |
|
|
print(f"Total Assets: {data['total_assets']}") |
|
|
elif 'total_coins' in data: |
|
|
print(f"Total Coins: {data['total_coins']}") |
|
|
else: |
|
|
print(f"Error: {result.get('error', 'Unknown')}") |
|
|
|
|
|
asyncio.run(main()) |
|
|
|