import os
import sys
import importlib.util
import site
import json
import torch
import gradio as gr
import torchaudio
import numpy as np
from huggingface_hub import snapshot_download, hf_hub_download
import subprocess
import shutil
import re
import spaces

# Create a global variable to track downloaded resources
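# Keys correspond to the checkpoint and config groups fetched from the amphion/Vevo Hugging Face repo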
downloaded_resources = {
    "configs": False,
    "tokenizer_vq32": False,
    "tokenizer_vq8192": False,
    "ar_Vq32ToVq8192": False,
    "ar_PhoneToVq8192": False,
    "fmt_Vq8192ToMels": False,
    "vocoder": False
}

def install_espeak():
    """Detect and install espeak-ng dependency"""
    try:
        # Check if espeak-ng is already installed
        result = subprocess.run(["which", "espeak-ng"], capture_output=True, text=True)
        if result.returncode != 0:
            print("Detected espeak-ng not installed in the system, attempting to install...")
            # Try to install espeak-ng and its data using apt-get
            subprocess.run(["apt-get", "update"], check=True)
            # Install espeak-ng and the corresponding language data package
            subprocess.run(["apt-get", "install", "-y", "espeak-ng", "espeak-ng-data"], check=True)
            print("espeak-ng and its data packages installed successfully!")
        else:
            print("espeak-ng is already installed in the system.")
            # Even if already installed, try to update data to ensure integrity (optional but sometimes helpful)
            # print("Attempting to update espeak-ng data...")
            # subprocess.run(["apt-get", "update"], check=True)
            # subprocess.run(["apt-get", "install", "--only-upgrade", "-y", "espeak-ng-data"], check=True)

        # Verify Chinese support (optional)
        try:
            voices_result = subprocess.run(["espeak-ng", "--voices=cmn"], capture_output=True, text=True, check=True)
            if "cmn" in voices_result.stdout:
                print("espeak-ng supports 'cmn' language.")
            else:
                print("Warning: espeak-ng is installed, but 'cmn' language still seems unavailable.")
        except Exception as e:
             print(f"Error verifying espeak-ng Chinese support (may not affect functionality): {e}")

    except Exception as e:
        print(f"Error installing espeak-ng: {e}")
        print("Please try to run manually: apt-get update && apt-get install -y espeak-ng espeak-ng-data")

# Install espeak before all other operations
install_espeak()

def patch_langsegment_init():
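    """Remove the setLangfilters/getLangfilters imports from LangSegment's __init__.py
    so that importing the package does not fail when those symbols are unavailable."""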
    try:
        # Try to find the location of the LangSegment package
        spec = importlib.util.find_spec("LangSegment")
        if spec is None or spec.origin is None:
            print("Unable to locate LangSegment package.")
            return

        # Build the path to __init__.py
        init_path = os.path.join(os.path.dirname(spec.origin), '__init__.py')
        
        if not os.path.exists(init_path):
            print(f"LangSegment __init__.py file not found at: {init_path}")
            # Try to find in site-packages, applicable in some environments
            for site_pkg_path in site.getsitepackages():
                potential_path = os.path.join(site_pkg_path, 'LangSegment', '__init__.py')
                if os.path.exists(potential_path):
                    init_path = potential_path
                    print(f"Found __init__.py in site-packages: {init_path}")
                    break
            else: # If the loop ends normally (no break)
                 print(f"Also unable to find __init__.py in site-packages")
                 return


        print(f"Attempting to read LangSegment __init__.py: {init_path}")
        with open(init_path, 'r') as f:
            lines = f.readlines()

        modified = False
        new_lines = []
        target_line_prefix = "from .LangSegment import"

        for line in lines:
            stripped_line = line.strip()
            if stripped_line.startswith(target_line_prefix):
                if 'setLangfilters' in stripped_line or 'getLangfilters' in stripped_line:
                    print(f"Found line that needs modification: {stripped_line}")
                    # Remove setLangfilters and getLangfilters
                    modified_line = stripped_line.replace(',setLangfilters', '')
                    modified_line = modified_line.replace(',getLangfilters', '')
                    # Ensure comma handling is correct (e.g., if they are the last items)
                    modified_line = modified_line.replace('setLangfilters,', '')
                    modified_line = modified_line.replace('getLangfilters,', '')
                    # If they are the only extra imports, remove any redundant commas
                    modified_line = modified_line.rstrip(',') 
                    new_lines.append(modified_line + '\n')
                    modified = True
                    print(f"Modified line: {modified_line.strip()}")
                else:
                    new_lines.append(line) # Line is fine, keep as is
            else:
                new_lines.append(line) # Non-target line, keep as is

        if modified:
            print(f"Attempting to write back modified LangSegment __init__.py to: {init_path}")
            try:
                with open(init_path, 'w') as f:
                    f.writelines(new_lines)
                print("LangSegment __init__.py modified successfully.")
                # Try to reload the module to make changes effective (may not work, depending on import chain)
                try:
                    import LangSegment
                    importlib.reload(LangSegment)
                    print("LangSegment module has been attempted to reload.")
                except Exception as reload_e:
                     print(f"Error reloading LangSegment (may have no impact): {reload_e}")
            except PermissionError:
                print(f"Error: Insufficient permissions to modify {init_path}. Consider modifying requirements.txt.")
            except Exception as write_e:
                print(f"Other error occurred when writing LangSegment __init__.py: {write_e}")
        else:
            print("LangSegment __init__.py doesn't need modification.")

    except ImportError:
         print("LangSegment package not found, unable to fix.")
    except Exception as e:
        print(f"Unexpected error occurred when fixing LangSegment package: {e}")

# Execute the fix before all other imports (especially Amphion) that might trigger LangSegment
patch_langsegment_init()

# Clone the Amphion repository if it is not already present, then run from inside it
if not os.path.exists("Amphion"):
    subprocess.run(["git", "clone", "https://github.com/open-mmlab/Amphion.git"], check=True)
    os.chdir("Amphion")
elif not os.getcwd().endswith("Amphion"):
    os.chdir("Amphion")

# Add the Amphion repository root (the current working directory) to sys.path so its modules can be imported
if os.path.dirname(os.path.abspath("Amphion")) not in sys.path:
    sys.path.append(os.path.dirname(os.path.abspath("Amphion")))

# Ensure needed directories exist
os.makedirs("wav", exist_ok=True)
os.makedirs("ckpts/Vevo", exist_ok=True)

from models.vc.vevo.vevo_utils import VevoInferencePipeline, save_audio, load_wav

# Download and setup config files
def setup_configs():
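    """Download the Vevo config files from the amphion/Vevo repo into models/vc/vevo/config (runs once)."""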
    if downloaded_resources["configs"]:
        print("Config files already downloaded, skipping...")
        return
        
    config_path = "models/vc/vevo/config"
    os.makedirs(config_path, exist_ok=True)
    
    config_files = [
        "PhoneToVq8192.json",
        "Vocoder.json",
        "Vq32ToVq8192.json",
        "Vq8192ToMels.json",
        "hubert_large_l18_c32.yaml",
    ]
    
    for file in config_files:
        file_path = f"{config_path}/{file}"
        if not os.path.exists(file_path):
            try:
                file_data = hf_hub_download(
                    repo_id="amphion/Vevo", 
                    filename=f"config/{file}", 
                    repo_type="model",
                )
                os.makedirs(os.path.dirname(file_path), exist_ok=True)
                # Copy the downloaded file to the target location
                shutil.copy(file_data, file_path)
            except Exception as e:
                print(f"Error downloading config file {file}: {e}")
    
    downloaded_resources["configs"] = True

setup_configs()

# Device configuration
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# Initialize pipeline dictionary
inference_pipelines = {}

# Download all necessary model resources at startup
def preload_all_resources():
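    """Download all Vevo checkpoints from the amphion/Vevo Hugging Face repo at startup
    so that later inference requests do not block on downloads."""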
    print("Preloading all model resources...")
    # Download configuration files
    setup_configs()
    
    # Store the downloaded model paths
    global downloaded_content_tokenizer_path
    global downloaded_content_style_tokenizer_path
    global downloaded_ar_vq32_path
    global downloaded_ar_phone_path
    global downloaded_fmt_path
    global downloaded_vocoder_path
    
    # Download Content Tokenizer (vq32)
    if not downloaded_resources["tokenizer_vq32"]:
        print("Preloading Content Tokenizer (vq32)...")
        local_dir = snapshot_download(
            repo_id="amphion/Vevo",
            repo_type="model",
            cache_dir="./ckpts/Vevo",
            allow_patterns=["tokenizer/vq32/*"],
        )
        downloaded_content_tokenizer_path = local_dir
        downloaded_resources["tokenizer_vq32"] = True
        print("Content Tokenizer (vq32) download completed")
    
    # Download Content-Style Tokenizer (vq8192)
    if not downloaded_resources["tokenizer_vq8192"]:
        print("Preloading Content-Style Tokenizer (vq8192)...")
        local_dir = snapshot_download(
            repo_id="amphion/Vevo",
            repo_type="model",
            cache_dir="./ckpts/Vevo",
            allow_patterns=["tokenizer/vq8192/*"],
        )
        downloaded_content_style_tokenizer_path = local_dir
        downloaded_resources["tokenizer_vq8192"] = True
        print("Content-Style Tokenizer (vq8192) download completed")
    
    # Download Autoregressive Transformer (Vq32ToVq8192)
    if not downloaded_resources["ar_Vq32ToVq8192"]:
        print("Preloading Autoregressive Transformer (Vq32ToVq8192)...")
        local_dir = snapshot_download(
            repo_id="amphion/Vevo",
            repo_type="model",
            cache_dir="./ckpts/Vevo",
            allow_patterns=["contentstyle_modeling/Vq32ToVq8192/*"],
        )
        downloaded_ar_vq32_path = local_dir
        downloaded_resources["ar_Vq32ToVq8192"] = True
        print("Autoregressive Transformer (Vq32ToVq8192) download completed")
    
    # Download Autoregressive Transformer (PhoneToVq8192)
    if not downloaded_resources["ar_PhoneToVq8192"]:
        print("Preloading Autoregressive Transformer (PhoneToVq8192)...")
        local_dir = snapshot_download(
            repo_id="amphion/Vevo",
            repo_type="model",
            cache_dir="./ckpts/Vevo",
            allow_patterns=["contentstyle_modeling/PhoneToVq8192/*"],
        )
        downloaded_ar_phone_path = local_dir
        downloaded_resources["ar_PhoneToVq8192"] = True
        print("Autoregressive Transformer (PhoneToVq8192) download completed")
    
    # Download Flow Matching Transformer
    if not downloaded_resources["fmt_Vq8192ToMels"]:
        print("Preloading Flow Matching Transformer (Vq8192ToMels)...")
        local_dir = snapshot_download(
            repo_id="amphion/Vevo",
            repo_type="model",
            cache_dir="./ckpts/Vevo",
            allow_patterns=["acoustic_modeling/Vq8192ToMels/*"],
        )
        downloaded_fmt_path = local_dir
        downloaded_resources["fmt_Vq8192ToMels"] = True
        print("Flow Matching Transformer (Vq8192ToMels) download completed")
    
    # Download Vocoder
    if not downloaded_resources["vocoder"]:
        print("Preloading Vocoder...")
        local_dir = snapshot_download(
            repo_id="amphion/Vevo",
            repo_type="model",
            cache_dir="./ckpts/Vevo",
            allow_patterns=["acoustic_modeling/Vocoder/*"],
        )
        downloaded_vocoder_path = local_dir
        downloaded_resources["vocoder"] = True
        print("Vocoder download completed")
    
    print("All model resources preloading completed!")

# Initialize path variables to store downloaded model paths
downloaded_content_tokenizer_path = None
downloaded_content_style_tokenizer_path = None
downloaded_ar_vq32_path = None
downloaded_ar_phone_path = None
downloaded_fmt_path = None
downloaded_vocoder_path = None

# Preload all resources before creating the Gradio interface
preload_all_resources()

def get_pipeline(pipeline_type):
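    """Return a cached VevoInferencePipeline for the given type, building it on first use.

    "style" / "voice": content tokenizer (vq32) + AR Vq32ToVq8192 + FM Vq8192ToMels + vocoder
    "timbre":          content-style tokenizer (vq8192) + FM Vq8192ToMels + vocoder
    "tts":             content-style tokenizer (vq8192) + AR PhoneToVq8192 + FM Vq8192ToMels + vocoder
    """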
    if pipeline_type in inference_pipelines:
        return inference_pipelines[pipeline_type]
    
    # Initialize pipeline based on the required pipeline type
    if pipeline_type == "style" or pipeline_type == "voice":
        # Use already downloaded Content Tokenizer
        if downloaded_resources["tokenizer_vq32"]:
            content_tokenizer_ckpt_path = os.path.join(
                downloaded_content_tokenizer_path, "tokenizer/vq32/hubert_large_l18_c32.pkl"
            )
        else:
            # Fallback to direct download
            local_dir = snapshot_download(
                repo_id="amphion/Vevo",
                repo_type="model",
                cache_dir="./ckpts/Vevo",
                allow_patterns=["tokenizer/vq32/*"],
            )
            content_tokenizer_ckpt_path = os.path.join(
                local_dir, "tokenizer/vq32/hubert_large_l18_c32.pkl"
            )
        
        # Use already downloaded Content-Style Tokenizer
        if downloaded_resources["tokenizer_vq8192"]:
            content_style_tokenizer_ckpt_path = os.path.join(
                downloaded_content_style_tokenizer_path, "tokenizer/vq8192"
            )
        else:
            # Fallback to direct download
            local_dir = snapshot_download(
                repo_id="amphion/Vevo",
                repo_type="model",
                cache_dir="./ckpts/Vevo",
                allow_patterns=["tokenizer/vq8192/*"],
            )
            content_style_tokenizer_ckpt_path = os.path.join(local_dir, "tokenizer/vq8192")
        
        # Use already downloaded Autoregressive Transformer
        ar_cfg_path = "./models/vc/vevo/config/Vq32ToVq8192.json"
        if downloaded_resources["ar_Vq32ToVq8192"]:
            ar_ckpt_path = os.path.join(
                downloaded_ar_vq32_path, "contentstyle_modeling/Vq32ToVq8192"
            )
        else:
            # Fallback to direct download
            local_dir = snapshot_download(
                repo_id="amphion/Vevo",
                repo_type="model",
                cache_dir="./ckpts/Vevo",
                allow_patterns=["contentstyle_modeling/Vq32ToVq8192/*"],
            )
            ar_ckpt_path = os.path.join(local_dir, "contentstyle_modeling/Vq32ToVq8192")
        
        # Use already downloaded Flow Matching Transformer
        fmt_cfg_path = "./models/vc/vevo/config/Vq8192ToMels.json"
        if downloaded_resources["fmt_Vq8192ToMels"]:
            fmt_ckpt_path = os.path.join(
                downloaded_fmt_path, "acoustic_modeling/Vq8192ToMels"
            )
        else:
            # Fallback to direct download
            local_dir = snapshot_download(
                repo_id="amphion/Vevo",
                repo_type="model",
                cache_dir="./ckpts/Vevo",
                allow_patterns=["acoustic_modeling/Vq8192ToMels/*"],
            )
            fmt_ckpt_path = os.path.join(local_dir, "acoustic_modeling/Vq8192ToMels")
        
        # Use already downloaded Vocoder
        vocoder_cfg_path = "./models/vc/vevo/config/Vocoder.json"
        if downloaded_resources["vocoder"]:
            vocoder_ckpt_path = os.path.join(
                downloaded_vocoder_path, "acoustic_modeling/Vocoder"
            )
        else:
            # Fallback to direct download
            local_dir = snapshot_download(
                repo_id="amphion/Vevo",
                repo_type="model",
                cache_dir="./ckpts/Vevo",
                allow_patterns=["acoustic_modeling/Vocoder/*"],
            )
            vocoder_ckpt_path = os.path.join(local_dir, "acoustic_modeling/Vocoder")
        
        # Initialize pipeline
        inference_pipeline = VevoInferencePipeline(
            content_tokenizer_ckpt_path=content_tokenizer_ckpt_path,
            content_style_tokenizer_ckpt_path=content_style_tokenizer_ckpt_path,
            ar_cfg_path=ar_cfg_path,
            ar_ckpt_path=ar_ckpt_path,
            fmt_cfg_path=fmt_cfg_path,
            fmt_ckpt_path=fmt_ckpt_path,
            vocoder_cfg_path=vocoder_cfg_path,
            vocoder_ckpt_path=vocoder_ckpt_path,
            device=device,
        )
        
    elif pipeline_type == "timbre":
        # Use already downloaded Content-Style Tokenizer
        if downloaded_resources["tokenizer_vq8192"]:
            content_style_tokenizer_ckpt_path = os.path.join(
                downloaded_content_style_tokenizer_path, "tokenizer/vq8192"
            )
        else:
            # Fallback to direct download
            local_dir = snapshot_download(
                repo_id="amphion/Vevo",
                repo_type="model",
                cache_dir="./ckpts/Vevo",
                allow_patterns=["tokenizer/vq8192/*"],
            )
            content_style_tokenizer_ckpt_path = os.path.join(local_dir, "tokenizer/vq8192")
        
        # Use already downloaded Flow Matching Transformer
        fmt_cfg_path = "./models/vc/vevo/config/Vq8192ToMels.json"
        if downloaded_resources["fmt_Vq8192ToMels"]:
            fmt_ckpt_path = os.path.join(
                downloaded_fmt_path, "acoustic_modeling/Vq8192ToMels"
            )
        else:
            # Fallback to direct download
            local_dir = snapshot_download(
                repo_id="amphion/Vevo",
                repo_type="model",
                cache_dir="./ckpts/Vevo",
                allow_patterns=["acoustic_modeling/Vq8192ToMels/*"],
            )
            fmt_ckpt_path = os.path.join(local_dir, "acoustic_modeling/Vq8192ToMels")
        
        # Use already downloaded Vocoder
        vocoder_cfg_path = "./models/vc/vevo/config/Vocoder.json"
        if downloaded_resources["vocoder"]:
            vocoder_ckpt_path = os.path.join(
                downloaded_vocoder_path, "acoustic_modeling/Vocoder"
            )
        else:
            # Fallback to direct download
            local_dir = snapshot_download(
                repo_id="amphion/Vevo",
                repo_type="model",
                cache_dir="./ckpts/Vevo",
                allow_patterns=["acoustic_modeling/Vocoder/*"],
            )
            vocoder_ckpt_path = os.path.join(local_dir, "acoustic_modeling/Vocoder")
        
        # Initialize pipeline
        inference_pipeline = VevoInferencePipeline(
            content_style_tokenizer_ckpt_path=content_style_tokenizer_ckpt_path,
            fmt_cfg_path=fmt_cfg_path,
            fmt_ckpt_path=fmt_ckpt_path,
            vocoder_cfg_path=vocoder_cfg_path,
            vocoder_ckpt_path=vocoder_ckpt_path,
            device=device,
        )
        
    elif pipeline_type == "tts":
        # Use already downloaded Content-Style Tokenizer
        if downloaded_resources["tokenizer_vq8192"]:
            content_style_tokenizer_ckpt_path = os.path.join(
                downloaded_content_style_tokenizer_path, "tokenizer/vq8192"
            )
        else:
            # Fallback to direct download
            local_dir = snapshot_download(
                repo_id="amphion/Vevo",
                repo_type="model",
                cache_dir="./ckpts/Vevo",
                allow_patterns=["tokenizer/vq8192/*"],
            )
            content_style_tokenizer_ckpt_path = os.path.join(local_dir, "tokenizer/vq8192")
        
        # Use already downloaded Autoregressive Transformer (TTS specific)
        ar_cfg_path = "./models/vc/vevo/config/PhoneToVq8192.json"
        if downloaded_resources["ar_PhoneToVq8192"]:
            ar_ckpt_path = os.path.join(
                downloaded_ar_phone_path, "contentstyle_modeling/PhoneToVq8192"
            )
        else:
            # Fallback to direct download
            local_dir = snapshot_download(
                repo_id="amphion/Vevo",
                repo_type="model",
                cache_dir="./ckpts/Vevo",
                allow_patterns=["contentstyle_modeling/PhoneToVq8192/*"],
            )
            ar_ckpt_path = os.path.join(local_dir, "contentstyle_modeling/PhoneToVq8192")
        
        # Use already downloaded Flow Matching Transformer
        fmt_cfg_path = "./models/vc/vevo/config/Vq8192ToMels.json"
        if downloaded_resources["fmt_Vq8192ToMels"]:
            fmt_ckpt_path = os.path.join(
                downloaded_fmt_path, "acoustic_modeling/Vq8192ToMels"
            )
        else:
            # Fallback to direct download
            local_dir = snapshot_download(
                repo_id="amphion/Vevo",
                repo_type="model",
                cache_dir="./ckpts/Vevo",
                allow_patterns=["acoustic_modeling/Vq8192ToMels/*"],
            )
            fmt_ckpt_path = os.path.join(local_dir, "acoustic_modeling/Vq8192ToMels")
        
        # Use already downloaded Vocoder
        vocoder_cfg_path = "./models/vc/vevo/config/Vocoder.json"
        if downloaded_resources["vocoder"]:
            vocoder_ckpt_path = os.path.join(
                downloaded_vocoder_path, "acoustic_modeling/Vocoder"
            )
        else:
            # Fallback to direct download
            local_dir = snapshot_download(
                repo_id="amphion/Vevo",
                repo_type="model",
                cache_dir="./ckpts/Vevo",
                allow_patterns=["acoustic_modeling/Vocoder/*"],
            )
            vocoder_ckpt_path = os.path.join(local_dir, "acoustic_modeling/Vocoder")
        
        # Initialize pipeline
        inference_pipeline = VevoInferencePipeline(
            content_style_tokenizer_ckpt_path=content_style_tokenizer_ckpt_path,
            ar_cfg_path=ar_cfg_path,
            ar_ckpt_path=ar_ckpt_path,
            fmt_cfg_path=fmt_cfg_path,
            fmt_ckpt_path=fmt_ckpt_path,
            vocoder_cfg_path=vocoder_cfg_path,
            vocoder_ckpt_path=vocoder_ckpt_path,
            device=device,
        )
    else:
        raise ValueError(f"Unsupported pipeline type: {pipeline_type}")

    # Cache pipeline instance
    inference_pipelines[pipeline_type] = inference_pipeline
    return inference_pipeline

# VEVO inference functions
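# Each function below applies the same preprocessing to every uploaded clip:
# downmix to mono, resample to 24 kHz, and peak-normalize to 0.95 before
# saving it to a temporary wav file for the pipeline.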
@spaces.GPU()
def vevo_style(content_wav, style_wav):
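    """Vevo-Style: keep the timbre of content_wav but transfer the style of style_wav."""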
    temp_content_path = "wav/temp_content.wav"
    temp_style_path = "wav/temp_style.wav"
    output_path = "wav/output_vevostyle.wav"
    
    # Check and process audio data
    if content_wav is None or style_wav is None:
        raise ValueError("Please upload audio files")
    
    # Process audio format
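    # gr.Audio(type="numpy") provides a (sample_rate, data) tuple; both orderings are handled defensively below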
    if isinstance(content_wav, tuple) and len(content_wav) == 2:
        if isinstance(content_wav[0], np.ndarray):
            content_data, content_sr = content_wav
        else:
            content_sr, content_data = content_wav
        
        # Ensure single channel
        if len(content_data.shape) > 1 and content_data.shape[1] > 1:
            content_data = np.mean(content_data, axis=1)
        
        # Resample to 24kHz
        if content_sr != 24000:
            content_tensor = torch.FloatTensor(content_data).unsqueeze(0)
            content_tensor = torchaudio.functional.resample(content_tensor, content_sr, 24000)
            content_sr = 24000
        else:
            content_tensor = torch.FloatTensor(content_data).unsqueeze(0)
        
        # Normalize volume
        content_tensor = content_tensor / (torch.max(torch.abs(content_tensor)) + 1e-6) * 0.95
    else:
        raise ValueError("Invalid content audio format")
    
    # Process style audio format
    if isinstance(style_wav, tuple) and len(style_wav) == 2:
        if isinstance(style_wav[0], np.ndarray):
            style_data, style_sr = style_wav
        else:
            style_sr, style_data = style_wav

        # Ensure single channel
        if len(style_data.shape) > 1 and style_data.shape[1] > 1:
            style_data = np.mean(style_data, axis=1)

        # Resample to 24kHz
        if style_sr != 24000:
            style_tensor = torch.FloatTensor(style_data).unsqueeze(0)
            style_tensor = torchaudio.functional.resample(style_tensor, style_sr, 24000)
            style_sr = 24000
        else:
            style_tensor = torch.FloatTensor(style_data).unsqueeze(0)

        # Normalize volume
        style_tensor = style_tensor / (torch.max(torch.abs(style_tensor)) + 1e-6) * 0.95
    else:
        raise ValueError("Invalid style audio format")
    
    # Print debug information
    print(f"Content audio shape: {content_tensor.shape}, sample rate: {content_sr}")
    print(f"Style audio shape: {style_tensor.shape}, sample rate: {style_sr}")
    
    # Save audio
    torchaudio.save(temp_content_path, content_tensor, content_sr)
    torchaudio.save(temp_style_path, style_tensor, style_sr)
    
    try:
        # Get pipeline
        pipeline = get_pipeline("style")
        
        # Inference
        gen_audio = pipeline.inference_ar_and_fm(
            src_wav_path=temp_content_path,
            src_text=None,
            style_ref_wav_path=temp_style_path,
            timbre_ref_wav_path=temp_content_path,
        )
        
        # Check the generated audio for numerical anomalies (NaN/Inf)
        if torch.isnan(gen_audio).any() or torch.isinf(gen_audio).any():
            print("Warning: Generated audio contains NaN or Inf values")
            gen_audio = torch.nan_to_num(gen_audio, nan=0.0, posinf=0.95, neginf=-0.95)
        
        print(f"Generated audio shape: {gen_audio.shape}, max: {torch.max(gen_audio)}, min: {torch.min(gen_audio)}")
        
        # Save generated audio
        save_audio(gen_audio, output_path=output_path)
        
        return output_path
    except Exception as e:
        print(f"Error during processing: {e}")
        import traceback
        traceback.print_exc()
        raise e

@spaces.GPU()
def vevo_timbre(content_wav, reference_wav):
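    """Vevo-Timbre: keep the style of content_wav but transfer the timbre of reference_wav."""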
    temp_content_path = "wav/temp_content.wav"
    temp_reference_path = "wav/temp_reference.wav"
    output_path = "wav/output_vevotimbre.wav"
    
    # Check and process audio data
    if content_wav is None or reference_wav is None:
        raise ValueError("Please upload audio files")
    
    # Process content audio format
    if isinstance(content_wav, tuple) and len(content_wav) == 2:
        if isinstance(content_wav[0], np.ndarray):
            content_data, content_sr = content_wav
        else:
            content_sr, content_data = content_wav
        
        # Ensure single channel
        if len(content_data.shape) > 1 and content_data.shape[1] > 1:
            content_data = np.mean(content_data, axis=1)
        
        # Resample to 24kHz
        if content_sr != 24000:
            content_tensor = torch.FloatTensor(content_data).unsqueeze(0)
            content_tensor = torchaudio.functional.resample(content_tensor, content_sr, 24000)
            content_sr = 24000
        else:
            content_tensor = torch.FloatTensor(content_data).unsqueeze(0)
        
        # Normalize volume
        content_tensor = content_tensor / (torch.max(torch.abs(content_tensor)) + 1e-6) * 0.95
    else:
        raise ValueError("Invalid content audio format")
    
    # Process reference audio format
    if isinstance(reference_wav, tuple) and len(reference_wav) == 2:
        if isinstance(reference_wav[0], np.ndarray):
            reference_data, reference_sr = reference_wav
        else:
            reference_sr, reference_data = reference_wav
        
        # Ensure single channel
        if len(reference_data.shape) > 1 and reference_data.shape[1] > 1:
            reference_data = np.mean(reference_data, axis=1)
        
        # Resample to 24kHz
        if reference_sr != 24000:
            reference_tensor = torch.FloatTensor(reference_data).unsqueeze(0)
            reference_tensor = torchaudio.functional.resample(reference_tensor, reference_sr, 24000)
            reference_sr = 24000
        else:
            reference_tensor = torch.FloatTensor(reference_data).unsqueeze(0)
        
        # Normalize volume
        reference_tensor = reference_tensor / (torch.max(torch.abs(reference_tensor)) + 1e-6) * 0.95
    else:
        raise ValueError("Invalid reference audio format")
    
    # Print debug information
    print(f"Content audio shape: {content_tensor.shape}, sample rate: {content_sr}")
    print(f"Reference audio shape: {reference_tensor.shape}, sample rate: {reference_sr}")
    
    # Save uploaded audio
    torchaudio.save(temp_content_path, content_tensor, content_sr)
    torchaudio.save(temp_reference_path, reference_tensor, reference_sr)
    
    try:
        # Get pipeline
        pipeline = get_pipeline("timbre")
        
        # Inference
        gen_audio = pipeline.inference_fm(
            src_wav_path=temp_content_path,
            timbre_ref_wav_path=temp_reference_path,
            flow_matching_steps=32,
        )
        
        # Check the generated audio for numerical anomalies (NaN/Inf)
        if torch.isnan(gen_audio).any() or torch.isinf(gen_audio).any():
            print("Warning: Generated audio contains NaN or Inf values")
            gen_audio = torch.nan_to_num(gen_audio, nan=0.0, posinf=0.95, neginf=-0.95)
        
        print(f"Generated audio shape: {gen_audio.shape}, max: {torch.max(gen_audio)}, min: {torch.min(gen_audio)}")
        
        # Save generated audio
        save_audio(gen_audio, output_path=output_path)
        
        return output_path
    except Exception as e:
        print(f"Error during processing: {e}")
        import traceback
        traceback.print_exc()
        raise e

@spaces.GPU()
def vevo_voice(content_wav, style_reference_wav, timbre_reference_wav):
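    """Vevo-Voice: convert content_wav using style_reference_wav for style and timbre_reference_wav for timbre."""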
    temp_content_path = "wav/temp_content.wav"
    temp_style_path = "wav/temp_style.wav"
    temp_timbre_path = "wav/temp_timbre.wav"
    output_path = "wav/output_vevovoice.wav"
    
    # Check and process audio data
    if content_wav is None or style_reference_wav is None or timbre_reference_wav is None:
        raise ValueError("Please upload all required audio files")
    
    # Process content audio format
    if isinstance(content_wav, tuple) and len(content_wav) == 2:
        if isinstance(content_wav[0], np.ndarray):
            content_data, content_sr = content_wav
        else:
            content_sr, content_data = content_wav
        
        # Ensure single channel
        if len(content_data.shape) > 1 and content_data.shape[1] > 1:
            content_data = np.mean(content_data, axis=1)
        
        # Resample to 24kHz
        if content_sr != 24000:
            content_tensor = torch.FloatTensor(content_data).unsqueeze(0)
            content_tensor = torchaudio.functional.resample(content_tensor, content_sr, 24000)
            content_sr = 24000
        else:
            content_tensor = torch.FloatTensor(content_data).unsqueeze(0)
        
        # Normalize volume
        content_tensor = content_tensor / (torch.max(torch.abs(content_tensor)) + 1e-6) * 0.95
    else:
        raise ValueError("Invalid content audio format")
    
    # Process style reference audio format
    if isinstance(style_reference_wav, tuple) and len(style_reference_wav) == 2:
        if isinstance(style_reference_wav[0], np.ndarray):
            style_data, style_sr = style_reference_wav
        else:
            style_sr, style_data = style_reference_wav
        
        # Ensure single channel
        if len(style_data.shape) > 1 and style_data.shape[1] > 1:
            style_data = np.mean(style_data, axis=1)
        
        # Resample to 24kHz
        if style_sr != 24000:
            style_tensor = torch.FloatTensor(style_data).unsqueeze(0)
            style_tensor = torchaudio.functional.resample(style_tensor, style_sr, 24000)
            style_sr = 24000
        else:
            style_tensor = torch.FloatTensor(style_data).unsqueeze(0)
        
        # Normalize volume
        style_tensor = style_tensor / (torch.max(torch.abs(style_tensor)) + 1e-6) * 0.95
    else:
        raise ValueError("Invalid style reference audio format")
    
    # Process timbre reference audio format
    if isinstance(timbre_reference_wav, tuple) and len(timbre_reference_wav) == 2:
        if isinstance(timbre_reference_wav[0], np.ndarray):
            timbre_data, timbre_sr = timbre_reference_wav
        else:
            timbre_sr, timbre_data = timbre_reference_wav
        
        # Ensure single channel
        if len(timbre_data.shape) > 1 and timbre_data.shape[1] > 1:
            timbre_data = np.mean(timbre_data, axis=1)
        
        # Resample to 24kHz
        if timbre_sr != 24000:
            timbre_tensor = torch.FloatTensor(timbre_data).unsqueeze(0)
            timbre_tensor = torchaudio.functional.resample(timbre_tensor, timbre_sr, 24000)
            timbre_sr = 24000
        else:
            timbre_tensor = torch.FloatTensor(timbre_data).unsqueeze(0)
        
        # Normalize volume
        timbre_tensor = timbre_tensor / (torch.max(torch.abs(timbre_tensor)) + 1e-6) * 0.95
    else:
        raise ValueError("Invalid timbre reference audio format")
    
    # Print debug information
    print(f"Content audio shape: {content_tensor.shape}, sample rate: {content_sr}")
    print(f"Style reference audio shape: {style_tensor.shape}, sample rate: {style_sr}")
    print(f"Timbre reference audio shape: {timbre_tensor.shape}, sample rate: {timbre_sr}")
    
    # Save uploaded audio
    torchaudio.save(temp_content_path, content_tensor, content_sr)
    torchaudio.save(temp_style_path, style_tensor, style_sr)
    torchaudio.save(temp_timbre_path, timbre_tensor, timbre_sr)
    
    try:
        # Get pipeline
        pipeline = get_pipeline("voice")
        
        # Inference
        gen_audio = pipeline.inference_ar_and_fm(
            src_wav_path=temp_content_path,
            src_text=None,
            style_ref_wav_path=temp_style_path,
            timbre_ref_wav_path=temp_timbre_path,
        )
        
        # Check the generated audio for numerical anomalies (NaN/Inf)
        if torch.isnan(gen_audio).any() or torch.isinf(gen_audio).any():
            print("Warning: Generated audio contains NaN or Inf values")
            gen_audio = torch.nan_to_num(gen_audio, nan=0.0, posinf=0.95, neginf=-0.95)
        
        print(f"Generated audio shape: {gen_audio.shape}, max: {torch.max(gen_audio)}, min: {torch.min(gen_audio)}")
        
        # Save generated audio
        save_audio(gen_audio, output_path=output_path)
        
        return output_path
    except Exception as e:
        print(f"Error during processing: {e}")
        import traceback
        traceback.print_exc()
        raise e

@spaces.GPU()
def vevo_tts(text, ref_wav, timbre_ref_wav=None, style_ref_text=None, src_language="en", style_ref_text_language="en"):
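    """Vevo-TTS: synthesize text using ref_wav as the style reference and timbre_ref_wav
    (or ref_wav itself if none is provided) as the timbre reference."""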
    temp_ref_path = "wav/temp_ref.wav"
    temp_timbre_path = "wav/temp_timbre.wav"
    output_path = "wav/output_vevotts.wav"
     
    # Check and process audio data
    if ref_wav is None:
        raise ValueError("Please upload a reference audio file")
    
    # Process reference audio format
    if isinstance(ref_wav, tuple) and len(ref_wav) == 2:
        if isinstance(ref_wav[0], np.ndarray):
            ref_data, ref_sr = ref_wav
        else:
            ref_sr, ref_data = ref_wav
        
        # Ensure single channel
        if len(ref_data.shape) > 1 and ref_data.shape[1] > 1:
            ref_data = np.mean(ref_data, axis=1)
        
        # Resample to 24kHz
        if ref_sr != 24000:
            ref_tensor = torch.FloatTensor(ref_data).unsqueeze(0)
            ref_tensor = torchaudio.functional.resample(ref_tensor, ref_sr, 24000)
            ref_sr = 24000
        else:
            ref_tensor = torch.FloatTensor(ref_data).unsqueeze(0)
        
        # Normalize volume
        ref_tensor = ref_tensor / (torch.max(torch.abs(ref_tensor)) + 1e-6) * 0.95
    else:
        raise ValueError("Invalid reference audio format")
    
    # Print debug information
    print(f"Reference audio shape: {ref_tensor.shape}, sample rate: {ref_sr}")
    if style_ref_text:
        print(f"Style reference text: {style_ref_text}, language: {style_ref_text_language}")
    
    # Save uploaded audio
    torchaudio.save(temp_ref_path, ref_tensor, ref_sr)
    
    if timbre_ref_wav is not None:
        if isinstance(timbre_ref_wav, tuple) and len(timbre_ref_wav) == 2:
            if isinstance(timbre_ref_wav[0], np.ndarray):
                timbre_data, timbre_sr = timbre_ref_wav
            else:
                timbre_sr, timbre_data = timbre_ref_wav
            
            # Ensure single channel
            if len(timbre_data.shape) > 1 and timbre_data.shape[1] > 1:
                timbre_data = np.mean(timbre_data, axis=1)
            
            # Resample to 24kHz
            if timbre_sr != 24000:
                timbre_tensor = torch.FloatTensor(timbre_data).unsqueeze(0)
                timbre_tensor = torchaudio.functional.resample(timbre_tensor, timbre_sr, 24000)
                timbre_sr = 24000
            else:
                timbre_tensor = torch.FloatTensor(timbre_data).unsqueeze(0)
            
            # Normalize volume
            timbre_tensor = timbre_tensor / (torch.max(torch.abs(timbre_tensor)) + 1e-6) * 0.95
            
            print(f"Timbre reference audio shape: {timbre_tensor.shape}, sample rate: {timbre_sr}")
            torchaudio.save(temp_timbre_path, timbre_tensor, timbre_sr)
        else:
            raise ValueError("Invalid timbre reference audio format")
    else:
        temp_timbre_path = temp_ref_path
    
    try:
        # Get pipeline
        pipeline = get_pipeline("tts")
        
        # Inference
        gen_audio = pipeline.inference_ar_and_fm(
            src_wav_path=None,
            src_text=text,
            style_ref_wav_path=temp_ref_path,
            timbre_ref_wav_path=temp_timbre_path,
            style_ref_wav_text=style_ref_text,
            src_text_language=src_language,
            style_ref_wav_text_language=style_ref_text_language,
        )
        
        # Check the generated audio for numerical anomalies (NaN/Inf)
        if torch.isnan(gen_audio).any() or torch.isinf(gen_audio).any():
            print("Warning: Generated audio contains NaN or Inf values")
            gen_audio = torch.nan_to_num(gen_audio, nan=0.0, posinf=0.95, neginf=-0.95)
        
        print(f"Generated audio shape: {gen_audio.shape}, max: {torch.max(gen_audio)}, min: {torch.min(gen_audio)}")
        
        # Save generated audio
        save_audio(gen_audio, output_path=output_path)
        
        return output_path
    except Exception as e:
        print(f"Error during processing: {e}")
        import traceback
        traceback.print_exc()
        raise e

# Create Gradio interface
with gr.Blocks(title="Vevo: Controllable Zero-Shot Voice Imitation with Self-Supervised Disentanglement") as demo:
    gr.Markdown("# Vevo: Controllable Zero-Shot Voice Imitation with Self-Supervised Disentanglement")        
    # Add link tag line
    with gr.Row(elem_id="links_row"):
        gr.HTML("""
        <div style="display: flex; justify-content: flex-start; gap: 8px; margin: 0 0; padding-left: 0px;">
            <a href="https://arxiv.org/abs/2502.07243" target="_blank" style="text-decoration: none;">
                <img alt="arXiv Paper" src="https://img.shields.io/badge/arXiv-Paper-red">
            </a>
            <a href="https://openreview.net/pdf?id=anQDiQZhDP" target="_blank" style="text-decoration: none;">
                <img alt="ICLR Paper" src="https://img.shields.io/badge/ICLR-Paper-64b63a">
            </a>
            <a href="https://huggingface.co/amphion/Vevo" target="_blank" style="text-decoration: none;">
                <img alt="HuggingFace Model" src="https://img.shields.io/badge/%F0%9F%A4%97%20HuggingFace-Model-yellow">
            </a>
            <a href="https://github.com/open-mmlab/Amphion/tree/main/models/vc/vevo" target="_blank" style="text-decoration: none;">
                <img alt="GitHub Repo" src="https://img.shields.io/badge/GitHub-Repo-blue">
            </a>
        </div>
        """)

    with gr.Tab("Vevo-Timbre"):
        gr.Markdown("### Vevo-Timbre: Maintain style but transfer timbre")
        with gr.Row():
            with gr.Column():
                timbre_content = gr.Audio(label="Source Audio", type="numpy")
                timbre_reference = gr.Audio(label="Timbre Reference", type="numpy")
                timbre_button = gr.Button("Generate")
            with gr.Column():
                timbre_output = gr.Audio(label="Result")
        timbre_button.click(vevo_timbre, inputs=[timbre_content, timbre_reference], outputs=timbre_output)

    with gr.Tab("Vevo-Style"):
        gr.Markdown("### Vevo-Style: Maintain timbre but transfer style (accent, emotion, etc.)")
        with gr.Row():
            with gr.Column():
                style_content = gr.Audio(label="Source Audio", type="numpy")
                style_reference = gr.Audio(label="Style Reference", type="numpy")
                style_button = gr.Button("Generate")
            with gr.Column():
                style_output = gr.Audio(label="Result")
        style_button.click(vevo_style, inputs=[style_content, style_reference], outputs=style_output)

    with gr.Tab("Vevo-Voice"):
        gr.Markdown("### Vevo-Voice: Transfers both style and timbre with separate references")
        with gr.Row():
            with gr.Column():
                voice_content = gr.Audio(label="Source Audio", type="numpy")
                voice_style_reference = gr.Audio(label="Style Reference", type="numpy")
                voice_timbre_reference = gr.Audio(label="Timbre Reference", type="numpy")
                voice_button = gr.Button("Generate")
            with gr.Column():
                voice_output = gr.Audio(label="Result")
        voice_button.click(vevo_voice, inputs=[voice_content, voice_style_reference, voice_timbre_reference], outputs=voice_output)

    with gr.Tab("Vevo-TTS"):
        gr.Markdown("### Vevo-TTS: Text-to-speech with separate style and timbre references")
        with gr.Row():
            with gr.Column():
                tts_text = gr.Textbox(label="Target Text", placeholder="Enter text to synthesize...", lines=3)
                tts_src_language = gr.Dropdown(["en", "zh", "de", "fr", "ja", "ko"], label="Text Language", value="en")
                tts_reference = gr.Audio(label="Style Reference", type="numpy")                
                tts_style_ref_text = gr.Textbox(label="Style Reference Text", placeholder="Enter style reference text...", lines=3)
                tts_style_ref_text_language = gr.Dropdown(["en", "zh", "de", "fr", "ja", "ko"], label="Style Reference Text Language", value="en")
                tts_timbre_reference = gr.Audio(label="Timbre Reference", type="numpy")
                tts_button = gr.Button("Generate")
            with gr.Column():
                tts_output = gr.Audio(label="Result")
        
        tts_button.click(
            vevo_tts, 
            inputs=[tts_text, tts_reference, tts_timbre_reference, tts_style_ref_text, tts_src_language, tts_style_ref_text_language], 
            outputs=tts_output
        )
    
    gr.Markdown("""
    ## About VEVO
    VEVO is a versatile voice synthesis and conversion model that offers four main functionalities:
    1. **Vevo-Style**: Maintains timbre but transfers style (accent, emotion, etc.)
    2. **Vevo-Timbre**: Maintains style but transfers timbre
    3. **Vevo-Voice**: Transfers both style and timbre with separate references
    4. **Vevo-TTS**: Text-to-speech with separate style and timbre references
    
    For more information, visit the [Amphion project](https://github.com/open-mmlab/Amphion)
    """)

# Launch application
demo.launch()