# Captured from a Hugging Face Spaces page (Space status at capture time: Sleeping).
import functools
import os
import re
import subprocess
import uuid
import zipfile
from datetime import datetime

import gradio as gr
from moviepy import VideoFileClip
from PIL import Image
from pydub import AudioSegment
from tqdm import tqdm

# ----------------------- Internationalization -----------------------
from i18n_local import en, ru, es, fr, de, it, ja, ko, ar, hi, tr
# One translation table per supported UI language, keyed by language code.
i18n = gr.I18n(
    en=en,
    ru=ru,
    es=es,
    fr=fr,
    de=de,
    it=it,
    ja=ja,
    ko=ko,
    ar=ar,
    hi=hi,
    tr=tr,
)
# ----------------------- FFmpeg utils -----------------------
def _run_ffmpeg(args):
    """Run ``ffmpeg`` with *args* and return everything it printed.

    stderr is folded into stdout, so the returned string contains both
    streams. The exit status is not inspected. Returns ``None`` when the
    command cannot be built or launched (for example when the ``ffmpeg``
    executable is missing).
    """
    try:
        command = ["ffmpeg"]
        command.extend(args)
        completed = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )
    except Exception:
        # Best-effort probe: callers treat None as "ffmpeg unavailable".
        return None
    return completed.stdout
# Matches rows of ``ffmpeg -formats`` whose flag column contains "E" (muxing),
# e.g. " DE matroska,webm    Matroska / WebM" or "  E 3g2   3GP2 ...".
_FFMPEG_MUXER_LINE = re.compile(r"^\s*[D\s]*E\s+(\S+)")


@functools.lru_cache(maxsize=None)
def _query_writable_formats():
    """Query ``ffmpeg -formats`` once and return the muxer names as a frozenset.

    The result (including an empty one when ffmpeg is unavailable) is cached
    for the lifetime of the process, so ffmpeg is not re-spawned on every
    lookup.
    """
    out = _run_ffmpeg(["-hide_banner", "-v", "error", "-formats"])
    if not out:
        return frozenset()
    names = set()
    for line in out.splitlines():
        match = _FFMPEG_MUXER_LINE.match(line)
        if match is None:
            continue
        # A single row may list several comma-separated aliases.
        names.update(name for name in match.group(1).split(",") if name)
    return frozenset(names)


def ffmpeg_writable_formats():
    """
    Returns a set of FFmpeg format names (including aliases) available for writing (E flag).
    Example: {'mp4', 'mov', 'm4a', '3gp', 'matroska', 'webm', ...}

    Returns an empty set when ffmpeg produced no output. A fresh ``set`` is
    returned on each call, so callers may mutate it without affecting the
    cached query result.
    """
    return set(_query_writable_formats())
# Matches rows of ``ffmpeg -encoders`` whose flag column starts with "A",
# e.g. " A..... libmp3lame   MP3 (MPEG audio layer 3) (codec mp3)".
# The negative lookahead rejects the legend row " A..... = Audio", which would
# otherwise be parsed as an encoder named "=".
_FFMPEG_AUDIO_ENCODER_LINE = re.compile(r"^\s*A\S*\s+(?!=)(\S+)")


@functools.lru_cache(maxsize=None)
def _query_audio_encoders():
    """Query ``ffmpeg -encoders`` once and return audio encoder names as a frozenset.

    The result (including an empty one when ffmpeg is unavailable) is cached
    for the lifetime of the process.
    """
    out = _run_ffmpeg(["-hide_banner", "-v", "error", "-encoders"])
    if not out:
        return frozenset()
    return frozenset(
        match.group(1)
        for match in map(_FFMPEG_AUDIO_ENCODER_LINE.match, out.splitlines())
        if match is not None
    )


def ffmpeg_audio_encoders():
    """
    Returns a set of available audio encoders, e.g. {'aac','libmp3lame','libopus',...}

    Returns an empty set when ffmpeg produced no output. A fresh ``set`` is
    returned on each call, so callers may mutate it without affecting the
    cached query result.
    """
    return set(_query_audio_encoders())
