File size: 3,957 Bytes
7042c3c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import gradio as gr

import asyncio
import threading

def assign_event_loop_to_thread():
    """
    Explicitly assign a new event loop to the current thread
    This method can be called at the start of thread-based operations
    """
    try:
        # Try to get the current event loop
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # If no event loop exists, create a new one
        loop = asyncio.new_event_loop()
        
        # Set the new event loop for the current thread
        asyncio.set_event_loop(loop)
  
    return loop

class AsyncInterruptHandler:
    def __init__(self):
        # Assign event loop at initialization
        self.loop = assign_event_loop_to_thread()
        self.input_queue = asyncio.Queue()
        self.event = asyncio.Event()

        self.user_feedback = None
        self.interface = None
        self.interface_thread = None

        # Get or create the event loop
        try:
            self.loop = asyncio.get_event_loop()
        except RuntimeError:
            self.loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.loop)

    async def close_interface_with_timeout(self):
    # Get the current thread's event loop
        try:
            print(f"NOT Closing interface: {self.interface.is_callable()}")
            
        except asyncio.TimeoutError:
            print("Interface closure timed out")
        except Exception as e:
            print(f"Error closing interface: {e}")

    def launch_gradio_interface(self):
        def run_interface():
            # Explicitly assign event loop for this thread
            assign_event_loop_to_thread()
            
            # Get the current thread's event loop
            current_loop = asyncio.get_event_loop()
            
            try:
                # Run the interface creation coroutine
                current_loop.run_until_complete(self._create_and_launch_interface())
            except Exception as e:
                print(f"Error in run_interface: {e}")
        
        # Create and start the thread
        self.interface_thread = threading.Thread(target=run_interface, daemon=True)
        self.interface_thread.start()

    async def _create_and_launch_interface(self):
        title = 'User Input'
        description = 'Please provide input'

        async def submit_input(user_feedback):
            asyncio.run_coroutine_threadsafe(self.input_queue.put(user_feedback), self.loop)
            self.event.set()
            
            print(f"User Provided input: {user_feedback}")
            
            return user_feedback

        with gr.Blocks() as demo:
            gr.Markdown(f"###{title}")
            gr.Markdown(f"**{description}")

            input_component = gr.Textbox(label="Your input")
            submit_btn = gr.Button("Submit")
            output = gr.Textbox(label="Status")

            submit_btn.click(
                submit_input, 
                inputs=input_component, 
                outputs=output
            )

        self.event = asyncio.Event()
        self.event.clear()
        self.user_feedback = None
        
        self.interface = demo
        self.interface.launch(inbrowser=True)

    async def handle_interrupt(self):
        self.launch_gradio_interface()

        try:
            # Use the current loop's queue and event
            self.user_feedback = await asyncio.wait_for(
                self.input_queue.get(), 
                timeout=300.0,  # 5-minute timeout
            )

            print(f"Finished waiting for user input {self.user_feedback}")

            return self.user_feedback
        
        except asyncio.TimeoutError:
            print("Gradio interface timed out")
            return None
        
        except Exception as e:
            print(f"Error in handle_interrupt: {e}")
            return None
        
        finally:
            await self.close_interface_with_timeout()