import gradio as gr
import subprocess
import os
import re
import tempfile
import json
import csv

from rag_scraper.scraper import Scraper
from rag_scraper.converter import Converter
from rag_scraper.link_extractor import LinkExtractor, LinkType
from rag_scraper.utils import URLUtils

def is_github_repo(url_or_id):
    """Check if the input is a GitHub repository URL or ID."""
    if "github.com" in url_or_id:
        return True
    if re.match(r'^[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+$', url_or_id):
        return True
    return False
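
# Illustrative examples (not executed; shows what the check above accepts):
#   is_github_repo("https://github.com/gradio-app/gradio")  -> True
#   is_github_repo("gradio-app/gradio")                     -> True
#   is_github_repo("https://example.com")                   -> False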

def check_repomix_installed():
    """Check if Repomix is installed."""
    try:
        result = subprocess.run(["repomix", "--version"],
                                capture_output=True, text=True, check=False)
        return result.returncode == 0
    except Exception:
        return False

def run_repomix(repo_url_or_id, progress=gr.Progress(track_tqdm=True)):
    """Run Repomix on the GitHub repository and return the content."""
    progress(0, desc="Starting Repomix processing...")
    try:
        with tempfile.TemporaryDirectory() as temp_dir:
            # Request markdown explicitly (--style markdown) and direct the
            # result to a known file via --output.
            output_file_name = "repomix-output.md"
            output_file_path = os.path.join(temp_dir, output_file_name)
            # Expand a shorthand "username/repo" ID into a full GitHub URL.
            if '/' in repo_url_or_id and not repo_url_or_id.startswith('http'):
                repo_url = f"https://github.com/{repo_url_or_id}"
            else:
                repo_url = repo_url_or_id
            progress(0.2, desc=f"Running Repomix on {repo_url}...")
            cmd = [
                "repomix",
                "--remote", repo_url,
                "--output", output_file_path,  # Direct output to a file
                "--style", "markdown",         # Explicitly request markdown
                "--compress"
            ]
            process = subprocess.run(cmd, capture_output=True, text=True, check=False)
            progress(0.8, desc="Repomix command executed.")
            if process.returncode != 0:
                return f"Error running Repomix: {process.stderr}", None
            if os.path.exists(output_file_path):
                with open(output_file_path, 'r', encoding='utf-8') as f:
                    content = f.read()
                progress(1, desc="Repomix output processed.")
                # Note: the temp dir is deleted when this function returns,
                # so callers should rely on the returned content, not the path.
                return content, output_file_path
            else:
                return "Error: Repomix did not generate an output file.", None
    except Exception as e:
        progress(1, desc="Error during Repomix processing.")
        return f"Error processing GitHub repository: {str(e)}", None

def scrape_and_convert_website(url, depth, progress=gr.Progress(track_tqdm=True)):
    """Fetch HTML, extract links, convert to Markdown."""
    progress(0, desc=f"Starting web scrape for {url}...")
    visited_urls = set()
    all_markdown_content = ""

    def recursive_scrape(current_url, current_depth, total_links_estimate=1, link_index=0):
        if current_url in visited_urls or current_depth < 0:
            return ""
        visited_urls.add(current_url)
        try:
            progress_val = link_index / total_links_estimate if total_links_estimate > 0 else 0
            progress(progress_val, desc=f"Scraping: {current_url} (Depth: {depth - current_depth})")
            html_content = Scraper.fetch_html(current_url)
        except Exception as e:
            return f"Error fetching {current_url}: {str(e)}\n"
        markdown_content = f"## Extracted from: {current_url}\n\n"
        markdown_content += Converter.html_to_markdown(
            html=html_content,
            base_url=current_url,
            parser_features='html.parser',
            ignore_links=True
        )
        page_content = markdown_content + "\n\n"
        if current_depth > 0:
            try:
                links = LinkExtractor.scrape_url(current_url, link_type=LinkType.INTERNAL)
                # Keep only internal links that have not been visited yet.
                valid_links = [
                    link for link in links
                    if URLUtils.is_internal(link, current_url) and link not in visited_urls
                ]
                num_links = len(valid_links)
                for i, link_url in enumerate(valid_links):
                    page_content += recursive_scrape(link_url, current_depth - 1, num_links, i)
            except Exception as e:
                page_content += f"Error extracting links from {current_url}: {str(e)}\n"
        return page_content

    all_markdown_content = recursive_scrape(url, depth)
    progress(1, desc="Web scraping complete.")
    # For web scraping, we create a temporary file with the content for download.
    with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".md", encoding="utf-8") as tmp_file:
        tmp_file.write(all_markdown_content)
    return all_markdown_content, tmp_file.name
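
# Depth semantics (illustrative): depth=0 converts only the start page;
# depth=1 also follows each internal link found on it, one level down.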

# --- Data Conversion Functions (basic implementations) ---
def convert_to_json(markdown_content, source_url_or_id):
    """Converts markdown content to a JSON string."""
    # Basic implementation: wrap the source and content in a JSON object.
    # More sophisticated parsing can be added later.
    data = {"source": source_url_or_id, "content": markdown_content}
    return json.dumps(data, indent=2)
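
# Example output shape (illustrative values):
#   {
#     "source": "https://example.com",
#     "content": "## Extracted from: https://example.com\n\n..."
#   }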

def convert_to_csv(markdown_content, source_url_or_id):
    """Converts markdown content to a CSV file and returns its path."""
    # Basic implementation: a two-column CSV with source and content.
    output = tempfile.NamedTemporaryFile(mode='w+', delete=False, newline='', suffix=".csv", encoding="utf-8")
    writer = csv.writer(output)
    writer.writerow(["source", "content"])  # Header
    # For now, the entire markdown document goes into a single cell.
    writer.writerow([source_url_or_id, markdown_content])
    output.close()
    return output.name  # Return path to the CSV file
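
# Note: csv.writer uses QUOTE_MINIMAL by default, so any field containing a
# comma, quote character, or newline is quoted automatically; the full
# markdown document therefore round-trips safely in one "content" cell.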

def save_output_to_file(content, output_format, source_url_or_id):
    """Saves content to a temporary file based on format and returns its path."""
    suffix = f".{output_format.lower()}"
    if output_format == "JSON":
        processed_content = convert_to_json(content, source_url_or_id)
    elif output_format == "CSV":
        # convert_to_csv writes the file itself and returns its path directly.
        return convert_to_csv(content, source_url_or_id)
    else:  # Markdown/Text
        processed_content = content
        suffix = ".md"
    with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=suffix, encoding="utf-8") as tmp_file:
        tmp_file.write(processed_content)
    return tmp_file.name

# --- Main Processing Function ---
def process_input_updated(url_or_id, source_type, depth, output_format_selection, progress=gr.Progress(track_tqdm=True)):
    """Main function to process a URL or GitHub repo based on the selected type and format."""
    progress(0, desc="Initializing...")
    raw_content = ""
    error_message = ""
    output_file_path = None
    if source_type == "GitHub Repository":
        if not check_repomix_installed():
            error_message = "Repomix is not installed or not accessible. Please ensure it is installed globally in your Docker environment."
            return error_message, None, None  # Text output, Preview, File output
        raw_content, _ = run_repomix(url_or_id, progress=progress)
        # run_repomix prefixes all of its failure strings with "Error".
        if raw_content.startswith("Error"):
            error_message = raw_content
            raw_content = ""
elif source_type == "Webpage": | |
raw_content, _ = scrape_and_convert_website(url_or_id, depth, progress=progress) | |
if "Error" in raw_content: # Simple error check | |
error_message = raw_content | |
raw_content = "" | |
    else:
        error_message = "Invalid source type selected."
        return error_message, None, None
    if error_message:
        return error_message, None, None  # Error text, no preview, no file
    # Save raw_content (markdown) to a file in the chosen output format,
    # converting if necessary.
    try:
        progress(0.9, desc=f"Converting to {output_format_selection}...")
        output_file_path = save_output_to_file(raw_content, output_format_selection, url_or_id)
        # For the preview, show the raw markdown or the converted JSON/CSV.
        preview_content = raw_content  # Default to markdown
        if output_format_selection == "JSON":
            preview_content = convert_to_json(raw_content, url_or_id)
elif output_format_selection == "CSV": | |
# For CSV preview, maybe just show a message or first few lines | |
preview_content = f"CSV file generated. Path: {output_file_path}\nFirst few lines might be shown here in a real app." | |
# Or read a bit of the CSV for preview: | |
# with open(output_file_path, 'r', encoding='utf-8') as f_csv: | |
# preview_content = "".join(f_csv.readlines()[:5]) | |
progress(1, desc="Processing complete.") | |
return f"Successfully processed: {url_or_id}", preview_content, output_file_path | |
except Exception as e: | |
return f"Error during file conversion/saving: {str(e)}", raw_content, None | |

# --- Gradio Interface Definition ---
with gr.Blocks(theme=gr.themes.Soft()) as iface:
    gr.Markdown("# RAG-Ready Content Scraper")
    gr.Markdown(
        "Scrape webpage content (using RAG-scraper) or GitHub repositories (using RepoMix) "
        "to generate RAG-ready datasets. Uses Docker for full functionality on HuggingFace Spaces."
    )
    with gr.Row():
        with gr.Column(scale=2):
            url_input = gr.Textbox(
                label="Enter URL or GitHub Repository ID",
                placeholder="e.g., https://example.com OR username/repo"
            )
            source_type_input = gr.Radio(
                choices=["Webpage", "GitHub Repository"],
                value="Webpage",
                label="Select Source Type"
            )
            depth_input = gr.Slider(
                minimum=0, maximum=3, step=1, value=0,
                label="Scraping Depth (for Webpages)",
                info="0: Only main page. Ignored for GitHub repos."
            )
            output_format_input = gr.Dropdown(
                choices=["Markdown", "JSON", "CSV"],  # Markdown doubles as plain text
                value="Markdown",
                label="Select Output Format"
            )
            submit_button = gr.Button("Process Content", variant="primary")
        with gr.Column(scale=3):
            status_output = gr.Textbox(label="Status", interactive=False)
            preview_output = gr.Code(label="Preview Content", language="markdown", interactive=False)  # Default to markdown, can show JSON too
            file_download_output = gr.File(label="Download Processed File", interactive=False)

    # --- Examples ---
    gr.Examples(
        examples=[
            ["https://gradio.app/docs/js", "Webpage", 1, "Markdown"],
            ["gradio-app/gradio", "GitHub Repository", 0, "Markdown"],
            ["https://en.wikipedia.org/wiki/Retrieval-augmented_generation", "Webpage", 0, "JSON"],
        ],
        inputs=[url_input, source_type_input, depth_input, output_format_input],
        outputs=[status_output, preview_output, file_download_output],
        fn=process_input_updated,  # Signature must match the inputs/outputs above
        cache_examples=False  # Disable caching during development
    )
# --- How it Works & GitHub Link --- | |
with gr.Accordion("How it Works & More Info", open=False): | |
gr.Markdown( | |
""" | |
**Webpage Scraping:** | |
1. Enter a full URL (e.g., `https://example.com`). | |
2. Select "Webpage" as the source type. | |
3. Set the desired scraping depth (how many levels of internal links to follow). | |
4. Choose your output format. | |
5. The tool fetches HTML, converts it to Markdown, and follows internal links up to the specified depth. | |
**GitHub Repository Processing:** | |
1. Enter a GitHub repository URL (e.g., `https://github.com/username/repo`) or shorthand ID (e.g., `username/repo`). | |
2. Select "GitHub Repository" as the source type. (Scraping depth is ignored). | |
3. Choose your output format. | |
4. The tool uses **RepoMix** to fetch and process the repository into a structured Markdown format. | |
**Output Formats:** | |
- **Markdown:** Plain text Markdown file, suitable for direct reading or further processing. | |
- **JSON:** Structured JSON output, typically with fields like `source` and `content`. | |
- **CSV:** Comma-Separated Values file, useful for tabular data or importing into spreadsheets. | |
**Note on HuggingFace Spaces:** This application is designed to run in a Docker-based HuggingFace Space, | |
which allows the use of `RepoMix` for GitHub repositories. | |
[View Source Code on HuggingFace Spaces](https://huggingface.co/spaces/CultriX/RAG-Scraper) | |
""" | |
) | |

    submit_button.click(
        fn=process_input_updated,
        # gr.Progress is injected via the function's default parameter;
        # it must not be passed as an input component.
        inputs=[url_input, source_type_input, depth_input, output_format_input],
        outputs=[status_output, preview_output, file_download_output]
    )

if __name__ == "__main__":
    iface.launch()