# AudioTranscriber / recorder.py
# PatienceIzere's picture
# Update recorder.py
# 26f06e6 verified
import sounddevice as sd
import numpy as np
import wave
import os
from datetime import datetime
from typing import Optional, Tuple
import threading
import queue
class AudioRecorder:
    """Record audio from an input device on a background thread.

    The sounddevice callback pushes each captured block onto ``audio_queue``;
    a worker thread started by ``start_recording`` moves those blocks into
    ``recording_data``.  ``stop_recording`` joins the blocks into one array,
    keeps it in ``final_audio_data`` and ``save_recording`` writes that array
    to disk.
    """

    def __init__(self, sample_rate: int = 44100, channels: int = 1,
                 device_index: Optional[int] = None):
        """
        Initialize the audio recorder with fallback for environments without audio devices.

        Args:
            sample_rate (int): Sample rate in Hz (default: 44100)
            channels (int): Number of audio channels (1 for mono, 2 for stereo)
            device_index (int, optional): Index of the audio input device to use
        """
        self.sample_rate = sample_rate
        self.channels = channels
        self.device_index = device_index
        self.recording = False
        self.audio_queue = queue.Queue()
        self.recording_thread = None
        self.recording_data = []
        self.stream = None
        self.final_audio_data = None
        # Assume a device exists until the probe below says otherwise.
        self.has_audio_device = True

        # Probe the audio backend once so later calls can degrade gracefully
        # instead of failing when no usable device is present.
        try:
            devices = sd.query_devices()
            if not devices:
                print("Warning: No audio devices found")
                self.has_audio_device = False
            elif self.device_index is not None and self.device_index >= len(devices):
                print(f"Warning: Device index {self.device_index} is out of range")
                self.has_audio_device = False
        except Exception as e:
            print(f"Warning: Could not query audio devices: {e}")
            self.has_audio_device = False

    def start_recording(self):
        """
        Start recording audio from the selected input device.

        Returns:
            bool: False if a recording is already in progress, True otherwise
            (including the no-device case, where nothing is captured).
        """
        if self.recording:
            return False
        print("Starting recording...")
        # A finished earlier take must not be saved in place of this one.
        self.final_audio_data = None
        if not self.has_audio_device:
            print("Warning: No audio device available, recording will not capture any audio")
            self.recording = True
            return True
        self.recording = True
        self.recording_data = []
        self.audio_queue = queue.Queue()

        def record():
            # Running frame count; avoids re-summing every chunk per block.
            frames_collected = 0
            try:
                self.stream = sd.InputStream(
                    samplerate=self.sample_rate,
                    channels=self.channels,
                    callback=self._callback,
                    dtype='float32',
                    device=self.device_index,
                    blocksize=1024
                )
                with self.stream:
                    while self.recording:
                        try:
                            data = self.audio_queue.get(timeout=0.5)
                            if data is not None and len(data) > 0:
                                self.recording_data.append(data)
                                frames_collected += len(data)
                                print(f"Collected {len(data)} frames, total: {frames_collected}")
                        except queue.Empty:
                            # The loop condition re-checks self.recording.
                            continue
                        except Exception as e:
                            print(f"Error in recording thread: {e}")
                            self.recording = False
                            break
            except Exception as e:
                print(f"Error in audio stream: {e}")
                self.recording = False

        self.recording_thread = threading.Thread(target=record, daemon=True)
        self.recording_thread.start()
        return True

    def stop_recording(self) -> Optional[np.ndarray]:
        """
        Stop recording and return the recorded audio data.

        The result is also kept in ``final_audio_data`` so that
        ``save_recording`` can write it afterwards.

        Returns:
            Optional[np.ndarray]: All captured chunks joined along axis 0, a
            silent buffer when no audio device was available, or None when
            nothing was being recorded or no usable data was captured.
        """
        if not self.recording:
            print("Not currently recording")
            return None
        print("Stopping recording...")
        self.recording = False

        if not self.has_audio_device:
            print("No audio device was available during recording")
            # Silent placeholder of a fixed 44100 frames (independent of
            # self.sample_rate).
            self.final_audio_data = np.zeros((44100, self.channels), dtype=np.float32)
            return self.final_audio_data

        # Let the worker finish first: once `recording` is False the callback
        # stops enqueuing, so after the join whatever is left in the queue is
        # final and can be drained without polling or sleeping.
        worker = self.recording_thread
        if worker is not None and worker.is_alive():
            print("Waiting for recording thread to finish...")
            worker.join(timeout=2.0)
            if worker.is_alive():
                print("Warning: Recording thread did not finish cleanly")

        # Safety net in case the worker did not release the stream itself.
        if self.stream is not None:
            try:
                print("Stopping audio stream...")
                self.stream.stop()
                self.stream.close()
            except Exception as e:
                print(f"Error stopping stream: {e}")
            finally:
                self.stream = None

        leftover = self._drain_queue()
        if leftover:
            print(f"Adding {len(leftover)} chunks from queue")
            self.recording_data.extend(leftover)

        if not self.recording_data:
            print("No audio data was recorded")
            self.final_audio_data = None
            return None

        try:
            print(f"Processing {len(self.recording_data)} audio chunks...")
            combined = self._combine_chunks(self.recording_data)
            if combined is None:
                print("No valid audio chunks found in recording data")
                self.final_audio_data = None
                return None
            print(f"Final audio data shape: {combined.shape}")
            # Store the final data for saving
            self.final_audio_data = combined
            return combined
        except Exception as e:
            print(f"Error processing audio data: {e}")
            self._print_chunk_summary()
            self.final_audio_data = None
            return None
        finally:
            # Drop the raw buffers but keep final_audio_data: it is what
            # save_recording() writes out.
            self.recording_data = []
            self.audio_queue = queue.Queue()

    def save_recording(self, filename: Optional[str] = None, format: str = 'wav') -> str:
        """
        Save the recorded audio to a file.

        Args:
            filename (str, optional): Output filename. If None, generates a timestamped filename.
            format (str): Output format ('wav' or 'npy')

        Returns:
            str: Path to the saved file

        Raises:
            ValueError: If no recording data is available, if there's an error
                processing the data, or if the format is unsupported
        """
        # Use the final_audio_data if available, otherwise try to process recording_data
        if self.final_audio_data is not None:
            audio_data = self.final_audio_data
            print(f"Using pre-processed audio data with shape: {audio_data.shape}")
        elif not self.recording_data:
            error_msg = "No recording data available to save"
            print(error_msg)
            print(f"Recording state: {'Active' if self.recording else 'Inactive'}")
            print(f"Number of audio chunks: {len(self.recording_data)}")
            if self.stream is not None:
                print("Warning: Stream is still active")
            raise ValueError(error_msg)
        else:
            try:
                print(f"Processing {len(self.recording_data)} audio chunks...")
                audio_data = self._combine_chunks(self.recording_data)
                if audio_data is None:
                    raise ValueError("No valid audio chunks found in recording data")
                print(f"Audio data shape: {audio_data.shape}")
                # Cache the processed data
                self.final_audio_data = audio_data
            except Exception as e:
                error_msg = f"Error processing audio data: {e}"
                print(error_msg)
                self._print_chunk_summary()
                raise ValueError(error_msg) from e

        # Reject unknown formats before touching the filesystem.
        output_format = format.lower()
        if output_format not in ('wav', 'npy'):
            raise ValueError(f"Unsupported format: {format}")

        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"recording_{timestamp}.{format}"
        os.makedirs("recordings", exist_ok=True)
        filepath = os.path.join("recordings", filename)

        if output_format == 'wav':
            # The stream delivers float32 samples; clip to [-1, 1] before
            # scaling so out-of-range values saturate instead of wrapping
            # around in int16.
            pcm = (np.clip(audio_data, -1.0, 1.0) * 32767).astype(np.int16)
            with wave.open(filepath, 'wb') as wf:
                wf.setnchannels(self.channels)
                wf.setsampwidth(2)  # 16-bit
                wf.setframerate(self.sample_rate)
                wf.writeframes(pcm.tobytes())
        else:
            # Writing through a file handle stops np.save from appending
            # ".npy", so the returned path is exactly the file written.
            with open(filepath, 'wb') as fh:
                np.save(fh, audio_data)
        return filepath

    def _drain_queue(self) -> list:
        """Return every non-empty chunk still waiting in ``audio_queue``."""
        drained = []
        while True:
            try:
                data = self.audio_queue.get_nowait()
            except queue.Empty:
                break
            if data is not None and len(data) > 0:
                drained.append(data)
                print(f"Got {len(data)} frames from queue")
        return drained

    @staticmethod
    def _combine_chunks(chunks) -> Optional[np.ndarray]:
        """Join the array-like chunks along axis 0; None if none are usable."""
        # Filter out None or invalid chunks
        valid_chunks = [chunk for chunk in chunks
                        if chunk is not None and hasattr(chunk, 'shape') and len(chunk.shape) > 0]
        if not valid_chunks:
            return None
        # Chunks may differ in length along axis 0; concatenate keeps every
        # sample rather than trimming each chunk to the shortest one.
        return np.concatenate(valid_chunks, axis=0)

    def _print_chunk_summary(self) -> None:
        """Print the type and shape of each buffered chunk for diagnostics."""
        if not self.recording_data:
            return
        print(f"Number of chunks: {len(self.recording_data)}")
        for i, chunk in enumerate(self.recording_data):
            chunk_type = type(chunk)
            chunk_shape = chunk.shape if hasattr(chunk, 'shape') else 'No shape'
            print(f"Chunk {i}: type={chunk_type}, shape={chunk_shape}")

    def _callback(self, indata, frames, time, status):
        """Callback function for audio input stream."""
        if status:
            print(f"Audio input status: {status}")
        try:
            if self.recording:  # Only add data if we're still recording
                # Copy the block before queuing it so the queued chunk does
                # not alias the buffer passed to this callback.
                self.audio_queue.put(indata.copy())
        except Exception as e:
            print(f"Error in audio callback: {e}")
