Krishna Chaitanya Cheedella
Refactor to use FREE HuggingFace models + OpenAI instead of OpenRouter
aa61236
| """3-stage LLM Council orchestration using FREE models.""" | |
| from typing import List, Dict, Any, Tuple | |
| from .api_client import query_models_parallel, query_model, query_model_stream | |
| from .config_free import COUNCIL_MODELS, CHAIRMAN_MODEL | |
| async def stage1_collect_responses(user_query: str) -> List[Dict[str, Any]]: | |
| """ | |
| Stage 1: Collect individual responses from all council models. | |
| Args: | |
| user_query: The user's question | |
| Returns: | |
| List of dicts with 'model' and 'response' keys | |
| """ | |
| print("STAGE 1: Collecting individual responses from council members...") | |
| messages = [{"role": "user", "content": user_query}] | |
| # Query all models in parallel | |
| responses = await query_models_parallel(COUNCIL_MODELS, messages) | |
| # Format results | |
| stage1_results = [] | |
| for model_config in COUNCIL_MODELS: | |
| model_id = model_config["id"] | |
| response = responses.get(model_id) | |
| if response is not None: | |
| stage1_results.append({ | |
| "model": model_id, | |
| "response": response.get("content", "") | |
| }) | |
| print(f"STAGE 1 COMPLETE: Received {len(stage1_results)} responses.") | |
| return stage1_results | |
| async def stage2_collect_rankings( | |
| user_query: str, stage1_results: List[Dict[str, Any]] | |
| ) -> Tuple[List[Dict[str, Any]], Dict[str, str]]: | |
| """ | |
| Stage 2: Each model ranks the anonymized responses. | |
| Args: | |
| user_query: The original user query | |
| stage1_results: Results from Stage 1 | |
| Returns: | |
| Tuple of (rankings list, label_to_model mapping) | |
| """ | |
| print("STAGE 2: Council members are ranking each other's responses...") | |
| # Create anonymized labels for responses (Response A, Response B, etc.) | |
| labels = [chr(65 + i) for i in range(len(stage1_results))] # A, B, C, ... | |
| # Create mapping from label to model name | |
| label_to_model = { | |
| f"Response {label}": result["model"] | |
| for label, result in zip(labels, stage1_results) | |
| } | |
| # Build the ranking prompt | |
| responses_text = "\n\n".join([ | |
| f"Response {label}:\n{result['response']}" | |
| for label, result in zip(labels, stage1_results) | |
| ]) | |
| ranking_prompt = f"""You are evaluating different responses to the following question: | |
| Question: {user_query} | |
| Here are the responses from different models (anonymized): | |
| {responses_text} | |
| Your task: | |
| 1. First, evaluate each response individually. For each response, explain what it does well and what it does poorly. | |
| 2. Then, at the very end of your response, provide a final ranking. | |
| IMPORTANT: Your final ranking MUST be formatted EXACTLY as follows: | |
| - Start with the line "FINAL RANKING:" (all caps, with colon) | |
| - Then list the responses from best to worst as a numbered list | |
| - Each line should be: number, period, space, then ONLY the response label (e.g., "1. Response A") | |
| - Do not add any other text or explanations in the ranking section | |
| Example of the correct format for your ENTIRE response: | |
| Response A provides good detail on X but misses Y... | |
| Response B is accurate but lacks depth on Z... | |
| Response C offers the most comprehensive answer... | |
| FINAL RANKING: | |
| 1. Response C | |
| 2. Response A | |
| 3. Response B | |
| Now provide your evaluation and ranking:""" | |
| messages = [{"role": "user", "content": ranking_prompt}] | |
| # Get rankings from all council models in parallel | |
| responses = await query_models_parallel(COUNCIL_MODELS, messages) | |
| # Format results | |
| stage2_results = [] | |
| for model_config in COUNCIL_MODELS: | |
| model_id = model_config["id"] | |
| response = responses.get(model_id) | |
| if response is not None: | |
| full_text = response.get("content", "") | |
| parsed = parse_ranking_from_text(full_text) | |
| stage2_results.append({ | |
| "model": model_id, | |
| "ranking": full_text, | |
| "parsed_ranking": parsed | |
| }) | |
| print("STAGE 2 COMPLETE: Rankings collected.") | |
| return stage2_results, label_to_model | |
| async def stage3_synthesize_final( | |
| user_query: str, | |
| stage1_results: List[Dict[str, Any]], | |
| stage2_results: List[Dict[str, Any]] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Stage 3: Chairman synthesizes final response. | |
| Args: | |
| user_query: The original user query | |
| stage1_results: Individual model responses from Stage 1 | |
| stage2_results: Rankings from Stage 2 | |
| Returns: | |
| Dict with 'model' and 'response' keys | |
| """ | |
| print("STAGE 3: Chairman is synthesizing the final answer...") | |
| # Build comprehensive context for chairman | |
| stage1_text = "\n\n".join([ | |
| f"Model: {result['model']}\nResponse: {result['response']}" | |
| for result in stage1_results | |
| ]) | |
| stage2_text = "\n\n".join([ | |
| f"Model: {result['model']}\nRanking: {result['ranking']}" | |
| for result in stage2_results | |
| ]) | |
| chairman_prompt = f"""You are the Chairman of an LLM Council. Multiple AI models have provided responses to a user's question, and then ranked each other's responses. | |
| Original Question: {user_query} | |
| STAGE 1 - Individual Responses: | |
| {stage1_text} | |
| STAGE 2 - Peer Rankings: | |
| {stage2_text} | |
| Your task as Chairman is to synthesize all of this information into a single, comprehensive, accurate answer to the user's original question. Consider: | |
| - The individual responses and their insights | |
| - The peer rankings and what they reveal about response quality | |
| - Any patterns of agreement or disagreement | |
| Provide a clear, well-reasoned final answer that represents the council's collective wisdom:""" | |
| messages = [{"role": "user", "content": chairman_prompt}] | |
| # Query the chairman model | |
| response = await query_model(CHAIRMAN_MODEL, messages) | |
| if response is None: | |
| print("STAGE 3 ERROR: Unable to generate final synthesis.") | |
| return { | |
| "model": CHAIRMAN_MODEL["id"], | |
| "response": "Error: Unable to generate final synthesis." | |
| } | |
| print("STAGE 3 COMPLETE: Final answer synthesized.") | |
| return { | |
| "model": CHAIRMAN_MODEL["id"], | |
| "response": response.get("content", "") | |
| } | |
| async def stage3_synthesize_final_stream( | |
| user_query: str, | |
| stage1_results: List[Dict[str, Any]], | |
| stage2_results: List[Dict[str, Any]] | |
| ): | |
| """ | |
| Stage 3: Chairman synthesizes final response (Streaming). | |
| Yields chunks of text. | |
| """ | |
| print("STAGE 3: Chairman is synthesizing the final answer (Streaming)...") | |
| # Build comprehensive context for chairman | |
| stage1_text = "\n\n".join([ | |
| f"Model: {result['model']}\nResponse: {result['response']}" | |
| for result in stage1_results | |
| ]) | |
| stage2_text = "\n\n".join([ | |
| f"Model: {result['model']}\nRanking: {result['ranking']}" | |
| for result in stage2_results | |
| ]) | |
| chairman_prompt = f"""You are the Chairman of an LLM Council. Multiple AI models have provided responses to a user's question, and then ranked each other's responses. | |
| Original Question: {user_query} | |
| STAGE 1 - Individual Responses: | |
| {stage1_text} | |
| STAGE 2 - Peer Rankings: | |
| {stage2_text} | |
| Your task as Chairman is to synthesize all of this information into a single, comprehensive, accurate answer to the user's original question. Consider: | |
| - The individual responses and their insights | |
| - The peer rankings and what they reveal about response quality | |
| - Any patterns of agreement or disagreement | |
| Provide a clear, well-reasoned final answer that represents the council's collective wisdom:""" | |
| messages = [{"role": "user", "content": chairman_prompt}] | |
| # Stream the chairman model | |
| async for chunk in query_model_stream(CHAIRMAN_MODEL, messages): | |
| yield chunk | |
| print("STAGE 3 COMPLETE: Final answer stream finished.") | |
| def parse_ranking_from_text(ranking_text: str) -> List[str]: | |
| """ | |
| Parse the FINAL RANKING section from the model's response. | |
| Args: | |
| ranking_text: The full text response from the model | |
| Returns: | |
| List of response labels in ranked order | |
| """ | |
| import re | |
| # Look for "FINAL RANKING:" section | |
| if "FINAL RANKING:" in ranking_text: | |
| parts = ranking_text.split("FINAL RANKING:") | |
| if len(parts) >= 2: | |
| ranking_section = parts[1] | |
| # Extract numbered list format | |
| numbered_matches = re.findall(r"\d+\.\s*Response [A-Z]", ranking_section) | |
| if numbered_matches: | |
| return [re.search(r"Response [A-Z]", m).group() for m in numbered_matches] | |
| # Fallback: Extract all "Response X" patterns in order | |
| matches = re.findall(r"Response [A-Z]", ranking_section) | |
| return matches | |
| # Fallback: try to find any "Response X" patterns in order | |
| matches = re.findall(r"Response [A-Z]", ranking_text) | |
| return matches | |
| def calculate_aggregate_rankings( | |
| stage2_results: List[Dict[str, Any]], | |
| label_to_model: Dict[str, str] | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Calculate aggregate rankings across all models. | |
| Args: | |
| stage2_results: Rankings from each model | |
| label_to_model: Mapping from anonymous labels to model names | |
| Returns: | |
| List of dicts with model name and average rank, sorted best to worst | |
| """ | |
| from collections import defaultdict | |
| # Track positions for each model | |
| model_positions = defaultdict(list) | |
| for ranking in stage2_results: | |
| ranking_text = ranking["ranking"] | |
| parsed_ranking = parse_ranking_from_text(ranking_text) | |
| for position, label in enumerate(parsed_ranking, start=1): | |
| if label in label_to_model: | |
| model_name = label_to_model[label] | |
| model_positions[model_name].append(position) | |
| # Calculate average position for each model | |
| aggregate = [] | |
| for model, positions in model_positions.items(): | |
| if positions: | |
| avg_rank = sum(positions) / len(positions) | |
| aggregate.append({ | |
| "model": model, | |
| "average_rank": round(avg_rank, 2), | |
| "rankings_count": len(positions) | |
| }) | |
| # Sort by average rank (lower is better) | |
| aggregate.sort(key=lambda x: x["average_rank"]) | |
| return aggregate | |
| async def generate_conversation_title(user_query: str) -> str: | |
| """ | |
| Generate a short title for a conversation based on the first user message. | |
| Args: | |
| user_query: The first user message | |
| Returns: | |
| A short title (3-5 words) | |
| """ | |
| title_prompt = f"""Generate a very short title (3-5 words maximum) that summarizes the following question. | |
| The title should be concise and descriptive. Do not use quotes or punctuation in the title. | |
| Question: {user_query} | |
| Title:""" | |
| messages = [{"role": "user", "content": title_prompt}] | |
| # Use GPT-4o-mini for fast title generation | |
| response = await query_model(CHAIRMAN_MODEL, messages, timeout=30.0) | |
| if response is None: | |
| return "New Conversation" | |
| title = response.get("content", "New Conversation").strip() | |
| title = title.strip("\"'") | |
| # Truncate if too long | |
| if len(title) > 50: | |
| title = title[:47] + "..." | |
| return title | |
| async def run_full_council(user_query: str) -> Tuple[List, List, Dict, Dict]: | |
| """ | |
| Run the complete 3-stage council process. | |
| Args: | |
| user_query: The user's question | |
| Returns: | |
| Tuple of (stage1_results, stage2_results, stage3_result, metadata) | |
| """ | |
| # Stage 1: Collect individual responses | |
| stage1_results = await stage1_collect_responses(user_query) | |
| # If no models responded successfully, return error | |
| if not stage1_results: | |
| return [], [], { | |
| "model": "error", | |
| "response": "All models failed to respond. Please try again." | |
| }, {} | |
| # Stage 2: Collect rankings | |
| stage2_results, label_to_model = await stage2_collect_rankings( | |
| user_query, stage1_results | |
| ) | |
| # Calculate aggregate rankings | |
| aggregate_rankings = calculate_aggregate_rankings(stage2_results, label_to_model) | |
| # Stage 3: Synthesize final answer | |
| stage3_result = await stage3_synthesize_final( | |
| user_query, stage1_results, stage2_results | |
| ) | |
| # Prepare metadata | |
| metadata = { | |
| "label_to_model": label_to_model, | |
| "aggregate_rankings": aggregate_rankings | |
| } | |
| return stage1_results, stage2_results, stage3_result, metadata | |