import os
import shutil
import tempfile
from pathlib import Path
from unittest.mock import patch

import ffmpy
import numpy as np
import pytest
from gradio_client import media_data
from PIL import Image, ImageCms

from gradio import components, data_classes, processing_utils, utils
from gradio.route_utils import API_PREFIX


class TestTempFileManagement:
    def test_hash_file(self):
        h1 = processing_utils.hash_file("gradio/test_data/cheetah1.jpg")
        h2 = processing_utils.hash_file("gradio/test_data/cheetah1-copy.jpg")
        h3 = processing_utils.hash_file("gradio/test_data/cheetah2.jpg")
        assert h1 == h2
        assert h1 != h3

    def test_make_temp_copy_if_needed(self, gradio_temp_dir):
        f = processing_utils.save_file_to_cache(
            "gradio/test_data/cheetah1.jpg", cache_dir=gradio_temp_dir
        )
        try:  # Delete if already exists from before this test
            os.remove(f)
        except OSError:
            pass

        f = processing_utils.save_file_to_cache(
            "gradio/test_data/cheetah1.jpg", cache_dir=gradio_temp_dir
        )
        assert len([f for f in gradio_temp_dir.glob("**/*") if f.is_file()]) == 1

        assert Path(f).name == "cheetah1.jpg"

        f = processing_utils.save_file_to_cache(
            "gradio/test_data/cheetah1.jpg", cache_dir=gradio_temp_dir
        )
        assert len([f for f in gradio_temp_dir.glob("**/*") if f.is_file()]) == 1

        f = processing_utils.save_file_to_cache(
            "gradio/test_data/cheetah1-copy.jpg", cache_dir=gradio_temp_dir
        )
        assert len([f for f in gradio_temp_dir.glob("**/*") if f.is_file()]) == 2
        assert Path(f).name == "cheetah1-copy.jpg"

    def test_save_b64_to_cache(self, gradio_temp_dir):
        base64_file_1 = media_data.BASE64_IMAGE
        base64_file_2 = media_data.BASE64_AUDIO["data"]

        f = processing_utils.save_base64_to_cache(
            base64_file_1, cache_dir=gradio_temp_dir
        )
        try:  # Delete if already exists from before this test
            os.remove(f)
        except OSError:
            pass

        f = processing_utils.save_base64_to_cache(
            base64_file_1, cache_dir=gradio_temp_dir
        )
        assert len([f for f in gradio_temp_dir.glob("**/*") if f.is_file()]) == 1

        f = processing_utils.save_base64_to_cache(
            base64_file_1, cache_dir=gradio_temp_dir
        )
        assert len([f for f in gradio_temp_dir.glob("**/*") if f.is_file()]) == 1

        f = processing_utils.save_base64_to_cache(
            base64_file_2, cache_dir=gradio_temp_dir
        )
        assert len([f for f in gradio_temp_dir.glob("**/*") if f.is_file()]) == 2

    @pytest.mark.flaky
    def test_ssrf_protected_download(self, gradio_temp_dir):
        url1 = "https://raw.githubusercontent.com/gradio-app/gradio/main/gradio/test_data/test_image.png"
        url2 = "https://raw.githubusercontent.com/gradio-app/gradio/main/gradio/test_data/cheetah1.jpg"

        f = processing_utils.save_url_to_cache(url1, cache_dir=gradio_temp_dir)
        try:  # Delete if already exists from before this test
            os.remove(f)
        except OSError:
            pass

        f = processing_utils.save_url_to_cache(url1, cache_dir=gradio_temp_dir)
        assert len([f for f in gradio_temp_dir.glob("**/*") if f.is_file()]) == 1

        f = processing_utils.save_url_to_cache(url1, cache_dir=gradio_temp_dir)
        assert len([f for f in gradio_temp_dir.glob("**/*") if f.is_file()]) == 1

        f = processing_utils.save_url_to_cache(url2, cache_dir=gradio_temp_dir)
        assert len([f for f in gradio_temp_dir.glob("**/*") if f.is_file()]) == 2

    @pytest.mark.flaky
    def test_ssrf_protected_download_with_redirect(self, gradio_temp_dir):
        url = "https://huggingface.co/datasets/Xenova/transformers.js-docs/resolve/main/bread_small.png"
        processing_utils.save_url_to_cache(url, cache_dir=gradio_temp_dir)
        assert len([f for f in gradio_temp_dir.glob("**/*") if f.is_file()]) == 1


