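"""Streamlit app that downloads or accepts an uploaded video, extracts its audio,
transcribes it with Whisper, and reports the speaker's English accent with a confidence score."""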
import streamlit as st
import os
import numpy as np # linear algebra
import pandas as pd # data processing
# from transformers import Wav2Vec2ForSequenceClassification, Wav2Vec2Processor
# from utils import download_video, extract_audio, accent_classify
import whisper
from transformers import pipeline
import yt_dlp
import torchaudio
import ffmpeg
from transformers.utils import logging
logging.set_verbosity_info()
# Define the resampling rate in Hertz (Hz) for audio data
RATE_HZ = 16000
# Define the maximum audio interval length to consider in seconds
MAX_SECONDS = 1
# Calculate the maximum audio interval length in samples by multiplying the rate and seconds
MAX_LENGTH = RATE_HZ * MAX_SECONDS
def download_video(url, output_dir="/app/tmp"):
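    """Download the video at `url` into `output_dir` with yt-dlp and return the expected output path."""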
    os.makedirs(output_dir, exist_ok=True)
    ydl_opts = {
        "format": "worstvideo[ext=mp4]+bestaudio[ext=m4a]/bestaudio",
        "outtmpl": os.path.join(output_dir, "video.%(ext)s"),
        "merge_output_format": "mp4",
        "quiet": True,
        "noplaylist": True,
        "nocheckcertificate": True,
        "retries": 3,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])
    return os.path.join(output_dir, "video.mp4")
def extract_audio(input_path, output_dir="/app/tmp"):
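    """Extract the audio track from `input_path` to an MP3 file in `output_dir` using ffmpeg and return its path."""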
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, "audio.mp3")
    (
        ffmpeg
        .input(input_path)
        .output(output_path, format="mp3", acodec="libmp3lame", audio_bitrate="192k")
        .overwrite_output()
        .run(quiet=True)
    )
    return output_path
# Split an audio file into chunks of MAX_LENGTH samples each
def split_audio(file):
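    """Split `file` into MAX_LENGTH-sample segments, resample each to RATE_HZ, and return them in a DataFrame (or None on failure)."""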
    try:
        # Load the audio file using torchaudio and get its sample rate.
        audio, rate = torchaudio.load(str(file))
        # Calculate the number of segments based on MAX_LENGTH.
        num_segments = len(audio[0]) // MAX_LENGTH  # Floor division to get segments
        # Create an empty list to store segmented audio data.
        segmented_audio = []
        # Split the audio into segments.
        for i in range(num_segments):
            start = i * MAX_LENGTH
            end = min((i + 1) * MAX_LENGTH, len(audio[0]))
            segment = audio[0][start:end]
            # Create a transformation to resample the audio to the specified sample rate (RATE_HZ).
            transform = torchaudio.transforms.Resample(rate, RATE_HZ)
            segment = transform(segment).squeeze(0).numpy().reshape(-1)
            segmented_audio.append(segment)
        # Create a DataFrame from the segmented audio.
        df_segments = pd.DataFrame({"audio": segmented_audio})
        return df_segments
    except Exception as e:
        # If an exception occurs (e.g., file not found), log it and return None.
        print(f"Error processing file: {e}")
        return None
def accent_classify(pipe, audio_path):
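    """Run the audio-classification pipeline `pipe` on up to the first 50 segments of `audio_path` and return the top prediction."""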
    audio_df = split_audio(audio_path)
    return pipe(np.concatenate(audio_df["audio"][:50].to_list()))[0]
st.set_page_config(page_title="Accent Classifier", layout="centered")
st.title("🎙️ English Accent Classifier")
st.markdown("Upload a video file or paste a public video URL to get the speaker's English accent with a confidence score.")
st.subheader("1. Upload a Video File")
uploaded_file = st.file_uploader("Choose a video file", type=["mp4", "mov", "avi"])
st.subheader("2. Or Enter a Video URL")
video_url = st.text_input("Paste a public video URL (Loom or direct MP4 link):")
if st.button("Analyze"):
    video_path = None
    output_dir = "/app/tmp"
    os.makedirs(output_dir, exist_ok=True)
    if uploaded_file:
        video_path = os.path.join(output_dir, "video.mp4")
        with open(video_path, "wb") as f:
            f.write(uploaded_file.read())
        st.success("✅ Video uploaded successfully.")
    elif video_url.strip():
        with st.spinner("Downloading video from URL..."):
            try:
                video_path = download_video(video_url)
            except Exception as e:
                st.error(f"❌ Failed to download video: {e}")
            else:
                st.success(f"✅ Video downloaded: {video_path}")
    else:
        st.warning("⚠️ Please upload a video file or enter a valid URL.")
    if video_path and os.path.exists(video_path):
        st.write("Exists:", os.path.exists(video_path))
        with st.spinner("Extracting audio..."):
            audio_path = extract_audio(video_path)
            st.write("Audio saved at:", audio_path)
            st.write("Exists:", os.path.exists(audio_path))
        with st.spinner("Transcribing with Whisper..."):
            whisper_model = whisper.load_model("base")
            result = whisper_model.transcribe(audio_path)
            transcription = result["text"]
with st.spinner("Classifying accent..."):
# model_name = "dima806/english_accents_classification"
# pipe = pipeline('audio-classification', model=model_name, device=-1) # GPU (device=0) or CPU (device=-1)
# accent_data = accent_classify(pipe, audio_path)
audio_df = split_audio(audio_path)
waves = f"{np.concatenate(audio_df["audio"][:5].to_list())}"
st.markdown("**Audio waves:**")
st.text_area("Audio waves", waves, height=200)
# audio_df = split_audio(audio_path)
# print(np.concatenate(audio_df["audio"][:50].to_list()))
accent_data = {"label": "American", "score": 0.9}
accent = accent_data.get("label", "American")
confidence = accent_data.get("score", 0.0)
# pass
st.success("Analysis Complete!")
st.markdown(f"**Accent:** {accent}")
st.markdown(f"**Confidence Score:** {confidence:.2f}%")
st.markdown("**Transcription:**")
st.text_area("Transcript", transcription, height=200)
        # Cleanup
        os.remove(video_path)
        os.remove(audio_path)