Spaces:
Running
Running
import os | |
import subprocess | |
import signal | |
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False" | |
import gradio as gr | |
import tempfile | |
import torch | |
from datasets import load_dataset | |
from tqdm.auto import tqdm | |
import re | |
import numpy as np | |
import gc | |
import unicodedata | |
from multiprocessing import cpu_count | |
from transformers import LlamaTokenizerFast | |
import fasttext | |
from typing import Tuple, Dict, List, Generator | |
import json | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
from datetime import datetime | |
import warnings | |
from huggingface_hub import HfApi, create_repo, upload_file, snapshot_download, whoami, HfFolder | |
from pathlib import Path | |
from textwrap import dedent | |
from scipy import stats | |
from apscheduler.schedulers.background import BackgroundScheduler | |
warnings.filterwarnings('ignore') | |
# Environment variables
# Hub access token taken from the process environment (None when unset).
HF_TOKEN = os.environ.get("HF_TOKEN")

# Global variables for model caching
# Directory that holds the downloaded classifier; created at import time.
MODEL_CACHE_DIR = Path.home().joinpath(".cache", "ultra_fineweb")
MODEL_CACHE_DIR.mkdir(parents=True, exist_ok=True)

# Lazily filled in by load_models(); everything starts out "not loaded".
MODEL_LOADED = False
fasttext_model = tokenizer = None
# CSS
# Custom stylesheet passed to gr.Blocks(css=...). It lets the page container
# scroll vertically and recolours primary buttons orange, with a lighter
# orange shade on hover.
css = """
.gradio-container {overflow-y: auto;}
.gr-button-primary {
background-color: #ff6b00 !important;
border-color: #ff6b00 !important;
}
.gr-button-primary:hover {
background-color: #ff8534 !important;
border-color: #ff8534 !important;
}
"""
# HTML templates
# Centered page header (headline plus one-line tagline) rendered with gr.HTML
# at the top of the interface.
TITLE = """
<div style="text-align: center; margin-bottom: 30px;">
<h1 style="font-size: 36px; margin-bottom: 10px;">Create your own Dataset Quality Scores, blazingly fast ⚡!</h1>
<p style="font-size: 16px; color: #666;">The space takes a HF dataset as input, scores it and provides statistics and quality distribution.</p>
</div>
"""
# Markdown "how it works" blurb rendered with gr.Markdown directly below the
# title: the four user-facing steps plus a note about the first-run download.
DESCRIPTION_MD = """
### 📋 How it works:
1. Choose a dataset from Hugging Face Hub.
2. The Ultra-FineWeb classifier will score each text sample.
3. View quality distribution and download the scored dataset.
4. Optionally, upload the results to a new repository on your Hugging Face account.
**Note:** The first run will download the model (~347MB), which may take a moment.
"""
# --- Helper Functions --- | |
# ==============================================================================
# --- THE DEFINITIVE, FINAL FIX FOR THE ERROR IS HERE ---
# The `escape` function has been restored to the correct form it should have.
# ==============================================================================
def escape(s: str) -> str:
    """Escape special characters for safe HTML display.

    Args:
        s: Value to escape; it is converted with ``str()`` first, so
            non-string objects (e.g. exceptions) are accepted.

    Returns:
        The text with ``&``, ``<``, ``>`` and ``"`` replaced by their HTML
        entities and every newline replaced by a ``<br/>`` tag.
    """
    s = str(s)
    # "&" must be handled first, otherwise the ampersands introduced by the
    # following replacements would themselves be escaped a second time.
    s = s.replace("&", "&amp;")
    s = s.replace("<", "&lt;")
    s = s.replace(">", "&gt;")
    s = s.replace('"', "&quot;")
    # Done last on purpose: the inserted <br/> tag must stay unescaped.
    s = s.replace("\n", "<br/>")
    return s