# Curated map: audio file extension -> FFmpeg container (muxer) name.
# Several extensions may share one container.
AUDIO_EXT_TO_FFMPEG_FORMAT = {
    # Containers named after their extension
    "mp3": "mp3", "wav": "wav", "w64": "w64", "flac": "flac",
    # Ogg family (opus needs libopus, spx needs libspeex)
    "ogg": "ogg", "oga": "ogg", "opus": "ogg", "spx": "ogg",
    # AAC as a raw ADTS stream, or wrapped in MP4
    "aac": "adts", "m4a": "mp4", "m4b": "mp4", "m4r": "mp4",
    "ac3": "ac3",
    # AIFF variants
    "aiff": "aiff", "aif": "aiff", "aifc": "aiff",
    "caf": "caf", "au": "au", "amr": "amr", "dts": "dts", "mp2": "mp2",
    "wma": "asf",
    "wv": "wv",
    "mka": "matroska",
}
# Extensions that are only offered when at least one of the listed encoders
# is present; anything not listed here relies on FFmpeg's default encoder.
AUDIO_REQUIRED_CODECS = {
    "mp3": ["libmp3lame"],
    "opus": ["libopus"],
    "spx": ["libspeex"],
}
# Curated map: video file extension -> FFmpeg container (muxer) name.
VIDEO_EXT_TO_FFMPEG_FORMAT = {
    # MP4 / QuickTime
    "mp4": "mp4", "m4v": "mp4", "mov": "mov",
    "avi": "avi",
    # Matroska family
    "mkv": "matroska", "webm": "webm",
    "flv": "flv",
    "ogv": "ogg",
    # MPEG program / transport streams
    "mpeg": "mpeg", "mpg": "mpeg", "ts": "mpegts", "m2ts": "mpegts",
    "mxf": "mxf",
    # 3GPP
    "3gp": "3gp", "3g2": "3g2",
    # ASF / Windows Media
    "asf": "asf", "wmv": "asf",
    "vob": "vob",
}
def available_audio_extensions():
    """Return the sorted audio extensions this FFmpeg build can write.

    An extension qualifies when its container is among the writable formats
    and, if AUDIO_REQUIRED_CODECS lists encoders for it, at least one of them
    is available. When nothing qualifies (ffmpeg missing or the query
    failed) a fixed list of common extensions is returned instead.
    """
    muxers = ffmpeg_writable_formats()
    encoders = ffmpeg_audio_encoders()

    def is_usable(ext, container):
        if container not in muxers:
            return False
        needed = AUDIO_REQUIRED_CODECS.get(ext)
        return not needed or any(name in encoders for name in needed)

    supported = [
        ext
        for ext, container in AUDIO_EXT_TO_FFMPEG_FORMAT.items()
        if is_usable(ext, container)
    ]
    if not supported:
        # fallback if ffmpeg is missing or query failed
        supported = ["mp3", "wav", "flac", "ogg", "aac", "m4a", "aiff", "wma", "opus"]
    return sorted(set(supported))
def available_video_extensions():
    """Return the sorted video extensions whose container FFmpeg can write.

    Falls back to a fixed list of common extensions when no mapped container
    is reported as writable.
    """
    muxers = ffmpeg_writable_formats()
    supported = []
    for ext, container in VIDEO_EXT_TO_FFMPEG_FORMAT.items():
        if container in muxers:
            supported.append(ext)
    if not supported:
        supported = ["mp4", "mkv", "avi", "mov", "webm", "flv", "mpeg", "mpg", "ts"]
    return sorted(set(supported))
# ----------------------- Pillow utils (images) -----------------------
def available_image_extensions():
    """Return the sorted, dot-less, lower-case extensions Pillow can save to.

    Best-effort: an extension is kept when its format name appears in
    ``Image.SAVE``; if that registry is missing or empty, every registered
    format is assumed to be savable. A fixed list is returned when nothing
    is registered at all.
    """
    # Query the extension table first; the SAVE registry is read afterwards.
    extension_to_format = Image.registered_extensions()  # {".jpg": "JPEG", ...}
    savable_formats = set(getattr(Image, "SAVE", {}))
    if not savable_formats:
        savable_formats = set(extension_to_format.values())

    found = {
        extension.lstrip(".").lower()
        for extension, format_name in extension_to_format.items()
        if format_name in savable_formats
    }
    if not found:
        found = {
            "png", "jpg", "jpeg", "webp", "bmp", "tiff", "gif", "ico", "ppm",
            "pgm", "pbm", "pnm", "tga", "xbm", "xpm", "pdf", "eps",
        }
    return sorted(found)
