# RAG-Ready Content Scraper — Gradio application (designed for HuggingFace Spaces / Docker).
from __future__ import annotations

import csv
import json
import os
import re
import subprocess
import tempfile
from typing import Iterable  # used by the theme's font type hints

import gradio as gr
import markdown_pdf  # Markdown -> PDF conversion
from gradio.themes.base import Base
from gradio.themes.utils import colors, fonts, sizes

from rag_scraper.converter import Converter
from rag_scraper.link_extractor import LinkExtractor, LinkType
from rag_scraper.scraper import Scraper
from rag_scraper.utils import URLUtils
# --- Custom Theme Definition ---
class Seafoam(Base):
    """Teal/cyan Gradio theme: true-black dark mode first, light palette second."""

    def __init__(
        self,
        *,
        primary_hue: colors.Color | str = colors.teal,
        secondary_hue: colors.Color | str = colors.cyan,
        neutral_hue: colors.Color | str = colors.gray,
        spacing_size: sizes.Size | str = sizes.spacing_md,
        radius_size: sizes.Size | str = sizes.radius_md,
        text_size: sizes.Size | str = sizes.text_md,  # md (not lg) for a more professional feel
        font: fonts.Font
        | str
        | Iterable[fonts.Font | str] = (
            fonts.GoogleFont("Inter"),  # modern sans-serif
            "ui-sans-serif",
            "system-ui",
            "sans-serif",
        ),
        font_mono: fonts.Font
        | str
        | Iterable[fonts.Font | str] = (
            fonts.GoogleFont("IBM Plex Mono"),
            "ui-monospace",
            "monospace",
        ),
    ):
        super().__init__(
            primary_hue=primary_hue,
            secondary_hue=secondary_hue,
            neutral_hue=neutral_hue,
            spacing_size=spacing_size,
            radius_size=radius_size,
            text_size=text_size,
            font=font,
            font_mono=font_mono,
        )
        # BUG FIX: gradio.themes.utils.colors exposes Color objects (e.g.
        # colors.gray) whose shades are attributes c50..c950. Names such as
        # colors.gray_900 / colors.gray50 / colors.white do not exist and
        # raised AttributeError when the theme was instantiated.
        # Dark mode first.
        super().set(
            # Core colors
            body_background_fill_dark="black",  # true black
            body_text_color_dark=colors.gray.c50,
            block_background_fill_dark=colors.gray.c900,
            block_border_color_dark=colors.gray.c700,
            block_label_background_fill_dark=colors.gray.c800,
            block_label_text_color_dark=colors.gray.c50,
            input_background_fill_dark=colors.gray.c800,
            input_border_color_dark=colors.gray.c600,
            input_text_color_dark=colors.gray.c50,
            button_primary_background_fill_dark=colors.teal.c600,
            button_primary_background_fill_hover_dark=colors.teal.c500,
            button_primary_text_color_dark="white",
            button_secondary_background_fill_dark=colors.gray.c700,
            button_secondary_background_fill_hover_dark=colors.gray.c600,
            button_secondary_text_color_dark="white",
            slider_color_dark=colors.teal.c500,
            # Light mode
            body_background_fill="white",
            body_text_color=colors.gray.c800,
            block_background_fill=colors.gray.c50,
            block_border_color=colors.gray.c300,
            block_label_background_fill=colors.gray.c50,
            block_label_text_color=colors.gray.c700,
            input_background_fill="white",
            input_border_color=colors.gray.c300,
            input_text_color=colors.gray.c900,
            button_primary_background_fill=colors.teal.c500,
            button_primary_background_fill_hover=colors.teal.c600,
            button_primary_text_color="white",
            button_secondary_background_fill=colors.gray.c50,
            button_secondary_background_fill_hover=colors.gray.c300,
            button_secondary_text_color=colors.gray.c800,
            slider_color=colors.teal.c500,
            # General
            block_title_text_weight="600",
            block_shadow="*shadow_drop_lg",
            button_shadow="*shadow_drop",
        )
# Single theme instance shared by the Blocks UI below.
seafoam_theme = Seafoam()
def is_github_repo(url_or_id):
    """Heuristically decide whether the input names a GitHub repository.

    Accepts either a URL containing "github.com" or a bare "owner/repo" id.
    """
    looks_like_repo_id = re.match(r'^[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+$', url_or_id) is not None
    return "github.com" in url_or_id or looks_like_repo_id