def fasttext_preprocess(content: str, tokenizer) -> str:
    """Normalise raw text into the single-line, space-separated form that is scored.

    Args:
        content: Raw sample text. Anything that is not a ``str`` yields ``""``.
        tokenizer: Object exposing ``encode(text, add_special_tokens=False)``
            and ``decode([token_id])``.

    Returns:
        The lowercased, diacritic-free text with each token separated by a
        single space and no newline, carriage-return or tab characters.
    """
    if not isinstance(content, str):
        return ""

    # Collapse runs of three or more newlines to exactly two, then lowercase.
    text = re.sub(r'\n{3,}', '\n\n', content)
    text = text.lower()

    # Strip diacritics: decompose (NFKD) and drop the combining marks ("Mn").
    decomposed = unicodedata.normalize('NFKD', text)
    text = ''.join(filter(lambda ch: unicodedata.category(ch) != 'Mn', decomposed))

    # Word segmentation: decode every token id on its own, join with spaces.
    pieces = []
    for token_id in tokenizer.encode(text, add_special_tokens=False):
        pieces.append(tokenizer.decode([token_id]))
    text = ' '.join(pieces)

    # NOTE(review): newlines become a bare " n " token here -- confirm this
    # matches the preprocessing the classifier was trained with.
    text = re.sub(r'\n', ' n ', text)
    text = text.replace('\r', '')
    text = text.replace('\t', ' ')

    # Squeeze repeated spaces and trim the ends.
    text = re.sub(r' +', ' ', text)
    return text.strip()
def fasttext_infer(norm_content: str, model) -> Tuple[str, float]:
    """Run inference using the FastText model.

    Args:
        norm_content: Normalized text content to score
        model: Loaded FastText model (anything with a ``predict(text)`` method
            returning a ``(labels, probabilities)`` pair)

    Returns:
        Tuple of (predicted_label, score) where score is between 0 and 1.
        The score is the probability reported for the predicted label; it
        stays at 0.5 when the model reports no probability at all. Any
        exception during prediction is logged and mapped to
        ``("__label__neg", 0.0)``.
    """
    # fastText's Python binding hands labels back in a tuple, so tuples must
    # be unwrapped as well as lists/arrays -- otherwise the "label" returned
    # to the caller is a 1-tuple instead of the label string.
    sequence_types = (list, tuple, np.ndarray)
    try:
        # Get prediction from model
        pred_label, pred_prob = model.predict(norm_content)

        # Handle different label formats
        if isinstance(pred_label, sequence_types) and len(pred_label) > 0:
            pred_label = pred_label[0]

        # Default score if we can't process it
        score = 0.5

        # Handle different probability formats
        if pred_prob is not None:
            # If it's a numpy array, convert to list
            if hasattr(pred_prob, 'tolist'):
                pred_prob = pred_prob.tolist()

            if isinstance(pred_prob, sequence_types) and len(pred_prob) > 0:
                # Get first element, descending one level if it is nested
                first_prob = pred_prob[0]
                if isinstance(first_prob, sequence_types):
                    first_prob = first_prob[0]
                score = float(first_prob)
            else:
                # Try direct conversion if it's a single value
                score = float(pred_prob)

        # Ensure score is between 0 and 1
        score = max(0.0, min(1.0, score))

        # NOTE(review): the score is not inverted for "__label__neg"
        # predictions, so a confident negative yields a high score -- confirm
        # this is the intended meaning of "quality score".
        return pred_label, score
    except Exception as e:
        # Best effort: a sample that cannot be scored counts as lowest quality.
        print(f"Error in fasttext_infer: {e}")
        return "__label__neg", 0.0
