|
|
import json
import logging
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
|
|
|
|
|
|
|
|
class LocalResourceService:
    """Centralized loader for the unified fallback registry.

    The registry is a JSON document read lazily from ``resource_path`` on the
    first query and cached until :meth:`refresh` is called.  The loader reads
    the following layout::

        {"fallback_data": {"assets": {KEY: {...asset...}, ...},
                           "market_overview": {...}}}

    Each asset is indexed by its upper-cased ``symbol`` (falling back to the
    mapping key when the asset carries no symbol).  A missing, unreadable or
    malformed registry is logged and treated as an empty one, so every query
    method keeps working and simply returns an empty result.
    """

    # Sort rank given to assets without a ``market_cap_rank`` so they sort last.
    _UNRANKED = 9999
    # Placeholder "markets" count used when the overview is synthesized.
    _DEFAULT_MARKET_COUNT = 500
    # Number of entries in each top_gainers / top_losers / top_by_volume list.
    _TOP_N = 5

    def __init__(self, resource_path: Path):
        """Remember the registry location; nothing is read until first use.

        Args:
            resource_path: Location of the registry JSON file (anything
                accepted by ``pathlib.Path``).
        """
        self.resource_path = Path(resource_path)
        # ``None`` means "not loaded yet"; an empty dict means "loaded, empty".
        self._raw_data: Optional[Dict[str, Any]] = None
        self._assets: Dict[str, Dict[str, Any]] = {}
        self._market_overview: Dict[str, Any] = {}
        self._logger = logging.getLogger(__name__)

    # ------------------------------------------------------------------
    # Loading
    # ------------------------------------------------------------------

    def _ensure_loaded(self) -> None:
        """Load and normalize the registry once; later calls are no-ops.

        Any failure to read or parse the file is logged and leaves the
        service with an empty registry instead of raising.
        """
        if self._raw_data is not None:
            return

        data: Any = {}
        try:
            with self.resource_path.open("r", encoding="utf-8") as handle:
                data = json.load(handle)
        except FileNotFoundError:
            self._logger.warning("Fallback registry %s not found", self.resource_path)
        except OSError as exc:
            # e.g. the path is a directory or is not readable.
            self._logger.error(
                "Unable to read fallback registry %s: %s", self.resource_path, exc
            )
        except (json.JSONDecodeError, UnicodeDecodeError) as exc:
            self._logger.error("Invalid fallback registry JSON: %s", exc)

        if not isinstance(data, dict):
            # Valid JSON, but not an object (e.g. a list): nothing usable.
            self._logger.error(
                "Fallback registry %s must contain a JSON object", self.resource_path
            )
            data = {}

        fallback_data = data.get("fallback_data")
        if not isinstance(fallback_data, dict):
            fallback_data = {}
        assets = fallback_data.get("assets")
        if not isinstance(assets, dict):
            assets = {}

        normalized_assets: Dict[str, Dict[str, Any]] = {}
        for key, details in assets.items():
            if not isinstance(details, dict):
                self._logger.warning("Skipping malformed fallback asset %r", key)
                continue
            symbol = str(details.get("symbol") or key).upper()
            # Copy so the normalized symbol does not leak into the raw data.
            asset_copy = deepcopy(details)
            asset_copy["symbol"] = symbol
            normalized_assets[symbol] = asset_copy

        market_overview = fallback_data.get("market_overview")
        if not isinstance(market_overview, dict):
            market_overview = {}

        self._raw_data = data
        self._assets = normalized_assets
        self._market_overview = deepcopy(market_overview)

    def refresh(self) -> None:
        """Force reload from disk (used in tests)."""
        self._raw_data = None
        self._assets = {}
        self._market_overview = {}
        self._ensure_loaded()

    # ------------------------------------------------------------------
    # Registry-level queries
    # ------------------------------------------------------------------

    def get_registry(self) -> Dict[str, Any]:
        """Return a deep copy of the raw registry document (``{}`` if none)."""
        self._ensure_loaded()
        return deepcopy(self._raw_data or {})

    def get_supported_symbols(self) -> List[str]:
        """Return the upper-cased symbols of all loaded assets, sorted."""
        self._ensure_loaded()
        return sorted(self._assets)

    def has_fallback_data(self) -> bool:
        """Return ``True`` when at least one asset was loaded."""
        self._ensure_loaded()
        return bool(self._assets)

    # ------------------------------------------------------------------
    # Price queries
    # ------------------------------------------------------------------

    @staticmethod
    def _price_of(asset: Dict[str, Any]) -> Dict[str, Any]:
        """Return the asset's ``price`` mapping, or ``{}`` if absent/not a dict."""
        price = asset.get("price")
        return price if isinstance(price, dict) else {}

    def _asset_to_market_record(self, asset: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten a registry asset into a market-listing style record.

        ``id`` is the asset's ``slug`` when present, otherwise its lower-cased
        symbol.  Price fields that are missing come back as ``None``.
        """
        price = self._price_of(asset)
        return {
            "id": asset.get("slug") or str(asset.get("symbol") or "").lower(),
            "symbol": asset.get("symbol"),
            "name": asset.get("name"),
            "current_price": price.get("current_price"),
            "market_cap": price.get("market_cap"),
            "market_cap_rank": asset.get("market_cap_rank"),
            "total_volume": price.get("total_volume"),
            "price_change_24h": price.get("price_change_24h"),
            "price_change_percentage_24h": price.get("price_change_percentage_24h"),
            "high_24h": price.get("high_24h"),
            "low_24h": price.get("low_24h"),
            "last_updated": price.get("last_updated"),
        }

    def get_top_prices(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return market records for the best-ranked assets.

        Assets are ordered by ``market_cap_rank`` (unranked assets last), with
        ties broken by descending market cap.

        Args:
            limit: Maximum number of records.  Values below 1 are treated as
                1, so a non-empty registry always yields at least one record.

        Returns:
            Up to ``limit`` market records; ``[]`` when no assets are loaded.
        """
        self._ensure_loaded()
        if not self._assets:
            return []

        def rank_key(asset: Dict[str, Any]):
            market_cap = self._price_of(asset).get("market_cap") or 0
            return (asset.get("market_cap_rank") or self._UNRANKED, -market_cap)

        ranked = sorted(self._assets.values(), key=rank_key)
        return [self._asset_to_market_record(asset) for asset in ranked[: max(1, limit)]]

    def get_prices_for_symbols(self, symbols: List[str]) -> List[Dict[str, Any]]:
        """Return market records for the requested symbols, in request order.

        Symbols are matched case-insensitively; unknown or empty symbols are
        skipped rather than reported.
        """
        self._ensure_loaded()
        if not symbols or not self._assets:
            return []

        results: List[Dict[str, Any]] = []
        for raw_symbol in symbols:
            asset = self._assets.get(str(raw_symbol or "").upper())
            if asset:
                results.append(self._asset_to_market_record(asset))
        return results

    def get_ticker_snapshot(self, symbol: str) -> Optional[Dict[str, Any]]:
        """Return a 24h ticker-style snapshot for ``symbol``.

        Returns:
            A dict of price/change/high/low/volume fields, or ``None`` when
            the symbol is unknown.  ``volume_24h`` and ``quote_volume_24h``
            are both filled from the registry's single ``total_volume`` value.
        """
        self._ensure_loaded()
        asset = self._assets.get(str(symbol or "").upper())
        if not asset:
            return None

        price = self._price_of(asset)
        return {
            "symbol": asset.get("symbol"),
            "price": price.get("current_price"),
            "price_change_24h": price.get("price_change_24h"),
            "price_change_percent_24h": price.get("price_change_percentage_24h"),
            "high_24h": price.get("high_24h"),
            "low_24h": price.get("low_24h"),
            "volume_24h": price.get("total_volume"),
            "quote_volume_24h": price.get("total_volume"),
        }

    # ------------------------------------------------------------------
    # Market overview
    # ------------------------------------------------------------------

    def _synthesize_overview(self) -> Dict[str, Any]:
        """Build overview totals from the loaded assets.

        Used when the registry carries no ``market_overview`` section.
        """
        prices = [self._price_of(asset) for asset in self._assets.values()]
        total_market_cap = sum(price.get("market_cap") or 0 for price in prices)
        total_volume = sum(price.get("total_volume") or 0 for price in prices)
        btc_cap = self._price_of(self._assets.get("BTC", {})).get("market_cap") or 0
        return {
            "total_market_cap": total_market_cap,
            "total_volume_24h": total_volume,
            # Guard against division by zero when no asset has a market cap.
            "btc_dominance": (btc_cap / total_market_cap * 100) if total_market_cap else 0,
            "active_cryptocurrencies": len(self._assets),
            "markets": self._DEFAULT_MARKET_COUNT,
            "market_cap_change_percentage_24h": 0,
        }

    def get_market_overview(self) -> Dict[str, Any]:
        """Return a market overview enriched with top movers.

        The registry's own ``market_overview`` section is used when present;
        otherwise totals are computed from the loaded assets.  In both cases
        ``top_gainers``, ``top_losers`` and ``top_by_volume`` (five records
        each at most) are added, plus a ``timestamp`` (kept from the registry
        when it has one, else the current UTC time as a naive ISO-8601
        string).

        Returns:
            The overview dict, or ``{}`` when no assets are loaded.
        """
        self._ensure_loaded()
        if not self._assets:
            return {}

        overview = deepcopy(self._market_overview) or self._synthesize_overview()

        def change_24h(asset: Dict[str, Any]):
            return self._price_of(asset).get("price_change_percentage_24h") or 0

        def volume_24h(asset: Dict[str, Any]):
            return self._price_of(asset).get("total_volume") or 0

        assets = list(self._assets.values())
        # Two separate stable sorts (rather than reversing one) so that ties
        # keep registry order in both the gainers and the losers list.
        gainers = sorted(assets, key=change_24h, reverse=True)[: self._TOP_N]
        losers = sorted(assets, key=change_24h)[: self._TOP_N]
        volumes = sorted(assets, key=volume_24h, reverse=True)[: self._TOP_N]

        overview["top_gainers"] = [self._asset_to_market_record(a) for a in gainers]
        overview["top_losers"] = [self._asset_to_market_record(a) for a in losers]
        overview["top_by_volume"] = [self._asset_to_market_record(a) for a in volumes]
        # datetime.utcnow() is deprecated; take an aware "now" and drop the
        # tzinfo so the emitted string keeps the same naive ISO format.
        overview["timestamp"] = overview.get("timestamp") or (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )
        return overview

    # ------------------------------------------------------------------
    # Candles
    # ------------------------------------------------------------------

    def get_ohlcv(self, symbol: str, interval: str = "1h", limit: int = 100) -> List[Dict[str, Any]]:
        """Return stored OHLCV entries for ``symbol``.

        Args:
            symbol: Asset symbol, matched case-insensitively.
            interval: Key into the asset's ``ohlcv`` mapping.  When that
                interval has no data, the ``"1h"`` series is used instead.
            limit: Keep only the last ``limit`` entries.  A falsy or
                non-positive limit returns the whole series.

        Returns:
            A deep copy of the selected entries; ``[]`` when the symbol is
            unknown or has no candle data.
        """
        self._ensure_loaded()
        asset = self._assets.get(str(symbol or "").upper())
        if not asset:
            return []

        series_by_interval = asset.get("ohlcv") or {}
        candles = series_by_interval.get(interval) or []
        if not candles and interval != "1h":
            candles = series_by_interval.get("1h") or []

        # A negative limit would turn ``[-limit:]`` into "skip the first N",
        # so only positive limits are allowed to truncate.
        if limit and limit > 0:
            candles = candles[-limit:]
        return deepcopy(candles)

    # ------------------------------------------------------------------
    # Diagnostics
    # ------------------------------------------------------------------

    def describe(self) -> Dict[str, Any]:
        """Simple snapshot used in diagnostics/tests."""
        self._ensure_loaded()
        return {
            "resource_path": str(self.resource_path),
            "assets": len(self._assets),
            "supported_symbols": self.get_supported_symbols(),
        }
|
|
|