import gradio as gr
from transformers import AutoTokenizer
import pandas as pd
import re
from datetime import datetime
from huggingface_hub import HfApi, DatasetCard, DatasetCardData, create_repo
from gradio_huggingfacehub_search import HuggingfaceHubSearch
import os
import tempfile
import re

# --- Configuration ---
# Hub access token taken from the environment; os.getenv yields None when unset.
HF_TOKEN = os.getenv("HF_TOKEN")
# Dataset repository that stores the leaderboard; the DATASET_REPO environment
# variable overrides the default repository id.
DATASET_REPO_ID = os.getenv("DATASET_REPO", "Lyte/tokenizer-leaderboard")
# File name of the leaderboard CSV inside that dataset repository.
DATASET_FILE_NAME = "leaderboard.csv"

PREDEFINED_TEXT = '''
import gradio as gr
from transformers import AutoTokenizer
import pandas as pd
import re
from datetime import datetime
from huggingface_hub import HfApi, DatasetCard, DatasetCardData, create_repo
from gradio_huggingfacehub_search import HuggingfaceHubSearch
import os
import tempfile

# --- Configuration ---
HF_TOKEN = os.getenv("HF_TOKEN")
DATASET_REPO_ID = os.getenv("DATASET_REPO", "Lyte/tokenizer-leaderboard")
DATASET_FILE_NAME = "leaderboard.csv"

PREDEFINED_TEXT = """
The quick brown fox jumps over 12 lazy dogs! 🐕‍🦺
Special characters: #@%^&*()_+-=[]{}|;:'",.<>/?\\~
Code samples:
- Python: def hello(): print("Hello World! 2023")
- HTML: <div class="container" id="main">Content</div>
- JSON: {"key": "value", "numbers": [1, 2, 3.14]}
Math equations: E = mc² → 3×10⁸ m/s
Multilingual text: 速い茶色の狐が怠惰な犬を飛び越える 😸
Emojis: 👍🎉🚀❤️🔥
Mixed casing: OpenAI's GPT-4 vs gpt-3.5-turbo
"""

WORD_COUNT = len(re.findall(r'\S+', PREDEFINED_TEXT))
LEADERBOARD_COLUMNS = [
    "Model ID", "Token Count", "Vocab Size",
    "Tokens/Word", "Chars/Token", "Timestamp"
]

# --- Hugging Face Hub Functions ---
def create_huggingface_dataset():
    """Creates the dataset repository on the Hub if it doesn't exist."""
    try:
        api = HfApi(token=HF_TOKEN)
        create_repo(repo_id=DATASET_REPO_ID, token=HF_TOKEN, repo_type="dataset", exist_ok=True)

        card_data = DatasetCardData(
            language="en",
            license="mit",
            size_categories=["1K<n<10K"],
            tags=["tokenizer", "leaderboard", "performance", "gradio"],
        )
        card = DatasetCard.from_template(
            card_data,
            template_path=None,
            Title="Tokenizer Leaderboard",
            Description="A leaderboard of tokenizer performance based on various metrics.",
            How_to_use="The leaderboard data is stored in a CSV file named 'leaderboard.csv'.",
        )
        card.push_to_hub(repo_id=DATASET_REPO_ID, token=HF_TOKEN)
        print(f"Dataset repository '{DATASET_REPO_ID}' created (or already exists).")

    except Exception as e:
        print(f"Error creating dataset repository: {e}")
        raise

def load_leaderboard_from_hub():
    """Loads the leaderboard data from the Hugging Face Hub as a pandas DataFrame."""
    try:
        api = HfApi(token=HF_TOKEN)
        dataset_path = api.dataset_info(repo_id=DATASET_REPO_ID, token=HF_TOKEN).siblings
        csv_file_info = next((file for file in dataset_path if file.rfilename == DATASET_FILE_NAME), None)

        if csv_file_info is None:
             print(f"'{DATASET_FILE_NAME}' not found in '{DATASET_REPO_ID}'. Returning an empty DataFrame")
             return pd.DataFrame(columns=LEADERBOARD_COLUMNS)

        file_path = api.hf_hub_download(repo_id=DATASET_REPO_ID, filename=DATASET_FILE_NAME, repo_type="dataset")
        df = pd.read_csv(file_path)
        df = df.sort_values(by="Token Count", ascending=True)
        df["Tokens/Word"] = df["Tokens/Word"].round(2)
        df["Chars/Token"] = df["Chars/Token"].round(2)
        return df

    except Exception as e:
        print(f"Error loading leaderboard from Hugging Face Hub: {e}")
        return pd.DataFrame(columns=LEADERBOARD_COLUMNS)

def push_leaderboard_to_hub(df):
    """Pushes the updated leaderboard DataFrame to the Hugging Face Hub."""
    try:
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".csv") as tmpfile:
            df.to_csv(tmpfile.name, index=False)
            tmp_path = tmpfile.name

        api = HfApi(token=HF_TOKEN)
        api.upload_file(
            path_or_fileobj=tmp_path,
            path_in_repo=DATASET_FILE_NAME,
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            token=HF_TOKEN,
            commit_message="Update leaderboard"
        )
        os.remove(tmp_path)

        print(f"Leaderboard updated and pushed to {DATASET_REPO_ID}")

    except Exception as e:
        print(f"Error pushing leaderboard to Hugging Face Hub: {e}")
        raise


# --- Utility Functions ---

def get_tokenizer_stats(model_id, text):
    if not model_id:
        raise ValueError("No model ID provided")
    try:
        tokenizer = AutoTokenizer.from_pretrained(model_id, token=HF_TOKEN, trust_remote_code=True)
        tokens = tokenizer.encode(text, add_special_tokens=False)
        text_length = len(text)
        return {
            "token_count": len(tokens),
            "vocab_size": tokenizer.vocab_size,
            "token_word_ratio": round(len(tokens) / WORD_COUNT, 2),
            "chars_per_token": round(text_length / len(tokens), 2) if tokens else 0
        }
    except Exception as e:
        raise RuntimeError(f"Failed to load tokenizer or encode text: {str(e)}") from e

def is_model_in_leaderboard(df, model_id):
    return model_id in df["Model ID"].values

def add_to_leaderboard(model_id):
    if not model_id:
        return "❌ Error: No model ID provided"
    df = load_leaderboard_from_hub()
    if is_model_in_leaderboard(df, model_id):
        return "⚠️ Model already in leaderboard"
    try:
        stats = get_tokenizer_stats(model_id, PREDEFINED_TEXT)
        new_row = pd.DataFrame([{
            "Model ID": model_id,
            "Token Count": stats["token_count"],
            "Vocab Size": stats["vocab_size"],
            "Tokens/Word": stats["token_word_ratio"],
            "Chars/Token": stats["chars_per_token"],
            "Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }])
        updated_df = pd.concat([df, new_row], ignore_index=True)
        push_leaderboard_to_hub(updated_df)
        return "✅ Added to leaderboard!"
    except Exception as e:
        return f"❌ Error: {str(e)}"

def analyze_tokenizer(model_id, text):
    if not model_id:
        return "❌ Error: Please select or enter a model ID"
    try:
        stats = get_tokenizer_stats(model_id, text)
        return (
            f"Token Count: {stats['token_count']}\n"
            f"Vocab Size: {stats['vocab_size']}\n"
            f"Tokens/Word: {stats['token_word_ratio']:.2f}\n"
            f"Chars/Token: {stats['chars_per_token']:.2f}"
        )
    except Exception as e:
        return f"❌ Analysis Failed: {str(e)}"

def compare_tokenizers(model_ids_str, use_standard_text):
    try:
        model_list = [mid.strip() for mid in model_ids_str.split(',') if mid.strip()]
        if not model_list:
            return pd.DataFrame({"Error": ["No models provided"]})
        results = []
        for model_id in model_list:
            try:
                stats = get_tokenizer_stats(model_id, PREDEFINED_TEXT)
                results.append({
                    "Model ID": model_id,
                    "Tokens": stats["token_count"],
                    "Vocab Size": stats["vocab_size"],
                    "Tokens/Word": f"{stats['token_word_ratio']:.2f}",
                    "Chars/Token": f"{stats['chars_per_token']:.2f}",
                    "Status": "✅ Success"
                })
            except Exception as e:
                results.append({
                    "Model ID": model_id,
                    "Tokens": "-",
                    "Vocab Size": "-",
                    "Tokens/Word": "-",
                    "Chars/Token": "-",
                    "Status": f"❌ {str(e)}"
                })
        return pd.DataFrame(results)
    except Exception as e:
        return pd.DataFrame({"Error": [str(e)]})

def get_leaderboard_for_download():
    """Loads, prepares, and returns a Gradio File object for download."""
    try:
        df = load_leaderboard_from_hub()
        with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmpfile:
            df.to_csv(tmpfile.name, index=False)
            # Return a Gradio File object, NOT just the path
            return gr.File(value=tmpfile.name, label="Download CSV")
    except Exception as e:
        print(f"Error preparing file for download: {e}")
        return None


def initial_benchmark_run():
    try:
        print("Starting initial benchmark run...")
        default_models = [
            "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
            "Qwen/Qwen2.5-7B-Instruct-1M",
            "simplescaling/s1.1-32B",
            "Xenova/gpt-4o",
            "microsoft/phi-4",
            "deepseek-ai/DeepSeek-R1",
            "google/gemma-2-27b-it",
            "HuggingFaceTB/SmolLM2-135M-Instruct",
            "mistralai/Mistral-7B-Instruct-v0.3",
            "tomg-group-umd/huginn-0125",
            "microsoft/Phi-3.5-mini-instruct",
            "openai-community/gpt2"
        ]
        df = load_leaderboard_from_hub()
        for model_id in default_models:
            try:
                if not is_model_in_leaderboard(df, model_id):
                    print(f"Benchmarking {model_id}...")
                    result = add_to_leaderboard(model_id)
                    print(f"Result for {model_id}: {result}")
                else:
                    print(f"{model_id} already in leaderboard, skipping.")
            except Exception as e:
                print(f"Error benchmarking {model_id}: {str(e)}")
        print("Initial benchmarking complete.")
    except Exception as e:
        print(f"Fatal error in initial benchmark: {str(e)}")

# --- Gradio Interface ---
with gr.Blocks(title="Tokenizers Leaderboard", theme=gr.themes.Soft()) as iface:
    gr.Markdown("# 🏆 Tokenizers Leaderboard")

    with gr.Tab("Analyze"):
        gr.Markdown("## Single Tokenizer Analysis")
        with gr.Row():
            model_search = HuggingfaceHubSearch(label="Search Models", placeholder="Search Hugging Face models...", search_type="model")
            custom_model = gr.Textbox(label="Direct Model ID", placeholder="e.g.: mistralai/Mistral-7B-Instruct-v0.3", max_lines=1)
        model_id = gr.Textbox(visible=False)
        gr.Markdown("### Input Text")
        text_input = gr.Textbox(lines=5, value=PREDEFINED_TEXT, label="Analysis Text")
        with gr.Row():
            analyze_btn = gr.Button("Analyze", variant="primary")
            add_btn = gr.Button("Add to Leaderboard")
        analysis_output = gr.Textbox(label="Results", interactive=False)
        model_search.change(lambda x: x, model_search, model_id)
        custom_model.change(lambda x: x, custom_model, model_id)
        analyze_btn.click(analyze_tokenizer, [model_id, text_input], analysis_output)
        add_event = add_btn.click(add_to_leaderboard, model_id, analysis_output)

    with gr.Tab("Compare"):
        gr.Markdown("## Multi-Model Comparison")
        gr.Markdown(f"**Standard Text:** `{PREDEFINED_TEXT[:80]}...`")
        model_ids = gr.Textbox(label="Model IDs (comma-separated)", placeholder="Enter models: meta-llama/Llama-2-7b, google/gemma-7b, ...")
        compare_btn = gr.Button("Compare Models", variant="primary")
        comparison_table = gr.DataFrame(label="Results", interactive=False)
        compare_btn.click(compare_tokenizers, [model_ids, gr.Checkbox(value=True, visible=False)], comparison_table)

    with gr.Tab("Leaderboard"):
        gr.Markdown("## Performance Leaderboard")
        with gr.Row():
            download_btn = gr.DownloadButton(label="Download CSV", value="tokenizer_leaderboard.csv")
        leaderboard_table = gr.DataFrame(label="Top Tokenizers", headers=LEADERBOARD_COLUMNS, interactive=False,
            datatype=["str", "number", "number", "number", "number", "str"])

        # Connect the download button to the function that prepares the CSV
        download_btn.click(get_leaderboard_for_download, inputs=[], outputs=download_btn)

        iface.load(fn=load_leaderboard_from_hub, outputs=leaderboard_table)
        add_event.then(load_leaderboard_from_hub, None, leaderboard_table)


create_huggingface_dataset()
initial_benchmark_run()
iface.launch()
'''

