lucadipalma
modify graph to set limits; visualization bugs
59048dd
import json
import os
import random
from gradio import ChatMessage
from langchain_core.messages import AIMessage, AIMessageChunk, HumanMessage, ToolMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
from support.log_manager import logger
from support.my_tools import captain_agent_tools, boss_agent_tools
from support.tools.boss_agent_tools import create_fake_tool_call
from support.tools.captain_agent_tools import create_agent_fake_tool_call
from support.prompts import captain_agent_system_prompt, player_agent_system_prompt, boss_agent_system_prompt
from typing import Annotated, List, Literal, Optional
from typing_extensions import TypedDict
class State(TypedDict):
messages: Annotated[list, add_messages]
chat_history: List[str]
round_messages: Annotated[list, add_messages]
original_board: dict
board: dict
players: List # Player type
current_team: Literal['red', 'blue']
current_role: str
turn: int
last_user_message: str
guesses: List[str]
clue: Optional[str]
clue_number: Optional[int]
next_team: Literal['red', 'blue']
history_guessed_words: List[str]
human_clue: str
human_clue_number: int
teams_reviewed: List[str]
end_round: bool
winner_and_score: tuple
red_boss_is_called_counter: int
blue_boss_is_called_counter: int
red_captain_is_called_counter: int
blue_captain_is_called_counter: int
class MyGraph:
def __init__(self):
self.red_team = self._create_red_team_graph()
self.blue_team = self._create_blue_team_graph()
self.graph = self._create_graph()
self.players = []
self.board = None
self.guessed_words = []
self.current_team = ""
self.chat_history = []
self.winners = []
self.IS_HUMAN_PLAYING = None
def _create_red_team_graph(self):
"""Compile red team subgraph"""
builder = StateGraph(State)
# Core team nodes
builder.add_node("red_boss", self.red_boss)
builder.add_node("red_captain", self.red_captain)
builder.add_node("red_agent_1", self.red_agent_1)
builder.add_node("red_agent_2", self.red_agent_2)
builder.add_node("red_boss_is_called", self.red_boss_is_called)
builder.add_node("red_captain_is_called", self.red_captain_is_called)
# Tool nodes
choose_word_tool = ToolNode(tools=[boss_agent_tools[0]])
final_choice = ToolNode(tools=[captain_agent_tools[0]])
transfer_to_agent_1 = ToolNode(tools=[captain_agent_tools[1]])
transfer_to_agent_2 = ToolNode(tools=[captain_agent_tools[2]])
builder.add_node("choose_word_tool", choose_word_tool)
builder.add_node("final_choice", final_choice)
builder.add_node("transfer_to_agent_1", transfer_to_agent_1)
builder.add_node("transfer_to_agent_2", transfer_to_agent_2)
builder.add_node("update_turn", self.update_turn)
# Team flow
builder.add_edge(START, "red_boss")
builder.add_conditional_edges(
"red_boss",
self.boss_choice,
{
"choose_word_tool": "choose_word_tool",
"red_boss_is_called": "red_boss_is_called",
},
)
builder.add_edge("red_boss_is_called", "red_boss")
builder.add_edge("choose_word_tool", "red_captain")
builder.add_conditional_edges(
"red_captain",
self.should_continue,
{
"final_choice": "final_choice",
"transfer_to_agent_1": "transfer_to_agent_1",
"transfer_to_agent_2": "transfer_to_agent_2",
"red_captain_is_called": "red_captain_is_called",
},
)
builder.add_edge("red_captain_is_called", "red_captain")
builder.add_edge("transfer_to_agent_1", "red_agent_1")
builder.add_edge("transfer_to_agent_2", "red_agent_2")
builder.add_edge("red_agent_1", "red_captain")
builder.add_edge("red_agent_2", "red_captain")
builder.add_edge("final_choice", "update_turn")
builder.add_edge("update_turn", END)
return builder.compile()
def _create_blue_team_graph(self):
"""Compile blue team subgraph"""
builder = StateGraph(State)
# Core team nodes
builder.add_node("blue_boss", self.blue_boss)
builder.add_node("blue_captain", self.blue_captain)
builder.add_node("blue_agent_1", self.blue_agent_1)
builder.add_node("blue_agent_2", self.blue_agent_2)
builder.add_node("blue_boss_is_called", self.blue_boss_is_called)
builder.add_node("blue_captain_is_called", self.blue_captain_is_called)
# Tool nodes
choose_word_tool = ToolNode(tools=[boss_agent_tools[0]])
final_choice = ToolNode(tools=[captain_agent_tools[0]])
transfer_to_agent_1 = ToolNode(tools=[captain_agent_tools[1]])
transfer_to_agent_2 = ToolNode(tools=[captain_agent_tools[2]])
builder.add_node("choose_word_tool", choose_word_tool)
builder.add_node("final_choice", final_choice)
builder.add_node("transfer_to_agent_1", transfer_to_agent_1)
builder.add_node("transfer_to_agent_2", transfer_to_agent_2)
builder.add_node("update_turn", self.update_turn)
# Team flow
builder.add_edge(START, "blue_boss")
builder.add_conditional_edges(
"blue_boss",
self.boss_choice,
{
"choose_word_tool": "choose_word_tool",
"blue_boss_is_called": "blue_boss_is_called",
},
)
builder.add_edge("blue_boss_is_called", "blue_boss")
builder.add_edge("choose_word_tool", "blue_captain")
builder.add_conditional_edges(
"blue_captain",
self.should_continue,
{
"final_choice": "final_choice",
"transfer_to_agent_1": "transfer_to_agent_1",
"transfer_to_agent_2": "transfer_to_agent_2",
"blue_captain_is_called": "blue_captain_is_called",
},
)
builder.add_edge("blue_captain_is_called", "blue_captain")
builder.add_edge("transfer_to_agent_1", "blue_agent_1")
builder.add_edge("transfer_to_agent_2", "blue_agent_2")
builder.add_edge("blue_agent_1", "blue_captain")
builder.add_edge("blue_agent_2", "blue_captain")
builder.add_edge("final_choice", "update_turn")
builder.add_edge("update_turn", END)
return builder.compile()
def _create_graph(self) -> StateGraph:
"""Create and compile the graph."""
builder = StateGraph(State)
red_graph = self._create_red_team_graph()
blue_graph = self._create_blue_team_graph()
builder.add_node("judge", self.judge)
# builder.add_node("red_team", lambda s: self.call_team(s, "red"))
# builder.add_node("blue_team", lambda s: self.call_team(s, "blue"))
builder.add_node("red_team", red_graph)
builder.add_node("blue_team", blue_graph)
builder.add_edge(START, "judge")
builder.add_conditional_edges(
"judge",
self.route_after_judge,
["red_team", "blue_team", END],
)
builder.add_edge("red_team", "judge")
builder.add_edge("blue_team", "judge")
graph = builder.compile()
# Optional visualization
if not os.path.exists("graph.png"):
try:
img = graph.get_graph(xray=True).draw_mermaid_png()
with open("graph.png", "wb") as f:
f.write(img)
except Exception as e:
logger.error(f"[GRAPH IMAGE ERROR]: {e}")
return graph
# --- Agent Node Implementations ---
async def red_boss(self, state: State):
"""Red team boss gives a clue"""
logger.info("[RED BOSS] MOMENT ")
boss = next((p for p in state["players"] if p.team == "red" and p.role == "boss"), None)
if not boss:
return state
board = state["board"]
team_name = state["current_team"].upper()
formatted_boss_system_prompt = boss_agent_system_prompt.format(team_name)
chat_history = state["chat_history"]
# formatted_history = "\n".join([
# entry.get("content", "")
# for entry in chat_history
# ])
formatted_history, current_round_messages = self._split_chat_history(chat_history)
self.current_team = state["current_team"]
new_message = [
{
"role": "system",
"content": formatted_boss_system_prompt
},
{
"role": "user",
"content": f"""
Keep in mind the history of the game so far:\n
[HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n
Here is the current board:\n
Red words: {", ".join(board['red'])}\n
Blue words: {", ".join(board['blue'])}\n
Neutral words: {", ".join(board['neutral'])}\n
Killer word: {board['killer']}\n\n
Based on this board, provide a clue and a number of words that relate to that clue.
"""
}
]
if self.IS_HUMAN_PLAYING and boss.model_name == "Human brain":
clue_ = state['human_clue']
clue_number = state['human_clue_number']
answer = create_fake_tool_call(clue_, clue_number)
else:
if boss.model_name == "claude-sonnet-4-5-20250929":
llm_with_tools = boss.model.bind_tools(boss_agent_tools)
else:
llm_with_tools = boss.model.bind_tools(boss_agent_tools, tool_choice="ChooseWord")
answer = await llm_with_tools.ainvoke(new_message)
logger.info(f"[RED BOSS ANSWER]: {answer}")
chat_entry, clue, clue_number, _ = self._format_chat_entry(boss, answer)
return {
"messages": [answer],
"current_role": "captain",
"chat_history": state.get("chat_history", []) + [chat_entry],
"clue": clue,
"clue_number": clue_number,
}
async def red_captain(self, state: State):
"""Red team captain coordinates guessing"""
logger.info("°°°"*50)
logger.info(f"[RED CAPTAIN] State clue: {state['clue']}")
captain = next((p for p in state["players"] if p.team == "red" and p.role == "captain"), None)
agents = [p for p in state["players"] if p.team == "red" and p.role == "player"]
if not captain:
return state
board = state["board"]
available_words = (
board["red"] +
board["blue"] +
board["neutral"] +
[board["killer"]]
)
random.shuffle(available_words)
logger.info(f"AVAILABLE WORDS: {available_words}")
chat_history = state["chat_history"]
# formatted_history = "\n".join([
# entry.get("content", "")
# for entry in chat_history
# ])
formatted_history, current_round_messages = self._split_chat_history(chat_history)
team_name = state["current_team"].upper()
formatted_captain_system_prompt = captain_agent_system_prompt.format(
team_name,
agents[0].name,
agents[1].name
)
new_message = [
{
"role": "system",
"content": formatted_captain_system_prompt
},
{
"role": "user",
"content": f"""
Consider the history of the game so far:\n[HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n
Here is the list of words on the board:\n{', '.join(available_words)}\n\n
This is what your boss said: '{state['clue']}' {state['clue_number']}, suggest which words to guess.
"""
}
]
llm_with_tools = captain.model.bind_tools(
captain_agent_tools,
)
if state['red_captain_is_called_counter'] > 4:
logger.info("Creating fake answer to stop loop")
answer = create_agent_fake_tool_call()
else:
logger.info(f"red_captain_is_called_counter: {state['red_captain_is_called_counter']}")
answer = await llm_with_tools.ainvoke(new_message)
logger.info(f"[RED CAPTAIN ANSWER]: {answer}")
logger.info("°°°"*50)
chat_entry, _, _, guesses = self._format_chat_entry(captain, answer)
return {
"messages": [answer],
"chat_history": state.get("chat_history", []) + [chat_entry],
"guesses": guesses
}
async def red_agent_1(self, state: State):
"""Red team agent 1 discusses the clue"""
logger.info("---"*50)
logger.info("[RED AGENT 1]")
logger.info("MESSAGES")
logger.info(state['messages'])
logger.info("---"*20)
chat_history = state["chat_history"]
# formatted_history = "\n".join([
# entry.get("content", "")
# for entry in chat_history
# ])
formatted_history, current_round_messages = self._split_chat_history(chat_history)
captain = next((p for p in state["players"] if p.team == "red" and p.role == "captain"), None)
agents = [p for p in state["players"] if p.team == "red" and p.role == "player"]
if not agents:
return state
agent = agents[0]
board = state["board"]
available_words = (
board["red"] +
board["blue"] +
board["neutral"] +
[board["killer"]]
)
random.shuffle(available_words)
team_name = state["current_team"].upper()
formatted_player_system_prompt = player_agent_system_prompt.format(
team_name,
captain.name,
agents[1].name
)
new_message = [
{
"role": "system",
"content": formatted_player_system_prompt
},
{
"role": "user",
"content": f"""
[HISTORY]\n{formatted_history}\n[END HISTORY]\n\n
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n
Available words: {', '.join(available_words)}
"""
}
]
answer = await agent.model.ainvoke(new_message)
chat_entry, _, _, _ = self._format_chat_entry(agent, answer)
logger.info(['RED AGENT 1 ANSWER'])
logger.info(answer)
return {
"messages": [answer],
"chat_history": state.get("chat_history", []) + [chat_entry]
}
async def red_agent_2(self, state: State):
"""Red team agent 2 discusses the clue"""
logger.info(f"[RED AGENT 2] State clue: {state['clue']}")
chat_history = state["chat_history"]
# formatted_history = "\n".join([
# entry.get("content", "")
# for entry in chat_history
# ])
formatted_history, current_round_messages = self._split_chat_history(chat_history)
captain = next((p for p in state["players"] if p.team == "red" and p.role == "captain"), None)
agents = [p for p in state["players"] if p.team == "red" and p.role == "player"]
if len(agents) < 2:
return state
agent = agents[1]
board = state["board"]
available_words = (
board["red"] +
board["blue"] +
board["neutral"] +
[board["killer"]]
)
random.shuffle(available_words)
team_name = state["current_team"].upper()
formatted_player_system_prompt = player_agent_system_prompt.format(
team_name,
captain.name,
agents[0].name
)
new_message = [
{
"role": "system",
"content": formatted_player_system_prompt
},
{
"role": "user",
"content": f"""
[HISTORY]\n{formatted_history}\n[END HISTORY]\n\n
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n
Available words: {', '.join(available_words)}
"""
}
]
answer = await agent.model.ainvoke(new_message)
chat_entry, _, _, _ = self._format_chat_entry(agent, answer)
logger.info(['RED AGENT 2 ANSWER'])
logger.info(answer)
return {
"messages": [answer],
"chat_history": state.get("chat_history", []) + [chat_entry]
}
async def blue_boss(self, state: State):
"""Blue team boss gives a clue"""
logger.info("[BLUE BOSS MOMENT]")
chat_history = state["chat_history"]
# formatted_history = "\n".join([
# entry.get("content", "")
# for entry in chat_history
# ])
formatted_history, current_round_messages = self._split_chat_history(chat_history)
boss = next((p for p in state["players"] if p.team == "blue" and p.role == "boss"), None)
if not boss:
return state
board = state["board"]
team_name = state["current_team"].upper()
formatted_boss_system_prompt = boss_agent_system_prompt.format(team_name)
self.current_team = state["current_team"]
new_message = [
{
"role": "system",
"content": formatted_boss_system_prompt
},
{
"role": "user",
"content": f"""
Keep in mind the history of the game so far:\n
[HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n
Here is the current board:\n
Red words: {", ".join(board['red'])}\n
Blue words: {", ".join(board['blue'])}\n
Neutral words: {", ".join(board['neutral'])}\n
Killer word: {board['killer']}\n\n
Based on this board, provide a clue and a number of words that relate to that clue.
"""
}
]
if self.IS_HUMAN_PLAYING and boss.model_name == "Human brain":
clue_ = state['human_clue']
clue_number = state['human_clue_number']
answer = create_fake_tool_call(clue_, clue_number)
else:
if boss.model_name == "claude-sonnet-4-5-20250929":
llm_with_tools = boss.model.bind_tools(boss_agent_tools)
else:
llm_with_tools = boss.model.bind_tools(boss_agent_tools, tool_choice="ChooseWord")
answer = await llm_with_tools.ainvoke(new_message)
logger.info(f"[BLUE BOSS ANSWER] : {answer}")
chat_entry, clue, clue_number, _ = self._format_chat_entry(boss, answer)
return {
"messages": [answer],
"current_role": "captain",
"chat_history": state.get("chat_history", []) + [chat_entry],
"clue": clue,
"clue_number": clue_number,
}
async def blue_captain(self, state: State):
"""Blue team captain coordinates guessing"""
logger.info("°°°"*50)
logger.info(f"[BLUE CAPTAIN] State clue: {state['clue']}")
captain = next((p for p in state["players"] if p.team == "blue" and p.role == "captain"), None)
agents = [p for p in state["players"] if p.team == "blue" and p.role == "player"]
if not captain:
return state
board = state["board"]
chat_history = state["chat_history"]
# formatted_history = "\n".join([
# entry.get("content", "")
# for entry in chat_history
# ])
formatted_history, current_round_messages = self._split_chat_history(chat_history)
available_words = (
board["red"] +
board["blue"] +
board["neutral"] +
[board["killer"]]
)
random.shuffle(available_words)
team_name = state["current_team"].upper()
formatted_captain_system_prompt = captain_agent_system_prompt.format(
team_name,
agents[0].name,
agents[1].name
)
new_message = [
{
"role": "system",
"content": formatted_captain_system_prompt
},
{
"role": "user",
"content": f"""
Consider the history of the game so far:\n[HISTORY]\n{formatted_history}\n[/END HISTORY]\n\n
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n
Here is the list of words on the board:\n{', '.join(available_words)}\n\n
This is what your boss said: '{state['clue']}' {state['clue_number']}, suggest which words to guess.
"""
}
]
llm_with_tools = captain.model.bind_tools(
captain_agent_tools,
)
if state['blue_captain_is_called_counter'] > 4:
logger.info("Creating fake answer to stop loop")
answer = create_agent_fake_tool_call()
else:
logger.info(f"blue_captain_is_called_counter: {state['blue_captain_is_called_counter']}")
answer = await llm_with_tools.ainvoke(new_message)
logger.info(f"[BLUE CAPTAIN ANSWER] : {answer}")
logger.info("°°°"*50)
chat_entry, _, _, guesses = self._format_chat_entry(captain, answer)
return {
"messages": [answer],
"chat_history": state.get("chat_history", []) + [chat_entry],
"guesses": guesses
}
async def blue_agent_1(self, state: State):
"""Blue team agent 1 discusses the clue"""
logger.info("---"*50)
logger.info("[BLUE AGENT 1]")
chat_history = state["chat_history"]
# formatted_history = "\n".join([
# entry.get("content", "")
# for entry in chat_history
# ])
formatted_history, current_round_messages = self._split_chat_history(chat_history)
captain = next((p for p in state["players"] if p.team == "blue" and p.role == "captain"), None)
agents = [p for p in state["players"] if p.team == "blue" and p.role == "player"]
if not agents:
return state
agent = agents[0]
board = state["board"]
available_words = (
board["red"] +
board["blue"] +
board["neutral"] +
[board["killer"]]
)
random.shuffle(available_words)
team_name = state["current_team"].upper()
formatted_player_system_prompt = player_agent_system_prompt.format(
team_name,
captain.name,
agents[1].name
)
new_message = [
{
"role": "system",
"content": formatted_player_system_prompt
},
{
"role": "user",
"content": f"""
[HISTORY]\n{formatted_history}\n[END HISTORY]\n\n
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n
Available words: {', '.join(available_words)}
"""
}
]
answer = await agent.model.ainvoke(new_message)
logger.info(['BLUE AGENT 1 ANSWER'])
logger.info(answer)
chat_entry, _, _, _ = self._format_chat_entry(agent, answer)
return {
"messages": [answer],
"chat_history": state.get("chat_history", []) + [chat_entry]
}
async def blue_agent_2(self, state: State):
"""Blue team agent 2 discusses the clue"""
logger.info("---"*50)
logger.info("[BLUE AGENT 2]")
chat_history = state["chat_history"]
# formatted_history = "\n".join([
# entry.get("content", "")
# for entry in chat_history
# ])
formatted_history, current_round_messages = self._split_chat_history(chat_history)
captain = next((p for p in state["players"] if p.team == "blue" and p.role == "captain"), None)
agents = [p for p in state["players"] if p.team == "blue" and p.role == "player"]
if not agents:
return state
agent = agents[1]
board = state["board"]
available_words = (
board["red"] +
board["blue"] +
board["neutral"] +
[board["killer"]]
)
random.shuffle(available_words)
team_name = state["current_team"].upper()
formatted_player_system_prompt = player_agent_system_prompt.format(
team_name,
captain.name,
agents[0].name
)
new_message = [
{
"role": "system",
"content": formatted_player_system_prompt
},
{
"role": "user",
"content": f"""
[HISTORY]\n{formatted_history}\n[END HISTORY]\n\n
[CURRENT ROUND MESSAGES]\n{current_round_messages}\n[/END CURRENT ROUND MESSAGES]\n\n
Available words: {', '.join(available_words)}
"""
}
]
answer = await agent.model.ainvoke(new_message)
logger.info(['BLUE AGENT 2 ANSWER'])
logger.info(answer)
chat_entry, _, _, _ = self._format_chat_entry(agent, answer)
return {
"messages": [answer],
"chat_history": state.get("chat_history", []) + [chat_entry]
}
def update_turn(self, state: State):
"""Update turn counter"""
turn = state.get("turn", 1)
return {
"turn": turn+1
}
def red_boss_is_called(self, state: State):
"""Update turn counter"""
_counter = state.get("red_boss_is_called_counter", 1)
return {
"red_boss_is_called_counter": _counter+1
}
def blue_boss_is_called(self, state: State):
"""Update turn counter"""
_counter = state.get("blue_boss_is_called_counter", 1)
return {
"blue_boss_is_called_counter": _counter+1
}
def red_captain_is_called(self, state: State):
"""Update turn counter"""
_counter = state.get("red_captain_is_called_counter", 1)
return {
"red_captain_is_called_counter": _counter+1
}
def blue_captain_is_called(self, state: State):
"""Update turn counter"""
_counter = state.get("blue_captain_is_called_counter", 1)
return {
"blue_captain_is_called_counter": _counter+1
}
def judge(self, state: State):
"""Evaluate guesses, update board, check win/lose conditions."""
# Helper function to check win conditions
def check_win_condition():
"""Returns (is_game_over, winner, win_message) tuple"""
red_remaining = len(board.get("red", []))
blue_remaining = len(board.get("blue", []))
if red_remaining == 0:
return True, "red", (red_remaining, blue_remaining)
if blue_remaining == 0:
return True, "blue", (red_remaining, blue_remaining)
return False, None, None
# Helper function to create multiple messages
def create_multi_message_state(messages_content_list, next_team=None, switch_role=False, winner_and_score=None):
"""Creates state with multiple messages"""
messages = []
chat_entries = []
for content, title in messages_content_list:
message = AIMessage(
content=content,
metadata={"title": title, "sender": "judge"}
)
messages.append(message)
if title == "⚖️ Judge Decision":
info = "chat_history"
else:
info = None
chat_entry, _, _, _ = self._format_chat_entry(None, message, info=info)
chat_entries.append(chat_entry)
logger.info("****" * 50)
filtered_chat = self._filter_important_messages(state.get("chat_history", []))
logger.info("**"*50)
logger.info("FILTERED CHAT")
logger.info(filtered_chat)
logger.info("**"*50)
end_state = {
"messages": messages,
"chat_history": filtered_chat + chat_entries,
"board": board,
"guesses": [],
"history_guessed_words": history_guessed_words,
"teams_reviewed": teams_reviewed,
"end_round": end_round,
"winner_and_score": winner_and_score
}
if next_team:
end_state.update({
"current_team": next_team,
"current_role": "boss" if switch_role else state.get("current_role"),
"turn": state.get("turn", 1) + 1,
"next_team": next_team,
})
return end_state
# Helper function to create turn end messages
def create_turn_end_messages(results_list, next_team):
"""Creates separate messages for results and turn transition"""
team_emoji = "🔵" if next_team == "blue" else "🔴"
results_text = "\n".join(results_list)
red_remaining = len(board.get("red", []))
blue_remaining = len(board.get("blue", []))
# Message 1: Results
results_message = results_text
# Message 2: Turn transition
transition_message = (
f"🔄 **TURN COMPLETE**\n\n"
f"{team_emoji} **{next_team.upper()} TEAM'S TURN** now begins!\n\n"
f"**Remaining Words:**\n"
f"🔴 Red: {red_remaining}\n"
f"🔵 Blue: {blue_remaining}"
)
return [
(results_message, "⚖️ Judge Decision"),
(transition_message, "🔄 Turn Transition")
]
# Helper function to create round end messages
def create_round_end_messages(results_list, next_team):
"""Creates separate messages for results and round transition"""
team_emoji = "🔵" if next_team == "blue" else "🔴"
results_text = "\n".join(results_list)
red_remaining = len(board.get("red", []))
blue_remaining = len(board.get("blue", []))
# Message 1: Results
results_message = results_text
# Message 2: Round transition
transition_message = (
f"🎯 **ROUND COMPLETE!**\n\n"
f"Both teams have played their turn.\n\n"
f"{team_emoji} **{next_team.upper()} TEAM** starts the next round!\n\n"
f"**Score Update:**\n"
f"🔴 Red Team: {red_remaining} words remaining\n"
f"🔵 Blue Team: {blue_remaining} words remaining\n\n"
f"Let's keep going! 💪"
)
return [
(results_message, "⚖️ Judge Decision"),
(transition_message, "🎯 Round Complete")
]
# Helper function to create game over messages
def create_game_over_messages(results_list, winner, scores, reason="normal"):
"""Creates separate messages for results and game over"""
results_text = "\n".join(results_list)
red_remaining, blue_remaining = scores
winner_emoji = "🔴" if winner == "red" else "🔵"
# Message 1: Results
results_message = results_text
# Message 2: Game over
if reason == "killer":
loser = "red" if winner == "blue" else "blue"
game_over_message = (
f"🏆 **GAME OVER!**\n\n"
f"💀 {loser.upper()} team hit the KILLER WORD!\n\n"
f"{winner_emoji} **{winner.upper()} TEAM WINS!** 🎉\n\n"
f"**Final Score:**\n"
f"🔴 Red: {red_remaining} words remaining\n"
f"🔵 Blue: {blue_remaining} words remaining\n\n"
f"Better luck next time! 😅"
)
else:
game_over_message = (
f"🏆 **GAME OVER!**\n\n"
f"{winner_emoji} **{winner.upper()} TEAM WINS!** 🎉\n\n"
f"All {winner} words have been found!\n\n"
f"**Final Score:**\n"
f"🔴 Red: {red_remaining} words remaining\n"
f"🔵 Blue: {blue_remaining} words remaining\n\n"
f"Congratulations to the {winner.title()} Team! 🥳"
)
return [
(results_message, "⚖️ Judge Decision"),
(game_over_message, "🏆 Game Over")
]
logger.info("IS HUMAN PLAYING")
logger.info(self.IS_HUMAN_PLAYING)
logger.info("****" * 50)
logger.info("[JUDGE]")
logger.info("CHAT HISTORY")
logger.info(state['chat_history'])
# Initialization: first turn - GAME START MESSAGE
if state.get("turn", 0) == 0:
logger.info("[JUDGE INITIALIZING GAME STATE]")
team_emoji = "🔴" if state["current_team"] == "red" else "🔵"
# Game start message
game_start_content = (
f"🎮 **CODENAMES GAME STARTED!**\n\n"
f"{team_emoji} **{state['current_team'].upper()} TEAM** goes first!\n\n"
f"**Game Rules:**\n"
f"- Teams alternate turns to guess words\n"
f"- Match words to your team's color\n"
f"- Avoid the opponent's words, neutral words, and the killer word ☠️\n"
f"- First team to find all their words wins!\n\n"
f"Good luck! 🍀"
)
message = AIMessage(
content=game_start_content,
metadata={"title": "🎮 Game Start", "sender": "judge"}
)
logger.info(f"[JUDGE MESSAGE]: {message}")
chat_entry, _, _, _ = self._format_chat_entry(None, message, info="Game start")
filtered_chat = self._filter_important_messages(state.get("chat_history", []))
return {
"messages": [message],
"turn": 1,
"chat_history": filtered_chat + [chat_entry],
}
# Check if there are guesses to process
guesses = state.get("guesses", [])
if not guesses:
logger.info("[JUDGE] No guesses made.")
return state
# Initialize variables
board = state["board"]
self.board = board
current_team = state["current_team"]
other_team = "red" if current_team == "blue" else "blue"
history_guessed_words = state.get("history_guessed_words", [])
teams_reviewed = state.get('teams_reviewed', [])
teams_reviewed.append(current_team)
if len(set(teams_reviewed)) == 2:
end_round = True
teams_reviewed = []
else:
end_round = False
results = ["📋 **Turn Results:**"]
logger.info(f"[JUDGE] Team {current_team.upper()} guesses: {guesses}")
# Process each guess
for word in guesses:
history_guessed_words.append(word)
self.guessed_words = history_guessed_words
logger.info(f"[JUDGE] Evaluating word: {word}")
if word == "STOP_TURN":
results.append(f"🚧 **{word}** The {current_team.upper()} team stops the turn.")
break
# ✅ Correct team word
elif word in board.get(current_team, []):
board[current_team].remove(word)
self.board = board
results.append(f"✅ **{word}** - Correct! ({current_team.upper()} team word)")
# ❌ Other team's word — stop immediately and check win condition
elif word in board.get(other_team, []):
board[other_team].remove(word)
self.board = board
results.append(f"❌ **{word}** - Oh no! You selected a {other_team.upper()} team word!")
# Check if this caused the other team to win
is_game_over, winner, scores = check_win_condition()
if is_game_over:
messages_list = create_game_over_messages(results, winner, scores)
return create_multi_message_state(messages_list, winner_and_score=(winner, scores))
# Check if round is complete
if end_round:
messages_list = create_round_end_messages(results, other_team)
else:
messages_list = create_turn_end_messages(results, other_team)
return create_multi_message_state(messages_list, next_team=other_team, switch_role=True)
# ⚪ Neutral card — stop guessing for this turn
elif word in board.get("neutral", []):
board["neutral"].remove(word)
self.board = board
results.append(f"⚪ **{word}** - Neutral word. Turn ends!")
# Check if round is complete
if end_round:
messages_list = create_round_end_messages(results, other_team)
else:
messages_list = create_turn_end_messages(results, other_team)
return create_multi_message_state(messages_list, next_team=other_team, switch_role=True)
# ☠️ Killer word — game over
elif word == board.get("killer"):
results.append(f"☠️ **{word}** - KILLER WORD! 💀")
# loser = current_team
winner = other_team
board["killer"] = None
self.board = board
red_remaining = len(board.get("red", []))
blue_remaining = len(board.get("blue", []))
scores = (red_remaining, blue_remaining)
messages_list = create_game_over_messages(
results, winner, scores, reason="killer"
)
return create_multi_message_state(messages_list, winner_and_score=(winner, scores, "killer"))
# ✅ All guesses processed successfully - check win conditions
is_game_over, winner, scores = check_win_condition()
if is_game_over:
messages_list = create_game_over_messages(results, winner, scores)
return create_multi_message_state(messages_list, winner_and_score=(winner, scores))
# Check if round is complete before normal turn switch
if end_round:
messages_list = create_round_end_messages(results, other_team)
else:
messages_list = create_turn_end_messages(results, other_team)
return create_multi_message_state(messages_list, next_team=other_team, switch_role=True)
def route_after_judge(self, state: State) -> str:
"""Route to the appropriate team or end the game."""
logger.info("***" * 50)
logger.info(f"HUMAN PLAYING? {self.IS_HUMAN_PLAYING}")
logger.info("***" * 50)
logger.info("\n\n")
logger.info("***" * 50)
logger.info("[ROUTING AFTER JUDGE]")
board = state.get("board", {})
logger.info("BOARD ROUTING")
logger.info(board)
self.board = board
self.chat_history = state['chat_history']
self.winners = state['winner_and_score']
# Check if the game is over
if board["killer"] is None:
logger.info("KILLER HIT, END GAME")
logger.info("***" * 50)
return END
if not all(board.get(key) for key in ("red", "blue")):
logger.info("END GAME")
logger.info("***" * 50)
return END
# Human vs. Non-human handling
# if self.IS_HUMAN_PLAYING:
if state['end_round']:
logger.info("ROUND ENDS")
logger.info("***" * 50)
return END
next_team = state.get("next_team") or state.get("current_team", "red")
return f"{next_team}_team"
def boss_choice(self, state: State) -> str:
"""Determine whether to continue to tools or return a final answer."""
logger.info("###"*50)
logger.info("[BOSS CHOICE]")
messages = state["messages"]
last_message = messages[-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
logger.info("[TOOL CALL]")
tool_name = last_message.tool_calls[0]["name"]
logger.info(tool_name)
if tool_name == "ChooseWord":
return "choose_word_tool"
else:
logger.warning("[WARNING] NESSUN TOOL E' STATO RICHIAMATO, TORNIAMO DAL BOSS")
current_team = state['current_team']
logger.info(f"[CURRENT TEAM: {current_team}]")
return f"{current_team}_boss_is_called"
def should_continue(self, state: State) -> str:
"""Determine whether to continue to tools or return a final answer."""
logger.info("###"*50)
logger.info("[SHOULD CONTINUE]")
messages = state["messages"]
last_message = messages[-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
logger.info("[TOOL CALL]")
tool_name = last_message.tool_calls[0]["name"]
logger.info(tool_name)
if tool_name == "Call_Agent_1":
return "transfer_to_agent_1"
elif tool_name == "Call_Agent_2":
return "transfer_to_agent_2"
elif tool_name == "TeamFinalChoice":
return "final_choice"
else:
logger.warning("[WARNING] NESSUN TOOL E' STATO RICHIAMATO, TORNIAMO DAL CAPITANO")
current_team = state['current_team']
logger.info(f"[CURRENT TEAM: {current_team}]")
return f"{current_team}_captain_is_called"
def _convert_to_string(self, value):
"""Helper to convert any value to string"""
if isinstance(value, str):
return value
elif isinstance(value, (dict, list)):
return str(value)
else:
return str(value)
def _filter_important_messages(self, chat_history):
"""
Filter chat history to keep only important messages:
- Boss choices (ChooseWord)
- Captain choices (TeamFinalChoice)
- All Judge messages
"""
return [
entry for entry in chat_history
if entry.get("tool_name") in ["ChooseWord", "TeamFinalChoice"]
or entry.get("sender_type") == "judge" and entry.get("info") == "chat_history"
]
def _format_chat_entry(self, player, message, info=None):
"""Helper to create a structured chat history entry"""
tool_name = None
clue = None
clue_number = None
guesses = []
# Extract content based on message type
if hasattr(message, 'tool_calls') and message.tool_calls:
tool_call = message.tool_calls[0]
tool_name = tool_call.get('name', '')
args = tool_call.get('args', {})
if tool_name == 'Call_Agent_1':
content = f"Called Agent 1: {self._convert_to_string(args.get('message', 'N/A'))}"
elif tool_name == 'Call_Agent_2':
content = f"Called Agent 2: {self._convert_to_string(args.get('message', 'N/A'))}"
elif tool_name == 'ChooseWord':
clue = self._convert_to_string(args.get('clue', 'N/A'))
clue_number = args.get('clue_number', 'N/A')
content = f"Clue: '{clue.upper()}' for {clue_number} word(s)"
elif tool_name == 'TeamFinalChoice':
guesses = args.get('guesses', [])
content = f"Final choices: {', '.join(guesses)}"
else:
content = f"Used tool {tool_name}: {args}"
elif hasattr(message, 'content') and message.content:
content = message.content
elif hasattr(message, 'text') and message.text:
content = message.text
else:
content = str(message)
# Return structured entry
if player is None:
return {
"sender_type": "judge",
"team": "",
"role": "",
"name": "JUDGE",
"tool_name": None,
"content": content,
"info": info,
}, clue, clue_number, guesses
return {
"sender_type": "player",
"team": player.team,
"role": player.role,
"name": player.name,
"tool_name": tool_name,
"content": content,
"info": info,
}, clue, clue_number, guesses
def _split_chat_history(self, chat_history):
"""
Splits chat_history into two parts:
- formatted_history: all entries up to and including the last one with info == "chat_history"
- current_round: all entries after that
Both are formatted strings, where each line is:
[name (team role)]: content
"""
last_index = None
# Find the last entry with info == "chat_history"
for i, entry in enumerate(chat_history):
if entry.get("info") == "chat_history":
last_index = i
# Helper to format a single entry
def format_entry(entry):
name = entry.get("name", "Unknown")
team = entry.get("team", "N/A")
role = entry.get("role", "N/A")
content = entry.get("content", "")
return f"[{name} ({team} {role})]: {content}"
# Build formatted strings
if last_index is None:
formatted_history = ""
current_round = "\n".join(format_entry(e) for e in chat_history)
else:
formatted_history = "\n".join(
format_entry(e) for e in chat_history[:last_index + 1]
)
current_round = "\n".join(
format_entry(e) for e in chat_history[last_index + 1:]
)
return formatted_history, current_round
async def stream_graph(self, input_message, messages, players, board, dropdown_clue_number, guessed_words, chat_history, is_human_playing, turn_):
"""Stream the graph execution."""
logger.info(f"Human Clue: {input_message}")
logger.info(f"Human Clue number: {dropdown_clue_number}")
logger.info(f"Board: {board}")
logger.info(f"Starting team: {board['starting_team']}")
logger.info(f"Turn: {turn_}")
self.IS_HUMAN_PLAYING = is_human_playing
self.board = board
inputs = {
"messages": [HumanMessage(content=input_message)] if input_message else [],
"original_board": board,
"board": board,
"players": players,
"current_team": board.get("starting_team", "red"),
"current_role": "boss",
"turn": turn_,
"last_user_message": input_message or "",
"guesses": [],
"clue": None,
"clue_number": None,
"round_messages": [],
"chat_history": chat_history,
"human_clue": input_message,
"human_clue_number": dropdown_clue_number,
"teams_reviewed": [],
"history_guessed_words": guessed_words,
"end_round": False,
"winner_and_score": None,
"red_boss_is_called_counter": 0,
"blue_boss_is_called_counter": 0,
"red_captain_is_called_counter": 0,
"blue_captain_is_called_counter": 0,
}
final_msg = ""
messages = list(messages) if messages else []
previous_message_is_reasoning = False
latest_lang_node = ""
current_sender = "" # Track current sender
last_message_sender = None # Track the sender of the last message
last_reasoning_index = None # Track reasoning step index for OpenAI
async for chunk in self.graph.astream(
inputs, {"recursion_limit": 100},
stream_mode=["messages", "updates"],
subgraphs=True,
):
if chunk[1] == "messages":
msg = chunk[2][0]
lang_node = chunk[2][1]['langgraph_node']
if latest_lang_node != lang_node:
latest_lang_node = lang_node
current_sender = lang_node # Update sender
final_msg = ""
last_message_sender = None # Reset when node changes
last_reasoning_index = None # Reset reasoning index
else:
agent_update = chunk[2]
msg_from = next(iter(agent_update.keys()))
if msg_from in [
'red_boss', 'red_captain', 'red_agent_1', 'red_agent_2',
'blue_boss', 'blue_captain', 'blue_agent_1', 'blue_agent_2',
'judge', 'final_choice'
]:
current_sender = msg_from # Update sender
msg = None
else:
try:
msg = next(iter(agent_update.values()))['messages'][-1]
except Exception as e:
logger.error(f"[EXCEPTION]: {e}")
logger.error(f"[CHUNK]: {chunk}")
logger.error(f"[Agent update]: {agent_update}")
msg = None
# logger.info(f"[MSG]: {msg}")
if isinstance(msg, AIMessage) and msg.tool_calls:
final_msg = ""
last_message_sender = None # Reset after tool calls
last_reasoning_index = None # Reset reasoning index
yield messages, self.guessed_words, self.board, self.chat_history, self.winners
elif isinstance(msg, ToolMessage):
logger.info(f"[TOOL MESSAGE: {msg}]")
tool_name = msg.name
if tool_name == "TeamFinalChoice":
try:
command_data = json.loads(msg.content)
update_data = command_data.get("update", {})
guesses = update_data.get("guesses")
new_message = ChatMessage(
role="assistant",
content="I made my final choices: " + ", ".join(guesses),
metadata={
"title": "🧠 Guesses",
"sender": f"{self.current_team}_captain"
}
)
except json.JSONDecodeError as e:
logger.error(f"Error parsing tool message: {e}")
new_message = ChatMessage(
role="assistant",
content=msg.content,
metadata={
"title": "🧠 Guesses",
"sender": f"{self.current_team}_captain"
}
)
elif tool_name == "ChooseWord":
try:
command_data = json.loads(msg.content)
update_data = command_data.get("update", {})
clue = update_data.get("clue")
clue_number = update_data.get("clue_number")
logger.info(f"Clue: {clue}, Clue Number: {clue_number}")
new_message = ChatMessage(
role="assistant",
content=f"{clue}, {clue_number}",
metadata={
"title": "🕵️‍♂️ Clue",
"sender": f"{self.current_team}_boss"
}
)
except json.JSONDecodeError as e:
logger.error(f"Error parsing tool message: {e}")
new_message = ChatMessage(
role="assistant",
content=msg.content,
metadata={
"title": "🕵️‍♂️ Clue",
"sender": f"{self.current_team}_boss"
}
)
elif tool_name == "Call_Agent_1" or tool_name == "Call_Agent_2":
if tool_name == "Call_Agent_1":
title_ = "💭 Asking opinion of Agent 1"
else:
title_ = "💭 Asking opinion of Agent 2"
try:
command_data = json.loads(msg.content)
update_data = command_data.get("update", {})
message = update_data.get("message")
new_message = ChatMessage(
role="assistant",
content=message,
metadata={
"title": title_,
"sender": f"{self.current_team}_captain"
}
)
except json.JSONDecodeError as e:
logger.error(f"Error parsing tool message: {e}")
new_message = ChatMessage(
role="assistant",
content=msg.content,
metadata={
"title": title_,
"sender": f"{self.current_team}_captain"
}
)
else:
new_message = ChatMessage(
role="assistant",
content=msg.content,
metadata={
"title": f"""🛠️ {tool_name}""",
"sender": current_sender
}
)
if not messages or messages[-1] != new_message:
messages.append(new_message)
last_message_sender = new_message.metadata.get("sender")
# else:
# logger.info("*******"*50)
# logger.info("SKIP")
final_msg = ""
yield messages, self.guessed_words, self.board, self.chat_history, self.winners
elif isinstance(msg, AIMessageChunk):
# Handle OpenAI format (both reasoning and text)
model_provider = msg.response_metadata.get('model_provider')
if (model_provider in ["openai", "google_genai", "anthropic"] and isinstance(msg.content, list)):
for item in msg.content:
if not isinstance(item, dict):
continue
item_type = item.get('type')
# Handle openai reasoning content
if item_type == 'reasoning':
reasoning_text = ""
summary = item.get('summary', [])
for summary_item in summary:
if isinstance(summary_item, dict) and summary_item.get('type') == 'summary_text':
text = summary_item.get('text', '')
item_index = summary_item.get('index')
if text:
reasoning_text += text
if reasoning_text:
# Check if reasoning index changed (new step)
if last_reasoning_index is not None and item_index != last_reasoning_index:
reasoning_text = "\n\n" + reasoning_text
last_reasoning_index = item_index
# Create or update reasoning message
if final_msg == "" or last_message_sender != current_sender or not previous_message_is_reasoning:
final_msg = reasoning_text
messages.append(ChatMessage(
role="assistant",
content=final_msg,
metadata={
"title": "🧠 Thinking...",
"sender": current_sender
}
))
last_message_sender = current_sender
else:
final_msg += reasoning_text
if len(messages) > 0:
messages[-1].content = final_msg
previous_message_is_reasoning = True
yield messages, self.guessed_words, self.board, self.chat_history, self.winners
# Handle Google and Anthropic thinking content
elif item_type == 'thinking':
# reasoning_text = ""
reasoning_text = item.get('thinking', [])
if reasoning_text:
# Create or update reasoning message
if final_msg == "" or last_message_sender != current_sender or not previous_message_is_reasoning:
final_msg = reasoning_text
messages.append(ChatMessage(
role="assistant",
content=final_msg,
metadata={
"title": "🧠 Thinking...",
"sender": current_sender
}
))
last_message_sender = current_sender
else:
final_msg += reasoning_text
if len(messages) > 0:
messages[-1].content = final_msg
previous_message_is_reasoning = True
yield messages, self.guessed_words, self.board, self.chat_history, self.winners
# Handle text content (regular response after reasoning)
elif item_type == 'text':
text_content = item.get('text', '')
if text_content:
# If we were in reasoning mode, start fresh
if previous_message_is_reasoning:
final_msg = ""
previous_message_is_reasoning = False
last_message_sender = None
last_reasoning_index = None
# Check if we need a new message (empty OR different sender)
if final_msg == "" or last_message_sender != current_sender:
final_msg = text_content
messages.append(ChatMessage(
role="assistant",
content=final_msg,
metadata={"sender": current_sender}
))
last_message_sender = current_sender
else:
# Same sender, continue streaming to last message
final_msg += text_content
messages[-1].content = final_msg
yield messages, self.guessed_words, self.board, self.chat_history, self.winners
elif msg.additional_kwargs.get('reasoning_content'):
reasoning_text = msg.additional_kwargs.get("reasoning_content")
if reasoning_text:
if final_msg == "" or last_message_sender != current_sender or not previous_message_is_reasoning:
final_msg = reasoning_text
messages.append(ChatMessage(
role="assistant",
content=final_msg,
metadata={
"title": "🧠 Thinking...",
"sender": current_sender
}
))
last_message_sender = current_sender
else:
final_msg += reasoning_text
if len(messages) > 0:
messages[-1].content = final_msg
previous_message_is_reasoning = True
elif msg.content and isinstance(msg.content, str):
if previous_message_is_reasoning:
final_msg = ""
previous_message_is_reasoning = False
last_message_sender = None # Reset after reasoning
# Check if we need a new message (empty OR different sender)
if final_msg == "" or last_message_sender != current_sender:
final_msg = msg.content # Start fresh for new messages
# if current_sender not in ['red_captain', 'blue_captain']:
messages.append(ChatMessage(
role="assistant",
content=final_msg,
metadata={"sender": current_sender}
))
last_message_sender = current_sender # Track sender
else:
# Same sender, continue streaming to last message
final_msg += msg.content
messages[-1].content = final_msg
yield messages, self.guessed_words, self.board, self.chat_history, self.winners
else:
reasoning = msg.additional_kwargs.get("reasoning_content")
if reasoning:
# Check if we need a new message for reasoning
if final_msg == "" or last_message_sender != current_sender:
final_msg = reasoning
messages.append(ChatMessage(
role="assistant",
content=final_msg,
metadata={
"title": """🧠 Thinking...""",
"sender": current_sender
}
))
last_message_sender = current_sender
else:
if len(messages) > 0:
final_msg += reasoning
messages[-1].content = final_msg
previous_message_is_reasoning = True
# elif ()
# skip streaming tool call chunks
# if final_msg == "" or last_message_sender != current_sender:
# final_msg = msg.tool_call_chunks[0]['args']
# messages.append(ChatMessage(
# role="assistant",
# content=final_msg,
# metadata={"sender": current_sender}
# ))
# last_message_sender = current_sender # Track sender
# else:
# # Same sender, continue streaming to last message
# final_msg += msg.tool_call_chunks[0]['args']
# messages[-1].content = final_msg
yield messages, self.guessed_words, self.board, self.chat_history, self.winners
elif isinstance(msg, AIMessage):
if msg.content and msg.content.strip():
logger.info("***"*50)
logger.info(f"msg.content: {msg.content}")
logger.info(f"msg: {msg}")
try:
if msg.metadata.get("sender"):
new_sender = msg.metadata.get("sender")
current_sender = new_sender
except Exception as e:
logger.error(f"[EXCEPTION]: {e} - {msg}")
if msg.metadata.get("title"):
new_title = msg.metadata.get("title")
messages.append(ChatMessage(
role="assistant",
content=msg.content,
metadata={
"sender": new_sender,
"title": new_title
}
))
last_message_sender = current_sender # Track sender
yield messages, self.guessed_words, self.board, self.chat_history, self.winners
yield messages, self.guessed_words, self.board, self.chat_history, self.winners
# async def create_graph():
# boss_tools = await load_boss_tools()
# captain_tools = await load_captain_tools()
# return MyGraph(boss_tools, captain_tools)