class TestImagePreprocessing:
    def test_encode_plot_to_base64(self):
        with utils.MatplotlibBackendMananger():
            import matplotlib.pyplot as plt

            plt.plot([1, 2, 3, 4])
            output_base64 = processing_utils.encode_plot_to_base64(plt)
        assert output_base64.startswith(
            "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAo"
        )

    def test_save_pil_to_file_keeps_pnginfo(self, gradio_temp_dir):
        input_img = Image.open("gradio/test_data/test_image.png")
        input_img = input_img.convert("RGB")
        input_img.info = {"key1": "value1", "key2": "value2"}
        input_img.save(gradio_temp_dir / "test_test_image.png")

        file_obj = processing_utils.save_pil_to_cache(
            input_img, cache_dir=gradio_temp_dir, format="png"
        )
        output_img = Image.open(file_obj)

        assert output_img.info == input_img.info

    def test_save_pil_to_file_keeps_all_gif_frames(self, gradio_temp_dir):
        input_img = Image.open("gradio/test_data/rectangles.gif")
        file_obj = processing_utils.save_pil_to_cache(
            input_img, cache_dir=gradio_temp_dir, format="gif"
        )
        output_img = Image.open(file_obj)
        assert output_img.n_frames == input_img.n_frames == 3  # type: ignore

    def test_np_pil_encode_to_the_same(self, gradio_temp_dir):
        arr = np.random.randint(0, 255, size=(100, 100, 3), dtype=np.uint8)
        pil = Image.fromarray(arr)
        assert processing_utils.save_pil_to_cache(
            pil, cache_dir=gradio_temp_dir
        ) == processing_utils.save_img_array_to_cache(arr, cache_dir=gradio_temp_dir)

    def test_encode_pil_to_temp_file_metadata_color_profile(self, gradio_temp_dir):
        # Read image
        img = Image.open("gradio/test_data/test_image.png")
        img_metadata = Image.open("gradio/test_data/test_image.png")
        img_metadata.info = {"key1": "value1", "key2": "value2"}

        # Creating sRGB profile
        profile = ImageCms.createProfile("sRGB")
        profile2 = ImageCms.ImageCmsProfile(profile)
        img.save(
            gradio_temp_dir / "img_color_profile.png", icc_profile=profile2.tobytes()
        )
        img_cp1 = Image.open(str(gradio_temp_dir / "img_color_profile.png"))

        # Creating XYZ profile
        profile = ImageCms.createProfile("XYZ")
        profile2 = ImageCms.ImageCmsProfile(profile)
        img.save(
            gradio_temp_dir / "img_color_profile_2.png", icc_profile=profile2.tobytes()
        )
        img_cp2 = Image.open(str(gradio_temp_dir / "img_color_profile_2.png"))

        img_path = processing_utils.save_pil_to_cache(
            img, cache_dir=gradio_temp_dir, format="png"
        )
        img_metadata_path = processing_utils.save_pil_to_cache(
            img_metadata, cache_dir=gradio_temp_dir, format="png"
        )
        img_cp1_path = processing_utils.save_pil_to_cache(
            img_cp1, cache_dir=gradio_temp_dir, format="png"
        )
        img_cp2_path = processing_utils.save_pil_to_cache(
            img_cp2, cache_dir=gradio_temp_dir, format="png"
        )

        assert len({img_path, img_metadata_path, img_cp1_path, img_cp2_path}) == 4

    def test_resize_and_crop(self):
        img = Image.open("gradio/test_data/test_image.png")
        new_img = processing_utils.resize_and_crop(img, (20, 20))
        assert new_img.size == (20, 20)
        with pytest.raises(ValueError):
            processing_utils.resize_and_crop(
                **{"img": img, "size": (20, 20), "crop_type": "test"}
            )


