#!/usr/bin/env python3
"""
HuggingFace Dataset Loader - Direct Loading
Loads cryptocurrency datasets directly from Hugging Face
"""

import logging
import os
from typing import Dict, Any, Optional
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

# Try to import datasets; the module degrades gracefully when it is missing
try:
    from datasets import load_dataset, Dataset, DatasetDict
    DATASETS_AVAILABLE = True
except ImportError:
    DATASETS_AVAILABLE = False
    logger.error("❌ Datasets library not available. Install with: pip install datasets")


class CryptoDatasetLoader:
    """
    Direct Cryptocurrency Dataset Loader
    Loads crypto datasets from Hugging Face without using pipelines
    """

    def __init__(self, cache_dir: Optional[str] = None):
        """
        Initialize Dataset Loader

        Args:
            cache_dir: Directory to cache datasets (default: ~/.cache/huggingface/datasets)
        """
        if not DATASETS_AVAILABLE:
            raise ImportError("Datasets library is required. Install with: pip install datasets")

        self.cache_dir = cache_dir or os.path.expanduser("~/.cache/huggingface/datasets")
        self.datasets = {}

        logger.info("🚀 Crypto Dataset Loader initialized")
        logger.info(f"   Cache directory: {self.cache_dir}")

        # Dataset configurations
        self.dataset_configs = {
            "cryptocoin": {
                "dataset_id": "linxy/CryptoCoin",
                "description": "CryptoCoin dataset by Linxy",
                "loaded": False
            },
            "bitcoin_btc_usdt": {
                "dataset_id": "WinkingFace/CryptoLM-Bitcoin-BTC-USDT",
                "description": "Bitcoin BTC-USDT market data",
                "loaded": False
            },
            "ethereum_eth_usdt": {
                "dataset_id": "WinkingFace/CryptoLM-Ethereum-ETH-USDT",
                "description": "Ethereum ETH-USDT market data",
                "loaded": False
            },
            "solana_sol_usdt": {
                "dataset_id": "WinkingFace/CryptoLM-Solana-SOL-USDT",
                "description": "Solana SOL-USDT market data",
                "loaded": False
            },
            "ripple_xrp_usdt": {
                "dataset_id": "WinkingFace/CryptoLM-Ripple-XRP-USDT",
                "description": "Ripple XRP-USDT market data",
                "loaded": False
            }
        }
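
        # Entries share a uniform shape, so additional pairs can be registered
        # the same way. Hypothetical example (this dataset ID is an assumption,
        # not a verified Hub repository):
        #
        #   self.dataset_configs["cardano_ada_usdt"] = {
        #       "dataset_id": "WinkingFace/CryptoLM-Cardano-ADA-USDT",
        #       "description": "Cardano ADA-USDT market data",
        #       "loaded": False,
        #   }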

    async def load_dataset(
        self,
        dataset_key: str,
        split: Optional[str] = None,
        streaming: bool = False
    ) -> Dict[str, Any]:
        """
        Load a specific dataset directly

        Args:
            dataset_key: Key of the dataset to load
            split: Dataset split to load (train, test, validation, etc.)
            streaming: Whether to stream the dataset

        Returns:
            Status dict with dataset info
        """
        if dataset_key not in self.dataset_configs:
            raise ValueError(f"Unknown dataset: {dataset_key}")

        config = self.dataset_configs[dataset_key]

        # Check if already loaded
        if dataset_key in self.datasets:
            logger.info(f"✅ Dataset {dataset_key} already loaded")
            config["loaded"] = True
            return {
                "success": True,
                "dataset_key": dataset_key,
                "dataset_id": config["dataset_id"],
                "status": "already_loaded",
                "num_rows": len(self.datasets[dataset_key]) if hasattr(self.datasets[dataset_key], "__len__") else "unknown"
            }

        try:
            logger.info(f"📥 Loading dataset: {config['dataset_id']}")

            # Load dataset directly (the module-level `load_dataset` from
            # `datasets`; it does not clash with this method, which is only
            # reachable as an attribute)
            dataset = load_dataset(
                config["dataset_id"],
                split=split,
                cache_dir=self.cache_dir,
                streaming=streaming
            )

            # Store dataset
            self.datasets[dataset_key] = dataset
            config["loaded"] = True

            # Get dataset info (streaming datasets are iterable-only and fall
            # into the "unknown" branch because they have no length)
            if isinstance(dataset, Dataset):
                num_rows = len(dataset)
                columns = dataset.column_names
            elif isinstance(dataset, DatasetDict):
                num_rows = {split_name: len(dataset[split_name]) for split_name in dataset.keys()}
                columns = list(dataset[list(dataset.keys())[0]].column_names)
            else:
                num_rows = "unknown"
                columns = []

            logger.info(f"✅ Dataset loaded successfully: {config['dataset_id']}")

            return {
                "success": True,
                "dataset_key": dataset_key,
                "dataset_id": config["dataset_id"],
                "status": "loaded",
                "num_rows": num_rows,
                "columns": columns,
                "streaming": streaming
            }
        except Exception as e:
            logger.error(f"❌ Failed to load dataset {dataset_key}: {e}")
            raise RuntimeError(f"Failed to load dataset {dataset_key}: {e}") from e

    async def load_all_datasets(self, streaming: bool = False) -> Dict[str, Any]:
        """
        Load all configured datasets

        Args:
            streaming: Whether to stream the datasets

        Returns:
            Status dict with all datasets
        """
        results = []
        success_count = 0

        for dataset_key in self.dataset_configs.keys():
            try:
                result = await self.load_dataset(dataset_key, streaming=streaming)
                results.append(result)
                if result["success"]:
                    success_count += 1
            except Exception as e:
                logger.error(f"❌ Failed to load {dataset_key}: {e}")
                results.append({
                    "success": False,
                    "dataset_key": dataset_key,
                    "error": str(e)
                })

        return {
            "success": True,
            "total_datasets": len(self.dataset_configs),
            "loaded_datasets": success_count,
            "failed_datasets": len(self.dataset_configs) - success_count,
            "results": results,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
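
    # NOTE: load_dataset()/load_all_datasets() are `async def` but call the
    # synchronous `datasets.load_dataset`, so downloads block the event loop.
    # A non-blocking variant (sketch, not part of the current API) could
    # offload the call to a worker thread via asyncio.to_thread:
    #
    #   import asyncio
    #   dataset = await asyncio.to_thread(
    #       load_dataset,
    #       config["dataset_id"],
    #       split=split,
    #       cache_dir=self.cache_dir,
    #       streaming=streaming,
    #   )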

    async def get_dataset_sample(
        self,
        dataset_key: str,
        num_samples: int = 10,
        split: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Get sample rows from a dataset

        Args:
            dataset_key: Key of the dataset
            num_samples: Number of samples to return
            split: Dataset split to sample from

        Returns:
            Sample data
        """
        # Ensure dataset is loaded
        if dataset_key not in self.datasets:
            await self.load_dataset(dataset_key, split=split)

        try:
            dataset = self.datasets[dataset_key]

            # Handle different dataset types
            if isinstance(dataset, DatasetDict):
                # Get first split if not specified
                split_to_use = split or list(dataset.keys())[0]
                dataset = dataset[split_to_use]

            # Get samples (select() requires a non-streaming dataset)
            samples = dataset.select(range(min(num_samples, len(dataset))))

            # Convert to list of dicts
            samples_list = [dict(sample) for sample in samples]

            logger.info(f"✅ Retrieved {len(samples_list)} samples from {dataset_key}")

            return {
                "success": True,
                "dataset_key": dataset_key,
                "dataset_id": self.dataset_configs[dataset_key]["dataset_id"],
                "num_samples": len(samples_list),
                "samples": samples_list,
                "columns": list(samples_list[0].keys()) if samples_list else [],
                "timestamp": datetime.now(timezone.utc).isoformat()
            }
        except Exception as e:
            logger.error(f"❌ Failed to get samples from {dataset_key}: {e}")
            raise RuntimeError(f"Failed to get samples: {e}") from e

    async def query_dataset(
        self,
        dataset_key: str,
        filters: Optional[Dict[str, Any]] = None,
        limit: int = 100
    ) -> Dict[str, Any]:
        """
        Query dataset with filters

        Args:
            dataset_key: Key of the dataset
            filters: Dictionary of column filters
            limit: Maximum number of results

        Returns:
            Filtered data
        """
        # Ensure dataset is loaded
        if dataset_key not in self.datasets:
            await self.load_dataset(dataset_key)

        try:
            dataset = self.datasets[dataset_key]

            # Handle DatasetDict
            if isinstance(dataset, DatasetDict):
                dataset = dataset[list(dataset.keys())[0]]

            # Apply filters if provided; bind the loop variables as lambda
            # defaults so each predicate keeps its own column/value pair
            if filters:
                for column, value in filters.items():
                    dataset = dataset.filter(lambda x, c=column, v=value: x[c] == v)

            # Limit results
            result_dataset = dataset.select(range(min(limit, len(dataset))))

            # Convert to list of dicts
            results = [dict(row) for row in result_dataset]

            logger.info(f"✅ Query returned {len(results)} results from {dataset_key}")

            return {
                "success": True,
                "dataset_key": dataset_key,
                "filters_applied": filters or {},
                "count": len(results),
                "results": results,
                "timestamp": datetime.now(timezone.utc).isoformat()
            }
        except Exception as e:
            logger.error(f"❌ Failed to query dataset {dataset_key}: {e}")
            raise RuntimeError(f"Failed to query dataset: {e}") from e

    async def get_dataset_stats(self, dataset_key: str) -> Dict[str, Any]:
        """
        Get statistics about a dataset

        Args:
            dataset_key: Key of the dataset

        Returns:
            Dataset statistics
        """
        # Ensure dataset is loaded
        if dataset_key not in self.datasets:
            await self.load_dataset(dataset_key)

        try:
            dataset = self.datasets[dataset_key]

            # Handle DatasetDict
            if isinstance(dataset, DatasetDict):
                splits_info = {}
                for split_name, split_dataset in dataset.items():
                    splits_info[split_name] = {
                        "num_rows": len(split_dataset),
                        "columns": split_dataset.column_names,
                        "features": str(split_dataset.features)
                    }

                return {
                    "success": True,
                    "dataset_key": dataset_key,
                    "dataset_id": self.dataset_configs[dataset_key]["dataset_id"],
                    "type": "DatasetDict",
                    "splits": splits_info,
                    "timestamp": datetime.now(timezone.utc).isoformat()
                }
            else:
                return {
                    "success": True,
                    "dataset_key": dataset_key,
                    "dataset_id": self.dataset_configs[dataset_key]["dataset_id"],
                    "type": "Dataset",
                    "num_rows": len(dataset),
                    "columns": dataset.column_names,
                    "features": str(dataset.features),
                    "timestamp": datetime.now(timezone.utc).isoformat()
                }
        except Exception as e:
            logger.error(f"❌ Failed to get stats for {dataset_key}: {e}")
            raise RuntimeError(f"Failed to get dataset stats: {e}") from e

    def get_loaded_datasets(self) -> Dict[str, Any]:
        """
        Get list of loaded datasets

        Returns:
            Dict with loaded datasets info
        """
        datasets_info = []

        for dataset_key, config in self.dataset_configs.items():
            info = {
                "dataset_key": dataset_key,
                "dataset_id": config["dataset_id"],
                "description": config["description"],
                "loaded": dataset_key in self.datasets
            }

            # Add size info if loaded
            if dataset_key in self.datasets:
                dataset = self.datasets[dataset_key]
                if isinstance(dataset, DatasetDict):
                    info["num_rows"] = {split: len(dataset[split]) for split in dataset.keys()}
                elif hasattr(dataset, "__len__"):
                    info["num_rows"] = len(dataset)
                else:
                    info["num_rows"] = "unknown"

            datasets_info.append(info)

        return {
            "success": True,
            "total_configured": len(self.dataset_configs),
            "total_loaded": len(self.datasets),
            "datasets": datasets_info,
            "timestamp": datetime.now(timezone.utc).isoformat()
        }
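
    # Example query (sketch): the column name and value below are hypothetical
    # and depend on the dataset schema; inspect get_dataset_stats() output for
    # the real columns before filtering.
    #
    #   result = await loader.query_dataset(
    #       "bitcoin_btc_usdt",
    #       filters={"symbol": "BTCUSDT"},  # assumed column/value
    #       limit=50,
    #   )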

    def unload_dataset(self, dataset_key: str) -> Dict[str, Any]:
        """
        Unload a specific dataset from memory

        Args:
            dataset_key: Key of the dataset to unload

        Returns:
            Status dict
        """
        if dataset_key not in self.datasets:
            return {
                "success": False,
                "dataset_key": dataset_key,
                "message": "Dataset not loaded"
            }

        try:
            # Remove dataset
            del self.datasets[dataset_key]

            # Update config
            self.dataset_configs[dataset_key]["loaded"] = False

            logger.info(f"✅ Dataset unloaded: {dataset_key}")

            return {
                "success": True,
                "dataset_key": dataset_key,
                "message": "Dataset unloaded successfully"
            }
        except Exception as e:
            logger.error(f"❌ Failed to unload dataset {dataset_key}: {e}")
            return {
                "success": False,
                "dataset_key": dataset_key,
                "error": str(e)
            }


# Global instance - only create if datasets is available
crypto_dataset_loader = None
if DATASETS_AVAILABLE:
    try:
        crypto_dataset_loader = CryptoDatasetLoader()
    except Exception as e:
        logger.warning(f"Failed to initialize CryptoDatasetLoader: {e}")
        crypto_dataset_loader = None
else:
    logger.warning("CryptoDatasetLoader not available - datasets library not installed")

# Export
__all__ = ["CryptoDatasetLoader", "crypto_dataset_loader"]
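

# Minimal manual smoke test (sketch): requires network access and the
# `datasets` library, and downloads real data from the Hub. The chosen
# dataset key and sample size are arbitrary.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        # Reuse the module-level instance when it was created successfully
        loader = crypto_dataset_loader or CryptoDatasetLoader()
        status = await loader.load_dataset("bitcoin_btc_usdt")
        print(f"Loaded {status['dataset_id']}: {status['num_rows']} rows")
        sample = await loader.get_dataset_sample("bitcoin_btc_usdt", num_samples=3)
        for row in sample["samples"]:
            print(row)

    asyncio.run(_demo())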