import os
from share import *
import config

import cv2
import einops
import gradio as gr
import numpy as np
import torch
import random

from pytorch_lightning import seed_everything
from annotator.util import resize_image
from cldm.model import create_model, load_state_dict
from cldm.ddim_haced_sag_step import DDIMSampler
from lavis.models import load_model_and_preprocess
from PIL import Image
import tqdm

from ldm.models.autoencoder_train import AutoencoderKL

ckpt_path="./pretrained_models/main_model.ckpt"

model = create_model('./models/cldm_v15_inpainting_infer1.yaml').cpu()
model.load_state_dict(load_state_dict(ckpt_path, location='cuda'),strict=False)
model = model.cuda()

ddim_sampler = DDIMSampler(model)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BLIP_model, vis_processors, _ = load_model_and_preprocess(name="blip_caption", model_type="base_coco", is_eval=True, device=device)

vae_model_ckpt_path="./pretrained_models/content-guided_deformable_vae.ckpt"

def load_vae():
    init_config = {
        "embed_dim": 4,
        "monitor": "val/rec_loss",
        "ddconfig":{
          "double_z": True,
          "z_channels": 4,
          "resolution": 256,
          "in_channels": 3,
          "out_ch": 3,
          "ch": 128,
          "ch_mult":[1,2,4,4],
          "num_res_blocks": 2,
          "attn_resolutions": [],
          "dropout": 0.0,
        },
        "lossconfig":{
          "target": "ldm.modules.losses.LPIPSWithDiscriminator",
          "params":{
            "disc_start": 501,
            "kl_weight": 0,
            "disc_weight": 0.025,
            "disc_factor": 1.0
        }
        }
    }
    vae = AutoencoderKL(**init_config)
    vae.load_state_dict(load_state_dict(vae_model_ckpt_path, location='cuda'))
    vae = vae.cuda()
    return vae

vae_model=load_vae()