def pil_format_for_ext(ext):
    """Map a file extension to a Pillow format name, or ``None`` if unknown.

    The extension is matched case-insensitively and with or without dots at
    either end. Pillow's registered extensions are consulted first; a small
    built-in table is used only when no registered extension matches.
    """
    wanted = ext.lower().strip(".")

    registered = next(
        (
            format_name
            for suffix, format_name in Image.registered_extensions().items()
            if suffix.lstrip(".").lower() == wanted
        ),
        None,
    )
    if registered is not None:
        return registered

    builtin_table = {
        "jpg": "JPEG", "jpeg": "JPEG",
        "png": "PNG",
        "webp": "WEBP",
        "bmp": "BMP",
        "tiff": "TIFF", "tif": "TIFF",
        "gif": "GIF",
        "ico": "ICO",
        # All netpbm variants map to the PPM format name.
        "ppm": "PPM", "pgm": "PPM", "pbm": "PPM", "pnm": "PPM",
        "tga": "TGA",
        "xbm": "XBM",
        "xpm": "XPM",
        "pdf": "PDF",
        "eps": "EPS",
    }
    return builtin_table.get(wanted)
# ---------- AUDIO PROCESSING ----------
def convert_audio(input_files, output_ext, session_id, merge_files, gap_duration):
    """Convert/merge audio into the selected format (by extension).

    Args:
        input_files: Source files, each either a path string or an object
            exposing the path as ``.name``.
        output_ext: Target extension without a dot (e.g. ``"mp3"``).
        session_id: Directory that receives the output; created if missing.
        merge_files: When true, concatenate all inputs into a single
            ``merged_output.<output_ext>`` instead of converting each one.
        gap_duration: Length of the silence inserted *between* consecutive
            files when merging (passed to ``AudioSegment.silent(duration=...)``).

    Returns:
        A list of output paths: one entry per input, or a single-element
        list holding the merged file when ``merge_files`` is true.
    """
    os.makedirs(session_id, exist_ok=True)
    ff_format = AUDIO_EXT_TO_FFMPEG_FORMAT.get(output_ext, output_ext)
    # These two share the ogg container, so the encoder must be named explicitly.
    codec = {"opus": "libopus", "spx": "libspeex"}.get(output_ext)

    output_files = []
    taken_names = set()
    merged_audio = AudioSegment.silent(duration=0)
    for index, input_file in enumerate(tqdm(input_files, desc="Converting audio files")):
        file_path = input_file if isinstance(input_file, str) else input_file.name
        audio = AudioSegment.from_file(file_path)

        if merge_files:
            # Only the merged file is returned, so skip the per-file export.
            # The gap goes between files, not after the last one.
            if index:
                merged_audio += AudioSegment.silent(duration=gap_duration)
            merged_audio += audio
            continue

        base_name = os.path.splitext(os.path.basename(file_path))[0]
        output_filename = f"{base_name}.{output_ext}"
        # Inputs such as "a.wav" and "a.flac" would otherwise map to the same
        # output name and overwrite each other.
        suffix = 1
        while output_filename.lower() in taken_names:
            output_filename = f"{base_name}_{suffix}.{output_ext}"
            suffix += 1
        taken_names.add(output_filename.lower())

        output_path = os.path.join(session_id, output_filename)
        audio.export(output_path, format=ff_format, codec=codec)
        output_files.append(output_path)

    if merge_files:
        merged_output_path = os.path.join(session_id, f"merged_output.{output_ext}")
        merged_audio.export(merged_output_path, format=ff_format, codec=codec)
        return [merged_output_path]
    return output_files
