Sonu313131's picture
Update app.py
9651da6 verified
raw
history blame
10.3 kB
import os
import gradio as gr
import requests
import inspect
import pandas as pd
import asyncio
from smolagents import ToolCallingAgent, InferenceClientModel, OpenAIServerModel
from smolagents import DuckDuckGoSearchTool, Tool, CodeAgent
from huggingface_hub import login
#h
# Base URL of the scoring service; the "/questions" and "/submit" endpoints
# used by run_and_submit_all are built from it.
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
# Explicit Hugging Face login is currently disabled (kept for reference).
#login(token=os.environ["HUGGINGFACEHUB_API_TOKEN"])
# OpenAI API key, or None when OPENAI_API_KEY is unset (.get does not raise).
openai_key = os.environ.get("OPENAI_API_KEY")
# Shared DuckDuckGo search tool instance handed to the agent.
search_tool = DuckDuckGoSearchTool()
## Tool 2: Excel sales analysis (ExcelAnalysisTool)
from smolagents import Tool
from huggingface_hub import hf_hub_download
import pandas as pd
class ExcelAnalysisTool(Tool):
    """Agent tool that totals the ``sales`` column for food rows of an Excel file.

    The workbook is fetched with ``hf_hub_download`` from the dataset repo named
    by ``repo_id``. A row counts when its ``category`` equals "food" and its
    ``item`` is not "drinks" (both compared case-insensitively). Failures are
    reported as message strings rather than raised.
    """

    name = "excel_analysis"
    description = (
        "Loads an Excel file from the GAIA dataset on Hugging Face and calculates "
        "the total sales for items labeled as 'food', excluding drinks. "
        "Provide input as a string with the filename, e.g., 'sales_data.xlsx'."
    )
    inputs = {
        "filename": {
            "type": "string",
            "description": "The name of the Excel file (e.g., 'sales_data.xlsx')",
        },
    }
    output_type = "string"
    repo_id = "gaia-benchmark/GAIA"

    def forward(self, filename: str) -> str:
        """Download ``filename`` and return the formatted food-sales total.

        Args:
            filename: Name of the Excel file inside the dataset repo.

        Returns:
            ``"Total sales for food items: $<amount>"`` on success, otherwise
            an error message describing what went wrong.
        """
        try:
            local_path = hf_hub_download(
                repo_type="dataset",
                filename=filename,
                repo_id=self.repo_id,
            )
            sheet = pd.read_excel(local_path)

            # Column lookups stay in this order so a missing-column error
            # names 'category' before 'item' before 'sales'.
            is_food = sheet["category"].str.lower() == "food"
            not_drinks = sheet["item"].str.lower() != "drinks"
            matching_rows = sheet[is_food & not_drinks]
            total = matching_rows["sales"].sum()

            # Formatting stays inside the try: a non-numeric total must be
            # reported through the generic handler, not raised.
            return f"Total sales for food items: ${total:.2f}"
        except FileNotFoundError:
            return "Error: The specified file was not found."
        except KeyError as missing_column:
            return f"Error: Missing expected column in the Excel file: {str(missing_column)}"
        except Exception as error:
            return f"An unexpected error occurred: {str(error)}"
## Tool 3: Wikipedia lookups (WikiTool)
import wikipedia
from smolagents import Tool
class WikiTool(Tool):
    """Agent tool offering two Wikipedia lookups selected by ``action``.

    * ``summary`` returns a three-sentence summary of the topic.
    * ``is_historical_country`` returns "yes"/"no" depending on whether the
      topic's two-sentence summary contains a phrase suggesting the country
      no longer exists.

    All failures are reported as strings; nothing is raised to the agent.
    """

    name = "wiki_tool"
    description = (
        "Performs Wikipedia lookups. Actions supported: 'summary' and 'is_historical_country'."
    )
    inputs = {
        "action": {
            "type": "string",
            "description": "The action to perform: 'summary' or 'is_historical_country'"
        },
        "topic": {
            "type": "string",
            "description": "The topic or country name to look up"
        }
    }
    output_type = "string"

    def forward(self, action: str, topic: str) -> str:
        """Dispatch to the lookup named by ``action``.

        Args:
            action: 'summary' or 'is_historical_country'. Surrounding
                whitespace and letter case are ignored.
            topic: The topic or country name to look up.

        Returns:
            The lookup result, or an error string for an unknown action.
        """
        # The action is supplied by an LLM tool call, so tolerate stray
        # whitespace or capitalisation instead of rejecting e.g. "Summary".
        normalized = action.strip().lower() if isinstance(action, str) else action
        if normalized == "summary":
            return self.fetch_summary(topic)
        if normalized == "is_historical_country":
            return self.is_historical_country(topic)
        return "Error: Unknown action. Use 'summary' or 'is_historical_country'."

    def fetch_summary(self, topic: str) -> str:
        """Return a three-sentence Wikipedia summary, or a message on failure."""
        try:
            return wikipedia.summary(topic, sentences=3)
        except wikipedia.DisambiguationError as e:
            # Offer the first few candidate titles so the caller can retry.
            return f"Disambiguation: {e.options[:5]}"
        except wikipedia.PageError:
            return "No page found."
        except Exception as e:
            return f"Unexpected error: {str(e)}"

    def is_historical_country(self, topic: str) -> str:
        """Return "yes" if the topic's summary suggests a defunct country, else "no".

        Any lookup failure is treated as "no" (best-effort heuristic).
        """
        keywords = [
            "former country", "no longer exists", "historical country",
            "was a country", "defunct", "dissolved", "existed until",
            "disestablished", "merged into"
        ]
        try:
            summary = wikipedia.summary(topic, sentences=2).lower()
        except Exception:
            # Was a bare `except:`, which also swallowed KeyboardInterrupt and
            # SystemExit; ordinary lookup errors still map to "no".
            return "no"
        return "yes" if any(k in summary for k in keywords) else "no"
