"""Centralized access to Hugging Face models with ensemble sentiment.""" |
|
|
|
|
|
from __future__ import annotations |
|
|
import logging |
|
|
import threading |
|
|
from dataclasses import dataclass |
|
|
from typing import Any, Dict, List, Mapping, Optional, Sequence |
|
|
from config import HUGGINGFACE_MODELS, get_settings |
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import sys |
|
|
|
|
|
|
|
|

# Environment defaults applied before transformers is imported (log verbosity, TF noise, preferred framework).
os.environ.setdefault('TRANSFORMERS_NO_ADVISORY_WARNINGS', '1')
os.environ.setdefault('TRANSFORMERS_VERBOSITY', 'error')
os.environ.setdefault('TF_CPP_MIN_LOG_LEVEL', '3')
os.environ.setdefault('TRANSFORMERS_FRAMEWORK', 'pt')


class TfKerasMock:
    """Mock tf_keras to prevent import errors when transformers checks for TensorFlow."""
    pass


# Register the stub so any `import tf_keras` triggered by transformers resolves to the
# mock instead of importing TensorFlow/Keras.
sys.modules['tf_keras'] = TfKerasMock()
sys.modules['tf_keras.src'] = TfKerasMock()
sys.modules['tf_keras.src.utils'] = TfKerasMock()

try:
    from transformers import pipeline
    TRANSFORMERS_AVAILABLE = True
except ImportError:
    TRANSFORMERS_AVAILABLE = False

logger = logging.getLogger(__name__)
settings = get_settings()

HF_MODE = os.getenv("HF_MODE", "off").lower()
HF_TOKEN_ENV = os.getenv("HF_TOKEN")

if HF_MODE not in ("off", "public", "auth"):
    logger.warning("Invalid HF_MODE %r, defaulting to 'off'", HF_MODE)
    HF_MODE = "off"

if HF_MODE == "auth" and not HF_TOKEN_ENV:
    HF_MODE = "off"
    logger.warning("HF_MODE='auth' but HF_TOKEN not set, defaulting to 'off'")

ACTIVE_MODELS = [
    "ElKulako/cryptobert",
    "kk08/CryptoBERT",
    "ProsusAI/finbert",
]

LEGACY_MODELS = [
    "burakutf/finetuned-finbert-crypto",
    "mathugo/crypto_news_bert",
    "svalabs/twitter-xlm-roberta-bitcoin-sentiment",
    "mayurjadhav/crypto-sentiment-model",
    "cardiffnlp/twitter-roberta-base-sentiment",
    "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis",
    "agarkovv/CryptoTrader-LM",
]

CRYPTO_SENTIMENT_MODELS = ACTIVE_MODELS[:2] + LEGACY_MODELS[:2]
SOCIAL_SENTIMENT_MODELS = LEGACY_MODELS[2:4]
FINANCIAL_SENTIMENT_MODELS = [ACTIVE_MODELS[2], LEGACY_MODELS[4]]
NEWS_SENTIMENT_MODELS = [LEGACY_MODELS[5]]
DECISION_MODELS = [LEGACY_MODELS[6]]


@dataclass(frozen=True)
class PipelineSpec:
    """Declarative description of a Hugging Face pipeline to be loaded on demand."""
    key: str
    task: str
    model_id: str
    requires_auth: bool = False
    category: str = "sentiment"


MODEL_SPECS: Dict[str, PipelineSpec] = {}

for lk in ["sentiment_twitter", "sentiment_financial", "summarization", "crypto_sentiment"]:
    if lk in HUGGINGFACE_MODELS:
        MODEL_SPECS[lk] = PipelineSpec(
            key=lk,
            task="sentiment-analysis" if "sentiment" in lk else "summarization",
            model_id=HUGGINGFACE_MODELS[lk],
            category="legacy",
        )

for i, mid in enumerate(ACTIVE_MODELS):
    MODEL_SPECS[f"active_{i}"] = PipelineSpec(
        key=f"active_{i}", task="sentiment-analysis", model_id=mid,
        category="crypto_sentiment" if i < 2 else "financial_sentiment",
        requires_auth=("ElKulako" in mid),
    )

for i, mid in enumerate(CRYPTO_SENTIMENT_MODELS):
    MODEL_SPECS[f"crypto_sent_{i}"] = PipelineSpec(
        key=f"crypto_sent_{i}", task="sentiment-analysis", model_id=mid,
        category="crypto_sentiment", requires_auth=("ElKulako" in mid),
    )

for i, mid in enumerate(SOCIAL_SENTIMENT_MODELS):
    MODEL_SPECS[f"social_sent_{i}"] = PipelineSpec(
        key=f"social_sent_{i}", task="sentiment-analysis", model_id=mid, category="social_sentiment",
    )

for i, mid in enumerate(FINANCIAL_SENTIMENT_MODELS):
    MODEL_SPECS[f"financial_sent_{i}"] = PipelineSpec(
        key=f"financial_sent_{i}", task="sentiment-analysis", model_id=mid, category="financial_sentiment",
    )

for i, mid in enumerate(NEWS_SENTIMENT_MODELS):
    MODEL_SPECS[f"news_sent_{i}"] = PipelineSpec(
        key=f"news_sent_{i}", task="sentiment-analysis", model_id=mid, category="news_sentiment",
    )
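
# Note: several spec keys intentionally resolve to the same model_id (e.g. "active_0"
# and "crypto_sent_0" both point at ElKulako/cryptobert). The registry below caches per
# key, so requesting both keys creates two pipeline instances over the same weights.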


class ModelNotAvailable(RuntimeError):
    """Raised when a pipeline cannot be loaded under the current configuration."""


class ModelRegistry:
    """Lazy, thread-safe cache of transformers pipelines keyed by spec name."""

    def __init__(self):
        self._pipelines = {}
        self._lock = threading.Lock()
        self._initialized = False

    def get_pipeline(self, key: str):
        if not TRANSFORMERS_AVAILABLE:
            raise ModelNotAvailable("transformers not installed")
        if key not in MODEL_SPECS:
            raise ModelNotAvailable(f"Unknown key: {key}")

        spec = MODEL_SPECS[key]
        if key in self._pipelines:
            return self._pipelines[key]

        # Double-checked locking so only one thread builds a given pipeline.
        with self._lock:
            if key in self._pipelines:
                return self._pipelines[key]

            if HF_MODE == "off":
                raise ModelNotAvailable("HF_MODE=off")

            # Only pass a token in auth mode; public mode downloads anonymously.
            token_value = (HF_TOKEN_ENV or settings.hf_token) if HF_MODE == "auth" else None

            if spec.requires_auth and not token_value:
                raise ModelNotAvailable("Model requires auth but no token available")

            logger.info(f"Loading model: {spec.model_id} (mode: {HF_MODE})")
            try:
                pipeline_kwargs = {
                    'task': spec.task,
                    'model': spec.model_id,
                    'tokenizer': spec.model_id,
                    'framework': 'pt',
                    'device': -1,  # CPU
                    'token': token_value,
                }
                self._pipelines[key] = pipeline(**pipeline_kwargs)
            except Exception as e:
                error_lower = str(e).lower()

                try:
                    from huggingface_hub.errors import RepositoryNotFoundError, HfHubHTTPError
                    hf_errors = (RepositoryNotFoundError, HfHubHTTPError)
                except ImportError:
                    hf_errors = ()

                is_auth_error = any(kw in error_lower for kw in ['401', 'unauthorized', 'repository not found', 'expired', 'token'])
                is_hf_error = isinstance(e, hf_errors) or is_auth_error

                if is_hf_error:
                    logger.warning(f"HF error for {spec.model_id}: {type(e).__name__}")
                    raise ModelNotAvailable(f"HF error: {spec.model_id}") from e

                # Retry once if the failure looks like a TensorFlow/Keras framework clash.
                if any(kw in error_lower for kw in ['keras', 'tensorflow', 'tf_keras', 'framework']):
                    try:
                        pipeline_kwargs['torch_dtype'] = 'float32'
                        self._pipelines[key] = pipeline(**pipeline_kwargs)
                        return self._pipelines[key]
                    except Exception:
                        raise ModelNotAvailable(f"Framework error: {spec.model_id}") from e

                raise ModelNotAvailable(f"Load failed: {spec.model_id}") from e

            return self._pipelines[key]

    def get_loaded_models(self):
        """Return the keys of all pipelines loaded so far."""
        return list(self._pipelines.keys())

    def get_available_sentiment_models(self):
        """Return spec keys named as sentiment models (the "active_*" keys are excluded)."""
        return [key for key in MODEL_SPECS.keys() if "sent" in key or "sentiment" in key]

    def initialize_models(self):
        if self._initialized:
            return {"status": "already_initialized", "mode": HF_MODE, "models_loaded": len(self._pipelines)}

        if HF_MODE == "off":
            self._initialized = True
            return {"status": "disabled", "mode": "off", "models_loaded": 0, "loaded": [], "failed": []}

        if not TRANSFORMERS_AVAILABLE:
            return {"status": "transformers_not_available", "mode": HF_MODE, "models_loaded": 0}

        loaded, failed = [], []
        active_keys = [f"active_{i}" for i in range(len(ACTIVE_MODELS))]

        for key in active_keys:
            try:
                self.get_pipeline(key)
                loaded.append(key)
            except Exception as e:  # ModelNotAvailable and unexpected errors are recorded alike
                failed.append((key, str(e)[:100]))

        self._initialized = True
        status = "initialized" if loaded else "partial"
        return {"status": status, "mode": HF_MODE, "models_loaded": len(loaded), "loaded": loaded, "failed": failed}


_registry = ModelRegistry()

AI_MODELS_SUMMARY = {"status": "not_initialized", "mode": "off", "models_loaded": 0, "loaded": [], "failed": []}


def initialize_models():
    global AI_MODELS_SUMMARY
    result = _registry.initialize_models()
    AI_MODELS_SUMMARY = result
    return result
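
# Typical startup (sketch): call initialize_models() once at application boot;
# AI_MODELS_SUMMARY then reflects what loaded, and the analyze_* helpers below
# degrade to neutral fallback results when no model is available.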


def ensemble_crypto_sentiment(text: str) -> Dict[str, Any]:
    """Run every loaded sentiment pipeline on `text` and majority-vote the result."""
    if not TRANSFORMERS_AVAILABLE or HF_MODE == "off":
        return {"label": "neutral", "confidence": 0.0, "scores": {}, "model_count": 0,
                "error": "HF disabled" if HF_MODE == "off" else "transformers N/A"}

    results, labels_count, total_conf = {}, {"bullish": 0, "bearish": 0, "neutral": 0}, 0.0

    loaded_keys = _registry.get_loaded_models()
    available_keys = [key for key in loaded_keys if "sent" in key or "sentiment" in key or key.startswith("active_")]

    if not available_keys:
        return {"label": "neutral", "confidence": 0.0, "scores": {}, "model_count": 0, "error": "No models loaded"}

    for key in available_keys:
        try:
            pipe = _registry.get_pipeline(key)
            res = pipe(text[:512])  # crude character truncation to keep inputs within model limits
            if isinstance(res, list) and res:
                res = res[0]

            label = res.get("label", "NEUTRAL").upper()
            score = res.get("score", 0.5)

            # Map model-specific labels onto a shared bullish/bearish/neutral scheme.
            if "POSITIVE" in label or "BULLISH" in label:
                mapped = "bullish"
            elif "NEGATIVE" in label or "BEARISH" in label:
                mapped = "bearish"
            else:
                mapped = "neutral"

            spec = MODEL_SPECS.get(key)
            results[spec.model_id if spec else key] = {"label": mapped, "score": score}
            labels_count[mapped] += 1
            total_conf += score
        except ModelNotAvailable:
            continue
        except Exception as e:
            logger.warning(f"Ensemble failed for {key}: {e}")

    if not results:
        return {"label": "neutral", "confidence": 0.0, "scores": {}, "model_count": 0, "error": "All models failed"}

    # Majority vote on labels; confidence is the mean raw score across responding models.
    final = max(labels_count, key=labels_count.get)
    avg_conf = total_conf / len(results)

    return {"label": final, "confidence": avg_conf, "scores": results, "model_count": len(results)}


def analyze_crypto_sentiment(text: str):
    return ensemble_crypto_sentiment(text)


def analyze_financial_sentiment(text: str):
    if not TRANSFORMERS_AVAILABLE:
        return {"label": "neutral", "score": 0.5, "error": "transformers N/A"}
    try:
        pipe = _registry.get_pipeline("financial_sent_0")
        res = pipe(text[:512])
        if isinstance(res, list) and res:
            res = res[0]
        return {"label": res.get("label", "neutral").lower(), "score": res.get("score", 0.5)}
    except Exception as e:
        logger.error(f"Financial sentiment failed: {e}")
        return {"label": "neutral", "score": 0.5, "error": str(e)}


def analyze_social_sentiment(text: str):
    if not TRANSFORMERS_AVAILABLE:
        return {"label": "neutral", "score": 0.5, "error": "transformers N/A"}
    try:
        pipe = _registry.get_pipeline("social_sent_0")
        res = pipe(text[:512])
        if isinstance(res, list) and res:
            res = res[0]
        return {"label": res.get("label", "neutral").lower(), "score": res.get("score", 0.5)}
    except Exception as e:
        logger.error(f"Social sentiment failed: {e}")
        return {"label": "neutral", "score": 0.5, "error": str(e)}


def analyze_market_text(text: str):
    return ensemble_crypto_sentiment(text)


def analyze_chart_points(data: Sequence[Mapping[str, Any]], indicators: Optional[List[str]] = None):
    if not data:
        return {"trend": "neutral", "strength": 0, "analysis": "No data"}

    prices = [float(p.get("price", 0)) for p in data if p.get("price")]
    if not prices:
        return {"trend": "neutral", "strength": 0, "analysis": "No price data"}

    first, last = prices[0], prices[-1]
    change = ((last - first) / first * 100) if first > 0 else 0

    # Simple heuristic: a move beyond +/-5% counts as a trend; strength is scaled into [0, 1].
    if change > 5:
        trend, strength = "bullish", min(abs(change) / 10, 1.0)
    elif change < -5:
        trend, strength = "bearish", min(abs(change) / 10, 1.0)
    else:
        trend, strength = "neutral", abs(change) / 5

    return {"trend": trend, "strength": strength, "change_pct": change, "support": min(prices),
            "resistance": max(prices), "analysis": f"Price moved {change:.2f}% showing {trend} trend"}


def analyze_news_item(item: Dict[str, Any]):
    # Guard against explicit None values for title/description.
    text = (item.get("title") or "") + " " + (item.get("description") or "")
    sent = ensemble_crypto_sentiment(text)
    return {**item, "sentiment": sent["label"], "sentiment_confidence": sent["confidence"], "sentiment_details": sent}


def get_model_info():
    return {
        "transformers_available": TRANSFORMERS_AVAILABLE,
        "hf_mode": HF_MODE,
        "hf_token_configured": bool(HF_TOKEN_ENV or settings.hf_token) if HF_MODE == "auth" else False,
        "models_initialized": _registry._initialized,
        "models_loaded": len(_registry._pipelines),
        "active_models": ACTIVE_MODELS,
        "total_models": len(MODEL_SPECS),
    }


def registry_status():
    return {
        "initialized": _registry._initialized,
        "pipelines_loaded": len(_registry._pipelines),
        "available_models": list(MODEL_SPECS.keys()),
        "transformers_available": TRANSFORMERS_AVAILABLE,
    }
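

if __name__ == "__main__":  # pragma: no cover
    # Minimal smoke-test sketch (assumes HF_MODE=public or auth plus network access;
    # with HF_MODE=off everything degrades to neutral stub results).
    print(initialize_models())
    print(ensemble_crypto_sentiment("Bitcoin breaks above resistance on strong volume"))
    print(get_model_info())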