# app.py — Audio to Sheet Music Transcriber (Streamlit app)
# Hugging Face Space revision d285286 (author: PatienceIzere)
import base64
import glob
import os
import tempfile
import time
from datetime import datetime

import streamlit as st
from dotenv import load_dotenv, find_dotenv
from huggingface_hub import login

from hf_transcriber import HFTranscriber
# Set page config first
st.set_page_config(
page_title="🎡 Audio to Sheet Music Transcriber",
page_icon="🎡",
layout="wide"
)
# Load environment variables
env_path = find_dotenv()
if env_path:
load_dotenv(env_path)
# Hugging Face authentication
HUGGINGFACE_TOKEN = os.environ.get('HUGGINGFACE_TOKEN') or os.environ.get('HF_TOKEN') or (st.secrets.get('HUGGINGFACE_TOKEN') if st.secrets.has_key('HUGGINGFACE_TOKEN') else None) or (st.secrets.get('HF_TOKEN') if st.secrets.has_key('HF_TOKEN') else None)
# Check if we have a valid token
if HUGGINGFACE_TOKEN and HUGGINGFACE_TOKEN.startswith('hf_'):
try:
login(token=HUGGINGFACE_TOKEN, add_to_git_credential=False)
st.sidebar.success("βœ… Authenticated with Hugging Face")
except Exception as e:
st.sidebar.warning(f"⚠️ Using public models (rate limited): {str(e)}")
HUGGINGFACE_TOKEN = None # Reset token if invalid
else:
st.sidebar.warning("""
⚠️ No Hugging Face token found. Using public models (rate limited).
To use your own token:
1. Get your token from [Hugging Face Settings](https://huggingface.co/settings/tokens)
2. Add it to your Space's secrets as `HUGGINGFACE_TOKEN`
""")
# Application-wide settings shared by the UI and the recording helpers.
app_config = {
    'RECORDING_ENABLED': False,  # flipped on by init_recording() when a usable mic exists
    'AUDIO_DEVICES': [],         # populated by init_recording()
    'DEFAULT_MODEL': 'openai/whisper-small',  # default model for transcription
}
def init_recording():
    """Initialize recording capability and return its availability.

    Imports the optional ``recorder`` module, stores its components in
    ``app_config``, and probes the audio devices for at least one input
    channel.

    Returns:
        bool: True when recording is usable, False otherwise.

    Fix: the original returned True whenever device listing succeeded,
    even when it had just set RECORDING_ENABLED to False because no
    input-capable device was found; the return value now reports the
    actual availability.
    """
    try:
        # Optional dependency: recording is a best-effort feature.
        from recorder import AudioRecorder, list_audio_devices
    except ImportError:
        app_config['RECORDING_ENABLED'] = False
        return False
    except Exception:
        app_config['RECORDING_ENABLED'] = False
        return False

    # Expose the recorder components to the rest of the app.
    app_config['AudioRecorder'] = AudioRecorder
    app_config['list_audio_devices'] = list_audio_devices

    try:
        devices = list_audio_devices()
        app_config['AUDIO_DEVICES'] = devices
        # Enabled only when at least one device can actually capture input.
        has_input = bool(devices) and any(
            d.get('max_input_channels', 0) > 0 for d in devices
        )
        app_config['RECORDING_ENABLED'] = has_input
        return has_input
    except Exception:
        app_config['RECORDING_ENABLED'] = False
        app_config['AUDIO_DEVICES'] = []
        return False
# Probe for microphone support once at startup; when unavailable the UI
# falls back to upload-only mode (see main()).
init_recording()
def get_binary_file_downloader_html(bin_file, file_label='File'):
    """Return an HTML anchor that downloads *bin_file*, labelled *file_label*.

    The file content is embedded in the link as a base64 data URI, so the
    link works without serving the file separately.
    """
    with open(bin_file, 'rb') as handle:
        payload = handle.read()
    encoded = base64.b64encode(payload).decode()
    filename = os.path.basename(bin_file)
    return (
        f'<a href="data:application/octet-stream;base64,{encoded}" '
        f'download="{filename}">Download {file_label}</a>'
    )