# Module-level tool instances; both are passed to the agent built in
# run_and_submit_all.
wiki_tool = WikiTool()
excel_tool = ExcelAnalysisTool()
# Answer-format instructions prepended to every question. Built once at import
# time instead of on every loop iteration.
_ANSWER_FORMAT_PROMPT = (
    "You must only reply with a single line:\n"
    "FINAL ANSWER: [your answer]\n"
    "Never include reasoning, markdown, Task Outcome, Explanation, or examples.\n"
    "NEVER use numbered points or extra formatting.\n"
    "If your answer is a string, write it in lowercase, no articles, no quotes.\n"
    'If your answer is a number, use digits only. If the answer is "no one" or "none", write exactly that.\n'
    "DO NOT provide any explanation or context. Just the line: FINAL ANSWER: ...\n"
)


def _extract_final_answer(agent_result) -> str:
    """Reduce whatever the agent returned to the bare answer string."""
    if isinstance(agent_result, dict) and "final_answer" in agent_result:
        return str(agent_result["final_answer"]).strip()
    if not isinstance(agent_result, str):
        return str(agent_result).strip()

    text = agent_result.strip()
    # Drop the managed-agent boilerplate prefix (everything up to the first colon).
    if "Here is the final answer from your managed agent" in text:
        text = text.split(":", 1)[-1].strip()
    # Keep only what follows the last "FINAL ANSWER:" marker, if present.
    if "FINAL ANSWER:" in text:
        return text.rsplit("FINAL ANSWER:", 1)[-1].strip()
    return text


