#!/usr/bin/env python3
"""Centralized access to Hugging Face models with ensemble sentiment."""
from __future__ import annotations
import logging
import threading
from dataclasses import dataclass
from typing import Any, Dict, List, Mapping, Optional, Sequence
from config import HUGGINGFACE_MODELS, get_settings
# Set environment variables to avoid TensorFlow/Keras issues
# We'll force PyTorch framework instead
import os
import sys
# Steer transformers toward the PyTorch backend and quiet TensorFlow/Keras warnings
os.environ.setdefault('TRANSFORMERS_NO_ADVISORY_WARNINGS', '1')
os.environ.setdefault('TRANSFORMERS_VERBOSITY', 'error')
os.environ.setdefault('TF_CPP_MIN_LOG_LEVEL', '3')
os.environ.setdefault('TRANSFORMERS_FRAMEWORK', 'pt')
# Mock tf_keras to prevent transformers from trying to import it
# This prevents the broken tf-keras installation from causing errors
class TfKerasMock:
"""Mock tf_keras to prevent import errors when transformers checks for TensorFlow"""
pass
# Add mock to sys.modules before transformers imports
sys.modules['tf_keras'] = TfKerasMock()
sys.modules['tf_keras.src'] = TfKerasMock()
sys.modules['tf_keras.src.utils'] = TfKerasMock()
try:
from transformers import pipeline
TRANSFORMERS_AVAILABLE = True
except ImportError:
TRANSFORMERS_AVAILABLE = False
logger = logging.getLogger(__name__)
settings = get_settings()
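# HF_MODE controls how Hugging Face models are loaded at runtime:
#   "off"    - never load models; sentiment helpers return neutral fallbacks
#   "public" - load models anonymously (no token is passed to the pipeline)
#   "auth"   - pass HF_TOKEN / settings.hf_token; required for specs marked requires_auth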
HF_MODE = os.getenv("HF_MODE", "off").lower()
HF_TOKEN_ENV = os.getenv("HF_TOKEN")
if HF_MODE not in ("off", "public", "auth"):
    logger.warning("Invalid HF_MODE %r, defaulting to 'off'", HF_MODE)
    HF_MODE = "off"
if HF_MODE == "auth" and not HF_TOKEN_ENV:
HF_MODE = "off"
logger.warning("HF_MODE='auth' but HF_TOKEN not set, defaulting to 'off'")
ACTIVE_MODELS = [
"ElKulako/cryptobert",
"kk08/CryptoBERT",
"ProsusAI/finbert"
]
LEGACY_MODELS = [
"burakutf/finetuned-finbert-crypto",
"mathugo/crypto_news_bert",
"svalabs/twitter-xlm-roberta-bitcoin-sentiment",
"mayurjadhav/crypto-sentiment-model",
"cardiffnlp/twitter-roberta-base-sentiment",
"mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis",
"agarkovv/CryptoTrader-LM"
]
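# Category views over the lists above: crypto sentiment combines the first two ACTIVE
# and the first two LEGACY models; FinBERT plus the remaining LEGACY entries are split
# across financial, social, news, and trading-decision categories.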
CRYPTO_SENTIMENT_MODELS = ACTIVE_MODELS[:2] + LEGACY_MODELS[:2]
SOCIAL_SENTIMENT_MODELS = LEGACY_MODELS[2:4]
FINANCIAL_SENTIMENT_MODELS = [ACTIVE_MODELS[2]] + [LEGACY_MODELS[4]]
NEWS_SENTIMENT_MODELS = [LEGACY_MODELS[5]]
DECISION_MODELS = [LEGACY_MODELS[6]]
@dataclass(frozen=True)
class PipelineSpec:
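    """Immutable description of one Hugging Face pipeline: task, model id, auth requirement and category."""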
key: str
task: str
model_id: str
requires_auth: bool = False
category: str = "sentiment"
MODEL_SPECS: Dict[str, PipelineSpec] = {}
# Legacy models
for lk in ["sentiment_twitter", "sentiment_financial", "summarization", "crypto_sentiment"]:
if lk in HUGGINGFACE_MODELS:
MODEL_SPECS[lk] = PipelineSpec(
key=lk,
task="sentiment-analysis" if "sentiment" in lk else "summarization",
model_id=HUGGINGFACE_MODELS[lk],
category="legacy"
)
for i, mid in enumerate(ACTIVE_MODELS):
MODEL_SPECS[f"active_{i}"] = PipelineSpec(
key=f"active_{i}", task="sentiment-analysis", model_id=mid,
category="crypto_sentiment" if i < 2 else "financial_sentiment",
requires_auth=("ElKulako" in mid)
)
for i, mid in enumerate(CRYPTO_SENTIMENT_MODELS):
MODEL_SPECS[f"crypto_sent_{i}"] = PipelineSpec(
key=f"crypto_sent_{i}", task="sentiment-analysis", model_id=mid,
category="crypto_sentiment", requires_auth=("ElKulako" in mid)
)
for i, mid in enumerate(SOCIAL_SENTIMENT_MODELS):
MODEL_SPECS[f"social_sent_{i}"] = PipelineSpec(
key=f"social_sent_{i}", task="sentiment-analysis", model_id=mid, category="social_sentiment"
)
for i, mid in enumerate(FINANCIAL_SENTIMENT_MODELS):
MODEL_SPECS[f"financial_sent_{i}"] = PipelineSpec(
key=f"financial_sent_{i}", task="sentiment-analysis", model_id=mid, category="financial_sentiment"
)
for i, mid in enumerate(NEWS_SENTIMENT_MODELS):
MODEL_SPECS[f"news_sent_{i}"] = PipelineSpec(
key=f"news_sent_{i}", task="sentiment-analysis", model_id=mid, category="news_sentiment"
)
class ModelNotAvailable(RuntimeError):
    """Raised when a requested pipeline cannot be provided (disabled mode, missing auth, or load failure)."""
class ModelRegistry:
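    """Thread-safe registry that lazily loads and caches transformers pipelines by spec key."""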
def __init__(self):
self._pipelines = {}
self._lock = threading.Lock()
self._initialized = False
def get_pipeline(self, key: str):
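        """Return the pipeline for ``key``, loading it under a lock and caching it on first use."""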
if not TRANSFORMERS_AVAILABLE:
raise ModelNotAvailable("transformers not installed")
if key not in MODEL_SPECS:
raise ModelNotAvailable(f"Unknown key: {key}")
spec = MODEL_SPECS[key]
if key in self._pipelines:
return self._pipelines[key]
with self._lock:
if key in self._pipelines:
return self._pipelines[key]
if HF_MODE == "off":
raise ModelNotAvailable("HF_MODE=off")
token_value = None
if HF_MODE == "auth":
token_value = HF_TOKEN_ENV or settings.hf_token
elif HF_MODE == "public":
token_value = None
if spec.requires_auth and not token_value:
raise ModelNotAvailable("Model requires auth but no token available")
logger.info(f"Loading model: {spec.model_id} (mode: {HF_MODE})")
try:
pipeline_kwargs = {
'task': spec.task,
'model': spec.model_id,
'tokenizer': spec.model_id,
'framework': 'pt',
'device': -1,
}
pipeline_kwargs['token'] = token_value
self._pipelines[key] = pipeline(**pipeline_kwargs)
except Exception as e:
error_msg = str(e)
error_lower = error_msg.lower()
try:
from huggingface_hub.errors import RepositoryNotFoundError, HfHubHTTPError
hf_errors = (RepositoryNotFoundError, HfHubHTTPError)
except ImportError:
hf_errors = ()
                # Match 'invalid token' rather than bare 'token' so tokenizer errors are not mistaken for auth failures
                is_auth_error = any(kw in error_lower for kw in ['401', 'unauthorized', 'repository not found', 'expired', 'invalid token'])
is_hf_error = isinstance(e, hf_errors) or is_auth_error
if is_hf_error:
logger.warning(f"HF error for {spec.model_id}: {type(e).__name__}")
raise ModelNotAvailable(f"HF error: {spec.model_id}") from e
if any(kw in error_lower for kw in ['keras', 'tensorflow', 'tf_keras', 'framework']):
try:
pipeline_kwargs['torch_dtype'] = 'float32'
self._pipelines[key] = pipeline(**pipeline_kwargs)
return self._pipelines[key]
except Exception:
raise ModelNotAvailable(f"Framework error: {spec.model_id}") from e
raise ModelNotAvailable(f"Load failed: {spec.model_id}") from e
return self._pipelines[key]
def get_loaded_models(self):
"""Get list of all loaded model keys"""
return list(self._pipelines.keys())
def get_available_sentiment_models(self):
"""Get list of all available sentiment model keys"""
return [key for key in MODEL_SPECS.keys() if "sent" in key or "sentiment" in key]
def initialize_models(self):
if self._initialized:
return {"status": "already_initialized", "mode": HF_MODE, "models_loaded": len(self._pipelines)}
if HF_MODE == "off":
self._initialized = True
return {"status": "disabled", "mode": "off", "models_loaded": 0, "loaded": [], "failed": []}
if not TRANSFORMERS_AVAILABLE:
return {"status": "transformers_not_available", "mode": HF_MODE, "models_loaded": 0}
loaded, failed = [], []
active_keys = [f"active_{i}" for i in range(len(ACTIVE_MODELS))]
for key in active_keys:
try:
self.get_pipeline(key)
loaded.append(key)
except ModelNotAvailable as e:
failed.append((key, str(e)[:100]))
except Exception as e:
error_msg = str(e)[:100]
failed.append((key, error_msg))
self._initialized = True
        status = "initialized" if not failed else ("partial" if loaded else "failed")
return {"status": status, "mode": HF_MODE, "models_loaded": len(loaded), "loaded": loaded, "failed": failed}
_registry = ModelRegistry()
AI_MODELS_SUMMARY = {"status": "not_initialized", "mode": "off", "models_loaded": 0, "loaded": [], "failed": []}
def initialize_models():
global AI_MODELS_SUMMARY
result = _registry.initialize_models()
AI_MODELS_SUMMARY = result
return result
def ensemble_crypto_sentiment(text: str) -> Dict[str, Any]:
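    """Run ``text`` through every loaded sentiment pipeline and majority-vote the mapped label."""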
if not TRANSFORMERS_AVAILABLE or HF_MODE == "off":
return {"label": "neutral", "confidence": 0.0, "scores": {}, "model_count": 0, "error": "HF disabled" if HF_MODE == "off" else "transformers N/A"}
results, labels_count, total_conf = {}, {"bullish": 0, "bearish": 0, "neutral": 0}, 0.0
loaded_keys = _registry.get_loaded_models()
available_keys = [key for key in loaded_keys if "sent" in key or "sentiment" in key or key.startswith("active_")]
if not available_keys:
return {"label": "neutral", "confidence": 0.0, "scores": {}, "model_count": 0, "error": "No models loaded"}
for key in available_keys:
try:
pipe = _registry.get_pipeline(key)
res = pipe(text[:512])
if isinstance(res, list) and res: res = res[0]
label = res.get("label", "NEUTRAL").upper()
score = res.get("score", 0.5)
mapped = "bullish" if "POSITIVE" in label or "BULLISH" in label else ("bearish" if "NEGATIVE" in label or "BEARISH" in label else "neutral")
spec = MODEL_SPECS.get(key)
if spec:
results[spec.model_id] = {"label": mapped, "score": score}
else:
results[key] = {"label": mapped, "score": score}
labels_count[mapped] += 1
total_conf += score
except ModelNotAvailable:
continue
except Exception as e:
logger.warning(f"Ensemble failed for {key}: {e}")
if not results:
return {"label": "neutral", "confidence": 0.0, "scores": {}, "model_count": 0, "error": "All models failed"}
final = max(labels_count, key=labels_count.get)
avg_conf = total_conf / len(results)
return {"label": final, "confidence": avg_conf, "scores": results, "model_count": len(results)}
def analyze_crypto_sentiment(text: str): return ensemble_crypto_sentiment(text)
def analyze_financial_sentiment(text: str):
if not TRANSFORMERS_AVAILABLE:
return {"label": "neutral", "score": 0.5, "error": "transformers N/A"}
try:
pipe = _registry.get_pipeline("financial_sent_0")
res = pipe(text[:512])
if isinstance(res, list) and res: res = res[0]
return {"label": res.get("label", "neutral").lower(), "score": res.get("score", 0.5)}
except Exception as e:
logger.error(f"Financial sentiment failed: {e}")
return {"label": "neutral", "score": 0.5, "error": str(e)}
def analyze_social_sentiment(text: str):
if not TRANSFORMERS_AVAILABLE:
return {"label": "neutral", "score": 0.5, "error": "transformers N/A"}
try:
pipe = _registry.get_pipeline("social_sent_0")
res = pipe(text[:512])
if isinstance(res, list) and res: res = res[0]
return {"label": res.get("label", "neutral").lower(), "score": res.get("score", 0.5)}
except Exception as e:
logger.error(f"Social sentiment failed: {e}")
return {"label": "neutral", "score": 0.5, "error": str(e)}
def analyze_market_text(text: str): return ensemble_crypto_sentiment(text)
def analyze_chart_points(data: Sequence[Mapping[str, Any]], indicators: Optional[List[str]] = None):
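    """Derive a coarse bullish/bearish/neutral trend from a sequence of {"price": ...} points."""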
if not data: return {"trend": "neutral", "strength": 0, "analysis": "No data"}
prices = [float(p.get("price", 0)) for p in data if p.get("price")]
if not prices: return {"trend": "neutral", "strength": 0, "analysis": "No price data"}
first, last = prices[0], prices[-1]
change = ((last - first) / first * 100) if first > 0 else 0
if change > 5: trend, strength = "bullish", min(abs(change) / 10, 1.0)
elif change < -5: trend, strength = "bearish", min(abs(change) / 10, 1.0)
else: trend, strength = "neutral", abs(change) / 5
return {"trend": trend, "strength": strength, "change_pct": change, "support": min(prices), "resistance": max(prices), "analysis": f"Price moved {change:.2f}% showing {trend} trend"}
def analyze_news_item(item: Dict[str, Any]):
    text = f"{item.get('title') or ''} {item.get('description') or ''}".strip()
sent = ensemble_crypto_sentiment(text)
return {**item, "sentiment": sent["label"], "sentiment_confidence": sent["confidence"], "sentiment_details": sent}
def get_model_info():
return {
"transformers_available": TRANSFORMERS_AVAILABLE,
"hf_mode": HF_MODE,
"hf_token_configured": bool(HF_TOKEN_ENV or settings.hf_token) if HF_MODE == "auth" else False,
"models_initialized": _registry._initialized,
"models_loaded": len(_registry._pipelines),
"active_models": ACTIVE_MODELS,
"total_models": len(MODEL_SPECS)
}
def registry_status():
return {
"initialized": _registry._initialized,
"pipelines_loaded": len(_registry._pipelines),
"available_models": list(MODEL_SPECS.keys()),
"transformers_available": TRANSFORMERS_AVAILABLE
}
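

# Minimal usage sketch (illustrative only, not part of the module's public API): it assumes
# HF_MODE is exported in the environment before import and, when HF_MODE != "off", that the
# Hugging Face Hub is reachable. The sample text below is a made-up example.
if __name__ == "__main__":
    summary = initialize_models()
    print("init:", summary["status"], "| models loaded:", summary["models_loaded"])
    sample = "Bitcoin breaks above resistance as ETF inflows accelerate"
    print("ensemble:", ensemble_crypto_sentiment(sample))
    print("registry:", registry_status())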