Spaces:
Sleeping
Sleeping
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import Literal

import numpy as np
# Codec names accepted by write_video's ``codec`` parameter.
VideoCodec = Literal["h264", "vp9", "gif"]
def _check_ffmpeg_installed() -> None:
    """Verify that an ``ffmpeg`` executable can be located with ``shutil.which``.

    Raises:
        RuntimeError: If no ``ffmpeg`` executable is found.
    """
    ffmpeg_location = shutil.which("ffmpeg")
    if ffmpeg_location is not None:
        return

    message = (
        "ffmpeg is required to write video but was not found on your system. "
        "Please install ffmpeg and ensure it is available on your PATH."
    )
    raise RuntimeError(message)
def _check_array_format(video: np.ndarray) -> None:
    """Validate that ``video`` is a 4-D uint8 array whose last axis has size 3.

    The rank and the channel count are checked separately so that the error
    message names the actual problem.

    Args:
        video: Array to validate; expected layout is (F, H, W, 3).

    Raises:
        ValueError: If the array is not 4-dimensional, or its last axis does
            not have size 3.
        TypeError: If the array dtype is not ``uint8``.
    """
    if video.ndim != 4:
        raise ValueError(
            f"Expected RGB input shaped (F, H, W, 3), got {video.shape}. "
            f"Input has {video.ndim} dimensions, expected 4."
        )
    if video.shape[-1] != 3:
        raise ValueError(
            f"Expected RGB input shaped (F, H, W, 3), got {video.shape}. "
            f"Last axis has size {video.shape[-1]}, expected 3."
        )
    if video.dtype != np.uint8:
        raise TypeError(
            f"Expected dtype=uint8, got {video.dtype}. "
            "Please convert your video data to uint8 format."
        )
def _check_path(file_path: str | Path) -> None: | |
"""Raise an error if the parent directory does not exist.""" | |
file_path = Path(file_path) | |
if not file_path.parent.exists(): | |
try: | |
file_path.parent.mkdir(parents=True, exist_ok=True) | |
except OSError as e: | |
raise ValueError( | |
f"Failed to create parent directory {file_path.parent}: {e}" | |
) | |
def write_video(
    file_path: str | Path, video: np.ndarray, fps: float, codec: VideoCodec
) -> None:
    """Encode an RGB uint8 array of shape (F, H, W, 3) to a file with ffmpeg.

    Raw ``rgb24`` frames are streamed to an ``ffmpeg`` subprocess over stdin.
    ``codec`` and ``video`` are validated before the output directory is
    created or ffmpeg is started.

    Args:
        file_path: Output file; a missing parent directory is created.
        video: Frames as a uint8 array shaped (F, H, W, 3).
        fps: Frame rate, passed to ffmpeg via ``-r``.
        codec: One of ``"h264"``, ``"vp9"`` or ``"gif"``.

    Raises:
        RuntimeError: If ffmpeg is not on PATH, or ffmpeg exits with a
            non-zero code (the message includes ffmpeg's stderr output).
        ValueError: If ``codec`` is unsupported, the array shape is wrong, or
            the parent directory cannot be created.
        TypeError: If the array dtype is not uint8.
        OSError: If writing to ffmpeg's stdin failed even though ffmpeg
            itself exited with code 0.
    """
    _check_ffmpeg_installed()
    if codec not in {"h264", "vp9", "gif"}:
        raise ValueError("Unsupported codec. Use h264, vp9, or gif.")
    arr = np.asarray(video)
    _check_array_format(arr)
    # Only touch the filesystem once the arguments are known to be valid.
    _check_path(file_path)

    frames = np.ascontiguousarray(arr)
    _, height, width, _ = frames.shape

    cmd = [
        "ffmpeg",
        "-y",
        "-f",
        "rawvideo",
        "-s",
        f"{width}x{height}",
        "-pix_fmt",
        "rgb24",
        "-r",
        str(fps),
        "-i",
        "-",
        "-an",
    ]
    if codec == "gif":
        # Build a palette from the frames and apply it to the same stream.
        video_filter = "split[s0][s1];[s0]palettegen[p];[s1][p]paletteuse"
        cmd += ["-vf", video_filter, "-loop", "0"]
    elif codec == "h264":
        cmd += [
            "-vcodec",
            "libx264",
            "-pix_fmt",
            "yuv420p",
            "-movflags",
            "+faststart",
        ]
    elif codec == "vp9":
        # Target bitrate from a fixed budget of 0.08 bits per pixel per frame.
        bpp = 0.08
        bps = int(width * height * fps * bpp)
        if bps >= 1_000_000:
            bitrate = f"{round(bps / 1_000_000)}M"
        elif bps >= 1_000:
            bitrate = f"{round(bps / 1_000)}k"
        else:
            bitrate = str(max(bps, 1))
        cmd += [
            "-vcodec",
            "libvpx-vp9",
            "-b:v",
            bitrate,
            "-pix_fmt",
            "yuv420p",
        ]
    cmd.append(str(file_path))

    # ffmpeg's stderr goes to a temporary file rather than a pipe: a pipe that
    # nobody drains while frames are being written can fill up, at which point
    # ffmpeg blocks on stderr, stops reading stdin, and both sides hang.
    with tempfile.TemporaryFile() as err_log:
        proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=err_log)
        sink = proc.stdin
        pipe_error = None
        try:
            for frame in frames:
                sink.write(frame.tobytes())
            sink.flush()
        except OSError as exc:
            # Typically a broken pipe because ffmpeg exited early. Keep going
            # so its exit code and log are reported instead of the pipe error.
            pipe_error = exc
        finally:
            try:
                sink.close()
            except OSError:
                # Buffered data could not be flushed to a dead process; the
                # failure is reported through ffmpeg's exit status below.
                pass
        ret = proc.wait()
        err_log.seek(0)
        stderr = err_log.read().decode("utf-8", errors="ignore")

    if ret != 0:
        raise RuntimeError(
            f"ffmpeg failed with code {ret}\n{stderr}"
        ) from pipe_error
    if pipe_error is not None:
        raise pipe_error