import requests
from pydantic import BaseModel, Field
from huggingface_hub import InferenceClient
from openai import OpenAI
from bs4 import BeautifulSoup
from markdownify import markdownify as md
from langchain_core.tools import tool, Tool
from langchain_experimental.utilities import PythonREPL
from pypdf import PdfReader
from io import BytesIO
from youtube_transcript_api import YouTubeTranscriptApi
from pytube import extract
import pandas as pd, json, tempfile, io, os, re
class SpreadsheetInput(BaseModel):
    file_url: str = Field(..., description="URL of the .xlsx, .xls, or .csv file")
    sheet: str | int | None = Field(
        None,
        description="Sheet name or 0-based index (only for Excel; ignored for CSV)",
    )
    # Quick one-shot aggregate (optional)
    agg: str | None = Field(
        None,
        description="Optional quick aggregate of the form COL:FUNC (e.g. 'Sales:sum'). "
        "Supported FUNC values: sum, mean, count. "
        "If omitted, the whole sheet is returned as JSON.",
    )
def spreadsheet_tool(file_url: str, sheet: str | int | None = None,
                     agg: str | None = None) -> str:
    """
    Download a spreadsheet/CSV, load it into pandas, optionally run a quick aggregate,
    and return JSON or a scalar string.
    """
    try:
        # 1) fetch
        resp = requests.get(file_url, timeout=30)
        resp.raise_for_status()
        suffix = os.path.splitext(file_url.split("?")[0])[-1].lower()
        # 2) parse
        if suffix in {".xlsx", ".xls"}:
            df = pd.read_excel(io.BytesIO(resp.content), sheet_name=sheet or 0)
        elif suffix == ".csv":
            df = pd.read_csv(io.BytesIO(resp.content))
        else:
            return f"Unsupported file type: {suffix}"
        # 3) aggregate or full dump
        if agg:
            col, func = agg.split(":", 1)
            col = col.strip()
            func = func.strip().lower()
            if func == "sum":
                result = df[col].sum()
            elif func == "mean":
                result = df[col].mean()
            elif func == "count":
                result = df[col].count()
            else:
                return f"Unsupported aggregate function: {func}"
            # Return a JSON scalar so the agent can parse easily
            return json.dumps({"result": float(result)})
        # Return full table (records-oriented JSON)
        return df.to_json(orient="records")
    except Exception as e:
        return f"spreadsheet_tool failed: {e}"
class HtmlTableInput(BaseModel):
    url: str = Field(..., description="Web page with an HTML table")
    table_index: int = Field(0, description="Which table on the page (0-based)")
    as_markdown: bool = Field(False, description="Return markdown instead of JSON")

def html_table_query(url: str, table_index: int = 0, as_markdown: bool = False) -> str:
    """Fetch an HTML table and return it as JSON or markdown."""
    try:
        html = requests.get(url, timeout=20).text
        # wrap in StringIO: newer pandas versions deprecate passing a raw HTML string to read_html
        df = pd.read_html(io.StringIO(html))[table_index]
        return df.to_markdown(index=False) if as_markdown else df.to_json(orient="records")
    except Exception as e:
        return f"html_table_query failed: {e}"
# --- Basic operations --- #
class WikipediaSearchInput(BaseModel):
    """Schema for WikipediaSearchTool"""
    query: str = Field(..., description="The search phrase, e.g. 'Mercedes Sosa discography'")
    lang: str = Field("en", description="Language code, default English Wikipedia")
    top_k: int = Field(1, ge=1, le=10, description="How many top pages to fetch and return (1-10)")
def wikipedia_search(query: str, lang: str = "en", top_k: int = 2) -> str:
    """
    Search Wikipedia and return the top-k page extracts (intro only) as markdown.
    Each hit contains:
      - Title (as H2)
      - Source URL
      - Intro/lead section text
    """
    base = f"https://{lang}.wikipedia.org/w/api.php"
    # STEP 1: run the search
    try:
        resp = requests.get(
            base,
            params={
                "action": "query",
                "list": "search",
                "srsearch": query,
                "srlimit": top_k,
                "format": "json",
                "utf8": 1,
            },
            timeout=15,
        )
        resp.raise_for_status()
        hits = resp.json()["query"]["search"]
        if not hits:
            return f"No Wikipedia results for **{query}** in `{lang}`."
        md_blocks = []
        pageids = [str(h["pageid"]) for h in hits]
        # STEP 2: get plain-text extracts for those pageids
        extracts = requests.get(
            base,
            params={
                "action": "query",
                "prop": "extracts",
                "explaintext": True,
                "exintro": True,  # only the lead/intro paragraph(s)
                "pageids": "|".join(pageids),
                "format": "json",
                "utf8": 1,
            },
            timeout=15,
        )
        extracts.raise_for_status()
        pages = extracts.json()["query"]["pages"]
        for pid in pageids:
            page = pages[pid]
            title = page["title"]
            url = f"https://{lang}.wikipedia.org/?curid={pid}"
            intro = page.get("extract", "").strip() or "_No extract available_"
            md_blocks.append(f"## {title}\n[{url}]({url})\n\n{intro}")
        return "\n\n---\n\n".join(md_blocks)
    except Exception as e:
        return f"wikipedia_search failed: {e}"
def multiply(a: float, b: float) -> float:
    """Multiplies two numbers.
    Args:
        a (float): the first number
        b (float): the second number
    """
    return a * b

def add(a: float, b: float) -> float:
    """Adds two numbers.
    Args:
        a (float): the first number
        b (float): the second number
    """
    return a + b
def subtract(a: float, b: float) -> float:
    """Subtracts two numbers.
    Args:
        a (float): the first number
        b (float): the second number
    """
    return a - b
def divide(a: float, b: float) -> float:
    """Divides two numbers.
    Args:
        a (float): the first float number
        b (float): the second float number
    """
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b
def modulus(a: int, b: int) -> int:
    """Get the modulus of two numbers.
    Args:
        a (int): the first number
        b (int): the second number
    """
    return a % b

def power(a: float, b: float) -> float:
    """Get the power of two numbers.
    Args:
        a (float): the first number
        b (float): the second number
    """
    return a**b
# --- Functions --- #
def query_image(query: str, image_url: str, need_reasoning: bool = False) -> str:
    """Ask anything about an image using a Vision Language Model
    Args:
        query (str): The query about the image, e.g. how many persons are on the image?
        image_url (str): The URL to the image
        need_reasoning (bool): Set to True for complex queries that require a reasoning model to answer properly. Set to False otherwise.
    """
    try:
        # Pick the model: "o3" as the reasoning model is an assumption here; swap in
        # whichever reasoning-capable vision model is available. Otherwise use the
        # full multimodal model for image recognition.
        model_name = "o3" if need_reasoning else "gpt-4.1"
        client = OpenAI()
        response = client.responses.create(
            model=model_name,
            input=[{
                "role": "user",
                "content": [
                    {"type": "input_text", "text": query},
                    {
                        "type": "input_image",
                        "image_url": image_url,
                    },
                ],
            }],
        )
        return response.output_text
    except Exception as e:
        return f"query_image failed: {e}"
def automatic_speech_recognition(file_url: str, file_extension: str) -> str:
    """Transcribe an audio file to text
    Args:
        file_url (str): the URL to the audio file
        file_extension (str): the file extension, e.g. mp3
    """
    # PROVIDER = 'huggingface'
    try:
        response = requests.get(file_url)
        response.raise_for_status()
        # write to disk
        file_extension = file_extension.replace('.', '')
        with open(f'tmp.{file_extension}', 'wb') as file:
            file.write(response.content)
        client = OpenAI()
        with open(f'tmp.{file_extension}', 'rb') as audio_file:
            transcription = client.audio.transcriptions.create(
                model="whisper-1",
                file=audio_file
            )
        return transcription.text
    except Exception as e:
        return f"automatic_speech_recognition failed: {e}"
def get_webpage_content(page_url: str) -> str:
    """Load a web page and return it as markdown if possible
    Args:
        page_url (str): the URL of the web page to get
    """
    try:
        r = requests.get(page_url)
        r.raise_for_status()
        text = ""
        # special case if page is a PDF file
        # (startswith handles headers like 'application/pdf; charset=...')
        if r.headers.get('Content-Type', '').startswith('application/pdf'):
            pdf_file = BytesIO(r.content)
            reader = PdfReader(pdf_file)
            for page in reader.pages:
                text += page.extract_text()
        else:
            soup = BeautifulSoup(r.text, 'html.parser')
            if soup.body:
                # convert to markdown
                text = md(str(soup.body))
            else:
                # return the raw content
                text = r.text
        return text
    except Exception as e:
        return f"get_webpage_content failed: {e}"
# ======= Python code interpreter =======
# WARNING: Python REPL can execute arbitrary code on the host machine (e.g., delete files, make network requests). Use with caution.
class PythonREPLInput(BaseModel):
    code: str = Field(description="The Python code string to execute.")

python_repl = PythonREPL()
python_repl_tool = Tool(
    name="python_repl",
    description="""A Python REPL shell (Read-Eval-Print Loop).
    Use this to execute single or multi-line python commands.
    Input should be syntactically valid Python code.
    Always end your code with `print(...)` to see the output.
    Do NOT execute code that could be harmful to the host system.
    You are allowed to download files from URLs.
    Do NOT send commands that block indefinitely (e.g., `input()`).""",
    func=python_repl.run,
    args_schema=PythonREPLInput
)
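
# Illustrative invocation: the tool executes the code string and returns whatever
# was printed (output value shown is just the expected result of this snippet).
#
#   python_repl_tool.run("print(2 ** 10)")   # -> "1024\n"
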
def get_youtube_transcript(page_url: str) -> str:
    """Get the transcript of a YouTube video
    Args:
        page_url (str): YouTube URL of the video
    """
    try:
        # get video ID from URL
        video_id = extract.video_id(page_url)
        # get transcript
        ytt_api = YouTubeTranscriptApi()
        transcript = ytt_api.fetch(video_id)
        # keep only text
        txt = '\n'.join([s.text for s in transcript.snippets])
        return txt
    except Exception as e:
        return f"get_youtube_transcript failed: {e}"