import os
from datetime import datetime
import subprocess
import time

# Third-party imports
import numpy as np
import torch
from PIL import Image
import accelerate
import gradio as gr
import spaces
from transformers import (
    Qwen2_5_VLForConditionalGeneration,
    AutoTokenizer,
    AutoProcessor
)

# Local imports
from qwen_vl_utils import process_vision_info

# Choose the compute device: CUDA first, then Apple MPS (only when it is both
# available and built into this torch install), otherwise plain CPU.
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available() and torch.backends.mps.is_built()
    else "cpu"
)

print(f"[INFO] Using device: {device}")


def array_to_image_path(image_array):
    """Save an image array as a uniquely named PNG and return its path.

    Args:
        image_array: Array-like pixel data, e.g. the NumPy array produced by
            the Gradio image input. Values are cast to ``uint8`` before the
            PIL image is built.

    Returns:
        str: Absolute path of the PNG file written to the current working
        directory.

    Raises:
        ValueError: If ``image_array`` is ``None`` (no image was uploaded).
    """
    # Function-local import so this block does not depend on a new
    # module-level import.
    import uuid

    if image_array is None:
        raise ValueError("No image provided. Please upload an image before submitting.")

    # Convert numpy array to PIL Image
    img = Image.fromarray(np.uint8(image_array))

    # A second-resolution timestamp alone collides when two requests arrive
    # within the same second (one request would overwrite the other's image),
    # so a random suffix is appended to make every filename unique.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"image_{timestamp}_{uuid.uuid4().hex}.png"

    # Resolve the absolute path once and save to exactly that location.
    full_path = os.path.abspath(filename)
    img.save(full_path)

    return full_path
    
# Hub identifier of the checkpoint served by this demo; it keys both
# registries below.
_MODEL_ID = "Fancy-MLLM/R1-Onevision-7B"

# Model registry: weights are loaded once, at import time, and switched to
# eval mode.
models = {
    _MODEL_ID: Qwen2_5_VLForConditionalGeneration.from_pretrained(
        _MODEL_ID,
        trust_remote_code=True,
        torch_dtype="auto",
        device_map="auto",
    ).eval(),
}

# Processor registry, keyed identically to `models`.
processors = {
    _MODEL_ID: AutoProcessor.from_pretrained(_MODEL_ID, trust_remote_code=True),
}

# Text rendered at the top of the page.
DESCRIPTION = "[🦖 Fancy-MLLM/R1-Onevision-7B Demo]"

# NOTE(review): `kwargs` and the three prompt markers below are not referenced
# in the visible part of this file — confirm before removing them.
kwargs = {'torch_dtype': torch.bfloat16}

user_prompt = '<|user|>\n'
assistant_prompt = '<|assistant|>\n'
prompt_suffix = "<|end|>\n"

@spaces.GPU
def run_example(image, text_input=None, model_id=None):
    """Answer a text prompt about an image with the selected model.

    The image is written to a temporary PNG (via ``array_to_image_path``),
    referenced from a single-turn chat message, run through the processor and
    ``model.generate``, and the decoded completion is returned. The temporary
    PNG is removed before returning, whether or not inference succeeded.

    Args:
        image: Image pixel data as handed over by the Gradio image input.
        text_input: Prompt text. ``None`` is treated as an empty prompt.
        model_id: Key into ``models`` / ``processors``. ``None`` selects the
            first registered model.

    Returns:
        tuple: ``(generated_text, seconds)`` where ``seconds`` is the elapsed
        wall-clock time rounded to two decimals.

    Raises:
        ValueError: If ``image`` is ``None`` (raised by
            ``array_to_image_path``).
        KeyError: If ``model_id`` is not a registered model.
    """
    start_time = time.time()

    # The signature defaults model_id to None; fall back to the first
    # registered model instead of failing with ``KeyError: None``.
    if model_id is None:
        model_id = next(iter(models))
    # Resolve the model before touching the disk so an unknown id leaves no
    # stray image file behind.
    model = models[model_id]
    processor = processors[model_id]

    image_path = array_to_image_path(image)
    print(image_path)

    try:
        messages = [
            {
                "role": "user",
                "content": [
                    {
                        "type": "image",
                        "image": image_path,
                    },
                    {
                        "type": "text",
                        "text": text_input if text_input is not None else "",
                    },
                ],
            }
        ]

        # Preparation for inference
        text = processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        )
        inputs = inputs.to(device)

        # Inference: Generation of the output
        generated_ids = model.generate(**inputs, max_new_tokens=2048)
        # Drop the prompt tokens so only the newly generated part is decoded.
        generated_ids_trimmed = [
            out_ids[len(in_ids):]
            for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]
        output_text = processor.batch_decode(
            generated_ids_trimmed,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False,
        )
    finally:
        # The PNG exists only to hand the image to the vision pre-processing;
        # without this cleanup one file per request piles up on disk.
        # Best-effort: a failed delete must not mask the real result or error.
        try:
            os.remove(image_path)
        except OSError as exc:
            print(f"[WARN] Could not remove temporary image {image_path}: {exc}")

    end_time = time.time()
    total_time = round(end_time - start_time, 2)

    return output_text[0], total_time

# Page stylesheet: a fixed-height, scrollable, bordered box for the element
# with id "output".
# NOTE(review): no component created below sets elem_id="output", so this rule
# appears to be unused — confirm.
css = """
  #output {
    height: 500px; 
    overflow: auto; 
    border: 1px solid #ccc; 
  }
"""

# Page layout: one tab holding a row with two columns — inputs in the first
# column, results in the second.
with gr.Blocks(css=css) as demo:
    gr.Markdown(DESCRIPTION)
    with gr.Tab(label="R1-Onevision-7B Input"):
        with gr.Row():
            with gr.Column():
                input_img = gr.Image(label="Input Picture")
                model_selector = gr.Dropdown(
                    choices=list(models),
                    label="Model",
                    value="Fancy-MLLM/R1-Onevision-7B",
                )
                text_input = gr.Textbox(label="Text Prompt")
                submit_btn = gr.Button(value="Submit")
            with gr.Column():
                output_text = gr.Textbox(label="Output Text")
                time_taken = gr.Textbox(label="Time taken for processing + inference")

        # Submit runs inference and fills both result boxes.
        submit_btn.click(
            fn=run_example,
            inputs=[input_img, text_input, model_selector],
            outputs=[output_text, time_taken],
        )

# Enable request queuing (with api_open=False), then start the server.
demo.queue(api_open=False)
demo.launch(debug=True)