def load_models():
    """Load (and cache) the tokenizer and the FastText classifier.

    The classifier repository is downloaded into ``MODEL_CACHE_DIR`` when its
    files are not present yet. Loaded objects are stored in the module-level
    ``tokenizer`` / ``fasttext_model`` globals and reused on later calls.

    Returns:
        ``(tokenizer, fasttext_model)`` on success, ``(None, None)`` on
        failure (the error is printed). Note that the failure value is a
        non-empty tuple and therefore truthy: callers must test the elements,
        not the tuple itself.
    """
    global MODEL_LOADED, fasttext_model, tokenizer

    if MODEL_LOADED and tokenizer is not None and fasttext_model is not None:
        return tokenizer, fasttext_model

    try:
        model_dir = MODEL_CACHE_DIR / "Ultra-FineWeb-classifier"
        tokenizer_dir = model_dir / "tokenizer"
        classifier_path = model_dir / "classifier.bin"

        # Check for the files that are actually loaded rather than for the
        # bare directory: an interrupted download leaves the directory behind
        # and would otherwise never be retried.
        if not (tokenizer_dir.is_dir() and classifier_path.is_file()):
            snapshot_download(
                repo_id="openbmb/Ultra-FineWeb-classifier",
                local_dir=str(model_dir),
                local_dir_use_symlinks=False,
            )

        # Load tokenizer and model; publish them to the globals only once
        # both have loaded so the cache is never left half-initialised.
        loaded_tokenizer = LlamaTokenizerFast.from_pretrained(str(tokenizer_dir))
        loaded_model = fasttext.load_model(str(classifier_path))
        tokenizer, fasttext_model = loaded_tokenizer, loaded_model
        MODEL_LOADED = True
        return tokenizer, fasttext_model
    except Exception as e:
        print(f"Error loading models: {e}")
        return None, None
def create_quality_plot(scores: List[float], dataset_name: str) -> str:
    """Render a histogram of quality scores to a temporary PNG file.

    Args:
        scores: Quality scores to plot; the x-axis is fixed to the range 0..1.
        dataset_name: Name shown in the plot title.

    Returns:
        Path of the PNG file that was written. The file is created with
        ``delete=False`` and is not removed by this function.
    """
    # Only the unique file name is needed; the figure is written to it below.
    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmpfile:
        output_path = tmpfile.name

    fig = plt.figure(figsize=(10, 6))
    try:
        sns.histplot(scores, bins=50, kde=True, color='#6B7FD7', edgecolor='black')

        # Mark mean and median with labelled vertical lines.
        mean_score, median_score = np.mean(scores), np.median(scores)
        plt.axvline(mean_score, color='green', linestyle='--', linewidth=2, label=f'Mean: {mean_score:.3f}')
        plt.axvline(median_score, color='orange', linestyle=':', linewidth=2, label=f'Median: {median_score:.3f}')

        plt.xlabel('Quality Score')
        # NOTE(review): histplot is called without a `stat` argument -- confirm
        # that "Density" is the right label for the y-axis it produces.
        plt.ylabel('Density')
        plt.title(f'Quality Score Distribution - {dataset_name}', fontweight='bold')
        plt.legend()
        plt.grid(axis='y', alpha=0.3)
        plt.xlim(0, 1)
        plt.tight_layout()
        plt.savefig(output_path, dpi=150)
    finally:
        # Always release the figure, even when plotting or saving fails, so a
        # long-running server does not accumulate open figures.
        plt.close(fig)
    return output_path
