import base64
import glob
import os
import tempfile
import time
from datetime import datetime

import librosa  # BUG FIX: used in transcribe_audio() but was never imported
import streamlit as st
from dotenv import find_dotenv, load_dotenv
from huggingface_hub import login

from hf_transcriber import HFTranscriber

# Page config must be the first Streamlit command executed in the script.
st.set_page_config(
    page_title="đŸŽĩ Audio to Sheet Music Transcriber",
    page_icon="đŸŽĩ",
    layout="wide",
)

# Load environment variables from a .env file, if one exists.
env_path = find_dotenv()
if env_path:
    load_dotenv(env_path)


def _get_hf_token():
    """Return a Hugging Face token from the environment or st.secrets, or None.

    BUG FIX: the original called ``st.secrets.has_key(...)``, which does not
    exist (``st.secrets`` is a Mapping), and the bare attribute access raises
    when no secrets file is configured — hence the try/except guard.
    """
    token = os.environ.get('HUGGINGFACE_TOKEN') or os.environ.get('HF_TOKEN')
    if token:
        return token
    try:
        return st.secrets.get('HUGGINGFACE_TOKEN') or st.secrets.get('HF_TOKEN')
    except Exception:
        # No secrets.toml available in this deployment.
        return None


# Hugging Face authentication
HUGGINGFACE_TOKEN = _get_hf_token()

# Only attempt login with something that looks like a real user token.
if HUGGINGFACE_TOKEN and HUGGINGFACE_TOKEN.startswith('hf_'):
    try:
        login(token=HUGGINGFACE_TOKEN, add_to_git_credential=False)
        st.sidebar.success("✅ Authenticated with Hugging Face")
    except Exception as e:
        st.sidebar.warning(f"âš ī¸ Using public models (rate limited): {str(e)}")
        HUGGINGFACE_TOKEN = None  # Reset token if invalid
else:
    st.sidebar.warning("""
    âš ī¸ No Hugging Face token found. Using public models (rate limited).
    To use your own token:
    1. Get your token from [Hugging Face Settings](https://huggingface.co/settings/tokens)
    2. Add it to your Space's secrets as `HUGGINGFACE_TOKEN`
    """)

# Configuration dictionary to store app settings.
app_config = {
    'RECORDING_ENABLED': False,
    'AUDIO_DEVICES': [],
    'DEFAULT_MODEL': 'openai/whisper-small',  # Default model for transcription
}


def _rerun():
    """Rerun the script on both modern and legacy Streamlit versions.

    BUG FIX: ``st.experimental_rerun`` was removed in newer Streamlit
    releases; prefer ``st.rerun`` when available.
    """
    if hasattr(st, "rerun"):
        st.rerun()
    else:
        st.experimental_rerun()


def init_recording():
    """Initialize recording capability; return True when a usable input device exists.

    Populates app_config with the recorder class, the device list, and the
    RECORDING_ENABLED flag. Never raises — hosted environments (e.g. HF
    Spaces) routinely lack audio hardware and/or the recorder module.
    """
    try:
        # The recorder module (and its audio backend) may be absent.
        from recorder import AudioRecorder, list_audio_devices
    except Exception:
        app_config['RECORDING_ENABLED'] = False
        return False

    app_config['AudioRecorder'] = AudioRecorder
    app_config['list_audio_devices'] = list_audio_devices

    try:
        devices = list_audio_devices()
    except Exception:
        app_config['RECORDING_ENABLED'] = False
        app_config['AUDIO_DEVICES'] = []
        return False

    app_config['AUDIO_DEVICES'] = devices
    # Recording only works when at least one device exposes input channels.
    has_input = bool(devices) and any(
        d.get('max_input_channels', 0) > 0 for d in devices
    )
    app_config['RECORDING_ENABLED'] = has_input
    return has_input


# Initialize recording capability at import time.
init_recording()


def get_binary_file_downloader_html(bin_file, file_label='File'):
    """Return an HTML anchor that downloads *bin_file* as a base64 data URI.

    BUG FIX: the original returned only the literal text ``Download <label>``
    (the anchor markup had been lost), leaving the encoded payload unused.
    """
    with open(bin_file, 'rb') as f:
        bin_str = base64.b64encode(f.read()).decode()
    return (
        f'<a href="data:application/octet-stream;base64,{bin_str}" '
        f'download="{os.path.basename(bin_file)}">Download {file_label}</a>'
    )


