Spaces:
Sleeping
Sleeping
import requests | |
import pandas as pd | |
from PIL import Image | |
import os | |
import subprocess | |
from bs4 import BeautifulSoup | |
import urllib.parse | |
def web_search(query: str, api_key: str = None) -> str: | |
""" | |
Perform web search using SerpAPI if available, otherwise fallback to DuckDuckGo scraping. | |
""" | |
if api_key and api_key != "your-serpapi-key-here": | |
return _serpapi_search(query, api_key) | |
else: | |
return _duckduckgo_search(query) | |
def _serpapi_search(query: str, api_key: str) -> str: | |
"""Search using SerpAPI.""" | |
try: | |
url = f"https://serpapi.com/search" | |
params = { | |
"q": query, | |
"api_key": api_key, | |
"engine": "google" | |
} | |
response = requests.get(url, params=params, timeout=10) | |
response.raise_for_status() | |
results = response.json() | |
organic_results = results.get("organic_results", []) | |
if organic_results: | |
# Get top 3 results | |
search_summary = [] | |
for i, result in enumerate(organic_results[:3]): | |
title = result.get("title", "") | |
snippet = result.get("snippet", "") | |
if title and snippet: | |
search_summary.append(f"{i+1}. {title}: {snippet}") | |
return "\n".join(search_summary) if search_summary else "No useful results found" | |
else: | |
return "No search results found" | |
except requests.RequestException as e: | |
print(f"SerpAPI search error: {e}") | |
return "Search failed" | |
def _duckduckgo_search(query: str) -> str: | |
"""Fallback web search using DuckDuckGo scraping.""" | |
try: | |
# DuckDuckGo instant answer API | |
url = "https://api.duckduckgo.com/" | |
params = { | |
"q": query, | |
"format": "json", | |
"no_html": "1", | |
"skip_disambig": "1" | |
} | |
response = requests.get(url, params=params, timeout=10) | |
response.raise_for_status() | |
data = response.json() | |
# Try to get instant answer | |
abstract = data.get("Abstract", "") | |
if abstract: | |
return f"Summary: {abstract}" | |
# Try related topics | |
related_topics = data.get("RelatedTopics", []) | |
if related_topics: | |
summaries = [] | |
for topic in related_topics[:3]: | |
if isinstance(topic, dict) and "Text" in topic: | |
summaries.append(topic["Text"]) | |
if summaries: | |
return "Related information:\n" + "\n".join(summaries) | |
# Fallback to web scraping (simplified) | |
return _simple_web_scrape(query) | |
except Exception as e: | |
print(f"DuckDuckGo search error: {e}") | |
return "Search failed" | |
def _simple_web_scrape(query: str) -> str: | |
"""Simple web scraping fallback.""" | |
try: | |
# Use a simple search approach | |
search_url = f"https://html.duckduckgo.com/html/?q={urllib.parse.quote(query)}" | |
headers = { | |
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" | |
} | |
response = requests.get(search_url, headers=headers, timeout=10) | |
if response.status_code == 200: | |
soup = BeautifulSoup(response.content, 'html.parser') | |
# Try to extract some basic information | |
results = soup.find_all('a', class_='result__snippet')[:3] | |
if results: | |
snippets = [r.get_text().strip() for r in results if r.get_text().strip()] | |
return "\n".join(snippets[:3]) if snippets else "Limited search results available" | |
return "Basic web search completed - limited results" | |
except Exception as e: | |
print(f"Web scraping error: {e}") | |
return "Web search unavailable" | |
def read_file(file_name: str) -> str: | |
""" | |
Read and process different file types (text, CSV, images). | |
""" | |
if not file_name or not os.path.exists(file_name): | |
return "File not found" | |
try: | |
file_extension = os.path.splitext(file_name)[1].lower() | |
if file_extension == ".csv": | |
return _read_csv_file(file_name) | |
elif file_extension in [".png", ".jpg", ".jpeg", ".gif", ".bmp"]: | |
return _read_image_file(file_name) | |
elif file_extension in [".txt", ".md", ".py", ".js", ".html", ".json"]: | |
return _read_text_file(file_name) | |
else: | |
# Try to read as text file | |
return _read_text_file(file_name) | |
except Exception as e: | |
return f"Error reading file: {str(e)}" | |
def _read_text_file(file_name: str) -> str: | |
"""Read a text file.""" | |
try: | |
with open(file_name, "r", encoding="utf-8") as f: | |
content = f.read() | |
return content[:5000] # Limit to first 5000 characters | |
except UnicodeDecodeError: | |
# Try with different encoding | |
try: | |
with open(file_name, "r", encoding="latin-1") as f: | |
content = f.read() | |
return content[:5000] | |
except Exception as e: | |
return f"Text file reading error: {str(e)}" | |
def _read_csv_file(file_name: str) -> str: | |
"""Read and summarize a CSV file.""" | |
try: | |
df = pd.read_csv(file_name) | |
# Create a summary | |
summary = [] | |
summary.append(f"CSV file shape: {df.shape[0]} rows, {df.shape[1]} columns") | |
summary.append(f"Columns: {', '.join(df.columns.tolist())}") | |
# Show first few rows | |
summary.append("\nFirst 5 rows:") | |
summary.append(df.head().to_string()) | |
# Show basic statistics for numeric columns | |
numeric_columns = df.select_dtypes(include=['number']).columns | |
if len(numeric_columns) > 0: | |
summary.append(f"\nNumeric column statistics:") | |
summary.append(df[numeric_columns].describe().to_string()) | |
return "\n".join(summary) | |
except Exception as e: | |
return f"CSV reading error: {str(e)}" | |
def _read_image_file(file_name: str) -> str: | |
"""Read and analyze an image file.""" | |
try: | |
# Try OCR first | |
try: | |
import pytesseract | |
img = Image.open(file_name) | |
# Get image info | |
info = f"Image: {img.size[0]}x{img.size[1]} pixels, mode: {img.mode}" | |
# Try OCR | |
text = pytesseract.image_to_string(img).strip() | |
if text: | |
return f"{info}\n\nExtracted text:\n{text}" | |
else: | |
return f"{info}\n\nNo text detected in image." | |
except ImportError: | |
# OCR not available, just return image info | |
img = Image.open(file_name) | |
return f"Image: {img.size[0]}x{img.size[1]} pixels, mode: {img.mode}\n(OCR not available - install pytesseract for text extraction)" | |
except Exception as e: | |
return f"Image reading error: {str(e)}" | |
def execute_code(code: str, timeout: int = 10) -> str: | |
""" | |
Execute Python code safely with timeout. | |
""" | |
try: | |
# Basic security check - prevent dangerous operations | |
dangerous_keywords = ["import os", "import subprocess", "__import__", "exec", "eval", "open("] | |
if any(keyword in code.lower() for keyword in dangerous_keywords): | |
return "Code execution blocked: potentially unsafe operations detected" | |
result = subprocess.run( | |
["python3", "-c", code], | |
capture_output=True, | |
text=True, | |
timeout=timeout, | |
cwd="/tmp" # Run in safe directory | |
) | |
if result.returncode == 0: | |
return result.stdout.strip() if result.stdout else "Code executed successfully (no output)" | |
else: | |
return f"Code execution error: {result.stderr.strip()}" | |
except subprocess.TimeoutExpired: | |
return "Code execution timeout" | |
except Exception as e: | |
return f"Code execution error: {str(e)}" | |
def calculate_simple_math(expression: str) -> str: | |
""" | |
Safely evaluate simple mathematical expressions. | |
""" | |
try: | |
# Only allow basic math characters | |
allowed_chars = set("0123456789+-*/.() ") | |
if not all(c in allowed_chars for c in expression): | |
return "Invalid mathematical expression" | |
# Use eval safely for basic math | |
result = eval(expression) | |
return str(result) | |
except Exception as e: | |
return f"Math calculation error: {str(e)}" |