File size: 3,957 Bytes
7042c3c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
import gradio as gr
import asyncio
import threading
def assign_event_loop_to_thread():
"""
Explicitly assign a new event loop to the current thread
This method can be called at the start of thread-based operations
"""
try:
# Try to get the current event loop
loop = asyncio.get_event_loop()
except RuntimeError:
# If no event loop exists, create a new one
loop = asyncio.new_event_loop()
# Set the new event loop for the current thread
asyncio.set_event_loop(loop)
return loop
class AsyncInterruptHandler:
def __init__(self):
# Assign event loop at initialization
self.loop = assign_event_loop_to_thread()
self.input_queue = asyncio.Queue()
self.event = asyncio.Event()
self.user_feedback = None
self.interface = None
self.interface_thread = None
# Get or create the event loop
try:
self.loop = asyncio.get_event_loop()
except RuntimeError:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
async def close_interface_with_timeout(self):
# Get the current thread's event loop
try:
print(f"NOT Closing interface: {self.interface.is_callable()}")
except asyncio.TimeoutError:
print("Interface closure timed out")
except Exception as e:
print(f"Error closing interface: {e}")
def launch_gradio_interface(self):
def run_interface():
# Explicitly assign event loop for this thread
assign_event_loop_to_thread()
# Get the current thread's event loop
current_loop = asyncio.get_event_loop()
try:
# Run the interface creation coroutine
current_loop.run_until_complete(self._create_and_launch_interface())
except Exception as e:
print(f"Error in run_interface: {e}")
# Create and start the thread
self.interface_thread = threading.Thread(target=run_interface, daemon=True)
self.interface_thread.start()
async def _create_and_launch_interface(self):
title = 'User Input'
description = 'Please provide input'
async def submit_input(user_feedback):
asyncio.run_coroutine_threadsafe(self.input_queue.put(user_feedback), self.loop)
self.event.set()
print(f"User Provided input: {user_feedback}")
return user_feedback
with gr.Blocks() as demo:
gr.Markdown(f"###{title}")
gr.Markdown(f"**{description}")
input_component = gr.Textbox(label="Your input")
submit_btn = gr.Button("Submit")
output = gr.Textbox(label="Status")
submit_btn.click(
submit_input,
inputs=input_component,
outputs=output
)
self.event = asyncio.Event()
self.event.clear()
self.user_feedback = None
self.interface = demo
self.interface.launch(inbrowser=True)
async def handle_interrupt(self):
self.launch_gradio_interface()
try:
# Use the current loop's queue and event
self.user_feedback = await asyncio.wait_for(
self.input_queue.get(),
timeout=300.0, # 5-minute timeout
)
print(f"Finished waiting for user input {self.user_feedback}")
return self.user_feedback
except asyncio.TimeoutError:
print("Gradio interface timed out")
return None
except Exception as e:
print(f"Error in handle_interrupt: {e}")
return None
finally:
await self.close_interface_with_timeout()
|