def render_file_uploader():
    """Render the file uploader component and return the uploaded file (or None)."""
    st.info("â„šī¸ Please upload an audio file for transcription (WAV, MP3, or OGG format)")
    return st.file_uploader(
        "Choose an audio file",
        type=["wav", "mp3", "ogg"],
        help="Select an audio file to transcribe (max 30MB)",
        key="file_uploader",
    )


def save_uploaded_file(uploaded_file):
    """Save a Streamlit UploadedFile to a temporary file and return its path.

    Raises ValueError for unsupported extensions; re-raises any write error
    after cleaning up the partial temp file.
    """
    tmp_path = None
    try:
        allowed_types = ["wav", "mp3", "ogg"]
        # BUG FIX: splitext keeps the leading dot (".wav"), so the original
        # membership test against dot-less names always failed. Strip it.
        file_ext = os.path.splitext(uploaded_file.name)[1].lower().lstrip('.')
        if file_ext not in allowed_types:
            raise ValueError(
                f"Unsupported file type: {file_ext}. Allowed: {', '.join(allowed_types)}"
            )
        with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_ext}") as tmp_file:
            tmp_path = tmp_file.name
            # BUG FIX: UploadedFile has no .chunks() (that is Django API);
            # read the buffer in 4 KiB chunks instead.
            while chunk := uploaded_file.read(4096):
                tmp_file.write(chunk)
        return tmp_path
    except Exception as e:
        st.error(f"Error saving file: {str(e)}")
        if tmp_path and os.path.exists(tmp_path):
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
        raise


def transcribe_audio(file_path, model_name):
    """Transcribe the audio at *file_path* with the given HF model.

    Returns the transcriber's result dict (must contain a 'text' key).
    Re-raises on failure after logging diagnostics to the sidebar.
    """
    hf_token = os.getenv('HUGGINGFACE_TOKEN') or os.getenv('HF_TOKEN')
    try:
        st.sidebar.info("â„šī¸ Starting transcription...")
        st.sidebar.info(f"🔑 Using model: {model_name}")
        st.sidebar.info(f"🔑 Token present: {'Yes' if hf_token else 'No'}")

        transcriber = HFTranscriber(model_name=model_name)

        # Resample to 16 kHz mono — the input rate Whisper/Wav2Vec2 expect.
        try:
            audio_data, sample_rate = librosa.load(file_path, sr=16000, mono=True)
            st.sidebar.info(
                f"🔊 Loaded audio: {len(audio_data)/sample_rate:.2f} seconds @ {sample_rate}Hz"
            )
        except Exception as e:
            raise Exception(f"❌ Error loading audio file: {str(e)}")

        with st.spinner("🔄 Transcribing audio..."):
            result = transcriber.transcribe_audio(audio_data, sample_rate)

        if not result or 'text' not in result:
            raise ValueError(
                "❌ No transcription results returned. The model might not be accessible."
            )
        return result
    except Exception as e:
        # BUG FIX: the original lacked the f-prefix and printed "{str(e)}" verbatim.
        st.sidebar.error(f"❌ Transcription failed: {str(e)}")
        st.sidebar.error("This might be due to:")
        st.sidebar.error("1. Invalid or missing Hugging Face token")
        st.sidebar.error("2. Insufficient permissions for the model")
        st.sidebar.error("3. Network connectivity issues")
        st.sidebar.error("4. Model not found or not accessible")
        # BUG FIX: the original emitted this debug info AFTER `raise`,
        # making it unreachable. Log it first, then re-raise.
        st.sidebar.error("🔍 Debug Info:")
        st.sidebar.json({
            "model": model_name,
            "token_present": bool(hf_token),
            "token_prefix": hf_token[:8] + '...' if hf_token else None,
            "error": str(e),
        })
        raise