async def run_and_submit_all(profile: gr.OAuthProfile | None):
    """Run the agent on every fetched question and submit the answers.

    This is an async generator wired to three Gradio outputs. Every yielded
    value is a ``(status, results_table, progress_log)`` tuple; intermediate
    yields only advance the progress log.

    Steps: build a ``ToolCallingAgent`` (gpt-4o via ``OPENAI_API_KEY``), fetch
    the questions from ``{DEFAULT_API_URL}/questions``, run the agent on each
    question in the default executor, then POST the collected answers to
    ``{DEFAULT_API_URL}/submit``.

    Args:
        profile: The logged-in user's OAuth profile, or ``None``; its
            ``username`` is submitted ("unknown" when absent).

    Yields:
        Tuples of (status message or None, results DataFrame or None,
        accumulated progress log string).
    """
    log_output = ""
    try:
        agent = ToolCallingAgent(
            tools=[search_tool, wiki_tool, excel_tool],
            model=OpenAIServerModel(
                model_id="gpt-4o",  # valid OpenAI model name
                api_key=os.environ["OPENAI_API_KEY"]  # loaded from the environment
            ),
            max_steps=20,
            verbosity_level=2
        )
    except Exception as e:
        yield f"Error initializing agent: {e}", None, log_output
        return

    space_id = os.getenv("SPACE_ID")
    agent_code = f"https://huggingface.co/spaces/{space_id}/tree/main"
    questions_url = f"{DEFAULT_API_URL}/questions"

    # We are inside a coroutine, so the running loop is the right handle;
    # asyncio.get_event_loop() is discouraged here.
    loop = asyncio.get_running_loop()

    try:
        # requests is blocking: run it in the executor so the event loop
        # (and therefore the UI) stays responsive.
        response = await loop.run_in_executor(
            None, lambda: requests.get(questions_url, timeout=15)
        )
        response.raise_for_status()
        questions_data = response.json()
        # A non-list payload would crash the per-item loop below, so reject it
        # together with the empty case the message already describes.
        if not isinstance(questions_data, list) or not questions_data:
            yield "Fetched questions list is empty or invalid format.", None, log_output
            return
    except Exception as e:
        yield f"Error fetching questions: {e}", None, log_output
        return

    results_log = []
    answers_payload = []

    for item in questions_data:
        task_id = item.get("task_id")
        question_text = item.get("question")
        if not task_id or question_text is None:
            continue

        log_output += f"🔍 Solving Task ID: {task_id}...\n"
        yield None, None, log_output

        try:
            full_prompt = _ANSWER_FORMAT_PROMPT + f"Question: {question_text.strip()}"
            # The agent call is synchronous and slow; keep it off the event loop.
            agent_result = await loop.run_in_executor(None, agent, full_prompt)
            final_answer = _extract_final_answer(agent_result)

            answers_payload.append({
                "task_id": task_id,
                "submitted_answer": final_answer
            })
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": final_answer
            })
            log_output += f"✅ Done: {task_id} — Answer: {final_answer[:60]}\n"
            yield None, None, log_output
        except Exception as e:
            # A failed task is shown in the table but not submitted.
            print(f"Error running agent on task {task_id}: {e}")
            results_log.append({
                "Task ID": task_id,
                "Question": question_text,
                "Submitted Answer": f"AGENT ERROR: {e}"
            })
            log_output += f"⛔️ Error: {task_id} — {e}\n"
            yield None, None, log_output

    if not answers_payload:
        yield "Agent did not produce any answers to submit.", pd.DataFrame(results_log), log_output
        return

    username = profile.username if profile else "unknown"
    submit_url = f"{DEFAULT_API_URL}/submit"
    submission_data = {"username": username.strip(), "agent_code": agent_code, "answers": answers_payload}

    try:
        response = await loop.run_in_executor(
            None, lambda: requests.post(submit_url, json=submission_data, timeout=60)
        )
        response.raise_for_status()
        result_data = response.json()
        final_status = (
            f"Submission Successful!\n"
            f"User: {result_data.get('username')}\n"
            f"Overall Score: {result_data.get('score', 'N/A')}% "
            f"({result_data.get('correct_count', '?')}/{result_data.get('total_attempted', '?')} correct)\n"
            f"Message: {result_data.get('message', 'No message received.')}"
        )
        results_df = pd.DataFrame(results_log)
        yield final_status, results_df, log_output
    except Exception as e:
        status_message = f"Submission Failed: {e}"
        results_df = pd.DataFrame(results_log)
        yield status_message, results_df, log_output
# Gradio UI: a login button, one run button, and three outputs (status text,
# results table, progress log) fed by the run_and_submit_all generator.
with gr.Blocks() as demo:
    gr.Markdown("# Basic Agent Evaluation Runner")
    gr.Markdown(
        "\n"
        "**Instructions:**\n"
        "1. Clone this space and define your agent logic.\n"
        "2. Log in to your Hugging Face account.\n"
        "3. Click 'Run Evaluation & Submit All Answers'.\n"
        "---\n"
        "**Note:**\n"
        "The run may take time. Async is now used to improve responsiveness.\n"
    )

    gr.LoginButton()
    run_button = gr.Button("Run Evaluation & Submit All Answers")

    # Output components, created in display order.
    status_output = gr.Textbox(
        interactive=False,
        lines=5,
        label="Run Status / Submission Result",
    )
    results_table = gr.DataFrame(wrap=True, label="Questions and Agent Answers")
    progress_log = gr.Textbox(interactive=False, lines=10, label="Progress Log")

    # Each tuple yielded by the handler maps onto these three outputs in order.
    run_button.click(
        outputs=[status_output, results_table, progress_log],
        fn=run_and_submit_all,
    )
if __name__ == "__main__":
    # Script entry point: print a startup banner with the Space details found
    # in the environment (if any), then launch the Gradio app.
    print("\n" + "-"*30 + " App Starting " + "-"*30)
    space_host_startup = os.getenv("SPACE_HOST")
    space_id_startup = os.getenv("SPACE_ID")
    # The check-mark prefix was mis-encoded ("βœ…") in the original strings.
    if space_host_startup:
        print(f"✅ SPACE_HOST: https://{space_host_startup}.hf.space")
    if space_id_startup:
        print(f"✅ SPACE_ID: https://huggingface.co/spaces/{space_id_startup}")
    print("Launching Gradio Interface...")
    demo.launch(debug=True, share=False)