def check_repomix_installed():
    """Return True when the `repomix` CLI is on PATH and exits successfully."""
    try:
        probe = subprocess.run(
            ["repomix", "--version"],
            capture_output=True,
            text=True,
            check=False,
        )
    except Exception:
        # Covers FileNotFoundError (not installed) and any OS-level failure.
        return False
    return probe.returncode == 0
def run_repomix(repo_url_or_id, progress=gr.Progress(track_tqdm=True)):
    """Run Repomix on the GitHub repository and return the content.

    Args:
        repo_url_or_id: Full repository URL or "username/repo" shorthand.
        progress: Gradio progress tracker (injected by the UI).

    Returns:
        Tuple of (content, file_path). On failure, content holds an error
        message and file_path is None.
    """
    progress(0, desc="Starting Repomix processing...")
    try:
        with tempfile.TemporaryDirectory() as temp_dir:
            output_file_name = "repomix-output.md"
            output_file_path = os.path.join(temp_dir, output_file_name)
            # Expand "username/repo" shorthand into a full GitHub URL.
            if '/' in repo_url_or_id and not repo_url_or_id.startswith('http'):
                repo_url = f"https://github.com/{repo_url_or_id}"
            else:
                repo_url = repo_url_or_id
            progress(0.2, desc=f"Running Repomix on {repo_url}...")
            cmd = [
                "repomix",
                "--remote", repo_url,
                "--output", output_file_path,
                "--style", "markdown",
                "--compress"
            ]
            process = subprocess.run(cmd, capture_output=True, text=True, check=False, encoding='utf-8')
            progress(0.8, desc="Repomix command executed.")
            if process.returncode != 0:
                error_details = f"Return Code: {process.returncode}\nStderr: {process.stderr}\nStdout: {process.stdout}"
                return f"Error running Repomix:\n{error_details}", None
            if os.path.exists(output_file_path):
                with open(output_file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                # BUG FIX: the original returned output_file_path, which lives
                # inside the TemporaryDirectory and is deleted the moment this
                # `with` block exits — callers received a dangling path. Persist
                # a copy so the returned path stays valid.
                with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".md", encoding="utf-8") as persistent_file:
                    persistent_file.write(content)
                    persistent_path = persistent_file.name
                progress(1, desc="Repomix output processed.")
                return content, persistent_path
            error_details = f"Return Code: {process.returncode}\nStderr: {process.stderr}\nStdout: {process.stdout}"
            return f"Error: Repomix did not generate an output file at '{output_file_path}'.\nRepomix Output:\n{error_details}", None
    except Exception as e:
        progress(1, desc="Error during Repomix processing.")
        return f"Error processing GitHub repository: {str(e)}", None
def scrape_and_convert_website(url, depth, progress=gr.Progress(track_tqdm=True)):
    """Fetch HTML, extract links, convert to Markdown.

    Recursively crawls internal links up to `depth` levels and returns
    (markdown_content, temp_file_path).
    """
    progress(0, desc=f"Starting web scrape for {url}...")
    seen = set()

    def crawl(current_url, remaining, total_links_estimate=1, link_index=0):
        # Skip already-visited pages and stop once depth is exhausted.
        if current_url in seen or remaining < 0:
            return ""
        seen.add(current_url)
        try:
            fraction = link_index / total_links_estimate if total_links_estimate > 0 else 0
            progress(fraction, desc=f"Scraping: {current_url} (Depth: {depth - remaining})")
            html_content = Scraper.fetch_html(current_url)
        except Exception as e:
            return f"Error fetching {current_url}: {str(e)}\n"
        section = f"## Extracted from: {current_url}\n\n"
        section += Converter.html_to_markdown(
            html=html_content,
            base_url=current_url,
            parser_features='html.parser',
            ignore_links=True
        )
        section += "\n\n"
        if remaining > 0:
            try:
                candidates = LinkExtractor.scrape_url(current_url, link_type=LinkType.INTERNAL)
                children = [
                    candidate for candidate in candidates
                    if URLUtils.is_internal(candidate, current_url) and candidate not in seen
                ]
                for idx, child_url in enumerate(children):
                    section += crawl(child_url, remaining - 1, len(children), idx)
            except Exception as e:
                section += f"Error extracting links from {current_url}: {str(e)}\n"
        return section

    all_markdown_content = crawl(url, depth)
    progress(1, desc="Web scraping complete.")
    # Persist the result so the UI can offer it as a download.
    with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".md", encoding="utf-8") as tmp_file:
        tmp_file.write(all_markdown_content)
    return all_markdown_content, tmp_file.name
