Spaces:
Running
Running
import glob | |
import itertools | |
import os | |
import random | |
import gc | |
import shutil | |
from typing import List | |
from loguru import logger | |
from moviepy import ( | |
AudioFileClip, | |
ColorClip, | |
CompositeAudioClip, | |
CompositeVideoClip, | |
ImageClip, | |
TextClip, | |
VideoFileClip, | |
afx, | |
concatenate_videoclips, | |
) | |
from moviepy.video.tools.subtitles import SubtitlesClip | |
from PIL import ImageFont | |
from app.models import const | |
from app.models.schema import ( | |
MaterialInfo, | |
VideoAspect, | |
VideoConcatMode, | |
VideoParams, | |
VideoTransitionMode, | |
) | |
from app.services.utils import video_effects | |
from app.utils import utils | |
class SubClippedVideoClip: | |
def __init__(self, file_path, start_time=None, end_time=None, width=None, height=None, duration=None): | |
self.file_path = file_path | |
self.start_time = start_time | |
self.end_time = end_time | |
self.width = width | |
self.height = height | |
if duration is None: | |
self.duration = end_time - start_time | |
else: | |
self.duration = duration | |
def __str__(self): | |
return f"SubClippedVideoClip(file_path={self.file_path}, start_time={self.start_time}, end_time={self.end_time}, duration={self.duration}, width={self.width}, height={self.height})" | |
audio_codec = "aac" | |
video_codec = "libx264" | |
fps = 30 | |
def close_clip(clip): | |
if clip is None: | |
return | |
try: | |
# close main resources | |
if hasattr(clip, 'reader') and clip.reader is not None: | |
clip.reader.close() | |
# close audio resources | |
if hasattr(clip, 'audio') and clip.audio is not None: | |
if hasattr(clip.audio, 'reader') and clip.audio.reader is not None: | |
clip.audio.reader.close() | |
del clip.audio | |
# close mask resources | |
if hasattr(clip, 'mask') and clip.mask is not None: | |
if hasattr(clip.mask, 'reader') and clip.mask.reader is not None: | |
clip.mask.reader.close() | |
del clip.mask | |
# handle child clips in composite clips | |
if hasattr(clip, 'clips') and clip.clips: | |
for child_clip in clip.clips: | |
if child_clip is not clip: # avoid possible circular references | |
close_clip(child_clip) | |
# clear clip list | |
if hasattr(clip, 'clips'): | |
clip.clips = [] | |
except Exception as e: | |
logger.error(f"failed to close clip: {str(e)}") | |
del clip | |
gc.collect() | |
def delete_files(files: List[str] | str): | |
if isinstance(files, str): | |
files = [files] | |
for file in files: | |
try: | |
os.remove(file) | |
except: | |
pass | |
def get_bgm_file(bgm_type: str = "random", bgm_file: str = ""): | |
if not bgm_type: | |
return "" | |
if bgm_file and os.path.exists(bgm_file): | |
return bgm_file | |
if bgm_type == "random": | |
suffix = "*.mp3" | |
song_dir = utils.song_dir() | |
files = glob.glob(os.path.join(song_dir, suffix)) | |
return random.choice(files) | |
return "" | |
def combine_videos( | |
combined_video_path: str, | |
video_paths: List[str], | |
audio_file: str, | |
video_aspect: VideoAspect = VideoAspect.portrait, | |
video_concat_mode: VideoConcatMode = VideoConcatMode.random, | |
video_transition_mode: VideoTransitionMode = None, | |
max_clip_duration: int = 5, | |
threads: int = 2, | |
) -> str: | |
audio_clip = AudioFileClip(audio_file) | |
audio_duration = audio_clip.duration | |
logger.info(f"audio duration: {audio_duration} seconds") | |
# Required duration of each clip | |
req_dur = audio_duration / len(video_paths) | |
req_dur = max_clip_duration | |
logger.info(f"maximum clip duration: {req_dur} seconds") | |
output_dir = os.path.dirname(combined_video_path) | |
aspect = VideoAspect(video_aspect) | |
video_width, video_height = aspect.to_resolution() | |
processed_clips = [] | |
subclipped_items = [] | |
video_duration = 0 | |
for video_path in video_paths: | |
clip = VideoFileClip(video_path) | |
clip_duration = clip.duration | |
clip_w, clip_h = clip.size | |
close_clip(clip) | |
start_time = 0 | |
while start_time < clip_duration: | |
end_time = min(start_time + max_clip_duration, clip_duration) | |
if clip_duration - start_time >= max_clip_duration: | |
subclipped_items.append(SubClippedVideoClip(file_path= video_path, start_time=start_time, end_time=end_time, width=clip_w, height=clip_h)) | |
start_time = end_time | |
if video_concat_mode.value == VideoConcatMode.sequential.value: | |
break | |
# random subclipped_items order | |
if video_concat_mode.value == VideoConcatMode.random.value: | |
random.shuffle(subclipped_items) | |
logger.debug(f"total subclipped items: {len(subclipped_items)}") | |
# Add downloaded clips over and over until the duration of the audio (max_duration) has been reached | |
for i, subclipped_item in enumerate(subclipped_items): | |
if video_duration > audio_duration: | |
break | |
logger.debug(f"processing clip {i+1}: {subclipped_item.width}x{subclipped_item.height}, current duration: {video_duration:.2f}s, remaining: {audio_duration - video_duration:.2f}s") | |
try: | |
clip = VideoFileClip(subclipped_item.file_path).subclipped(subclipped_item.start_time, subclipped_item.end_time) | |
clip_duration = clip.duration | |
# Not all videos are same size, so we need to resize them | |
clip_w, clip_h = clip.size | |
if clip_w != video_width or clip_h != video_height: | |
clip_ratio = clip.w / clip.h | |
video_ratio = video_width / video_height | |
logger.debug(f"resizing clip, source: {clip_w}x{clip_h}, ratio: {clip_ratio:.2f}, target: {video_width}x{video_height}, ratio: {video_ratio:.2f}") | |
if clip_ratio == video_ratio: | |
clip = clip.resized(new_size=(video_width, video_height)) | |
else: | |
if clip_ratio > video_ratio: | |
scale_factor = video_width / clip_w | |
else: | |
scale_factor = video_height / clip_h | |
new_width = int(clip_w * scale_factor) | |
new_height = int(clip_h * scale_factor) | |
background = ColorClip(size=(video_width, video_height), color=(0, 0, 0)).with_duration(clip_duration) | |
clip_resized = clip.resized(new_size=(new_width, new_height)).with_position("center") | |
clip = CompositeVideoClip([background, clip_resized]) | |
shuffle_side = random.choice(["left", "right", "top", "bottom"]) | |
if video_transition_mode.value == VideoTransitionMode.none.value: | |
clip = clip | |
elif video_transition_mode.value == VideoTransitionMode.fade_in.value: | |
clip = video_effects.fadein_transition(clip, 1) | |
elif video_transition_mode.value == VideoTransitionMode.fade_out.value: | |
clip = video_effects.fadeout_transition(clip, 1) | |
elif video_transition_mode.value == VideoTransitionMode.slide_in.value: | |
clip = video_effects.slidein_transition(clip, 1, shuffle_side) | |
elif video_transition_mode.value == VideoTransitionMode.slide_out.value: | |
clip = video_effects.slideout_transition(clip, 1, shuffle_side) | |
elif video_transition_mode.value == VideoTransitionMode.shuffle.value: | |
transition_funcs = [ | |
lambda c: video_effects.fadein_transition(c, 1), | |
lambda c: video_effects.fadeout_transition(c, 1), | |
lambda c: video_effects.slidein_transition(c, 1, shuffle_side), | |
lambda c: video_effects.slideout_transition(c, 1, shuffle_side), | |
] | |
shuffle_transition = random.choice(transition_funcs) | |
clip = shuffle_transition(clip) | |
if clip.duration > max_clip_duration: | |
clip = clip.subclipped(0, max_clip_duration) | |
# wirte clip to temp file | |
clip_file = f"{output_dir}/temp-clip-{i+1}.mp4" | |
clip.write_videofile(clip_file, logger=None, fps=fps, codec=video_codec) | |
close_clip(clip) | |
processed_clips.append(SubClippedVideoClip(file_path=clip_file, duration=clip.duration, width=clip_w, height=clip_h)) | |
video_duration += clip.duration | |
except Exception as e: | |
logger.error(f"failed to process clip: {str(e)}") | |
# loop processed clips until the video duration matches or exceeds the audio duration. | |
if video_duration < audio_duration: | |
logger.warning(f"video duration ({video_duration:.2f}s) is shorter than audio duration ({audio_duration:.2f}s), looping clips to match audio length.") | |
base_clips = processed_clips.copy() | |
for clip in itertools.cycle(base_clips): | |
if video_duration >= audio_duration: | |
break | |
processed_clips.append(clip) | |
video_duration += clip.duration | |
logger.info(f"video duration: {video_duration:.2f}s, audio duration: {audio_duration:.2f}s, looped {len(processed_clips)-len(base_clips)} clips") | |
# merge video clips progressively, avoid loading all videos at once to avoid memory overflow | |
logger.info("starting clip merging process") | |
if not processed_clips: | |
logger.warning("no clips available for merging") | |
return combined_video_path | |
# if there is only one clip, use it directly | |
if len(processed_clips) == 1: | |
logger.info("using single clip directly") | |
shutil.copy(processed_clips[0].file_path, combined_video_path) | |
delete_files(processed_clips) | |
logger.info("video combining completed") | |
return combined_video_path | |
# create initial video file as base | |
base_clip_path = processed_clips[0].file_path | |
temp_merged_video = f"{output_dir}/temp-merged-video.mp4" | |
temp_merged_next = f"{output_dir}/temp-merged-next.mp4" | |
# copy first clip as initial merged video | |
shutil.copy(base_clip_path, temp_merged_video) | |
# merge remaining video clips one by one | |
for i, clip in enumerate(processed_clips[1:], 1): | |
logger.info(f"merging clip {i}/{len(processed_clips)-1}, duration: {clip.duration:.2f}s") | |
try: | |
# load current base video and next clip to merge | |
base_clip = VideoFileClip(temp_merged_video) | |
next_clip = VideoFileClip(clip.file_path) | |
# merge these two clips | |
merged_clip = concatenate_videoclips([base_clip, next_clip]) | |
# save merged result to temp file | |
merged_clip.write_videofile( | |
filename=temp_merged_next, | |
threads=threads, | |
logger=None, | |
temp_audiofile_path=output_dir, | |
audio_codec=audio_codec, | |
fps=fps, | |
) | |
close_clip(base_clip) | |
close_clip(next_clip) | |
close_clip(merged_clip) | |
# replace base file with new merged file | |
delete_files(temp_merged_video) | |
os.rename(temp_merged_next, temp_merged_video) | |
except Exception as e: | |
logger.error(f"failed to merge clip: {str(e)}") | |
continue | |
# after merging, rename final result to target file name | |
os.rename(temp_merged_video, combined_video_path) | |
# clean temp files | |
clip_files = [clip.file_path for clip in processed_clips] | |
delete_files(clip_files) | |
logger.info("video combining completed") | |
return combined_video_path | |
def wrap_text(text, max_width, font="Arial", fontsize=60): | |
# Create ImageFont | |
font = ImageFont.truetype(font, fontsize) | |
def get_text_size(inner_text): | |
inner_text = inner_text.strip() | |
left, top, right, bottom = font.getbbox(inner_text) | |
return right - left, bottom - top | |
width, height = get_text_size(text) | |
if width <= max_width: | |
return text, height | |
processed = True | |
_wrapped_lines_ = [] | |
words = text.split(" ") | |
_txt_ = "" | |
for word in words: | |
_before = _txt_ | |
_txt_ += f"{word} " | |
_width, _height = get_text_size(_txt_) | |
if _width <= max_width: | |
continue | |
else: | |
if _txt_.strip() == word.strip(): | |
processed = False | |
break | |
_wrapped_lines_.append(_before) | |
_txt_ = f"{word} " | |
_wrapped_lines_.append(_txt_) | |
if processed: | |
_wrapped_lines_ = [line.strip() for line in _wrapped_lines_] | |
result = "\n".join(_wrapped_lines_).strip() | |
height = len(_wrapped_lines_) * height | |
return result, height | |
_wrapped_lines_ = [] | |
chars = list(text) | |
_txt_ = "" | |
for word in chars: | |
_txt_ += word | |
_width, _height = get_text_size(_txt_) | |
if _width <= max_width: | |
continue | |
else: | |
_wrapped_lines_.append(_txt_) | |
_txt_ = "" | |
_wrapped_lines_.append(_txt_) | |
result = "\n".join(_wrapped_lines_).strip() | |
height = len(_wrapped_lines_) * height | |
return result, height | |
def generate_video( | |
video_path: str, | |
audio_path: str, | |
subtitle_path: str, | |
output_file: str, | |
params: VideoParams, | |
): | |
aspect = VideoAspect(params.video_aspect) | |
video_width, video_height = aspect.to_resolution() | |
logger.info(f"generating video: {video_width} x {video_height}") | |
logger.info(f" ① video: {video_path}") | |
logger.info(f" ② audio: {audio_path}") | |
logger.info(f" ③ subtitle: {subtitle_path}") | |
logger.info(f" ④ output: {output_file}") | |
# https://github.com/harry0703/MoneyPrinterTurbo/issues/217 | |
# PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'final-1.mp4.tempTEMP_MPY_wvf_snd.mp3' | |
# write into the same directory as the output file | |
output_dir = os.path.dirname(output_file) | |
font_path = "" | |
if params.subtitle_enabled: | |
if not params.font_name: | |
params.font_name = "STHeitiMedium.ttc" | |
font_path = os.path.join(utils.font_dir(), params.font_name) | |
if os.name == "nt": | |
font_path = font_path.replace("\\", "/") | |
logger.info(f" ⑤ font: {font_path}") | |
def create_text_clip(subtitle_item): | |
params.font_size = int(params.font_size) | |
params.stroke_width = int(params.stroke_width) | |
phrase = subtitle_item[1] | |
max_width = video_width * 0.9 | |
wrapped_txt, txt_height = wrap_text( | |
phrase, max_width=max_width, font=font_path, fontsize=params.font_size | |
) | |
interline = int(params.font_size * 0.25) | |
size=(int(max_width), int(txt_height + params.font_size * 0.25 + (interline * (wrapped_txt.count("\n") + 1)))) | |
_clip = TextClip( | |
text=wrapped_txt, | |
font=font_path, | |
font_size=params.font_size, | |
color=params.text_fore_color, | |
bg_color=params.text_background_color, | |
stroke_color=params.stroke_color, | |
stroke_width=params.stroke_width, | |
# interline=interline, | |
# size=size, | |
) | |
duration = subtitle_item[0][1] - subtitle_item[0][0] | |
_clip = _clip.with_start(subtitle_item[0][0]) | |
_clip = _clip.with_end(subtitle_item[0][1]) | |
_clip = _clip.with_duration(duration) | |
if params.subtitle_position == "bottom": | |
_clip = _clip.with_position(("center", video_height * 0.95 - _clip.h)) | |
elif params.subtitle_position == "top": | |
_clip = _clip.with_position(("center", video_height * 0.05)) | |
elif params.subtitle_position == "custom": | |
# Ensure the subtitle is fully within the screen bounds | |
margin = 10 # Additional margin, in pixels | |
max_y = video_height - _clip.h - margin | |
min_y = margin | |
custom_y = (video_height - _clip.h) * (params.custom_position / 100) | |
custom_y = max( | |
min_y, min(custom_y, max_y) | |
) # Constrain the y value within the valid range | |
_clip = _clip.with_position(("center", custom_y)) | |
else: # center | |
_clip = _clip.with_position(("center", "center")) | |
return _clip | |
video_clip = VideoFileClip(video_path).without_audio() | |
audio_clip = AudioFileClip(audio_path).with_effects( | |
[afx.MultiplyVolume(params.voice_volume)] | |
) | |
def make_textclip(text): | |
return TextClip( | |
text=text, | |
font=font_path, | |
font_size=params.font_size, | |
) | |
if subtitle_path and os.path.exists(subtitle_path): | |
sub = SubtitlesClip( | |
subtitles=subtitle_path, encoding="utf-8", make_textclip=make_textclip | |
) | |
text_clips = [] | |
for item in sub.subtitles: | |
clip = create_text_clip(subtitle_item=item) | |
text_clips.append(clip) | |
video_clip = CompositeVideoClip([video_clip, *text_clips]) | |
bgm_file = get_bgm_file(bgm_type=params.bgm_type, bgm_file=params.bgm_file) | |
if bgm_file: | |
try: | |
bgm_clip = AudioFileClip(bgm_file).with_effects( | |
[ | |
afx.MultiplyVolume(params.bgm_volume), | |
afx.AudioFadeOut(3), | |
afx.AudioLoop(duration=video_clip.duration), | |
] | |
) | |
audio_clip = CompositeAudioClip([audio_clip, bgm_clip]) | |
except Exception as e: | |
logger.error(f"failed to add bgm: {str(e)}") | |
video_clip = video_clip.with_audio(audio_clip) | |
video_clip.write_videofile( | |
output_file, | |
audio_codec=audio_codec, | |
temp_audiofile_path=output_dir, | |
threads=params.n_threads or 2, | |
logger=None, | |
fps=fps, | |
) | |
video_clip.close() | |
del video_clip | |
def preprocess_video(materials: List[MaterialInfo], clip_duration=4): | |
for material in materials: | |
if not material.url: | |
continue | |
ext = utils.parse_extension(material.url) | |
try: | |
clip = VideoFileClip(material.url) | |
except Exception: | |
clip = ImageClip(material.url) | |
width = clip.size[0] | |
height = clip.size[1] | |
if width < 480 or height < 480: | |
logger.warning(f"low resolution material: {width}x{height}, minimum 480x480 required") | |
continue | |
if ext in const.FILE_TYPE_IMAGES: | |
logger.info(f"processing image: {material.url}") | |
# Create an image clip and set its duration to 3 seconds | |
clip = ( | |
ImageClip(material.url) | |
.with_duration(clip_duration) | |
.with_position("center") | |
) | |
# Apply a zoom effect using the resize method. | |
# A lambda function is used to make the zoom effect dynamic over time. | |
# The zoom effect starts from the original size and gradually scales up to 120%. | |
# t represents the current time, and clip.duration is the total duration of the clip (3 seconds). | |
# Note: 1 represents 100% size, so 1.2 represents 120% size. | |
zoom_clip = clip.resized( | |
lambda t: 1 + (clip_duration * 0.03) * (t / clip.duration) | |
) | |
# Optionally, create a composite video clip containing the zoomed clip. | |
# This is useful when you want to add other elements to the video. | |
final_clip = CompositeVideoClip([zoom_clip]) | |
# Output the video to a file. | |
video_file = f"{material.url}.mp4" | |
final_clip.write_videofile(video_file, fps=30, logger=None) | |
close_clip(clip) | |
material.url = video_file | |
logger.success(f"image processed: {video_file}") | |
return materials |