# Hugging Face Space page header (scrape residue, not program code): "Spaces: Running"
import base64
import glob
import os
import tempfile
import time
from datetime import datetime

import streamlit as st
from dotenv import load_dotenv, find_dotenv
from huggingface_hub import login

from hf_transcriber import HFTranscriber
# Set page config first — Streamlit requires this to be the first st.* call.
st.set_page_config(
    page_title="π΅ Audio to Sheet Music Transcriber",
    page_icon="π΅",
    layout="wide"
)

# Load environment variables from a .env file, if one exists.
env_path = find_dotenv()
if env_path:
    load_dotenv(env_path)


def _get_secret(name):
    """Return a Streamlit secret by name, or None when unavailable.

    st.secrets can raise (e.g. FileNotFoundError when no secrets.toml is
    present), so the lookup is wrapped instead of probed with membership.
    """
    try:
        return st.secrets.get(name)
    except Exception:
        return None


# Hugging Face authentication: environment variables take precedence over
# Streamlit secrets.  BUG FIX: the original called st.secrets.has_key(),
# a Python-2-era dict method that does not exist on the Secrets mapping
# and would raise AttributeError at import time; .get() is the supported
# lookup.
HUGGINGFACE_TOKEN = (
    os.environ.get('HUGGINGFACE_TOKEN')
    or os.environ.get('HF_TOKEN')
    or _get_secret('HUGGINGFACE_TOKEN')
    or _get_secret('HF_TOKEN')
)

# Check if we have a plausible token (Hugging Face tokens start with "hf_").
if HUGGINGFACE_TOKEN and HUGGINGFACE_TOKEN.startswith('hf_'):
    try:
        login(token=HUGGINGFACE_TOKEN, add_to_git_credential=False)
        st.sidebar.success("β Authenticated with Hugging Face")
    except Exception as e:
        st.sidebar.warning(f"β οΈ Using public models (rate limited): {str(e)}")
        HUGGINGFACE_TOKEN = None  # Reset token if invalid
else:
    st.sidebar.warning("""
    β οΈ No Hugging Face token found. Using public models (rate limited).
    To use your own token:
    1. Get your token from [Hugging Face Settings](https://huggingface.co/settings/tokens)
    2. Add it to your Space's secrets as `HUGGINGFACE_TOKEN`
    """)

# Configuration dictionary to store app settings shared across the app.
app_config = {
    'RECORDING_ENABLED': False,              # set by init_recording() when a usable mic exists
    'AUDIO_DEVICES': [],                     # populated by init_recording()
    'DEFAULT_MODEL': 'openai/whisper-small'  # Default model for transcription
}
def init_recording():
    """Probe for local audio-recording support and cache the result.

    Imports the optional `recorder` module, enumerates audio devices, and
    updates `app_config` in place.  Returns True when device enumeration
    succeeded (even if no input-capable device was found — mirroring the
    original contract), False otherwise.
    """
    try:
        from recorder import AudioRecorder, list_audio_devices
    except ImportError:
        app_config['RECORDING_ENABLED'] = False
        return False
    except Exception:
        app_config['RECORDING_ENABLED'] = False
        return False

    # Expose the recording components to the rest of the app.
    app_config['AudioRecorder'] = AudioRecorder
    app_config['list_audio_devices'] = list_audio_devices

    # Listing devices doubles as a sanity check that the audio stack works.
    try:
        devices = list_audio_devices()
    except Exception:
        app_config['RECORDING_ENABLED'] = False
        app_config['AUDIO_DEVICES'] = []
        return False

    app_config['AUDIO_DEVICES'] = devices
    has_input_device = bool(devices) and any(
        d.get('max_input_channels', 0) > 0 for d in devices
    )
    app_config['RECORDING_ENABLED'] = has_input_device
    return True


# Initialize recording capability at import time.
init_recording()
def get_binary_file_downloader_html(bin_file, file_label='File'):
    """Return an HTML anchor that downloads *bin_file* as a base64 data URI."""
    with open(bin_file, 'rb') as fh:
        payload = fh.read()
    encoded = base64.b64encode(payload).decode()
    filename = os.path.basename(bin_file)
    return (
        f'<a href="data:application/octet-stream;base64,{encoded}" '
        f'download="{filename}">Download {file_label}</a>'
    )
def render_file_uploader():
    """Show an upload prompt and return Streamlit's file-uploader widget value."""
    st.info("βΉοΈ Please upload an audio file for transcription (WAV, MP3, or OGG format)")
    uploader = st.file_uploader(
        "Choose an audio file",
        type=["wav", "mp3", "ogg"],
        help="Select an audio file to transcribe (max 30MB)",
        key="file_uploader",
    )
    return uploader