def render_file_uploader():
    """Show an info banner and return Streamlit's audio-file uploader widget."""
    st.info("ℹ️ Please upload an audio file for transcription (WAV, MP3, or OGG format)")
    uploader = st.file_uploader(
        "Choose an audio file",
        type=["wav", "mp3", "ogg"],
        help="Select an audio file to transcribe (max 30MB)",
        key="file_uploader",
    )
    return uploader
def save_uploaded_file(uploaded_file):
    """Persist a Streamlit UploadedFile to a temporary file and return its path.

    Args:
        uploaded_file: object with a ``name`` attribute and a ``getbuffer()``
            method (Streamlit's UploadedFile satisfies both).

    Returns:
        str: path of the temporary file (caller is responsible for deleting it).

    Raises:
        ValueError: when the file extension is not wav/mp3/ogg.
    """
    tmp_path = None
    try:
        # Validate file type
        allowed_types = ["wav", "mp3", "ogg"]
        # Fix: splitext keeps the leading dot ('.wav'), so the original
        # membership test against ['wav', ...] rejected every file.
        file_ext = os.path.splitext(uploaded_file.name)[1].lower().lstrip('.')
        if file_ext not in allowed_types:
            raise ValueError(f"Unsupported file type: {file_ext}. Allowed: {', '.join(allowed_types)}")
        # Create temp file
        with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_ext}") as tmp_file:
            tmp_path = tmp_file.name
            # Fix: UploadedFile has no Django-style .chunks(); write its
            # in-memory buffer directly instead.
            tmp_file.write(uploaded_file.getbuffer())
        return tmp_path
    except Exception as e:
        st.error(f"Error saving file: {str(e)}")
        # Best-effort cleanup of a partially written temp file.
        if tmp_path and os.path.exists(tmp_path):
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
        raise
def transcribe_audio(file_path, model_name):
    """Transcribe the audio file at *file_path* with the given HF model.

    Args:
        file_path: path to a local audio file readable by librosa.
        model_name: Hugging Face model identifier.

    Returns:
        dict: the transcriber's result (expected to contain a 'text' key).

    Raises:
        Exception: re-raised after logging when any step fails.
    """
    # Debug: show authentication status up front
    hf_token = os.getenv('HUGGINGFACE_TOKEN') or os.getenv('HF_TOKEN')
    try:
        st.sidebar.info("ℹ️ Starting transcription...")
        st.sidebar.info(f"πŸ”‘ Using model: {model_name}")
        st.sidebar.info(f"πŸ”‘ Token present: {'Yes' if hf_token else 'No'}")

        transcriber = HFTranscriber(model_name=model_name)

        # Read the audio file, resampled to mono 16 kHz for the ASR models.
        try:
            # Fix: librosa was used here but never imported anywhere in the
            # file; import it lazily since only this function needs it.
            import librosa
            audio_data, sample_rate = librosa.load(file_path, sr=16000, mono=True)
            st.sidebar.info(f"πŸ”Š Loaded audio: {len(audio_data)/sample_rate:.2f} seconds @ {sample_rate}Hz")
        except Exception as e:
            raise Exception(f"❌ Error loading audio file: {str(e)}")

        # Transcribe
        with st.spinner("πŸ”„ Transcribing audio..."):
            result = transcriber.transcribe_audio(audio_data, sample_rate)
        if not result or 'text' not in result:
            raise ValueError("❌ No transcription results returned. The model might not be accessible.")
        return result
    except Exception as e:
        # Fix: the original message lacked the f-prefix and printed the
        # literal text "{str(e)}".
        st.sidebar.error(f"❌ Transcription failed: {str(e)}")
        st.sidebar.error("This might be due to:")
        st.sidebar.error("1. Invalid or missing Hugging Face token")
        st.sidebar.error("2. Insufficient permissions for the model")
        st.sidebar.error("3. Network connectivity issues")
        st.sidebar.error("4. Model not found or not accessible")
        # Debug info (was unreachable after `raise` in the original).
        st.sidebar.error("πŸ” Debug Info:")
        st.sidebar.json({
            "model": model_name,
            "token_present": bool(hf_token),
            "token_prefix": hf_token[:8] + '...' if hf_token else None,
            "error": str(e)
        })
        raise