def list_audio_devices() -> list:
    """
    Enumerate audio input devices, degrading to placeholder entries on failure.

    Returns:
        list: One dictionary per input-capable device with the keys 'id',
        'name', 'input_channels' and 'default_samplerate'. When no input
        device is found or the query fails, a single 'Default Device' entry
        is returned; if even that cannot be built, a single 'Fallback Device'
        entry is returned.
    """
    try:
        # Stage 1: ask the backend for its devices and keep the input-capable ones.
        try:
            all_devices = sd.query_devices()
            usable = []
            for index, info in enumerate(all_devices):
                try:
                    if not info.get('max_input_channels', 0) > 0:
                        continue
                    entry = {
                        'id': index,
                        # Force the name to a plain string
                        'name': str(info.get('name', f'Device {index}')),
                        'input_channels': int(info.get('max_input_channels', 1)),
                        'default_samplerate': int(info.get('default_samplerate', 44100))
                    }
                    usable.append(entry)
                except (KeyError, TypeError, ValueError) as e:
                    print(f"Warning: Error processing device {index}: {e}")
            if usable:
                return usable
        except Exception as e:
            print(f"Warning: Could not query audio devices: {e}")

        # Stage 2: nothing usable was found (or the query failed), so describe
        # the backend's default input device instead.
        print("No valid input devices found, using default configuration")
        if hasattr(sd, 'default') and hasattr(sd.default, 'device'):
            default_id = sd.default.device[0]
        else:
            default_id = 0
        return [{
            'id': default_id,
            'name': 'Default Device',
            'input_channels': 1,
            'default_samplerate': 44100
        }]
    except Exception as e:
        # Stage 3: last resort when even the default lookup fails.
        print(f"Error in list_audio_devices: {e}")
        return [{
            'id': 0,
            'name': 'Fallback Device',
            'input_channels': 1,
            'default_samplerate': 44100
        }]