import json
import ray
import time
import asyncio
import os
from clip_transform import CLIPTransform
from response_state_manager import ResponseStateManager
from respond_to_prompt_async import RespondToPromptAsync
import asyncio
import subprocess

class CharlesApp:
    """Drive the Charles conversation loop.

    The app pulls audio and video input frames from the app interface actor,
    sends audio to the speech-to-text actor, starts a ``RespondToPromptAsync``
    task for each finished prompt, and pushes animator frames plus a debug
    "chat history" back to the app interface actor.
    """

    # Filler utterances that are dropped instead of being treated as prompts.
    _PROMPTS_TO_IGNORE = ("um", "uh", "ah", "huh", "hmm", "the", "but", "by", "just", "i'm")
    # Number of entries after which the oldest chat-history entry is dropped.
    _MAX_DEBUG_HISTORY = 10

    def __init__(self):
        """Create the app in its un-initialised state; heavy resources are built in ``start``."""
        self._needs_init = True
        self._charles_actor_debug_output = ""
        self._state = "Initializing"
        self._clip_transform = CLIPTransform()

    def set_state(self, state: str, skip_print: bool = False) -> None:
        """Record the current status string and publish it.

        Args:
            state: Human-readable status text.
            skip_print: When true, do not echo the status to stdout.
        """
        self._state = state
        if not skip_print:
            print(state)
        # The actor handle only exists once _initalize_resources has run.
        if hasattr(self, '_app_interface_actor'):
            self._app_interface_actor.set_state.remote(self._state)

    async def _initalize_resources(self):
        """Build every actor and manager the main loop needs, reporting progress via ``set_state``.

        Project imports are done lazily here rather than at module import time.
        """
        self.set_state("001 - creating AppInterfaceActor")
        from app_interface_actor import AppInterfaceActor
        self._app_interface_actor = AppInterfaceActor.get_singleton()
        self._audio_output_queue = await self._app_interface_actor.get_audio_output_queue.remote()

        self.set_state("002 - creating ResponseStateManager")
        self._response_state_manager = ResponseStateManager()

        self.set_state("003 - creating PromptManager")
        from prompt_manager import PromptManager
        self._prompt_manager = PromptManager()

        # The responder and its task are created per prompt inside start().
        self.set_state("004 - creating RespondToPromptAsync")
        self._respond_to_prompt = None
        self._respond_to_prompt_task = None

        self.set_state("005 - create SpeechToTextVoskActor")
        from speech_to_text_vosk_actor import SpeechToTextVoskActor
        # Alternative previously used here: SpeechToTextVoskActor.remote("big")
        self._speech_to_text_actor = SpeechToTextVoskActor.remote("small")

        self.set_state("006 - create Prototypes")
        from prototypes import Prototypes
        self._prototypes = Prototypes()

        self.set_state("007 - create animator")
        from charles_animator import CharlesAnimator
        self._animator = CharlesAnimator()

        # Initialisation is complete; clear the flag so a later start() call
        # does not rebuild every resource again.
        self._needs_init = False
        self.set_state("010 - Initialized")

    async def _stop_active_response(self) -> None:
        """Terminate and cancel the in-flight response task, if any, and clear both handles."""
        if self._respond_to_prompt_task is not None:
            await self._respond_to_prompt.terminate()
            self._respond_to_prompt_task.cancel()
        self._respond_to_prompt_task = None
        self._respond_to_prompt = None

    async def start(self):
        """Initialise resources if needed, then run the processing loop forever.

        Each iteration dequeues audio/video input, forwards audio to the
        speech-to-text actor, reacts to at most one finished transcription,
        refreshes the debug output, and enqueues one animator frame.
        """
        if self._needs_init:
            await self._initalize_resources()

        debug_output_history = []

        async def render_debug_output(list_of_strings):
            """Render the entries newest-first and publish them to the app interface actor."""
            table_content = "##### Chat history\n" + "".join(
                f"\n{item}\n" for item in reversed(list_of_strings)
            )
            self._charles_actor_debug_output = table_content
            await self._app_interface_actor.set_debug_output.remote(self._charles_actor_debug_output)

        async def add_debug_output(output):
            """Append an entry to the persistent history (bounded) and re-render."""
            debug_output_history.append(output)
            if len(debug_output_history) > self._MAX_DEBUG_HISTORY:
                debug_output_history.pop(0)
            await render_debug_output(debug_output_history)

        self.set_state("Waiting for input")
        total_video_frames = 0
        total_audio_frames = 0
        loops = 0
        start_time = time.time()
        vector_debug = "--n/a--"

        # Pending speech-to-text results, oldest first.
        process_speech_to_text_future = []
        human_preview_text = ""
        additional_prompt = None
        previous_prompt = ""
        is_talking = False
        has_spoken_for_this_prompt = False

        while True:
            response_step_obs, _ = self._response_state_manager.begin_next_step()
            audio_frames = await self._app_interface_actor.dequeue_audio_input_frames_async.remote()
            video_frames = await self._app_interface_actor.dequeue_video_input_frames_async.remote()

            if len(audio_frames) > 0:
                total_audio_frames += len(audio_frames)
                # Concatenate all audio frames into a single buffer
                audio_buffer = b"".join(buffer.tobytes() for buffer in audio_frames)
                future = self._speech_to_text_actor.process_speech.remote(audio_buffer)
                process_speech_to_text_future.append(future)

            if len(video_frames) > 0:
                vector_debug = f"found {len(video_frames)} video frames"
                total_video_frames += 1
                # Only the most recent frame is embedded; older ones are skipped.
                image_as_array = video_frames[-1]
                image_vector = self._clip_transform.image_to_embeddings(image_as_array)
                image_vector = image_vector[0]
                _, closest_item_key, distance_debug_str = self._prototypes.get_distances(image_vector)
                vector_debug = f"{closest_item_key} {distance_debug_str}"

            if len(process_speech_to_text_future) > 0:
                # timeout=0 polls the oldest pending result without blocking the loop.
                ready, _ = ray.wait([process_speech_to_text_future[0]], timeout=0)
                if ready:
                    prompt, speaker_finished, _ = await process_speech_to_text_future[0]
                    del process_speech_to_text_future[0]

                    is_real_prompt = len(prompt) > 0 and prompt not in self._PROMPTS_TO_IGNORE

                    if speaker_finished and is_real_prompt:
                        print(f"Prompt: {prompt}")
                        response_preview_text = self._response_state_manager.pretty_print_current_responses()
                        if len(response_preview_text) > 0:
                            await add_debug_output(response_preview_text)
                        human_preview_text = ""
                        if additional_prompt is not None:
                            prompt = additional_prompt + ". " + prompt
                        await add_debug_output(f"👨 {prompt}")
                        self._prompt_manager.replace_or_append_user_message(prompt)
                        # Replace any response still in flight with one for the new prompt.
                        await self._stop_active_response()
                        self._respond_to_prompt = RespondToPromptAsync(self._response_state_manager, self._audio_output_queue)
                        self._respond_to_prompt_task = asyncio.create_task(self._respond_to_prompt.run(prompt, self._prompt_manager.messages))
                        additional_prompt = None
                        previous_prompt = prompt
                        is_talking = False
                        has_spoken_for_this_prompt = False
                        response_step_obs, _ = self._response_state_manager.reset_episode()
                    elif is_real_prompt:
                        # Sometimes we get a false speaker_finished signal, in which
                        # case new prompt text arrives before we have spoken. Abort the
                        # pending response and carry the previous prompt forward.
                        if len(previous_prompt) > 0 and not has_spoken_for_this_prompt:
                            additional_prompt = previous_prompt
                            has_spoken_for_this_prompt = True
                            await self._stop_active_response()
                            response_step_obs, _ = self._response_state_manager.reset_episode()
                        if additional_prompt is not None:
                            prompt = additional_prompt + ". " + prompt
                        human_preview_text = f"👨❓ {prompt}"

            # Add each line of the response to the prompt history one at a time as it comes in.
            for new_response in response_step_obs.llm_responses:
                self._prompt_manager.append_assistant_message(new_response)

            # Render the persistent history plus the transient robot/human previews.
            list_of_strings = debug_output_history.copy()
            robot_preview_text = self._response_state_manager.pretty_print_preview_text()
            response_preview_text = self._response_state_manager.pretty_print_current_responses()
            if len(robot_preview_text) > 0:
                response_preview_text += robot_preview_text+"  \n"
            list_of_strings.append(response_preview_text)
            if len(human_preview_text) > 0:
                list_of_strings.append(human_preview_text)
            if len(list_of_strings) > self._MAX_DEBUG_HISTORY:
                list_of_strings.pop(0)
            await render_debug_output(list_of_strings)

            # Yield to the event loop so the response task can make progress.
            await asyncio.sleep(0.001)

            # Queued output audio means Charles is currently talking.
            count = len(self._audio_output_queue)
            is_talking = bool(count > 0)
            has_spoken_for_this_prompt = has_spoken_for_this_prompt or is_talking
            frame = self._animator.update(is_talking)
            frame_ref = ray.put(frame)
            await self._app_interface_actor.enqueue_video_output_frame.remote(frame_ref)

            loops+=1
            self.set_state(
                f"Processed {total_video_frames} video frames \
                    and {total_audio_frames} audio frames, \
                    loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. \
                    Is speaking: {is_talking}({count}). \
                    {vector_debug}\
                    ", skip_print=True)