def convert_to_json(markdown_content, source_url_or_id):
    """Wrap the scraped markdown and its source identifier in a JSON document."""
    payload = {
        "source": source_url_or_id,
        "content": markdown_content,
    }
    return json.dumps(payload, indent=2)
def convert_to_csv(markdown_content, source_url_or_id):
    """Write a two-row CSV (header + one data row) to a temp file; return its path."""
    with tempfile.NamedTemporaryFile(
        mode='w+', delete=False, newline='', suffix=".csv", encoding="utf-8"
    ) as handle:
        writer = csv.writer(handle)
        writer.writerow(["source", "content"])
        writer.writerow([source_url_or_id, markdown_content])
        return handle.name
def save_output_to_file(content, output_format, source_url_or_id):
    """Saves content to a temporary file based on format and returns its path.

    Args:
        content: Markdown string produced by the scraper/Repomix.
        output_format: One of "Markdown", "JSON", "CSV", "Text", "PDF".
        source_url_or_id: Original source, embedded in JSON/CSV output.

    Returns:
        Path to the written temp file. A failed PDF conversion falls back to
        Markdown saved with a ".pdf.md" suffix.
    """
    processed_content = content  # default for Markdown, Text, and PDF fallback
    if output_format == "JSON":
        suffix = ".json"
        processed_content = convert_to_json(content, source_url_or_id)
    elif output_format == "CSV":
        # convert_to_csv writes its own file and returns the path directly.
        return convert_to_csv(content, source_url_or_id)
    elif output_format == "Text":
        suffix = ".txt"
    elif output_format == "PDF":
        try:
            # Reserve a path; markdown_pdf writes the file itself.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_pdf_file:
                pdf_output_path = tmp_pdf_file.name
            # BUG FIX: the markdown-pdf package has no convert_from_string();
            # its documented API is add_section(Section(...)) + save(path).
            # The old call always raised, so PDF output silently never worked.
            md_pdf = markdown_pdf.MarkdownPdf(toc_level=2)
            md_pdf.add_section(markdown_pdf.Section(content))
            md_pdf.save(pdf_output_path)
            return pdf_output_path
        except Exception as e:
            # Fallback: save as markdown with .pdf.md suffix if PDF fails.
            print(f"PDF conversion failed: {e}. Saving as Markdown instead.")
            suffix = ".pdf.md"  # signals markdown intended for PDF
    else:  # default to Markdown
        suffix = ".md"
    # Formats that don't return early (JSON, Text, Markdown, PDF fallback).
    with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=suffix, encoding="utf-8") as tmp_file:
        tmp_file.write(processed_content)
    return tmp_file.name
def process_input_updated(url_or_id, source_type, depth, output_format_selection, progress=gr.Progress(track_tqdm=True)):
    """Top-level UI handler: scrape/process the source and save the output.

    Args:
        url_or_id: Webpage URL or GitHub "username/repo" / URL.
        source_type: "Webpage" or "GitHub Repository".
        depth: Crawl depth (webpages only).
        output_format_selection: "Markdown", "JSON", "CSV", "Text", or "PDF".
        progress: Gradio progress tracker.

    Returns:
        (status_message, preview_content, output_file_path) for the UI outputs.
    """
    progress(0, desc="Initializing...")
    raw_content = ""
    error_message = ""
    output_file_path = None
    if source_type == "GitHub Repository":
        if not check_repomix_installed():
            error_message = "Repomix is not installed or not accessible. Please ensure it's installed globally."
            return error_message, None, None
        raw_content, _ = run_repomix(url_or_id, progress=progress)
        # NOTE(review): substring sniffing will false-positive when scraped
        # content itself contains "Error"; kept for compatibility.
        if "Error" in raw_content:
            error_message = raw_content
            raw_content = ""
    elif source_type == "Webpage":
        raw_content, _ = scrape_and_convert_website(url_or_id, depth, progress=progress)
        if "Error" in raw_content:
            error_message = raw_content
            raw_content = ""
    else:
        error_message = "Invalid source type selected."
        return error_message, None, None
    if error_message:
        return error_message, None, None
    try:
        progress(0.9, desc=f"Converting to {output_format_selection}...")
        output_file_path = save_output_to_file(raw_content, output_format_selection, url_or_id)
        preview_content = raw_content  # default for Markdown, Text
        if output_format_selection == "JSON":
            preview_content = convert_to_json(raw_content, url_or_id)
        elif output_format_selection == "CSV" and output_file_path:
            try:
                # BUG FIX: the original used [next(f) for _ in range(5)], which
                # raises StopIteration on short files and then re-read the whole
                # file; a bounded readline loop handles both cases in one pass.
                preview_lines = []
                with open(output_file_path, 'r', encoding='utf-8') as f_csv:
                    for _ in range(5):
                        line = f_csv.readline()
                        if not line:
                            break
                        preview_lines.append(line)
                preview_content = "".join(preview_lines)
                if not preview_content:
                    preview_content = "[CSV content is empty or very short]"
            except Exception as e_csv_preview:
                preview_content = f"[Error reading CSV for preview: {str(e_csv_preview)}]"
        elif output_format_selection == "CSV" and not output_file_path:
            preview_content = "[CSV file path not available for preview]"
        elif output_format_selection == "PDF":
            # BUG FIX: the original looked for "Saving as Markdown instead"
            # inside the file *path*, which can never match. The Markdown
            # fallback is identified by its ".pdf.md" suffix instead.
            if output_file_path and output_file_path.endswith(".pdf.md"):
                preview_content = raw_content + f"\n\n[Note: PDF conversion failed, showing Markdown. File saved as .pdf.md]"
            else:
                preview_content = f"[PDF generated. Download to view: {os.path.basename(output_file_path if output_file_path else 'file.pdf')}]"
        progress(1, desc="Processing complete.")
        return f"Successfully processed: {url_or_id}", preview_content, output_file_path
    except Exception as e:
        return f"Error during file conversion/saving: {str(e)}", raw_content, None
