import os
import subprocess

import requests
import soundfile as sf
import torch
import yt_dlp
from scipy.special import softmax
from transformers import Wav2Vec2FeatureExtractor, Wav2Vec2ForSequenceClassification

from core.logger import logger
# Load the accent classifier and its feature extractor once at import time,
# and move the model to the GPU if one is available.
MODEL_ID = "dima806/english_accents_classification"
accent_model = Wav2Vec2ForSequenceClassification.from_pretrained(MODEL_ID)
feature_extractor = Wav2Vec2FeatureExtractor.from_pretrained(MODEL_ID)
labels = list(accent_model.config.id2label.values())

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
accent_model.to(device)
def download_video(url, output_dir):
    """Download a video into output_dir as input_video.mp4."""
    logger.info(f"Downloading video from: {url}")
    try:
        if any(x in url for x in ["youtube.com", "youtu.be", "loom.com"]):
            # Known streaming hosts: let yt-dlp pick the best streams and merge to mp4.
            ydl_opts = {
                'format': 'bestvideo+bestaudio/best',
                'merge_output_format': 'mp4',
                'outtmpl': os.path.join(output_dir, 'input_video.%(ext)s'),
                'quiet': True,
                'no_warnings': True,
            }
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])
        else:
            # Anything else is treated as a direct file URL and streamed to disk.
            response = requests.get(url, stream=True, timeout=20)
            response.raise_for_status()
            filepath = os.path.join(output_dir, "input_video.mp4")
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
    except Exception as e:
        logger.error(f"Failed to download video: {e}")
        raise
def extract_audio(video_path, audio_path):
    """Extract a 30-second clip, skipping the first 15 seconds of the video."""
    logger.info("Extracting audio from video...")
    subprocess.run([
        'ffmpeg', '-y', '-i', video_path,
        '-ss', '00:00:15', '-t', '00:00:30',  # skip the intro, keep 30 s
        '-ar', '16000', '-ac', '1',           # 16 kHz mono, as both models expect
        '-loglevel', 'error', audio_path
    ], check=True)
def transcribe(audio_path, whisper_model):
    """Run Whisper and return the full text, timestamped segments, and detected language."""
    logger.info("Transcribing with Whisper...")
    result = whisper_model.transcribe(audio_path)
    return result["text"], result["segments"], result["language"]
def classify_accent(audio_path):
    """Return the top accent label, its confidence (%), and the top-3 candidates."""
    logger.info("Running accent classification...")
    waveform, sample_rate = sf.read(audio_path)
    if waveform.ndim > 1:
        # Downmix multi-channel audio; the feature extractor expects a 1-D waveform.
        waveform = waveform.mean(axis=1)
    inputs = feature_extractor(waveform, sampling_rate=sample_rate, return_tensors="pt", padding=True)
    inputs = {k: v.to(device) for k, v in inputs.items()}
    with torch.no_grad():
        logits = accent_model(**inputs).logits
    probs = softmax(logits[0].cpu().numpy())
    top_indices = probs.argsort()[::-1][:3]
    top_accents = [{"accent": labels[i], "confidence": round(float(probs[i]) * 100, 2)}
                   for i in top_indices]
    return top_accents[0]["accent"], top_accents[0]["confidence"], top_accents
def compute_fluency(segments):
    """Score fluency as the percentage of the clip spent speaking (0-100)."""
    if not segments:
        return 0
    total_time = segments[-1]['end']
    if total_time <= 0:
        return 0
    speaking_time = sum(seg['end'] - seg['start'] for seg in segments)
    return int(min(speaking_time / total_time * 100, 100))
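
# --- Usage sketch (not part of the original module) ---
# A minimal end-to-end example of how these functions fit together, assuming the
# openai-whisper package is installed; the URL below is a hypothetical placeholder.
if __name__ == "__main__":
    import tempfile

    import whisper

    with tempfile.TemporaryDirectory() as workdir:
        url = "https://example.com/sample.mp4"  # hypothetical direct video URL
        download_video(url, workdir)
        video_path = os.path.join(workdir, "input_video.mp4")
        audio_path = os.path.join(workdir, "audio.wav")
        extract_audio(video_path, audio_path)

        whisper_model = whisper.load_model("base")
        text, segments, language = transcribe(audio_path, whisper_model)
        accent, confidence, top3 = classify_accent(audio_path)

        print(f"Language: {language}")
        print(f"Accent: {accent} ({confidence}%)")
        print(f"Fluency: {compute_fluency(segments)}%")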