# ---------- IMAGE PROCESSING ----------
def convert_images(input_files, output_ext, session_id):
    """Simple image format conversion.

    Args:
        input_files: Source files, each either a path string or an object
            exposing the path as ``.name``.
        output_ext: Target extension without a dot (e.g. ``"png"``).
        session_id: Directory that receives the output; created if missing.

    Returns:
        A list with one output path per input file, in input order.

    Raises:
        gr.Error: If no Pillow format is known for ``output_ext``.
    """
    # Validate the target format before touching the filesystem.
    pil_fmt = pil_format_for_ext(output_ext)
    if not pil_fmt:
        raise gr.Error(f"Pillow cannot save to format: {output_ext}")
    os.makedirs(session_id, exist_ok=True)

    # For JPEG ensure RGB mode
    needs_rgb = pil_fmt.upper() == "JPEG"

    output_files = []
    taken_names = set()
    for input_file in tqdm(input_files, desc="Converting images"):
        file_path = input_file if isinstance(input_file, str) else input_file.name
        base_name = os.path.splitext(os.path.basename(file_path))[0]
        output_filename = f"{base_name}.{output_ext}"
        # Inputs such as "photo.png" and "photo.jpg" would otherwise map to
        # the same output name and overwrite each other.
        suffix = 1
        while output_filename.lower() in taken_names:
            output_filename = f"{base_name}_{suffix}.{output_ext}"
            suffix += 1
        taken_names.add(output_filename.lower())

        output_path = os.path.join(session_id, output_filename)
        with Image.open(file_path) as img:
            img.load()
            converted = img.convert("RGB") if needs_rgb else img
            converted.save(output_path, format=pil_fmt)
        output_files.append(output_path)
    return output_files
# ---------- ZIP CREATION ----------
def create_zip(files_to_zip, session_id):
    """Bundle *files_to_zip* into ``<session_id>.zip`` and return that path.

    Each file is stored under its base name only, so the archive is flat.
    """
    archive_path = f"{session_id}.zip"
    with zipfile.ZipFile(archive_path, 'w') as archive:
        for member_path in tqdm(files_to_zip, desc="Creating ZIP archive"):
            archive.write(member_path, arcname=os.path.basename(member_path))
    return archive_path
# ---------- AUDIO HANDLER ----------
def process_audio_files(files, output_ext, merge_files, gap_duration, progress=gr.Progress(track_tqdm=True)):
    """Gradio handler for the audio tab.

    Converts (or merges) the uploaded files inside a fresh session directory
    and returns a single path: the converted file itself, or a ZIP archive
    when more than one file was produced.

    Raises:
        gr.Error: If no files were uploaded.
    """
    if not files:
        raise gr.Error("Please upload at least one audio file!")

    # Timestamp plus a short random suffix keeps session directories distinct.
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    session_id = timestamp + "_" + str(uuid.uuid4())[:8]
    print(f"\nStarting audio session: {session_id}")
    print(f"Files to convert: {len(files)} to .{output_ext}")

    results = convert_audio(files, output_ext, session_id, merge_files, gap_duration)
    if len(results) <= 1:
        return results[0]

    print("Creating ZIP archive...")
    return create_zip(results, session_id)
# ---------- IMAGE HANDLER ----------
def process_image_files(files, output_ext, progress=gr.Progress(track_tqdm=True)):
    """Gradio handler for the image tab.

    Converts the uploaded images inside a fresh session directory and
    returns a single path: the converted image itself, or a ZIP archive when
    more than one file was produced.

    Raises:
        gr.Error: If no files were uploaded.
    """
    if not files:
        raise gr.Error("Please upload at least one image!")

    # Timestamp plus a short random suffix keeps session directories distinct.
    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    session_id = timestamp + "_" + str(uuid.uuid4())[:8]
    print(f"\nStarting image session: {session_id}")
    print(f"Files to convert: {len(files)} to .{output_ext}")

    results = convert_images(files, output_ext, session_id)
    if len(results) <= 1:
        return results[0]

    print("Creating ZIP archive...")
    return create_zip(results, session_id)