def process_dataset(
    model_id: str,
    dataset_split: str,
    text_column: str,
    sample_size: int,
    batch_size: int,
    progress=gr.Progress(track_tqdm=True)
) -> Generator:
    """Score a Hub dataset and stream progress/results to the UI.

    This is a generator: every yielded value is a 7-tuple
    ``(log_text, summary_md, scored_file, stats_file, plot_file,
    results_group_update, upload_group_update)``. Intermediate yields only
    carry the growing log; the final yield carries the result files, or an
    error summary when anything fails.

    Args:
        model_id: Hub ID of the dataset to score (despite the name, this is a
            dataset ID; it is passed to ``load_dataset``).
        dataset_split: Split name passed to ``load_dataset``.
        text_column: Name of the column holding the text to score.
        sample_size: Maximum number of leading rows to score.
        batch_size: Number of rows fetched from the dataset per step.
        progress: Gradio progress tracker (tracks the tqdm loop).
    """
    log_text = ""

    def update_log(msg):
        # Append a timestamped line and build a "log only" output tuple.
        nonlocal log_text
        timestamp = datetime.now().strftime('%H:%M:%S')
        log_text += f"[{timestamp}] {msg}\n"
        return (log_text, None, None, None, None, gr.update(visible=False), gr.update(visible=False))

    try:
        yield update_log("Starting process...")

        # Number inputs may arrive as floats; range() and select() need ints,
        # and a batch size of 0 would make range() raise.
        sample_size = int(sample_size)
        batch_size = max(1, int(batch_size))
        if sample_size < 1:
            raise gr.Error("Sample size must be at least 1.")

        yield update_log("Loading scoring models...")
        # load_models() signals failure with (None, None), which is a truthy
        # tuple -- the elements have to be checked explicitly.
        scoring_tokenizer, scoring_model = load_models()
        if scoring_tokenizer is None or scoring_model is None:
            raise gr.Error("Failed to load scoring models. Please check logs.")
        yield update_log("Models loaded successfully.")

        yield update_log(f"Loading dataset '{model_id}' split '{dataset_split}'...")
        dataset = load_dataset(model_id, split=dataset_split, streaming=False)
        yield update_log("Dataset loaded.")

        if text_column not in dataset.column_names:
            raise gr.Error(f"Column '{text_column}' not found. Available: {', '.join(dataset.column_names)}")

        actual_samples = min(sample_size, len(dataset))
        if actual_samples == 0:
            # Without this guard mean/median of an empty list would be NaN.
            raise gr.Error("The selected split contains no samples to score.")
        dataset = dataset.select(range(actual_samples))

        yield update_log(f"Starting to score {actual_samples:,} samples...")
        scores, scored_data = [], []
        for i in tqdm(range(0, actual_samples, batch_size), desc="Scoring batches"):
            batch = dataset[i:min(i + batch_size, actual_samples)]
            for text in batch[text_column]:
                norm_content = fasttext_preprocess(text, scoring_tokenizer)
                # Empty/non-text samples are not sent to the model at all.
                if norm_content:
                    label, score = fasttext_infer(norm_content, scoring_model)
                else:
                    label, score = "__label__neg", 0.0
                scores.append(score)
                scored_data.append({'text': text, 'quality_score': score, 'predicted_label': label})

        yield update_log("Scoring complete. Generating results and plot...")
        stats_dict = {
            'dataset_id': model_id,
            'processed_samples': actual_samples,
            'statistics': {'mean': float(np.mean(scores)), 'median': float(np.median(scores))},
        }
        plot_file = create_quality_plot(scores, model_id.split('/')[-1])

        # Result files are created with delete=False so Gradio can serve them
        # after this function has returned.
        with tempfile.NamedTemporaryFile('w', suffix=".jsonl", delete=False, encoding='utf-8') as f:
            output_file_path = f.name
            for item in scored_data:
                f.write(json.dumps(item, ensure_ascii=False) + '\n')

        with tempfile.NamedTemporaryFile('w', suffix=".json", delete=False, encoding='utf-8') as f:
            stats_file_path = f.name
            json.dump(stats_dict, f, indent=2)

        summary_lines = [
            "#### ✅ Scoring Completed!",
            f"- **Dataset:** `{model_id}`",
            f"- **Processed Samples:** `{actual_samples:,}`",
            f"- **Mean Score:** `{stats_dict['statistics']['mean']:.3f}`",
            f"- **Median Score:** `{stats_dict['statistics']['median']:.3f}`"
        ]
        summary_md = "\n".join(summary_lines)

        yield update_log("Process finished successfully!")
        yield (log_text, summary_md, output_file_path, stats_file_path, plot_file, gr.update(visible=True), gr.update(visible=True))
    except Exception as e:
        # Top-level boundary: surface every failure in the UI instead of
        # crashing the event handler. The results group is shown so the error
        # summary is visible; the upload group stays hidden.
        error_log = update_log(f"ERROR: {e}")[0]
        error_summary_md = f"### ❌ Error\n```\n{escape(str(e))}\n```"
        yield (error_log, error_summary_md, None, None, None, gr.update(visible=True), gr.update(visible=False))
