"""Helpers for turning remote videos into silence-trimmed mono WAV files."""

import asyncio
import subprocess
from pathlib import Path
from typing import Iterator, List

import torchaudio
import webrtcvad
from yt_dlp import YoutubeDL

from .config import AUDIO_CACHE

# Frame length fed to the VAD; WebRTC VAD accepts only 10, 20 or 30 ms frames.
_VAD_FRAME_MS = 30
# Sample rates (Hz) accepted by WebRTC VAD.
_VAD_SAMPLE_RATES = (8000, 16000, 32000, 48000)


# ---------------------------------------------------------------------------
# ffmpeg helpers
# ---------------------------------------------------------------------------
def _run(cmd: List[str]) -> None:
    """Run *cmd* to completion and raise if it exits with a non-zero status.

    Args:
        cmd: Program and arguments, passed to ``subprocess.run`` as a list
            (no shell is involved).

    Raises:
        RuntimeError: If the process exits non-zero; the message is the
            process's captured stderr.
    """
    proc = subprocess.run(
        cmd,
        # Do not let the child read the parent's stdin: ffmpeg is interactive
        # by default and could otherwise consume terminal input.
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if proc.returncode != 0:
        # errors="replace" so undecodable bytes in stderr cannot mask the
        # real failure with a UnicodeDecodeError.
        raise RuntimeError(proc.stderr.decode(errors="replace"))


# ---------------------------------------------------------------------------
# Video → Audio
# ---------------------------------------------------------------------------
async def download_video(url: str, out_dir: Path) -> Path:
    """Download *url* with yt-dlp into *out_dir* without blocking the loop.

    The blocking yt-dlp call runs in the event loop's default executor.

    Args:
        url: Address of the remote video.
        out_dir: Directory that receives the file, named ``download.<ext>``.

    Returns:
        Path of the most recently modified ``download.*`` file in *out_dir*.

    Raises:
        RuntimeError: If no ``download.*`` file exists after yt-dlp returns.
    """
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "outtmpl": str(out_dir / "download.%(ext)s"),
        "format": "bestvideo+bestaudio/best / best",
    }

    def _job() -> None:
        with YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _job)

    candidates = list(out_dir.glob("download.*"))
    if not candidates:
        # A bare next() on an empty glob would leak StopIteration out of the
        # coroutine and surface as an uninformative RuntimeError.
        raise RuntimeError(f"yt-dlp produced no output file in {out_dir}")
    # Glob order is arbitrary; prefer the newest file so a stale download
    # left in the directory by an earlier run is not returned by accident.
    return max(candidates, key=lambda path: path.stat().st_mtime)


async def extract_audio(video_path: Path, wav_path: Path, sr: int = 16000) -> None:
    """Extract the audio track of *video_path* into a mono WAV via ffmpeg.

    The blocking ffmpeg call runs in the event loop's default executor.

    Args:
        video_path: Input media file.
        wav_path: Output file; overwritten if it already exists (``-y``).
        sr: Output sample rate in Hz.

    Raises:
        RuntimeError: If ffmpeg exits with a non-zero status.
    """
    cmd = [
        "ffmpeg", "-y", "-i", str(video_path),
        "-vn", "-ac", "1", "-ar", str(sr), str(wav_path),
    ]
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _run, cmd)


# ---------------------------------------------------------------------------
# VAD trimming (WebRTC)
# ---------------------------------------------------------------------------
def _frame_gen(frame_ms: int, pcm16: bytes, sr: int) -> Iterator[bytes]:
    """Yield consecutive, complete *frame_ms*-long frames from *pcm16*.

    Args:
        frame_ms: Frame duration in milliseconds.
        pcm16: Mono 16-bit PCM audio (2 bytes per sample).
        sr: Sample rate of *pcm16* in Hz.

    Yields:
        Byte strings of exactly one frame each. A trailing remainder shorter
        than one frame is dropped, because the VAD rejects partial frames.

    Raises:
        ValueError: If the frame size works out to zero bytes.
    """
    # Integer arithmetic keeps the byte count exact and always even.
    frame_bytes = (sr * frame_ms // 1000) * 2
    if frame_bytes <= 0:
        raise ValueError(
            f"frame of {frame_ms} ms at {sr} Hz contains no samples"
        )
    for start in range(0, len(pcm16) - frame_bytes + 1, frame_bytes):
        yield pcm16[start : start + frame_bytes]


def trim_silence(wav_path: Path, aggressiveness: int = 3) -> Path:
    """Cut leading and trailing non-speech from a WAV file using WebRTC VAD.

    The audio is classified in 30 ms frames; everything before the first and
    after the last voiced frame is discarded. Interior silence is kept.

    Args:
        wav_path: WAV file to trim. Its sample rate must be one of 8000,
            16000, 32000 or 48000 Hz.
        aggressiveness: Passed to ``webrtcvad.Vad``.

    Returns:
        Path of the new ``<stem>_trim.wav`` written next to *wav_path*, or
        *wav_path* itself (untouched) when no voiced frame is found.

    Raises:
        ValueError: If the file's sample rate is not supported by the VAD.
    """
    waveform, sr = torchaudio.load(str(wav_path))
    if sr not in _VAD_SAMPLE_RATES:
        raise ValueError(
            f"WebRTC VAD supports {_VAD_SAMPLE_RATES} Hz, got {sr} Hz"
        )

    # The VAD needs mono 16-bit PCM. Averaging the channel axis is a no-op
    # for mono input and a plain downmix otherwise.
    # Assumes float samples in [-1.0, 1.0] as returned by torchaudio.load with
    # its default normalization.
    mono = waveform.mean(dim=0).numpy()
    # Clip before the cast: a full-scale +1.0 sample would otherwise become
    # 32768 and wrap around to -32768 in int16.
    pcm16 = (mono * 32768).clip(-32768, 32767).astype("int16").tobytes()

    vad = webrtcvad.Vad(aggressiveness)
    voiced = [
        vad.is_speech(frame, sr)
        for frame in _frame_gen(_VAD_FRAME_MS, pcm16, sr)
    ]
    if not any(voiced):
        return wav_path

    first = voiced.index(True)
    last = len(voiced) - 1 - voiced[::-1].index(True)

    # Frame index -> sample index; depends on the actual sample rate.
    samples_per_frame = sr * _VAD_FRAME_MS // 1000
    # Slice the original tensor along time so every channel is preserved.
    kept = waveform[:, first * samples_per_frame : (last + 1) * samples_per_frame]

    out = wav_path.with_name(wav_path.stem + "_trim.wav")
    torchaudio.save(str(out), kept, sr)
    return out