# ---------- VIDEO HANDLER ----------
def process_video(input_video, conversion_type, output_ext, progress=gr.Progress(track_tqdm=True)):
    """Gradio handler for the video tab.

    Args:
        input_video: The uploaded video, either a path string or an object
            exposing the path as ``.name``.
        conversion_type: ``"Video to Video"`` to re-encode the video, or
            ``"Video to Audio"`` to extract the audio track.
        output_ext: Target extension without a dot.
        progress: Gradio progress tracker (unused directly).

    Returns:
        Path of the written file, ``<session>/<name>_converted.<output_ext>``.

    Raises:
        gr.Error: If no video was uploaded, the conversion type is unknown,
            the video has no audio track (audio extraction only), or the
            underlying conversion fails.
    """
    if not input_video:
        raise gr.Error("Please upload a video file!")

    session_id = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + "_" + str(uuid.uuid4())[:8]
    os.makedirs(session_id, exist_ok=True)
    input_path = input_video if isinstance(input_video, str) else input_video.name
    base_name = os.path.splitext(os.path.basename(input_path))[0]
    output_filename = f"{base_name}_converted.{output_ext}"
    output_path = os.path.join(session_id, output_filename)
    print(f"\nStarting video session: {session_id}")
    print(f"Conversion type: {conversion_type}, Output: .{output_ext}")

    clip = None
    audio_clip = None
    try:
        clip = VideoFileClip(input_path)
        if conversion_type == "Video to Video":
            # Let MoviePy use its defaults; for some containers (e.g. webm) this may require specific codecs
            clip.write_videofile(output_path, logger=None)
        elif conversion_type == "Video to Audio":
            audio_clip = clip.audio
            if audio_clip is None:
                raise gr.Error("The uploaded video does not contain an audio track.")
            audio_codec = None
            # Name the encoder explicitly for these two, but only if this
            # FFmpeg build actually provides it.
            if output_ext == "opus":
                if "libopus" in ffmpeg_audio_encoders():
                    audio_codec = "libopus"
            elif output_ext == "spx":
                if "libspeex" in ffmpeg_audio_encoders():
                    audio_codec = "libspeex"
            audio_clip.write_audiofile(output_path, logger=None, codec=audio_codec)
        else:
            # Without this branch nothing is written and the caller would
            # receive a path to a file that does not exist.
            raise gr.Error(f"Unknown conversion type: {conversion_type}")
    except gr.Error:
        # Already a user-facing message; do not wrap it a second time.
        raise
    except Exception as e:
        print(f"An error occurred: {e}")
        raise gr.Error(f"Processing error: {e}") from e
    finally:
        # Release the readers on failure as well as on success.
        for resource in (audio_clip, clip):
            if resource is not None:
                resource.close()

    print("Video processing complete!")
    return output_path
# ---------- FORMAT CHOICES ----------
def update_format_choices(conversion_type):
    """Build the output-format dropdown matching the selected conversion type.

    ``"Video to Video"`` offers video extensions (preferring ``mp4``); any
    other value offers audio extensions (preferring ``mp3``). When the
    preferred extension is unavailable the first choice is selected, or
    ``None`` if there are no choices.
    """
    if conversion_type == "Video to Video":
        choices = available_video_extensions()
        preferred = "mp4"
        label = "Output Video Format"
    else:
        choices = available_audio_extensions()
        preferred = "mp3"
        label = "Output Audio Format"

    if preferred in choices:
        selected = preferred
    elif choices:
        selected = choices[0]
    else:
        selected = None
    return gr.Dropdown(choices=choices, value=selected, label=label)