def record_audio():
    """Render the record/stop controls; return a finished recording's path or None."""
    st.header("🎤 Record Audio")
    if not app_config['RECORDING_ENABLED']:
        st.warning("Audio recording is not available on this device.")
        return None

    AudioRecorder = app_config['AudioRecorder']
    if 'recorder' not in st.session_state:
        st.session_state.recorder = AudioRecorder()

    col1, col2 = st.columns(2)
    with col1:
        if st.button("🎤 Start Recording"):
            st.session_state.recorder.start()
            st.session_state.recording = True
            _rerun()
    with col2:
        if st.button("âšī¸ Stop Recording") and st.session_state.get('recording', False):
            audio_data = st.session_state.recorder.stop()
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            os.makedirs("outputs", exist_ok=True)
            output_file = os.path.join("outputs", f"recording_{timestamp}.wav")
            # NOTE(review): assumes recorder.stop() returns an object with a
            # pydub-style .export(path, format=...) — confirm against recorder.py.
            audio_data.export(output_file, format="wav")
            st.session_state.recorded_file = output_file
            st.session_state.recording = False
            _rerun()

    if st.session_state.get('recording', False):
        st.warning("Recording in progress... Click 'Stop Recording' when finished.")

    if 'recorded_file' in st.session_state and os.path.exists(st.session_state.recorded_file):
        st.audio(st.session_state.recorded_file)
        return st.session_state.recorded_file
    return None


def test_hf_connection():
    """Smoke-test connectivity to Hugging Face by running a tiny pipeline."""
    from transformers import pipeline
    try:
        pipe = pipeline(
            "text-classification",
            model="distilbert-base-uncased-finetuned-sst-2-english",
        )
        result = pipe("This is a test")
        st.sidebar.success("✅ Connection to Hugging Face successful!")
        st.sidebar.json(result[0])
    except Exception as e:
        st.sidebar.error(f"❌ Connection failed: {str(e)}")