def save_uploaded_file(uploaded_file):
    """Save an uploaded audio file to a temporary file and return its path.

    Args:
        uploaded_file: File-like object with a ``name`` attribute and a
            ``read()`` method (e.g. Streamlit's UploadedFile).

    Returns:
        Path of the temporary copy; the caller is responsible for deleting it.

    Raises:
        ValueError: If the file extension is not wav/mp3/ogg.
    """
    tmp_path = None
    try:
        # Validate file type.  BUG FIX: os.path.splitext() returns the
        # extension WITH its leading dot (".wav"), so the original
        # membership test against ["wav", "mp3", "ogg"] rejected every
        # file; strip the dot before comparing.
        allowed_types = ["wav", "mp3", "ogg"]
        file_ext = os.path.splitext(uploaded_file.name)[1].lower().lstrip('.')
        if file_ext not in allowed_types:
            raise ValueError(f"Unsupported file type: {file_ext}. Allowed: {', '.join(allowed_types)}")
        # Create temp file; copy in fixed-size chunks to bound memory use.
        # BUG FIX: Streamlit's UploadedFile has no .chunks() method (that is
        # Django's API) — use read() in a loop instead.
        with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_ext}") as tmp_file:
            tmp_path = tmp_file.name
            while chunk := uploaded_file.read(4096):
                tmp_file.write(chunk)
        return tmp_path
    except Exception as e:
        st.error(f"Error saving file: {str(e)}")
        # Best-effort cleanup of a partially written temp file.
        if tmp_path and os.path.exists(tmp_path):
            try:
                os.unlink(tmp_path)
            except OSError:
                pass
        raise
def transcribe_audio(file_path, model_name):
    """Transcribe an audio file with a Hugging Face model.

    Args:
        file_path: Path to a local audio file readable by librosa.
        model_name: Hugging Face model id, e.g. "openai/whisper-small".

    Returns:
        The transcriber's result dict (must contain a 'text' key).

    Raises:
        Exception: Re-raised after sidebar diagnostics when anything fails.
    """
    # Resolve the token up front so the except-block diagnostics below can
    # always reference it safely.
    hf_token = os.getenv('HUGGINGFACE_TOKEN') or os.getenv('HF_TOKEN')
    try:
        st.sidebar.info("βΉοΈ Starting transcription...")
        st.sidebar.info(f"π Using model: {model_name}")
        st.sidebar.info(f"π Token present: {'Yes' if hf_token else 'No'}")
        transcriber = HFTranscriber(model_name=model_name)

        # Read the audio file.  BUG FIX: librosa was used without being
        # imported anywhere in this module (NameError at runtime); import it
        # locally so a missing package surfaces as a clear error message.
        try:
            import librosa
            audio_data, sample_rate = librosa.load(file_path, sr=16000, mono=True)
            st.sidebar.info(f"π Loaded audio: {len(audio_data)/sample_rate:.2f} seconds @ {sample_rate}Hz")
        except Exception as e:
            raise Exception(f"β Error loading audio file: {str(e)}")

        # Transcribe
        with st.spinner("π Transcribing audio..."):
            result = transcriber.transcribe_audio(audio_data, sample_rate)

        if not result or 'text' not in result:
            raise ValueError("β No transcription results returned. The model might not be accessible.")
        return result
    except Exception as e:
        # BUG FIX: the original message lacked its f-prefix, so the literal
        # text "{str(e)}" was shown instead of the actual error.
        st.sidebar.error(f"β Transcription failed: {str(e)}")
        st.sidebar.error("This might be due to:")
        st.sidebar.error("1. Invalid or missing Hugging Face token")
        st.sidebar.error("2. Insufficient permissions for the model")
        st.sidebar.error("3. Network connectivity issues")
        st.sidebar.error("4. Model not found or not accessible")
        # BUG FIX: this debug block was unreachable in the original because
        # it followed a bare `raise`; emit it before re-raising.
        st.sidebar.error("π Debug Info:")
        st.sidebar.json({
            "model": model_name,
            "token_present": bool(hf_token),
            "token_prefix": hf_token[:8] + '...' if hf_token else None,
            "error": str(e)
        })
        raise
