Spaces:
Build error
Build error
File size: 2,491 Bytes
3859913 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
import asyncio
import subprocess
from pathlib import Path
from typing import List
import torchaudio
from yt_dlp import YoutubeDL
import webrtcvad
from .config import AUDIO_CACHE
# ---------------------------------------------------------------------------
# ffmpeg helpers
# ---------------------------------------------------------------------------
def _run(cmd: List[str]) -> None:
    """Run an external command and fail loudly if it exits non-zero.

    Args:
        cmd: Program and arguments, passed to ``subprocess.run`` as a list
            (no shell is involved).

    Raises:
        RuntimeError: If the process exits with a non-zero status. The
            message is the process's stderr output.
    """
    # stdout is never inspected, so discard it instead of buffering it.
    proc = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    if proc.returncode != 0:
        # Tools may emit bytes that are not valid UTF-8 (e.g. file names in
        # another encoding). Decode leniently so the caller always gets the
        # RuntimeError rather than a UnicodeDecodeError that hides the failure.
        raise RuntimeError(proc.stderr.decode(errors="replace"))
# ---------------------------------------------------------------------------
# Video → Audio
# ---------------------------------------------------------------------------
async def download_video(url: str, out_dir: Path) -> Path:
    """Async wrapper around yt-dlp to pull remote video assets.

    The blocking download runs in the event loop's default executor so the
    loop stays responsive.

    Args:
        url: Address of the video to download.
        out_dir: Directory the file is written to, as ``download.<ext>``.

    Returns:
        Path of a ``download.*`` file found in ``out_dir`` after the download.

    Raises:
        RuntimeError: If no finished ``download.*`` file exists in ``out_dir``
            once yt-dlp returns.
    """
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "outtmpl": str(out_dir / "download.%(ext)s"),
        "format": "bestvideo+bestaudio/best / best",
    }

    def _job():
        with YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, _job)

    # Skip yt-dlp's in-progress artefacts so a half-written file is never
    # handed to the caller.
    partial_suffixes = {".part", ".ytdl"}
    finished = [
        path
        for path in out_dir.glob("download.*")
        if path.suffix not in partial_suffixes
    ]
    if not finished:
        # A bare next() on an empty glob would raise StopIteration, which a
        # coroutine turns into an opaque "coroutine raised StopIteration"
        # RuntimeError. Raise the same exception type with a useful message.
        raise RuntimeError(f"yt-dlp produced no output file in {out_dir} for {url}")
    return finished[0]
async def extract_audio(video_path: Path, wav_path: Path, sr: int = 16000):
    """Write the audio of ``video_path`` to ``wav_path`` using ffmpeg.

    ffmpeg is called with ``-y``, ``-vn``, ``-ac 1`` and ``-ar <sr>``. The
    blocking call is handed to the event loop's default executor.

    Args:
        video_path: Input media file.
        wav_path: Destination audio file.
        sr: Value passed to ffmpeg's ``-ar`` option.
    """
    source = str(video_path)
    target = str(wav_path)
    ffmpeg_args = ["ffmpeg", "-y", "-i", source]
    ffmpeg_args += ["-vn", "-ac", "1", "-ar", str(sr), target]
    await asyncio.get_running_loop().run_in_executor(None, _run, ffmpeg_args)
# ---------------------------------------------------------------------------
# VAD trimming (WebRTC)
# ---------------------------------------------------------------------------
def _frame_gen(frame_ms, pcm16, sr):
    """Yield consecutive fixed-duration frames from 16-bit mono PCM bytes.

    Args:
        frame_ms: Frame duration in milliseconds.
        pcm16: Raw PCM audio as bytes, two bytes per sample.
        sr: Sample rate in Hz.

    Yields:
        Byte slices that each hold exactly ``frame_ms`` of audio. A trailing
        remainder shorter than one frame is dropped, because WebRTC VAD
        rejects frames that are not a whole 10/20/30 ms long.

    Raises:
        ValueError: If ``frame_ms`` and ``sr`` describe a frame of less than
            one sample.
    """
    # Count whole samples first so a frame never splits a 2-byte sample.
    samples_per_frame = int(sr * frame_ms / 1000)
    if samples_per_frame <= 0:
        raise ValueError("frame_ms and sr must describe at least one sample per frame")
    n = samples_per_frame * 2
    # Stop before the final partial frame: the last start offset must leave
    # room for n bytes.
    for i in range(0, len(pcm16) - n + 1, n):
        yield pcm16[i : i + n]
def trim_silence(wav_path: Path, aggressiveness: int = 3) -> Path:
    """Cut leading and trailing non-speech from a WAV file using WebRTC VAD.

    The audio is split into 30 ms frames. Everything before the first frame
    classified as speech and after the last one is removed. Audio between
    those two frames is kept untouched.

    Args:
        wav_path: Audio file to trim. Its sample rate must be one that
            WebRTC VAD accepts (8000, 16000, 32000 or 48000 Hz per the
            py-webrtcvad documentation); other rates make the VAD raise.
        aggressiveness: Value passed to ``webrtcvad.Vad``.

    Returns:
        Path of the new ``<stem>_trim.wav`` file written next to the input,
        or ``wav_path`` itself when no frame is classified as speech.
    """
    # Function-scope import: this module does not import torch at top level.
    import torch

    frame_ms = 30
    sig, sr = torchaudio.load(str(wav_path))
    # Average the channels so multi-channel files give a single sample stream.
    # For mono input this is the same data as the single channel.
    mono = sig.mean(dim=0).numpy()

    # Frame size follows the file's sample rate rather than assuming 16 kHz.
    samples_per_frame = int(sr * frame_ms / 1000)
    frame_bytes = samples_per_frame * 2

    # Clip before the int16 cast so a full-scale +1.0 sample does not wrap
    # around to -32768.
    pcm16 = (mono * 32768).clip(-32768, 32767).astype("int16").tobytes()
    # The VAD only accepts whole frames, so drop the trailing remainder.
    usable = len(pcm16) - (len(pcm16) % frame_bytes) if frame_bytes else 0

    vad = webrtcvad.Vad(aggressiveness)
    frames = list(_frame_gen(frame_ms, pcm16[:usable], sr)) if usable else []
    voiced = [vad.is_speech(f, sr) for f in frames]
    if not any(voiced):
        return wav_path

    first = voiced.index(True)
    last = len(voiced) - 1 - voiced[::-1].index(True)
    kept = mono[first * samples_per_frame : (last + 1) * samples_per_frame]

    out = wav_path.with_name(wav_path.stem + "_trim.wav")
    torchaudio.save(str(out), torch.from_numpy(kept).unsqueeze(0), sr)
    return out