def record_audio():
    """Render the recording UI and return the path of the last recording, if any.

    Uses st.session_state to persist the recorder object and recording flag
    across Streamlit reruns. Returns the saved WAV path once a recording
    exists on disk, otherwise None.
    """
    st.header("🎀 Record Audio")
    if not app_config['RECORDING_ENABLED']:
        st.warning("Audio recording is not available on this device.")
        return
    AudioRecorder = app_config['AudioRecorder']
    # Keep one recorder instance alive across reruns.
    if 'recorder' not in st.session_state:
        st.session_state.recorder = AudioRecorder()
    col1, col2 = st.columns(2)
    with col1:
        if st.button("🎀 Start Recording"):
            st.session_state.recorder.start()
            st.session_state.recording = True
            # Rerun immediately so the "in progress" banner below shows up.
            st.experimental_rerun()
    with col2:
        if st.button("⏹️ Stop Recording") and st.session_state.get('recording', False):
            audio_data = st.session_state.recorder.stop()
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_file = os.path.join("outputs", f"recording_{timestamp}.wav")
            os.makedirs("outputs", exist_ok=True)
            # NOTE(review): assumes recorder.stop() returns a pydub-style
            # segment with an .export() method — confirm against recorder.py.
            audio_data.export(output_file, format="wav")
            st.session_state.recorded_file = output_file
            st.session_state.recording = False
            st.experimental_rerun()
    if st.session_state.get('recording', False):
        st.warning("Recording in progress... Click 'Stop Recording' when finished.")
    # After a rerun, surface the last saved recording (if still on disk).
    if 'recorded_file' in st.session_state and os.path.exists(st.session_state.recorded_file):
        st.audio(st.session_state.recorded_file)
        return st.session_state.recorded_file
    return None
# Diagnostic helper — call before main() when debugging connectivity.
def test_hf_connection():
    """Sanity-check Hugging Face connectivity by running a tiny text pipeline."""
    from transformers import pipeline
    try:
        classifier = pipeline("text-classification", model="distilbert-base-uncased-finetuned-sst-2-english")
        prediction = classifier("This is a test")
        st.sidebar.success("βœ… Connection to Hugging Face successful!")
        st.sidebar.json(prediction[0])
    except Exception as e:
        st.sidebar.error(f"❌ Connection failed: {str(e)}")