# Number of whitespace-delimited chunks (runs of non-space characters) in
# PREDEFINED_TEXT; it is reported in the Leaderboard tab description.
WORD_COUNT = len(re.findall(r'\S+', PREDEFINED_TEXT))
# Column names, in order, of the leaderboard table; also used to build the
# empty DataFrame returned when no leaderboard data can be loaded.
LEADERBOARD_COLUMNS = [
    "Model ID", "Token Count", "Vocab Size",
    "Tokens/Word", "Chars/Token", "Timestamp"
]

# --- Hugging Face Hub Functions ---
def create_huggingface_dataset():
    """Ensure the leaderboard dataset repository exists and push its dataset card.

    The repository is created with ``exist_ok=True``, so an already existing
    repository is not treated as an error. The dataset card is built from the
    default template and pushed on every call.

    Raises:
        Exception: Any error raised while creating the repository or pushing
            the card is printed and then re-raised unchanged.
    """
    try:
        create_repo(repo_id=DATASET_REPO_ID, token=HF_TOKEN, repo_type="dataset", exist_ok=True)

        card_data = DatasetCardData(
            language="en",
            license="mit",
            size_categories=["1K<n<10K"],
            tags=["tokenizer", "leaderboard", "performance", "gradio"],
        )
        # template_path=None selects the library's default card template; the
        # extra keyword arguments are handed to that template as variables.
        card = DatasetCard.from_template(
            card_data,
            template_path=None,
            Title="Tokenizer Leaderboard",
            Description="A leaderboard of tokenizer performance based on various metrics.",
            How_to_use="The leaderboard data is stored in a CSV file named 'leaderboard.csv'.",
        )
        card.push_to_hub(repo_id=DATASET_REPO_ID, token=HF_TOKEN)
        print(f"Dataset repository '{DATASET_REPO_ID}' created (or already exists).")

    except Exception as e:
        print(f"Error creating dataset repository: {e}")
        raise

