import asyncio
import json
import logging
import os
import platform
import shutil
import tempfile
from concurrent.futures import ProcessPoolExecutor
from typing import List, Optional

import aiofiles
import asyncpraw
import google.generativeai as genai
import numpy as np
from dotenv import load_dotenv
from fastapi import (
    BackgroundTasks,
    Body,
    FastAPI,
    HTTPException,
    Request,
    WebSocket,
    WebSocketDisconnect,
)
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from google.generativeai.types import HarmBlockThreshold, HarmCategory
from moviepy.config import change_settings
from moviepy.editor import (
    ColorClip,
    CompositeVideoClip,
    ImageClip,
    TextClip,
    VideoFileClip,
)
from PIL import Image, ImageDraw
from pydantic import BaseModel
from RedDownloader import RedDownloader

load_dotenv()

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Ring buffer of recent log lines, streamed to clients via /ws/logs.
live_logs = []


class LiveLogHandler(logging.Handler):
    """Logging handler that mirrors every record into the live_logs buffer."""

    def emit(self, record):
        live_logs.append(self.format(record))
        # Cap the buffer so a long-running server doesn't grow unboundedly.
        if len(live_logs) > 100:
            live_logs.pop(0)


live_log_handler = LiveLogHandler()
live_log_handler.setFormatter(
    logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
)
logging.getLogger().addHandler(live_log_handler)
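

# MoviePy 1.x renders TextClip by shelling out to ImageMagick; on Windows the
# binary must be pointed to explicitly. The path below is machine-specific;
# adjust it to the local ImageMagick install.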
if platform.system() == "Windows":
    change_settings(
        {
            "IMAGEMAGICK_BINARY": r"C:\Program Files\ImageMagick-7.1.1-Q16-HDRI\magick.exe"
        }
    )


BACKGROUND_IMAGE = "background.png"
PROCESSED_VIDEOS_FILE = "processed_videos.json"
MAX_VIDEO_DURATION = 60  # seconds
FONT_PATH = "font/Montserrat-Black.ttf"
OUTPUT_DIR = "output"
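

# Reddit credentials for asyncpraw "script"-type auth; all five values are
# read from the environment (populated above by load_dotenv).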
CLIENT_ID = os.environ.get("REDDIT_CLIENT_ID")
CLIENT_SECRET = os.environ.get("REDDIT_CLIENT_SECRET")
USER_AGENT = os.environ.get("REDDIT_USER_AGENT")
USERNAME = os.environ.get("REDDIT_USERNAME")
PASSWORD = os.environ.get("REDDIT_PASSWORD")


subreddits = [
    "Damnthatsinteresting",
    "interestingasfuck",
    "BeAmazed",
    "nextfuckinglevel",
]
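

# In-memory job state keyed by post id. It is lost on restart, which is fine
# for a single-process dev server but would need external storage otherwise.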
processing_results = {}


class VideoItem(BaseModel):
    id: str
    title: str
    ups: int
    url: str
    subreddit: str
    duration: Optional[int] = None
    thumbnail: Optional[str] = None


# Video rendering is CPU-bound, so it runs in worker processes (not threads)
# to sidestep the GIL; two workers keeps concurrent encodes bounded.
process_executor = ProcessPoolExecutor(max_workers=2)


async def initialize_reddit():
    """Create an asyncpraw client from the script-app credentials above."""
    return asyncpraw.Reddit(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        user_agent=USER_AGENT,
        username=USERNAME,
        password=PASSWORD,
    )


async def load_processed_videos():
    """Load already-processed post ids, tolerating a missing or corrupt file."""
    if os.path.exists(PROCESSED_VIDEOS_FILE):
        async with aiofiles.open(PROCESSED_VIDEOS_FILE, "r") as f:
            try:
                return json.loads(await f.read())
            except json.JSONDecodeError:
                logging.error(
                    f"Error decoding {PROCESSED_VIDEOS_FILE}. Starting with an empty list."
                )
                return []
    return []


async def save_processed_videos(processed_videos):
    async with aiofiles.open(PROCESSED_VIDEOS_FILE, "w") as f:
        await f.write(json.dumps(processed_videos))
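

# asyncpraw's comment API is awaitable: submission.comments() fetches the
# forest, and replace_more(limit=0) drops unresolved "MoreComments" stubs
# instead of expanding them, which is far cheaper than limit=None when only
# a few top-level comments are needed.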
async def get_n_comments(submission, n=3):
    """Return the bodies of the top n top-level comments of a submission."""
    try:
        comments = await submission.comments()
        await comments.replace_more(limit=0)
        return [comment.body for comment in comments][:n]
    except Exception as e:
        logging.error(f"Error getting comments: {e}")
        return []


async def fetch_trending_videos(reddit, processed_videos, subreddit_name):
    """Fetch trending vertical video posts from a subreddit without downloading them."""
    posts_list = []
    try:
        subreddit = await reddit.subreddit(subreddit_name)
        async for post in subreddit.hot(limit=10):
            if not post.is_video or post.id in processed_videos:
                continue
            try:
                reddit_video = post.media["reddit_video"]
                duration = reddit_video["duration"]
                width = reddit_video["width"]
                height = reddit_video["height"]
            except (KeyError, TypeError):
                logging.warning(
                    f"Skipping post {post.id} due to missing video duration/dimensions."
                )
                continue
            if duration > MAX_VIDEO_DURATION:
                logging.warning(f"Skipping post {post.id}: exceeds max duration.")
                continue
            # Require a clearly vertical clip so it fills the portrait canvas.
            if (height / width) < 1.6:
                logging.warning(
                    f"Skipping post {post.id}: aspect ratio {width}x{height} not vertical enough."
                )
                continue
            if not reddit_video.get("has_audio", True):
                logging.warning(f"Skipping post {post.id} due to no audio")
                continue
            posts_list.append(post)
        return posts_list
    except Exception as e:
        logging.error(f"Error fetching videos from subreddit {subreddit_name}: {e}")
        return []
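

# RedDownloader writes "<output>.mp4" itself, so only the filename stem is
# passed and the helper then verifies the expected file actually appeared.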
async def download_video(url, filename):
    """Offload the synchronous RedDownloader.Download call to a worker thread."""
    loop = asyncio.get_running_loop()

    def blocking_download():
        try:
            RedDownloader.Download(
                url, quality=720, output=os.path.splitext(filename)[0]
            )
            return os.path.exists(filename)
        except Exception as e:
            logging.error(f"Error downloading video: {e}")
            return False

    exists = await loop.run_in_executor(None, blocking_download)
    if exists:
        logging.info(f"Video downloaded to {filename}")
        return filename
    logging.error(f"Video file not found after download: {filename}")
    return None
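

# Gemini is asked for strict JSON (response_mime_type="application/json"), so
# a successful response should parse to something like
#     {"title": "...", "caption": "..."}
# and the parsing below assumes exactly those two keys. The model name and
# permissive safety settings are this project's choices, not library defaults.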
async def generate_title_and_caption(title, comments, first_frame, api_key):
    genai.configure(api_key=api_key)
    generation_config = genai.GenerationConfig(
        temperature=1, max_output_tokens=8192, response_mime_type="application/json"
    )

    model = genai.GenerativeModel(
        model_name="gemini-2.5-flash",
        generation_config=generation_config,
        safety_settings={
            HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
            HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
        },
        system_instruction=(
            "Generate a JSON object which contains a short catchy youtube video title of max 15 words to place over the video, "
            "and also a very interesting and informational paragraph describing the subject and any informative interesting knowledge derived from the comments, "
            "based on the provided video title, first frame image and comments info. Do not refer to the video or comments explicitly. "
            "Return a JSON object with keys 'title' and 'caption'."
        ),
    )
    temp_image_path = None
    try:
        # Persist the first frame to a temp PNG so PIL can hand Gemini a real image.
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as temp_image_file:
            temp_image_path = temp_image_file.name
            Image.fromarray(first_frame, mode="RGB").save(temp_image_path)

        with Image.open(temp_image_path) as image:
            parts = [f"Title: {title}\n\nComments: {comments}", image]
            response = await model.generate_content_async(
                [{"role": "user", "parts": parts}]
            )

        try:
            response_json = json.loads(response.text)
            return response_json["title"], response_json["caption"]
        except (json.JSONDecodeError, KeyError, TypeError) as json_err:
            logging.error(
                f"Error parsing JSON response: {json_err}, Response Text: {response.text}"
            )
            return None, None
    except Exception as e:
        logging.error(f"Error generating title and caption: {e}")
        return None, None
    finally:
        # Always remove the temp frame, even if the API call failed.
        if temp_image_path and os.path.exists(temp_image_path):
            os.remove(temp_image_path)
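

# MoviePy mask clips are single-channel with values in [0, 1], so the
# rounded-rectangle mask is drawn as a grayscale PIL image and normalised.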
def round_corners(clip, radius):
    """Apply a rounded-rectangle alpha mask to a clip."""
    mask = Image.new("L", clip.size, 0)
    draw = ImageDraw.Draw(mask)
    draw.rounded_rectangle([(0, 0), clip.size], radius, fill=255)
    return clip.set_mask(ImageClip(np.array(mask) / 255.0, ismask=True))


def create_text_clip(text, font_area, font_size, color):
    """Render wrapped text inside font_area=(x0, y0, x1, y1), shrinking to fit."""
    width = font_area[2] - font_area[0]
    height = font_area[3] - font_area[1]

    txt_clip = TextClip(
        text,
        fontsize=font_size,
        color=color,
        font=FONT_PATH,
        size=(width, None),
        method="caption",
        align="center",
    )

    # If the wrapped text overflows the box, scale the font down and re-render.
    if txt_clip.h > height:
        scale_factor = height / txt_clip.h
        new_font_size = int(font_size * scale_factor)
        txt_clip = TextClip(
            text,
            fontsize=new_font_size,
            color=color,
            font=FONT_PATH,
            size=(width, None),
            method="caption",
            align="center",
        )

    # Bias the text toward the top quarter of the box rather than true centre.
    vertical_offset = 8
    txt_y = (height - txt_clip.h) / 4 + vertical_offset

    # A fully transparent backing clip keeps frames 3-channel; an RGBA
    # ColorClip would break compositing into the RGB canvas.
    bg_clip = ColorClip(size=(width, height), color=(0, 0, 0)).set_opacity(0)

    final_clip = CompositeVideoClip(
        [bg_clip, txt_clip.set_position((width / 2 - txt_clip.w / 2, txt_y))]
    )

    return final_clip.set_position((font_area[0], font_area[1]))
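

# Text colour is picked with the WCAG contrast-ratio formula
# (L1 + 0.05) / (L2 + 0.05), comparing white-on-region against black-on-region
# using the region's mean relative luminance.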
def calculate_text_color(frame, x, y, width, height):
    region = frame[y : y + height, x : x + width]
    if region.size == 0:
        # The sampling box falls outside the frame; default to white.
        return "white"
    if region.shape[-1] == 3:
        # ITU-R BT.601 luma weights.
        grayscale = np.dot(region[..., :3], [0.2989, 0.5870, 0.1140])
    else:
        grayscale = region
    relative_luminance = np.mean(grayscale) / 255
    white_contrast = (1.0 + 0.05) / (relative_luminance + 0.05)
    black_contrast = (relative_luminance + 0.05) / (0.0 + 0.05)
    return "white" if white_contrast > black_contrast else "black"


def create_watermark_clip(
    watermark_text, frame, x_pos, y_pos, width, height, video_duration
):
    # Note: the sampling coordinates refer to the 720x1280 output canvas, but
    # `frame` is a frame of the source video, which may be smaller; the
    # colour picker falls back to white when the box is out of range.
    optimal_color = calculate_text_color(frame, int(x_pos), int(y_pos), width, height)
    watermark_clip = TextClip(
        watermark_text,
        fontsize=30,
        color=optimal_color,
        font=FONT_PATH,
    )
    # Centre horizontally on the 720px-wide canvas, near the bottom.
    watermark_x = (720 - watermark_clip.w) / 2
    return watermark_clip.set_position((watermark_x, 1140)).set_duration(video_duration)
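

# generate_video is deliberately synchronous: it runs inside the
# ProcessPoolExecutor, where no event loop exists, so asyncio.run() is safe
# for its one async call (the Gemini request).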
def generate_video(
    video_path, title, comments, api_key, post_id, watermark_text="@damnsointeresting"
):
    video = None
    final = None
    try:
        output_dir = os.path.join(OUTPUT_DIR, post_id)
        os.makedirs(output_dir, exist_ok=True)
        video = VideoFileClip(video_path)
        first_frame = video.get_frame(0)
        bg = ImageClip(BACKGROUND_IMAGE).set_duration(video.duration)
        audio = video.audio

        # Fit the clip inside the canvas window (x0, y0, x1, y1), preserving
        # aspect ratio, then centre it within that window.
        video_area = (40, 290, 680, 1220)
        video_width = video_area[2] - video_area[0]
        video_height = video_area[3] - video_area[1]
        video_ratio = video.w / video.h
        if video_ratio > video_width / video_height:
            new_width, new_height = video_width, int(video_width / video_ratio)
        else:
            new_width, new_height = int(video_height * video_ratio), video_height
        video = video.resize(width=new_width, height=new_height)
        x_pos = video_area[0] + (video_width - new_width) / 2
        y_pos = video_area[1] + (video_height - new_height) / 2
        video = video.set_position((x_pos, y_pos))
        video = round_corners(video, 40)

        generated_title, generated_caption = asyncio.run(
            generate_title_and_caption(title, comments, first_frame, api_key)
        )
        if not generated_title or not generated_caption:
            logging.error("Failed to generate title or caption. Skipping video.")
            return None, None, None

        title_clip = create_text_clip(
            generated_title, (45, 190, 675, 285), 35, "white"
        ).set_duration(video.duration)
        watermark_clip = create_watermark_clip(
            watermark_text, first_frame, 210, 1140, 300, 40, video.duration
        )

        final = CompositeVideoClip(
            [bg, video, title_clip, watermark_clip], size=(720, 1280)
        )
        final = final.set_duration(video.duration)

        if audio:
            final = final.set_audio(audio)

        output_filename = os.path.join(output_dir, f"{post_id}.mp4")
        final.write_videofile(output_filename, fps=30)

        caption_filepath = os.path.join(output_dir, f"{post_id}.txt")
        with open(caption_filepath, "w") as f:
            f.write(
                f"Title:\n{generated_title.strip()}\n\nCaption:\n{generated_caption.strip()}"
            )

        return output_filename, generated_title, generated_caption
    except Exception as e:
        logging.error(f"Error processing video: {e}")
        return None, None, None
    finally:
        if video is not None:
            video.close()
        if final is not None:
            final.close()


app = FastAPI()

os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs("static", exist_ok=True)
app.mount("/output", StaticFiles(directory=OUTPUT_DIR), name="output")
app.mount("/static", StaticFiles(directory="static"), name="static")

templates = Jinja2Templates(directory="templates")


@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})


@app.get("/api/videos")
async def get_videos():
    reddit = await initialize_reddit()
    processed_videos = await load_processed_videos()
    all_posts = []
    for subreddit_name in subreddits:
        posts = await fetch_trending_videos(reddit, processed_videos, subreddit_name)
        for post in posts:
            try:
                duration = post.media["reddit_video"]["duration"]
            except KeyError:
                duration = None

            preview_video = None
            try:
                reddit_video = post.media.get("reddit_video")
                if reddit_video and "fallback_url" in reddit_video:
                    preview_video = reddit_video["fallback_url"]
            except Exception:
                preview_video = None

            all_posts.append(
                {
                    "id": post.id,
                    "title": post.title,
                    "ups": post.ups,
                    "url": post.url,
                    "subreddit": subreddit_name,
                    "duration": duration,
                    "thumbnail": getattr(post, "thumbnail", None),
                    "preview_video": preview_video,
                }
            )
    await reddit.close()
    all_posts.sort(key=lambda x: x["ups"], reverse=True)
    return JSONResponse(content=all_posts)
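

# Queues each selected video as a FastAPI background task; ids already in
# processing_results are skipped so double-submits don't re-render. A
# hypothetical request might look like:
#   curl -X POST http://localhost:8000/api/process \
#        -H "Content-Type: application/json" \
#        -d '[{"id": "abc123", "title": "...", "ups": 1,
#              "url": "https://v.redd.it/...", "subreddit": "BeAmazed"}]'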
@app.post("/api/process")
async def process_selected_videos(
    background_tasks: BackgroundTasks,
    videos: List[VideoItem] = Body(...),
):
    if not videos:
        raise HTTPException(status_code=400, detail="No videos provided")
    for video_info in videos:
        video_id = video_info.id
        if video_id in processing_results:
            continue
        processing_results[video_id] = {"status": "pending"}
        # .dict() is the pydantic v1 spelling; v2 prefers .model_dump().
        background_tasks.add_task(process_video_task, video_info.dict())
    return JSONResponse(
        content={
            "message": "Processing started",
            "video_ids": [video.id for video in videos],
        }
    )
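

# Full per-video pipeline: fetch comments, download the source clip, render
# the final video in the process pool, then record the outcome in
# processing_results. Cleanup of the raw download and the Reddit session
# happens in `finally` so failures don't leak files or connections.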
async def process_video_task(video_info: dict):
    video_id = video_info.get("id")
    reddit = None
    temp_video_filename = f"{video_id}.mp4"
    try:
        api_key = os.environ.get("GOOGLE_API_KEY")
        if not api_key:
            processing_results[video_id] = {"status": "error", "error": "Missing GOOGLE_API_KEY"}
            return

        reddit = await initialize_reddit()
        submission = await reddit.submission(id=video_id)
        comments = await get_n_comments(submission, 5)
        comments_string = "\n\n".join(comments) if comments else "No comments found."

        downloaded_video = await download_video(video_info.get("url"), temp_video_filename)
        if not downloaded_video:
            processing_results[video_id] = {"status": "error", "error": "Failed to download video"}
            return

        # Render in a worker process; MoviePy encoding would otherwise block
        # the event loop.
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(
            process_executor,
            generate_video,
            downloaded_video,
            video_info.get("title"),
            comments_string,
            api_key,
            f"{video_id}",
            "@damnsointeresting",
        )

        if result and result[0]:
            output_file, generated_title, generated_caption = result
            processing_results[video_id] = {
                "status": "completed",
                "video_url": f"/output/{video_id}/{video_id}.mp4",
                "generated_title": generated_title,
                "generated_caption": generated_caption,
            }
        else:
            processing_results[video_id] = {"status": "error", "error": "Video processing failed"}
    except Exception as e:
        processing_results[video_id] = {"status": "error", "error": str(e)}
    finally:
        if os.path.exists(temp_video_filename):
            os.remove(temp_video_filename)
        if reddit is not None:
            await reddit.close()


@app.get("/api/results")
async def get_results():
    return JSONResponse(content=processing_results)


@app.get("/api/settings")
async def get_settings():
    return JSONResponse(content={"subreddits": subreddits})


@app.post("/api/settings")
async def update_settings(data: dict):
    global subreddits
    new_subs = data.get("subreddits")
    if not isinstance(new_subs, list):
        raise HTTPException(status_code=400, detail="subreddits must be a list")
    subreddits = new_subs
    return JSONResponse(
        content={"message": "Settings updated", "subreddits": subreddits}
    )


@app.delete("/api/video/{video_id}")
async def delete_video(video_id: str):
    video_folder = os.path.join(OUTPUT_DIR, video_id)
    if not os.path.exists(video_folder):
        raise HTTPException(status_code=404, detail="Video not found.")
    try:
        shutil.rmtree(video_folder)
        processing_results.pop(video_id, None)
        return JSONResponse(content={"message": f"Video {video_id} deleted."})
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error deleting video: {e}")
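

# Simple one-second polling loop over the in-memory log buffer; entries added
# since the client's last index are pushed down the socket.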
@app.websocket("/ws/logs")
async def websocket_logs(websocket: WebSocket):
    await websocket.accept()
    last_index = 0
    try:
        while True:
            await asyncio.sleep(1)
            if last_index < len(live_logs):
                for log in live_logs[last_index:]:
                    await websocket.send_text(log)
                last_index = len(live_logs)
    except WebSocketDisconnect:
        logging.info("Client disconnected from live logs websocket")