def main():
    """Render the app: input selection (record/upload), model choice, transcription.

    Fixes relative to the original:
    - the transcription section referenced undefined `midi_output` /
      `musicxml_output` names, raising NameError after every successful
      transcription; sheet-music export is not implemented, so those
      references are removed;
    - transcription only ran for uploads and ignored freshly recorded files;
      it now runs for whichever audio source produced a file;
    - the model picker was duplicated (sidebar + body); only the sidebar
      picker remains;
    - the redundant IPython audio preview is dropped (st.audio already
      plays the file above).
    """
    st.title("🎡 Audio to Sheet Music Transcriber")
    st.markdown("### Record or upload audio for transcription")

    # Model selection in sidebar
    with st.sidebar:
        st.header("πŸ”§ Settings")
        model_options = {
            "Whisper Small (Recommended)": "openai/whisper-small",
            "Whisper Tiny": "openai/whisper-tiny",
            "Whisper Base": "openai/whisper-base",
            "Wav2Vec2 Base": "facebook/wav2vec2-base-960h"
        }
        selected_model = st.selectbox(
            "Select Model",
            options=list(model_options.keys()),
            index=0,
            help="Choose the transcription model. Whisper models generally provide better accuracy."
        )
        model_name = model_options[selected_model]

    # Main content area - tabs for the input methods (recording tab only
    # when init_recording() found a usable microphone).
    recorded_file = None
    uploaded_file = None
    if app_config['RECORDING_ENABLED']:
        tab1, tab2 = st.tabs(["🎀 Record Audio", "πŸ“ Upload File"])
        with tab1:
            recorded_file = record_audio()
        with tab2:
            uploaded_file = render_file_uploader()
    else:
        st.info("🎀 Audio recording is not available in this environment. Please upload an audio file instead.")
        uploaded_file = render_file_uploader()

    # Resolve the audio source (recorded file wins) into a local path.
    temp_file_path = None
    if uploaded_file is not None or recorded_file:
        with st.spinner("Processing audio..."):
            try:
                if recorded_file:
                    temp_file_path = recorded_file
                    file_ext = os.path.splitext(temp_file_path)[1][1:]
                else:
                    temp_file_path = save_uploaded_file(uploaded_file)
                    file_ext = os.path.splitext(uploaded_file.name)[1][1:]
                # Display the audio player and basic file info.
                st.audio(temp_file_path, format=f'audio/{file_ext}')
                file_size = os.path.getsize(temp_file_path) / (1024 * 1024)  # in MB
                st.info(f"πŸ“‚ Processing: {os.path.basename(temp_file_path)} ({file_size:.2f} MB)")
            except Exception as e:
                st.error(f"Error processing uploaded file: {str(e)}")
                if temp_file_path and os.path.exists(temp_file_path):
                    try:
                        os.remove(temp_file_path)
                    except OSError:
                        pass
                temp_file_path = None

    # Transcription section
    if temp_file_path and st.button("🎡 Transcribe Audio"):
        try:
            with st.spinner("Transcribing audio..."):
                transcriber = HFTranscriber(model_name=model_name)
                os.makedirs("outputs", exist_ok=True)
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                result = transcriber.transcribe_audio(temp_file_path)
                # Helpers in this file disagree on the result key
                # ('transcription' vs 'text'); accept either.
                text = result.get('transcription') or result.get('text', '')

                # Save transcription to file
                txt_output = os.path.join("outputs", f"transcription_{timestamp}.txt")
                with open(txt_output, 'w', encoding='utf-8') as f:
                    f.write(text)

                st.success("🎡 Transcription completed successfully!")
                st.markdown("### Transcription Result")
                st.text_area("Transcription", text, height=200)
                st.markdown("### Download Results")
                st.download_button(
                    label="Download Transcription",
                    data=text,
                    file_name=f"transcription_{timestamp}.txt",
                    mime="text/plain"
                )
                # If we have timing information, show it
                if 'word_timestamps' in result and result['word_timestamps']:
                    st.markdown("### Word-level Timestamps")
                    st.json(result['word_timestamps'])
        except Exception as e:
            st.error(f"❌ Error during transcription: {str(e)}")
            st.exception(e)  # Show full traceback for debugging

    # Clean up temporary upload files left over from earlier runs.
    if os.path.exists("temp_uploads"):
        for file in os.listdir("temp_uploads"):
            try:
                os.remove(os.path.join("temp_uploads", file))
            except OSError:
                pass
def clean_up_recordings(keep_last=5):
    """Delete old .wav recordings, keeping only the *keep_last* most recent.

    Args:
        keep_last: number of newest recordings (by mtime) to retain.

    Fix: the original called glob.glob(), but `glob` was never imported
    anywhere in the file, so this function always raised — and silently
    swallowed — a NameError and never cleaned anything up. Rewritten with
    os.listdir to remove the dependency entirely.
    """
    try:
        # Use absolute path for the recordings directory
        recordings_dir = os.path.abspath("recordings")
        os.makedirs(recordings_dir, exist_ok=True)
        # All wav files, newest first by modification time.
        recordings = sorted(
            (
                os.path.join(recordings_dir, name)
                for name in os.listdir(recordings_dir)
                if name.lower().endswith(".wav")
            ),
            key=os.path.getmtime,
            reverse=True,
        )
        # Remove old recordings, keeping the specified number
        for old_recording in recordings[keep_last:]:
            try:
                if os.path.exists(old_recording):
                    os.remove(old_recording)
                    print(f"Removed old recording: {old_recording}")
            except Exception as e:
                print(f"Error removing {old_recording}: {e}")
    except Exception as e:
        print(f"Error in clean_up_recordings: {e}")
if __name__ == "__main__":
    # Ensure the output directory exists before the UI can write to it.
    os.makedirs("outputs", exist_ok=True)
    # Run the app
    main()
    # Footer rendered after the main UI on every rerun.
    st.markdown("---")
    st.markdown("### About")
    st.markdown("""
    This app uses Hugging Face's Transformers library for speech-to-text transcription.
    Models are loaded on-demand and require an internet connection.
    **Note:** This version supports both file uploads and live recording (if your device supports it).
    """)