File size: 6,464 Bytes
e33dac9
 
929083d
d9486d1
 
 
 
 
 
 
 
86c6777
929083d
 
d9486d1
 
 
86c6777
 
 
 
 
 
 
 
 
d9486d1
a12cf12
d9486d1
 
 
 
 
 
 
 
 
 
 
a12cf12
d9486d1
 
 
c93e2aa
d9486d1
 
 
 
 
 
a12cf12
d9486d1
a12cf12
 
 
 
 
d9486d1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a12cf12
 
 
d9486d1
 
a12cf12
d9486d1
 
 
 
a12cf12
d9486d1
 
 
a12cf12
d9486d1
 
 
 
 
 
 
 
 
 
a12cf12
 
d9486d1
 
a12cf12
d9486d1
 
a12cf12
d9486d1
a12cf12
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d9486d1
a12cf12
 
d9486d1
 
a12cf12
 
 
 
 
 
 
d9486d1
a12cf12
 
d9486d1
a12cf12
d9486d1
a12cf12
 
 
d9486d1
 
 
 
 
a12cf12
d9486d1
 
a12cf12
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# Theorem Explain Agent - Video Generation App
# Updated: 2025-06-12 - Fixed Gradio interface
import gradio as gr
import uuid
import subprocess
import threading
import os
import time
from fastapi import FastAPI
from fastapi.responses import FileResponse
import asyncio
import sys


# A simple in-memory dictionary to track task status, keyed by task ID.
# For a production system, you'd use a database or Redis.
tasks = {}
# Put the current working directory first on the import search path.
sys.path.insert(0, os.getcwd())
# --- Download Kokoro models if they don't exist ---
model_dir = "models"
_KOKORO_BASE_URL = "https://github.com/thewh1teagle/kokoro-onnx/releases/download/model-files"
# Check every required file, not only the .onnx model, so a missing
# voices.bin is fetched too and files already present are not fetched again.
_missing_models = [
    _name for _name in ("kokoro-v0_19.onnx", "voices.bin")
    if not os.path.exists(os.path.join(model_dir, _name))
]
if _missing_models:
    print("Downloading Kokoro TTS models...")
    os.makedirs(model_dir, exist_ok=True)
    _failed_models = []
    for _name in _missing_models:
        try:
            # Argument list (no shell) so the call does not depend on shell quoting.
            _result = subprocess.run(
                ["wget", "-P", model_dir, f"{_KOKORO_BASE_URL}/{_name}"]
            )
            _ok = _result.returncode == 0
        except OSError:
            # wget is not installed or could not be started.
            _ok = False
        if not _ok:
            _failed_models.append(_name)
    if _failed_models:
        # Best effort: keep starting the app, but do not claim success.
        print(f"Model download failed for: {', '.join(_failed_models)}")
    else:
        print("Model download complete.")

def run_video_generation(task_id: str, topic: str, context: str, model: str):
    """
    Run the main generation script in a separate process and record the outcome.

    Blocks until ``generate_video.py`` exits, then updates ``tasks[task_id]``:
    on success ``video_path`` and ``status='completed'`` are set; otherwise
    ``error`` and ``status='failed'`` are set. Nothing is returned.

    Args:
        task_id: Key of an existing entry in the module-level ``tasks`` dict.
        topic: Passed to the script as ``--topic``; also used to derive the
            directory that is searched for the finished video.
        context: Passed to the script as ``--context``.
        model: Passed to the script as ``--model``.
    """
    task = tasks[task_id]
    task['status'] = 'running'

    # Sanitize topic to create a valid directory name
    file_prefix = "".join(c if c.isalnum() else "_" for c in topic.lower())
    # NOTE(review): the script is given the parent "output" directory; the
    # result is looked up in output/<sanitized topic>, presumably where
    # generate_video.py writes it -- confirm against that script.
    output_dir = os.path.join("output", file_prefix)

    command = [
        "python", "generate_video.py",
        "--model", model,
        "--topic", topic,
        "--context", context,
        "--output_dir", "output",
        # "--use_langfuse"
    ]

    def _fail(message: str) -> None:
        # Store the payload before flipping the status so a concurrent
        # status poll never sees 'failed' without its error text.
        task['error'] = message
        task['status'] = 'failed'

    try:
        # Using subprocess to run the existing script
        subprocess.run(command, check=True, capture_output=True, text=True)

        # Look for the output video in the directory
        video_path = None
        if os.path.isdir(output_dir):
            video_path = next(
                (
                    os.path.join(output_dir, entry)
                    for entry in os.listdir(output_dir)
                    if entry.endswith("_combined.mp4")
                ),
                None,
            )
    except subprocess.CalledProcessError as e:
        # str(e) only names the command and exit code; append the tail of the
        # captured output so the actual cause is visible to the user.
        details = (e.stderr or e.stdout or "").strip()
        _fail(f"{e}\n{details[-2000:]}" if details else str(e))
        return
    except Exception as e:
        _fail(str(e))
        return

    if video_path and os.path.exists(video_path):
        # Payload first, status last (same reasoning as in _fail).
        task['video_path'] = video_path
        task['status'] = 'completed'
    else:
        _fail("Video file not found after generation.")