# --- Gradio UI definition ---
with gr.Blocks(theme=seafoam_theme) as iface:  # custom Seafoam theme applied
    gr.Markdown("# RAG-Ready Content Scraper")
    gr.Markdown(
        "Scrape webpage content or GitHub repositories to generate RAG-ready datasets."
    )

    with gr.Row():
        # Left column: user inputs.
        with gr.Column(scale=2):
            url_input = gr.Textbox(
                label="Enter URL or GitHub Repository ID",
                placeholder="e.g., https://example.com OR username/repo",
            )
            source_type_input = gr.Radio(
                choices=["Webpage", "GitHub Repository"],
                value="Webpage",
                label="Select Source Type",
            )
            depth_input = gr.Slider(
                minimum=0,
                maximum=3,
                step=1,
                value=0,
                label="Scraping Depth (for Webpages)",
                info="0: Only main page. Ignored for GitHub repos.",
            )
            output_format_input = gr.Dropdown(
                choices=["Markdown", "JSON", "CSV", "Text", "PDF"],
                value="Markdown",
                label="Select Output Format",
            )
            submit_button = gr.Button("Process Content", variant="primary")

        # Right column: results.
        with gr.Column(scale=3):
            status_output = gr.Textbox(label="Status", interactive=False)
            preview_output = gr.Code(label="Preview Content", language="markdown", interactive=False)
            file_download_output = gr.File(label="Download Processed File", interactive=False)

    # Progress is passed implicitly via the handler's gr.Progress default.
    gr.Examples(
        examples=[
            ["https://gradio.app/docs/js", "Webpage", 1, "Markdown"],
            ["gradio-app/gradio", "GitHub Repository", 0, "Text"],
            ["https://en.wikipedia.org/wiki/Retrieval-augmented_generation", "Webpage", 0, "JSON"],
        ],
        inputs=[url_input, source_type_input, depth_input, output_format_input],
        outputs=[status_output, preview_output, file_download_output],
        fn=process_input_updated,
        cache_examples=False,
    )

    with gr.Accordion("How it Works & More Info", open=False):
        gr.Markdown(
            """
            **Webpage Scraping:**
            1. Enter a full URL (e.g., `https://example.com`).
            2. Select "Webpage" as the source type.
            3. Set the desired scraping depth.
            4. Choose your output format.
            **GitHub Repository Processing:**
            1. Enter a GitHub repository URL or ID (e.g., `username/repo`).
            2. Select "GitHub Repository". (Depth is ignored).
            3. Choose your output format. Uses **RepoMix**.
            **Output Formats:** Markdown, JSON, CSV, Text, PDF.
            **Note:** PDF generation requires `markdown-pdf` library.
            This app is designed for Docker/HuggingFace Spaces.
            [View Source Code on HuggingFace Spaces](https://huggingface.co/spaces/CultriX/RAG-Scraper)
            """
        )

    submit_button.click(
        fn=process_input_updated,
        inputs=[url_input, source_type_input, depth_input, output_format_input],
        outputs=[status_output, preview_output, file_download_output],
    )

if __name__ == "__main__":
    iface.launch()