import gradio as gr import asyncio import threading def assign_event_loop_to_thread(): """ Explicitly assign a new event loop to the current thread This method can be called at the start of thread-based operations """ try: # Try to get the current event loop loop = asyncio.get_event_loop() except RuntimeError: # If no event loop exists, create a new one loop = asyncio.new_event_loop() # Set the new event loop for the current thread asyncio.set_event_loop(loop) return loop class AsyncInterruptHandler: def __init__(self): # Assign event loop at initialization self.loop = assign_event_loop_to_thread() self.input_queue = asyncio.Queue() self.event = asyncio.Event() self.user_feedback = None self.interface = None self.interface_thread = None # Get or create the event loop try: self.loop = asyncio.get_event_loop() except RuntimeError: self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) async def close_interface_with_timeout(self): # Get the current thread's event loop try: print(f"NOT Closing interface: {self.interface.is_callable()}") except asyncio.TimeoutError: print("Interface closure timed out") except Exception as e: print(f"Error closing interface: {e}") def launch_gradio_interface(self): def run_interface(): # Explicitly assign event loop for this thread assign_event_loop_to_thread() # Get the current thread's event loop current_loop = asyncio.get_event_loop() try: # Run the interface creation coroutine current_loop.run_until_complete(self._create_and_launch_interface()) except Exception as e: print(f"Error in run_interface: {e}") # Create and start the thread self.interface_thread = threading.Thread(target=run_interface, daemon=True) self.interface_thread.start() async def _create_and_launch_interface(self): title = 'User Input' description = 'Please provide input' async def submit_input(user_feedback): asyncio.run_coroutine_threadsafe(self.input_queue.put(user_feedback), self.loop) self.event.set() print(f"User Provided input: {user_feedback}") return user_feedback with gr.Blocks() as demo: gr.Markdown(f"###{title}") gr.Markdown(f"**{description}") input_component = gr.Textbox(label="Your input") submit_btn = gr.Button("Submit") output = gr.Textbox(label="Status") submit_btn.click( submit_input, inputs=input_component, outputs=output ) self.event = asyncio.Event() self.event.clear() self.user_feedback = None self.interface = demo self.interface.launch(inbrowser=True) async def handle_interrupt(self): self.launch_gradio_interface() try: # Use the current loop's queue and event self.user_feedback = await asyncio.wait_for( self.input_queue.get(), timeout=300.0, # 5-minute timeout ) print(f"Finished waiting for user input {self.user_feedback}") return self.user_feedback except asyncio.TimeoutError: print("Gradio interface timed out") return None except Exception as e: print(f"Error in handle_interrupt: {e}") return None finally: await self.close_interface_with_timeout()