def start_generation(topic: str, context: str, model: str):
    """
    Validate the inputs and start video generation on a background thread.

    Args:
        topic: Topic of the video.
        context: Short explanation accompanying the topic.
        model: Model identifier passed on to the generation script.

    Returns:
        A ``(message, task_id)`` tuple. ``task_id`` is ``""`` when validation
        fails; otherwise it is the new UUID string under which the task is
        registered in ``tasks``.
    """
    # Treat None and whitespace-only text as empty, and pass the trimmed
    # values on so no task is started with blank or padded arguments.
    topic, context, model = (
        (value or "").strip() for value in (topic, context, model)
    )
    if not all([topic, context, model]):
        return "Topic, Context, and Model cannot be empty.", ""

    task_id = str(uuid.uuid4())
    tasks[task_id] = {'status': 'queued', 'model': model}

    # Use a background thread to run the time-consuming task
    thread = threading.Thread(
        target=run_video_generation,
        args=(task_id, topic, context, model)
    )
    thread.start()

    return f"Task started with model {model}. Your Task ID is: {task_id}", task_id

def check_status(task_id: str):
    """
    Report the current state of a generation task.

    Returns a ``(message, video_path)`` tuple. ``video_path`` is the stored
    path only when the task status is 'completed'; in every other case it is
    ``None``. For a failed task the message also carries the stored error.
    """
    if not task_id:
        return "Please provide a Task ID.", None

    record = tasks.get(task_id)
    if not record:
        return "Task not found.", None

    state = record.get('status')
    model_name = record.get('model', 'Unknown')
    summary = f"Status: {state} (Model: {model_name})"

    if state == 'failed':
        reason = record.get('error', 'Unknown error')
        return f"{summary}\nError: {reason}", None

    # Only a completed task exposes its video; queued/running tasks do not.
    video = record.get('video_path') if state == 'completed' else None
    return summary, video

# Create the Gradio interface: one tab submits a generation task, the other
# polls a task by ID and shows the finished video.
with gr.Blocks(title="Theorem Explain Agent") as demo:
    gr.Markdown("# 🎓 Theorem-Explain-Agent Video Generation")
    gr.Markdown("Generate educational videos explaining mathematical theorems and concepts.")

    with gr.Tab("🚀 Start Generation"):
        gr.Markdown("### Enter the details for your video:")
        model_input = gr.Textbox(
            label="Model",
            placeholder="e.g., gemini/gemini-1.5-flash, openai/gpt-4o",
            value="gemini/gemini-1.5-flash"
        )
        topic_input = gr.Textbox(
            label="Topic",
            placeholder="e.g., The Pythagorean Theorem"
        )
        context_input = gr.Textbox(
            label="Context",
            placeholder="A short explanation of the theorem.",
            lines=3
        )
        start_button = gr.Button("🎬 Generate Video", variant="primary")

        with gr.Row():
            status_output = gr.Textbox(label="Status", interactive=False)
            task_id_output = gr.Textbox(label="Task ID", interactive=False)

    with gr.Tab("📊 Check Status"):
        gr.Markdown("### Check the status of your video generation:")
        task_id_input = gr.Textbox(
            label="Task ID",
            placeholder="Enter the Task ID you received"
        )
        check_button = gr.Button("🔍 Check Status", variant="secondary")

        status_display = gr.Textbox(label="Status", interactive=False)
        video_output = gr.Video(label="Generated Video")

    # Connect the functions to the interface.
    # Input order must match start_generation(topic, context, model).
    start_button.click(
        fn=start_generation,
        inputs=[topic_input, context_input, model_input],
        outputs=[status_output, task_id_output]
    )

    # check_status returns (message, video_path) in this output order.
    check_button.click(
        fn=check_status,
        inputs=[task_id_input],
        outputs=[status_display, video_output]
    )

    gr.Markdown("""
    ### 📋 How to Use:
    1. **Start Generation**: Enter a Model, Topic, and Context, then click 'Generate Video'
    2. **Copy the Task ID** that appears
    3. **Check Status**: Go to the 'Check Status' tab, paste your Task ID, and click 'Check Status'
    4. **Wait**: Video generation can take several minutes. Check periodically until complete
    5. **Download**: When complete, the video will appear and can be downloaded
    
    ### 🤖 Supported Models:
    - `gemini/gemini-1.5-flash` (recommended)
    - `gemini/gemini-1.5-pro`
    - `openai/gpt-4o`
    - `openai/o3-mini`
    - `anthropic/claude-3-opus-20240229`
    """)

# Start the Gradio server only when this file is executed as a script.
if __name__ == "__main__":
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
    }
    demo.launch(**launch_options)