# app.py - Enhanced version with streaming datasets + memory + web search
import os
import json
import threading
import gradio as gr
from huggingface_hub import InferenceClient
from datasets import load_dataset
from duckduckgo_search import DDGS
# ---------------- CONFIG ----------------
MODEL_ID = "openai/gpt-oss-120b"
DATA_DIR = "/data" if os.path.isdir("/data") else "./data"
os.makedirs(DATA_DIR, exist_ok=True)
SHORT_TERM_LIMIT = 10
SUMMARY_MAX_TOKENS = 150
MEMORY_LOCK = threading.Lock()
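# The per-user memory file is a small JSON blob (see load_memory/save_memory
# below), shaped like:
#   {
#     "short_term": [{"role": "user", "content": "hi"},
#                    {"role": "assistant", "content": "hello"}],
#     "long_term": "Running summary of older turns..."
#   }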
# ---------------- STREAMING DATASET LOADING (ZERO STORAGE!) ----------------
# FineWeb sample-100BT - full access via streaming!
fineweb_stream = load_dataset(
    "HuggingFaceFW/fineweb",
    name="sample-100BT",  # the 100-billion-token sample config
    split="train",
    streaming=True,  # no local storage used!
)
# Other datasets in streaming mode
ultrachat_stream = load_dataset(
    "HuggingFaceH4/ultrachat_200k",
    split="train_sft",  # ultrachat_200k has no plain "train" split
    streaming=True,
)
hh_rlhf_stream = load_dataset(  # loaded for future use; not queried below
    "Anthropic/hh-rlhf",
    split="train",
    streaming=True,
)
print("✅ All datasets loaded in streaming mode - 0GB storage used!")
# ---------------- DATASET SEARCH FUNCTIONS ----------------
def search_fineweb_knowledge(query, max_samples=5, max_search=2000):
    """Search through streaming FineWeb 100BT for relevant content."""
    try:
        relevant_texts = []
        processed = 0
        query_words = query.lower().split()
        # Stream through FineWeb looking for relevant content
        for sample in fineweb_stream:
            if processed >= max_search or len(relevant_texts) >= max_samples:
                break
            text = sample.get('text', '').lower()
            # Check if any query word appears in the text
            if any(word in text for word in query_words):
                content = sample['text'][:400] + "..." if len(sample['text']) > 400 else sample['text']
                relevant_texts.append(content)
            processed += 1
        if relevant_texts:
            return "📚 FineWeb 100BT Knowledge:\n\n" + "\n---\n".join(relevant_texts)
        return "No relevant FineWeb content found."
    except Exception as e:
        return f"FineWeb search error: {str(e)}"

def search_conversation_patterns(query, max_samples=3):
    """Search UltraChat for conversation patterns."""
    try:
        relevant_convos = []
        processed = 0
        for sample in ultrachat_stream:
            if processed >= 500 or len(relevant_convos) >= max_samples:
                break
            # Check each message in the conversation for relevance
            messages = sample.get('messages', [])
            for msg in messages:
                content = msg.get('content', '')
                if query.lower() in content.lower():
                    relevant_convos.append({
                        'role': msg.get('role', 'unknown'),
                        'content': content[:300] + "..." if len(content) > 300 else content,
                    })
                    break
            processed += 1
        if relevant_convos:
            result = "💬 Conversation Patterns:\n\n"
            for convo in relevant_convos:
                result += f"**{convo['role']}**: {convo['content']}\n\n"
            return result
        return ""
    except Exception as e:
        return f"Conversation search error: {str(e)}"

# ---------------- HELPERS: MEMORY ----------------
def get_user_id(hf_token: gr.OAuthToken | None):
    if hf_token and getattr(hf_token, "token", None):
        return "user_" + hf_token.token[:12]
    return "anon"

def memory_file_path(user_id: str):
    return os.path.join(DATA_DIR, f"memory_{user_id}.json")

def load_memory(user_id: str):
    p = memory_file_path(user_id)
    if os.path.exists(p):
        try:
            with open(p, "r", encoding="utf-8") as f:
                mem = json.load(f)
            if isinstance(mem, dict) and "short_term" in mem and "long_term" in mem:
                return mem
        except Exception as e:
            print("load_memory error:", e)
    return {"short_term": [], "long_term": ""}

def save_memory(user_id: str, memory: dict):
    p = memory_file_path(user_id)
    try:
        with MEMORY_LOCK:
            with open(p, "w", encoding="utf-8") as f:
                json.dump(memory, f, ensure_ascii=False, indent=2)
    except Exception as e:
        print("save_memory error:", e)

# ---------------- NORMALIZE HISTORY ----------------
def normalize_history(history):
    out = []
    if not history:
        return out
    for turn in history:
        if isinstance(turn, dict) and "role" in turn and "content" in turn:
            out.append({"role": turn["role"], "content": str(turn["content"])})
        elif isinstance(turn, (list, tuple)) and len(turn) == 2:
            user_msg, assistant_msg = turn
            out.append({"role": "user", "content": str(user_msg)})
            out.append({"role": "assistant", "content": str(assistant_msg)})
        elif isinstance(turn, str):
            out.append({"role": "user", "content": turn})
    return out

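# Example: both Gradio history formats normalize to the same shape, e.g.
#   normalize_history([("hi", "hello")])
#   -> [{"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}]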
# ---------------- SYNC COMPLETION ----------------
def _get_chat_response_sync(client: InferenceClient, messages, max_tokens=SUMMARY_MAX_TOKENS, temperature=0.3, top_p=0.9):
    try:
        resp = client.chat_completion(messages, max_tokens=max_tokens, temperature=temperature, top_p=top_p, stream=False)
    except Exception as e:
        print("sync chat_completion error:", e)
        return ""
    try:
        choices = resp.get("choices") if isinstance(resp, dict) else getattr(resp, "choices", None)
        if choices:
            c0 = choices[0]
            msg = c0.get("message") if isinstance(c0, dict) else getattr(c0, "message", None)
            if isinstance(msg, dict):
                return msg.get("content", "")
            return getattr(msg, "content", "") or str(msg or "")
    except Exception:
        pass
    return ""

# ---------------- WEB SEARCH ----------------
def web_search(query, num_results=3):
    try:
        with DDGS() as ddgs:
            results = list(ddgs.text(query, max_results=num_results))
        search_context = "🔍 Web Search Results:\n\n"
        for i, r in enumerate(results, 1):
            title = r.get("title", "")[:200]
            body = r.get("body", "")[:200].replace("\n", " ")
            href = r.get("href", "")
            search_context += f"{i}. {title}\n{body}...\nSource: {href}\n\n"
        return search_context
    except Exception as e:
        return f"❌ Search error: {str(e)}"

# ---------------- SUMMARIZATION ----------------
def summarize_old_messages(client: InferenceClient, old_messages):
    text = "\n".join([f"{m['role']}: {m['content']}" for m in old_messages])
    system = {"role": "system", "content": "You are a summarizer. Summarize the conversation below in at most 150 words."}
    user = {"role": "user", "content": text}
    return _get_chat_response_sync(client, [system, user])

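# Note: the prompt asks for <=150 words while SUMMARY_MAX_TOKENS caps output
# at 150 tokens (roughly 110 English words), so the token limit is the
# effective bound.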
# ---------------- MEMORY TOOLS ----------------
def show_memory(hf_token: gr.OAuthToken | None = None):
    user = get_user_id(hf_token)
    p = memory_file_path(user)
    if not os.path.exists(p):
        return "ℹ️ No memory file found for user: " + user
    with open(p, "r", encoding="utf-8") as f:
        return f.read()

def clear_memory(hf_token: gr.OAuthToken | None = None):
    user = get_user_id(hf_token)
    p = memory_file_path(user)
    if os.path.exists(p):
        os.remove(p)
        return f"✅ Memory cleared for {user}"
    return "ℹ️ No memory to clear."

# ---------------- MAIN CHAT WITH ENHANCED CAPABILITIES ----------------
def respond(message, history: list, system_message, max_tokens, temperature, top_p,
            enable_web_search, enable_fineweb_search, enable_conversation_search,
            enable_persistent_memory, hf_token: gr.OAuthToken = None):
    client = InferenceClient(token=(hf_token.token if hf_token else None), model=MODEL_ID)
    user_id = get_user_id(hf_token)
    memory = load_memory(user_id) if enable_persistent_memory else {"short_term": [], "long_term": ""}
    session_history = normalize_history(history)
    combined = memory.get("short_term", []) + session_history
    # Memory management: summarize overflow into long-term memory
    if len(combined) > SHORT_TERM_LIMIT:
        to_summarize = combined[:len(combined) - SHORT_TERM_LIMIT]
        summary = summarize_old_messages(client, to_summarize)
        if summary:
            memory["long_term"] = (memory.get("long_term", "") + "\n" + summary).strip()
        combined = combined[-SHORT_TERM_LIMIT:]
    combined.append({"role": "user", "content": message})
    memory["short_term"] = combined
    if enable_persistent_memory:
        save_memory(user_id, memory)
    # Build context
    messages = [{"role": "system", "content": system_message}]
    if memory.get("long_term"):
        messages.append({"role": "system", "content": "Long-term memory:\n" + memory["long_term"]})
    # Enhanced search capabilities
    context_parts = []
    # Web search (triggered by query keywords)
    if enable_web_search and any(k in message.lower() for k in ["search", "google", "tin tức", "news", "what is", "latest", "current"]):
        web_results = web_search(message)
        context_parts.append(web_results)
    # FineWeb 100BT search
    if enable_fineweb_search:
        fineweb_results = search_fineweb_knowledge(message)
        if "No relevant FineWeb" not in fineweb_results:
            context_parts.append(fineweb_results)
    # Conversation pattern search
    if enable_conversation_search:
        convo_results = search_conversation_patterns(message)
        if convo_results:
            context_parts.append(convo_results)
    # Add enhanced context
    if context_parts:
        enhanced_context = "\n\n".join(context_parts)
        messages.append({"role": "system", "content": f"Additional Context:\n{enhanced_context}"})
    messages.extend(memory["short_term"])
    # Generate response (streaming)
    response = ""
    try:
        for chunk in client.chat_completion(messages, max_tokens=int(max_tokens),
                                            stream=True, temperature=float(temperature), top_p=float(top_p)):
            choices = chunk.get("choices") if isinstance(chunk, dict) else getattr(chunk, "choices", None)
            if not choices:
                continue
            c0 = choices[0]
            delta = c0.get("delta") if isinstance(c0, dict) else getattr(c0, "delta", None)
            token = None
            if delta and (delta.get("content") if isinstance(delta, dict) else getattr(delta, "content", None)):
                token = delta.get("content") if isinstance(delta, dict) else getattr(delta, "content", None)
            else:
                msg = c0.get("message") if isinstance(c0, dict) else getattr(c0, "message", None)
                if isinstance(msg, dict):
                    token = msg.get("content", "")
                else:
                    token = getattr(msg, "content", None) or str(msg or "")
            if token:
                response += token
                yield response
    except Exception as e:
        yield f"⚠️ Inference error: {e}"
        return
    # Update memory with the assistant's reply
    memory["short_term"].append({"role": "assistant", "content": response})
    memory["short_term"] = memory["short_term"][-SHORT_TERM_LIMIT:]
    if enable_persistent_memory:
        save_memory(user_id, memory)

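# Minimal sanity-check sketch for the non-UI path (kept commented so it never
# runs at import time; assumes the environment grants anonymous or token-based
# inference access, and the prompt is illustrative). respond() is a generator,
# so drain it:
#   for partial in respond("What is FineWeb?", [], "You are helpful.",
#                          128, 0.7, 0.95, False, False, False, False, hf_token=None):
#       pass
#   print(partial)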
# ---------------- ENHANCED GRADIO UI ----------------
chatbot = gr.ChatInterface(
    respond,
    type="messages",
    additional_inputs=[
        gr.Textbox(value="You are an advanced AI assistant with access to web search, FineWeb 100BT knowledge, conversation patterns, and persistent memory. Provide comprehensive, accurate responses.", label="System message"),
        gr.Slider(1, 2048, value=512, step=1, label="Max new tokens"),
        gr.Slider(0.1, 4.0, value=0.7, step=0.1, label="Temperature"),
        gr.Slider(0.1, 1.0, value=0.95, step=0.05, label="Top-p"),
        gr.Checkbox(value=True, label="🌐 Enable Web Search"),
        gr.Checkbox(value=True, label="📚 Enable FineWeb 100BT Search"),
        gr.Checkbox(value=True, label="💬 Enable Conversation Pattern Search"),
        gr.Checkbox(value=True, label="🧠 Enable Persistent Memory"),
    ],
)
with gr.Blocks(title="Enhanced AI Chatbot - FineWeb 100BT") as demo:
    gr.Markdown("""
    # 🚀 Enhanced AI Chatbot with FineWeb 100BT Streaming
    **Now with access to 100+ billion tokens via streaming - zero storage used!**

    ## 🔥 Features:
    - **📚 FineWeb 100BT**: Full access to the 100-billion-token web dataset sample
    - **🌐 Web Search**: Real-time internet information
    - **💬 Conversation Patterns**: Learn from 200k+ high-quality conversations
    - **🧠 Persistent Memory**: Remembers across sessions
    - **⚡ Zero Storage**: All datasets stream on demand
    - **💰 Cost**: $0.00 (still free!)
    """)
    with gr.Sidebar():
        gr.LoginButton()
        gr.Markdown("""
        ### 📊 Dataset Access:
        - **FineWeb**: 100BT tokens (streaming)
        - **UltraChat**: 515k conversations (streaming)
        - **HH-RLHF**: 169k samples (streaming)
        - **Storage Used**: 0GB 🎉

        ### 🔧 Memory Tools:
        """)
        with gr.Row():
            show_btn = gr.Button("👀 Show Memory", size="sm")
            clear_btn = gr.Button("🗑️ Clear Memory", size="sm")
        memory_output = gr.Textbox(label="Memory Status", lines=10, max_lines=15)
        show_btn.click(show_memory, inputs=None, outputs=memory_output)
        clear_btn.click(clear_memory, inputs=None, outputs=memory_output)
    chatbot.render()
if __name__ == "__main__":
    demo.launch()