class TestAudioPreprocessing:
    def test_audio_from_file(self):
        audio = processing_utils.audio_from_file("gradio/test_data/test_audio.wav")
        assert audio[0] == 22050
        assert isinstance(audio[1], np.ndarray)

    def test_audio_to_file(self):
        audio = processing_utils.audio_from_file("gradio/test_data/test_audio.wav")
        processing_utils.audio_to_file(audio[0], audio[1], "test_audio_to_file")
        assert os.path.exists("test_audio_to_file")
        os.remove("test_audio_to_file")

    def test_convert_to_16_bit_wav(self):
        # Generate a random audio sample and set the amplitude
        audio = np.random.randint(-100, 100, size=(100), dtype="int16")
        audio[0] = -32767
        audio[1] = 32766

        audio_ = audio.astype("float64")
        audio_ = processing_utils.convert_to_16_bit_wav(audio_)
        assert np.allclose(audio, audio_)
        assert audio_.dtype == "int16"

        audio_ = audio.astype("float32")
        audio_ = processing_utils.convert_to_16_bit_wav(audio_)
        assert np.allclose(audio, audio_)
        assert audio_.dtype == "int16"

        audio_ = processing_utils.convert_to_16_bit_wav(audio)
        assert np.allclose(audio, audio_)
        assert audio_.dtype == "int16"


class TestOutputPreprocessing:
    float_dtype_list = [
        float,
        float,
        np.double,
        np.single,
        np.float32,
        np.float64,
        "float32",
        "float64",
    ]

    def test_float_conversion_dtype(self):
        """Test any conversion from a float dtype to an other."""

        x = np.array([-1, 1])
        # Test all combinations of dtypes conversions
        dtype_combin = np.array(
            np.meshgrid(
                TestOutputPreprocessing.float_dtype_list,
                TestOutputPreprocessing.float_dtype_list,
            )
        ).T.reshape(-1, 2)

        for dtype_in, dtype_out in dtype_combin:
            x = x.astype(dtype_in)
            y = processing_utils._convert(x, dtype_out)
            assert y.dtype == np.dtype(dtype_out)

    def test_subclass_conversion(self):
        """Check subclass conversion behavior"""
        x = np.array([-1, 1])
        for dtype in TestOutputPreprocessing.float_dtype_list:
            x = x.astype(dtype)
            y = processing_utils._convert(x, np.floating)
            assert y.dtype == x.dtype


class TestVideoProcessing:
    def test_video_has_playable_codecs(self, test_file_dir):
        assert processing_utils.video_is_playable(
            str(test_file_dir / "video_sample.mp4")
        )
        assert processing_utils.video_is_playable(
            str(test_file_dir / "video_sample.ogg")
        )
        assert processing_utils.video_is_playable(
            str(test_file_dir / "video_sample.webm")
        )
        assert not processing_utils.video_is_playable(
            str(test_file_dir / "bad_video_sample.mp4")
        )

    def raise_ffmpy_runtime_exception(*args, **kwargs):
        raise ffmpy.FFRuntimeError("", "", "", "")  # type: ignore

    @pytest.mark.parametrize(
        "exception_to_raise", [raise_ffmpy_runtime_exception, KeyError(), IndexError()]
    )
    def test_video_has_playable_codecs_catches_exceptions(
        self, exception_to_raise, test_file_dir
    ):
        with (
            patch("ffmpy.FFprobe.run", side_effect=exception_to_raise),
            tempfile.NamedTemporaryFile(
                suffix="out.avi", delete=False
            ) as tmp_not_playable_vid,
        ):
            shutil.copy(
                str(test_file_dir / "bad_video_sample.mp4"),
                tmp_not_playable_vid.name,
            )
            assert processing_utils.video_is_playable(tmp_not_playable_vid.name)

    def test_convert_video_to_playable_mp4(self, test_file_dir):
        with tempfile.NamedTemporaryFile(
            suffix="out.avi", delete=False
        ) as tmp_not_playable_vid:
            shutil.copy(
                str(test_file_dir / "bad_video_sample.mp4"), tmp_not_playable_vid.name
            )
            with patch("os.remove", wraps=os.remove) as mock_remove:
                playable_vid = processing_utils.convert_video_to_playable_mp4(
                    tmp_not_playable_vid.name
                )
            # check tempfile got deleted
            assert not Path(mock_remove.call_args[0][0]).exists()
            assert processing_utils.video_is_playable(playable_vid)

    @patch("ffmpy.FFmpeg.run", side_effect=raise_ffmpy_runtime_exception)
    def test_video_conversion_returns_original_video_if_fails(
        self, mock_run, test_file_dir
    ):
        with tempfile.NamedTemporaryFile(
            suffix="out.avi", delete=False
        ) as tmp_not_playable_vid:
            shutil.copy(
                str(test_file_dir / "bad_video_sample.mp4"), tmp_not_playable_vid.name
            )
            playable_vid = processing_utils.convert_video_to_playable_mp4(
                tmp_not_playable_vid.name
            )
            # If the conversion succeeded it'd be .mp4
            assert Path(playable_vid).suffix == ".avi"


