# super_agent/tools/web_search.py
import os

import requests
import numpy as np
import pandas as pd
from io import StringIO
from bs4 import BeautifulSoup
from langchain_core.tools import tool
from duckduckgo_search import DDGS
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

from tools.utils import StructureAwareTextSplitter

TOP_K = 5        # Number of top-ranked chunks returned to the agent
MAX_RESULTS = 2  # Number of search results (webpages) fetched per query
UNWANTED_TAGS = ['nav', 'header', 'footer', 'aside', 'form', 'script', 'style']
TAGS_TO_KEEP = ['h1', 'h2', 'h3', 'p', 'ul', 'ol', 'table']

def _format_table_to_string(table_html):
    """
    Convert an HTML table to a markdown-style string representation.

    Args:
        table_html (str): HTML string of the table.

    Returns:
        str: Table formatted as a markdown-style string, a fallback message
            if parsing fails, or None if the table is empty.
    """
    try:
        df = pd.read_html(StringIO(table_html))[0]
    except Exception:
        return "[Table could not be parsed]"

    if df.empty:
        return None

    # Header row
    table_str = "|"
    for col in df.columns:
        table_str += f" {col} |"
    table_str += "\n"
    # Markdown header separator
    table_str += "|" + " --- |" * len(df.columns) + "\n"
    # Data rows
    for _, row in df.iterrows():
        table_str += "|"
        for _, val in row.items():
            table_str += f" {val} |"
        table_str += "\n"
    return table_str
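
# A minimal sketch of the expected output, assuming pandas can parse the
# markup (the HTML snippet is illustrative, not part of the module):
#
#   >>> _format_table_to_string(
#   ...     "<table><tr><th>A</th><th>B</th></tr>"
#   ...     "<tr><td>1</td><td>2</td></tr></table>")
#   '| A | B |\n| --- | --- |\n| 1 | 2 |\n'
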
def _extract_list(tag, level=0):
"""
Recursively extract nested HTML lists (<ul> or <ol>) into a formatted text list.
Args:
tag (bs4.element.Tag): The <ul> or <ol> BeautifulSoup tag to extract.
level (int): The current nesting level (used for indentation and prefixing).
Returns:
list[str]: List of formatted strings representing the list items, preserving nesting.
"""
items = []
if tag.name not in ["ul", "ol"]:
return items
is_ordered = tag.name == "ol"
# Determine prefix style
if is_ordered:
# Use numbers for top-level, letters for nested
if level == 0:
item_prefix = lambda idx: f"{idx+1}."
else:
# a., b., c., ...
item_prefix = lambda idx: f"{chr(97+idx)}."
else:
item_prefix = lambda idx: "-"
for idx, li in enumerate(tag.find_all("li", recursive=False)):
# Get the text before any nested list
        text = li.find(string=True, recursive=False)
text = text.strip() if text else ""
# Check for nested lists
nested = li.find(["ul", "ol"], recursive=False)
if nested:
nested_items = _extract_list(nested, level+1)
if text:
items.append(f"{' '*level}{item_prefix(idx)} {text}")
items.extend([f"{' '*(level+1)}{line}" for line in nested_items])
else:
items.append(f"{' '*level}{item_prefix(idx)} {text}")
return items
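
# A minimal sketch of the nesting behavior, assuming BeautifulSoup's built-in
# "html.parser" (the snippet is illustrative, not part of the module):
#
#   >>> soup = BeautifulSoup(
#   ...     "<ul><li>Fruit<ul><li>Apple</li></ul></li></ul>", "html.parser")
#   >>> _extract_list(soup.ul)
#   ['- Fruit', '  - Apple']
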
def _parse_structured_content(soup):
"""
Parse the main content of a BeautifulSoup HTML document into structured blocks.
Args:
soup (bs4.BeautifulSoup): Parsed HTML document.
Returns:
list[dict]: List of structured content blocks (headers, paragraphs, lists, tables).
"""
content = []
for tag in soup.find_all(TAGS_TO_KEEP):
if tag.name in ['h1', 'h2', 'h3']:
content.append({'type': 'header', 'level': tag.name, 'text': tag.get_text(strip=True)})
elif tag.name == 'p':
content.append({'type': 'paragraph', 'text': tag.get_text(strip=True)})
elif tag.name in ['ul', 'ol']:
            # Only keep top-level lists: nested lists are emitted recursively
            # by _extract_list, and lists inside tables are captured by the
            # table formatter (avoids parsing the same content twice)
            if tag.find_parent(['ul', 'ol', 'table']) is None:
items = _extract_list(tag)
content.append({'type': 'list', 'items': items})
elif tag.name == 'table':
content.append({'type': 'table', 'html': str(tag)})
return content
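
# A minimal sketch of the block structure this produces (illustrative input):
#
#   >>> soup = BeautifulSoup("<h2>Intro</h2><p>Hello.</p>", "html.parser")
#   >>> _parse_structured_content(soup)
#   [{'type': 'header', 'level': 'h2', 'text': 'Intro'},
#    {'type': 'paragraph', 'text': 'Hello.'}]
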
@tool
def web_search(query: str) -> str:
"""
Perform a web search using DuckDuckGo.
    This tool acts as a live-data RAG (Retrieval-Augmented Generation) tool.
    It is useful for retrieving relevant information or obtaining domain
    knowledge in a specific area, such as mathematics, science, or games.

    Args:
        query (str): The search query.

    Returns:
        str: Concatenated string of the most relevant chunks.
"""
# ----- STEP 1: Find the most relevant webpages
results = DDGS(timeout=30).text(query, max_results=MAX_RESULTS)
urls = [r['href'] for r in results if 'href' in r]
all_chunks = []
    for idx, url in enumerate(urls):
        try:
            response = requests.get(url, timeout=30)
            html = response.text
        except Exception as e:
            # Skip unreachable pages instead of aborting the whole search
            print(f"Error fetching URL {url}: {str(e)}")
            continue

        # Output the html content to a file for debugging
        os.makedirs("test_output", exist_ok=True)
        with open(f"test_output/{idx}_web_search.txt", "w", encoding="utf-8") as f:
            f.write(html)
# ----- STEP 2: Parse and clean the HTML content
soup = BeautifulSoup(html, "html.parser")
# Remove unwanted tags before parsing structured content
for tag in soup.find_all(UNWANTED_TAGS):
tag.decompose()
structured_content = _parse_structured_content(soup)
        # ----- STEP 3: Format tables to string representation
        # Build a new list instead of removing items while iterating,
        # which would silently skip elements
        cleaned_content = []
        for item in structured_content:
            if item['type'] == 'table':
                table_str = _format_table_to_string(item['html'])
                if not table_str:
                    # Skip empty tables
                    continue
                item['text'] = table_str
            cleaned_content.append(item)
        structured_content = cleaned_content
# ----- STEP 4: Split structured content into chunks
splitter = StructureAwareTextSplitter(chunk_size=500, chunk_overlap=50)
documents = splitter.split_documents(structured_content)
all_chunks.extend([
f"\n\n----- CHUNK {i} (url: {url})-----\n\n" + doc.page_content
for i, doc in enumerate(documents)
])
    # ----- STEP 5: Make embeddings
    if not all_chunks:
        return "No content could be retrieved for this query."

    model = SentenceTransformer("all-MiniLM-L6-v2")  # Small & fast
    embeddings = model.encode(all_chunks)
    embedded_query = model.encode(query)
# ----- STEP 6: Calculate cosine similarity
# Reshape query for pairwise comparison
embedded_query = np.array(embedded_query).reshape(1, -1)
embeddings = np.array(embeddings)
# Compute cosine similarities
similarities = cosine_similarity(embedded_query, embeddings)[0] # Shape: (n_chunks,)
    # Get the TOP_K most similar chunks, highest similarity first
    top_indices = similarities.argsort()[-TOP_K:][::-1]
    top_chunks = [all_chunks[idx] for idx in top_indices]

    # Output the top chunks to a file for debugging
    with open("test_output/top_chunks.txt", "w", encoding="utf-8") as f:
        for c in top_chunks:
            f.write(c)

    return "".join(top_chunks)