File size: 4,080 Bytes
5cd2b89
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from __future__ import annotations
import os
import time
import random
from typing import Dict, Any, List, Literal, Optional
import httpx

HF_API_MODELS = "https://huggingface.co/api/models"
HF_API_DATASETS = "https://huggingface.co/api/datasets"

REFRESH_INTERVAL_SEC = int(os.getenv("HF_REGISTRY_REFRESH_SEC", "21600"))  # 6h
HTTP_TIMEOUT = float(os.getenv("HF_HTTP_TIMEOUT", "8.0"))

_SEED_MODELS = [
    "ElKulako/cryptobert",
    "kk08/CryptoBERT",
]
_SEED_DATASETS = [
    "linxy/CryptoCoin",
    "WinkingFace/CryptoLM-Bitcoin-BTC-USDT",
    "WinkingFace/CryptoLM-Ethereum-ETH-USDT",
    "WinkingFace/CryptoLM-Solana-SOL-USDT",
    "WinkingFace/CryptoLM-Ripple-XRP-USDT",
]


class HFRegistry:
    def __init__(self) -> None:
        self.models: Dict[str, Dict[str, Any]] = {}
        self.datasets: Dict[str, Dict[str, Any]] = {}
        self.last_refresh: float = 0.0
        self.fail_reason: Optional[str] = None

    async def _hf_json(self, url: str, params: Dict[str, Any]) -> Any:
        async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
            r = await client.get(url, params=params)
            r.raise_for_status()
            return r.json()

    async def refresh(self) -> Dict[str, Any]:
        try:
            for name in _SEED_MODELS:
                self.models.setdefault(name, {"id": name, "source": "seed", "pipeline_tag": "sentiment-analysis"})
            for name in _SEED_DATASETS:
                self.datasets.setdefault(name, {"id": name, "source": "seed"})

            q_crypto = {"search": "crypto", "limit": 50}
            q_sent = {"pipeline_tag": "sentiment-analysis", "search": "crypto", "limit": 50}

            models = await self._hf_json(HF_API_MODELS, q_sent)
            for m in models or []:
                mid = m.get("modelId") or m.get("id") or m.get("name")
                if not mid: continue
                self.models[mid] = {
                    "id": mid,
                    "pipeline_tag": m.get("pipeline_tag"),
                    "likes": m.get("likes"),
                    "downloads": m.get("downloads"),
                    "tags": m.get("tags") or [],
                    "source": "hub",
                }

            datasets = await self._hf_json(HF_API_DATASETS, q_crypto)
            for d in datasets or []:
                did = d.get("id") or d.get("name")
                if not did: continue
                self.datasets[did] = {
                    "id": did,
                    "likes": d.get("likes"),
                    "downloads": d.get("downloads"),
                    "tags": d.get("tags") or [],
                    "source": "hub",
                }

            self.last_refresh = time.time()
            self.fail_reason = None
            return {"ok": True, "models": len(self.models), "datasets": len(self.datasets)}
        except Exception as e:
            self.fail_reason = str(e)
            return {"ok": False, "error": self.fail_reason, "models": len(self.models), "datasets": len(self.datasets)}

    def list(self, kind: Literal["models","datasets"]="models") -> List[Dict[str, Any]]:
        return list(self.models.values()) if kind == "models" else list(self.datasets.values())

    def health(self) -> Dict[str, Any]:
        age = time.time() - (self.last_refresh or 0)
        return {
            "ok": self.last_refresh > 0 and (self.fail_reason is None),
            "last_refresh_epoch": self.last_refresh,
            "age_sec": age,
            "fail_reason": self.fail_reason,
            "counts": {"models": len(self.models), "datasets": len(self.datasets)},
            "interval_sec": REFRESH_INTERVAL_SEC,
        }


REGISTRY = HFRegistry()


async def periodic_refresh(loop_sleep: int = REFRESH_INTERVAL_SEC) -> None:
    await REGISTRY.refresh()
    await _sleep(int(loop_sleep * random.uniform(0.5, 0.9)))
    while True:
        await REGISTRY.refresh()
        await _sleep(loop_sleep)


async def _sleep(sec: int) -> None:
    import asyncio
    try:
        await asyncio.sleep(sec)
    except Exception:
        pass