Spaces:
Runtime error
Runtime error
#!/usr/bin/env python3 | |
from nicegui import ui, run, app | |
import requests | |
import zipfile | |
import io | |
import ast | |
import json | |
import tiktoken | |
# --- Core Helper Functions --- | |
def is_file_type(file_path: str, file_extension: str) -> bool:
    """Tell whether ``file_path`` ends with ``file_extension``.

    The match is a literal, case-sensitive suffix comparison.
    """
    matches_suffix = str.endswith(file_path, file_extension)
    return matches_suffix
def is_likely_useful_file(file_path: str, lang: str = "python") -> bool:
    """Decide whether ``file_path`` looks like real source worth keeping.

    A path is rejected when any of the following holds:

    * one of its ``/``-separated segments starts with a dot (hidden entry),
    * the lower-cased path contains the substring ``"test"``,
    * it lives in one of the excluded directories, or
    * it contains the name of a known config / tooling / documentation file.

    ``lang`` ("python" or "go") adds language-specific exclusions; any other
    value uses only the common ones.
    """
    skipped_dirs = ["docs", "examples", "tests", "test", "scripts", "utils", "benchmarks"]
    # Plain substrings: a hit anywhere in the path disqualifies the file.
    skipped_fragments = [".github", ".gitignore", "LICENSE"]

    if lang == "python":
        skipped_dirs.append("__pycache__")
        skipped_fragments += ["hubconf.py", "setup.py", "stale.py", "gen-card-", "write_model_card"]
    elif lang == "go":
        skipped_dirs.append("vendor")
        skipped_fragments += ["go.mod", "go.sum", "Makefile"]

    segments = file_path.split("/")
    if any(segment.startswith(".") for segment in segments):
        return False

    if "test" in file_path.lower():
        return False

    in_skipped_dir = any(
        f"/{directory}/" in file_path or file_path.startswith(directory + "/")
        for directory in skipped_dirs
    )
    if in_skipped_dir:
        return False

    return not any(fragment in file_path for fragment in skipped_fragments)
def is_test_file(file_content: str, lang: str) -> bool:
    """Report whether ``file_content`` imports a known testing library.

    For Python the source is parsed with ``ast`` and every ``import`` /
    ``from ... import`` statement is inspected. A file counts as a test file
    when the top-level package of any imported module is ``unittest`` or
    ``pytest``, so ``import unittest.mock`` and
    ``from unittest.mock import patch`` are recognised as well.

    Args:
        file_content: Full text of the source file.
        lang: Language key. Only ``"python"`` is inspected; for every other
            value (including ``"go"``, whose indicator list is declared but
            not used) the function returns ``False``.

    Returns:
        ``True`` if a testing-library import is found, otherwise ``False``.
        Source that cannot be parsed is treated as "not a test file".
    """
    test_indicators = {"python": ["unittest", "pytest"], "go": ["testing"]}.get(lang, [])
    if lang != "python":
        return False

    try:
        module = ast.parse(file_content)
    except (SyntaxError, ValueError):
        # ValueError covers source containing null bytes on interpreters that
        # do not report it as a SyntaxError; either way the file is unparsable.
        return False

    for node in ast.walk(module):
        if isinstance(node, ast.Import):
            imported_modules = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            # ``from . import x`` has no module name at all.
            imported_modules = [node.module] if node.module else []
        else:
            continue
        # Compare on the top-level package so dotted submodules also match.
        if any(name.split(".")[0] in test_indicators for name in imported_modules):
            return True
    return False
# --- Token Calculation Function --- | |
def get_token_count(text: str) -> int:
    """Return the number of ``cl100k_base`` tokens in ``text``.

    The count is informational only, so any failure is printed and reported
    as ``0`` instead of being raised.

    Args:
        text: The string to tokenize.

    Returns:
        The token count, or ``0`` if it could not be computed.
    """
    try:
        encoding = tiktoken.get_encoding("cl100k_base")
        # Repository source may legitimately contain special-token text such
        # as "<|endoftext|>". tiktoken rejects that by default, which would
        # collapse the whole count to 0; with the check disabled the text is
        # encoded like any other characters.
        return len(encoding.encode(text, disallowed_special=()))
    except Exception as e:  # best effort: never let counting break the UI
        print(f"Could not calculate tokens: {e}")
        return 0
