CatPtain's picture
Upload 44 files
80fda88 verified
import glob
import itertools
import os
import random
import gc
import shutil
from typing import List
from loguru import logger
from moviepy import (
AudioFileClip,
ColorClip,
CompositeAudioClip,
CompositeVideoClip,
ImageClip,
TextClip,
VideoFileClip,
afx,
concatenate_videoclips,
)
from moviepy.video.tools.subtitles import SubtitlesClip
from PIL import ImageFont
from app.models import const
from app.models.schema import (
MaterialInfo,
VideoAspect,
VideoConcatMode,
VideoParams,
VideoTransitionMode,
)
from app.services.utils import video_effects
from app.utils import utils
class SubClippedVideoClip:
def __init__(self, file_path, start_time=None, end_time=None, width=None, height=None, duration=None):
self.file_path = file_path
self.start_time = start_time
self.end_time = end_time
self.width = width
self.height = height
if duration is None:
self.duration = end_time - start_time
else:
self.duration = duration
def __str__(self):
return f"SubClippedVideoClip(file_path={self.file_path}, start_time={self.start_time}, end_time={self.end_time}, duration={self.duration}, width={self.width}, height={self.height})"
audio_codec = "aac"
video_codec = "libx264"
fps = 30
def close_clip(clip):
if clip is None:
return
try:
# close main resources
if hasattr(clip, 'reader') and clip.reader is not None:
clip.reader.close()
# close audio resources
if hasattr(clip, 'audio') and clip.audio is not None:
if hasattr(clip.audio, 'reader') and clip.audio.reader is not None:
clip.audio.reader.close()
del clip.audio
# close mask resources
if hasattr(clip, 'mask') and clip.mask is not None:
if hasattr(clip.mask, 'reader') and clip.mask.reader is not None:
clip.mask.reader.close()
del clip.mask
# handle child clips in composite clips
if hasattr(clip, 'clips') and clip.clips:
for child_clip in clip.clips:
if child_clip is not clip: # avoid possible circular references
close_clip(child_clip)
# clear clip list
if hasattr(clip, 'clips'):
clip.clips = []
except Exception as e:
logger.error(f"failed to close clip: {str(e)}")
del clip
gc.collect()
def delete_files(files: List[str] | str):
if isinstance(files, str):
files = [files]
for file in files:
try:
os.remove(file)
except:
pass
def get_bgm_file(bgm_type: str = "random", bgm_file: str = ""):
if not bgm_type:
return ""
if bgm_file and os.path.exists(bgm_file):
return bgm_file
if bgm_type == "random":
suffix = "*.mp3"
song_dir = utils.song_dir()
files = glob.glob(os.path.join(song_dir, suffix))
return random.choice(files)
return ""
def combine_videos(
combined_video_path: str,
video_paths: List[str],
audio_file: str,
video_aspect: VideoAspect = VideoAspect.portrait,
video_concat_mode: VideoConcatMode = VideoConcatMode.random,
video_transition_mode: VideoTransitionMode = None,
max_clip_duration: int = 5,
threads: int = 2,
) -> str:
audio_clip = AudioFileClip(audio_file)
audio_duration = audio_clip.duration
logger.info(f"audio duration: {audio_duration} seconds")
# Required duration of each clip
req_dur = audio_duration / len(video_paths)
req_dur = max_clip_duration
logger.info(f"maximum clip duration: {req_dur} seconds")
output_dir = os.path.dirname(combined_video_path)
aspect = VideoAspect(video_aspect)
video_width, video_height = aspect.to_resolution()
processed_clips = []
subclipped_items = []
video_duration = 0
for video_path in video_paths:
clip = VideoFileClip(video_path)
clip_duration = clip.duration
clip_w, clip_h = clip.size
close_clip(clip)
start_time = 0
while start_time < clip_duration:
end_time = min(start_time + max_clip_duration, clip_duration)
if clip_duration - start_time >= max_clip_duration:
subclipped_items.append(SubClippedVideoClip(file_path= video_path, start_time=start_time, end_time=end_time, width=clip_w, height=clip_h))
start_time = end_time
if video_concat_mode.value == VideoConcatMode.sequential.value:
break
# random subclipped_items order
if video_concat_mode.value == VideoConcatMode.random.value:
random.shuffle(subclipped_items)
logger.debug(f"total subclipped items: {len(subclipped_items)}")
# Add downloaded clips over and over until the duration of the audio (max_duration) has been reached
for i, subclipped_item in enumerate(subclipped_items):
if video_duration > audio_duration:
break
logger.debug(f"processing clip {i+1}: {subclipped_item.width}x{subclipped_item.height}, current duration: {video_duration:.2f}s, remaining: {audio_duration - video_duration:.2f}s")
try:
clip = VideoFileClip(subclipped_item.file_path).subclipped(subclipped_item.start_time, subclipped_item.end_time)
clip_duration = clip.duration
# Not all videos are same size, so we need to resize them
clip_w, clip_h = clip.size
if clip_w != video_width or clip_h != video_height:
clip_ratio = clip.w / clip.h
video_ratio = video_width / video_height
logger.debug(f"resizing clip, source: {clip_w}x{clip_h}, ratio: {clip_ratio:.2f}, target: {video_width}x{video_height}, ratio: {video_ratio:.2f}")
if clip_ratio == video_ratio:
clip = clip.resized(new_size=(video_width, video_height))
else:
if clip_ratio > video_ratio:
scale_factor = video_width / clip_w
else:
scale_factor = video_height / clip_h
new_width = int(clip_w * scale_factor)
new_height = int(clip_h * scale_factor)
background = ColorClip(size=(video_width, video_height), color=(0, 0, 0)).with_duration(clip_duration)
clip_resized = clip.resized(new_size=(new_width, new_height)).with_position("center")
clip = CompositeVideoClip([background, clip_resized])
shuffle_side = random.choice(["left", "right", "top", "bottom"])
if video_transition_mode.value == VideoTransitionMode.none.value:
clip = clip
elif video_transition_mode.value == VideoTransitionMode.fade_in.value:
clip = video_effects.fadein_transition(clip, 1)
elif video_transition_mode.value == VideoTransitionMode.fade_out.value:
clip = video_effects.fadeout_transition(clip, 1)
elif video_transition_mode.value == VideoTransitionMode.slide_in.value:
clip = video_effects.slidein_transition(clip, 1, shuffle_side)
elif video_transition_mode.value == VideoTransitionMode.slide_out.value:
clip = video_effects.slideout_transition(clip, 1, shuffle_side)
elif video_transition_mode.value == VideoTransitionMode.shuffle.value:
transition_funcs = [
lambda c: video_effects.fadein_transition(c, 1),
lambda c: video_effects.fadeout_transition(c, 1),
lambda c: video_effects.slidein_transition(c, 1, shuffle_side),
lambda c: video_effects.slideout_transition(c, 1, shuffle_side),
]
shuffle_transition = random.choice(transition_funcs)
clip = shuffle_transition(clip)
if clip.duration > max_clip_duration:
clip = clip.subclipped(0, max_clip_duration)
# wirte clip to temp file
clip_file = f"{output_dir}/temp-clip-{i+1}.mp4"
clip.write_videofile(clip_file, logger=None, fps=fps, codec=video_codec)
close_clip(clip)
processed_clips.append(SubClippedVideoClip(file_path=clip_file, duration=clip.duration, width=clip_w, height=clip_h))
video_duration += clip.duration
except Exception as e:
logger.error(f"failed to process clip: {str(e)}")
# loop processed clips until the video duration matches or exceeds the audio duration.
if video_duration < audio_duration:
logger.warning(f"video duration ({video_duration:.2f}s) is shorter than audio duration ({audio_duration:.2f}s), looping clips to match audio length.")
base_clips = processed_clips.copy()
for clip in itertools.cycle(base_clips):
if video_duration >= audio_duration:
break
processed_clips.append(clip)
video_duration += clip.duration
logger.info(f"video duration: {video_duration:.2f}s, audio duration: {audio_duration:.2f}s, looped {len(processed_clips)-len(base_clips)} clips")
# merge video clips progressively, avoid loading all videos at once to avoid memory overflow
logger.info("starting clip merging process")
if not processed_clips:
logger.warning("no clips available for merging")
return combined_video_path
# if there is only one clip, use it directly
if len(processed_clips) == 1:
logger.info("using single clip directly")
shutil.copy(processed_clips[0].file_path, combined_video_path)
delete_files(processed_clips)
logger.info("video combining completed")
return combined_video_path
# create initial video file as base
base_clip_path = processed_clips[0].file_path
temp_merged_video = f"{output_dir}/temp-merged-video.mp4"
temp_merged_next = f"{output_dir}/temp-merged-next.mp4"
# copy first clip as initial merged video
shutil.copy(base_clip_path, temp_merged_video)
# merge remaining video clips one by one
for i, clip in enumerate(processed_clips[1:], 1):
logger.info(f"merging clip {i}/{len(processed_clips)-1}, duration: {clip.duration:.2f}s")
try:
# load current base video and next clip to merge
base_clip = VideoFileClip(temp_merged_video)
next_clip = VideoFileClip(clip.file_path)
# merge these two clips
merged_clip = concatenate_videoclips([base_clip, next_clip])
# save merged result to temp file
merged_clip.write_videofile(
filename=temp_merged_next,
threads=threads,
logger=None,
temp_audiofile_path=output_dir,
audio_codec=audio_codec,
fps=fps,
)
close_clip(base_clip)
close_clip(next_clip)
close_clip(merged_clip)
# replace base file with new merged file
delete_files(temp_merged_video)
os.rename(temp_merged_next, temp_merged_video)
except Exception as e:
logger.error(f"failed to merge clip: {str(e)}")
continue
# after merging, rename final result to target file name
os.rename(temp_merged_video, combined_video_path)
# clean temp files
clip_files = [clip.file_path for clip in processed_clips]
delete_files(clip_files)
logger.info("video combining completed")
return combined_video_path
def wrap_text(text, max_width, font="Arial", fontsize=60):
# Create ImageFont
font = ImageFont.truetype(font, fontsize)
def get_text_size(inner_text):
inner_text = inner_text.strip()
left, top, right, bottom = font.getbbox(inner_text)
return right - left, bottom - top
width, height = get_text_size(text)
if width <= max_width:
return text, height
processed = True
_wrapped_lines_ = []
words = text.split(" ")
_txt_ = ""
for word in words:
_before = _txt_
_txt_ += f"{word} "
_width, _height = get_text_size(_txt_)
if _width <= max_width:
continue
else:
if _txt_.strip() == word.strip():
processed = False
break
_wrapped_lines_.append(_before)
_txt_ = f"{word} "
_wrapped_lines_.append(_txt_)
if processed:
_wrapped_lines_ = [line.strip() for line in _wrapped_lines_]
result = "\n".join(_wrapped_lines_).strip()
height = len(_wrapped_lines_) * height
return result, height
_wrapped_lines_ = []
chars = list(text)
_txt_ = ""
for word in chars:
_txt_ += word
_width, _height = get_text_size(_txt_)
if _width <= max_width:
continue
else:
_wrapped_lines_.append(_txt_)
_txt_ = ""
_wrapped_lines_.append(_txt_)
result = "\n".join(_wrapped_lines_).strip()
height = len(_wrapped_lines_) * height
return result, height
def generate_video(
video_path: str,
audio_path: str,
subtitle_path: str,
output_file: str,
params: VideoParams,
):
aspect = VideoAspect(params.video_aspect)
video_width, video_height = aspect.to_resolution()
logger.info(f"generating video: {video_width} x {video_height}")
logger.info(f" ① video: {video_path}")
logger.info(f" ② audio: {audio_path}")
logger.info(f" ③ subtitle: {subtitle_path}")
logger.info(f" ④ output: {output_file}")
# https://github.com/harry0703/MoneyPrinterTurbo/issues/217
# PermissionError: [WinError 32] The process cannot access the file because it is being used by another process: 'final-1.mp4.tempTEMP_MPY_wvf_snd.mp3'
# write into the same directory as the output file
output_dir = os.path.dirname(output_file)
font_path = ""
if params.subtitle_enabled:
if not params.font_name:
params.font_name = "STHeitiMedium.ttc"
font_path = os.path.join(utils.font_dir(), params.font_name)
if os.name == "nt":
font_path = font_path.replace("\\", "/")
logger.info(f" ⑤ font: {font_path}")
def create_text_clip(subtitle_item):
params.font_size = int(params.font_size)
params.stroke_width = int(params.stroke_width)
phrase = subtitle_item[1]
max_width = video_width * 0.9
wrapped_txt, txt_height = wrap_text(
phrase, max_width=max_width, font=font_path, fontsize=params.font_size
)
interline = int(params.font_size * 0.25)
size=(int(max_width), int(txt_height + params.font_size * 0.25 + (interline * (wrapped_txt.count("\n") + 1))))
_clip = TextClip(
text=wrapped_txt,
font=font_path,
font_size=params.font_size,
color=params.text_fore_color,
bg_color=params.text_background_color,
stroke_color=params.stroke_color,
stroke_width=params.stroke_width,
# interline=interline,
# size=size,
)
duration = subtitle_item[0][1] - subtitle_item[0][0]
_clip = _clip.with_start(subtitle_item[0][0])
_clip = _clip.with_end(subtitle_item[0][1])
_clip = _clip.with_duration(duration)
if params.subtitle_position == "bottom":
_clip = _clip.with_position(("center", video_height * 0.95 - _clip.h))
elif params.subtitle_position == "top":
_clip = _clip.with_position(("center", video_height * 0.05))
elif params.subtitle_position == "custom":
# Ensure the subtitle is fully within the screen bounds
margin = 10 # Additional margin, in pixels
max_y = video_height - _clip.h - margin
min_y = margin
custom_y = (video_height - _clip.h) * (params.custom_position / 100)
custom_y = max(
min_y, min(custom_y, max_y)
) # Constrain the y value within the valid range
_clip = _clip.with_position(("center", custom_y))
else: # center
_clip = _clip.with_position(("center", "center"))
return _clip
video_clip = VideoFileClip(video_path).without_audio()
audio_clip = AudioFileClip(audio_path).with_effects(
[afx.MultiplyVolume(params.voice_volume)]
)
def make_textclip(text):
return TextClip(
text=text,
font=font_path,
font_size=params.font_size,
)
if subtitle_path and os.path.exists(subtitle_path):
sub = SubtitlesClip(
subtitles=subtitle_path, encoding="utf-8", make_textclip=make_textclip
)
text_clips = []
for item in sub.subtitles:
clip = create_text_clip(subtitle_item=item)
text_clips.append(clip)
video_clip = CompositeVideoClip([video_clip, *text_clips])
bgm_file = get_bgm_file(bgm_type=params.bgm_type, bgm_file=params.bgm_file)
if bgm_file:
try:
bgm_clip = AudioFileClip(bgm_file).with_effects(
[
afx.MultiplyVolume(params.bgm_volume),
afx.AudioFadeOut(3),
afx.AudioLoop(duration=video_clip.duration),
]
)
audio_clip = CompositeAudioClip([audio_clip, bgm_clip])
except Exception as e:
logger.error(f"failed to add bgm: {str(e)}")
video_clip = video_clip.with_audio(audio_clip)
video_clip.write_videofile(
output_file,
audio_codec=audio_codec,
temp_audiofile_path=output_dir,
threads=params.n_threads or 2,
logger=None,
fps=fps,
)
video_clip.close()
del video_clip
def preprocess_video(materials: List[MaterialInfo], clip_duration=4):
for material in materials:
if not material.url:
continue
ext = utils.parse_extension(material.url)
try:
clip = VideoFileClip(material.url)
except Exception:
clip = ImageClip(material.url)
width = clip.size[0]
height = clip.size[1]
if width < 480 or height < 480:
logger.warning(f"low resolution material: {width}x{height}, minimum 480x480 required")
continue
if ext in const.FILE_TYPE_IMAGES:
logger.info(f"processing image: {material.url}")
# Create an image clip and set its duration to 3 seconds
clip = (
ImageClip(material.url)
.with_duration(clip_duration)
.with_position("center")
)
# Apply a zoom effect using the resize method.
# A lambda function is used to make the zoom effect dynamic over time.
# The zoom effect starts from the original size and gradually scales up to 120%.
# t represents the current time, and clip.duration is the total duration of the clip (3 seconds).
# Note: 1 represents 100% size, so 1.2 represents 120% size.
zoom_clip = clip.resized(
lambda t: 1 + (clip_duration * 0.03) * (t / clip.duration)
)
# Optionally, create a composite video clip containing the zoomed clip.
# This is useful when you want to add other elements to the video.
final_clip = CompositeVideoClip([zoom_clip])
# Output the video to a file.
video_file = f"{material.url}.mp4"
final_clip.write_videofile(video_file, fps=30, logger=None)
close_clip(clip)
material.url = video_file
logger.success(f"image processed: {video_file}")
return materials