def record_audio():
    """Render the recording UI and return the path of a finished recording.

    Returns the saved WAV path once a recording exists on disk, otherwise
    None.  Requires init_recording() to have populated app_config.
    """
    st.header("π€ Record Audio")
    if not app_config['RECORDING_ENABLED']:
        st.warning("Audio recording is not available on this device.")
        return None

    recorder_cls = app_config['AudioRecorder']
    # Keep a single recorder instance alive across Streamlit reruns.
    if 'recorder' not in st.session_state:
        st.session_state.recorder = recorder_cls()

    start_col, stop_col = st.columns(2)
    with start_col:
        if st.button("π€ Start Recording"):
            st.session_state.recorder.start()
            st.session_state.recording = True
            st.experimental_rerun()
    with stop_col:
        if st.button("βΉοΈ Stop Recording") and st.session_state.get('recording', False):
            captured = st.session_state.recorder.stop()
            stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            target = os.path.join("outputs", f"recording_{stamp}.wav")
            os.makedirs("outputs", exist_ok=True)
            captured.export(target, format="wav")
            st.session_state.recorded_file = target
            st.session_state.recording = False
            st.experimental_rerun()

    if st.session_state.get('recording', False):
        st.warning("Recording in progress... Click 'Stop Recording' when finished.")

    recorded = st.session_state.get('recorded_file')
    if 'recorded_file' in st.session_state and os.path.exists(recorded):
        st.audio(recorded)
        return recorded
    return None
# Sidebar self-test helper: verifies models can be pulled from the Hub.
def test_hf_connection():
    """Run a tiny sentiment pipeline to confirm Hugging Face connectivity."""
    from transformers import pipeline
    try:
        classifier = pipeline(
            "text-classification",
            model="distilbert-base-uncased-finetuned-sst-2-english",
        )
        outputs = classifier("This is a test")
        st.sidebar.success("β Connection to Hugging Face successful!")
        st.sidebar.json(outputs[0])
    except Exception as e:
        st.sidebar.error(f"β Connection failed: {str(e)}")