def init_ray():
    """Start a local Ray head node (best effort) and connect this process to Ray.

    The ``ray start`` command is attempted first; any failure is printed and
    otherwise ignored. The function then loops until ``ray.is_initialized()``
    is true, calling ``ray.init`` in the ``project_charles`` namespace and
    passing the ``RAY_ADDRESS`` environment variable as the address when set.
    """
    head_start_command = ["ray", "start", "--include-dashboard=True", "--head"]
    try:
        subprocess.check_output(head_start_command)
    except Exception as error:
        # Best effort only: report the failure and still try to connect below.
        print(f"charles_actor.py init_ray: {error}")

    # Connect to a running Ray cluster
    while True:
        if ray.is_initialized():
            break
        time.sleep(0.1)
        address = os.getenv('RAY_ADDRESS')
        # Pass the address positionally only when the variable is set and non-empty.
        positional_args = (address,) if address else ()
        ray.init(*positional_args, namespace="project_charles")

async def main():
    """Make sure Ray is initialised, then create a CharlesApp and run its loop."""
    ray_ready = ray.is_initialized()
    if not ray_ready:
        init_ray()

    app = CharlesApp()
    await app.start()
    

if __name__ == "__main__":
    # asyncio.run creates a fresh event loop, runs main() on it and closes the
    # loop afterwards. asyncio.get_event_loop() is deprecated when no loop is
    # running and raises RuntimeError on recent Python versions.
    asyncio.run(main())