Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| from gradio import ChatInterface, Request | |
| from gradio.helpers import special_args | |
| import anyio | |
| import os | |
| import threading | |
| import sys | |
| from itertools import chain | |
| import autogen | |
| from autogen.code_utils import extract_code | |
| from autogen import UserProxyAgent, AssistantAgent, Agent, OpenAIWrapper | |
| LOG_LEVEL = "INFO" | |
| TIMEOUT = 60 | |
| class myChatInterface(ChatInterface): | |
| async def _submit_fn( | |
| self, | |
| message: str, | |
| history_with_input: list[list[str | None]], | |
| request: Request, | |
| *args, | |
| ) -> tuple[list[list[str | None]], list[list[str | None]]]: | |
| history = history_with_input[:-1] | |
| inputs, _, _ = special_args( | |
| self.fn, inputs=[message, history, *args], request=request | |
| ) | |
| if self.is_async: | |
| response = await self.fn(*inputs) | |
| else: | |
| response = await anyio.to_thread.run_sync( | |
| self.fn, *inputs, limiter=self.limiter | |
| ) | |
| # history.append([message, response]) | |
| return history, history | |
| with gr.Blocks() as demo: | |
| def flatten_chain(list_of_lists): | |
| return list(chain.from_iterable(list_of_lists)) | |
| class thread_with_trace(threading.Thread): | |
| # https://www.geeksforgeeks.org/python-different-ways-to-kill-a-thread/ | |
| # https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread | |
| def __init__(self, *args, **keywords): | |
| threading.Thread.__init__(self, *args, **keywords) | |
| self.killed = False | |
| self._return = None | |
| def start(self): | |
| self.__run_backup = self.run | |
| self.run = self.__run | |
| threading.Thread.start(self) | |
| def __run(self): | |
| sys.settrace(self.globaltrace) | |
| self.__run_backup() | |
| self.run = self.__run_backup | |
| def run(self): | |
| if self._target is not None: | |
| self._return = self._target(*self._args, **self._kwargs) | |
| def globaltrace(self, frame, event, arg): | |
| if event == "call": | |
| return self.localtrace | |
| else: | |
| return None | |
| def localtrace(self, frame, event, arg): | |
| if self.killed: | |
| if event == "line": | |
| raise SystemExit() | |
| return self.localtrace | |
| def kill(self): | |
| self.killed = True | |
| def join(self, timeout=0): | |
| threading.Thread.join(self, timeout) | |
| return self._return | |
| def update_agent_history(recipient, messages, sender, config): | |
| if config is None: | |
| config = recipient | |
| if messages is None: | |
| messages = recipient._oai_messages[sender] | |
| message = messages[-1] | |
| msg = message.get("content", "") | |
| # config.append(msg) if msg is not None else None # config can be agent_history | |
| return False, None # required to ensure the agent communication flow continues | |
| def _is_termination_msg(message): | |
| """Check if a message is a termination message. | |
| Terminate when no code block is detected. Currently only detect python code blocks. | |
| """ | |
| if isinstance(message, dict): | |
| message = message.get("content") | |
| if message is None: | |
| return False | |
| cb = extract_code(message) | |
| contain_code = False | |
| for c in cb: | |
| # todo: support more languages | |
| if c[0] == "python": | |
| contain_code = True | |
| break | |
| return not contain_code | |
| def initialize_agents(config_list): | |
| assistant = AssistantAgent( | |
| name="assistant", | |
| max_consecutive_auto_reply=10, | |
| llm_config={ | |
| # "seed": 42, | |
| "timeout": TIMEOUT, | |
| "config_list": config_list, | |
| }, | |
| ) | |
| userproxy = UserProxyAgent( | |
| name="userproxy", | |
| human_input_mode="NEVER", | |
| is_termination_msg=_is_termination_msg, | |
| max_consecutive_auto_reply=10, | |
| # code_execution_config=False, | |
| code_execution_config={ | |
| "work_dir": "coding", | |
| "use_docker": False, # set to True or image name like "python:3" to use docker | |
| }, | |
| ) | |
| # assistant.register_reply([Agent, None], update_agent_history) | |
| # userproxy.register_reply([Agent, None], update_agent_history) | |
| return assistant, userproxy | |
| def chat_to_oai_message(chat_history): | |
| """Convert chat history to OpenAI message format.""" | |
| messages = [] | |
| if LOG_LEVEL == "DEBUG": | |
| print(f"chat_to_oai_message: {chat_history}") | |
| for msg in chat_history: | |
| messages.append( | |
| { | |
| "content": msg[0].split()[0] | |
| if msg[0].startswith("exitcode") | |
| else msg[0], | |
| "role": "user", | |
| } | |
| ) | |
| messages.append({"content": msg[1], "role": "assistant"}) | |
| return messages | |
| def oai_message_to_chat(oai_messages, sender): | |
| """Convert OpenAI message format to chat history.""" | |
| chat_history = [] | |
| messages = oai_messages[sender] | |
| if LOG_LEVEL == "DEBUG": | |
| print(f"oai_message_to_chat: {messages}") | |
| for i in range(0, len(messages), 2): | |
| chat_history.append( | |
| [ | |
| messages[i]["content"], | |
| messages[i + 1]["content"] if i + 1 < len(messages) else "", | |
| ] | |
| ) | |
| return chat_history | |
| def agent_history_to_chat(agent_history): | |
| """Convert agent history to chat history.""" | |
| chat_history = [] | |
| for i in range(0, len(agent_history), 2): | |
| chat_history.append( | |
| [ | |
| agent_history[i], | |
| agent_history[i + 1] if i + 1 < len(agent_history) else None, | |
| ] | |
| ) | |
| return chat_history | |
| def initiate_chat(config_list, user_message, chat_history): | |
| if LOG_LEVEL == "DEBUG": | |
| print(f"chat_history_init: {chat_history}") | |
| # agent_history = flatten_chain(chat_history) | |
| if len(config_list[0].get("api_key", "")) < 2: | |
| chat_history.append( | |
| [ | |
| user_message, | |
| "Hi, nice to meet you! Please enter your API keys in below text boxs.", | |
| ] | |
| ) | |
| return chat_history | |
| else: | |
| llm_config = { | |
| # "seed": 42, | |
| "timeout": TIMEOUT, | |
| "config_list": config_list, | |
| } | |
| assistant.llm_config.update(llm_config) | |
| assistant.client = OpenAIWrapper(**assistant.llm_config) | |
| if user_message.strip().lower().startswith("show file:"): | |
| filename = user_message.strip().lower().replace("show file:", "").strip() | |
| filepath = os.path.join("coding", filename) | |
| if os.path.exists(filepath): | |
| chat_history.append([user_message, (filepath,)]) | |
| else: | |
| chat_history.append([user_message, f"File {filename} not found."]) | |
| return chat_history | |
| assistant.reset() | |
| oai_messages = chat_to_oai_message(chat_history) | |
| assistant._oai_system_message_origin = assistant._oai_system_message.copy() | |
| assistant._oai_system_message += oai_messages | |
| try: | |
| userproxy.initiate_chat(assistant, message=user_message) | |
| messages = userproxy.chat_messages | |
| chat_history += oai_message_to_chat(messages, assistant) | |
| # agent_history = flatten_chain(chat_history) | |
| except Exception as e: | |
| # agent_history += [user_message, str(e)] | |
| # chat_history[:] = agent_history_to_chat(agent_history) | |
| chat_history.append([user_message, str(e)]) | |
| assistant._oai_system_message = assistant._oai_system_message_origin.copy() | |
| if LOG_LEVEL == "DEBUG": | |
| print(f"chat_history: {chat_history}") | |
| # print(f"agent_history: {agent_history}") | |
| return chat_history | |
| def chatbot_reply_thread(input_text, chat_history, config_list): | |
| """Chat with the agent through terminal.""" | |
| thread = thread_with_trace( | |
| target=initiate_chat, args=(config_list, input_text, chat_history) | |
| ) | |
| thread.start() | |
| try: | |
| messages = thread.join(timeout=TIMEOUT) | |
| if thread.is_alive(): | |
| thread.kill() | |
| thread.join() | |
| messages = [ | |
| input_text, | |
| "Timeout Error: Please check your API keys and try again later.", | |
| ] | |
| except Exception as e: | |
| messages = [ | |
| [ | |
| input_text, | |
| str(e) | |
| if len(str(e)) > 0 | |
| else "Invalid Request to OpenAI, please check your API keys.", | |
| ] | |
| ] | |
| return messages | |
| def chatbot_reply_plain(input_text, chat_history, config_list): | |
| """Chat with the agent through terminal.""" | |
| try: | |
| messages = initiate_chat(config_list, input_text, chat_history) | |
| except Exception as e: | |
| messages = [ | |
| [ | |
| input_text, | |
| str(e) | |
| if len(str(e)) > 0 | |
| else "Invalid Request to OpenAI, please check your API keys.", | |
| ] | |
| ] | |
| return messages | |
| def chatbot_reply(input_text, chat_history, config_list): | |
| """Chat with the agent through terminal.""" | |
| return chatbot_reply_thread(input_text, chat_history, config_list) | |
| def get_description_text(): | |
| return """ | |
| # Microsoft AutoGen: Multi-Round Human Interaction Chatbot Demo | |
| This demo shows how to build a chatbot which can handle multi-round conversations with human interactions. | |
| #### [AutoGen](https://github.com/microsoft/autogen) [Discord](https://discord.gg/pAbnFJrkgZ) [Paper](https://arxiv.org/abs/2308.08155) [SourceCode](https://github.com/thinkall/autogen-demos) | |
| """ | |
| def update_config(): | |
| config_list = autogen.config_list_from_models( | |
| model_list=[os.environ.get("MODEL", "gpt-35-turbo")], | |
| ) | |
| if not config_list: | |
| config_list = [ | |
| { | |
| "api_key": "", | |
| "base_url": "", | |
| "api_type": "azure", | |
| "api_version": "2023-07-01-preview", | |
| "model": "gpt-35-turbo", | |
| } | |
| ] | |
| return config_list | |
| def set_params(model, oai_key, aoai_key, aoai_base): | |
| os.environ["MODEL"] = model | |
| os.environ["OPENAI_API_KEY"] = oai_key | |
| os.environ["AZURE_OPENAI_API_KEY"] = aoai_key | |
| os.environ["AZURE_OPENAI_API_BASE"] = aoai_base | |
| def respond(message, chat_history, model, oai_key, aoai_key, aoai_base): | |
| set_params(model, oai_key, aoai_key, aoai_base) | |
| config_list = update_config() | |
| chat_history[:] = chatbot_reply(message, chat_history, config_list) | |
| if LOG_LEVEL == "DEBUG": | |
| print(f"return chat_history: {chat_history}") | |
| return "" | |
| config_list, assistant, userproxy = ( | |
| [ | |
| { | |
| "api_key": "", | |
| "base_url": "", | |
| "api_type": "azure", | |
| "api_version": "2023-07-01-preview", | |
| "model": "gpt-35-turbo", | |
| } | |
| ], | |
| None, | |
| None, | |
| ) | |
| assistant, userproxy = initialize_agents(config_list) | |
| description = gr.Markdown(get_description_text()) | |
| with gr.Row() as params: | |
| txt_model = gr.Dropdown( | |
| label="Model", | |
| choices=[ | |
| "gpt-4", | |
| "gpt-35-turbo", | |
| "gpt-3.5-turbo", | |
| ], | |
| allow_custom_value=True, | |
| value="gpt-35-turbo", | |
| container=True, | |
| ) | |
| txt_oai_key = gr.Textbox( | |
| label="OpenAI API Key", | |
| placeholder="Enter OpenAI API Key", | |
| max_lines=1, | |
| show_label=True, | |
| container=True, | |
| type="password", | |
| ) | |
| txt_aoai_key = gr.Textbox( | |
| label="Azure OpenAI API Key", | |
| placeholder="Enter Azure OpenAI API Key", | |
| max_lines=1, | |
| show_label=True, | |
| container=True, | |
| type="password", | |
| ) | |
| txt_aoai_base_url = gr.Textbox( | |
| label="Azure OpenAI API Base", | |
| placeholder="Enter Azure OpenAI Base Url", | |
| max_lines=1, | |
| show_label=True, | |
| container=True, | |
| type="password", | |
| ) | |
| chatbot = gr.Chatbot( | |
| [], | |
| elem_id="chatbot", | |
| bubble_full_width=False, | |
| avatar_images=( | |
| "human.png", | |
| (os.path.join(os.path.dirname(__file__), "autogen.png")), | |
| ), | |
| render=False, | |
| height=600, | |
| ) | |
| txt_input = gr.Textbox( | |
| scale=4, | |
| show_label=False, | |
| placeholder="Enter text and press enter", | |
| container=False, | |
| render=False, | |
| autofocus=True, | |
| ) | |
| chatiface = myChatInterface( | |
| respond, | |
| chatbot=chatbot, | |
| textbox=txt_input, | |
| additional_inputs=[ | |
| txt_model, | |
| txt_oai_key, | |
| txt_aoai_key, | |
| txt_aoai_base_url, | |
| ], | |
| examples=[ | |
| ["write a python function to count the sum of two numbers?"], | |
| ["what if the production of two numbers?"], | |
| [ | |
| "Plot a chart of the last year's stock prices of Microsoft, Google and Apple and save to stock_price.png." | |
| ], | |
| ["show file: stock_price.png"], | |
| ], | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch(share=True, server_name="0.0.0.0") | |