def load_leaderboard_from_hub():
    """Download the leaderboard CSV from the Hub and return it as a DataFrame.

    Rows are ordered by ascending "Token Count" and the "Tokens/Word" and
    "Chars/Token" columns are rounded to two decimals.

    Returns:
        The leaderboard DataFrame, or an empty DataFrame with
        LEADERBOARD_COLUMNS when the CSV is not present in the repository or
        when any step fails (the error is printed, never raised).
    """
    try:
        hub = HfApi(token=HF_TOKEN)
        repo_files = hub.dataset_info(repo_id=DATASET_REPO_ID, token=HF_TOKEN).siblings
        # Only attempt the download when the CSV is actually listed in the repo.
        has_csv = any(entry.rfilename == DATASET_FILE_NAME for entry in repo_files)

        if not has_csv:
             print(f"'{DATASET_FILE_NAME}' not found in '{DATASET_REPO_ID}'. Returning an empty DataFrame")
             return pd.DataFrame(columns=LEADERBOARD_COLUMNS)

        local_csv = hub.hf_hub_download(repo_id=DATASET_REPO_ID, filename=DATASET_FILE_NAME, repo_type="dataset")
        board = pd.read_csv(local_csv).sort_values(by="Token Count", ascending=True)
        for ratio_column in ("Tokens/Word", "Chars/Token"):
            board[ratio_column] = board[ratio_column].round(2)
        return board

    except Exception as e:
        print(f"Error loading leaderboard from Hugging Face Hub: {e}")
        return pd.DataFrame(columns=LEADERBOARD_COLUMNS)

