Spaces:
Running
Running
File size: 15,125 Bytes
10b0de3 a233652 10b0de3 a233652 10b0de3 a233652 10b0de3 a233652 10b0de3 594b1dc 10b0de3 bfebc17 10b0de3 bfebc17 10b0de3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 |
from functools import wraps
import logging
import gradio as gr
import os
import modal
from openai import OpenAI
from dotenv import load_dotenv
import re
import time
import uuid
import yt_dlp
import tempfile
import shutil
from pathlib import Path
# Load environment variables (e.g. OPENROUTER_API_KEY, YOUTUBE_COOKIES) from a .env file.
load_dotenv()
# Handles to already-deployed Modal resources (looked up by name, not defined in this file).
process_media_remotely = modal.Function.from_name("clipscript-processing-service", "process_media")
asr_handle = modal.Cls.from_name("clipscript-asr-service", "ASR")
# Shared volume used to hand uploaded files over to the remote processing service.
upload_volume = modal.Volume.from_name("clipscript-uploads", create_if_missing=True)
# OpenRouter model identifier used for all chat completions below.
llm = "deepseek/deepseek-r1-0528:free"
# May be None if the env var is unset; requests would then fail at call time.
api_key = os.environ.get("OPENROUTER_API_KEY")
def retry_on_rate_limit(max_retries: int = 5, base_delay: float = 2.0):
    """Decorator that retries the wrapped callable on HTTP 429 / rate-limit errors.

    Uses exponential backoff: waits ``base_delay`` seconds, doubling after each
    rate-limited attempt, for up to ``max_retries`` attempts in total.

    Args:
        max_retries: Maximum number of attempts before giving up.
        base_delay: Initial backoff delay in seconds.

    Raises:
        Exception: "Max retries exceeded..." (chained to the last rate-limit
            error) once all attempts are exhausted. Non-rate-limit errors
            propagate immediately.

    NOTE(review): if the wrapped function is a *generator* (e.g. a streaming
    chat handler), calling it only creates the generator -- exceptions raised
    while iterating occur in the caller and are NOT retried here. Confirm
    whether that is acceptable for the streaming endpoints below.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            delay = base_delay
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    # Detect a rate limit: explicit 429 status on the response
                    # object, or "429"/"rate limit" in the error message.
                    status_code = getattr(getattr(e, 'response', None), 'status_code', None)
                    is_rate_limit = (
                        status_code == 429 or '429' in str(e) or 'rate limit' in str(e).lower()
                    )
                    if not is_rate_limit:
                        raise
                    if attempt == max_retries - 1:
                        # Out of attempts: chain the original error instead of
                        # swallowing it, and skip the now-pointless final sleep
                        # (the original slept once more before raising).
                        raise Exception(
                            "Max retries exceeded due to rate limits or other persistent errors."
                        ) from e
                    logging.warning(f"Rate limit hit. Retrying in {delay:.1f} seconds...")
                    time.sleep(delay)
                    delay *= 2
        return wrapper
    return decorator
def extract_youtube_video_id(url: str) -> "str | None":
    """Extract the video ID from a YouTube URL.

    Supports watch, youtu.be, embed, /v/, and /shorts/ URL formats, plus
    watch URLs where ``v=`` is not the first query parameter.

    Args:
        url: The URL to inspect.

    Returns:
        The video ID string, or None when the URL is not a recognized
        YouTube format. (The original annotation claimed ``str``, but the
        fall-through path returns None.)
    """
    patterns = [
        # youtube.com/watch?v=ID, youtu.be/ID, /embed/ID, /v/ID, /shorts/ID
        r'(?:youtube\.com\/watch\?v=|youtu\.be\/|youtube\.com\/embed\/|youtube\.com\/v\/|youtube\.com\/shorts\/)([^&\n?#]+)',
        # watch URLs where v= appears after other query parameters
        r'youtube\.com\/watch\?.*v=([^&\n?#]+)'
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    return None
def get_youtube_thumbnail_url(video_id: str) -> str:
    """Build the max-resolution thumbnail URL for a given YouTube video ID."""
    base = "https://img.youtube.com/vi"
    return f"{base}/{video_id}/maxresdefault.jpg"
# OpenAI-compatible client pointed at OpenRouter; used for all LLM calls.
# `api_key` may be None if OPENROUTER_API_KEY is unset (requests then fail at call time).
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=api_key,
)
def download_and_convert_youtube_audio(url: str) -> str:
    """
    Downloads audio from a YouTube URL and converts it to a 16kHz mono WAV file.

    All intermediate files live in a temporary directory that is removed in the
    ``finally`` block; only the final WAV survives, moved out to its own temp path.

    Args:
        url: The YouTube URL to download audio from.

    Returns:
        Path to the final temporary WAV file. The CALLER owns this file and is
        responsible for deleting it when done.

    Raises:
        FileNotFoundError: If yt-dlp did not produce a WAV file (e.g. the video
            is protected or unavailable).
    """
    temp_dir = tempfile.mkdtemp()
    cookie_file_path = None
    try:
        # Optional YouTube cookies (for restricted videos) come from the
        # environment and are written to a throwaway cookies file.
        youtube_cookies = os.environ.get("YOUTUBE_COOKIES")
        if youtube_cookies:
            # delete=False so yt-dlp can reopen the file by path; it is
            # explicitly removed in the `finally` block below.
            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as tmp_cookie_file:
                tmp_cookie_file.write(youtube_cookies)
                cookie_file_path = tmp_cookie_file.name
            print("Using YouTube cookies from secrets.")
        # %(ext)s lets yt-dlp pick the container; the postprocessor converts to WAV.
        output_tmpl = os.path.join(temp_dir, "audio.%(ext)s")
        ydl_opts = {
            "format": "bestaudio/best",
            "outtmpl": output_tmpl,
            "postprocessors": [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
            }],
            # -ar 16000 -ac 1: 16 kHz mono, presumably what the ASR service expects.
            # NOTE(review): confirm 'extractaudio' is the documented per-postprocessor
            # args key for the installed yt-dlp version.
            'postprocessor_args': {
                'extractaudio': ['-ar', '16000', '-ac', '1']
            },
            "quiet": True,
        }
        # Pass the cookies file to yt-dlp only if one was created.
        if cookie_file_path:
            ydl_opts['cookiefile'] = cookie_file_path
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        # Locate the converted .wav; yt-dlp names it after the output template.
        downloaded_files = list(Path(temp_dir).glob("*.wav"))
        if not downloaded_files:
            raise FileNotFoundError("yt-dlp failed to create a WAV file. The video might be protected or unavailable.")
        # Move the WAV out of temp_dir so the whole directory can be deleted;
        # mkstemp reserves a unique destination path (the fd is not needed).
        source_path = downloaded_files[0]
        fd, dest_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)
        shutil.move(source_path, dest_path)
        return dest_path
    finally:
        # Clean up the cookies file (if created) and all intermediate files.
        if cookie_file_path and os.path.exists(cookie_file_path):
            os.remove(cookie_file_path)
        shutil.rmtree(temp_dir)
def handle_transcription(file, url):
    """Transcribe an uploaded file or a URL via the remote Modal services.

    The URL takes precedence over the uploaded file. YouTube URLs are
    downloaded and converted locally, then their audio bytes are sent to the
    ASR service; other URLs and file uploads are processed remotely.

    Args:
        file: Local path of the Gradio-uploaded temp file, or None.
        url: A YouTube or direct-audio URL, or None/empty.

    Returns:
        The transcript text on success, or a string starting with "Error"
        on failure (downstream handlers key off that prefix).
    """
    if not file and not (url and url.strip()):
        gr.Warning("Please upload a file or enter a URL.")
        return "Error: Please upload a file or enter a URL."
    gr.Info("Starting secure transcription... This might take a moment.")
    try:
        result = None
        if url and url.strip():
            video_id = extract_youtube_video_id(url)
            if video_id:
                converted_wav_path = None
                try:
                    print(f"Detected YouTube URL. Processing locally: {url}")
                    converted_wav_path = download_and_convert_youtube_audio(url)
                    # Read the converted audio and send the raw bytes to ASR.
                    with open(converted_wav_path, "rb") as f:
                        audio_bytes = f.read()
                    print("Sending audio bytes to ASR service.")
                    result = asr_handle().transcribe.remote(audio_bytes=audio_bytes)
                finally:
                    # download_and_convert_youtube_audio leaves the WAV behind;
                    # this caller owns its cleanup.
                    if converted_wav_path and os.path.exists(converted_wav_path):
                        os.remove(converted_wav_path)
            else:
                # Non-YouTube URLs are fetched and processed entirely remotely.
                print(f"Sending URL to Modal for processing: {url}")
                result = process_media_remotely.remote(url=url)
        elif file is not None:
            # File uploads: stage the file on the shared Modal volume under a
            # unique ID, then have the remote service pick it up by that ID.
            upload_id = f"upload-{uuid.uuid4()}"
            print(f"Uploading file to Modal volume with ID: {upload_id}")
            with upload_volume.batch_upload() as batch:
                batch.put_file(file, upload_id)
            print(f"Sending upload ID to Modal for processing: {upload_id}")
            result = process_media_remotely.remote(upload_id=upload_id)
        # NOTE(review): result is assumed to be a dict with "error"/"text"
        # keys; confirm against the remote services' return shape.
        if result.get("error"):
            return f"Error from ASR service: {result['error']}"
        return result["text"]
    except Exception as e:
        print(f"An error occurred: {e}")
        # Temp-file cleanup happens in `finally` below; the original duplicated
        # the same os.remove() here, which was dead code.
        return f"Error: {str(e)}"
    finally:
        # Gradio's gr.File widget creates a temporary file. We should clean it up.
        if file and os.path.exists(file):
            os.remove(file)
def add_transcript_to_chat(transcript: str):
    """Gate the chat stage on transcription success; chat always starts empty.

    The transcript itself is fed to the LLM separately, so the visible chat
    history is an empty list in both the success and failure cases -- only
    the user-facing notification differs.
    """
    failed = transcript.startswith("Error")
    if failed:
        gr.Error("Transcription failed. Please check the logs.")
        return []
    gr.Info("Transcript ready! Generating blog post...")
    return []
def user_chat(user_message: str, history: list):
    """Append the user's message to the chat history and clear the input box.

    Returns a (cleared_input, new_history) pair; the original history list
    is not mutated.
    """
    updated_history = [*history, {"role": "user", "content": user_message}]
    return "", updated_history
@retry_on_rate_limit(max_retries=3, base_delay=1.0)
def _stream_chat_response(history: list, system_prompt: str, transcript: str = None):
    """Stream an LLM reply into `history`, yielding the history after each chunk.

    NOTE(review): this is a generator, so the retry decorator only wraps
    generator *creation* -- exceptions raised while streaming occur in the
    caller and are not retried. Confirm whether retries were expected here.
    """
    if not history and not transcript:
        # Nothing to respond to: yield nothing.
        return
    if transcript and transcript.startswith("Error"):
        # Upstream transcription failed; skip the LLM call entirely.
        return
    # The transcript (if any) is injected as a hidden first user message: part
    # of the LLM context, but never displayed in the chat UI.
    messages = [{"role": "system", "content": system_prompt}]
    if transcript:
        messages.append({"role": "user", "content": transcript})
    messages.extend(history)
    stream = client.chat.completions.create(
        model=llm,
        messages=messages,
        stream=True
    )
    # Grow the assistant message in place and yield after every chunk so the
    # Gradio chatbot re-renders progressively.
    history.append({"role": "assistant", "content": ""})
    response_content = ""
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            response_content += content
            history[-1]["content"] = response_content
            yield history
def generate_blog_post(history: list, transcript: str, context: str):
    """Stream a blog post generated from the transcript plus optional context."""
    system_prompt = """You are an expert blog writer and editor. Your task is to transform a raw video transcription into a well-structured, engaging, and publish-ready blog post in Markdown format.