# ---------- UI ----------
# Output formats offered in the dropdowns, probed once at import time.
AUDIO_FORMATS = available_audio_extensions()  # via FFmpeg muxers/encoders
VIDEO_FORMATS = available_video_extensions()  # via FFmpeg muxers
IMAGE_FORMATS = available_image_extensions()  # via Pillow's registries
# NOTE(review): the nesting of the layout containers below was reconstructed
# (each tab: a two-column row for inputs/options, then the submit button and
# the result file underneath) - confirm against the deployed layout.
# NOTE(review): the i18n key "сonvert" appears to start with a Cyrillic "с"
# (U+0441); keep it byte-identical unless i18n_local is changed as well.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.HTML(i18n("title_app"))

    with gr.Tabs():
        # AUDIO TAB
        with gr.TabItem(i18n("audio_tab")):
            gr.HTML(i18n("audio_description"))

            with gr.Row():
                with gr.Column(scale=2):
                    audio_file_input = gr.Files(
                        label=i18n("label_audio_file_input"),
                        file_types=["audio"],
                        height=160,
                    )
                with gr.Column(scale=1):
                    # Prefer mp3; otherwise the first available format, if any.
                    default_audio = "mp3" if "mp3" in AUDIO_FORMATS else next(iter(AUDIO_FORMATS), None)
                    audio_format_choice = gr.Dropdown(
                        choices=AUDIO_FORMATS,
                        label=i18n("label_audio_format_choice"),
                        value=default_audio,
                    )
                    merge_files_checkbox = gr.Checkbox(label=i18n("label_merge_files_checkbox"))
                    gap_slider = gr.Slider(
                        minimum=0,
                        maximum=5000,
                        step=100,
                        value=500,
                        label=i18n("label_gap_slider"),
                    )

            audio_submit_button = gr.Button(i18n("сonvert"), variant="primary")
            audio_output_file = gr.File(label=i18n("download_result"))

            audio_submit_button.click(
                fn=process_audio_files,
                inputs=[audio_file_input, audio_format_choice, merge_files_checkbox, gap_slider],
                outputs=audio_output_file,
            )

        # IMAGE TAB
        with gr.TabItem(i18n("image_tab")):
            gr.HTML(i18n("image_description"))

            with gr.Row():
                with gr.Column(scale=2):
                    image_file_input = gr.Files(
                        label=i18n("label_image_file_input"),
                        file_types=["image"],
                        height=160,
                    )
                with gr.Column(scale=1):
                    # Prefer png; otherwise the first available format, if any.
                    default_image = "png" if "png" in IMAGE_FORMATS else next(iter(IMAGE_FORMATS), None)
                    image_format_choice = gr.Dropdown(
                        choices=IMAGE_FORMATS,
                        label=i18n("label_image_format_choice"),
                        value=default_image,
                    )

            image_submit_button = gr.Button(i18n("сonvert"), variant="primary")
            image_output_file = gr.File(label=i18n("download_result"))

            image_submit_button.click(
                fn=process_image_files,
                inputs=[image_file_input, image_format_choice],
                outputs=image_output_file,
            )

        # VIDEO TAB
        with gr.TabItem(i18n("video_tab")):
            gr.HTML(i18n("video_description"))

            with gr.Row():
                with gr.Column(scale=2):
                    video_input = gr.File(
                        label=i18n("label_video_input"),
                        file_types=["video"],
                        height=160,
                    )
                with gr.Column(scale=1):
                    conversion_type_radio = gr.Radio(
                        choices=["Video to Video", "Video to Audio"],
                        label=i18n("label_conversion_type_radio"),
                        value="Video to Video",
                    )
                    # Prefer mp4; otherwise the first available format, if any.
                    default_video = "mp4" if "mp4" in VIDEO_FORMATS else next(iter(VIDEO_FORMATS), None)
                    video_format_dropdown = gr.Dropdown(
                        choices=VIDEO_FORMATS,
                        label=i18n("label_video_format_dropdown"),
                        value=default_video,
                    )

            video_submit_button = gr.Button(i18n("сonvert"), variant="primary")
            video_output_file = gr.File(label=i18n("download_result"))

            # Switching the conversion type swaps the dropdown between the
            # video and audio format lists.
            conversion_type_radio.change(
                fn=update_format_choices,
                inputs=conversion_type_radio,
                outputs=video_format_dropdown,
            )
            video_submit_button.click(
                fn=process_video,
                inputs=[video_input, conversion_type_radio, video_format_dropdown],
                outputs=video_output_file,
            )
# Start the app only when executed as a script, not when imported.
if __name__ == "__main__":
    demo.launch(
        i18n=i18n,
        debug=True,
    )