# app.py - Enhanced version with streaming datasets + memory + web search
import os
import json
import threading

import gradio as gr
from huggingface_hub import InferenceClient
from datasets import load_dataset
from duckduckgo_search import DDGS

# ---------------- CONFIG ----------------
MODEL_ID = "openai/gpt-oss-120b"
DATA_DIR = "/data" if os.path.isdir("/data") else "./data"
os.makedirs(DATA_DIR, exist_ok=True)
SHORT_TERM_LIMIT = 10
SUMMARY_MAX_TOKENS = 150
MEMORY_LOCK = threading.Lock()
# ---------------- STREAMING DATASET LOADING (ZERO STORAGE) ----------------
# FineWeb sample-100BT - accessed via streaming, so nothing is downloaded to disk.
fineweb_stream = load_dataset(
    "HuggingFaceFW/fineweb",
    name="sample-100BT",  # the ~100B-token sample referenced throughout this app
    split="train",
    streaming=True,       # records are fetched lazily; no local storage used
)

# Other datasets in streaming mode
ultrachat_stream = load_dataset(
    "HuggingFaceH4/ultrachat_200k",
    split="train_sft",  # ultrachat_200k has no plain "train" split
    streaming=True,
)
hh_rlhf_stream = load_dataset(
    "Anthropic/hh-rlhf",
    split="train",
    streaming=True,
)

print("✅ All datasets loaded in streaming mode - 0GB storage used!")
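# Optional sanity check - a minimal sketch, not part of the original flow.
# Streaming datasets are lazy iterators, so pulling a single record with
# next(iter(...)) verifies connectivity without downloading anything.
# The DEBUG_PEEK environment-variable guard is an assumption for illustration.
if os.environ.get("DEBUG_PEEK"):
    first = next(iter(fineweb_stream))
    print("FineWeb sample keys:", list(first.keys()))
    print("Text preview:", first.get("text", "")[:120])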
# ---------------- DATASET SEARCH FUNCTIONS ----------------
def search_fineweb_knowledge(query, max_samples=5, max_search=2000):
    """Search the streaming FineWeb sample for content relevant to the query."""
    try:
        relevant_texts = []
        processed = 0
        query_words = query.lower().split()
        # Stream through FineWeb looking for relevant content
        for sample in fineweb_stream:
            if processed >= max_search or len(relevant_texts) >= max_samples:
                break
            text = sample.get("text", "").lower()
            # Keep the sample if any query word appears in its text
            if any(word in text for word in query_words):
                content = sample["text"][:400] + "..." if len(sample["text"]) > 400 else sample["text"]
                relevant_texts.append(content)
            processed += 1
        if relevant_texts:
            return "📚 FineWeb 100BT Knowledge:\n\n" + "\n---\n".join(relevant_texts)
        return "No relevant FineWeb content found."
    except Exception as e:
        return f"FineWeb search error: {str(e)}"
def search_conversation_patterns(query, max_samples=3, max_search=500):
    """Search UltraChat for conversation patterns relevant to the query."""
    try:
        relevant_convos = []
        processed = 0
        for sample in ultrachat_stream:
            if processed >= max_search or len(relevant_convos) >= max_samples:
                break
            # Check each message in the dialogue for relevance
            messages = sample.get("messages", [])
            for msg in messages:
                if query.lower() in msg.get("content", "").lower():
                    content = msg.get("content", "")
                    relevant_convos.append({
                        "role": msg.get("role", "unknown"),
                        "content": content[:300] + "..." if len(content) > 300 else content,
                    })
                    break
            processed += 1
        if relevant_convos:
            result = "💬 Conversation Patterns:\n\n"
            for convo in relevant_convos:
                result += f"**{convo['role']}**: {convo['content']}\n\n"
            return result
        return ""
    except Exception as e:
        return f"Conversation search error: {str(e)}"
# ---------------- HELPERS: MEMORY ----------------
def get_user_id(hf_token: gr.OAuthToken | None):
    if hf_token and getattr(hf_token, "token", None):
        return "user_" + hf_token.token[:12]
    return "anon"

def memory_file_path(user_id: str):
    return os.path.join(DATA_DIR, f"memory_{user_id}.json")

def load_memory(user_id: str):
    p = memory_file_path(user_id)
    if os.path.exists(p):
        try:
            with open(p, "r", encoding="utf-8") as f:
                mem = json.load(f)
            if isinstance(mem, dict) and "short_term" in mem and "long_term" in mem:
                return mem
        except Exception as e:
            print("load_memory error:", e)
    return {"short_term": [], "long_term": ""}

def save_memory(user_id: str, memory: dict):
    p = memory_file_path(user_id)
    try:
        with MEMORY_LOCK:
            with open(p, "w", encoding="utf-8") as f:
                json.dump(memory, f, ensure_ascii=False, indent=2)
    except Exception as e:
        print("save_memory error:", e)
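# Example round trip (a sketch; assumes DATA_DIR is writable):
#   mem = load_memory("anon")
#   mem["short_term"].append({"role": "user", "content": "hi"})
#   save_memory("anon", mem)
# load_memory falls back to an empty structure if the file is missing or invalid.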
# ---------------- NORMALIZE HISTORY ----------------
def normalize_history(history):
    out = []
    if not history:
        return out
    for turn in history:
        if isinstance(turn, dict) and "role" in turn and "content" in turn:
            out.append({"role": turn["role"], "content": str(turn["content"])})
        elif isinstance(turn, (list, tuple)) and len(turn) == 2:
            user_msg, assistant_msg = turn
            out.append({"role": "user", "content": str(user_msg)})
            out.append({"role": "assistant", "content": str(assistant_msg)})
        elif isinstance(turn, str):
            out.append({"role": "user", "content": turn})
    return out
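# Example: normalize_history flattens the mixed formats Gradio may hand over.
#   normalize_history([("hi", "hello!"), {"role": "user", "content": "thanks"}])
#   -> [{"role": "user", "content": "hi"},
#       {"role": "assistant", "content": "hello!"},
#       {"role": "user", "content": "thanks"}]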
# ---------------- SYNC COMPLETION ----------------
def _get_chat_response_sync(client: InferenceClient, messages, max_tokens=SUMMARY_MAX_TOKENS, temperature=0.3, top_p=0.9):
    try:
        resp = client.chat_completion(messages, max_tokens=max_tokens, temperature=temperature, top_p=top_p, stream=False)
    except Exception as e:
        print("sync chat_completion error:", e)
        return ""
    try:
        # Responses may be dicts or objects depending on the client version
        choices = resp.get("choices") if isinstance(resp, dict) else getattr(resp, "choices", None)
        if choices:
            c0 = choices[0]
            msg = c0.get("message") if isinstance(c0, dict) else getattr(c0, "message", None)
            if isinstance(msg, dict):
                return msg.get("content", "")
            return getattr(msg, "content", "") or str(msg or "")
    except Exception:
        pass
    return ""
# ---------------- WEB SEARCH ----------------
def web_search(query, num_results=3):
    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=num_results))
        search_context = "🔍 Web Search Results:\n\n"
        for i, r in enumerate(results, 1):
            title = r.get("title", "")[:200]
            body = r.get("body", "")[:200].replace("\n", " ")
            href = r.get("href", "")
            search_context += f"{i}. {title}\n{body}...\nSource: {href}\n\n"
        return search_context
    except Exception as e:
        return f"❌ Search error: {str(e)}"
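# Example usage (requires network access; the query is illustrative only):
#   print(web_search("latest gradio release", num_results=2))
# Each dict returned by DDGS.text() carries "title", "body", and "href" keys.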
# ---------------- SUMMARIZATION ----------------
def summarize_old_messages(client: InferenceClient, old_messages):
    text = "\n".join([f"{m['role']}: {m['content']}" for m in old_messages])
    system = {"role": "system", "content": "You are a summarizer. Summarize the following conversation in at most 150 words."}
    user = {"role": "user", "content": text}
    return _get_chat_response_sync(client, [system, user])
# ---------------- MEMORY TOOLS ----------------
def show_memory(hf_token: gr.OAuthToken | None = None):
    user = get_user_id(hf_token)
    p = memory_file_path(user)
    if not os.path.exists(p):
        return "ℹ️ No memory file found for user: " + user
    with open(p, "r", encoding="utf-8") as f:
        return f.read()

def clear_memory(hf_token: gr.OAuthToken | None = None):
    user = get_user_id(hf_token)
    p = memory_file_path(user)
    if os.path.exists(p):
        os.remove(p)
        return f"✅ Memory cleared for {user}"
    return "ℹ️ No memory to clear."
# ---------------- MAIN CHAT WITH ENHANCED CAPABILITIES ----------------
def respond(message, history: list, system_message, max_tokens, temperature, top_p,
            enable_web_search, enable_fineweb_search, enable_conversation_search,
            enable_persistent_memory, hf_token: gr.OAuthToken = None):
    client = InferenceClient(token=(hf_token.token if hf_token else None), model=MODEL_ID)
    user_id = get_user_id(hf_token)
    memory = load_memory(user_id) if enable_persistent_memory else {"short_term": [], "long_term": ""}
    session_history = normalize_history(history)
    combined = memory.get("short_term", []) + session_history

    # Memory management: fold everything beyond the short-term window into the summary
    if len(combined) > SHORT_TERM_LIMIT:
        to_summarize = combined[:len(combined) - SHORT_TERM_LIMIT]
        summary = summarize_old_messages(client, to_summarize)
        if summary:
            memory["long_term"] = (memory.get("long_term", "") + "\n" + summary).strip()
        combined = combined[-SHORT_TERM_LIMIT:]
    combined.append({"role": "user", "content": message})
    memory["short_term"] = combined
    if enable_persistent_memory:
        save_memory(user_id, memory)
    # Build context
    messages = [{"role": "system", "content": system_message}]
    if memory.get("long_term"):
        messages.append({"role": "system", "content": "Long-term memory:\n" + memory["long_term"]})

    # Enhanced search capabilities
    context_parts = []

    # Web search (triggered by search-intent keywords; "tin tức" is Vietnamese for "news")
    if enable_web_search and any(k in message.lower() for k in ["search", "google", "tin tức", "news", "what is", "latest", "current"]):
        web_results = web_search(message)
        context_parts.append(web_results)

    # FineWeb 100BT search
    if enable_fineweb_search:
        fineweb_results = search_fineweb_knowledge(message)
        if "No relevant FineWeb" not in fineweb_results:
            context_parts.append(fineweb_results)

    # Conversation pattern search
    if enable_conversation_search:
        convo_results = search_conversation_patterns(message)
        if convo_results:
            context_parts.append(convo_results)

    # Add enhanced context
    if context_parts:
        enhanced_context = "\n\n".join(context_parts)
        messages.append({"role": "system", "content": f"Additional Context:\n{enhanced_context}"})
    messages.extend(memory["short_term"])
    # Generate response
    response = ""
    try:
        for chunk in client.chat_completion(messages, max_tokens=int(max_tokens),
                                            stream=True, temperature=float(temperature), top_p=float(top_p)):
            choices = chunk.get("choices") if isinstance(chunk, dict) else getattr(chunk, "choices", None)
            if not choices:
                continue
            c0 = choices[0]
            # Prefer the streaming delta; fall back to a full message payload
            delta = c0.get("delta") if isinstance(c0, dict) else getattr(c0, "delta", None)
            token = None
            if delta:
                token = delta.get("content") if isinstance(delta, dict) else getattr(delta, "content", None)
            if not token:
                msg = c0.get("message") if isinstance(c0, dict) else getattr(c0, "message", None)
                if isinstance(msg, dict):
                    token = msg.get("content", "")
                else:
                    token = getattr(msg, "content", None) or str(msg or "")
            if token:
                response += token
                yield response
    except Exception as e:
        yield f"⚠️ Inference error: {e}"
        return

    # Update memory with the assistant's reply
    memory["short_term"].append({"role": "assistant", "content": response})
    memory["short_term"] = memory["short_term"][-SHORT_TERM_LIMIT:]
    if enable_persistent_memory:
        save_memory(user_id, memory)
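# Note: respond is a generator. gr.ChatInterface streams each yielded value as
# the full assistant reply so far, so yielding the accumulated `response`
# (not just the newest token) is what produces smooth streaming output.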
# ---------------- ENHANCED GRADIO UI ----------------
chatbot = gr.ChatInterface(
    respond,
    type="messages",
    additional_inputs=[
        gr.Textbox(value="You are an advanced AI assistant with access to web search, FineWeb 100BT knowledge, conversation patterns, and persistent memory. Provide comprehensive, accurate responses.", label="System message"),
        gr.Slider(1, 2048, value=512, step=1, label="Max new tokens"),
        gr.Slider(0.1, 4.0, value=0.7, step=0.1, label="Temperature"),
        gr.Slider(0.1, 1.0, value=0.95, step=0.05, label="Top-p"),
        gr.Checkbox(value=True, label="🔍 Enable Web Search"),
        gr.Checkbox(value=True, label="📚 Enable FineWeb 100BT Search"),
        gr.Checkbox(value=True, label="💬 Enable Conversation Pattern Search"),
        gr.Checkbox(value=True, label="🧠 Enable Persistent Memory"),
    ],
)
with gr.Blocks(title="Enhanced AI Chatbot - FineWeb 100BT") as demo:
    gr.Markdown("""
    # 🚀 Enhanced AI Chatbot with FineWeb 100BT Streaming
    **Access to a ~100-billion-token web corpus via streaming - zero local storage used!**

    ## 🔥 Features:
    - **📚 FineWeb 100BT**: Streams the ~100B-token FineWeb sample
    - **🌐 Web Search**: Real-time internet information
    - **💬 Conversation Patterns**: Draws on 200k high-quality UltraChat conversations
    - **🧠 Persistent Memory**: Remembers across sessions
    - **⚡ Zero Storage**: All datasets stream on-demand
    - **💰 Cost**: $0.00 (still free!)
    """)
    with gr.Sidebar():
        gr.LoginButton()
        gr.Markdown("""
        ### 📊 Dataset Access:
        - **FineWeb**: ~100B-token sample (streaming)
        - **UltraChat**: 200k conversations (streaming)
        - **HH-RLHF**: 169k samples (streaming)
        - **Storage Used**: 0GB 🎉

        ### 🧠 Memory Tools:
        """)
        with gr.Row():
            show_btn = gr.Button("📋 Show Memory", size="sm")
            clear_btn = gr.Button("🗑️ Clear Memory", size="sm")
        memory_output = gr.Textbox(label="Memory Status", lines=10, max_lines=15)
        show_btn.click(show_memory, inputs=None, outputs=memory_output)
        clear_btn.click(clear_memory, inputs=None, outputs=memory_output)

    chatbot.render()

if __name__ == "__main__":
    demo.launch()