Core Mandate: Erase the Video Origin
This is a critical function. The reader must not know the content came from a video.
Eliminate all video-specific language: Remove phrases like "in this video," "thanks for watching," "as you can see here," "welcome to the channel," etc.
Scrub all platform calls-to-action: No "like and subscribe," "hit the bell icon," or "comment below."
Remove sponsor messages and ads: Completely omit any sponsor mentions.
Rephrase visual references: Convert "look at this screen" to a description of the information itself (e.g., "The data reveals that...").
Content & Formatting Rules:
Title: Create a compelling, SEO-friendly H1 title.
Structure: Use ## for main headings and ### for subheadings to create a logical flow.
Readability: Use short paragraphs, bulleted/numbered lists, and bolding for key terms.
Refine Prose: Convert conversational speech into clean, professional writing.
Remove all filler words (um, uh, like, you know).
Fix grammar and consolidate rambling sentences.
Flow: Start with a strong introduction and end with a concise summary or conclusion.
Your output must be a complete, polished article in Markdown."""
    # Fold any user-supplied context into the transcript before prompting.
    extra = context.strip() if context else ""
    if extra:
        full_transcript = (
            f"{transcript}\n\n--- Additional Context ---\n{extra}\n\n"
            "This is some additional context relevant to the transcription above."
        )
    else:
        full_transcript = transcript
    yield from _stream_chat_response(history, system_prompt, full_transcript)
def bot_chat(history: list):
    """Stream a refined version of the blog post based on the chat history."""
    refine_prompt = (
        "You are a helpful assistant that helps refine a blog post created "
        "from an audio transcript. The user will provide instructions for "
        "changes and you will return only the updated blog post."
    )
    yield from _stream_chat_response(history, refine_prompt)
def update_thumbnail_display(url: str):
    """Show the thumbnail for a valid YouTube URL; hide the image otherwise."""
    def _hidden():
        # Shared "no thumbnail" update used by both failure paths.
        return gr.update(visible=False, value=None)

    if not url or not url.strip():
        return _hidden()
    video_id = extract_youtube_video_id(url)
    if not video_id:
        return _hidden()
    return gr.update(visible=True, value=get_youtube_thumbnail_url(video_id))
# --- Gradio Interface ---
theme = gr.themes.Ocean()
with gr.Blocks(title="ClipScript", theme=theme) as demo:
    gr.Markdown("# 🎬➡️📝 ClipScript: Video-to-Blog Transformer", elem_classes="hero-title")
    gr.Markdown("### Upload an audio file, or provide a YouTube/direct URL *of any size*.")
    with gr.Row():
        # Column 1: file input, URL input, and YouTube thumbnail preview.
        with gr.Column(scale=1):
            file_input = gr.File(label="Upload any audio file (Recommended)", type="filepath", height=200, file_types=["audio", ".webm", ".mp3", ".mp4", ".m4a", ".ogg", ".wav"])
            with gr.Row():
                with gr.Column():
                    url_input = gr.Textbox(
                        label="YouTube or Direct Audio URL",
                        placeholder="youtube.com/watch?v=... OR xyz.com/audio.mp3",
                        scale=2
                    )
                    # Thumbnail preview; hidden until a valid YouTube URL is entered.
                    thumbnail_display = gr.Image(
                        label="Thumbnail",
                        visible=False,
                        height=100,
                        show_download_button=False,
                        interactive=False,
                        scale=2
                    )
        # Column 2: transcript view (editable, so users can correct the ASR output).
        with gr.Column(scale=2):
            transcript_output = gr.Textbox(label="Transcription POWERED by Modal Labs", lines=12, interactive=True, show_copy_button=True)
    transcribe_button = gr.Button("Blogify", variant="primary")
    gr.Markdown("---")
    # Optional free-form context merged into the LLM prompt alongside the transcript.
    context_input = gr.Textbox(
        label="Additional Context",
        placeholder="Enter any additional context, code, articles, or any references that relate to the video content...",
        lines=5,
        interactive=True
    )
    chatbot = gr.Chatbot(
        label="Blog Post", type="messages", height=500, show_copy_all_button=True, show_copy_button=True, show_share_button=True
    )
    chat_input = gr.Textbox(
        label="Your message",
        placeholder="Refine the blog post or ask for changes...",
        container=False,
    )
    clear_button = gr.ClearButton([chat_input, chatbot])

    # The two input sources are mutually exclusive: using one disables the other.
    def on_file_upload(file):
        # A chosen file disables the URL box and hides any stale thumbnail.
        if file is not None:
            return gr.update(interactive=False), gr.update(visible=False, value=None)
        else:
            return gr.update(interactive=True), gr.update(visible=False, value=None)

    def on_url_change(url):
        # A non-empty URL disables the file widget and refreshes the thumbnail.
        if url and url.strip():
            thumbnail_update = update_thumbnail_display(url)
            return gr.update(interactive=False), thumbnail_update
        else:
            return gr.update(interactive=True), gr.update(visible=False, value=None)

    file_input.change(fn=on_file_upload, inputs=file_input, outputs=[url_input, thumbnail_display])
    url_input.change(fn=on_url_change, inputs=url_input, outputs=[file_input, thumbnail_display])

    # Chained events for blog generation:
    # transcribe -> reset the file widget -> gate on success -> stream the post.
    (
        transcribe_button.click(
            fn=handle_transcription,
            inputs=[file_input, url_input],
            outputs=transcript_output,
        )
        .then(
            # handle_transcription deletes the temp file, so clear the widget too.
            fn=lambda: gr.update(value=None, interactive=True),
            outputs=file_input,
            queue=False,
        )
        .then(
            fn=add_transcript_to_chat,
            inputs=transcript_output,
            outputs=chatbot,
            queue=False,
        )
        .then(fn=generate_blog_post, inputs=[chatbot, transcript_output, context_input], outputs=chatbot)
    )
    # Follow-up chat: append the user message, then stream the refined post.
    chat_input.submit(
        fn=user_chat,
        inputs=[chat_input, chatbot],
        outputs=[chat_input, chatbot],
        queue=False,
    ).then(fn=bot_chat, inputs=chatbot, outputs=chatbot)

if __name__ == "__main__":
    demo.launch()