def upload_to_hub(
    scored_file: str, stats_file: str, plot_file: str, new_dataset_id: str,
    private: bool, hf_token: str, progress=gr.Progress(track_tqdm=True)
) -> str:
    """Upload the scoring results to a dataset repository on the Hub.

    Args:
        scored_file: Path of the scored ``.jsonl`` file (required).
        stats_file: Path of the statistics ``.json`` file (optional).
        plot_file: Path of the distribution plot (optional). Values that are
            not an existing file path are skipped.
        new_dataset_id: Target repository, either ``name`` (created under the
            token owner's namespace) or a full ``owner/name``.
        private: Whether the repository is created as private.
        hf_token: Hugging Face token used for every Hub call.
        progress: Gradio progress tracker.

    Returns:
        An HTML snippet describing success (with a link to the repository) or
        the failure reason. Exceptions are reported in the snippet rather
        than raised.
    """
    def is_existing_file(path) -> bool:
        # An Image component used as an input may hand over something other
        # than a path (e.g. an array); such values must not be truth-tested
        # or passed to os.path.exists().
        return isinstance(path, (str, os.PathLike)) and os.path.exists(path)

    # Stray whitespace around the name would produce an invalid repo id.
    new_dataset_id = (new_dataset_id or "").strip()

    if not hf_token: return '❌ <span style="color: red;">Please provide your Hugging Face token.</span>'
    if not all([scored_file, new_dataset_id]): return '❌ <span style="color: red;">Missing scored file or new dataset ID.</span>'

    try:
        progress(0.1, desc="Connecting to Hub...")
        api = HfApi(token=hf_token)
        username = whoami(token=hf_token)["name"]
        # A bare name is placed under the token owner's namespace.
        repo_id = f"{username}/{new_dataset_id}" if "/" not in new_dataset_id else new_dataset_id

        progress(0.2, desc=f"Creating repo: {repo_id}")
        repo_url = create_repo(repo_id=repo_id, repo_type="dataset", exist_ok=True, private=private, token=hf_token).repo_url

        progress(0.4, desc="Uploading files...")
        upload_file(path_or_fileobj=scored_file, path_in_repo="data/scored_dataset.jsonl", repo_id=repo_id, repo_type="dataset", token=hf_token)
        if is_existing_file(stats_file):
            upload_file(path_or_fileobj=stats_file, path_in_repo="statistics.json", repo_id=repo_id, repo_type="dataset", token=hf_token)
        if is_existing_file(plot_file):
            upload_file(path_or_fileobj=plot_file, path_in_repo="quality_distribution.png", repo_id=repo_id, repo_type="dataset", token=hf_token)

        # Minimal dataset card with a usage snippet.
        readme_lines = [
            "---",
            "license: apache-2.0",
            "---",
            f"# Quality-Scored Dataset: {repo_id.split('/')[-1]}",
            "This dataset was scored for quality using the [Dataset Quality Scorer Space](https://huggingface.co/spaces/ggml-org/dataset-quality-scorer).",
            "",
            "## Usage",
            "```python",
            "from datasets import load_dataset",
            f'dataset = load_dataset("{repo_id}", split="train")',
            "```"
        ]
        readme_content = "\n".join(readme_lines)
        upload_file(path_or_fileobj=readme_content.encode(), path_in_repo="README.md", repo_id=repo_id, repo_type="dataset", token=hf_token)

        progress(1.0, "Done!")
        return f'✅ <span style="color: green;">Successfully uploaded to <a href="{repo_url}" target="_blank">{repo_id}</a></span>'
    except Exception as e:
        # UI boundary: report the failure in the status area.
        return f'❌ <span style="color: red;">Upload failed: {escape(str(e))}</span>'
