|
import gradio as gr |
|
|
|
import asyncio |
|
import threading |
|
|
|
def assign_event_loop_to_thread():
    """Ensure the calling thread has a usable asyncio event loop and return it.

    The loop already registered for this thread is reused when there is one
    and it is still open; otherwise a new loop is created.  In both cases the
    returned loop is installed as the thread's current loop, so a later
    ``asyncio.get_event_loop()`` on the same thread yields the same object.

    Intended to be called at the start of thread-based operations that need
    asyncio.  This function never closes a loop.

    Returns:
        asyncio.AbstractEventLoop: an open loop bound to the current thread.
    """
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No loop is registered for this thread.
        loop = None

    # get_event_loop() can hand back a loop that was already closed; such a
    # loop cannot run anything, so treat it the same as having no loop.
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()

    asyncio.set_event_loop(loop)
    return loop
|
|
|
class AsyncInterruptHandler:
    """Ask the user for one line of text through a temporary Gradio page.

    ``handle_interrupt`` is the entry point: it launches a small Gradio
    Blocks app from a helper thread, waits (with a timeout) for the user to
    press "Submit", returns the submitted text and closes the app again.

    Attributes:
        loop: event loop that submitted input is delivered to.  It is set in
            ``__init__`` and re-bound to the running loop by
            ``handle_interrupt``.
        input_queue: ``asyncio.Queue`` receiving each submitted value.
        event: ``asyncio.Event`` set whenever a value has been submitted.
        user_feedback: last value returned by ``handle_interrupt``.
        interface: the launched ``gr.Blocks`` instance, or ``None``.
        interface_thread: thread that launched the interface, or ``None``.
    """

    def __init__(self):
        # The helper already guarantees that the current thread has a loop
        # installed, so no second lookup is needed here.
        self.loop = assign_event_loop_to_thread()
        self.input_queue = asyncio.Queue()
        self.event = asyncio.Event()

        self.user_feedback = None
        self.interface = None
        self.interface_thread = None

    def _publish_input(self, user_feedback):
        """Deliver ``user_feedback`` to ``self.loop`` from any thread.

        asyncio queues and events are not thread-safe, and the Gradio submit
        callback does not run on ``self.loop``, so both updates are scheduled
        onto that loop instead of being applied directly.  The event is
        scheduled first so that it is already set when a waiter on the queue
        wakes up.
        """
        self.loop.call_soon_threadsafe(self.event.set)
        self.loop.call_soon_threadsafe(self.input_queue.put_nowait, user_feedback)

    async def close_interface_with_timeout(self, timeout=10.0):
        """Close the launched Gradio interface, waiting at most ``timeout`` seconds.

        Does nothing when no interface is recorded.  ``self.interface`` is
        reset to ``None`` before closing so that a repeated call cannot close
        the same interface twice.  Failures are reported on stdout and never
        raised, because this method runs from a ``finally`` clause.

        Args:
            timeout: maximum number of seconds to wait for the close call.
        """
        interface, self.interface = self.interface, None
        if interface is None:
            return

        try:
            # Blocks.close() is a blocking call; run it in the default
            # executor so the event loop stays responsive and the wait can
            # be bounded.
            running_loop = asyncio.get_running_loop()
            await asyncio.wait_for(
                running_loop.run_in_executor(None, interface.close),
                timeout=timeout,
            )
        except asyncio.TimeoutError:
            print("Interface closure timed out")
        except Exception as e:
            print(f"Error closing interface: {e}")

    def launch_gradio_interface(self):
        """Start the Gradio interface from a daemon thread and return at once.

        The submission state (``event`` and ``user_feedback``) is reset here,
        in the caller's thread, before the helper thread starts.  The thread
        object is stored in ``self.interface_thread``.
        """
        self.event.clear()
        self.user_feedback = None

        def run_interface():
            thread_loop = assign_event_loop_to_thread()
            try:
                thread_loop.run_until_complete(self._create_and_launch_interface())
            except Exception as e:
                print(f"Error in run_interface: {e}")
            finally:
                # The loop exists only to drive the launch coroutine.
                thread_loop.close()

        self.interface_thread = threading.Thread(target=run_interface, daemon=True)
        self.interface_thread.start()

    async def _create_and_launch_interface(self):
        """Build the input form, store it in ``self.interface`` and launch it."""
        title = 'User Input'
        description = 'Please provide input'

        async def submit_input(user_feedback):
            # Click handler: forward the text to the waiting consumer and
            # echo it back into the "Status" box.
            self._publish_input(user_feedback)
            print(f"User Provided input: {user_feedback}")
            return user_feedback

        with gr.Blocks() as demo:
            # Markdown needs a space after the hashes for a heading and a
            # closing ``**`` for bold text.
            gr.Markdown(f"### {title}")
            gr.Markdown(f"**{description}**")

            input_component = gr.Textbox(label="Your input")
            submit_btn = gr.Button("Submit")
            output = gr.Textbox(label="Status")

            submit_btn.click(
                submit_input,
                inputs=input_component,
                outputs=output,
            )

        self.interface = demo
        # prevent_thread_lock=True makes launch() return once the server is
        # up instead of blocking this helper thread indefinitely.
        self.interface.launch(inbrowser=True, prevent_thread_lock=True)

    async def handle_interrupt(self, timeout=300.0):
        """Show the input page and wait for the user's submission.

        Args:
            timeout: maximum number of seconds to wait for input.

        Returns:
            The submitted text, or ``None`` when the wait timed out or an
            error occurred.  The interface is closed in every case.
        """
        # Deliveries from the Gradio callback must target the loop that is
        # actually awaiting the queue, which is not necessarily the loop
        # that was current when the handler was constructed.
        self.loop = asyncio.get_running_loop()
        self.launch_gradio_interface()

        try:
            self.user_feedback = await asyncio.wait_for(
                self.input_queue.get(),
                timeout=timeout,
            )
            print(f"Finished waiting for user input {self.user_feedback}")
            return self.user_feedback

        except asyncio.TimeoutError:
            print("Gradio interface timed out")
            return None

        except Exception as e:
            print(f"Error in handle_interrupt: {e}")
            return None

        finally:
            await self.close_interface_with_timeout()
|
|