# Spaces: Sleeping / Sleeping  (page-status text captured during extraction; not part of the program)
import ast
import contextlib
import datetime
import io
import itertools
import logging
import random
from typing import Any, Callable, Dict, List, Tuple, Union, Type

import duckduckgo_search
from langchain.agents import Tool
from playwright.sync_api import sync_playwright
from pydantic import BaseModel, Field

# External modules (you must have these implemented)
from app.email_tool import send_email, generate_email
from app.repl_tool import run_python_code
from app.translation_tool import translate
from app.vision import describe_image
# === Logging Setup === | |
# Apply the default INFO-level logging configuration at import time, then
# create the logger shared by every tool in this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# === Structured Input Schemas === | |
class ExecInput(BaseModel):
    """Input schema for the Python-execution tool: one required source string."""

    code: str = Field(
        ...,
        description="Python code to execute",
    )
class WebSearchInput(BaseModel):
    """Input schema for the page-browsing tool: one required URL string."""

    url: str = Field(
        ...,
        description="URL to browse for content",
    )
# === Python Code Execution === | |
def exec_py(inputs: ExecInput) -> str:
    """
    Execute Python source in-process and return what it printed.

    SECURITY: this is NOT a sandbox. The code runs through ``exec`` with full
    builtins inside the current interpreter, so it can import modules, touch
    the filesystem and reach the network. Only expose it to trusted callers or
    run the whole service inside an isolated environment.

    Args:
        inputs: Object whose ``code`` attribute holds the source to run.

    Returns:
        The captured standard output with surrounding whitespace stripped, a
        success marker when nothing was printed, or an error marker carrying
        the exception message when the code raised.
    """
    captured = io.StringIO()
    # A single dict acts as both globals and locals. With two separate dicts,
    # top-level names are stored in the locals mapping, which functions (and,
    # before Python 3.12, comprehensions) defined by the snippet cannot see,
    # so ordinary multi-function snippets failed with NameError.
    namespace = {}
    try:
        with contextlib.redirect_stdout(captured):
            exec(inputs.code, namespace)
    except Exception as e:
        logger.error(f"exec_py error: {e}")
        return f"❌ Error: {e}"
    return captured.getvalue().strip() or "✅ Code executed successfully."
# === Simple Tools === | |
def calc(expression: str) -> str:
    """
    Evaluate a simple arithmetic expression and return the result as text.

    Accepted syntax is limited to numeric literals, parentheses, the binary
    operators ``+ - * / // % **`` and unary ``+``/``-``. Anything else (names,
    attribute access, calls, strings, containers, comparisons) is rejected.

    SECURITY: the expression is still run through ``eval``, but only after its
    parsed tree passes the whitelist below. Emptying ``__builtins__`` alone is
    not a sandbox, because attribute chains such as ``().__class__`` remain
    reachable. Evaluation cost is not bounded: a huge exponent can still be
    slow.

    Args:
        expression: The arithmetic expression to evaluate.

    Returns:
        ``str(result)`` on success, otherwise the invalid-expression marker.
    """
    allowed_nodes = (
        ast.Expression, ast.Constant, ast.BinOp, ast.UnaryOp,
        ast.Add, ast.Sub, ast.Mult, ast.Div, ast.FloorDiv, ast.Mod, ast.Pow,
        ast.UAdd, ast.USub,
    )
    try:
        # eval() tolerates surrounding whitespace; strip so parsing does too.
        tree = ast.parse(expression.strip(), mode="eval")
        for node in ast.walk(tree):
            if not isinstance(node, allowed_nodes):
                raise ValueError(f"unsupported syntax: {type(node).__name__}")
            if isinstance(node, ast.Constant) and not isinstance(
                node.value, (int, float, complex)
            ):
                raise ValueError("only numeric literals are allowed")
        result = eval(compile(tree, "<calc>", "eval"), {"__builtins__": None}, {})
        return str(result)
    except Exception as e:
        logger.error(f"Calc error for '{expression}': {e}")
        return "❌ Invalid expression"
def time_now(_: str = "") -> str:
    """Return the current local date and time as ``YYYY-MM-DD HH:MM:SS``.

    The single argument is ignored; it exists so the function can be called
    with one positional value like the other tools.
    """
    current = datetime.datetime.now()
    return f"{current:%Y-%m-%d %H:%M:%S}"
def joke(_: str = "") -> str: | |
return random.choice([ | |
"Parallel lines have so much in common—they’ll never meet.", | |
"Why don’t scientists trust atoms? Because they make up everything!", | |
"I told my computer I needed a break, and it said no problem — it needed one too." | |
]) | |
def browse(inputs: WebSearchInput) -> str:
    """
    Fetch a web page in headless Chromium and return the start of its text.

    NOTE(review): the URL is fetched as given, with no host or scheme
    filtering. If callers can be untrusted, confirm that reaching internal
    addresses is acceptable.

    Args:
        inputs: Object whose ``url`` attribute is the page to load.

    Returns:
        The first 500 characters of the ``body`` element's text content, a
        no-content marker when the body text is empty, or an error marker
        carrying the exception message when anything fails.
    """
    try:
        with sync_playwright() as pw:
            browser = pw.chromium.launch(headless=True)
            try:
                page = browser.new_page()
                # Playwright timeouts are in milliseconds: give up after 10 s.
                page.goto(inputs.url, timeout=10000)
                text = page.text_content("body") or ""
            finally:
                # Close the browser even when navigation or extraction raises.
                browser.close()
        return text[:500] if text else "❌ No content found."
    except Exception as e:
        logger.error(f"Browse error: {e}")
        return f"❌ Error browsing: {e}"