def test_add_root_url():
    data = {
        "file": {
            "path": "path",
            "url": f"{API_PREFIX}/file=path",
            "meta": {"_type": "gradio.FileData"},
        },
        "file2": {
            "path": "path2",
            "url": "https://www.gradio.app",
            "meta": {"_type": "gradio.FileData"},
        },
    }
    root_url = "http://localhost:7860"
    expected = {
        "file": {
            "path": "path",
            "url": f"{root_url}{API_PREFIX}/file=path",
            "meta": {"_type": "gradio.FileData"},
        },
        "file2": {
            "path": "path2",
            "url": "https://www.gradio.app",
            "meta": {"_type": "gradio.FileData"},
        },
    }
    assert processing_utils.add_root_url(data, root_url, None) == expected
    new_root_url = "https://1234.gradio.live"
    new_expected = {
        "file": {
            "path": "path",
            "url": f"{new_root_url}{API_PREFIX}/file=path",
            "meta": {"_type": "gradio.FileData"},
        },
        "file2": {
            "path": "path2",
            "url": "https://www.gradio.app",
            "meta": {"_type": "gradio.FileData"},
        },
    }
    assert (
        processing_utils.add_root_url(expected, new_root_url, root_url) == new_expected
    )


def test_hash_url_encodes_url():
    assert processing_utils.hash_url(
        "https://www.gradio.app/image 1.jpg"
    ) == processing_utils.hash_bytes(b"https://www.gradio.app/image 1.jpg")


@pytest.mark.asyncio
async def test_json_data_not_moved_to_cache():
    data = data_classes.JsonData(
        root={
            "file": {
                "path": "path",
                "url": f"{API_PREFIX}/file=path",
                "meta": {"_type": "gradio.FileData"},
            }
        }
    )
    assert (
        processing_utils.move_files_to_cache(data, components.Number(), False) == data
    )
    assert processing_utils.move_files_to_cache(data, components.Number(), True) == data
    assert (
        await processing_utils.async_move_files_to_cache(
            data, components.Number(), False
        )
        == data
    )
    assert (
        await processing_utils.async_move_files_to_cache(
            data, components.Number(), True
        )
        == data
    )


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "url",
    [
        "https://localhost",
        "http://127.0.0.1/file/a/b/c",
        "http://[::1]",
        "https://192.168.0.1",
        "http://10.0.0.1?q=a",
        "http://192.168.1.250.nip.io",
    ],
)
async def test_local_urls_fail(url):
    with pytest.raises(ValueError, match="failed validation"):
        await processing_utils.async_validate_url(url)


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "url",
    [
        "https://google.com",
        "https://8.8.8.8/",
        "http://93.184.215.14.nip.io/",
        "https://huggingface.co/datasets/dylanebert/3dgs/resolve/main/luigi/luigi.ply",
    ],
)
async def test_public_urls_pass(url):
    await processing_utils.async_validate_url(url)


def test_public_request_pass():
    tempdir = tempfile.TemporaryDirectory()
    file = processing_utils.ssrf_protected_download(
        "https://en.wikipedia.org/static/images/icons/wikipedia.png", tempdir.name
    )
    assert os.path.exists(file)
    assert os.path.getsize(file) == 13444


@pytest.mark.asyncio
async def test_async_public_request_pass():
    tempdir = tempfile.TemporaryDirectory()
    file = await processing_utils.async_ssrf_protected_download(
        "https://en.wikipedia.org/static/images/icons/wikipedia.png", tempdir.name
    )
    assert os.path.exists(file)
    assert os.path.getsize(file) == 13444


def test_private_request_fail():
    with pytest.raises(ValueError, match="failed validation"):
        tempdir = tempfile.TemporaryDirectory()
        processing_utils.ssrf_protected_download(
            "http://192.168.1.250.nip.io/image.png", tempdir.name
        )


@pytest.mark.asyncio
async def test_async_private_request_fail():
    with pytest.raises(ValueError, match="failed validation"):
        tempdir = tempfile.TemporaryDirectory()
        await processing_utils.async_ssrf_protected_download(
            "http://192.168.1.250.nip.io/image.png", tempdir.name
        )