|
|
|
""" |
|
Web Scraper MCP Server |
|
|
|
A Model Context Protocol server that provides web scraping tools. |
|
Exposes functions to scrape websites, convert content to markdown, and generate sitemaps. |
|
""" |
|
|
|
import gradio as gr |
|
import requests |
|
from bs4 import BeautifulSoup |
|
from markdownify import markdownify as md |
|
from urllib.parse import urljoin, urlparse |
|
from typing import Tuple, List, Dict |
|
import re |
|
import tempfile |
|
import zipfile |
|
import os |
|
def scrape_website_content(url: str) -> Tuple[str, str]: |
|
""" |
|
Scrape a website and return its main content formatted as markdown and a downloadable file path. |
|
|
|
Args: |
|
url (str): The URL to scrape (can include or omit http/https protocol) |
|
|
|
Returns: |
|
Tuple[str, str]: The scraped content formatted as markdown, and a file path for download |
|
""" |
|
try: |
|
|
|
if not url.startswith(('http://', 'https://')): |
|
url = 'https://' + url |
|
|
|
|
|
session = requests.Session() |
|
session.headers.update({ |
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' |
|
}) |
|
|
|
|
|
response = session.get(url, timeout=10) |
|
response.raise_for_status() |
|
|
|
|
|
soup = BeautifulSoup(response.content, 'html.parser') |
|
|
|
|
|
for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']): |
|
element.decompose() |
|
|
|
|
|
main_content = ( |
|
soup.find('main') or |
|
soup.find('article') or |
|
soup.find('div', class_=re.compile(r'content|main|post|article')) or |
|
soup.find('body') |
|
) |
|
|
|
if main_content: |
|
|
|
markdown_text = md(str(main_content), heading_style="ATX") |
|
|
|
|
|
|
|
markdown_text = re.sub(r'\n{3,}', '\n\n', markdown_text) |
|
|
|
markdown_text = re.sub(r'\[\s*\]\([^)]*\)', '', markdown_text) |
|
|
|
markdown_text = re.sub(r'[ \t]+', ' ', markdown_text) |
|
|
|
|
|
title = soup.find('title') |
|
if title: |
|
markdown_text = f"# {title.get_text().strip()}\n\n{markdown_text}" |
|
markdown_text = markdown_text.strip() |
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".md", mode="w", encoding="utf-8") as f: |
|
f.write(markdown_text) |
|
temp_path = f.name |
|
return markdown_text, temp_path |
|
return "No main content found on the webpage.", None |
|
except requests.exceptions.RequestException as e: |
|
return f"Error fetching URL: {str(e)}", None |
|
except Exception as e: |
|
return f"Error processing content: {str(e)}", None |
|
|
|
|
|
def generate_sitemap(url: str, max_links_per_domain: int = None) -> Tuple[str, str]: |
|
""" |
|
Generate a sitemap from all links found on a webpage and provide a downloadable file path. |
|
|
|
Args: |
|
url (str): The URL to analyze for links (can include or omit http/https protocol) |
|
max_links_per_domain (int, optional): Maximum number of links to display per domain. |
|
If None, shows all links. Defaults to None. |
|
|
|
Returns: |
|
Tuple[str, str]: A markdown-formatted sitemap of all links found on the page, and a file path for download |
|
""" |
|
try: |
|
|
|
if not url.startswith(('http://', 'https://')): |
|
url = 'https://' + url |
|
|
|
|
|
session = requests.Session() |
|
session.headers.update({ |
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' |
|
}) |
|
|
|
|
|
response = session.get(url, timeout=10) |
|
response.raise_for_status() |
|
|
|
|
|
soup = BeautifulSoup(response.content, 'html.parser') |
|
|
|
|
|
links = soup.find_all('a', href=True) |
|
|
|
|
|
sitemap_data = [] |
|
seen_urls = set() |
|
|
|
for link in links: |
|
href = link.get('href') |
|
text = link.get_text().strip() |
|
|
|
if not href: |
|
continue |
|
|
|
|
|
full_url = urljoin(url, href) |
|
|
|
|
|
if (full_url in seen_urls or |
|
href.startswith(('#', 'javascript:', 'mailto:', 'tel:')) or |
|
full_url == url): |
|
continue |
|
|
|
seen_urls.add(full_url) |
|
|
|
|
|
if not text: |
|
text = href |
|
|
|
sitemap_data.append({ |
|
'text': text[:100] + '...' if len(text) > 100 else text, |
|
'url': full_url |
|
}) |
|
|
|
|
|
if not sitemap_data: |
|
return "No links found on this page.", None |
|
|
|
sitemap_md = "# Sitemap\n\n" |
|
sitemap_md += f"Found {len(sitemap_data)} links:\n\n" |
|
|
|
|
|
domain_groups = {} |
|
parsed_base = urlparse(url) |
|
|
|
for item in sitemap_data: |
|
parsed_url = urlparse(item['url']) |
|
|
|
if parsed_url.netloc == parsed_base.netloc: |
|
domain_key = "Internal Links" |
|
else: |
|
domain_key = f"External Links ({parsed_url.netloc})" |
|
|
|
if domain_key not in domain_groups: |
|
domain_groups[domain_key] = [] |
|
|
|
domain_groups[domain_key].append(item) |
|
|
|
for domain, links in domain_groups.items(): |
|
sitemap_md += f"## {domain}\n\n" |
|
|
|
|
|
if max_links_per_domain is None: |
|
links_to_show = links |
|
remaining_links = 0 |
|
else: |
|
links_to_show = links[:max_links_per_domain] |
|
remaining_links = max(0, len(links) - max_links_per_domain) |
|
|
|
for link in links_to_show: |
|
sitemap_md += f"- [{link['text']}]({link['url']})\n" |
|
|
|
if remaining_links > 0: |
|
sitemap_md += f"- ... and {remaining_links} more links\n" |
|
|
|
sitemap_md += "\n" |
|
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".md", mode="w", encoding="utf-8") as f: |
|
f.write(sitemap_md) |
|
temp_path = f.name |
|
return sitemap_md, temp_path |
|
except requests.exceptions.RequestException as e: |
|
return f"Error fetching URL: {str(e)}", None |
|
except Exception as e: |
|
return f"Error processing content: {str(e)}", None |
|
|
|
|
|
def extract_all_content_as_zip(url: str, max_links: int = None) -> Tuple[str, str]: |
|
""" |
|
Extract text content from all links found on a webpage and create a downloadable zip file. |
|
|
|
Args: |
|
url (str): The URL to analyze for links (can include or omit http/https protocol) |
|
max_links (int, optional): Maximum number of links to process. If None, processes all links. Defaults to None. |
|
|
|
Returns: |
|
Tuple[str, str]: Status message and zip file path for download |
|
""" |
|
try: |
|
|
|
if not url.startswith(('http://', 'https://')): |
|
url = 'https://' + url |
|
|
|
|
|
session = requests.Session() |
|
session.headers.update({ |
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' |
|
}) |
|
|
|
|
|
response = session.get(url, timeout=10) |
|
response.raise_for_status() |
|
|
|
soup = BeautifulSoup(response.content, 'html.parser') |
|
links = soup.find_all('a', href=True) |
|
|
|
|
|
unique_urls = set() |
|
parsed_base = urlparse(url) |
|
|
|
for link in links: |
|
href = link.get('href') |
|
if not href: |
|
continue |
|
|
|
full_url = urljoin(url, href) |
|
|
|
|
|
if (href.startswith(('#', 'javascript:', 'mailto:', 'tel:')) or |
|
full_url == url): |
|
continue |
|
|
|
parsed_url = urlparse(full_url) |
|
if parsed_url.netloc == parsed_base.netloc: |
|
unique_urls.add(full_url) |
|
|
|
if not unique_urls: |
|
return "No internal links found to extract content from.", None |
|
|
|
|
|
urls_to_process = list(unique_urls) |
|
total_links_found = len(urls_to_process) |
|
|
|
|
|
if max_links is not None: |
|
urls_to_process = urls_to_process[:max_links] |
|
limited_message = f" (limited to {max_links} out of {total_links_found})" |
|
else: |
|
limited_message = "" |
|
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_zip: |
|
zip_path = temp_zip.name |
|
|
|
successful_extractions = 0 |
|
failed_extractions = 0 |
|
|
|
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file: |
|
for i, link_url in enumerate(urls_to_process, 1): |
|
try: |
|
|
|
link_response = session.get(link_url, timeout=10) |
|
link_response.raise_for_status() |
|
|
|
|
|
link_soup = BeautifulSoup(link_response.content, 'html.parser') |
|
|
|
|
|
for element in link_soup(['script', 'style', 'nav', 'footer', 'header', 'aside']): |
|
element.decompose() |
|
|
|
|
|
main_content = ( |
|
link_soup.find('main') or |
|
link_soup.find('article') or |
|
link_soup.find('div', class_=re.compile(r'content|main|post|article')) or |
|
link_soup.find('body') |
|
) |
|
|
|
if main_content: |
|
|
|
markdown_text = md(str(main_content), heading_style="ATX") |
|
|
|
|
|
markdown_text = re.sub(r'\n{3,}', '\n\n', markdown_text) |
|
markdown_text = re.sub(r'\[\s*\]\([^)]*\)', '', markdown_text) |
|
markdown_text = re.sub(r'[ \t]+', ' ', markdown_text) |
|
|
|
|
|
title = link_soup.find('title') |
|
if title: |
|
markdown_text = f"# {title.get_text().strip()}\n\n{markdown_text}" |
|
|
|
markdown_text = markdown_text.strip() |
|
|
|
|
|
parsed_link = urlparse(link_url) |
|
safe_filename = re.sub(r'[^\w\-_.]', '_', parsed_link.path or 'index') |
|
if not safe_filename.endswith('.md'): |
|
safe_filename += '.md' |
|
|
|
|
|
if safe_filename == '.md' or safe_filename == 'index.md': |
|
safe_filename = f"page_{i}.md" |
|
|
|
|
|
final_content = f"<!-- Source: {link_url} -->\n\n{markdown_text}" |
|
|
|
|
|
zip_file.writestr(safe_filename, final_content) |
|
successful_extractions += 1 |
|
else: failed_extractions += 1 |
|
|
|
except Exception as e: |
|
failed_extractions += 1 |
|
continue |
|
|
|
status_message = f"Successfully extracted content from {successful_extractions} pages{limited_message}" |
|
if failed_extractions > 0: |
|
status_message += f", failed to extract from {failed_extractions} pages" |
|
status_message += f". Created zip file with {successful_extractions} markdown files." |
|
|
|
return status_message, zip_path |
|
|
|
except requests.exceptions.RequestException as e: |
|
return f"Error fetching URL: {str(e)}", None |
|
except Exception as e: |
|
return f"Error processing content: {str(e)}", None |
|
|
|
|
|
def generate_sitemap_for_ui(url: str) -> Tuple[str, str]: |
|
""" |
|
Wrapper function for the Gradio UI that shows all links without limitation. |
|
|
|
Args: |
|
url (str): The URL to analyze for links |
|
|
|
Returns: |
|
Tuple[str, str]: A markdown-formatted sitemap of all links found on the page, and a file path for download |
|
""" |
|
return generate_sitemap(url, max_links_per_domain=None) |
|
|
|
|
|
def generate_sitemap_with_limit(url: str, max_links_per_domain: int) -> Tuple[str, str]: |
|
""" |
|
Wrapper function for Gradio UI that allows configurable link limits per domain. |
|
|
|
Args: |
|
url (str): The URL to analyze for links |
|
max_links_per_domain (int): Maximum number of links to display per domain (0 = show all) |
|
|
|
Returns: |
|
Tuple[str, str]: A markdown-formatted sitemap of all links found on the page, and a file path for download |
|
""" |
|
limit = None if max_links_per_domain == 0 else max_links_per_domain |
|
return generate_sitemap(url, max_links_per_domain=limit) |
|
|
|
|
|
def extract_all_content_for_ui(url: str) -> Tuple[str, str]: |
|
""" |
|
Wrapper function for the Gradio UI that extracts content from all internal links without limitation. |
|
|
|
Args: |
|
url (str): The URL to analyze for links |
|
|
|
Returns: |
|
Tuple[str, str]: Status message and zip file path for download |
|
""" |
|
return extract_all_content_as_zip(url, max_links=None) |
|
|
|
|
|
def extract_limited_content_as_zip(url: str, max_links: int) -> Tuple[str, str]: |
|
""" |
|
Wrapper function for Gradio UI that allows configurable link limits for bulk extraction. |
|
|
|
Args: |
|
url (str): The URL to analyze for links |
|
max_links (int): Maximum number of links to process (0 = process all) |
|
|
|
Returns: |
|
Tuple[str, str]: Status message and zip file path for download |
|
""" |
|
limit = None if max_links == 0 else max_links |
|
return extract_all_content_as_zip(url, max_links=limit) |
|
|
|
|
|
|
|
def create_mcp_interface(): |
|
"""Create Gradio interface that exposes web scraping tools as MCP functions.""" |
|
|
|
scrape_interface = gr.Interface( |
|
fn=scrape_website_content, |
|
inputs=gr.Textbox( |
|
label="Website URL", |
|
placeholder="https://example.com or example.com" |
|
), |
|
outputs=[ |
|
gr.Textbox( |
|
label="Scraped Content", |
|
lines=20, |
|
max_lines=50, |
|
show_copy_button=True, |
|
container=True |
|
), |
|
gr.File(label="Download Markdown") |
|
], |
|
title="Website Content Scraper", |
|
description="Extract and format website content as markdown", |
|
api_name="scrape_content" ) |
|
|
|
sitemap_interface = gr.Interface( |
|
fn=generate_sitemap_for_ui, |
|
inputs=gr.Textbox( |
|
label="Website URL", |
|
placeholder="https://example.com or example.com" |
|
), |
|
outputs=[ |
|
gr.Textbox( |
|
label="Sitemap", |
|
lines=20, |
|
max_lines=50, |
|
show_copy_button=True, |
|
container=True |
|
), |
|
gr.File(label="Download Sitemap") |
|
], |
|
title="Website Sitemap Generator", |
|
description="Generate a sitemap of all links found on a webpage", |
|
api_name="generate_sitemap" |
|
) |
|
bulk_extract_interface = gr.Interface( |
|
fn=extract_all_content_for_ui, |
|
inputs=gr.Textbox( |
|
label="Website URL", |
|
placeholder="https://example.com or example.com" |
|
), |
|
outputs=[ |
|
gr.Textbox( |
|
label="Extraction Status", |
|
lines=10, |
|
max_lines=20, |
|
show_copy_button=True, |
|
container=True |
|
), |
|
gr.File(label="Download ZIP Archive") |
|
], |
|
title="Bulk Content Extractor", |
|
description="Extract text content from all internal links and download as ZIP", |
|
api_name="extract_all_content" ) |
|
|
|
|
|
sitemap_limited_interface = gr.Interface( |
|
fn=generate_sitemap_with_limit, |
|
inputs=[ |
|
gr.Textbox( |
|
label="Website URL", |
|
placeholder="https://example.com or example.com" |
|
), |
|
gr.Number( |
|
label="Max Links Per Domain", |
|
value=0, |
|
info="Enter 0 to show all links, or a positive number to limit display per domain", |
|
minimum=0, |
|
maximum=1000 |
|
) |
|
], |
|
outputs=[ |
|
gr.Textbox( |
|
label="Sitemap", |
|
lines=20, |
|
max_lines=50, |
|
show_copy_button=True, |
|
container=True |
|
), |
|
gr.File(label="Download Sitemap") |
|
], |
|
title="Configurable Sitemap Generator", |
|
description="Generate a sitemap with optional display limits (0 = show all links)", |
|
api_name="generate_sitemap_limited" |
|
) |
|
|
|
|
|
bulk_limited_interface = gr.Interface( |
|
fn=extract_limited_content_as_zip, |
|
inputs=[ |
|
gr.Textbox( |
|
label="Website URL", |
|
placeholder="https://example.com or example.com" |
|
), |
|
gr.Number( |
|
label="Max Pages to Extract", |
|
value=0, |
|
info="Enter 0 to process all pages, or a positive number to limit extraction", |
|
minimum=0, |
|
maximum=1000 |
|
) |
|
], |
|
outputs=[ |
|
gr.Textbox( |
|
label="Extraction Status", |
|
lines=10, |
|
max_lines=20, |
|
show_copy_button=True, |
|
container=True |
|
), |
|
gr.File(label="Download ZIP Archive") |
|
], |
|
title="Limited Bulk Content Extractor", |
|
description="Extract text content from internal links with optional processing limits (0 = extract all)", |
|
api_name="extract_limited_content" |
|
) |
|
|
|
|
|
demo = gr.TabbedInterface( |
|
[scrape_interface, sitemap_interface, sitemap_limited_interface, bulk_extract_interface, bulk_limited_interface], |
|
["Content Scraper", "All Links Sitemap", "Limited Sitemap", "Bulk Extractor", "Limited Bulk Extractor"], |
|
title="🕷️ Web Scraper MCP Server" |
|
) |
|
|
|
return demo |
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
app = create_mcp_interface() |
|
app.launch( |
|
mcp_server=True |
|
) |
|
|