# --- Core Processing Logic --- | |
def download_and_process_repo(repo_url: str, branch_or_tag: str, log: ui.log) -> str | None:
    """Download a GitHub repository archive and concatenate its useful Python files.

    The archive is requested first as a branch (``refs/heads``) and, if that
    request fails, as a tag (``refs/tags``). Every ``.py`` file that passes
    ``is_likely_useful_file`` and is not flagged by ``is_test_file`` is added
    to the result under a ``# File: <path>`` header. Progress and errors are
    pushed to ``log``.

    Args:
        repo_url: Repository URL such as ``https://github.com/owner/repo``.
            Surrounding whitespace, a trailing slash and a ``.git`` suffix
            are tolerated.
        branch_or_tag: Name of the branch or tag to download.
        log: UI log element; only its ``push`` method is used.

    Returns:
        The concatenated file contents (possibly an empty string), or
        ``None`` when the download fails or the payload is not a valid zip.
    """
    lang = "python"
    base_url = repo_url.strip().rstrip("/").removesuffix(".git")
    ref_name = branch_or_tag.strip()

    # GitHub serves branch archives under refs/heads and tag archives under
    # refs/tags, so try both before giving up.
    response = None
    last_error = None
    for ref_kind in ("heads", "tags"):
        download_url = f"{base_url}/archive/refs/{ref_kind}/{ref_name}.zip"
        log.push(f"Attempting to download from: {download_url}")
        try:
            candidate = requests.get(download_url, timeout=30)
            candidate.raise_for_status()
        except requests.exceptions.RequestException as e:
            last_error = e
            continue
        response = candidate
        break

    if response is None:
        log.push(f"❌ Error: Failed to download the repository. {last_error}")
        return None

    log.push("✅ Download successful. Processing files...")
    try:
        archive = zipfile.ZipFile(io.BytesIO(response.content))
    except zipfile.BadZipFile as e:
        log.push(f"❌ Error: Downloaded archive is not a valid zip file. {e}")
        return None

    sections = []
    with archive:
        all_files = archive.namelist()
        log.push(f"Found {len(all_files)} total files in the archive.")
        for file_path in all_files:
            # Drop the archive's top-level "<repo>-<ref>/" folder from the path.
            cleaned_path = "/".join(file_path.split('/')[1:])
            if not cleaned_path or file_path.endswith("/"):
                continue
            if not is_file_type(cleaned_path, ".py") or not is_likely_useful_file(cleaned_path, lang):
                continue
            try:
                file_content = archive.read(file_path).decode("utf-8")
            except Exception as e:  # best effort: one unreadable file must not abort the run
                log.push(f"⚠️ Skipping (read/decode error): {cleaned_path} - {e}")
                continue
            if is_test_file(file_content, lang):
                log.push(f"Skipping (test file): {cleaned_path}")
                continue
            log.push(f"Appending: {cleaned_path}")
            sections.append(f"# File: {cleaned_path}\n{file_content}\n\n")

    log.push("✅ Processing complete.")
    # Join once at the end instead of growing one string file by file.
    return "".join(sections)
# --- Dedicated health check endpoint --- | |
def health_check():
    """Return a fixed ``("OK", 200)`` pair for the Hugging Face health checker.

    NOTE(review): no route registration for this function is visible in this
    view of the file - confirm how it is exposed.
    """
    body, status_code = "OK", 200
    return body, status_code