def push_leaderboard_to_hub(df):
    """Upload ``df`` as the leaderboard CSV to the dataset repository.

    The DataFrame is serialised (without its index) to a temporary CSV file,
    which is uploaded as DATASET_FILE_NAME and then deleted. The temporary
    file is removed even when serialising or uploading fails.

    Args:
        df: The complete leaderboard DataFrame to publish.

    Raises:
        Exception: Any error from writing or uploading is printed and then
            re-raised unchanged.
    """
    try:
        # Reserve a temp path and close the handle before writing, so the CSV
        # is written through a single open handle.
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=".csv") as tmpfile:
            tmp_path = tmpfile.name

        try:
            df.to_csv(tmp_path, index=False)

            api = HfApi(token=HF_TOKEN)
            api.upload_file(
                path_or_fileobj=tmp_path,
                path_in_repo=DATASET_FILE_NAME,
                repo_id=DATASET_REPO_ID,
                repo_type="dataset",
                token=HF_TOKEN,
                commit_message="Update leaderboard"
            )
        finally:
            # Clean up on success and on failure; a failed cleanup must not
            # hide the original error.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

        print(f"Leaderboard updated and pushed to {DATASET_REPO_ID}")

    except Exception as e:
        print(f"Error pushing leaderboard to Hugging Face Hub: {e}")
        raise


