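# NOTE: the next three lines are non-code header text that was captured together
# with this file; they are kept as comments so the module still parses.
# avfranco's picture
# ea4all-gradio-agents-mcp-hackathon-kickoff
# 7042c3c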
import gradio as gr
import asyncio
import threading
def assign_event_loop_to_thread():
    """
    Return a usable event loop for the current thread, creating one if needed.

    This function can be called at the start of thread-based operations.
    The thread's current loop is reused when one exists and is still open;
    otherwise (no current loop, or the current loop has been closed) a new
    loop is created. In both cases the chosen loop is installed as the
    current loop for the calling thread.

    Returns:
        The event loop that is now set as current for the calling thread.
    """
    try:
        # Try to get the current event loop
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No event loop is available for this thread
        loop = None

    if loop is None or loop.is_closed():
        # A closed loop cannot run anything; handing it back would only move
        # the failure to the caller's first run_until_complete() call.
        loop = asyncio.new_event_loop()

    # Set the chosen event loop for the current thread
    asyncio.set_event_loop(loop)
    return loop
class AsyncInterruptHandler:
def __init__(self):
# Assign event loop at initialization
self.loop = assign_event_loop_to_thread()
self.input_queue = asyncio.Queue()
self.event = asyncio.Event()
self.user_feedback = None
self.interface = None
self.interface_thread = None
# Get or create the event loop
try:
self.loop = asyncio.get_event_loop()
except RuntimeError:
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
async def close_interface_with_timeout(self):
# Get the current thread's event loop
try:
print(f"NOT Closing interface: {self.interface.is_callable()}")
except asyncio.TimeoutError:
print("Interface closure timed out")
except Exception as e:
print(f"Error closing interface: {e}")
def launch_gradio_interface(self):
def run_interface():
# Explicitly assign event loop for this thread
assign_event_loop_to_thread()
# Get the current thread's event loop
current_loop = asyncio.get_event_loop()
try:
# Run the interface creation coroutine
current_loop.run_until_complete(self._create_and_launch_interface())
except Exception as e:
print(f"Error in run_interface: {e}")
# Create and start the thread
self.interface_thread = threading.Thread(target=run_interface, daemon=True)
self.interface_thread.start()
async def _create_and_launch_interface(self):
title = 'User Input'
description = 'Please provide input'
async def submit_input(user_feedback):
asyncio.run_coroutine_threadsafe(self.input_queue.put(user_feedback), self.loop)
self.event.set()
print(f"User Provided input: {user_feedback}")
return user_feedback
with gr.Blocks() as demo:
gr.Markdown(f"###{title}")
gr.Markdown(f"**{description}")
input_component = gr.Textbox(label="Your input")
submit_btn = gr.Button("Submit")
output = gr.Textbox(label="Status")
submit_btn.click(
submit_input,
inputs=input_component,
outputs=output
)
self.event = asyncio.Event()
self.event.clear()
self.user_feedback = None
self.interface = demo
self.interface.launch(inbrowser=True)
async def handle_interrupt(self):
self.launch_gradio_interface()
try:
# Use the current loop's queue and event
self.user_feedback = await asyncio.wait_for(
self.input_queue.get(),
timeout=300.0, # 5-minute timeout
)
print(f"Finished waiting for user input {self.user_feedback}")
return self.user_feedback
except asyncio.TimeoutError:
print("Gradio interface timed out")
return None
except Exception as e:
print(f"Error in handle_interrupt: {e}")
return None
finally:
await self.close_interface_with_timeout()