import streamlit as st
import os
import numpy as np  # linear algebra
import pandas as pd  # data processing
# from transformers import Wav2Vec2ForSequenceClassification, Wav2Vec2Processor
# from utils import download_video, extract_audio, accent_classify
import whisper
from transformers import pipeline
import yt_dlp
import torchaudio
import ffmpeg
from transformers.utils import logging

logging.set_verbosity_info()

# Resampling rate in Hertz (Hz) for audio data
RATE_HZ = 16000

# Maximum audio interval length to consider, in seconds
MAX_SECONDS = 1

# Maximum audio interval length in samples (rate * seconds)
MAX_LENGTH = RATE_HZ * MAX_SECONDS


def download_video(url, output_dir="/app/tmp"):
    os.makedirs(output_dir, exist_ok=True)
    ydl_opts = {
        'format': 'worstvideo[ext=mp4]+bestaudio[ext=m4a]/bestaudio',
        'outtmpl': os.path.join(output_dir, "video.%(ext)s"),
        'merge_output_format': 'mp4',
        'quiet': True,
        'noplaylist': True,
        'nocheckcertificate': True,
        'retries': 3,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])
    return os.path.join(output_dir, "video.mp4")


def extract_audio(input_path, output_dir="/app/tmp"):
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, "audio.mp3")
    (
        ffmpeg
        .input(input_path)
        .output(output_path, format='mp3', acodec='libmp3lame', audio_bitrate='192k')
        .overwrite_output()
        .run(quiet=True)
    )
    return output_path


# Split the audio file into chunks of exactly MAX_LENGTH samples
def split_audio(file):
    try:
        # Load the audio file using torchaudio and get its sample rate.
        audio, rate = torchaudio.load(str(file))

        # Number of full segments of MAX_LENGTH samples (floor division)
        num_segments = len(audio[0]) // MAX_LENGTH

        # Collect segmented audio data
        segmented_audio = []

        # Split the audio into segments
        for i in range(num_segments):
            start = i * MAX_LENGTH
            end = min((i + 1) * MAX_LENGTH, len(audio[0]))
            segment = audio[0][start:end]

            # Resample the segment to the target sample rate (RATE_HZ)
            transform = torchaudio.transforms.Resample(rate, RATE_HZ)
            segment = transform(segment).squeeze(0).numpy().reshape(-1)
            segmented_audio.append(segment)

        # Create a DataFrame from the segmented audio
        df_segments = pd.DataFrame({'audio': segmented_audio})
        return df_segments

    except Exception as e:
        # If an exception occurs (e.g., file not found), return nothing
        print(f"Error processing file: {e}")
        return None


def accent_classify(pipe, audio_path):
    audio_df = split_audio(audio_path)
    return pipe(np.concatenate(audio_df["audio"][:50].to_list()))[0]


st.set_page_config(page_title="Accent Classifier", layout="centered")
st.title("🎙️ English Accent Classifier")
st.markdown("Upload a video link and get the English accent with confidence.")

st.subheader("1. Upload a Video File")
uploaded_file = st.file_uploader("Choose a video file", type=["mp4", "mov", "avi"])

st.subheader("2. Or Enter a Video URL")
Or Enter a Video URL") video_url = st.text_input("Paste a public video URL (Loom, or MP4):") if st.button("Analyze"): video_path = None output_dir="/app/tmp" os.makedirs(output_dir, exist_ok=True) if uploaded_file: video_path = os.path.join(output_dir, "video.mp4") with open(video_path, "wb") as f: f.write(uploaded_file.read()) st.success("✅ Video uploaded successfully.") elif video_url.strip(): with st.spinner("Downloading video from URL..."): try: video_path = download_video(video_url) except Exception as e: st.error(f"❌ Failed to download video: {e}") else: st.success(f"✅ Video downloaded: {video_path}") else: st.warning("⚠️ Please upload a video file or enter a valid URL.") if video_path and os.path.exists(video_path): st.write("Exists:", os.path.exists(video_path)) with st.spinner("Extracting audio..."): audio_path = extract_audio(video_path) st.write("Audio saved at:", audio_path) st.write("Exists:", os.path.exists(audio_path)) with st.spinner("Transcribing with Whisper..."): whisper_model = whisper.load_model("base") result = whisper_model.transcribe(audio_path) transcription = result['text'] transcription = "Hello There" pass with st.spinner("Classifying accent..."): # model_name = "dima806/english_accents_classification" # pipe = pipeline('audio-classification', model=model_name, device=-1) # GPU (device=0) or CPU (device=-1) # accent_data = accent_classify(pipe, audio_path) audio_df = split_audio(audio_path) waves = f"{np.concatenate(audio_df["audio"][:5].to_list())}" st.markdown("**Audio waves:**") st.text_area("Audio waves", waves, height=200) # audio_df = split_audio(audio_path) # print(np.concatenate(audio_df["audio"][:50].to_list())) accent_data = {"label": "American", "score": 0.9} accent = accent_data.get("label", "American") confidence = accent_data.get("score", 0.0) # pass st.success("Analysis Complete!") st.markdown(f"**Accent:** {accent}") st.markdown(f"**Confidence Score:** {confidence:.2f}%") st.markdown("**Transcription:**") st.text_area("Transcript", transcription, height=200) # Cleanup os.remove(video_path) os.remove(audio_path)