|
import requests |
|
import pandas as pd |
|
from PIL import Image |
|
import os |
|
import subprocess |
|
from bs4 import BeautifulSoup |
|
import urllib.parse |
|
|
|
def web_search(query: str, api_key: str = None) -> str: |
|
""" |
|
Perform web search using SerpAPI if available, otherwise fallback to DuckDuckGo scraping. |
|
""" |
|
if api_key and api_key != "your-serpapi-key-here": |
|
return _serpapi_search(query, api_key) |
|
else: |
|
return _duckduckgo_search(query) |
|
|
|
def _serpapi_search(query: str, api_key: str) -> str: |
|
"""Search using SerpAPI.""" |
|
try: |
|
url = f"https://serpapi.com/search" |
|
params = { |
|
"q": query, |
|
"api_key": api_key, |
|
"engine": "google" |
|
} |
|
response = requests.get(url, params=params, timeout=10) |
|
response.raise_for_status() |
|
|
|
results = response.json() |
|
organic_results = results.get("organic_results", []) |
|
|
|
if organic_results: |
|
|
|
search_summary = [] |
|
for i, result in enumerate(organic_results[:3]): |
|
title = result.get("title", "") |
|
snippet = result.get("snippet", "") |
|
if title and snippet: |
|
search_summary.append(f"{i+1}. {title}: {snippet}") |
|
|
|
return "\n".join(search_summary) if search_summary else "No useful results found" |
|
else: |
|
return "No search results found" |
|
|
|
except requests.RequestException as e: |
|
print(f"SerpAPI search error: {e}") |
|
return "Search failed" |
|
|
|
def _duckduckgo_search(query: str) -> str: |
|
"""Fallback web search using DuckDuckGo scraping.""" |
|
try: |
|
|
|
url = "https://api.duckduckgo.com/" |
|
params = { |
|
"q": query, |
|
"format": "json", |
|
"no_html": "1", |
|
"skip_disambig": "1" |
|
} |
|
|
|
response = requests.get(url, params=params, timeout=10) |
|
response.raise_for_status() |
|
|
|
data = response.json() |
|
|
|
|
|
abstract = data.get("Abstract", "") |
|
if abstract: |
|
return f"Summary: {abstract}" |
|
|
|
|
|
related_topics = data.get("RelatedTopics", []) |
|
if related_topics: |
|
summaries = [] |
|
for topic in related_topics[:3]: |
|
if isinstance(topic, dict) and "Text" in topic: |
|
summaries.append(topic["Text"]) |
|
if summaries: |
|
return "Related information:\n" + "\n".join(summaries) |
|
|
|
|
|
return _simple_web_scrape(query) |
|
|
|
except Exception as e: |
|
print(f"DuckDuckGo search error: {e}") |
|
return "Search failed" |
|
|
|
def _simple_web_scrape(query: str) -> str: |
|
"""Simple web scraping fallback.""" |
|
try: |
|
|
|
search_url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote(query)}" |
|
headers = { |
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" |
|
} |
|
|
|
response = requests.get(search_url, headers=headers, timeout=10) |
|
if response.status_code == 200: |
|
soup = BeautifulSoup(response.content, 'html.parser') |
|
|
|
results = soup.find_all('a', class_='result__snippet')[:3] |
|
if results: |
|
snippets = [r.get_text().strip() for r in results if r.get_text().strip()] |
|
return "\n".join(snippets[:3]) if snippets else "Limited search results available" |
|
|
|
return "Basic web search completed - limited results" |
|
|
|
except Exception as e: |
|
print(f"Web scraping error: {e}") |
|
return "Web search unavailable" |
|
|
|
def read_file(file_name: str) -> str: |
|
""" |
|
Read and process different file types (text, CSV, images). |
|
""" |
|
if not file_name or not os.path.exists(file_name): |
|
return "File not found" |
|
|
|
try: |
|
file_extension = os.path.splitext(file_name)[1].lower() |
|
|
|
if file_extension == ".csv": |
|
return _read_csv_file(file_name) |
|
elif file_extension in [".png", ".jpg", ".jpeg", ".gif", ".bmp"]: |
|
return _read_image_file(file_name) |
|
elif file_extension in [".txt", ".md", ".py", ".js", ".html", ".json"]: |
|
return _read_text_file(file_name) |
|
else: |
|
|
|
return _read_text_file(file_name) |
|
|
|
except Exception as e: |
|
return f"Error reading file: {str(e)}" |
|
|
|
def _read_text_file(file_name: str) -> str: |
|
"""Read a text file.""" |
|
try: |
|
with open(file_name, "r", encoding="utf-8") as f: |
|
content = f.read() |
|
return content[:5000] |
|
except UnicodeDecodeError: |
|
|
|
try: |
|
with open(file_name, "r", encoding="latin-1") as f: |
|
content = f.read() |
|
return content[:5000] |
|
except Exception as e: |
|
return f"Text file reading error: {str(e)}" |
|
|
|
def _read_csv_file(file_name: str) -> str: |
|
"""Read and summarize a CSV file.""" |
|
try: |
|
df = pd.read_csv(file_name) |
|
|
|
|
|
summary = [] |
|
summary.append(f"CSV file shape: {df.shape[0]} rows, {df.shape[1]} columns") |
|
summary.append(f"Columns: {', '.join(df.columns.tolist())}") |
|
|
|
|
|
summary.append("\nFirst 5 rows:") |
|
summary.append(df.head().to_string()) |
|
|
|
|
|
numeric_columns = df.select_dtypes(include=['number']).columns |
|
if len(numeric_columns) > 0: |
|
summary.append(f"\nNumeric column statistics:") |
|
summary.append(df[numeric_columns].describe().to_string()) |
|
|
|
return "\n".join(summary) |
|
|
|
except Exception as e: |
|
return f"CSV reading error: {str(e)}" |
|
|
|
def _read_image_file(file_name: str) -> str: |
|
"""Read and analyze an image file.""" |
|
try: |
|
|
|
try: |
|
import pytesseract |
|
img = Image.open(file_name) |
|
|
|
|
|
info = f"Image: {img.size[0]}x{img.size[1]} pixels, mode: {img.mode}" |
|
|
|
|
|
text = pytesseract.image_to_string(img).strip() |
|
if text: |
|
return f"{info}\n\nExtracted text:\n{text}" |
|
else: |
|
return f"{info}\n\nNo text detected in image." |
|
|
|
except ImportError: |
|
|
|
img = Image.open(file_name) |
|
return f"Image: {img.size[0]}x{img.size[1]} pixels, mode: {img.mode}\n(OCR not available - install pytesseract for text extraction)" |
|
|
|
except Exception as e: |
|
return f"Image reading error: {str(e)}" |
|
|
|
def execute_code(code: str, timeout: int = 10) -> str: |
|
""" |
|
Execute Python code safely with timeout. |
|
""" |
|
try: |
|
|
|
dangerous_keywords = ["import os", "import subprocess", "__import__", "exec", "eval", "open("] |
|
if any(keyword in code.lower() for keyword in dangerous_keywords): |
|
return "Code execution blocked: potentially unsafe operations detected" |
|
|
|
result = subprocess.run( |
|
["python3", "-c", code], |
|
capture_output=True, |
|
text=True, |
|
timeout=timeout, |
|
cwd="/tmp" |
|
) |
|
|
|
if result.returncode == 0: |
|
return result.stdout.strip() if result.stdout else "Code executed successfully (no output)" |
|
else: |
|
return f"Code execution error: {result.stderr.strip()}" |
|
|
|
except subprocess.TimeoutExpired: |
|
return "Code execution timeout" |
|
except Exception as e: |
|
return f"Code execution error: {str(e)}" |
|
|
|
def calculate_simple_math(expression: str) -> str: |
|
""" |
|
Safely evaluate simple mathematical expressions. |
|
""" |
|
try: |
|
|
|
allowed_chars = set("0123456789+-*/.() ") |
|
if not all(c in allowed_chars for c in expression): |
|
return "Invalid mathematical expression" |
|
|
|
|
|
result = eval(expression) |
|
return str(result) |
|
|
|
except Exception as e: |
|
return f"Math calculation error: {str(e)}" |