def encode_mask(mask,masked_image):
    mask = torch.nn.functional.interpolate(mask, size=(mask.shape[2] // 8, mask.shape[3] // 8))
    # mask=torch.cat([mask] * 2) #if do_classifier_free_guidance else mask
    mask = mask.to(device="cuda")
    # do_classifier_free_guidance=False
    masked_image_latents = model.get_first_stage_encoding(model.encode_first_stage(masked_image.cuda())).detach()
    return mask,masked_image_latents

def get_mask(input_image,hint_image):
    mask=input_image.copy()
    H,W,C=input_image.shape
    for i in range(H):
        for j in range(W):
            if input_image[i,j,0]==hint_image[i,j,0]:
                # print(input_image[i,j,0])
                mask[i,j,:]=255.
            else:
                mask[i,j,:]=0. #input_image[i,j,:]
    kernel=cv2.getStructuringElement(cv2.MORPH_RECT,(3,3))
    mask=cv2.morphologyEx(np.array(mask),cv2.MORPH_OPEN,kernel,iterations=1)
    return mask

def prepare_mask_and_masked_image(image, mask):
    """
    Prepares a pair (image, mask) to be consumed by the Stable Diffusion pipeline. This means that those inputs will be
    converted to ``torch.Tensor`` with shapes ``batch x channels x height x width`` where ``channels`` is ``3`` for the
    ``image`` and ``1`` for the ``mask``.
    The ``image`` will be converted to ``torch.float32`` and normalized to be in ``[-1, 1]``. The ``mask`` will be
    binarized (``mask > 0.5``) and cast to ``torch.float32`` too.
    Args:
        image (Union[np.array, PIL.Image, torch.Tensor]): The image to inpaint.
            It can be a ``PIL.Image``, or a ``height x width x 3`` ``np.array`` or a ``channels x height x width``
            ``torch.Tensor`` or a ``batch x channels x height x width`` ``torch.Tensor``.
        mask (_type_): The mask to apply to the image, i.e. regions to inpaint.
            It can be a ``PIL.Image``, or a ``height x width`` ``np.array`` or a ``1 x height x width``
            ``torch.Tensor`` or a ``batch x 1 x height x width`` ``torch.Tensor``.
    Raises:
        ValueError: ``torch.Tensor`` images should be in the ``[-1, 1]`` range. ValueError: ``torch.Tensor`` mask
        should be in the ``[0, 1]`` range. ValueError: ``mask`` and ``image`` should have the same spatial dimensions.
        TypeError: ``mask`` is a ``torch.Tensor`` but ``image`` is not
            (ot the other way around).
    Returns:
        tuple[torch.Tensor]: The pair (mask, masked_image) as ``torch.Tensor`` with 4
            dimensions: ``batch x channels x height x width``.
    """
    if isinstance(image, torch.Tensor):
        if not isinstance(mask, torch.Tensor):
            raise TypeError(f"`image` is a torch.Tensor but `mask` (type: {type(mask)} is not")

        # Batch single image
        if image.ndim == 3:
            assert image.shape[0] == 3, "Image outside a batch should be of shape (3, H, W)"
            image = image.unsqueeze(0)

        # Batch and add channel dim for single mask
        if mask.ndim == 2:
            mask = mask.unsqueeze(0).unsqueeze(0)

        # Batch single mask or add channel dim
        if mask.ndim == 3:
            # Single batched mask, no channel dim or single mask not batched but channel dim
            if mask.shape[0] == 1:
                mask = mask.unsqueeze(0)

            # Batched masks no channel dim
            else:
                mask = mask.unsqueeze(1)

        assert image.ndim == 4 and mask.ndim == 4, "Image and Mask must have 4 dimensions"
        assert image.shape[-2:] == mask.shape[-2:], "Image and Mask must have the same spatial dimensions"
        assert image.shape[0] == mask.shape[0], "Image and Mask must have the same batch size"

        # Check image is in [-1, 1]
        if image.min() < -1 or image.max() > 1:
            raise ValueError("Image should be in [-1, 1] range")

        # Check mask is in [0, 1]
        if mask.min() < 0 or mask.max() > 1:
            raise ValueError("Mask should be in [0, 1] range")

        # Binarize mask
        mask[mask < 0.5] = 0
        mask[mask >= 0.5] = 1

        # Image as float32
        image = image.to(dtype=torch.float32)
    elif isinstance(mask, torch.Tensor):
        raise TypeError(f"`mask` is a torch.Tensor but `image` (type: {type(image)} is not")
    else:
        # preprocess image
        if isinstance(image, (Image.Image, np.ndarray)):
            image = [image]

        if isinstance(image, list) and isinstance(image[0], Image.Image):
            image = [np.array(i.convert("RGB"))[None, :] for i in image]
            image = np.concatenate(image, axis=0)
        elif isinstance(image, list) and isinstance(image[0], np.ndarray):
            image = np.concatenate([i[None, :] for i in image], axis=0)

        image = image.transpose(0, 3, 1, 2)
        image = torch.from_numpy(image).to(dtype=torch.float32) / 127.5 - 1.0

        # preprocess mask
        if isinstance(mask, (Image.Image, np.ndarray)):
            mask = [mask]

        if isinstance(mask, list) and isinstance(mask[0], Image.Image):
            mask = np.concatenate([np.array(m.convert("L"))[None, None, :] for m in mask], axis=0)
            mask = mask.astype(np.float32) / 255.0
        elif isinstance(mask, list) and isinstance(mask[0], np.ndarray):
            mask = np.concatenate([m[None, None, :] for m in mask], axis=0)

        mask[mask < 0.5] = 0
        mask[mask >= 0.5] = 1
        mask = torch.from_numpy(mask)

    masked_image = image * (mask < 0.5)

    return mask, masked_image

# generate image
generator = torch.manual_seed(859311133)#0
def path2L(img_path):
    raw_image = cv2.imread(img_path)
    raw_image = cv2.cvtColor(raw_image,cv2.COLOR_BGR2LAB)
    raw_image_input = cv2.merge([raw_image[:,:,0],raw_image[:,:,0],raw_image[:,:,0]])
    return raw_image_input

def is_gray_scale(img, threshold=10):
    img = Image.fromarray(img)
    if len(img.getbands()) == 1:
        return True
    img1 = np.asarray(img.getchannel(channel=0), dtype=np.int16)
    img2 = np.asarray(img.getchannel(channel=1), dtype=np.int16)
    img3 = np.asarray(img.getchannel(channel=2), dtype=np.int16)
    diff1 = (img1 - img2).var()
    diff2 = (img2 - img3).var()
    diff3 = (img3 - img1).var()
    diff_sum = (diff1 + diff2 + diff3) / 3.0
    if diff_sum <= threshold:
        return True
    else:
        return False

def randn_tensor(
    shape,
    generator= None,
    device= None,
    dtype=None,
    layout= None,
):
    """A helper function to create random tensors on the desired `device` with the desired `dtype`. When
    passing a list of generators, you can seed each batch size individually. If CPU generators are passed, the tensor
    is always created on the CPU.
    """
    # device on which tensor is created defaults to device
    rand_device = device
    batch_size = shape[0]

    layout = layout or torch.strided
    device = device or torch.device("cpu")

    if generator is not None:
        gen_device_type = generator.device.type if not isinstance(generator, list) else generator[0].device.type
        if gen_device_type != device.type and gen_device_type == "cpu":
            rand_device = "cpu"
            if device != "mps":
                print("The passed generator was created on 'cpu' even though a tensor on {device} was expected.")
                # logger.info(
                #     f"The passed generator was created on 'cpu' even though a tensor on {device} was expected."
                #     f" Tensors will be created on 'cpu' and then moved to {device}. Note that one can probably"
                #     f" slighly speed up this function by passing a generator that was created on the {device} device."
                # )
        elif gen_device_type != device.type and gen_device_type == "cuda":
            raise ValueError(f"Cannot generate a {device} tensor from a generator of type {gen_device_type}.")

    # make sure generator list of length 1 is treated like a non-list
    if isinstance(generator, list) and len(generator) == 1:
        generator = generator[0]

    if isinstance(generator, list):
        shape = (1,) + shape[1:]
        latents = [
            torch.randn(shape, generator=generator[i], device=rand_device, dtype=dtype, layout=layout)
            for i in range(batch_size)
        ]
        latents = torch.cat(latents, dim=0).to(device)
    else:
        latents = torch.randn(shape, generator=generator, device=rand_device, dtype=dtype, layout=layout).to(device)

    return latents

def add_noise(
        original_samples: torch.FloatTensor,
        noise: torch.FloatTensor,
        timesteps: torch.IntTensor,
    ) -> torch.FloatTensor:
        betas = torch.linspace(0.00085, 0.0120, 1000, dtype=torch.float32)
        alphas = 1.0 - betas
        alphas_cumprod = torch.cumprod(alphas, dim=0)
        alphas_cumprod = alphas_cumprod.to(device=original_samples.device, dtype=original_samples.dtype)
        timesteps = timesteps.to(original_samples.device)

        sqrt_alpha_prod = alphas_cumprod[timesteps] ** 0.5
        sqrt_alpha_prod = sqrt_alpha_prod.flatten()
        while len(sqrt_alpha_prod.shape) < len(original_samples.shape):
            sqrt_alpha_prod = sqrt_alpha_prod.unsqueeze(-1)

        sqrt_one_minus_alpha_prod = (1 - alphas_cumprod[timesteps]) ** 0.5
        sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.flatten()
        while len(sqrt_one_minus_alpha_prod.shape) < len(original_samples.shape):
            sqrt_one_minus_alpha_prod = sqrt_one_minus_alpha_prod.unsqueeze(-1)

        noisy_samples = sqrt_alpha_prod * original_samples + sqrt_one_minus_alpha_prod * noise

        return noisy_samples

def set_timesteps(num_inference_steps: int, timestep_spacing="leading",device=None):
        """
        Sets the discrete timesteps used for the diffusion chain. Supporting function to be run before inference.

        Args:
            num_inference_steps (`int`):
                the number of diffusion steps used when generating samples with a pre-trained model.
        """
        num_train_timesteps=1000
        if num_inference_steps > num_train_timesteps:
            raise ValueError(
                f"`num_inference_steps`: {num_inference_steps} cannot be larger than `self.config.train_timesteps`:"
                f" {num_train_timesteps} as the unet model trained with this scheduler can only handle"
                f" maximal {num_train_timesteps} timesteps."
            )

        num_inference_steps = num_inference_steps
        # "linspace", "leading", "trailing" corresponds to annotation of Table 2. of https://arxiv.org/abs/2305.08891
        if timestep_spacing == "linspace":
            timesteps = (
                np.linspace(0, num_train_timesteps - 1, num_inference_steps)
                .round()[::-1]
                .copy()
                .astype(np.int64)
            )
        elif timestep_spacing == "leading":
            step_ratio = num_train_timesteps // num_inference_steps
            # creates integer timesteps by multiplying by ratio
            # casting to int to avoid issues when num_inference_step is power of 3
            timesteps = (np.arange(0, num_inference_steps) * step_ratio).round()[::-1].copy().astype(np.int64)
            # timesteps += steps_offset
        elif timestep_spacing == "trailing":
            step_ratio = num_train_timesteps / num_inference_steps
            # creates integer timesteps by multiplying by ratio
            # casting to int to avoid issues when num_inference_step is power of 3
            timesteps = np.round(np.arange(num_train_timesteps, 0, -step_ratio)).astype(np.int64)
            timesteps -= 1
        else:
            raise ValueError(
                f"{timestep_spacing} is not supported. Please make sure to choose one of 'leading' or 'trailing'."
            )

        timesteps = torch.from_numpy(timesteps).to(device)
        return timesteps

def get_timesteps(num_inference_steps, timesteps_set, strength, device):
        # get the original timestep using init_timestep
        init_timestep = min(int(num_inference_steps * strength), num_inference_steps)

        t_start = max(num_inference_steps - init_timestep, 0)
        timesteps = timesteps_set[t_start * 1 :]

        return timesteps, num_inference_steps - t_start


def get_noised_image_latents(img,W,H,ddim_steps,strength,seed,device):
    img1 = [cv2.resize(img,(W,H))]
    img1 = np.concatenate([i[None, :] for i in img1], axis=0)
    img1 = img1.transpose(0, 3, 1, 2)
    img1 = torch.from_numpy(img1).to(dtype=torch.float32) /127.5 - 1.0
    
    image_latents=model.get_first_stage_encoding(model.encode_first_stage(img1.cuda())).detach()
    shape=image_latents.shape
    generator = torch.manual_seed(seed) 
    
    noise = randn_tensor(shape, generator=generator, device=device, dtype=torch.float32)
    
    timesteps_set=set_timesteps(ddim_steps,timestep_spacing="linspace", device=device)
    timesteps, num_inference_steps = get_timesteps(ddim_steps, timesteps_set, strength, device)
    latent_timestep = timesteps[1].repeat(1 * 1)

    init_latents = add_noise(image_latents, noise, torch.tensor(latent_timestep))
    for j in range(0, 1000, 100):
        
        x_samples=model.decode_first_stage(add_noise(image_latents, noise, torch.tensor(j)))
        init_image=(einops.rearrange(x_samples, 'b c h w -> b h w c') * 127.5 + 127.5).cpu().numpy().clip(0, 255).astype(np.uint8)
    
        cv2.imwrite("./initlatents1/"+str(j)+"init_image.png",cv2.cvtColor(init_image[0],cv2.COLOR_RGB2BGR))
    return init_latents
                  
def process(using_deformable_vae,change_according_to_strokes,iterative_editing,input_image,hint_image,prompt, a_prompt, n_prompt, num_samples, image_resolution,  ddim_steps, guess_mode, strength, scale, sag_scale,SAG_influence_step, seed, eta):
    torch.cuda.empty_cache()
    with torch.no_grad():
        ref_flag=True
        input_image_ori=input_image
        if is_gray_scale(input_image):
            print("It is a greyscale image.")
            # mask=get_mask(input_image,hint_image)
        else:
            print("It is a color image.")
            input_image_ori=input_image
            input_image=cv2.cvtColor(input_image,cv2.COLOR_RGB2LAB)[:,:,0]
            input_image=cv2.merge([input_image,input_image,input_image])
        mask=get_mask(input_image_ori,hint_image)
        cv2.imwrite("gradio_mask1.png",mask)
        
        if iterative_editing:
            mask=255-mask
            if change_according_to_strokes:
                hint_image=mask/255.*hint_image+(1-mask/255.)*input_image_ori
            else:
                hint_image=mask/255.*input_image+(1-mask/255.)*input_image_ori
        else:
            hint_image=mask/255.*input_image+(1-mask/255.)*hint_image
        hint_image=hint_image.astype(np.uint8)
        if len(prompt)==0:
            image = Image.fromarray(input_image)
            image = vis_processors["eval"](image).unsqueeze(0).to(device)
            prompt = BLIP_model.generate({"image": image})[0]
            if "a black and white photo of" in prompt or "black and white photograph of" in prompt:
                prompt=prompt.replace(prompt[:prompt.find("of")+3],"")
        print(prompt)
        H_ori,W_ori,C_ori=input_image.shape
        img = resize_image(input_image, image_resolution)
        mask = resize_image(mask, image_resolution)
        hint_image =resize_image(hint_image,image_resolution)
        mask,masked_image=prepare_mask_and_masked_image(Image.fromarray(hint_image),Image.fromarray(mask))
        mask,masked_image_latents=encode_mask(mask,masked_image)
        H, W, C = img.shape
        
        # if ref_image is None:
        ref_image=np.array([[[0]*C]*W]*H).astype(np.float32)
        # print(ref_image.shape)
        # ref_flag=False
        ref_image=resize_image(ref_image,image_resolution)
        
        # cv2.imwrite("exemplar_image.png",cv2.cvtColor(ref_image,cv2.COLOR_RGB2BGR))    
        
        # ddim_steps=1
        control = torch.from_numpy(img.copy()).float().cuda() / 255.0
        control = torch.stack([control for _ in range(num_samples)], dim=0)
        control = einops.rearrange(control, 'b h w c -> b c h w').clone()
        
        if seed == -1:
            seed = random.randint(0, 65535)
        seed_everything(seed)

        ref_image=cv2.resize(ref_image,(W,H))
        
        ref_image=torch.from_numpy(ref_image).cuda().unsqueeze(0)
        
        init_latents=None
        
        if config.save_memory:
            model.low_vram_shift(is_diffusing=False)
            
        print("no reference images, using Frozen encoder")
        cond = {"c_concat": [control], "c_crossattn": [model.get_learned_conditioning([prompt + ', ' + a_prompt] * num_samples)]}
        un_cond = {"c_concat": None if guess_mode else [control], "c_crossattn": [model.get_learned_conditioning([n_prompt] * num_samples)]}
        shape = (4, H // 8, W // 8)

        if config.save_memory:
            model.low_vram_shift(is_diffusing=True)
        noise = randn_tensor(shape, generator=generator, device=device, dtype=torch.float32)
        model.control_scales = [strength * (0.825 ** float(12 - i)) for i in range(13)] if guess_mode else ([strength] * 13)  # Magic number. IDK why. Perhaps because 0.825**12<0.01 but 0.826**12>0.01
        samples, intermediates = ddim_sampler.sample(model,ddim_steps, num_samples,
                                                    shape, cond, mask=mask, masked_image_latents=masked_image_latents,verbose=False, eta=eta,
                                                    #  x_T=image_latents,
                                                    x_T=init_latents,
                                                    unconditional_guidance_scale=scale,
                                                    sag_scale = sag_scale,
                                                    SAG_influence_step=SAG_influence_step,
                                                    noise = noise,
                                                    unconditional_conditioning=un_cond)
        

        if config.save_memory:
            model.low_vram_shift(is_diffusing=False)

        if not using_deformable_vae:
            x_samples = model.decode_first_stage(samples)
        else:
            samples = model.decode_first_stage_before_vae(samples)
            gray_content_z=vae_model.get_gray_content_z(torch.from_numpy(img.copy()).float().cuda() / 255.0)
            # print(gray_content_z.shape)
            x_samples = vae_model.decode(samples,gray_content_z)
            
        x_samples = (einops.rearrange(x_samples, 'b c h w -> b h w c') * 127.5 + 127.5).cpu().numpy().clip(0, 255).astype(np.uint8)
        
        #single image replace L channel
        results_ori = [x_samples[i] for i in range(num_samples)]
        results_ori=[cv2.resize(i,(W_ori,H_ori),interpolation=cv2.INTER_LANCZOS4) for i in results_ori]
        
        cv2.imwrite("result_ori.png",cv2.cvtColor(results_ori[0],cv2.COLOR_RGB2BGR))
        
        results_tmp=[cv2.cvtColor(np.array(i),cv2.COLOR_RGB2LAB) for i in results_ori]
        results=[cv2.merge([input_image[:,:,0],tmp[:,:,1],tmp[:,:,2]]) for tmp in results_tmp]
        results_mergeL=[cv2.cvtColor(np.asarray(i),cv2.COLOR_LAB2RGB) for i in results]#cv2.COLOR_LAB2BGR)
        cv2.imwrite("output.png",cv2.cvtColor(results_mergeL[0],cv2.COLOR_RGB2BGR))
    return results_mergeL 

def get_grayscale_img(img, progress=gr.Progress(track_tqdm=True)):
    torch.cuda.empty_cache()
    for j in tqdm.tqdm(range(1),desc="Uploading input..."):
        return img,"Uploading input image done."
    
block = gr.Blocks().queue()
with block:
    with gr.Row():
        gr.Markdown("## Control-Color")#("## Color-Anything")#Control Stable Diffusion with L channel
    with gr.Row():
        with gr.Column():
            # input_image = gr.Image(source='upload', type="numpy")
            grayscale_img = gr.Image(visible=False, type="numpy")
            input_image = gr.Image(source='upload',tool='color-sketch',interactive=True)
            Grayscale_button = gr.Button(value="Upload input image")
            text_out = gr.Textbox(value="Please upload input image first, then draw the strokes or input text prompts or give reference images as you wish.")
            prompt = gr.Textbox(label="Prompt")
            change_according_to_strokes = gr.Checkbox(label='Change according to strokes\' color', value=True)
            iterative_editing = gr.Checkbox(label='Only change the strokes\' area', value=False)
            using_deformable_vae = gr.Checkbox(label='Using deformable vae. (Less color overflow)', value=False)
            # with gr.Accordion("Input Reference", open=False):
            #     ref_image = gr.Image(source='upload', type="numpy")
            run_button = gr.Button(label="Upload prompts/strokes (optional) and Run",value="Upload prompts/strokes (optional) and Run")
            with gr.Accordion("Advanced options", open=False):
                num_samples = gr.Slider(label="Images", minimum=1, maximum=12, value=1, step=1)
                image_resolution = gr.Slider(label="Image Resolution", minimum=256, maximum=768, value=512, step=64)
                strength = gr.Slider(label="Control Strength", minimum=0.0, maximum=2.0, value=1.0, step=0.01)
                guess_mode = gr.Checkbox(label='Guess Mode', value=False)
                #detect_resolution = gr.Slider(label="Depth Resolution", minimum=128, maximum=1024, value=384, step=1)
                ddim_steps = gr.Slider(label="Steps", minimum=1, maximum=100, value=20, step=1)
                scale = gr.Slider(label="Guidance Scale", minimum=0.1, maximum=30.0, value=7.0, step=0.1)#value=9.0
                sag_scale = gr.Slider(label="SAG Scale", minimum=0.0, maximum=1.0, value=0.05, step=0.01)#0.08
                SAG_influence_step = gr.Slider(label="1000-SAG influence step", minimum=0, maximum=900, value=600, step=50)
                seed = gr.Slider(label="Seed", minimum=-1, maximum=2147483647, step=1, randomize=True)#94433242802
                eta = gr.Number(label="eta (DDIM)", value=0.0)
                a_prompt = gr.Textbox(label="Added Prompt", value='best quality, detailed, real')#extremely detailed
                n_prompt = gr.Textbox(label="Negative Prompt",
                                      value='a black and white photo, longbody, lowres, bad anatomy, bad hands, missing fingers, extra digit, fewer digits, cropped, worst quality, low quality')
        with gr.Column():
            result_gallery = gr.Gallery(label='Output', show_label=False, elem_id="gallery").style(grid=2, height='auto')
            # grayscale_img = gr.Image(interactive=False,visible=False)
           
    Grayscale_button.click(fn=get_grayscale_img,inputs=input_image,outputs=[grayscale_img,text_out])
    ips = [using_deformable_vae,change_according_to_strokes,iterative_editing,grayscale_img,input_image, prompt, a_prompt, n_prompt, num_samples, image_resolution, ddim_steps, guess_mode, strength, scale,sag_scale,SAG_influence_step, seed, eta]
    run_button.click(fn=process, inputs=ips, outputs=[result_gallery])


block.launch(server_name='0.0.0.0',share=True)