# --- Utility Functions ---

def get_tokenizer_stats(model_id, text):
    """Tokenize ``text`` with the tokenizer of ``model_id`` and compute metrics.

    Args:
        model_id: Hub model identifier passed to ``AutoTokenizer.from_pretrained``.
        text: The text to encode; special tokens are not added.

    Returns:
        A dict with the keys "token_count", "vocab_size", "token_word_ratio"
        (tokens per whitespace-delimited word of ``text``) and
        "chars_per_token". Both ratios are rounded to two decimals and are 0
        when their denominator would be zero.

    Raises:
        ValueError: ``model_id`` is empty.
        RuntimeError: Loading the tokenizer or encoding the text failed; the
            original exception is chained as the cause.
    """
    if not model_id:
        raise ValueError("No model ID provided")
    try:
        # NOTE(review): trust_remote_code=True allows the repository named by
        # model_id to run its own code while loading, and model_id can come
        # from user input in the UI -- confirm this is acceptable.
        tokenizer = AutoTokenizer.from_pretrained(model_id, token=HF_TOKEN, trust_remote_code=True)
        tokens = tokenizer.encode(text, add_special_tokens=False)
        token_count = len(tokens)
        # Count the words of the text that was actually tokenized, so the ratio
        # is correct for custom input as well as for the benchmark text.
        word_count = len(re.findall(r'\S+', text))
        return {
            "token_count": token_count,
            "vocab_size": tokenizer.vocab_size,
            "token_word_ratio": round(token_count / word_count, 2) if word_count else 0,
            "chars_per_token": round(len(text) / token_count, 2) if token_count else 0
        }
    except Exception as e:
        raise RuntimeError(f"Failed to load tokenizer or encode text: {str(e)}") from e

def is_model_in_leaderboard(df, model_id):
    """Tell whether ``model_id`` occurs in the "Model ID" column of ``df``."""
    listed_ids = df["Model ID"].values
    already_listed = model_id in listed_ids
    return already_listed

