#!/usr/bin/env python3 """ Web Scraper MCP Server A Model Context Protocol server that provides web scraping tools. Exposes functions to scrape websites, convert content to markdown, and generate sitemaps. """ import gradio as gr import requests from bs4 import BeautifulSoup from markdownify import markdownify as md from urllib.parse import urljoin, urlparse from typing import Tuple, List, Dict import re import tempfile import zipfile import os def scrape_website_content(url: str) -> Tuple[str, str]: """ Scrape a website and return its main content formatted as markdown and a downloadable file path. Args: url (str): The URL to scrape (can include or omit http/https protocol) Returns: Tuple[str, str]: The scraped content formatted as markdown, and a file path for download """ try: # Validate URL if not url.startswith(('http://', 'https://')): url = 'https://' + url # Create session with proper headers session = requests.Session() session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' }) # Make request response = session.get(url, timeout=10) response.raise_for_status() # Parse HTML soup = BeautifulSoup(response.content, 'html.parser') # Remove unwanted elements for element in soup(['script', 'style', 'nav', 'footer', 'header', 'aside']): element.decompose() # Try to find main content area main_content = ( soup.find('main') or soup.find('article') or soup.find('div', class_=re.compile(r'content|main|post|article')) or soup.find('body') ) if main_content: # Convert to markdown markdown_text = md(str(main_content), heading_style="ATX") # Clean up the markdown # Remove excessive newlines markdown_text = re.sub(r'\n{3,}', '\n\n', markdown_text) # Remove empty links markdown_text = re.sub(r'\[\s*\]\([^)]*\)', '', markdown_text) # Clean up whitespace markdown_text = re.sub(r'[ \t]+', ' ', markdown_text) # Add title if available title = soup.find('title') if title: markdown_text = f"# {title.get_text().strip()}\n\n{markdown_text}" markdown_text = markdown_text.strip() # Write to temp file for download with tempfile.NamedTemporaryFile(delete=False, suffix=".md", mode="w", encoding="utf-8") as f: f.write(markdown_text) temp_path = f.name return markdown_text, temp_path return "No main content found on the webpage.", None except requests.exceptions.RequestException as e: return f"Error fetching URL: {str(e)}", None except Exception as e: return f"Error processing content: {str(e)}", None def generate_sitemap(url: str, max_links_per_domain: int = None) -> Tuple[str, str]: """ Generate a sitemap from all links found on a webpage and provide a downloadable file path. Args: url (str): The URL to analyze for links (can include or omit http/https protocol) max_links_per_domain (int, optional): Maximum number of links to display per domain. If None, shows all links. Defaults to None. Returns: Tuple[str, str]: A markdown-formatted sitemap of all links found on the page, and a file path for download """ try: # Validate URL if not url.startswith(('http://', 'https://')): url = 'https://' + url # Create session with proper headers session = requests.Session() session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' }) # Make request response = session.get(url, timeout=10) response.raise_for_status() # Parse HTML soup = BeautifulSoup(response.content, 'html.parser') # Find all links links = soup.find_all('a', href=True) # Process links sitemap_data = [] seen_urls = set() for link in links: href = link.get('href') text = link.get_text().strip() if not href: continue # Convert relative URLs to absolute full_url = urljoin(url, href) # Filter out unwanted links if (full_url in seen_urls or href.startswith(('#', 'javascript:', 'mailto:', 'tel:')) or full_url == url): continue seen_urls.add(full_url) # Create link entry if not text: text = href sitemap_data.append({ 'text': text[:100] + '...' if len(text) > 100 else text, 'url': full_url }) # Generate sitemap markdown if not sitemap_data: return "No links found on this page.", None sitemap_md = "# Sitemap\n\n" sitemap_md += f"Found {len(sitemap_data)} links:\n\n" # Group by domain for better organization domain_groups = {} parsed_base = urlparse(url) for item in sitemap_data: parsed_url = urlparse(item['url']) if parsed_url.netloc == parsed_base.netloc: domain_key = "Internal Links" else: domain_key = f"External Links ({parsed_url.netloc})" if domain_key not in domain_groups: domain_groups[domain_key] = [] domain_groups[domain_key].append(item) # Format sitemap for domain, links in domain_groups.items(): sitemap_md += f"## {domain}\n\n" # Use the limit parameter or show all links if None if max_links_per_domain is None: links_to_show = links remaining_links = 0 else: links_to_show = links[:max_links_per_domain] remaining_links = max(0, len(links) - max_links_per_domain) for link in links_to_show: sitemap_md += f"- [{link['text']}]({link['url']})\n" if remaining_links > 0: sitemap_md += f"- ... and {remaining_links} more links\n" sitemap_md += "\n" # Write to temp file for download with tempfile.NamedTemporaryFile(delete=False, suffix=".md", mode="w", encoding="utf-8") as f: f.write(sitemap_md) temp_path = f.name return sitemap_md, temp_path except requests.exceptions.RequestException as e: return f"Error fetching URL: {str(e)}", None except Exception as e: return f"Error processing content: {str(e)}", None def extract_all_content_as_zip(url: str, max_links: int = None) -> Tuple[str, str]: """ Extract text content from all links found on a webpage and create a downloadable zip file. Args: url (str): The URL to analyze for links (can include or omit http/https protocol) max_links (int, optional): Maximum number of links to process. If None, processes all links. Defaults to None. Returns: Tuple[str, str]: Status message and zip file path for download """ try: # Validate URL if not url.startswith(('http://', 'https://')): url = 'https://' + url # Create session with proper headers session = requests.Session() session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' }) # First get the sitemap to find all links response = session.get(url, timeout=10) response.raise_for_status() soup = BeautifulSoup(response.content, 'html.parser') links = soup.find_all('a', href=True) # Process links to get unique URLs unique_urls = set() parsed_base = urlparse(url) for link in links: href = link.get('href') if not href: continue full_url = urljoin(url, href) # Filter out unwanted links and focus on same domain for safety if (href.startswith(('#', 'javascript:', 'mailto:', 'tel:')) or full_url == url): continue # Only include internal links to avoid scraping too many external sites parsed_url = urlparse(full_url) if parsed_url.netloc == parsed_base.netloc: unique_urls.add(full_url) if not unique_urls: return "No internal links found to extract content from.", None # Use all URLs or limit if specified urls_to_process = list(unique_urls) total_links_found = len(urls_to_process) # Apply limit if specified if max_links is not None: urls_to_process = urls_to_process[:max_links] limited_message = f" (limited to {max_links} out of {total_links_found})" else: limited_message = "" # Create temporary zip file with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as temp_zip: zip_path = temp_zip.name successful_extractions = 0 failed_extractions = 0 with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zip_file: for i, link_url in enumerate(urls_to_process, 1): try: # Get content from each link link_response = session.get(link_url, timeout=10) link_response.raise_for_status() # Parse and extract content link_soup = BeautifulSoup(link_response.content, 'html.parser') # Remove unwanted elements for element in link_soup(['script', 'style', 'nav', 'footer', 'header', 'aside']): element.decompose() # Find main content main_content = ( link_soup.find('main') or link_soup.find('article') or link_soup.find('div', class_=re.compile(r'content|main|post|article')) or link_soup.find('body') ) if main_content: # Convert to markdown markdown_text = md(str(main_content), heading_style="ATX") # Clean up the markdown markdown_text = re.sub(r'\n{3,}', '\n\n', markdown_text) markdown_text = re.sub(r'\[\s*\]\([^)]*\)', '', markdown_text) markdown_text = re.sub(r'[ \t]+', ' ', markdown_text) # Add title if available title = link_soup.find('title') if title: markdown_text = f"# {title.get_text().strip()}\n\n{markdown_text}" markdown_text = markdown_text.strip() # Create safe filename parsed_link = urlparse(link_url) safe_filename = re.sub(r'[^\w\-_.]', '_', parsed_link.path or 'index') if not safe_filename.endswith('.md'): safe_filename += '.md' # Ensure unique filename if safe_filename == '.md' or safe_filename == 'index.md': safe_filename = f"page_{i}.md" # Add source URL as header final_content = f"\n\n{markdown_text}" # Add to zip zip_file.writestr(safe_filename, final_content) successful_extractions += 1 else: failed_extractions += 1 except Exception as e: failed_extractions += 1 continue status_message = f"Successfully extracted content from {successful_extractions} pages{limited_message}" if failed_extractions > 0: status_message += f", failed to extract from {failed_extractions} pages" status_message += f". Created zip file with {successful_extractions} markdown files." return status_message, zip_path except requests.exceptions.RequestException as e: return f"Error fetching URL: {str(e)}", None except Exception as e: return f"Error processing content: {str(e)}", None def generate_sitemap_for_ui(url: str) -> Tuple[str, str]: """ Wrapper function for the Gradio UI that shows all links without limitation. Args: url (str): The URL to analyze for links Returns: Tuple[str, str]: A markdown-formatted sitemap of all links found on the page, and a file path for download """ return generate_sitemap(url, max_links_per_domain=None) def generate_sitemap_with_limit(url: str, max_links_per_domain: int) -> Tuple[str, str]: """ Wrapper function for Gradio UI that allows configurable link limits per domain. Args: url (str): The URL to analyze for links max_links_per_domain (int): Maximum number of links to display per domain (0 = show all) Returns: Tuple[str, str]: A markdown-formatted sitemap of all links found on the page, and a file path for download """ limit = None if max_links_per_domain == 0 else max_links_per_domain return generate_sitemap(url, max_links_per_domain=limit) def extract_all_content_for_ui(url: str) -> Tuple[str, str]: """ Wrapper function for the Gradio UI that extracts content from all internal links without limitation. Args: url (str): The URL to analyze for links Returns: Tuple[str, str]: Status message and zip file path for download """ return extract_all_content_as_zip(url, max_links=None) def extract_limited_content_as_zip(url: str, max_links: int) -> Tuple[str, str]: """ Wrapper function for Gradio UI that allows configurable link limits for bulk extraction. Args: url (str): The URL to analyze for links max_links (int): Maximum number of links to process (0 = process all) Returns: Tuple[str, str]: Status message and zip file path for download """ limit = None if max_links == 0 else max_links return extract_all_content_as_zip(url, max_links=limit) # Create Gradio interfaces for each function def create_mcp_interface(): """Create Gradio interface that exposes web scraping tools as MCP functions.""" # Create individual interfaces for each tool scrape_interface = gr.Interface( fn=scrape_website_content, inputs=gr.Textbox( label="Website URL", placeholder="https://example.com or example.com" ), outputs=[ gr.Textbox( label="Scraped Content", lines=20, max_lines=50, show_copy_button=True, container=True ), gr.File(label="Download Markdown") ], title="Website Content Scraper", description="Extract and format website content as markdown", api_name="scrape_content" ) sitemap_interface = gr.Interface( fn=generate_sitemap_for_ui, inputs=gr.Textbox( label="Website URL", placeholder="https://example.com or example.com" ), outputs=[ gr.Textbox( label="Sitemap", lines=20, max_lines=50, show_copy_button=True, container=True ), gr.File(label="Download Sitemap") ], title="Website Sitemap Generator", description="Generate a sitemap of all links found on a webpage", api_name="generate_sitemap" ) bulk_extract_interface = gr.Interface( fn=extract_all_content_for_ui, inputs=gr.Textbox( label="Website URL", placeholder="https://example.com or example.com" ), outputs=[ gr.Textbox( label="Extraction Status", lines=10, max_lines=20, show_copy_button=True, container=True ), gr.File(label="Download ZIP Archive") ], title="Bulk Content Extractor", description="Extract text content from all internal links and download as ZIP", api_name="extract_all_content" ) # Enhanced sitemap interface with configurable limits sitemap_limited_interface = gr.Interface( fn=generate_sitemap_with_limit, inputs=[ gr.Textbox( label="Website URL", placeholder="https://example.com or example.com" ), gr.Number( label="Max Links Per Domain", value=0, info="Enter 0 to show all links, or a positive number to limit display per domain", minimum=0, maximum=1000 ) ], outputs=[ gr.Textbox( label="Sitemap", lines=20, max_lines=50, show_copy_button=True, container=True ), gr.File(label="Download Sitemap") ], title="Configurable Sitemap Generator", description="Generate a sitemap with optional display limits (0 = show all links)", api_name="generate_sitemap_limited" ) # Enhanced bulk extract interface with configurable limits bulk_limited_interface = gr.Interface( fn=extract_limited_content_as_zip, inputs=[ gr.Textbox( label="Website URL", placeholder="https://example.com or example.com" ), gr.Number( label="Max Pages to Extract", value=0, info="Enter 0 to process all pages, or a positive number to limit extraction", minimum=0, maximum=1000 ) ], outputs=[ gr.Textbox( label="Extraction Status", lines=10, max_lines=20, show_copy_button=True, container=True ), gr.File(label="Download ZIP Archive") ], title="Limited Bulk Content Extractor", description="Extract text content from internal links with optional processing limits (0 = extract all)", api_name="extract_limited_content" ) # Combine into tabbed interface demo = gr.TabbedInterface( [scrape_interface, sitemap_interface, sitemap_limited_interface, bulk_extract_interface, bulk_limited_interface], ["Content Scraper", "All Links Sitemap", "Limited Sitemap", "Bulk Extractor", "Limited Bulk Extractor"], title="🕷️ Web Scraper MCP Server" ) return demo if __name__ == "__main__": # Create and launch the MCP server app = create_mcp_interface() app.launch( mcp_server=True )