def main():
    """Entry point: render the UI, collect audio, and run transcription."""
    st.title("đŸŽĩ Audio to Sheet Music Transcriber")
    st.markdown("### Record or upload audio for transcription")

    # Model selection in sidebar.
    with st.sidebar:
        st.header("🔧 Settings")
        sidebar_models = {
            "Whisper Small (Recommended)": "openai/whisper-small",
            "Whisper Tiny": "openai/whisper-tiny",
            "Whisper Base": "openai/whisper-base",
            "Wav2Vec2 Base": "facebook/wav2vec2-base-960h",
        }
        sidebar_choice = st.selectbox(
            "Select Model",
            options=list(sidebar_models.keys()),
            index=0,
            help="Choose the transcription model. Whisper models generally provide better accuracy.",
        )
        model_name = sidebar_models[sidebar_choice]

    # Input section: tabs when recording is supported, uploader only otherwise.
    recorded_file = None
    uploaded_file = None
    if app_config['RECORDING_ENABLED']:
        tab1, tab2 = st.tabs(["🎤 Record Audio", "📁 Upload File"])
        with tab1:
            recorded_file = record_audio()
        with tab2:
            uploaded_file = render_file_uploader()
    else:
        st.info(
            "🎤 Audio recording is not available in this environment. "
            "Please upload an audio file instead."
        )
        uploaded_file = render_file_uploader()

    # Stage the chosen audio (recorded takes precedence) and preview it.
    temp_file_path = None
    if uploaded_file is not None or recorded_file:
        with st.spinner("Processing audio..."):
            try:
                if recorded_file:
                    temp_file_path = recorded_file
                    file_ext = os.path.splitext(temp_file_path)[1][1:]
                else:
                    temp_file_path = save_uploaded_file(uploaded_file)
                    file_ext = os.path.splitext(uploaded_file.name)[1][1:]

                st.audio(temp_file_path, format=f'audio/{file_ext}')
                file_size = os.path.getsize(temp_file_path) / (1024 * 1024)  # in MB
                st.info(
                    f"📂 Processing: {os.path.basename(temp_file_path)} ({file_size:.2f} MB)"
                )
            except Exception as e:
                st.error(f"Error processing uploaded file: {str(e)}")
                if temp_file_path and os.path.exists(temp_file_path):
                    try:
                        os.remove(temp_file_path)
                    except OSError:
                        pass
                temp_file_path = None

    # Transcription section.
    # BUG FIX: the original gated this on `uploaded_file is not None`, so a
    # recorded file could never be transcribed; gate on the staged path instead.
    if temp_file_path:
        audio_file = temp_file_path

        transcribe_models = {
            "Whisper Small": "openai/whisper-small",
            "Whisper Tiny": "openai/whisper-tiny",
            "Whisper Base": "openai/whisper-base",
            "Wav2Vec2 Base": "facebook/wav2vec2-base-960h",
            "SpeechT5": "microsoft/speecht5_asr",
        }
        transcribe_choice = st.selectbox(
            "Select Transcription Model",
            options=list(transcribe_models.keys()),
            index=0,
        )

        if st.button("đŸŽĩ Transcribe Audio"):
            try:
                with st.spinner("Transcribing audio..."):
                    transcriber = HFTranscriber(
                        model_name=transcribe_models[transcribe_choice]
                    )
                    os.makedirs("outputs", exist_ok=True)
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    try:
                        # NOTE(review): expects a dict with a 'transcription'
                        # key here, while transcribe_audio() above checks for
                        # 'text' — confirm the HFTranscriber contract.
                        result = transcriber.transcribe_audio(audio_file)

                        txt_output = os.path.join(
                            "outputs", f"transcription_{timestamp}.txt"
                        )
                        with open(txt_output, 'w', encoding='utf-8') as f:
                            f.write(result['transcription'])

                        st.success("đŸŽĩ Transcription completed successfully!")
                        st.markdown("### Transcription Result")
                        st.text_area("Transcription", result['transcription'], height=200)

                        st.markdown("### Download Results")
                        st.download_button(
                            label="Download Transcription",
                            data=result['transcription'],
                            file_name=f"transcription_{timestamp}.txt",
                            mime="text/plain",
                        )

                        if 'word_timestamps' in result and result['word_timestamps']:
                            st.markdown("### Word-level Timestamps")
                            st.json(result['word_timestamps'])

                        # BUG FIX: the original referenced midi_output /
                        # musicxml_output here, which were never defined and
                        # raised NameError. Removed until MIDI/MusicXML
                        # generation actually produces those files.

                        # Audio preview of the source file.
                        try:
                            st.audio(audio_file, format='audio/wav')
                        except Exception as e:
                            st.warning(f"Could not display audio preview: {str(e)}")
                    except Exception as e:
                        st.error(f"❌ Error during transcription: {str(e)}")
                        st.exception(e)  # Show full traceback for debugging
            except Exception as e:
                st.error(f"An error occurred during transcription setup: {str(e)}")
                st.exception(e)  # Show full traceback for debugging

    # Clean up leftover files from the legacy temp_uploads directory.
    if os.path.exists("temp_uploads"):
        for file in os.listdir("temp_uploads"):
            try:
                os.remove(os.path.join("temp_uploads", file))
            except OSError:
                pass


def clean_up_recordings(keep_last=5):
    """Delete old recordings, keeping only the *keep_last* most recent WAV files.

    Best-effort: failures are printed, never raised.
    """
    try:
        # Use an absolute path for the recordings directory.
        recordings_dir = os.path.abspath("recordings")
        os.makedirs(recordings_dir, exist_ok=True)

        # All wav files, newest first by modification time.
        recordings = sorted(
            glob.glob(os.path.join(recordings_dir, "*.wav")),
            key=os.path.getmtime,
            reverse=True,
        )

        # Remove everything past the newest `keep_last` entries.
        for old_recording in recordings[keep_last:]:
            try:
                if os.path.exists(old_recording):
                    os.remove(old_recording)
                    print(f"Removed old recording: {old_recording}")
            except Exception as e:
                print(f"Error removing {old_recording}: {e}")
    except Exception as e:
        print(f"Error in clean_up_recordings: {e}")


if __name__ == "__main__":
    # Create necessary directories.
    os.makedirs("outputs", exist_ok=True)

    # Run the app.
    main()

    # Add footer.
    st.markdown("---")
    st.markdown("### About")
    st.markdown("""
    This app uses Hugging Face's Transformers library for speech-to-text transcription.
    Models are loaded on-demand and require an internet connection.

    **Note:** This version supports both file uploads and live recording (if your device supports it).
    """)