def add_to_leaderboard(model_id):
    """Benchmark ``model_id`` on PREDEFINED_TEXT and append it to the leaderboard.

    Args:
        model_id: Hub model identifier; surrounding whitespace is ignored.

    Returns:
        A status message for the UI: success, "already in leaderboard", or an
        error description. This function does not raise.
    """
    # Normalise first so " org/model " and "org/model" are the same entry and a
    # whitespace-only value is rejected like an empty one.
    model_id = str(model_id).strip() if model_id else ""
    if not model_id:
        return "❌ Error: No model ID provided"
    try:
        # NOTE(review): load_leaderboard_from_hub returns an empty frame when
        # loading fails, so a failed load followed by a successful push would
        # replace the stored leaderboard with this single row -- confirm.
        df = load_leaderboard_from_hub()
        if is_model_in_leaderboard(df, model_id):
            return "⚠️ Model already in leaderboard"
        stats = get_tokenizer_stats(model_id, PREDEFINED_TEXT)
        new_row = pd.DataFrame([{
            "Model ID": model_id,
            "Token Count": stats["token_count"],
            "Vocab Size": stats["vocab_size"],
            "Tokens/Word": stats["token_word_ratio"],
            "Chars/Token": stats["chars_per_token"],
            "Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }])
        # Skip concatenation with an empty frame: pandas deprecates it and it
        # would degrade the new row's column dtypes.
        updated_df = new_row if df.empty else pd.concat([df, new_row], ignore_index=True)
        push_leaderboard_to_hub(updated_df)
        return "✅ Added to leaderboard!"
    except Exception as e:
        return f"❌ Error: {str(e)}"

def analyze_tokenizer(model_id, text):
    """Build a plain-text report of tokenizer metrics for ``text``.

    Returns the four-line report on success, or an error message string when
    no model ID is given or when computing the metrics fails.
    """
    if model_id:
        try:
            metrics = get_tokenizer_stats(model_id, text)
            report = f"Token Count: {metrics['token_count']}\n"
            report += f"Vocab Size: {metrics['vocab_size']}\n"
            report += f"Tokens/Word: {metrics['token_word_ratio']:.2f}\n"
            report += f"Chars/Token: {metrics['chars_per_token']:.2f}"
            return report
        except Exception as e:
            return f"❌ Analysis Failed: {str(e)}"
    return "❌ Error: Please select or enter a model ID"

def compare_tokenizers(model_ids_str, use_standard_text):
    """Run every listed tokenizer on PREDEFINED_TEXT and tabulate the results.

    Args:
        model_ids_str: Comma-separated model IDs; blank entries are skipped.
        use_standard_text: Accepted for the UI wiring; not read by this function.

    Returns:
        A DataFrame with one row per model (failed models get "-" metrics and
        the error in "Status"), or a single-column "Error" DataFrame when no
        model was given or an unexpected error occurred.
    """
    try:
        requested = []
        for piece in model_ids_str.split(','):
            cleaned = piece.strip()
            if cleaned:
                requested.append(cleaned)
        if len(requested) == 0:
            return pd.DataFrame({"Error": ["No models provided"]})

        rows = []
        for model_id in requested:
            try:
                stats = get_tokenizer_stats(model_id, PREDEFINED_TEXT)
                row = {
                    "Model ID": model_id,
                    "Tokens": stats["token_count"],
                    "Vocab Size": stats["vocab_size"],
                    "Tokens/Word": f"{stats['token_word_ratio']:.2f}",
                    "Chars/Token": f"{stats['chars_per_token']:.2f}",
                    "Status": "✅ Success"
                }
            except Exception as e:
                # One failing model must not abort the comparison; record it.
                row = {
                    "Model ID": model_id,
                    "Tokens": "-",
                    "Vocab Size": "-",
                    "Tokens/Word": "-",
                    "Chars/Token": "-",
                    "Status": f"❌ {str(e)}"
                }
            rows.append(row)
        return pd.DataFrame(rows)
    except Exception as e:
        return pd.DataFrame({"Error": [str(e)]})

def get_leaderboard_for_download():
    """Write the current leaderboard to a temporary CSV and wrap it for Gradio.

    Returns:
        A ``gr.File`` pointing at the temporary CSV (the file is kept on disk
        because it is created with ``delete=False``), or None when loading or
        writing fails; the error is printed.
    """
    try:
        leaderboard = load_leaderboard_from_hub()
        csv_handle = tempfile.NamedTemporaryFile(delete=False, suffix=".csv")
        with csv_handle:
            csv_path = csv_handle.name
            leaderboard.to_csv(csv_path, index=False)
            # A gr.File component is returned here rather than the bare path.
            return gr.File(value=csv_path, label="Download CSV")
    except Exception as e:
        print(f"Error preparing file for download: {e}")
        return None


def initial_benchmark_run():
    """Benchmark a fixed list of default models that are not yet listed.

    The leaderboard is loaded once up front; each default model missing from
    that snapshot is passed to ``add_to_leaderboard``. Errors are printed and
    never raised: a failure for one model does not stop the remaining ones.
    """
    try:
        print("Starting initial benchmark run...")
        seed_models = [
            "deepseek-ai/DeepSeek-R1-Distill-Qwen-32B",
            "Qwen/Qwen2.5-7B-Instruct-1M",
            "simplescaling/s1.1-32B",
            "Xenova/gpt-4o",
            "microsoft/phi-4",
            "deepseek-ai/DeepSeek-R1",
            "google/gemma-2-27b-it",
            "HuggingFaceTB/SmolLM2-135M-Instruct",
            "mistralai/Mistral-7B-Instruct-v0.3",
            "tomg-group-umd/huginn-0125",
            "microsoft/Phi-3.5-mini-instruct",
            "openai-community/gpt2"
        ]
        snapshot = load_leaderboard_from_hub()
        for model_id in seed_models:
            try:
                if is_model_in_leaderboard(snapshot, model_id):
                    print(f"{model_id} already in leaderboard, skipping.")
                    continue
                print(f"Benchmarking {model_id}...")
                outcome = add_to_leaderboard(model_id)
                print(f"Result for {model_id}: {outcome}")
            except Exception as e:
                print(f"Error benchmarking {model_id}: {str(e)}")
        print("Initial benchmarking complete.")
    except Exception as e:
        print(f"Fatal error in initial benchmark: {str(e)}")

# --- Gradio Interface ---
# Three tabs: single-tokenizer analysis, multi-model comparison, and the
# leaderboard table with a CSV download button.
with gr.Blocks(title="Tokenizers Leaderboard", theme=gr.themes.Soft()) as iface:
    gr.Markdown("# 🏆 Tokenizers Leaderboard")

    with gr.Tab("Analyze"):
        gr.Markdown("## Single Tokenizer Analysis")
        with gr.Row():
            model_search = HuggingfaceHubSearch(label="Search Models", placeholder="Search Hugging Face models...", search_type="model")
            custom_model = gr.Textbox(label="Direct Model ID", placeholder="e.g.: mistralai/Mistral-7B-Instruct-v0.3", max_lines=1)
        # Hidden textbox holding the model ID used by both buttons; it is
        # overwritten by whichever of the two inputs above changed last.
        model_id = gr.Textbox(visible=False)
        gr.Markdown("### Input Text")
        text_input = gr.Textbox(lines=5, value=PREDEFINED_TEXT, label="Analysis Text")
        with gr.Row():
            analyze_btn = gr.Button("Analyze", variant="primary")
            add_btn = gr.Button("Add to Leaderboard")
        analysis_output = gr.Textbox(label="Results", interactive=False)
        model_search.change(lambda x: x, model_search, model_id)
        custom_model.change(lambda x: x, custom_model, model_id)
        analyze_btn.click(analyze_tokenizer, [model_id, text_input], analysis_output)
        # The click event is kept so the Leaderboard tab can chain a table
        # refresh after it (see add_event.then below).
        add_event = add_btn.click(add_to_leaderboard, model_id, analysis_output)

    with gr.Tab("Compare"):
        gr.Markdown("## Multi-Model Comparison")
        # Only the first 80 characters of the benchmark text are shown.
        gr.Markdown(f"**Standard Text:** `{PREDEFINED_TEXT[:80]}...`")
        model_ids = gr.Textbox(label="Model IDs (comma-separated)", placeholder="Enter models: meta-llama/Llama-2-7b, google/gemma-7b, ...")
        compare_btn = gr.Button("Compare Models", variant="primary")
        comparison_table = gr.DataFrame(label="Results", interactive=False)
        # The hidden checkbox supplies compare_tokenizers' second argument
        # (use_standard_text) with a constant True.
        compare_btn.click(compare_tokenizers, [model_ids, gr.Checkbox(value=True, visible=False)], comparison_table)

    with gr.Tab("Leaderboard"):
        gr.Markdown("## Performance Leaderboard")
        gr.Markdown(f"The tokenizers are run on a predefined text of {len(PREDEFINED_TEXT)} Length which has a word count of {WORD_COUNT}")
        with gr.Row():
            download_btn = gr.DownloadButton(label="Download CSV", value="tokenizer_leaderboard.csv")
        # One datatype entry per LEADERBOARD_COLUMNS entry, in the same order.
        leaderboard_table = gr.DataFrame(label="Top Tokenizers", headers=LEADERBOARD_COLUMNS, interactive=False,
            datatype=["str", "number", "number", "number", "number", "str"])

        # Connect the download button to the function that prepares the CSV
        download_btn.click(get_leaderboard_for_download, inputs=[], outputs=download_btn)

        # Fill the table when the page loads, and refresh it after every
        # "Add to Leaderboard" click has finished.
        iface.load(fn=load_leaderboard_from_hub, outputs=leaderboard_table)
        add_event.then(load_leaderboard_from_hub, None, leaderboard_table)


# Startup sequence, executed at module level: make sure the dataset repository
# exists, benchmark the default models, then start the web app.
create_huggingface_dataset()
initial_benchmark_run()
iface.launch()