def main():
    """Top-level Streamlit page: input selection, playback, and transcription."""
    st.title("π΅ Audio to Sheet Music Transcriber")
    st.markdown("### Record or upload audio for transcription")

    # Sidebar settings.  NOTE(review): this selection is shadowed by the
    # second model picker in the transcription section below; it is kept to
    # preserve the original UI.
    with st.sidebar:
        st.header("π§ Settings")
        sidebar_models = {
            "Whisper Small (Recommended)": "openai/whisper-small",
            "Whisper Tiny": "openai/whisper-tiny",
            "Whisper Base": "openai/whisper-base",
            "Wav2Vec2 Base": "facebook/wav2vec2-base-960h"
        }
        sidebar_choice = st.selectbox(
            "Select Model",
            options=list(sidebar_models.keys()),
            index=0,
            help="Choose the transcription model. Whisper models generally provide better accuracy."
        )
        model_name = sidebar_models[sidebar_choice]

    # Input area: tabs when recording is possible, plain uploader otherwise.
    recorded_file = None
    uploaded_file = None
    if app_config['RECORDING_ENABLED']:
        tab1, tab2 = st.tabs(["π€ Record Audio", "π Upload File"])
        with tab1:
            recorded_file = record_audio()
        with tab2:
            uploaded_file = render_file_uploader()
    else:
        # If recording is not available, just show the file uploader.
        st.info("π€ Audio recording is not available in this environment. Please upload an audio file instead.")
        uploaded_file = render_file_uploader()

    temp_file_path = None
    if uploaded_file is not None or recorded_file:
        with st.spinner("Processing audio..."):
            try:
                # Resolve the audio path (recorded takes precedence).
                if recorded_file:
                    temp_file_path = recorded_file
                    file_ext = os.path.splitext(temp_file_path)[1][1:]
                else:
                    temp_file_path = save_uploaded_file(uploaded_file)
                    file_ext = os.path.splitext(uploaded_file.name)[1][1:]
                # Display the audio player and basic file info.
                st.audio(temp_file_path, format=f'audio/{file_ext}')
                file_size = os.path.getsize(temp_file_path) / (1024 * 1024)  # in MB
                st.info(f"π Processing: {os.path.basename(temp_file_path)} ({file_size:.2f} MB)")
            except Exception as e:
                st.error(f"Error processing uploaded file: {str(e)}")
                # BUG FIX: explicit path tracking replaces the fragile
                # `'temp_file_path' in locals()` check.
                if temp_file_path and os.path.exists(temp_file_path):
                    try:
                        os.remove(temp_file_path)
                    except OSError:
                        pass
                temp_file_path = None

    # Transcription section.  BUG FIXES vs the original:
    #  - gating was `uploaded_file is not None`, so recorded audio could
    #    never be transcribed; now any successfully processed file works;
    #  - references to undefined names (midi_output, musicxml_output) that
    #    guaranteed a NameError have been removed;
    #  - the pointless IPython.display import is gone (st.audio suffices).
    if temp_file_path:
        audio_file = temp_file_path
        model_options = {
            "Whisper Small": "openai/whisper-small",
            "Whisper Tiny": "openai/whisper-tiny",
            "Whisper Base": "openai/whisper-base",
            "Wav2Vec2 Base": "facebook/wav2vec2-base-960h",
            "SpeechT5": "microsoft/speecht5_asr"
        }
        selected_model = st.selectbox(
            "Select Transcription Model",
            options=list(model_options.keys()),
            index=0
        )
        if st.button("π΅ Transcribe Audio"):
            try:
                with st.spinner("Transcribing audio..."):
                    # Initialize the transcriber with the selected model.
                    model_name = model_options[selected_model]
                    transcriber = HFTranscriber(model_name=model_name)
                    os.makedirs("outputs", exist_ok=True)
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    try:
                        # NOTE(review): assumes HFTranscriber.transcribe_audio
                        # returns a dict with a 'transcription' key — confirm
                        # against hf_transcriber.py.
                        result = transcriber.transcribe_audio(audio_file)
                        # Persist the transcription alongside other outputs.
                        txt_output = os.path.join("outputs", f"transcription_{timestamp}.txt")
                        with open(txt_output, 'w', encoding='utf-8') as f:
                            f.write(result['transcription'])
                        st.success("π΅ Transcription completed successfully!")
                        st.markdown("### Transcription Result")
                        st.text_area("Transcription", result['transcription'], height=200)
                        st.markdown("### Download Results")
                        st.download_button(
                            label="Download Transcription",
                            data=result['transcription'],
                            file_name=f"transcription_{timestamp}.txt",
                            mime="text/plain"
                        )
                        # If we have timing information, show it.
                        if 'word_timestamps' in result and result['word_timestamps']:
                            st.markdown("### Word-level Timestamps")
                            st.json(result['word_timestamps'])
                        # Audio preview of the transcribed file.
                        try:
                            st.audio(audio_file, format='audio/wav')
                        except Exception as e:
                            st.warning(f"Could not display audio preview: {str(e)}")
                    except Exception as e:
                        st.error(f"β Error during transcription: {str(e)}")
                        st.exception(e)  # Show full traceback for debugging
            except Exception as e:
                st.error(f"An error occurred during transcription setup: {str(e)}")
                st.exception(e)  # Show full traceback for debugging

    # Clean up temporary files left by previous runs (best effort).
    if os.path.exists("temp_uploads"):
        for leftover in os.listdir("temp_uploads"):
            try:
                os.remove(os.path.join("temp_uploads", leftover))
            except OSError:
                pass
def clean_up_recordings(keep_last=5):
    """Delete old WAV recordings, keeping only the *keep_last* most recent.

    Args:
        keep_last: Number of newest recordings (by modification time) to keep.

    Errors are printed and never raised, so cleanup is strictly best-effort.
    """
    try:
        # Use an absolute path so cleanup is unaffected by cwd changes.
        recordings_dir = os.path.abspath("recordings")
        os.makedirs(recordings_dir, exist_ok=True)
        # Newest first by modification time.  BUG FIX: `glob` was used here
        # without ever being imported (NameError); it is now imported at the
        # top of the module.
        recordings = sorted(
            glob.glob(os.path.join(recordings_dir, "*.wav")),
            key=os.path.getmtime,
            reverse=True
        )
        # Remove everything past the keep_last newest files.
        for old_recording in recordings[keep_last:]:
            try:
                if os.path.exists(old_recording):
                    os.remove(old_recording)
                    print(f"Removed old recording: {old_recording}")
            except Exception as e:
                print(f"Error removing {old_recording}: {e}")
    except Exception as e:
        print(f"Error in clean_up_recordings: {e}")
if __name__ == "__main__":
    # Create necessary directories before the app writes any outputs.
    os.makedirs("outputs", exist_ok=True)
    # Run the app (Streamlit re-executes this script on every interaction).
    main()
    # Add footer — rendered below the main content on every run.
    st.markdown("---")
    st.markdown("### About")
    st.markdown("""
    This app uses Hugging Face's Transformers library for speech-to-text transcription.
    Models are loaded on-demand and require an internet connection.
    **Note:** This version supports both file uploads and live recording (if your device supports it).
    """)