# --- NiceGUI User Interface Definition --- | |
def main_page():
    """Build the web interface: inputs, processing log, token count and output.

    The page holds a repository URL field, a branch/tag field and a button
    that runs ``download_and_process_repo`` in a worker thread. The result is
    shown in a read-only text area together with its token count, and a copy
    button puts the text area's content on the clipboard.

    NOTE(review): no page decorator or call site for this function is visible
    in this view of the file - confirm how the page is registered.
    """

    async def process_repository():
        """Handle the button click: process the repository and show the result."""
        # 1. Clear previous results and set loading state
        log.clear()
        output_area.set_value('')
        token_count_label.set_text('0')
        process_button.set_visibility(False)
        spinner.set_visibility(True)

        content = None
        try:
            repo_url = repo_input.value
            branch = branch_input.value
            if not repo_url:
                ui.notify('Repository URL cannot be empty.', type='negative')
                return
            # 2. Run the blocking I/O function in a separate thread
            content = await run.io_bound(download_and_process_repo, repo_url, branch, log)
        except Exception as e:
            # An unexpected failure in the worker is reported like any other
            # failed run instead of leaving the page stuck in its loading state.
            log.push(f'Unexpected error while processing the repository: {e}')
        finally:
            # 3. Restore the UI on every exit path (success, failure, early return)
            process_button.set_visibility(True)
            spinner.set_visibility(False)

        if content is not None:
            output_area.set_value(content)
            # Calculate and display token count
            num_tokens = get_token_count(content)
            token_count_label.set_text(f'{num_tokens:,}')
            ui.notify('Repository processed successfully!', type='positive')
        else:
            ui.notify('Failed to process repository. Check log for details.', type='negative')

    async def copy_to_clipboard(text: str):
        """Copy ``text`` to the user's clipboard, warning when it is empty."""
        if not text:
            ui.notify('There is no content to copy.', type='warning')
            return
        # json.dumps turns the text into a safely quoted JavaScript string literal.
        await ui.run_javascript(f'navigator.clipboard.writeText({json.dumps(text)})', respond=False)
        ui.notify('Output copied to clipboard!', type='positive')

    # --- UI Layout ---
    # NOTE(review): the nesting below was reconstructed from a flattened copy
    # of the file; verify the placement of the button and spinner.
    ui.query('body').style('background-color: #f4f4f5;')
    with ui.column().classes('w-full items-center gap-4 mx-auto p-4'):
        ui.label('Research MAGIC GitHub Repo to Single File').classes('text-3xl font-bold mt-4')
        ui.label('Concatenate repository Python and Go source files into a single file for Agentic Analysis.').classes(
            'text-lg text-gray-600')

        with ui.card().classes('w-full max-w-4xl shadow-lg'):
            with ui.row().classes('w-full items-center gap-4'):
                repo_input = ui.input(
                    label="GitHub Repository URL",
                    value="https://github.com/rodrigo-masini/github2file"
                ).props('outlined dense').classes('flex-grow')
                branch_input = ui.input(label="Branch or Tag", value="master").props('outlined dense').style(
                    'width: 150px;')
            process_button = ui.button('Process Repository', on_click=process_repository).props('icon=hub')
            spinner = ui.spinner(size='lg').classes('absolute-center')
            spinner.set_visibility(False)

        with ui.card().classes('w-full max-w-4xl shadow-lg'):
            ui.label('Processing Log').classes('text-xl font-semibold')
            log = ui.log().classes('w-full h-48').props('bordered')

        # --- New Dynamic Output Information Card ---
        with ui.card().classes('w-full max-w-4xl shadow-lg'):
            ui.label('Output Information').classes('text-xl font-semibold')
            with ui.row().classes('w-full items-center'):
                ui.label('Total Tokens (cl100k_base):').classes('text-lg')
                token_count_label = ui.label('0').classes('text-lg font-mono font-bold ml-2')

        with ui.card().classes('w-full max-w-4xl shadow-lg'):
            with ui.row().classes('w-full justify-between items-center'):
                ui.label('Concatenated Repo Data').classes('text-xl font-semibold')
                ui.button(icon='content_copy', on_click=lambda: copy_to_clipboard(output_area.value)).props(
                    'flat dense round')
            output_area = ui.textarea().classes('w-full h-96 font-mono').props(
                'outlined readonly placeholder="Output will appear here..."')
# Start the NiceGUI application. No host or port is passed, so NiceGUI's
# defaults apply (the original note says port 8080 - TODO confirm this matches
# the port the hosting environment expects).
ui.run()