def create_demo():
    """Build the Gradio Blocks interface and wire up its event handlers.

    Layout: dataset and scoring configuration, a live log, and two initially
    hidden groups (results, optional Hub upload) that the scoring handler
    reveals through its outputs.

    Returns:
        The constructed ``gr.Blocks`` application (not yet launched).
    """
    # NOTE(review): the source this was restored from had lost its
    # indentation; the nesting of rows/columns below is a reconstruction --
    # confirm it against the intended layout.
    with gr.Blocks(css=css, title="Dataset Quality Scorer") as demo:
        gr.HTML(TITLE)
        gr.Markdown(DESCRIPTION_MD)

        with gr.Row():
            with gr.Column(scale=3):
                gr.Markdown("### 1. Configure Dataset")
                dataset_id = gr.Textbox(
                    label="Hugging Face Dataset ID",
                    value="roneneldan/TinyStories",
                    placeholder="username/dataset_name"
                )
                text_column = gr.Textbox(label="Text Column Name", value="text")
            with gr.Column(scale=2):
                gr.Markdown("### 2. Configure Scoring")
                dataset_split = gr.Dropdown(["train", "validation", "test"], label="Split", value="train")
                with gr.Row():
                    # precision=0 makes the components deliver integers; the
                    # handler uses both values in range().
                    sample_size = gr.Number(label="Sample Size", value=1000, minimum=100, step=100, precision=0)
                    batch_size = gr.Number(label="Batch Size", value=32, minimum=1, step=1, precision=0)

        live_log = gr.Textbox(label="Live Log", interactive=False, lines=8, max_lines=20)

        with gr.Row():
            clear_btn = gr.Button("Clear", variant="secondary")
            process_btn = gr.Button("🚀 Start Scoring", variant="primary", size="lg")

        with gr.Group(visible=False) as results_group:
            gr.Markdown("--- \n ### 3. Review Results")
            with gr.Row():
                with gr.Column(scale=1):
                    summary_output = gr.Markdown(label="Summary")
                    scored_file_output = gr.File(label="📄 Download Scored Dataset (.jsonl)", type="filepath")
                    stats_file_output = gr.File(label="📊 Download Statistics (.json)", type="filepath")
                with gr.Column(scale=1):
                    # type="filepath": this component is also an *input* of the
                    # upload handler, which needs a file path to upload.
                    plot_output = gr.Image(label="Quality Distribution", show_label=True, type="filepath")

        with gr.Group(visible=False) as upload_group:
            gr.Markdown("--- \n ### 4. (Optional) Upload to Hugging Face Hub")
            # A component's initial value is sent to every visitor's browser.
            # Only pre-fill the server-side token when not running as a hosted
            # Space (SPACE_ID unset), i.e. for local single-user runs.
            prefilled_token = "" if os.environ.get("SPACE_ID") else (HF_TOKEN or "")
            hf_token_input = gr.Textbox(label="Hugging Face Token", type="password", placeholder="hf_...", value=prefilled_token)
            new_dataset_id = gr.Textbox(label="New Dataset Name", placeholder="my-scored-dataset")
            private_checkbox = gr.Checkbox(label="Make dataset private", value=False)
            upload_btn = gr.Button("📤 Upload to Hub", variant="primary")
            upload_status = gr.HTML()

        def clear_form():
            # Defaults for the five inputs, then cleared log/results, both
            # groups hidden again, and an empty upload status.
            return "roneneldan/TinyStories", "train", "text", 1000, 32, "", None, None, None, None, gr.update(visible=False), gr.update(visible=False), ""

        # Order must match the 7-tuples yielded by process_dataset.
        outputs_list = [
            live_log, summary_output, scored_file_output, stats_file_output, plot_output,
            results_group, upload_group
        ]

        process_btn.click(
            fn=process_dataset,
            inputs=[dataset_id, dataset_split, text_column, sample_size, batch_size],
            outputs=outputs_list
        )

        clear_btn.click(
            fn=clear_form,
            outputs=[
                dataset_id, dataset_split, text_column, sample_size, batch_size, live_log,
                summary_output, scored_file_output, stats_file_output, plot_output,
                results_group, upload_group, upload_status
            ]
        )

        upload_btn.click(
            fn=upload_to_hub,
            inputs=[scored_file_output, stats_file_output, plot_output, new_dataset_id, private_checkbox, hf_token_input],
            outputs=[upload_status]
        )

    return demo
# --- App Execution ---
# Build the interface at import time so `demo` is available as a module
# attribute; the server itself only starts when the file is run as a script.
demo = create_demo()

if __name__ == "__main__":
    queued_demo = demo.queue()
    queued_demo.launch(debug=False, show_api=False)