| import gradio as gr | |
| import re | |
| from smolagents.memory import ActionStep, PlanningStep, FinalAnswerStep | |
| from smolagents.models import ChatMessageStreamDelta | |
def get_step_footnote_content(step_log: ActionStep | PlanningStep, step_name: str) -> str:
    """Build a small HTML footnote for a step, appending token and timing stats when present."""
    parts = [f"**{step_name}**"]
    usage = getattr(step_log, "token_usage", None)
    if usage is not None:
        parts.append(f"Input tokens: {usage.input_tokens:,} | Output tokens: {usage.output_tokens:,}")
    timing = getattr(step_log, "timing", None)
    if timing and timing.duration:
        parts.append(f"Duration: {round(float(timing.duration), 2)}s")
    footnote = " | ".join(parts)
    return f"""<span style="color: #bbbbc2; font-size: 12px;">{footnote}</span> """
| def _clean_model_output(model_output: str) -> str: | |
| if not model_output: | |
| return "" | |
| model_output = model_output.strip() | |
| model_output = re.sub(r"```\s*<end_code>", "```", model_output) | |
| model_output = re.sub(r"<end_code>\s*```", "```", model_output) | |
| model_output = re.sub(r"```\s*\n\s*<end_code>", "```", model_output) | |
| return model_output | |
def _process_action_step(step_log: ActionStep, skip_model_outputs: bool = False):
    """Translate one ActionStep into a stream of gr.ChatMessage objects.

    Emission order: model reasoning (unless skipped), first tool call,
    tool observations, error (if any), a stats footnote, then a separator.
    """
    label = f"Step {step_log.step_number}"

    model_output = getattr(step_log, "model_output", "")
    if model_output and not skip_model_outputs:
        yield gr.ChatMessage(
            role="assistant",
            content=_clean_model_output(model_output),
            metadata={"title": f"π Reasoning ({label})", "status": "done"},
        )

    tool_calls = getattr(step_log, "tool_calls", [])
    if tool_calls:
        call = tool_calls[0]
        arguments = call.arguments
        # Dict arguments: prefer the "answer" key; otherwise show the raw args.
        if isinstance(arguments, dict):
            body = str(arguments.get("answer", str(arguments)))
        else:
            body = str(arguments).strip()
        icon = "π" if "search" in call.name else "π οΈ"
        yield gr.ChatMessage(
            role="assistant",
            content=f"```python\n{body}\n```",
            metadata={"title": f"{icon} Used tool: {call.name}", "status": "done"},
        )

    observations = getattr(step_log, "observations", "")
    if observations and observations.strip():
        yield gr.ChatMessage(
            role="assistant",
            content=f"```text\n{observations.strip()}\n```",
            metadata={"title": "π Tool Output", "status": "done"},
        )

    error = getattr(step_log, "error", None)
    if error:
        yield gr.ChatMessage(
            role="assistant",
            content=f"β οΈ **Error:** {str(error)}",
            metadata={"title": "π« Error", "status": "done"},
        )

    yield gr.ChatMessage(
        role="assistant",
        content=get_step_footnote_content(step_log, label),
        metadata={"status": "done"},
    )
    yield gr.ChatMessage(role="assistant", content="---", metadata={"status": "done"})
def _process_planning_step(step_log: PlanningStep, skip_model_outputs: bool = False):
    """Yield chat messages for a planning step: plan text (optional), stats footnote, separator."""
    messages = []
    if not skip_model_outputs:
        messages.append(
            gr.ChatMessage(
                role="assistant",
                content=step_log.plan,
                metadata={"title": "π§ Planning Phase", "status": "done"},
            )
        )
    messages.append(
        gr.ChatMessage(
            role="assistant",
            content=get_step_footnote_content(step_log, "Planning Stats"),
            metadata={"status": "done"},
        )
    )
    messages.append(gr.ChatMessage(role="assistant", content="---", metadata={"status": "done"}))
    yield from messages
def _process_final_answer_step(step_log: FinalAnswerStep):
    """Yield a single chat message containing the run's final answer.

    Looks the answer up under several attribute names for compatibility
    across smolagents versions; falls back to parsing it out of the step's
    string representation.
    """
    final_answer = None
    for attr in ("output", "answer", "final_answer"):
        if hasattr(step_log, attr):
            candidate = getattr(step_log, attr)
            # `is not None` (not truthiness) so falsy-but-valid answers such
            # as 0, False, or "" are kept instead of being silently skipped.
            if candidate is not None:
                final_answer = candidate
                break
    if final_answer is None:
        # Last resort: parse the quoted `output=...` field out of the repr.
        text = str(step_log)
        match = re.search(r"output=(['\"])(.*?)\1\)", text, re.DOTALL)
        if match:
            # NOTE(review): unicode_escape mangles non-ASCII text — acceptable
            # only as a best-effort fallback; confirm upstream repr format.
            final_answer = match.group(2).encode('utf-8').decode('unicode_escape')
        else:
            final_answer = text
    # Pandas-like objects expose to_string(); everything else is stringified.
    if hasattr(final_answer, 'to_string'):
        content = final_answer.to_string()
    else:
        content = str(final_answer)
    yield gr.ChatMessage(
        role="assistant",
        content=f"π **Final Answer**\n\n{content}",
        metadata={"status": "done"},
    )
def pull_messages_from_step(step_log, skip_model_outputs=False):
    """Dispatch a memory step to its renderer; unrecognized step types yield nothing."""
    if isinstance(step_log, PlanningStep):
        producer = _process_planning_step(step_log, skip_model_outputs)
    elif isinstance(step_log, ActionStep):
        producer = _process_action_step(step_log, skip_model_outputs)
    elif isinstance(step_log, FinalAnswerStep):
        producer = _process_final_answer_step(step_log)
    else:
        return
    yield from producer
def stream_to_gradio(agent, task: str, reset_agent_memory: bool = False, max_steps: int = 10):
    """Run *agent* on *task* and stream Gradio-ready output.

    Yields gr.ChatMessage objects for completed steps and raw text fragments
    for in-progress streaming deltas.

    Args:
        agent: A smolagents agent exposing ``run(..., stream=True)``.
        task: The user prompt to execute.
        reset_agent_memory: When True, reset the agent's memory before the run.
        max_steps: Upper bound on agent steps (was hard-coded to 10; now a
            backward-compatible keyword parameter).
    """
    for event in agent.run(task, stream=True, max_steps=max_steps, reset=reset_agent_memory):
        if isinstance(event, (ActionStep, PlanningStep, FinalAnswerStep)):
            yield from pull_messages_from_step(event)
        elif isinstance(event, ChatMessageStreamDelta) and event.content:
            yield event.content