|
|
|
|
|
import json |
|
|
import os |
|
|
import random |
|
|
|
|
|
from gradio import ChatMessage |
|
|
from langchain_core.messages import AIMessage, AIMessageChunk, HumanMessage, ToolMessage |
|
|
from langgraph.graph import StateGraph, START, END |
|
|
from langgraph.graph.message import add_messages |
|
|
from langgraph.prebuilt import ToolNode |
|
|
from support.log_manager import logger |
|
|
from support.my_tools import captain_agent_tools, boss_agent_tools |
|
|
from support.tools.boss_agent_tools import create_fake_tool_call |
|
|
from support.tools.captain_agent_tools import create_agent_fake_tool_call |
|
|
from support.prompts import captain_agent_system_prompt, player_agent_system_prompt, boss_agent_system_prompt |
|
|
from typing import Annotated, List, Literal, Optional |
|
|
from typing_extensions import TypedDict |
|
|
|
|
|
|
|
|
class State(TypedDict): |
|
|
messages: Annotated[list, add_messages] |
|
|
chat_history: List[str] |
|
|
round_messages: Annotated[list, add_messages] |
|
|
original_board: dict |
|
|
board: dict |
|
|
players: List |
|
|
current_team: Literal['red', 'blue'] |
|
|
current_role: str |
|
|
turn: int |
|
|
last_user_message: str |
|
|
guesses: List[str] |
|
|
clue: Optional[str] |
|
|
clue_number: Optional[int] |
|
|
next_team: Literal['red', 'blue'] |
|
|
history_guessed_words: List[str] |
|
|
human_clue: str |
|
|
human_clue_number: int |
|
|
teams_reviewed: List[str] |
|
|
end_round: bool |
|
|
winner_and_score: tuple |
|
|
|
|
|
red_boss_is_called_counter: int |
|
|
blue_boss_is_called_counter: int |
|
|
red_captain_is_called_counter: int |
|
|
blue_captain_is_called_counter: int |
|
|
|
|
|
|
|
|
class MyGraph: |
|
|
def __init__(self): |
|
|
|
|
|
self.red_team = self._create_red_team_graph() |
|
|
self.blue_team = self._create_blue_team_graph() |
|
|
self.graph = self._create_graph() |
|
|
|
|
|
self.players = [] |
|
|
self.board = None |
|
|
self.guessed_words = [] |
|
|
self.current_team = "" |
|
|
self.chat_history = [] |
|
|
self.winners = [] |
|
|
|
|
|
self.IS_HUMAN_PLAYING = None |
|
|
|
|
|
def _create_red_team_graph(self): |
|
|
"""Compile red team subgraph""" |
|
|
builder = StateGraph(State) |
|
|
|
|
|
|
|
|
builder.add_node("red_boss", self.red_boss) |
|
|
builder.add_node("red_captain", self.red_captain) |
|
|
builder.add_node("red_agent_1", self.red_agent_1) |
|
|
builder.add_node("red_agent_2", self.red_agent_2) |
|
|
builder.add_node("red_boss_is_called", self.red_boss_is_called) |
|
|
builder.add_node("red_captain_is_called", self.red_captain_is_called) |
|
|
|
|
|
|
|
|
choose_word_tool = ToolNode(tools=[boss_agent_tools[0]]) |
|
|
final_choice = ToolNode(tools=[captain_agent_tools[0]]) |
|
|
transfer_to_agent_1 = ToolNode(tools=[captain_agent_tools[1]]) |
|
|
transfer_to_agent_2 = ToolNode(tools=[captain_agent_tools[2]]) |
|
|
|
|
|
builder.add_node("choose_word_tool", choose_word_tool) |
|
|
builder.add_node("final_choice", final_choice) |
|
|
builder.add_node("transfer_to_agent_1", transfer_to_agent_1) |
|
|
builder.add_node("transfer_to_agent_2", transfer_to_agent_2) |
|
|
builder.add_node("update_turn", self.update_turn) |
|
|
|
|
|
|
|
|
builder.add_edge(START, "red_boss") |
|
|
builder.add_conditional_edges( |
|
|
"red_boss", |
|
|
self.boss_choice, |
|
|
{ |
|
|
"choose_word_tool": "choose_word_tool", |
|
|
"red_boss_is_called": "red_boss_is_called", |
|
|
}, |
|
|
) |
|
|
|
|
|
builder.add_edge("red_boss_is_called", "red_boss") |
|
|
builder.add_edge("choose_word_tool", "red_captain") |
|
|
|
|
|
builder.add_conditional_edges( |
|
|
"red_captain", |
|
|
self.should_continue, |
|
|
{ |
|
|
"final_choice": "final_choice", |
|
|
"transfer_to_agent_1": "transfer_to_agent_1", |
|
|
"transfer_to_agent_2": "transfer_to_agent_2", |
|
|
"red_captain_is_called": "red_captain_is_called", |
|
|
}, |
|
|
) |
|
|
|
|
|
builder.add_edge("red_captain_is_called", "red_captain") |
|
|
builder.add_edge("transfer_to_agent_1", "red_agent_1") |
|
|
builder.add_edge("transfer_to_agent_2", "red_agent_2") |
|
|
builder.add_edge("red_agent_1", "red_captain") |
|
|
builder.add_edge("red_agent_2", "red_captain") |
|
|
builder.add_edge("final_choice", "update_turn") |
|
|
builder.add_edge("update_turn", END) |
|
|
|
|
|
return builder.compile() |
|
|
|
|
|
def _create_blue_team_graph(self): |
|
|
"""Compile blue team subgraph""" |
|
|
builder = StateGraph(State) |
|
|
|
|
|
|
|
|
builder.add_node("blue_boss", self.blue_boss) |
|
|
builder.add_node("blue_captain", self.blue_captain) |
|
|
builder.add_node("blue_agent_1", self.blue_agent_1) |
|
|
builder.add_node("blue_agent_2", self.blue_agent_2) |
|
|
builder.add_node("blue_boss_is_called", self.blue_boss_is_called) |
|
|
builder.add_node("blue_captain_is_called", self.blue_captain_is_called) |
|
|
|
|
|
|
|
|
choose_word_tool = ToolNode(tools=[boss_agent_tools[0]]) |
|
|
final_choice = ToolNode(tools=[captain_agent_tools[0]]) |
|
|
transfer_to_agent_1 = ToolNode(tools=[captain_agent_tools[1]]) |
|
|
transfer_to_agent_2 = ToolNode(tools=[captain_agent_tools[2]]) |
|
|
|
|
|
builder.add_node("choose_word_tool", choose_word_tool) |
|
|
builder.add_node("final_choice", final_choice) |
|
|
builder.add_node("transfer_to_agent_1", transfer_to_agent_1) |
|
|
builder.add_node("transfer_to_agent_2", transfer_to_agent_2) |
|
|
builder.add_node("update_turn", self.update_turn) |
|
|
|
|
|
|
|
|
builder.add_edge(START, "blue_boss") |
|
|
builder.add_conditional_edges( |
|
|
"blue_boss", |
|
|
self.boss_choice, |
|
|
{ |
|
|
"choose_word_tool": "choose_word_tool", |
|
|
"blue_boss_is_called": "blue_boss_is_called", |
|
|
}, |
|
|
) |
|
|
|
|
|
builder.add_edge("blue_boss_is_called", "blue_boss") |
|
|
builder.add_edge("choose_word_tool", "blue_captain") |
|
|
|
|
|
builder.add_conditional_edges( |
|
|
"blue_captain", |
|
|
self.should_continue, |
|
|
{ |
|
|
"final_choice": "final_choice", |
|
|
"transfer_to_agent_1": "transfer_to_agent_1", |
|
|
"transfer_to_agent_2": "transfer_to_agent_2", |
|
|
"blue_captain_is_called": "blue_captain_is_called", |
|
|
}, |
|
|
) |
|
|
|
|
|
builder.add_edge("blue_captain_is_called", "blue_captain") |
|
|
builder.add_edge("transfer_to_agent_1", "blue_agent_1") |
|
|
builder.add_edge("transfer_to_agent_2", "blue_agent_2") |
|
|
builder.add_edge("blue_agent_1", "blue_captain") |
|
|
builder.add_edge("blue_agent_2", "blue_captain") |
|
|
builder.add_edge("final_choice", "update_turn") |
|
|
builder.add_edge("update_turn", END) |
|
|
|
|
|
return builder.compile() |
|
|
|
|
|
def _create_graph(self) -> StateGraph: |
|
|
"""Create and compile the graph.""" |
|
|
|
|
|
builder = StateGraph(State) |
|
|
|
|
|
red_graph = self._create_red_team_graph() |
|
|
blue_graph = self._create_blue_team_graph() |
|
|
|
|
|
builder.add_node("judge", self.judge) |
|
|
|
|
|
|
|
|
builder.add_node("red_team", red_graph) |
|
|
builder.add_node("blue_team", blue_graph) |
|
|
|
|
|
builder.add_edge(START, "judge") |
|
|
builder.add_conditional_edges( |
|
|
"judge", |
|
|
self.route_after_judge, |
|
|
["red_team", "blue_team", END], |
|
|
) |
|
|
builder.add_edge("red_team", "judge") |
|
|
builder.add_edge("blue_team", "judge") |
|
|
|
|
|
graph = builder.compile() |
|
|
|
|
|
|
|
|
if not os.path.exists("graph.png"): |
|
|
try: |
|
|
img = graph.get_graph(xray=True).draw_mermaid_png() |
|
|
with open("graph.png", "wb") as f: |
|
|
f.write(img) |
|
|
except Exception as e: |
|
|
logger.error(f"[GRAPH IMAGE ERROR]: {e}") |
|
|
|
|
|
return graph |
|
|
|
|
|
|
|
|
async def red_boss(self, state: State): |
|
|
"""Red team boss gives a clue""" |
|
|
logger.info("[RED BOSS] MOMENT ") |
|
|
boss = next((p for p in state["players"] if p.team == "red" and p.role == "boss"), None) |
|
|
if not boss: |
|
|
return state |
|
|
|
|
|
board = state["board"] |
|
|
team_name = state["current_team"].upper() |
|
|
formatted_boss_system_prompt = boss_agent_system_prompt.format(team_name) |
|
|
chat_history = state["chat_history"] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
formatted_history, current_round_messages = self._split_chat_history(chat_history) |
|
|
self.current_team = state["current_team"] |
|
|
|
|
|
new_message = [ |
|
|
{ |
|
|
"role": "system", |
|
|
"content": formatted_boss_system_prompt |
|
|
}, |
|
|
{ |
|
|
"role": "user", |
|
|
"content": f""" |
|
|
Keep in mind the history of the game so far:\n |
|
|
[HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n |
|
|
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
|
|
Here is the current board:\n |
|
|
Red words: {", ".join(board['red'])}\n |
|
|
Blue words: {", ".join(board['blue'])}\n |
|
|
Neutral words: {", ".join(board['neutral'])}\n |
|
|
Killer word: {board['killer']}\n\n |
|
|
Based on this board, provide a clue and a number of words that relate to that clue. |
|
|
""" |
|
|
} |
|
|
] |
|
|
|
|
|
if self.IS_HUMAN_PLAYING and boss.model_name == "Human brain": |
|
|
clue_ = state['human_clue'] |
|
|
clue_number = state['human_clue_number'] |
|
|
answer = create_fake_tool_call(clue_, clue_number) |
|
|
else: |
|
|
if boss.model_name == "claude-sonnet-4-5-20250929": |
|
|
llm_with_tools = boss.model.bind_tools(boss_agent_tools) |
|
|
else: |
|
|
llm_with_tools = boss.model.bind_tools(boss_agent_tools, tool_choice="ChooseWord") |
|
|
answer = await llm_with_tools.ainvoke(new_message) |
|
|
|
|
|
logger.info(f"[RED BOSS ANSWER]: {answer}") |
|
|
|
|
|
chat_entry, clue, clue_number, _ = self._format_chat_entry(boss, answer) |
|
|
|
|
|
return { |
|
|
"messages": [answer], |
|
|
"current_role": "captain", |
|
|
"chat_history": state.get("chat_history", []) + [chat_entry], |
|
|
"clue": clue, |
|
|
"clue_number": clue_number, |
|
|
} |
|
|
|
|
|
async def red_captain(self, state: State): |
|
|
"""Red team captain coordinates guessing""" |
|
|
|
|
|
logger.info("°°°"*50) |
|
|
logger.info(f"[RED CAPTAIN] State clue: {state['clue']}") |
|
|
captain = next((p for p in state["players"] if p.team == "red" and p.role == "captain"), None) |
|
|
agents = [p for p in state["players"] if p.team == "red" and p.role == "player"] |
|
|
if not captain: |
|
|
return state |
|
|
|
|
|
board = state["board"] |
|
|
|
|
|
available_words = ( |
|
|
board["red"] + |
|
|
board["blue"] + |
|
|
board["neutral"] + |
|
|
[board["killer"]] |
|
|
) |
|
|
|
|
|
random.shuffle(available_words) |
|
|
|
|
|
logger.info(f"AVAILABLE WORDS: {available_words}") |
|
|
chat_history = state["chat_history"] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
formatted_history, current_round_messages = self._split_chat_history(chat_history) |
|
|
|
|
|
team_name = state["current_team"].upper() |
|
|
formatted_captain_system_prompt = captain_agent_system_prompt.format( |
|
|
team_name, |
|
|
agents[0].name, |
|
|
agents[1].name |
|
|
) |
|
|
|
|
|
new_message = [ |
|
|
{ |
|
|
"role": "system", |
|
|
"content": formatted_captain_system_prompt |
|
|
}, |
|
|
{ |
|
|
"role": "user", |
|
|
"content": f""" |
|
|
Consider the history of the game so far:\n[HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n |
|
|
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
|
|
Here is the list of words on the board:\n{', '.join(available_words)}\n\n |
|
|
This is what your boss said: '{state['clue']}' {state['clue_number']}, suggest which words to guess. |
|
|
""" |
|
|
} |
|
|
] |
|
|
|
|
|
llm_with_tools = captain.model.bind_tools( |
|
|
captain_agent_tools, |
|
|
) |
|
|
|
|
|
if state['red_captain_is_called_counter'] > 4: |
|
|
logger.info("Creating fake answer to stop loop") |
|
|
answer = create_agent_fake_tool_call() |
|
|
else: |
|
|
logger.info(f"red_captain_is_called_counter: {state['red_captain_is_called_counter']}") |
|
|
answer = await llm_with_tools.ainvoke(new_message) |
|
|
|
|
|
logger.info(f"[RED CAPTAIN ANSWER]: {answer}") |
|
|
logger.info("°°°"*50) |
|
|
chat_entry, _, _, guesses = self._format_chat_entry(captain, answer) |
|
|
|
|
|
return { |
|
|
"messages": [answer], |
|
|
"chat_history": state.get("chat_history", []) + [chat_entry], |
|
|
"guesses": guesses |
|
|
} |
|
|
|
|
|
async def red_agent_1(self, state: State): |
|
|
"""Red team agent 1 discusses the clue""" |
|
|
|
|
|
logger.info("---"*50) |
|
|
logger.info("[RED AGENT 1]") |
|
|
logger.info("MESSAGES") |
|
|
logger.info(state['messages']) |
|
|
logger.info("---"*20) |
|
|
|
|
|
chat_history = state["chat_history"] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
formatted_history, current_round_messages = self._split_chat_history(chat_history) |
|
|
|
|
|
captain = next((p for p in state["players"] if p.team == "red" and p.role == "captain"), None) |
|
|
agents = [p for p in state["players"] if p.team == "red" and p.role == "player"] |
|
|
if not agents: |
|
|
return state |
|
|
|
|
|
agent = agents[0] |
|
|
|
|
|
board = state["board"] |
|
|
|
|
|
available_words = ( |
|
|
board["red"] + |
|
|
board["blue"] + |
|
|
board["neutral"] + |
|
|
[board["killer"]] |
|
|
) |
|
|
|
|
|
random.shuffle(available_words) |
|
|
|
|
|
team_name = state["current_team"].upper() |
|
|
formatted_player_system_prompt = player_agent_system_prompt.format( |
|
|
team_name, |
|
|
captain.name, |
|
|
agents[1].name |
|
|
) |
|
|
|
|
|
new_message = [ |
|
|
{ |
|
|
"role": "system", |
|
|
"content": formatted_player_system_prompt |
|
|
}, |
|
|
{ |
|
|
"role": "user", |
|
|
"content": f""" |
|
|
[HISTORY]\n{formatted_history}\n[END HISTORY]\n\n |
|
|
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
|
|
Available words: {', '.join(available_words)} |
|
|
""" |
|
|
} |
|
|
] |
|
|
|
|
|
answer = await agent.model.ainvoke(new_message) |
|
|
chat_entry, _, _, _ = self._format_chat_entry(agent, answer) |
|
|
|
|
|
logger.info(['RED AGENT 1 ANSWER']) |
|
|
logger.info(answer) |
|
|
|
|
|
return { |
|
|
"messages": [answer], |
|
|
"chat_history": state.get("chat_history", []) + [chat_entry] |
|
|
} |
|
|
|
|
|
async def red_agent_2(self, state: State): |
|
|
"""Red team agent 2 discusses the clue""" |
|
|
|
|
|
logger.info(f"[RED AGENT 2] State clue: {state['clue']}") |
|
|
chat_history = state["chat_history"] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
formatted_history, current_round_messages = self._split_chat_history(chat_history) |
|
|
|
|
|
captain = next((p for p in state["players"] if p.team == "red" and p.role == "captain"), None) |
|
|
agents = [p for p in state["players"] if p.team == "red" and p.role == "player"] |
|
|
if len(agents) < 2: |
|
|
return state |
|
|
|
|
|
agent = agents[1] |
|
|
board = state["board"] |
|
|
|
|
|
available_words = ( |
|
|
board["red"] + |
|
|
board["blue"] + |
|
|
board["neutral"] + |
|
|
[board["killer"]] |
|
|
) |
|
|
|
|
|
random.shuffle(available_words) |
|
|
|
|
|
team_name = state["current_team"].upper() |
|
|
formatted_player_system_prompt = player_agent_system_prompt.format( |
|
|
team_name, |
|
|
captain.name, |
|
|
agents[0].name |
|
|
) |
|
|
|
|
|
new_message = [ |
|
|
{ |
|
|
"role": "system", |
|
|
"content": formatted_player_system_prompt |
|
|
}, |
|
|
{ |
|
|
"role": "user", |
|
|
"content": f""" |
|
|
[HISTORY]\n{formatted_history}\n[END HISTORY]\n\n |
|
|
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
|
|
Available words: {', '.join(available_words)} |
|
|
""" |
|
|
} |
|
|
] |
|
|
|
|
|
answer = await agent.model.ainvoke(new_message) |
|
|
chat_entry, _, _, _ = self._format_chat_entry(agent, answer) |
|
|
|
|
|
logger.info(['RED AGENT 2 ANSWER']) |
|
|
logger.info(answer) |
|
|
|
|
|
return { |
|
|
"messages": [answer], |
|
|
"chat_history": state.get("chat_history", []) + [chat_entry] |
|
|
} |
|
|
|
|
|
async def blue_boss(self, state: State): |
|
|
"""Blue team boss gives a clue""" |
|
|
logger.info("[BLUE BOSS MOMENT]") |
|
|
chat_history = state["chat_history"] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
formatted_history, current_round_messages = self._split_chat_history(chat_history) |
|
|
|
|
|
boss = next((p for p in state["players"] if p.team == "blue" and p.role == "boss"), None) |
|
|
if not boss: |
|
|
return state |
|
|
|
|
|
board = state["board"] |
|
|
team_name = state["current_team"].upper() |
|
|
formatted_boss_system_prompt = boss_agent_system_prompt.format(team_name) |
|
|
self.current_team = state["current_team"] |
|
|
|
|
|
new_message = [ |
|
|
{ |
|
|
"role": "system", |
|
|
"content": formatted_boss_system_prompt |
|
|
}, |
|
|
{ |
|
|
"role": "user", |
|
|
"content": f""" |
|
|
Keep in mind the history of the game so far:\n |
|
|
[HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n |
|
|
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
|
|
Here is the current board:\n |
|
|
Red words: {", ".join(board['red'])}\n |
|
|
Blue words: {", ".join(board['blue'])}\n |
|
|
Neutral words: {", ".join(board['neutral'])}\n |
|
|
Killer word: {board['killer']}\n\n |
|
|
Based on this board, provide a clue and a number of words that relate to that clue. |
|
|
""" |
|
|
} |
|
|
] |
|
|
|
|
|
if self.IS_HUMAN_PLAYING and boss.model_name == "Human brain": |
|
|
clue_ = state['human_clue'] |
|
|
clue_number = state['human_clue_number'] |
|
|
answer = create_fake_tool_call(clue_, clue_number) |
|
|
else: |
|
|
if boss.model_name == "claude-sonnet-4-5-20250929": |
|
|
llm_with_tools = boss.model.bind_tools(boss_agent_tools) |
|
|
else: |
|
|
llm_with_tools = boss.model.bind_tools(boss_agent_tools, tool_choice="ChooseWord") |
|
|
answer = await llm_with_tools.ainvoke(new_message) |
|
|
|
|
|
logger.info(f"[BLUE BOSS ANSWER] : {answer}") |
|
|
|
|
|
chat_entry, clue, clue_number, _ = self._format_chat_entry(boss, answer) |
|
|
|
|
|
return { |
|
|
"messages": [answer], |
|
|
"current_role": "captain", |
|
|
"chat_history": state.get("chat_history", []) + [chat_entry], |
|
|
"clue": clue, |
|
|
"clue_number": clue_number, |
|
|
} |
|
|
|
|
|
async def blue_captain(self, state: State): |
|
|
"""Blue team captain coordinates guessing""" |
|
|
|
|
|
logger.info("°°°"*50) |
|
|
logger.info(f"[BLUE CAPTAIN] State clue: {state['clue']}") |
|
|
captain = next((p for p in state["players"] if p.team == "blue" and p.role == "captain"), None) |
|
|
agents = [p for p in state["players"] if p.team == "blue" and p.role == "player"] |
|
|
if not captain: |
|
|
return state |
|
|
|
|
|
board = state["board"] |
|
|
chat_history = state["chat_history"] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
formatted_history, current_round_messages = self._split_chat_history(chat_history) |
|
|
|
|
|
available_words = ( |
|
|
board["red"] + |
|
|
board["blue"] + |
|
|
board["neutral"] + |
|
|
[board["killer"]] |
|
|
) |
|
|
|
|
|
random.shuffle(available_words) |
|
|
|
|
|
team_name = state["current_team"].upper() |
|
|
formatted_captain_system_prompt = captain_agent_system_prompt.format( |
|
|
team_name, |
|
|
agents[0].name, |
|
|
agents[1].name |
|
|
) |
|
|
|
|
|
new_message = [ |
|
|
{ |
|
|
"role": "system", |
|
|
"content": formatted_captain_system_prompt |
|
|
}, |
|
|
{ |
|
|
"role": "user", |
|
|
"content": f""" |
|
|
Consider the history of the game so far:\n[HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n |
|
|
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
|
|
Here is the list of words on the board:\n{', '.join(available_words)}\n\n |
|
|
This is what your boss said: '{state['clue']}' {state['clue_number']}, suggest which words to guess. |
|
|
""" |
|
|
} |
|
|
] |
|
|
|
|
|
llm_with_tools = captain.model.bind_tools( |
|
|
captain_agent_tools, |
|
|
) |
|
|
|
|
|
if state['blue_captain_is_called_counter'] > 4: |
|
|
logger.info("Creating fake answer to stop loop") |
|
|
answer = create_agent_fake_tool_call() |
|
|
else: |
|
|
logger.info(f"blue_captain_is_called_counter: {state['blue_captain_is_called_counter']}") |
|
|
answer = await llm_with_tools.ainvoke(new_message) |
|
|
|
|
|
logger.info(f"[BLUE CAPTAIN ANSWER] : {answer}") |
|
|
logger.info("°°°"*50) |
|
|
chat_entry, _, _, guesses = self._format_chat_entry(captain, answer) |
|
|
|
|
|
return { |
|
|
"messages": [answer], |
|
|
"chat_history": state.get("chat_history", []) + [chat_entry], |
|
|
"guesses": guesses |
|
|
} |
|
|
|
|
|
async def blue_agent_1(self, state: State): |
|
|
"""Blue team agent 1 discusses the clue""" |
|
|
logger.info("---"*50) |
|
|
logger.info("[BLUE AGENT 1]") |
|
|
chat_history = state["chat_history"] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
formatted_history, current_round_messages = self._split_chat_history(chat_history) |
|
|
|
|
|
captain = next((p for p in state["players"] if p.team == "blue" and p.role == "captain"), None) |
|
|
agents = [p for p in state["players"] if p.team == "blue" and p.role == "player"] |
|
|
if not agents: |
|
|
return state |
|
|
|
|
|
agent = agents[0] |
|
|
board = state["board"] |
|
|
|
|
|
available_words = ( |
|
|
board["red"] + |
|
|
board["blue"] + |
|
|
board["neutral"] + |
|
|
[board["killer"]] |
|
|
) |
|
|
|
|
|
random.shuffle(available_words) |
|
|
|
|
|
team_name = state["current_team"].upper() |
|
|
formatted_player_system_prompt = player_agent_system_prompt.format( |
|
|
team_name, |
|
|
captain.name, |
|
|
agents[1].name |
|
|
) |
|
|
|
|
|
new_message = [ |
|
|
{ |
|
|
"role": "system", |
|
|
"content": formatted_player_system_prompt |
|
|
}, |
|
|
{ |
|
|
"role": "user", |
|
|
"content": f""" |
|
|
[HISTORY]\n{formatted_history}\n[END HISTORY]\n\n |
|
|
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
|
|
Available words: {', '.join(available_words)} |
|
|
""" |
|
|
} |
|
|
] |
|
|
|
|
|
answer = await agent.model.ainvoke(new_message) |
|
|
|
|
|
logger.info(['BLUE AGENT 1 ANSWER']) |
|
|
logger.info(answer) |
|
|
chat_entry, _, _, _ = self._format_chat_entry(agent, answer) |
|
|
|
|
|
return { |
|
|
"messages": [answer], |
|
|
"chat_history": state.get("chat_history", []) + [chat_entry] |
|
|
} |
|
|
|
|
|
async def blue_agent_2(self, state: State): |
|
|
"""Blue team agent 2 discusses the clue""" |
|
|
logger.info("---"*50) |
|
|
logger.info("[BLUE AGENT 2]") |
|
|
chat_history = state["chat_history"] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
formatted_history, current_round_messages = self._split_chat_history(chat_history) |
|
|
|
|
|
captain = next((p for p in state["players"] if p.team == "blue" and p.role == "captain"), None) |
|
|
agents = [p for p in state["players"] if p.team == "blue" and p.role == "player"] |
|
|
if not agents: |
|
|
return state |
|
|
|
|
|
agent = agents[1] |
|
|
board = state["board"] |
|
|
|
|
|
available_words = ( |
|
|
board["red"] + |
|
|
board["blue"] + |
|
|
board["neutral"] + |
|
|
[board["killer"]] |
|
|
) |
|
|
|
|
|
random.shuffle(available_words) |
|
|
|
|
|
team_name = state["current_team"].upper() |
|
|
formatted_player_system_prompt = player_agent_system_prompt.format( |
|
|
team_name, |
|
|
captain.name, |
|
|
agents[0].name |
|
|
) |
|
|
|
|
|
new_message = [ |
|
|
{ |
|
|
"role": "system", |
|
|
"content": formatted_player_system_prompt |
|
|
}, |
|
|
{ |
|
|
"role": "user", |
|
|
"content": f""" |
|
|
[HISTORY]\n{formatted_history}\n[END HISTORY]\n\n |
|
|
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n |
|
|
Available words: {', '.join(available_words)} |
|
|
""" |
|
|
} |
|
|
] |
|
|
|
|
|
answer = await agent.model.ainvoke(new_message) |
|
|
|
|
|
logger.info(['BLUE AGENT 2 ANSWER']) |
|
|
logger.info(answer) |
|
|
chat_entry, _, _, _ = self._format_chat_entry(agent, answer) |
|
|
|
|
|
return { |
|
|
"messages": [answer], |
|
|
"chat_history": state.get("chat_history", []) + [chat_entry] |
|
|
} |
|
|
|
|
|
def update_turn(self, state: State): |
|
|
"""Update turn counter""" |
|
|
turn = state.get("turn", 1) |
|
|
|
|
|
return { |
|
|
"turn": turn+1 |
|
|
} |
|
|
|
|
|
def red_boss_is_called(self, state: State): |
|
|
"""Update turn counter""" |
|
|
_counter = state.get("red_boss_is_called_counter", 1) |
|
|
|
|
|
return { |
|
|
"red_boss_is_called_counter": _counter+1 |
|
|
} |
|
|
|
|
|
def blue_boss_is_called(self, state: State): |
|
|
"""Update turn counter""" |
|
|
_counter = state.get("blue_boss_is_called_counter", 1) |
|
|
|
|
|
return { |
|
|
"blue_boss_is_called_counter": _counter+1 |
|
|
} |
|
|
|
|
|
def red_captain_is_called(self, state: State): |
|
|
"""Update turn counter""" |
|
|
_counter = state.get("red_captain_is_called_counter", 1) |
|
|
|
|
|
return { |
|
|
"red_captain_is_called_counter": _counter+1 |
|
|
} |
|
|
|
|
|
def blue_captain_is_called(self, state: State): |
|
|
"""Update turn counter""" |
|
|
_counter = state.get("blue_captain_is_called_counter", 1) |
|
|
|
|
|
return { |
|
|
"blue_captain_is_called_counter": _counter+1 |
|
|
} |
|
|
|
|
|
def judge(self, state: State): |
|
|
"""Evaluate guesses, update board, check win/lose conditions.""" |
|
|
|
|
|
|
|
|
def check_win_condition(): |
|
|
"""Returns (is_game_over, winner, win_message) tuple""" |
|
|
red_remaining = len(board.get("red", [])) |
|
|
blue_remaining = len(board.get("blue", [])) |
|
|
|
|
|
if red_remaining == 0: |
|
|
return True, "red", (red_remaining, blue_remaining) |
|
|
|
|
|
if blue_remaining == 0: |
|
|
return True, "blue", (red_remaining, blue_remaining) |
|
|
|
|
|
return False, None, None |
|
|
|
|
|
|
|
|
def create_multi_message_state(messages_content_list, next_team=None, switch_role=False, winner_and_score=None): |
|
|
"""Creates state with multiple messages""" |
|
|
messages = [] |
|
|
chat_entries = [] |
|
|
|
|
|
for content, title in messages_content_list: |
|
|
message = AIMessage( |
|
|
content=content, |
|
|
metadata={"title": title, "sender": "judge"} |
|
|
) |
|
|
messages.append(message) |
|
|
|
|
|
if title == "⚖️ Judge Decision": |
|
|
info = "chat_history" |
|
|
else: |
|
|
info = None |
|
|
|
|
|
chat_entry, _, _, _ = self._format_chat_entry(None, message, info=info) |
|
|
chat_entries.append(chat_entry) |
|
|
|
|
|
logger.info("****" * 50) |
|
|
|
|
|
filtered_chat = self._filter_important_messages(state.get("chat_history", [])) |
|
|
logger.info("**"*50) |
|
|
logger.info("FILTERED CHAT") |
|
|
logger.info(filtered_chat) |
|
|
logger.info("**"*50) |
|
|
|
|
|
end_state = { |
|
|
"messages": messages, |
|
|
"chat_history": filtered_chat + chat_entries, |
|
|
"board": board, |
|
|
"guesses": [], |
|
|
"history_guessed_words": history_guessed_words, |
|
|
"teams_reviewed": teams_reviewed, |
|
|
"end_round": end_round, |
|
|
"winner_and_score": winner_and_score |
|
|
} |
|
|
|
|
|
if next_team: |
|
|
end_state.update({ |
|
|
"current_team": next_team, |
|
|
"current_role": "boss" if switch_role else state.get("current_role"), |
|
|
"turn": state.get("turn", 1) + 1, |
|
|
"next_team": next_team, |
|
|
}) |
|
|
|
|
|
return end_state |
|
|
|
|
|
|
|
|
def create_turn_end_messages(results_list, next_team): |
|
|
"""Creates separate messages for results and turn transition""" |
|
|
team_emoji = "🔵" if next_team == "blue" else "🔴" |
|
|
results_text = "\n".join(results_list) |
|
|
|
|
|
red_remaining = len(board.get("red", [])) |
|
|
blue_remaining = len(board.get("blue", [])) |
|
|
|
|
|
|
|
|
results_message = results_text |
|
|
|
|
|
|
|
|
transition_message = ( |
|
|
f"🔄 **TURN COMPLETE**\n\n" |
|
|
f"{team_emoji} **{next_team.upper()} TEAM'S TURN** now begins!\n\n" |
|
|
f"**Remaining Words:**\n" |
|
|
f"🔴 Red: {red_remaining}\n" |
|
|
f"🔵 Blue: {blue_remaining}" |
|
|
) |
|
|
return [ |
|
|
(results_message, "⚖️ Judge Decision"), |
|
|
(transition_message, "🔄 Turn Transition") |
|
|
] |
|
|
|
|
|
|
|
|
def create_round_end_messages(results_list, next_team): |
|
|
"""Creates separate messages for results and round transition""" |
|
|
team_emoji = "🔵" if next_team == "blue" else "🔴" |
|
|
results_text = "\n".join(results_list) |
|
|
|
|
|
red_remaining = len(board.get("red", [])) |
|
|
blue_remaining = len(board.get("blue", [])) |
|
|
|
|
|
|
|
|
results_message = results_text |
|
|
|
|
|
|
|
|
transition_message = ( |
|
|
f"🎯 **ROUND COMPLETE!**\n\n" |
|
|
f"Both teams have played their turn.\n\n" |
|
|
f"{team_emoji} **{next_team.upper()} TEAM** starts the next round!\n\n" |
|
|
f"**Score Update:**\n" |
|
|
f"🔴 Red Team: {red_remaining} words remaining\n" |
|
|
f"🔵 Blue Team: {blue_remaining} words remaining\n\n" |
|
|
f"Let's keep going! 💪" |
|
|
) |
|
|
|
|
|
return [ |
|
|
(results_message, "⚖️ Judge Decision"), |
|
|
(transition_message, "🎯 Round Complete") |
|
|
] |
|
|
|
|
|
|
|
|
def create_game_over_messages(results_list, winner, scores, reason="normal"): |
|
|
"""Creates separate messages for results and game over""" |
|
|
results_text = "\n".join(results_list) |
|
|
red_remaining, blue_remaining = scores |
|
|
winner_emoji = "🔴" if winner == "red" else "🔵" |
|
|
|
|
|
|
|
|
results_message = results_text |
|
|
|
|
|
|
|
|
if reason == "killer": |
|
|
loser = "red" if winner == "blue" else "blue" |
|
|
game_over_message = ( |
|
|
f"🏆 **GAME OVER!**\n\n" |
|
|
f"💀 {loser.upper()} team hit the KILLER WORD!\n\n" |
|
|
f"{winner_emoji} **{winner.upper()} TEAM WINS!** 🎉\n\n" |
|
|
f"**Final Score:**\n" |
|
|
f"🔴 Red: {red_remaining} words remaining\n" |
|
|
f"🔵 Blue: {blue_remaining} words remaining\n\n" |
|
|
f"Better luck next time! 😅" |
|
|
) |
|
|
else: |
|
|
game_over_message = ( |
|
|
f"🏆 **GAME OVER!**\n\n" |
|
|
f"{winner_emoji} **{winner.upper()} TEAM WINS!** 🎉\n\n" |
|
|
f"All {winner} words have been found!\n\n" |
|
|
f"**Final Score:**\n" |
|
|
f"🔴 Red: {red_remaining} words remaining\n" |
|
|
f"🔵 Blue: {blue_remaining} words remaining\n\n" |
|
|
f"Congratulations to the {winner.title()} Team! 🥳" |
|
|
) |
|
|
|
|
|
return [ |
|
|
(results_message, "⚖️ Judge Decision"), |
|
|
(game_over_message, "🏆 Game Over") |
|
|
] |
|
|
|
|
|
logger.info("IS HUMAN PLAYING") |
|
|
logger.info(self.IS_HUMAN_PLAYING) |
|
|
logger.info("****" * 50) |
|
|
logger.info("[JUDGE]") |
|
|
logger.info("CHAT HISTORY") |
|
|
logger.info(state['chat_history']) |
|
|
|
|
|
|
|
|
if state.get("turn", 0) == 0: |
|
|
logger.info("[JUDGE INITIALIZING GAME STATE]") |
|
|
team_emoji = "🔴" if state["current_team"] == "red" else "🔵" |
|
|
|
|
|
|
|
|
game_start_content = ( |
|
|
f"🎮 **CODENAMES GAME STARTED!**\n\n" |
|
|
f"{team_emoji} **{state['current_team'].upper()} TEAM** goes first!\n\n" |
|
|
f"**Game Rules:**\n" |
|
|
f"- Teams alternate turns to guess words\n" |
|
|
f"- Match words to your team's color\n" |
|
|
f"- Avoid the opponent's words, neutral words, and the killer word ☠️\n" |
|
|
f"- First team to find all their words wins!\n\n" |
|
|
f"Good luck! 🍀" |
|
|
) |
|
|
|
|
|
message = AIMessage( |
|
|
content=game_start_content, |
|
|
metadata={"title": "🎮 Game Start", "sender": "judge"} |
|
|
) |
|
|
logger.info(f"[JUDGE MESSAGE]: {message}") |
|
|
chat_entry, _, _, _ = self._format_chat_entry(None, message, info="Game start") |
|
|
filtered_chat = self._filter_important_messages(state.get("chat_history", [])) |
|
|
return { |
|
|
"messages": [message], |
|
|
"turn": 1, |
|
|
"chat_history": filtered_chat + [chat_entry], |
|
|
} |
|
|
|
|
|
|
|
|
guesses = state.get("guesses", []) |
|
|
if not guesses: |
|
|
logger.info("[JUDGE] No guesses made.") |
|
|
return state |
|
|
|
|
|
|
|
|
board = state["board"] |
|
|
self.board = board |
|
|
current_team = state["current_team"] |
|
|
other_team = "red" if current_team == "blue" else "blue" |
|
|
history_guessed_words = state.get("history_guessed_words", []) |
|
|
teams_reviewed = state.get('teams_reviewed', []) |
|
|
teams_reviewed.append(current_team) |
|
|
|
|
|
if len(set(teams_reviewed)) == 2: |
|
|
end_round = True |
|
|
teams_reviewed = [] |
|
|
else: |
|
|
end_round = False |
|
|
|
|
|
results = ["📋 **Turn Results:**"] |
|
|
logger.info(f"[JUDGE] Team {current_team.upper()} guesses: {guesses}") |
|
|
|
|
|
|
|
|
for word in guesses: |
|
|
history_guessed_words.append(word) |
|
|
self.guessed_words = history_guessed_words |
|
|
logger.info(f"[JUDGE] Evaluating word: {word}") |
|
|
|
|
|
if word == "STOP_TURN": |
|
|
results.append(f"🚧 **{word}** The {current_team.upper()} team stops the turn.") |
|
|
break |
|
|
|
|
|
|
|
|
elif word in board.get(current_team, []): |
|
|
board[current_team].remove(word) |
|
|
self.board = board |
|
|
results.append(f"✅ **{word}** - Correct! ({current_team.upper()} team word)") |
|
|
|
|
|
|
|
|
elif word in board.get(other_team, []): |
|
|
board[other_team].remove(word) |
|
|
self.board = board |
|
|
results.append(f"❌ **{word}** - Oh no! You selected a {other_team.upper()} team word!") |
|
|
|
|
|
|
|
|
is_game_over, winner, scores = check_win_condition() |
|
|
if is_game_over: |
|
|
messages_list = create_game_over_messages(results, winner, scores) |
|
|
return create_multi_message_state(messages_list, winner_and_score=(winner, scores)) |
|
|
|
|
|
|
|
|
if end_round: |
|
|
messages_list = create_round_end_messages(results, other_team) |
|
|
else: |
|
|
messages_list = create_turn_end_messages(results, other_team) |
|
|
|
|
|
return create_multi_message_state(messages_list, next_team=other_team, switch_role=True) |
|
|
|
|
|
|
|
|
elif word in board.get("neutral", []): |
|
|
board["neutral"].remove(word) |
|
|
self.board = board |
|
|
results.append(f"⚪ **{word}** - Neutral word. Turn ends!") |
|
|
|
|
|
|
|
|
if end_round: |
|
|
messages_list = create_round_end_messages(results, other_team) |
|
|
else: |
|
|
messages_list = create_turn_end_messages(results, other_team) |
|
|
|
|
|
return create_multi_message_state(messages_list, next_team=other_team, switch_role=True) |
|
|
|
|
|
|
|
|
elif word == board.get("killer"): |
|
|
results.append(f"☠️ **{word}** - KILLER WORD! 💀") |
|
|
|
|
|
winner = other_team |
|
|
board["killer"] = None |
|
|
self.board = board |
|
|
|
|
|
red_remaining = len(board.get("red", [])) |
|
|
blue_remaining = len(board.get("blue", [])) |
|
|
scores = (red_remaining, blue_remaining) |
|
|
messages_list = create_game_over_messages( |
|
|
results, winner, scores, reason="killer" |
|
|
) |
|
|
return create_multi_message_state(messages_list, winner_and_score=(winner, scores, "killer")) |
|
|
|
|
|
|
|
|
is_game_over, winner, scores = check_win_condition() |
|
|
if is_game_over: |
|
|
messages_list = create_game_over_messages(results, winner, scores) |
|
|
return create_multi_message_state(messages_list, winner_and_score=(winner, scores)) |
|
|
|
|
|
|
|
|
if end_round: |
|
|
messages_list = create_round_end_messages(results, other_team) |
|
|
else: |
|
|
messages_list = create_turn_end_messages(results, other_team) |
|
|
|
|
|
return create_multi_message_state(messages_list, next_team=other_team, switch_role=True) |
|
|
|
|
|
def route_after_judge(self, state: State) -> str: |
|
|
"""Route to the appropriate team or end the game.""" |
|
|
logger.info("***" * 50) |
|
|
logger.info(f"HUMAN PLAYING? {self.IS_HUMAN_PLAYING}") |
|
|
logger.info("***" * 50) |
|
|
|
|
|
logger.info("\n\n") |
|
|
logger.info("***" * 50) |
|
|
logger.info("[ROUTING AFTER JUDGE]") |
|
|
|
|
|
board = state.get("board", {}) |
|
|
logger.info("BOARD ROUTING") |
|
|
logger.info(board) |
|
|
|
|
|
self.board = board |
|
|
self.chat_history = state['chat_history'] |
|
|
self.winners = state['winner_and_score'] |
|
|
|
|
|
|
|
|
if board["killer"] is None: |
|
|
logger.info("KILLER HIT, END GAME") |
|
|
logger.info("***" * 50) |
|
|
return END |
|
|
|
|
|
if not all(board.get(key) for key in ("red", "blue")): |
|
|
logger.info("END GAME") |
|
|
logger.info("***" * 50) |
|
|
return END |
|
|
|
|
|
|
|
|
|
|
|
if state['end_round']: |
|
|
logger.info("ROUND ENDS") |
|
|
logger.info("***" * 50) |
|
|
return END |
|
|
|
|
|
next_team = state.get("next_team") or state.get("current_team", "red") |
|
|
return f"{next_team}_team" |
|
|
|
|
|
def boss_choice(self, state: State) -> str: |
|
|
"""Determine whether to continue to tools or return a final answer.""" |
|
|
logger.info("###"*50) |
|
|
logger.info("[BOSS CHOICE]") |
|
|
messages = state["messages"] |
|
|
last_message = messages[-1] |
|
|
|
|
|
if hasattr(last_message, "tool_calls") and last_message.tool_calls: |
|
|
logger.info("[TOOL CALL]") |
|
|
tool_name = last_message.tool_calls[0]["name"] |
|
|
logger.info(tool_name) |
|
|
|
|
|
if tool_name == "ChooseWord": |
|
|
return "choose_word_tool" |
|
|
else: |
|
|
logger.warning("[WARNING] NESSUN TOOL E' STATO RICHIAMATO, TORNIAMO DAL BOSS") |
|
|
current_team = state['current_team'] |
|
|
logger.info(f"[CURRENT TEAM: {current_team}]") |
|
|
return f"{current_team}_boss_is_called" |
|
|
|
|
|
def should_continue(self, state: State) -> str: |
|
|
"""Determine whether to continue to tools or return a final answer.""" |
|
|
logger.info("###"*50) |
|
|
logger.info("[SHOULD CONTINUE]") |
|
|
messages = state["messages"] |
|
|
last_message = messages[-1] |
|
|
|
|
|
if hasattr(last_message, "tool_calls") and last_message.tool_calls: |
|
|
logger.info("[TOOL CALL]") |
|
|
tool_name = last_message.tool_calls[0]["name"] |
|
|
logger.info(tool_name) |
|
|
|
|
|
if tool_name == "Call_Agent_1": |
|
|
return "transfer_to_agent_1" |
|
|
elif tool_name == "Call_Agent_2": |
|
|
return "transfer_to_agent_2" |
|
|
elif tool_name == "TeamFinalChoice": |
|
|
return "final_choice" |
|
|
|
|
|
else: |
|
|
logger.warning("[WARNING] NESSUN TOOL E' STATO RICHIAMATO, TORNIAMO DAL CAPITANO") |
|
|
current_team = state['current_team'] |
|
|
logger.info(f"[CURRENT TEAM: {current_team}]") |
|
|
return f"{current_team}_captain_is_called" |
|
|
|
|
|
def _convert_to_string(self, value): |
|
|
"""Helper to convert any value to string""" |
|
|
if isinstance(value, str): |
|
|
return value |
|
|
elif isinstance(value, (dict, list)): |
|
|
return str(value) |
|
|
else: |
|
|
return str(value) |
|
|
|
|
|
def _filter_important_messages(self, chat_history): |
|
|
""" |
|
|
Filter chat history to keep only important messages: |
|
|
- Boss choices (ChooseWord) |
|
|
- Captain choices (TeamFinalChoice) |
|
|
- All Judge messages |
|
|
""" |
|
|
return [ |
|
|
entry for entry in chat_history |
|
|
if entry.get("tool_name") in ["ChooseWord", "TeamFinalChoice"] |
|
|
or entry.get("sender_type") == "judge" and entry.get("info") == "chat_history" |
|
|
] |
|
|
|
|
|
def _format_chat_entry(self, player, message, info=None): |
|
|
"""Helper to create a structured chat history entry""" |
|
|
tool_name = None |
|
|
clue = None |
|
|
clue_number = None |
|
|
guesses = [] |
|
|
|
|
|
|
|
|
if hasattr(message, 'tool_calls') and message.tool_calls: |
|
|
tool_call = message.tool_calls[0] |
|
|
tool_name = tool_call.get('name', '') |
|
|
args = tool_call.get('args', {}) |
|
|
|
|
|
if tool_name == 'Call_Agent_1': |
|
|
content = f"Called Agent 1: {self._convert_to_string(args.get('message', 'N/A'))}" |
|
|
elif tool_name == 'Call_Agent_2': |
|
|
content = f"Called Agent 2: {self._convert_to_string(args.get('message', 'N/A'))}" |
|
|
elif tool_name == 'ChooseWord': |
|
|
clue = self._convert_to_string(args.get('clue', 'N/A')) |
|
|
clue_number = args.get('clue_number', 'N/A') |
|
|
content = f"Clue: '{clue.upper()}' for {clue_number} word(s)" |
|
|
elif tool_name == 'TeamFinalChoice': |
|
|
guesses = args.get('guesses', []) |
|
|
content = f"Final choices: {', '.join(guesses)}" |
|
|
else: |
|
|
content = f"Used tool {tool_name}: {args}" |
|
|
elif hasattr(message, 'content') and message.content: |
|
|
content = message.content |
|
|
elif hasattr(message, 'text') and message.text: |
|
|
content = message.text |
|
|
else: |
|
|
content = str(message) |
|
|
|
|
|
|
|
|
if player is None: |
|
|
return { |
|
|
"sender_type": "judge", |
|
|
"team": "", |
|
|
"role": "", |
|
|
"name": "JUDGE", |
|
|
"tool_name": None, |
|
|
"content": content, |
|
|
"info": info, |
|
|
}, clue, clue_number, guesses |
|
|
|
|
|
return { |
|
|
"sender_type": "player", |
|
|
"team": player.team, |
|
|
"role": player.role, |
|
|
"name": player.name, |
|
|
"tool_name": tool_name, |
|
|
"content": content, |
|
|
"info": info, |
|
|
}, clue, clue_number, guesses |
|
|
|
|
|
def _split_chat_history(self, chat_history): |
|
|
""" |
|
|
Splits chat_history into two parts: |
|
|
- formatted_history: all entries up to and including the last one with info == "chat_history" |
|
|
- current_round: all entries after that |
|
|
|
|
|
Both are formatted strings, where each line is: |
|
|
[name (team role)]: content |
|
|
""" |
|
|
|
|
|
last_index = None |
|
|
|
|
|
|
|
|
for i, entry in enumerate(chat_history): |
|
|
if entry.get("info") == "chat_history": |
|
|
last_index = i |
|
|
|
|
|
|
|
|
def format_entry(entry): |
|
|
name = entry.get("name", "Unknown") |
|
|
team = entry.get("team", "N/A") |
|
|
role = entry.get("role", "N/A") |
|
|
content = entry.get("content", "") |
|
|
return f"[{name} ({team} {role})]: {content}" |
|
|
|
|
|
|
|
|
if last_index is None: |
|
|
formatted_history = "" |
|
|
current_round = "\n".join(format_entry(e) for e in chat_history) |
|
|
else: |
|
|
formatted_history = "\n".join( |
|
|
format_entry(e) for e in chat_history[:last_index + 1] |
|
|
) |
|
|
current_round = "\n".join( |
|
|
format_entry(e) for e in chat_history[last_index + 1:] |
|
|
) |
|
|
|
|
|
return formatted_history, current_round |
|
|
|
|
|
async def stream_graph(self, input_message, messages, players, board, dropdown_clue_number, guessed_words, chat_history, is_human_playing, turn_): |
|
|
"""Stream the graph execution.""" |
|
|
|
|
|
logger.info(f"Human Clue: {input_message}") |
|
|
logger.info(f"Human Clue number: {dropdown_clue_number}") |
|
|
logger.info(f"Board: {board}") |
|
|
logger.info(f"Starting team: {board['starting_team']}") |
|
|
logger.info(f"Turn: {turn_}") |
|
|
|
|
|
self.IS_HUMAN_PLAYING = is_human_playing |
|
|
self.board = board |
|
|
inputs = { |
|
|
"messages": [HumanMessage(content=input_message)] if input_message else [], |
|
|
"original_board": board, |
|
|
"board": board, |
|
|
"players": players, |
|
|
"current_team": board.get("starting_team", "red"), |
|
|
"current_role": "boss", |
|
|
"turn": turn_, |
|
|
"last_user_message": input_message or "", |
|
|
"guesses": [], |
|
|
"clue": None, |
|
|
"clue_number": None, |
|
|
"round_messages": [], |
|
|
"chat_history": chat_history, |
|
|
"human_clue": input_message, |
|
|
"human_clue_number": dropdown_clue_number, |
|
|
"teams_reviewed": [], |
|
|
"history_guessed_words": guessed_words, |
|
|
"end_round": False, |
|
|
"winner_and_score": None, |
|
|
"red_boss_is_called_counter": 0, |
|
|
"blue_boss_is_called_counter": 0, |
|
|
"red_captain_is_called_counter": 0, |
|
|
"blue_captain_is_called_counter": 0, |
|
|
} |
|
|
|
|
|
final_msg = "" |
|
|
messages = list(messages) if messages else [] |
|
|
previous_message_is_reasoning = False |
|
|
latest_lang_node = "" |
|
|
current_sender = "" |
|
|
last_message_sender = None |
|
|
last_reasoning_index = None |
|
|
|
|
|
async for chunk in self.graph.astream( |
|
|
inputs, {"recursion_limit": 100}, |
|
|
stream_mode=["messages", "updates"], |
|
|
subgraphs=True, |
|
|
): |
|
|
if chunk[1] == "messages": |
|
|
msg = chunk[2][0] |
|
|
lang_node = chunk[2][1]['langgraph_node'] |
|
|
if latest_lang_node != lang_node: |
|
|
latest_lang_node = lang_node |
|
|
current_sender = lang_node |
|
|
final_msg = "" |
|
|
last_message_sender = None |
|
|
last_reasoning_index = None |
|
|
else: |
|
|
agent_update = chunk[2] |
|
|
msg_from = next(iter(agent_update.keys())) |
|
|
|
|
|
if msg_from in [ |
|
|
'red_boss', 'red_captain', 'red_agent_1', 'red_agent_2', |
|
|
'blue_boss', 'blue_captain', 'blue_agent_1', 'blue_agent_2', |
|
|
'judge', 'final_choice' |
|
|
]: |
|
|
current_sender = msg_from |
|
|
msg = None |
|
|
else: |
|
|
try: |
|
|
msg = next(iter(agent_update.values()))['messages'][-1] |
|
|
except Exception as e: |
|
|
logger.error(f"[EXCEPTION]: {e}") |
|
|
logger.error(f"[CHUNK]: {chunk}") |
|
|
logger.error(f"[Agent update]: {agent_update}") |
|
|
msg = None |
|
|
|
|
|
|
|
|
|
|
|
if isinstance(msg, AIMessage) and msg.tool_calls: |
|
|
final_msg = "" |
|
|
last_message_sender = None |
|
|
last_reasoning_index = None |
|
|
yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
|
|
|
|
|
elif isinstance(msg, ToolMessage): |
|
|
logger.info(f"[TOOL MESSAGE: {msg}]") |
|
|
tool_name = msg.name |
|
|
if tool_name == "TeamFinalChoice": |
|
|
try: |
|
|
command_data = json.loads(msg.content) |
|
|
update_data = command_data.get("update", {}) |
|
|
|
|
|
guesses = update_data.get("guesses") |
|
|
|
|
|
new_message = ChatMessage( |
|
|
role="assistant", |
|
|
content="I made my final choices: " + ", ".join(guesses), |
|
|
metadata={ |
|
|
"title": "🧠 Guesses", |
|
|
"sender": f"{self.current_team}_captain" |
|
|
} |
|
|
) |
|
|
except json.JSONDecodeError as e: |
|
|
logger.error(f"Error parsing tool message: {e}") |
|
|
new_message = ChatMessage( |
|
|
role="assistant", |
|
|
content=msg.content, |
|
|
metadata={ |
|
|
"title": "🧠 Guesses", |
|
|
"sender": f"{self.current_team}_captain" |
|
|
} |
|
|
) |
|
|
elif tool_name == "ChooseWord": |
|
|
try: |
|
|
command_data = json.loads(msg.content) |
|
|
update_data = command_data.get("update", {}) |
|
|
|
|
|
clue = update_data.get("clue") |
|
|
clue_number = update_data.get("clue_number") |
|
|
|
|
|
logger.info(f"Clue: {clue}, Clue Number: {clue_number}") |
|
|
|
|
|
new_message = ChatMessage( |
|
|
role="assistant", |
|
|
content=f"{clue}, {clue_number}", |
|
|
metadata={ |
|
|
"title": "🕵️♂️ Clue", |
|
|
"sender": f"{self.current_team}_boss" |
|
|
} |
|
|
) |
|
|
except json.JSONDecodeError as e: |
|
|
logger.error(f"Error parsing tool message: {e}") |
|
|
new_message = ChatMessage( |
|
|
role="assistant", |
|
|
content=msg.content, |
|
|
metadata={ |
|
|
"title": "🕵️♂️ Clue", |
|
|
"sender": f"{self.current_team}_boss" |
|
|
} |
|
|
) |
|
|
elif tool_name == "Call_Agent_1" or tool_name == "Call_Agent_2": |
|
|
if tool_name == "Call_Agent_1": |
|
|
title_ = "💭 Asking opinion of Agent 1" |
|
|
else: |
|
|
title_ = "💭 Asking opinion of Agent 2" |
|
|
|
|
|
try: |
|
|
command_data = json.loads(msg.content) |
|
|
update_data = command_data.get("update", {}) |
|
|
|
|
|
message = update_data.get("message") |
|
|
|
|
|
new_message = ChatMessage( |
|
|
role="assistant", |
|
|
content=message, |
|
|
metadata={ |
|
|
"title": title_, |
|
|
"sender": f"{self.current_team}_captain" |
|
|
} |
|
|
) |
|
|
except json.JSONDecodeError as e: |
|
|
logger.error(f"Error parsing tool message: {e}") |
|
|
new_message = ChatMessage( |
|
|
role="assistant", |
|
|
content=msg.content, |
|
|
metadata={ |
|
|
"title": title_, |
|
|
"sender": f"{self.current_team}_captain" |
|
|
} |
|
|
) |
|
|
else: |
|
|
new_message = ChatMessage( |
|
|
role="assistant", |
|
|
content=msg.content, |
|
|
metadata={ |
|
|
"title": f"""🛠️ {tool_name}""", |
|
|
"sender": current_sender |
|
|
} |
|
|
) |
|
|
|
|
|
if not messages or messages[-1] != new_message: |
|
|
messages.append(new_message) |
|
|
last_message_sender = new_message.metadata.get("sender") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
final_msg = "" |
|
|
yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
|
|
|
|
|
elif isinstance(msg, AIMessageChunk): |
|
|
|
|
|
|
|
|
model_provider = msg.response_metadata.get('model_provider') |
|
|
if (model_provider in ["openai", "google_genai", "anthropic"] and isinstance(msg.content, list)): |
|
|
for item in msg.content: |
|
|
if not isinstance(item, dict): |
|
|
continue |
|
|
|
|
|
item_type = item.get('type') |
|
|
|
|
|
|
|
|
if item_type == 'reasoning': |
|
|
reasoning_text = "" |
|
|
summary = item.get('summary', []) |
|
|
|
|
|
for summary_item in summary: |
|
|
if isinstance(summary_item, dict) and summary_item.get('type') == 'summary_text': |
|
|
text = summary_item.get('text', '') |
|
|
item_index = summary_item.get('index') |
|
|
if text: |
|
|
reasoning_text += text |
|
|
|
|
|
if reasoning_text: |
|
|
|
|
|
if last_reasoning_index is not None and item_index != last_reasoning_index: |
|
|
reasoning_text = "\n\n" + reasoning_text |
|
|
|
|
|
last_reasoning_index = item_index |
|
|
|
|
|
|
|
|
if final_msg == "" or last_message_sender != current_sender or not previous_message_is_reasoning: |
|
|
final_msg = reasoning_text |
|
|
messages.append(ChatMessage( |
|
|
role="assistant", |
|
|
content=final_msg, |
|
|
metadata={ |
|
|
"title": "🧠 Thinking...", |
|
|
"sender": current_sender |
|
|
} |
|
|
)) |
|
|
last_message_sender = current_sender |
|
|
else: |
|
|
final_msg += reasoning_text |
|
|
if len(messages) > 0: |
|
|
messages[-1].content = final_msg |
|
|
|
|
|
previous_message_is_reasoning = True |
|
|
yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
|
|
|
|
|
|
|
|
elif item_type == 'thinking': |
|
|
|
|
|
reasoning_text = item.get('thinking', []) |
|
|
|
|
|
if reasoning_text: |
|
|
|
|
|
|
|
|
if final_msg == "" or last_message_sender != current_sender or not previous_message_is_reasoning: |
|
|
final_msg = reasoning_text |
|
|
messages.append(ChatMessage( |
|
|
role="assistant", |
|
|
content=final_msg, |
|
|
metadata={ |
|
|
"title": "🧠 Thinking...", |
|
|
"sender": current_sender |
|
|
} |
|
|
)) |
|
|
last_message_sender = current_sender |
|
|
else: |
|
|
final_msg += reasoning_text |
|
|
if len(messages) > 0: |
|
|
messages[-1].content = final_msg |
|
|
|
|
|
previous_message_is_reasoning = True |
|
|
yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
|
|
|
|
|
|
|
|
elif item_type == 'text': |
|
|
text_content = item.get('text', '') |
|
|
|
|
|
if text_content: |
|
|
|
|
|
if previous_message_is_reasoning: |
|
|
final_msg = "" |
|
|
previous_message_is_reasoning = False |
|
|
last_message_sender = None |
|
|
last_reasoning_index = None |
|
|
|
|
|
|
|
|
if final_msg == "" or last_message_sender != current_sender: |
|
|
final_msg = text_content |
|
|
messages.append(ChatMessage( |
|
|
role="assistant", |
|
|
content=final_msg, |
|
|
metadata={"sender": current_sender} |
|
|
)) |
|
|
last_message_sender = current_sender |
|
|
else: |
|
|
|
|
|
final_msg += text_content |
|
|
messages[-1].content = final_msg |
|
|
|
|
|
yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
|
|
|
|
|
elif msg.additional_kwargs.get('reasoning_content'): |
|
|
reasoning_text = msg.additional_kwargs.get("reasoning_content") |
|
|
if reasoning_text: |
|
|
if final_msg == "" or last_message_sender != current_sender or not previous_message_is_reasoning: |
|
|
final_msg = reasoning_text |
|
|
messages.append(ChatMessage( |
|
|
role="assistant", |
|
|
content=final_msg, |
|
|
metadata={ |
|
|
"title": "🧠 Thinking...", |
|
|
"sender": current_sender |
|
|
} |
|
|
)) |
|
|
last_message_sender = current_sender |
|
|
else: |
|
|
final_msg += reasoning_text |
|
|
if len(messages) > 0: |
|
|
messages[-1].content = final_msg |
|
|
|
|
|
previous_message_is_reasoning = True |
|
|
|
|
|
elif msg.content and isinstance(msg.content, str): |
|
|
if previous_message_is_reasoning: |
|
|
final_msg = "" |
|
|
previous_message_is_reasoning = False |
|
|
last_message_sender = None |
|
|
|
|
|
|
|
|
if final_msg == "" or last_message_sender != current_sender: |
|
|
final_msg = msg.content |
|
|
|
|
|
messages.append(ChatMessage( |
|
|
role="assistant", |
|
|
content=final_msg, |
|
|
metadata={"sender": current_sender} |
|
|
)) |
|
|
last_message_sender = current_sender |
|
|
else: |
|
|
|
|
|
final_msg += msg.content |
|
|
messages[-1].content = final_msg |
|
|
|
|
|
yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
|
|
else: |
|
|
reasoning = msg.additional_kwargs.get("reasoning_content") |
|
|
if reasoning: |
|
|
|
|
|
if final_msg == "" or last_message_sender != current_sender: |
|
|
final_msg = reasoning |
|
|
messages.append(ChatMessage( |
|
|
role="assistant", |
|
|
content=final_msg, |
|
|
metadata={ |
|
|
"title": """🧠 Thinking...""", |
|
|
"sender": current_sender |
|
|
} |
|
|
)) |
|
|
last_message_sender = current_sender |
|
|
else: |
|
|
if len(messages) > 0: |
|
|
final_msg += reasoning |
|
|
messages[-1].content = final_msg |
|
|
|
|
|
previous_message_is_reasoning = True |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
|
|
|
|
|
elif isinstance(msg, AIMessage): |
|
|
if msg.content and msg.content.strip(): |
|
|
logger.info("***"*50) |
|
|
logger.info(f"msg.content: {msg.content}") |
|
|
logger.info(f"msg: {msg}") |
|
|
|
|
|
try: |
|
|
if msg.metadata.get("sender"): |
|
|
new_sender = msg.metadata.get("sender") |
|
|
current_sender = new_sender |
|
|
except Exception as e: |
|
|
logger.error(f"[EXCEPTION]: {e} - {msg}") |
|
|
if msg.metadata.get("title"): |
|
|
new_title = msg.metadata.get("title") |
|
|
messages.append(ChatMessage( |
|
|
role="assistant", |
|
|
content=msg.content, |
|
|
metadata={ |
|
|
"sender": new_sender, |
|
|
"title": new_title |
|
|
} |
|
|
)) |
|
|
last_message_sender = current_sender |
|
|
yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
|
|
|
|
|
yield messages, self.guessed_words, self.board, self.chat_history, self.winners |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|