import os
import asyncio
import logging
import threading
import queue
import gradio as gr
import httpx
from typing import Generator, Any, Dict, List, Optional
# -------------------- Configuration --------------------
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
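# Overview: a dispatcher routes a shared Context object through six agents
# (prompt optimizer, orchestrator/planner, coder, code reviewer, QA tester,
# documentation), with a human-in-the-loop hook the orchestrator can trigger
# through thread-safe queues shared with the Gradio UI.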
# -------------------- External Model Call (with Retry) --------------------
async def call_model(prompt: str, model: str = "gpt-4o", api_key: Optional[str] = None, max_retries: int = 3) -> str:
    """
    Sends a prompt to the OpenAI chat completions endpoint, retrying transient
    failures with exponential backoff.
    """
    if api_key is None:
        api_key = os.getenv("OPENAI_API_KEY")
    if api_key is None:
        raise ValueError("OpenAI API key not found.")
    url = "https://api.openai.com/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    payload = {"model": model, "messages": [{"role": "user", "content": prompt}]}
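    # Retry loop with exponential backoff: waits 1s, 2s, 4s, ... between
    # attempts on transient failures.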
    for attempt in range(max_retries):
        try:
            async with httpx.AsyncClient(timeout=httpx.Timeout(300.0)) as client:
                response = await client.post(url, headers=headers, json=payload)
                response.raise_for_status()
                response_json = response.json()  # Synchronous parsing is acceptable here.
                return response_json["choices"][0]["message"]["content"]
        except httpx.HTTPStatusError as e:
            logging.error(f"HTTP error (attempt {attempt + 1}/{max_retries}): {e}")
            if e.response.status_code in (429, 502, 503, 504):  # Retry rate limits and gateway errors.
                await asyncio.sleep(2 ** attempt)
                continue
            else:
                raise
        except httpx.RequestError as e:
            logging.error(f"Request error (attempt {attempt + 1}/{max_retries}): {e}")
            await asyncio.sleep(2 ** attempt)
            continue
        except Exception as e:
            logging.error(f"Unexpected error (attempt {attempt + 1}/{max_retries}): {e}")
            raise
    raise RuntimeError(f"Failed to get response from OpenAI API after {max_retries} attempts.")
# -------------------- Shared Context --------------------
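# Context is the single mutable state object threaded through the pipeline;
# each agent reads the fields it needs and writes its output back onto it.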
class Context:
    def __init__(self, original_task: str, optimized_task: Optional[str] = None,
                 plan: Optional[str] = None, code: Optional[str] = None,
                 review_comments: Optional[List[Dict[str, Any]]] = None,
                 test_cases: Optional[str] = None, test_results: Optional[str] = None,
                 documentation: Optional[str] = None, conversation_history: Optional[List[Dict[str, str]]] = None):
        self.original_task = original_task
        self.optimized_task = optimized_task
        self.plan = plan
        self.code = code
        self.review_comments = review_comments or []
        self.test_cases = test_cases
        self.test_results = test_results
        self.documentation = documentation
        self.conversation_history = conversation_history or []

    def add_conversation_entry(self, agent_name: str, message: str):
        self.conversation_history.append({"agent": agent_name, "message": message})
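# Illustrative lifecycle: a Context starts with only original_task set, and
# each agent fills in one more field, e.g.
#   ctx = Context(original_task="Write a CSV deduplication script")
#   ctx = await PromptOptimizerAgent().optimize_prompt(ctx, api_key)  # fills optimized_task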
# -------------------- Agent Classes --------------------
class PromptOptimizerAgent:
    async def optimize_prompt(self, context: Context, api_key: str) -> Context:
        """
        Optimizes the user's original prompt.
        """
        system_prompt = (
            "Improve the prompt. Be clear, specific, and complete. "
            "Keep original intent. Return ONLY the revised prompt."
        )
        full_prompt = f"{system_prompt}\n\nUser's prompt:\n{context.original_task}"
        optimized = await call_model(full_prompt, model="gpt-4o", api_key=api_key)
        context.optimized_task = optimized
        context.add_conversation_entry("Prompt Optimizer", f"Optimized Task:\n{optimized}")
        return context
class OrchestratorAgent:
    def __init__(self, log_queue: queue.Queue, human_event: threading.Event, human_input_queue: queue.Queue) -> None:
        self.log_queue = log_queue
        self.human_event = human_event
        self.human_input_queue = human_input_queue

    async def generate_plan(self, context: Context, api_key: str) -> Context:
        """
        Generates (or revises) a plan, requesting human feedback when the model
        is unsure. Uses an iterative approach instead of recursion.
        """
        while True:
            if context.plan:
                prompt = (
                    f"You are a planner. Revise/complete the plan for '{context.original_task}' using feedback:\n"
                    f"{context.plan}\n\n"
                    "If unsure, output 'REQUEST_HUMAN_FEEDBACK\\n[Question]'"
                )
            else:
                prompt = (
                    f"You are a planner. Create a plan for: '{context.optimized_task}'. "
                    "Break down the task and assign sub-tasks to: Coder, Code Reviewer, Quality Assurance Tester, and Documentation Agent. "
                    "Include review/revision steps, error handling, and documentation instructions.\n\n"
                    "If unsure, output 'REQUEST_HUMAN_FEEDBACK\\n[Question]'"
                )
            plan = await call_model(prompt, model="gpt-4o", api_key=api_key)
            context.add_conversation_entry("Orchestrator", f"Plan:\n{plan}")
            # Check whether the model asked for human feedback.
            if "REQUEST_HUMAN_FEEDBACK" in plan:
                # Split on the marker itself so a missing trailing newline
                # cannot raise an IndexError.
                question = plan.split("REQUEST_HUMAN_FEEDBACK", 1)[1].strip()
                self.log_queue.put("[Orchestrator]: Requesting human feedback...")
                self.log_queue.put(f"[Orchestrator]: Question for human: {question}")
                # Publish the request on the log queue for display; the human's
                # reply arrives separately on human_input_queue.
                feedback_request_context = (
                    f"The orchestrator agent is requesting feedback on the following task:\n"
                    f"**{context.optimized_task}**\n\n"
                    f"Current plan:\n**{context.plan or 'None'}**\n\n"
                    f"Question:\n**{question}**"
                )
                self.log_queue.put(feedback_request_context)
                self.human_event.set()
                # Block in a worker thread so the event loop is not frozen
                # while waiting for the human response.
                human_response = await asyncio.to_thread(self.human_input_queue.get)
                self.human_event.clear()
                self.log_queue.put(f"[Orchestrator]: Received human feedback: {human_response}")
                # Fold the human feedback into the plan and loop again.
                context.plan = context.plan + "\n" + human_response if context.plan else human_response
            else:
                context.plan = plan
                break  # Exit the loop when no feedback is requested.
        return context
class CoderAgent:
    async def generate_code(self, context: Context, api_key: str, model: str = "gpt-4o") -> Context:
        """
        Generates (or revises) code based on the plan and any reviewer feedback.
        """
        prompt = (
            "You are a coding agent. Output ONLY the code. "
            "Adhere to best practices and include error handling.\n\n"
            f"Instructions:\n{context.plan}"
        )
        # On revision passes, surface the feedback the coder must address.
        if context.review_comments:
            prompt += f"\n\nReviewer feedback to address:\n{context.review_comments[-1]}"
        if context.test_results and "TESTS PASSED" not in context.test_results.upper():
            prompt += f"\n\nFailing test results to fix:\n{context.test_results}"
        code = await call_model(prompt, model=model, api_key=api_key)
        context.code = code
        context.test_results = None  # Stale results; new code must be re-tested.
        context.add_conversation_entry("Coder", f"Code:\n{code}")
        return context
class CodeReviewerAgent:
    async def review_code(self, context: Context, api_key: str) -> Context:
        """
        Reviews the generated code and returns either actionable feedback or 'APPROVE'.
        """
        prompt = (
            "You are a code reviewer. Provide CONCISE feedback focusing on correctness, efficiency, readability, error handling, and security. "
            "If the code is acceptable, respond with ONLY 'APPROVE'. Do NOT generate code.\n\n"
            f"Task: {context.optimized_task}\n\nCode:\n{context.code}"
        )
        review = await call_model(prompt, model="gpt-4o", api_key=api_key)
        context.add_conversation_entry("Code Reviewer", f"Review:\n{review}")
        if "APPROVE" in review.upper():
            # Record the approval so the dispatcher can see the review passed;
            # without this entry the workflow would loop on the reviewer forever.
            context.review_comments.append(
                {"comments": [{"issue": "APPROVE", "line_number": "N/A", "severity": "None"}]}
            )
        else:
            # Not approved: turn each feedback line into a structured comment.
            structured_review = {"comments": []}
            for line in review.splitlines():
                if line.strip():
                    structured_review["comments"].append({
                        "issue": line.strip(),
                        "line_number": "N/A",
                        "severity": "Medium"
                    })
            context.review_comments.append(structured_review)
        return context
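# Review entries share one shape, which determine_next_agent() scans for an
# APPROVE issue:
#   {"comments": [{"issue": "...", "line_number": "N/A", "severity": "Medium"}]}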
class QualityAssuranceTesterAgent:
    async def generate_test_cases(self, context: Context, api_key: str) -> Context:
        """
        Generates test cases covering edge cases and error scenarios.
        """
        prompt = (
            "You are a testing agent. Generate comprehensive test cases considering edge cases and error scenarios. "
            "Output in a clear format.\n\n"
            f"Task: {context.optimized_task}\n\nCode:\n{context.code}"
        )
        test_cases = await call_model(prompt, model="gpt-4o", api_key=api_key)
        context.test_cases = test_cases
        context.add_conversation_entry("QA Tester", f"Test Cases:\n{test_cases}")
        return context

    async def run_tests(self, context: Context, api_key: str) -> Context:
        """
        Asks the model to simulate running the test cases and compare expected
        vs. actual outcomes. Note that no code is actually executed here.
        """
        prompt = (
            "Run the test cases. Compare actual vs expected outputs and state any discrepancies. "
            "If all tests pass, output 'TESTS PASSED'.\n\n"
            f"Code:\n{context.code}\n\nTest Cases:\n{context.test_cases}"
        )
        test_results = await call_model(prompt, model="gpt-4o", api_key=api_key)
        context.test_results = test_results
        context.add_conversation_entry("QA Tester", f"Test Results:\n{test_results}")
        return context
class DocumentationAgent:
    async def generate_documentation(self, context: Context, api_key: str) -> Context:
        """
        Generates concise documentation, including a --help message.
        """
        prompt = (
            "Generate clear documentation including a brief description, explanation, and a --help message.\n\n"
            f"Code:\n{context.code}"
        )
        documentation = await call_model(prompt, model="gpt-4o", api_key=api_key)
        context.documentation = documentation
        context.add_conversation_entry("Documentation Agent", f"Documentation:\n{documentation}")
        return context
# -------------------- Agent Dispatcher --------------------
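# The dispatcher owns one instance of each agent and routes work to it by name,
# so the conversation loop never needs to know concrete agent classes.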
class AgentDispatcher:
    def __init__(self, log_queue: queue.Queue, human_event: threading.Event, human_input_queue: queue.Queue):
        self.log_queue = log_queue
        self.human_event = human_event
        self.human_input_queue = human_input_queue
        self.agents = {
            "prompt_optimizer": PromptOptimizerAgent(),
            "orchestrator": OrchestratorAgent(log_queue, human_event, human_input_queue),
            "coder": CoderAgent(),
            "code_reviewer": CodeReviewerAgent(),
            "qa_tester": QualityAssuranceTesterAgent(),
            "documentation_agent": DocumentationAgent(),
        }
    async def dispatch(self, agent_name: str, context: Context, api_key: str, **kwargs) -> Context:
        """
        Dispatches the task to the specified agent.
        """
        agent = self.agents.get(agent_name)
        if not agent:
            raise ValueError(f"Unknown agent: {agent_name}")
        self.log_queue.put(f"[{agent_name.replace('_', ' ').title()}]: Starting task...")
        if agent_name == "prompt_optimizer":
            context = await agent.optimize_prompt(context, api_key)
        elif agent_name == "orchestrator":
            context = await agent.generate_plan(context, api_key)
        elif agent_name == "coder":
            context = await agent.generate_code(context, api_key, **kwargs)
        elif agent_name == "code_reviewer":
            context = await agent.review_code(context, api_key)
        elif agent_name == "qa_tester":
            if kwargs.get("generate_tests", False):
                context = await agent.generate_test_cases(context, api_key)
            elif kwargs.get("run_tests", False):
                context = await agent.run_tests(context, api_key)
        elif agent_name == "documentation_agent":
            context = await agent.generate_documentation(context, api_key)
        else:
            raise ValueError(f"Unknown Agent Name: {agent_name}")
        return context
    async def determine_next_agent(self, context: Context, api_key: str) -> str:
        """
        Determines the next agent to run based on the current context.
        """
        if not context.optimized_task:
            return "prompt_optimizer"
        if not context.plan:
            return "orchestrator"
        if not context.code:
            return "coder"
        # The code needs (re-)review until some review contains an APPROVE.
        if not any(
            "APPROVE" in comment.get("issue", "").upper()
            for review in context.review_comments
            for comment in review.get("comments", [])
        ):
            return "code_reviewer"
        if not context.test_cases:
            return "qa_tester"
        if not context.test_results:
            return "qa_tester"
        if "TESTS PASSED" not in context.test_results.upper():
            return "coder"  # Failing tests go back to the coder, not into a QA retry loop.
        if not context.documentation:
            return "documentation_agent"
        return "done"  # All tasks are complete.
# -------------------- Multi-Agent Conversation --------------------
async def multi_agent_conversation(task_message: str, log_queue: queue.Queue, api_key: str,
                                   human_event: threading.Event, human_input_queue: queue.Queue) -> None:
    """
    Orchestrates the multi-agent conversation.
    """
    context = Context(original_task=task_message)
    dispatcher = AgentDispatcher(log_queue, human_event, human_input_queue)
    next_agent = await dispatcher.determine_next_agent(context, api_key)
    # Prevent endless revisions by tracking coder iterations.
    coder_iterations = 0
    while next_agent != "done":
        if next_agent == "qa_tester":
            if not context.test_cases:
                context = await dispatcher.dispatch(next_agent, context, api_key, generate_tests=True)
            else:
                context = await dispatcher.dispatch(next_agent, context, api_key, run_tests=True)
        elif next_agent == "coder" and (context.review_comments or context.test_results):
            coder_iterations += 1
            # Revision passes use a cheaper model than the initial generation.
            context = await dispatcher.dispatch(next_agent, context, api_key, model="gpt-3.5-turbo-16k")
        else:
            context = await dispatcher.dispatch(next_agent, context, api_key)
        # After a review, route unapproved code straight back to the coder.
        if next_agent == "code_reviewer":
            approved = any(
                "APPROVE" in comment.get("issue", "").upper()
                for review in context.review_comments
                for comment in review.get("comments", [])
            )
            if not approved:
                next_agent = "coder"
            else:
                next_agent = await dispatcher.determine_next_agent(context, api_key)
        else:
            next_agent = await dispatcher.determine_next_agent(context, api_key)
        if next_agent == "coder" and coder_iterations > 5:
            log_queue.put("Maximum revision iterations reached. Exiting.")
            break
    log_queue.put("Conversation complete.")
    log_queue.put(("result", context.conversation_history))
# -------------------- Process Generator and Human Input --------------------
def process_conversation_generator(task_message: str, api_key: str,
                                   human_event: threading.Event, human_input_queue: queue.Queue,
                                   log_queue: queue.Queue) -> Generator[str, None, None]:
    """
    Runs the conversation in a background thread and streams log messages.
    ChatInterface replaces the displayed message on every yield, so the log is
    accumulated and re-yielded in full each time.
    """
    # A worker thread keeps the conversation off this generator, so logs and
    # human-feedback prompts can be streamed while agents are still working.
    worker = threading.Thread(
        target=lambda: asyncio.run(
            multi_agent_conversation(task_message, log_queue, api_key, human_event, human_input_queue)
        ),
        daemon=True,
    )
    worker.start()
    transcript = ""
    while True:
        try:
            msg = log_queue.get(timeout=0.1)
        except queue.Empty:
            if human_event.is_set():
                # Surface the feedback request; the reply is submitted via the
                # "Human Feedback" tab.
                yield transcript + "\n\nWaiting for human feedback..."
            elif not worker.is_alive():
                break  # Conversation finished and the queue is drained.
            continue
        if isinstance(msg, tuple) and msg[0] == "result":
            # Final payload: render the full conversation history.
            transcript += "\n\n" + "\n\n".join(
                f"[{entry['agent']}]\n{entry['message']}" for entry in msg[1]
            )
            yield transcript
            break
        transcript = f"{transcript}\n{msg}" if transcript else str(msg)
        yield transcript
def get_human_feedback(placeholder_text: str, human_input_queue: queue.Queue) -> gr.Blocks:
    """
    Constructs the Gradio interface used to collect human feedback.
    """
    with gr.Blocks() as human_feedback_interface:
        with gr.Row():
            human_input = gr.Textbox(lines=4, label="Human Feedback", placeholder=placeholder_text)
        with gr.Row():
            submit_button = gr.Button("Submit Feedback")

        def submit_feedback(input_text: str):
            human_input_queue.put(input_text)
            return ""  # Clear the textbox after submission.

        submit_button.click(fn=submit_feedback, inputs=human_input, outputs=human_input)
    return human_feedback_interface
# -------------------- Chat Function for Gradio --------------------
# The event and input queue are module-level so the "Human Feedback" tab and
# the running conversation share them; a per-chat queue would never receive
# anything from the separately constructed feedback form.
human_event = threading.Event()
human_input_queue = queue.Queue()

def multi_agent_chat(message: str, history: List[Any], openai_api_key: Optional[str] = None) -> Generator[Any, None, None]:
    """
    Gradio chat function that runs the multi-agent conversation.
    """
    if not openai_api_key:
        openai_api_key = os.getenv("OPENAI_API_KEY")
    if not openai_api_key:
        yield "Error: API key not provided."
        return
    log_queue = queue.Queue()
    yield from process_conversation_generator(message, openai_api_key, human_event, human_input_queue, log_queue)
# -------------------- Launch the Chatbot -------------------- | |
iface = gr.ChatInterface( | |
fn=multi_agent_chat, | |
chatbot=gr.Chatbot(type="messages"), | |
additional_inputs=[ | |
gr.Textbox(label="OpenAI API Key (optional)", type="password", placeholder="Leave blank to use env variable") | |
], | |
title="Multi-Agent Task Solver with Human-in-the-Loop", | |
description=( | |
"- Collaborative workflow with Human-in-the-Loop.\n" | |
"- Orchestrator can ask for human feedback.\n" | |
"- Enter a task; agents will work on it. You may be prompted for input.\n" | |
"- Max 5 revisions.\n" | |
"- Provide API Key." | |
) | |
) | |
# Second tab: the human-feedback form, wired to the shared input queue so that
# replies reach an orchestrator waiting on human input.
feedback_iface = get_human_feedback("Answer the orchestrator's question here...", human_input_queue)
if __name__ == "__main__":
    demo = gr.TabbedInterface([iface, feedback_iface], ["Chatbot", "Human Feedback"])
    demo.launch(share=True)