llama-models / app /tools.py
deniskiplimo816's picture
Upload 27 files
293ab16 verified
import datetime
import random
import logging
import contextlib
import io
from typing import Any, Callable, Dict, List, Tuple, Union, Type
from pydantic import BaseModel, Field
from playwright.sync_api import sync_playwright
import duckduckgo_search
# External modules (you must have these implemented)
from app.email_tool import send_email, generate_email
from app.repl_tool import run_python_code
from app.translation_tool import translate
from app.vision import describe_image
from langchain.agents import Tool
# === Logging Setup ===
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
# === Structured Input Schemas ===
class ExecInput(BaseModel):
code: str = Field(..., description="Python code to execute")
class WebSearchInput(BaseModel):
url: str = Field(..., description="URL to browse for content")
# === Python Code Execution ===
def exec_py(inputs: ExecInput) -> str:
"""
Securely execute Python code and return output or error.
"""
output = io.StringIO()
local_vars = {}
try:
with contextlib.redirect_stdout(output):
exec(inputs.code, {}, local_vars)
return output.getvalue().strip() or "✅ Code executed successfully."
except Exception as e:
logger.error(f"exec_py error: {e}")
return f"❌ Error: {e}"
# === Simple Tools ===
def calc(expression: str) -> str:
"""
Evaluate a simple math expression.
"""
try:
result = eval(expression, {"__builtins__": None}, {})
return str(result)
except Exception as e:
logger.error(f"Calc error for '{expression}': {e}")
return "❌ Invalid expression"
def time_now(_: str = "") -> str:
return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def joke(_: str = "") -> str:
return random.choice([
"Parallel lines have so much in common—they’ll never meet.",
"Why don’t scientists trust atoms? Because they make up everything!",
"I told my computer I needed a break, and it said no problem — it needed one too."
])
def browse(inputs: WebSearchInput) -> str:
"""
Fetch and return the text content of a web page.
"""
try:
with sync_playwright() as pw:
browser = pw.chromium.launch(headless=True)
page = browser.new_page()
page.goto(inputs.url, timeout=10000)
text = page.text_content("body") or ""
browser.close()
return text[:500] if text else "❌ No content found."
except Exception as e:
logger.error(f"Browse error: {e}")
return f"❌ Error browsing: {e}"
def ddg_search(query: str) -> List[str]:
try:
results = duckduckgo_search.DuckDuckGoSearch().text(query)
return results[:3] if results else ["No results"]
except Exception as e:
logger.error(f"DuckDuckGo search error: {e}")
return ["❌ Search failed"]
# === LangChain-Formatted Tool List ===
def get_tools() -> List[Tool]:
return [
Tool(
name="Code Interpreter",
func=run_python_code,
description="🧠 Executes Python code and returns results.",
),
Tool(
name="Email Generator",
func=lambda prompt: generate_email("Customer", prompt, 15),
description="📧 Generates a promotional or customer email.",
),
Tool(
name="Translate Text",
func=translate,
description="🌐 Translates input text to a selected language.",
),
Tool(
name="Describe Image",
func=describe_image,
description="🖼️ Describes the contents of an image.",
),
Tool(
name="Browse Website",
func=browse,
description="🔎 Browse and fetch page content.",
args_schema=WebSearchInput,
),
Tool(
name="Python Executor",
func=exec_py,
description="🐍 Execute Python code and return output.",
args_schema=ExecInput,
),
]
# === TOOL MAPPING ===
TOOLS: Dict[str, Union[Callable[..., Any], Tuple[Type[BaseModel], Callable[..., Any]]]] = {
"calc": calc,
"time": time_now,
"joke": joke,
"search": ddg_search,
"browse": (WebSearchInput, browse),
"exec_python": (ExecInput, exec_py),
"send_email": send_email,
"run_code": run_python_code,
"translate": translate,
"describe_image": describe_image,
}
# === Dynamic Tool Execution ===
def use_tool(tool_name: str, *args, **kwargs) -> Any:
tool = TOOLS.get(tool_name)
if not tool:
return {"error": f"Tool '{tool_name}' not found"}
if isinstance(tool, tuple):
schema_cls, func = tool
try:
input_data = args[0] if args else kwargs
validated = schema_cls.parse_obj(input_data)
return func(validated)
except Exception as e:
logger.error(f"Validation failed for '{tool_name}': {e}")
return {"error": f"Invalid input for tool '{tool_name}': {e}"}
else:
try:
return tool(*args, **kwargs)
except Exception as e:
logger.error(f"Execution failed for '{tool_name}': {e}")
return {"error": f"Tool execution failed: {e}"}
# === Dispatcher ===
def tool_dispatch(command: str, args: Any) -> Any:
if command not in TOOLS:
return {"error": f"Unknown tool '{command}'"}
tool = TOOLS[command]
if isinstance(args, (list, tuple)):
return use_tool(command, *args)
elif isinstance(args, dict):
return use_tool(command, args)
else:
return use_tool(command, args)