File size: 21,792 Bytes
d9486d1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
import os
import re
import json
import glob
from typing import List, Optional
import uuid
import asyncio

from mllm_tools.utils import _prepare_text_inputs
from src.utils.utils import extract_xml
from task_generator import (
    get_prompt_scene_plan,
    get_prompt_scene_vision_storyboard,
    get_prompt_scene_technical_implementation,
    get_prompt_scene_animation_narration,
    get_prompt_context_learning_scene_plan,
    get_prompt_context_learning_vision_storyboard,
    get_prompt_context_learning_technical_implementation,
    get_prompt_context_learning_animation_narration,
    get_prompt_context_learning_code
)
from src.rag.rag_integration import RAGIntegration

class VideoPlanner:
    """A class for planning and generating video content.

    This class handles the planning and generation of video content including scene outlines,
    vision storyboards, technical implementations, and animation narrations.

    Args:
        planner_model: The model used for planning tasks
        helper_model: Optional helper model, defaults to planner_model if None
        output_dir (str): Directory for output files. Defaults to "output"
        print_response (bool): Whether to print model responses. Defaults to False
        use_context_learning (bool): Whether to use context learning. Defaults to False
        context_learning_path (str): Path to context learning examples. Defaults to "data/context_learning"
        use_rag (bool): Whether to use RAG. Defaults to False
        session_id (str): Session identifier. Defaults to None
        chroma_db_path (str): Path to ChromaDB. Defaults to "data/rag/chroma_db"
        manim_docs_path (str): Path to Manim docs. Defaults to "data/rag/manim_docs"
        embedding_model (str): Name of embedding model. Defaults to "text-embedding-ada-002"
        use_langfuse (bool): Whether to use Langfuse logging. Defaults to True
    """

    def __init__(self, planner_model, helper_model=None, output_dir="output", print_response=False, use_context_learning=False, context_learning_path="data/context_learning", use_rag=False, session_id=None, chroma_db_path="data/rag/chroma_db", manim_docs_path="data/rag/manim_docs", embedding_model="text-embedding-ada-002", use_langfuse=True):
        self.planner_model = planner_model
        self.helper_model = helper_model if helper_model is not None else planner_model
        self.output_dir = output_dir
        self.print_response = print_response
        self.use_context_learning = use_context_learning
        self.context_learning_path = context_learning_path
        # Initialize different types of context examples
        self.scene_plan_examples = self._load_context_examples('scene_plan') if use_context_learning else None
        self.vision_storyboard_examples = self._load_context_examples('scene_vision_storyboard') if use_context_learning else None
        self.technical_implementation_examples = self._load_context_examples('technical_implementation') if use_context_learning else None
        self.animation_narration_examples = self._load_context_examples('scene_animation_narration') if use_context_learning else None
        self.code_examples = self._load_context_examples('code') if use_context_learning else None
        self.use_rag = use_rag
        self.rag_integration = None
        if use_rag:
            self.rag_integration = RAGIntegration(
                helper_model=helper_model,
                output_dir=output_dir,
                chroma_db_path=chroma_db_path,
                manim_docs_path=manim_docs_path,
                embedding_model=embedding_model,
                use_langfuse=use_langfuse,
                session_id=session_id
            )
        self.relevant_plugins = []  # Initialize as an empty list

    def _load_context_examples(self, example_type: str) -> str:
        """Load context learning examples of a specific type from files.

        Args:
            example_type (str): Type of examples to load ('scene_plan', 'scene_vision_storyboard', etc.)

        Returns:
            str: Formatted string containing the loaded examples, or None if no examples found
        """
        examples = []
        
        # Define file patterns for different types
        file_patterns = {
            'scene_plan': '*_scene_plan.txt',
            'scene_vision_storyboard': '*_scene_vision_storyboard.txt',
            'technical_implementation': '*_technical_implementation.txt',
            'scene_animation_narration': '*_scene_animation_narration.txt',
            'code': '*.py'
        }
        
        pattern = file_patterns.get(example_type)
        if not pattern:
            return None

        # Search in subdirectories of context_learning_path
        for root, _, _ in os.walk(self.context_learning_path):
            for example_file in glob.glob(os.path.join(root, pattern)):
                with open(example_file, 'r') as f:
                    content = f.read()
                    if example_type == 'code':
                        examples.append(f"# Example from {os.path.basename(example_file)}\n{content}\n")
                    else:
                        examples.append(f"# Example from {os.path.basename(example_file)}\n{content}\n")

        # Format examples using appropriate template
        if examples:
            formatted_examples = self._format_examples(example_type, examples)
            return formatted_examples
        return None

    def _format_examples(self, example_type: str, examples: List[str]) -> str:
        """Format examples using the appropriate template based on their type.

        Args:
            example_type (str): Type of examples to format
            examples (List[str]): List of example strings to format

        Returns:
            str: Formatted examples string, or None if no template found
        """
        templates = {
            'scene_plan': get_prompt_context_learning_scene_plan,
            'scene_vision_storyboard': get_prompt_context_learning_vision_storyboard,
            'technical_implementation': get_prompt_context_learning_technical_implementation,
            'scene_animation_narration': get_prompt_context_learning_animation_narration,
            'code': get_prompt_context_learning_code
        }
        
        template = templates.get(example_type)
        if template:
            return template(examples="\n".join(examples))
        return None

    def generate_scene_outline(self,
                            topic: str,
                            description: str,
                            session_id: str) -> str:
        """Generate a scene outline based on the topic and description.

        Args:
            topic (str): The topic of the video
            description (str): Description of the video content
            session_id (str): Session identifier

        Returns:
            str: Generated scene outline
        """
        # Detect relevant plugins upfront if RAG is enabled
        if self.use_rag:
            self.relevant_plugins = self.rag_integration.detect_relevant_plugins(topic, description) or []
            self.rag_integration.set_relevant_plugins(self.relevant_plugins)
            print(f"Detected relevant plugins: {self.relevant_plugins}")

        prompt = get_prompt_scene_plan(topic, description)
        
        if self.use_context_learning and self.scene_plan_examples:
            prompt += f"\n\nHere are some example scene plans for reference:\n{self.scene_plan_examples}"

        # Generate plan using planner model
        response_text = self.planner_model(
            _prepare_text_inputs(prompt),
            metadata={"generation_name": "scene_outline", "tags": [topic, "scene-outline"], "session_id": session_id}
        )
        # extract scene outline <SCENE_OUTLINE> ... </SCENE_OUTLINE>
        scene_outline_match = re.search(r'(<SCENE_OUTLINE>.*?</SCENE_OUTLINE>)', response_text, re.DOTALL)
        scene_outline = scene_outline_match.group(1) if scene_outline_match else response_text

        # replace all spaces and special characters with underscores for file path compatibility
        file_prefix = topic.lower()
        file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)
        # save plan to file
        os.makedirs(os.path.join(self.output_dir, file_prefix), exist_ok=True) # Ensure directory exists
        with open(os.path.join(self.output_dir, file_prefix, f"{file_prefix}_scene_outline.txt"), "w") as f:
            f.write(scene_outline)
        print(f"Plan saved to {file_prefix}_scene_outline.txt")

        return scene_outline

    async def _generate_scene_implementation_single(self, topic: str, description: str, scene_outline_i: str, i: int, file_prefix: str, session_id: str, scene_trace_id: str) -> str:
        """Generate implementation plan for a single scene.

        Args:
            topic (str): The topic of the video
            description (str): Description of the video content
            scene_outline_i (str): Outline for this specific scene
            i (int): Scene number
            file_prefix (str): Prefix for output files
            session_id (str): Session identifier
            scene_trace_id (str): Unique trace ID for this scene

        Returns:
            str: Generated implementation plan for the scene
        """
        # Initialize empty implementation plan
        implementation_plan = ""
        scene_dir = os.path.join(self.output_dir, file_prefix, f"scene{i}")
        subplan_dir = os.path.join(scene_dir, "subplans")
        os.makedirs(scene_dir, exist_ok=True)
        os.makedirs(subplan_dir, exist_ok=True)

        # Save scene_trace_id to file
        trace_id_file = os.path.join(subplan_dir, "scene_trace_id.txt")
        with open(trace_id_file, 'w') as f:
            f.write(scene_trace_id)
        print(f"Scene trace ID saved to {trace_id_file}")

        # ===== Step 1: Generate Scene Vision and Storyboard =====
        # ===================================================
        prompt_vision_storyboard = get_prompt_scene_vision_storyboard(i, topic, description, scene_outline_i, self.relevant_plugins)

        # Add vision storyboard examples only for this stage if available
        if self.use_context_learning and self.vision_storyboard_examples:
            prompt_vision_storyboard += f"\n\nHere are some example storyboards:\n{self.vision_storyboard_examples}"
        
        if self.rag_integration:
            # Use the already detected plugins instead of detecting again
            # relevant_plugins = self.relevant_plugins # Removed redundant variable
            # print(f"Using detected plugins: {relevant_plugins}") # Removed redundant print

            # Generate RAG queries
            rag_queries = self.rag_integration._generate_rag_queries_storyboard(
                scene_plan=scene_outline_i,
                scene_trace_id=scene_trace_id,
                topic=topic,
                scene_number=i,
                session_id=session_id,
                relevant_plugins=self.relevant_plugins # Use self.relevant_plugins directly
            )

            retrieved_docs = self.rag_integration.get_relevant_docs(
                rag_queries=rag_queries,
                scene_trace_id=scene_trace_id,
                topic=topic,
                scene_number=i
            )
            
            # Add documentation to prompt
            prompt_vision_storyboard += f"\n\n{retrieved_docs}"

        vision_storyboard_plan = self.planner_model(
            _prepare_text_inputs(prompt_vision_storyboard),
            metadata={"generation_name": "scene_vision_storyboard", "trace_id": scene_trace_id, "tags": [topic, f"scene{i}"], "session_id": session_id}
        )
        # extract vision storyboard plan <SCENE_VISION_STORYBOARD_PLAN> ... </SCENE_VISION_STORYBOARD_PLAN>
        vision_match = re.search(r'(<SCENE_VISION_STORYBOARD_PLAN>.*?</SCENE_VISION_STORYBOARD_PLAN>)', vision_storyboard_plan, re.DOTALL)
        vision_storyboard_plan = vision_match.group(1) if vision_match else vision_storyboard_plan
        implementation_plan += vision_storyboard_plan + "\n\n"
        file_path_vs = os.path.join(subplan_dir, f"{file_prefix}_scene{i}_vision_storyboard_plan.txt")
        with open(file_path_vs, "w") as f:
            f.write(vision_storyboard_plan)
        print(f"Scene {i} Vision and Storyboard Plan saved to {file_path_vs}")

        # ===== Step 2: Generate Technical Implementation Plan =====
        # =========================================================
        prompt_technical_implementation = get_prompt_scene_technical_implementation(i, topic, description, scene_outline_i, vision_storyboard_plan, self.relevant_plugins)

        # Add technical implementation examples only for this stage if available
        if self.use_context_learning and self.technical_implementation_examples:
            prompt_technical_implementation += f"\n\nHere are some example technical implementations:\n{self.technical_implementation_examples}"

        if self.rag_integration:
            # Use the already detected plugins instead of detecting again
            # relevant_plugins = self.relevant_plugins # Removed redundant variable
            # print(f"Using detected plugins: {relevant_plugins}") # Removed redundant print

            # Generate RAG queries
            rag_queries = self.rag_integration._generate_rag_queries_technical(
                storyboard=vision_storyboard_plan,
                scene_trace_id=scene_trace_id,
                topic=topic,
                scene_number=i,
                session_id=session_id,
                relevant_plugins=self.relevant_plugins # Use self.relevant_plugins directly
            )

            retrieved_docs = self.rag_integration.get_relevant_docs(
                rag_queries=rag_queries,
                scene_trace_id=scene_trace_id,
                topic=topic,
                scene_number=i
            )

            # Add documentation to prompt
            prompt_technical_implementation += f"\n\n{retrieved_docs}"

        technical_implementation_plan = self.planner_model(
            _prepare_text_inputs(prompt_technical_implementation),
            metadata={"generation_name": "scene_technical_implementation", "trace_id": scene_trace_id, "tags": [topic, f"scene{i}"], "session_id": session_id}
        )
        # extract technical implementation plan <SCENE_TECHNICAL_IMPLEMENTATION_PLAN> ... </SCENE_TECHNICAL_IMPLEMENTATION_PLAN>
        technical_match = re.search(r'(<SCENE_TECHNICAL_IMPLEMENTATION_PLAN>.*?</SCENE_TECHNICAL_IMPLEMENTATION_PLAN>)', technical_implementation_plan, re.DOTALL)
        technical_implementation_plan = technical_match.group(1) if technical_match else technical_implementation_plan
        implementation_plan += technical_implementation_plan + "\n\n"
        file_path_ti = os.path.join(subplan_dir, f"{file_prefix}_scene{i}_technical_implementation_plan.txt")
        with open(file_path_ti, "w") as f:
            f.write(technical_implementation_plan)
        print(f"Scene {i} Technical Implementation Plan saved to {file_path_ti}")
       
        # ===== Step 3: Generate Animation and Narration Plan =====
        # =========================================================
        prompt_animation_narration = get_prompt_scene_animation_narration(i, topic, description, scene_outline_i, vision_storyboard_plan, technical_implementation_plan, self.relevant_plugins)
        
        # Add animation narration examples only for this stage if available
        if self.use_context_learning and self.animation_narration_examples:
            prompt_animation_narration += f"\n\nHere are some example animation and narration plans:\n{self.animation_narration_examples}"
        
        if self.rag_integration:
            rag_queries = self.rag_integration._generate_rag_queries_narration(
                storyboard=vision_storyboard_plan,
                scene_trace_id=scene_trace_id,
                topic=topic,
                scene_number=i,
                session_id=session_id,
                relevant_plugins=self.relevant_plugins # Use self.relevant_plugins directly
            )
            retrieved_docs = self.rag_integration.get_relevant_docs(
                rag_queries=rag_queries,
                scene_trace_id=scene_trace_id,
                topic=topic,
                scene_number=i
            )
            prompt_animation_narration += f"\n\n{retrieved_docs}"

        animation_narration_plan = self.planner_model(
            _prepare_text_inputs(prompt_animation_narration),
            metadata={"generation_name": "scene_animation_narration", "trace_id": scene_trace_id, "tags": [topic, f"scene{i}"], "session_id": session_id}
        )
        # extract animation narration plan <SCENE_ANIMATION_NARRATION_PLAN> ... </SCENE_ANIMATION_NARRATION_PLAN>
        animation_match = re.search(r'(<SCENE_ANIMATION_NARRATION_PLAN>.*?</SCENE_ANIMATION_NARRATION_PLAN>)', animation_narration_plan, re.DOTALL)
        animation_narration_plan = animation_match.group(1) if animation_match else animation_narration_plan
        implementation_plan += animation_narration_plan + "\n\n"
        file_path_an = os.path.join(subplan_dir, f"{file_prefix}_scene{i}_animation_narration_plan.txt")
        with open(file_path_an, "w") as f:
            f.write(animation_narration_plan)
        print(f"Scene {i} Animation and Narration Plan saved to {file_path_an}")

        # ===== Step 4: Save Implementation Plan =====
        # ==========================================
        # save the overall implementation plan to file
        with open(os.path.join(self.output_dir, file_prefix, f"scene{i}", f"{file_prefix}_scene{i}_implementation_plan.txt"), "w") as f:
            f.write(f"# Scene {i} Implementation Plan\n\n")
            f.write(implementation_plan)
        print(f"Scene {i} Implementation Plan saved to {file_path_ti}")

        return implementation_plan

    async def generate_scene_implementation(self,
                                      topic: str,
                                      description: str,
                                      plan: str,
                                      session_id: str) -> List[str]:
        """Generate detailed implementation plans for all scenes.

        Args:
            topic (str): The topic of the video
            description (str): Description of the video content
            plan (str): Overall scene plan
            session_id (str): Session identifier

        Returns:
            List[str]: List of implementation plans for each scene
        """
        # extract scene outline <SCENE_OUTLINE> ... </SCENE_OUTLINE>
        scene_outline = re.search(r'(<SCENE_OUTLINE>.*?</SCENE_OUTLINE>)', plan, re.DOTALL).group(1)
        # check the number of scenes in the outline
        scene_number = len(re.findall(r'<SCENE_(\d+)>[^<]', scene_outline))
        # replace all spaces and special characters with underscores for file path compatibility
        file_prefix = topic.lower()
        file_prefix = re.sub(r'[^a-z0-9_]+', '_', file_prefix)
        # generate implementation plan for each scene
        all_scene_implementation_plans = []

        tasks = []
        for i in range(1, scene_number):
            print(f"Generating implementation plan for scene {i} in topic {topic}")
            scene_outline_i = re.search(r'(<SCENE_{i}>.*?</SCENE_{i}>)'.format(i=i), scene_outline, re.DOTALL).group(1)
            scene_trace_id = str(uuid.uuid4())
            task = asyncio.create_task(self._generate_scene_implementation_single(topic, description, scene_outline_i, i, file_prefix, session_id, scene_trace_id))
            tasks.append(task)

        all_scene_implementation_plans = await asyncio.gather(*tasks)
        return all_scene_implementation_plans

    async def generate_scene_implementation_concurrently(self,
                                              topic: str,
                                              description: str,
                                              plan: str,
                                              session_id: str,
                                              scene_semaphore) -> List[str]:
        """Generate detailed implementation plans for all scenes concurrently with controlled concurrency.

        Args:
            topic (str): The topic of the video
            description (str): Description of the video content
            plan (str): Overall scene plan
            session_id (str): Session identifier
            scene_semaphore: Semaphore to control concurrent scene generation

        Returns:
            List[str]: List of implementation plans for each scene
        """
        scene_outline = extract_xml(plan)
        scene_number = len(re.findall(r'<SCENE_(\d+)>[^<]', scene_outline))
        file_prefix = re.sub(r'[^a-z0-9_]+', '_', topic.lower())
        all_scene_implementation_plans = []

        async def generate_single_scene_implementation(i):
            async with scene_semaphore:  # controls parallelism
                print(f"Generating implementation plan for scene {i} in topic {topic}")
                scene_outline_i = re.search(r'(<SCENE_{i}>.*?</SCENE_{i}>)'.format(i=i), scene_outline, re.DOTALL).group(1)
                scene_trace_id = str(uuid.uuid4())  # Generate UUID here
                return await self._generate_scene_implementation_single(topic, description, scene_outline_i, i, file_prefix, session_id, scene_trace_id)

        tasks = [generate_single_scene_implementation(i + 1) for i in range(scene_number)]
        all_scene_implementation_plans = await asyncio.gather(*tasks)
        return all_scene_implementation_plans