Spaces:
Running
Running
File size: 7,199 Bytes
18b21ee e888ead 18b21ee f334b99 26fc80f e888ead a2ed037 18b21ee a2ed037 18b21ee 779d79b b407959 a2ed037 692769a 8505a8f 895c600 b407959 a2ed037 779d79b b407959 b6fdfee 8505a8f ac5f4c0 b407959 8505a8f b407959 c699992 b407959 a2ed037 b407959 c699992 b407959 18b21ee 40dfec3 18b21ee 8c34a9f 18b21ee b407959 18b21ee b407959 18b21ee 0011522 18b21ee b407959 18b21ee b407959 18b21ee e888ead 18b21ee |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
import os
import gradio as gr
import torch
import nemo.collections.asr as nemo_asr
from omegaconf import OmegaConf
import time
import spaces
import librosa
# Important: Don't initialize CUDA in the main process for Spaces
# The model will be loaded in the worker process through the GPU decorator
model = None
def load_model():
# This function will be called in the GPU worker process
global model
if model is None:
print(f"Loading model in worker process")
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
print(f"CUDA device: {torch.cuda.get_device_name(0)}")
model = nemo_asr.models.EncDecRNNTBPEModel.from_pretrained("nvidia/parakeet-tdt-0.6b-v2")
print(f"Model loaded on device: {model.device}")
return model
@spaces.GPU(duration=120)
def transcribe(audio, state="", audio_buffer=None, last_processed_time=0):
# Load the model inside the GPU worker process
import numpy as np
import soundfile as sf
import librosa
import os
model = load_model()
if audio_buffer is None:
audio_buffer = []
if audio is None or isinstance(audio, int):
print(f"Skipping invalid audio input: {type(audio)}")
return state, state, audio_buffer, last_processed_time
print(f"Received audio input of type: {type(audio)}")
if isinstance(audio, tuple) and len(audio) == 2 and isinstance(audio[1], np.ndarray):
sample_rate, audio_data = audio
print(f"Sample rate: {sample_rate}, Audio shape: {audio_data.shape}")
# Append chunk to buffer
audio_buffer.append(audio_data)
# Calculate total duration in seconds
total_samples = sum(arr.shape[0] for arr in audio_buffer)
total_duration = total_samples / sample_rate
print(f"Total buffered duration: {total_duration:.2f}s")
# Process 3-second chunks with 1-second step size (2-second overlap)
chunk_duration = 3.0 # seconds
step_size = 1.0 # seconds
min_samples = int(chunk_duration * 16000) # 3s at 16kHz
if total_duration < chunk_duration:
print(f"Buffering audio, total duration: {total_duration:.2f}s")
return state, state, audio_buffer, last_processed_time
try:
# Concatenate buffered chunks
full_audio = np.concatenate(audio_buffer)
# Resample to 16kHz if needed
if sample_rate != 16000:
print(f"Resampling from {sample_rate}Hz to 16000Hz")
full_audio = librosa.resample(full_audio.astype(float), orig_sr=sample_rate, target_sr=16000)
sample_rate = 16000
else:
full_audio = full_audio.astype(float)
# Process 3-second chunks
new_state = state
current_time = last_processed_time
total_samples_16k = len(full_audio)
while current_time + chunk_duration <= total_duration:
start_sample = int(current_time * sample_rate)
end_sample = int((current_time + chunk_duration) * sample_rate)
if end_sample > total_samples_16k:
break
chunk = full_audio[start_sample:end_sample]
print(f"Processing chunk from {current_time:.2f}s to {current_time + chunk_duration:.2f}s")
# Save to temporary WAV file
temp_file = "temp_audio.wav"
sf.write(temp_file, chunk, samplerate=16000)
# Transcribe
hypothesis = model.transcribe([temp_file])[0]
transcription = hypothesis.text
print(f"Transcription: {transcription}")
os.remove(temp_file)
print("Temporary file removed.")
# Append transcription if non-empty
if transcription.strip():
new_state = new_state + " " + transcription if new_state else transcription
current_time += step_size
# Update last processed time
last_processed_time = current_time
# Trim buffer to keep only unprocessed audio
keep_samples = int((total_duration - current_time) * sample_rate)
if keep_samples > 0:
audio_buffer = [full_audio[-keep_samples:]]
else:
audio_buffer = []
print(f"New state: {new_state}")
return new_state, new_state, audio_buffer, last_processed_time
except Exception as e:
print(f"Error processing audio: {e}")
return state, state, audio_buffer, last_processed_time
print(f"Invalid audio input format: {type(audio)}")
return state, state, audio_buffer, last_processed_time
# Define the Gradio interface
with gr.Blocks(title="Real-time Speech-to-Text with NeMo") as demo:
gr.Markdown("# ๐๏ธ Real-time Speech-to-Text Transcription")
gr.Markdown("Powered by NVIDIA NeMo and the parakeet-tdt-0.6b-v2 model")
with gr.Row():
with gr.Column(scale=2):
audio_input = gr.Audio(
sources=["microphone"],
type="numpy",
streaming=True,
label="Speak into your microphone"
)
clear_btn = gr.Button("Clear Transcript")
with gr.Column(scale=3):
text_output = gr.Textbox(
label="Transcription",
placeholder="Your speech will appear here...",
lines=10
)
streaming_text = gr.Textbox(
label="Real-time Transcription",
placeholder="Real-time results will appear here...",
lines=2
)
# State to store the ongoing transcription
state = gr.State("")
audio_buffer = gr.State(value=None)
last_processed_time = gr.State(value=0)
# Handle the audio stream
audio_input.stream(
fn=transcribe,
inputs=[audio_input, state, audio_buffer, last_processed_time],
outputs=[state, streaming_text, audio_buffer, last_processed_time],
)
# Clear the transcription
def clear_transcription():
return "", "", None, 0
clear_btn.click(
fn=clear_transcription,
inputs=[],
outputs=[text_output, streaming_text, audio_buffer, last_processed_time]
)
# Update the main text output when the state changes
state.change(
fn=lambda s: s,
inputs=[state],
outputs=[text_output]
)
gr.Markdown("## ๐ Instructions")
gr.Markdown("""
1. Click the microphone button to start recording
2. Speak clearly into your microphone
3. The transcription will appear in real-time
4. Click 'Clear Transcript' to start a new transcription
""")
# Launch the app
if __name__ == "__main__":
demo.launch()
|