def ddg_search(query: str) -> List[str]:
    """
    Run a DuckDuckGo text search and return up to three results as strings.

    The ``duckduckgo_search`` package exposes its client as ``DDGS``. The
    previously referenced ``DuckDuckGoSearch`` attribute does not exist, so
    every call ended in the failure branch.

    Args:
        query: Search keywords.

    Returns:
        Up to three strings, one per hit. Dict hits are rendered as
        ``"title | href | body"`` (skipping missing fields); any other hit is
        passed through ``str``. Returns ``["No results"]`` when the search is
        empty and ``["❌ Search failed"]`` when the search raises.
    """
    try:
        # islice works whether text() returns a list or a lazy iterator.
        hits = list(itertools.islice(duckduckgo_search.DDGS().text(query), 3))
    except Exception as e:
        logger.error(f"DuckDuckGo search error: {e}")
        return ["❌ Search failed"]

    if not hits:
        return ["No results"]

    rendered = []
    for hit in hits:
        if isinstance(hit, dict):
            fields = (hit.get(key) for key in ("title", "href", "body"))
            rendered.append(" | ".join(str(field) for field in fields if field))
        else:
            rendered.append(str(hit))
    return rendered
# === LangChain-Formatted Tool List === | |
def get_tools() -> List[Tool]:
    """
    Build the list of LangChain ``Tool`` objects exposed to the agent.

    A LangChain ``Tool`` calls ``func`` with one plain string. ``browse`` and
    ``exec_py`` instead expect a model instance and read ``.url`` / ``.code``
    from it, so they are wrapped in adapters that build the model from that
    string. Passing them directly made both tools fail on every call.

    Returns:
        A new list of ``Tool`` instances on each call.
    """
    return [
        Tool(
            name="Code Interpreter",
            func=run_python_code,
            description="🧠 Executes Python code and returns results.",
        ),
        Tool(
            name="Email Generator",
            func=lambda prompt: generate_email("Customer", prompt, 15),
            description="📧 Generates a promotional or customer email.",
        ),
        Tool(
            name="Translate Text",
            func=translate,
            description="🌐 Translates input text to a selected language.",
        ),
        Tool(
            name="Describe Image",
            func=describe_image,
            description="🖼️ Describes the contents of an image.",
        ),
        Tool(
            name="Browse Website",
            # Adapt the single string input to the model browse() expects.
            func=lambda url: browse(WebSearchInput(url=url)),
            description="🔎 Browse and fetch page content.",
            args_schema=WebSearchInput,
        ),
        Tool(
            name="Python Executor",
            # Adapt the single string input to the model exec_py() expects.
            func=lambda code: exec_py(ExecInput(code=code)),
            description="🐍 Execute Python code and return output.",
            args_schema=ExecInput,
        ),
    ]
# === TOOL MAPPING === | |
# Name -> tool registry. A value is either a bare callable, or a
# (schema class, callable) pair whose input is validated against the schema
# before the callable runs.
TOOLS: Dict[
    str,
    Union[
        Callable[..., Any],
        Tuple[Type[BaseModel], Callable[..., Any]],
    ],
] = {
    # Local helpers defined in this module
    "calc": calc,
    "time": time_now,
    "joke": joke,
    "search": ddg_search,
    # Schema-validated tools
    "browse": (WebSearchInput, browse),
    "exec_python": (ExecInput, exec_py),
    # Tools imported from the app package
    "send_email": send_email,
    "run_code": run_python_code,
    "translate": translate,
    "describe_image": describe_image,
}
# === Dynamic Tool Execution === | |
def use_tool(tool_name: str, *args, **kwargs) -> Any:
    """
    Look up a tool in ``TOOLS`` by name and run it.

    Bare callables are called with ``*args, **kwargs``. For
    ``(schema, callable)`` entries the input is the first positional argument,
    or the keyword arguments when there is none. It is validated with the
    schema and the resulting model is passed to the callable.

    Validation and execution are guarded separately, so a failure inside the
    tool is reported as an execution failure and not as invalid input.

    Args:
        tool_name: Key into ``TOOLS``.
        *args: Positional arguments for the tool.
        **kwargs: Keyword arguments for the tool.

    Returns:
        The tool's return value, or ``{"error": ...}`` when the tool is
        unknown, its input fails validation, or it raises.
    """
    tool = TOOLS.get(tool_name)
    if not tool:
        return {"error": f"Tool '{tool_name}' not found"}

    if isinstance(tool, tuple):
        schema_cls, func = tool
        input_data = args[0] if args else kwargs
        try:
            validated = schema_cls.parse_obj(input_data)
        except Exception as e:
            logger.error(f"Validation failed for '{tool_name}': {e}")
            return {"error": f"Invalid input for tool '{tool_name}': {e}"}
        call = lambda: func(validated)
    else:
        call = lambda: tool(*args, **kwargs)

    try:
        return call()
    except Exception as e:
        logger.error(f"Execution failed for '{tool_name}': {e}")
        return {"error": f"Tool execution failed: {e}"}
# === Dispatcher === | |
def tool_dispatch(command: str, args: Any) -> Any:
    """
    Dispatch a command to ``use_tool``, adapting the shape of ``args``.

    A list or tuple is spread into positional arguments. Any other value,
    including a dict, is passed through as one positional argument.

    Args:
        command: Key into ``TOOLS``.
        args: Arguments for the tool.

    Returns:
        Whatever ``use_tool`` returns, or ``{"error": ...}`` when ``command``
        is not a registered tool.
    """
    if command not in TOOLS:
        return {"error": f"Unknown tool '{command}'"}
    if isinstance(args, (list, tuple)):
        return use_tool(command, *args)
    # Dicts and scalars take the same path: one positional argument.
    return use_tool(command, args)