Spaces:
Running
Running
| import sounddevice as sd | |
| import numpy as np | |
| import wave | |
| import os | |
| from datetime import datetime | |
| from typing import Optional, Tuple | |
| import threading | |
| import queue | |
class AudioRecorder:
    """Record audio from an input device on a background thread.

    Audio blocks delivered by the stream callback are pushed onto a queue,
    collected by a worker thread while recording is active, and joined into
    one array by :meth:`stop_recording`. The joined array is kept in
    ``final_audio_data`` so :meth:`save_recording` can write it afterwards.

    If no usable audio device can be found at construction time the recorder
    still works, but captures nothing and yields a silent buffer on stop.
    """

    def __init__(self, sample_rate: int = 44100, channels: int = 1,
                 device_index: Optional[int] = None):
        """
        Initialize the audio recorder with fallback for environments without audio devices.

        Args:
            sample_rate (int): Sample rate in Hz (default: 44100)
            channels (int): Number of audio channels (1 for mono, 2 for stereo)
            device_index (int, optional): Index of the audio input device to use
        """
        self.sample_rate = sample_rate
        self.channels = channels
        self.device_index = device_index
        self.recording = False
        self.audio_queue = queue.Queue()
        self.recording_thread = None
        self.recording_data = []
        self.stream = None
        self.final_audio_data = None
        self.has_audio_device = True  # Assume a device exists until proven otherwise

        # Probe the audio backend once; any failure switches to "no device" mode.
        try:
            devices = sd.query_devices()
            if not devices:
                print("Warning: No audio devices found")
                self.has_audio_device = False
            elif self.device_index is not None and self.device_index >= len(devices):
                print(f"Warning: Device index {self.device_index} is out of range")
                self.has_audio_device = False
        except Exception as e:
            print(f"Warning: Could not query audio devices: {e}")
            self.has_audio_device = False

    def start_recording(self) -> bool:
        """Start recording audio from the selected input device.

        Returns:
            bool: False if a recording is already in progress, True otherwise.
        """
        if self.recording:
            return False
        print("Starting recording...")
        # A new take invalidates whatever the previous take left for saving.
        self.final_audio_data = None
        if not self.has_audio_device:
            print("Warning: No audio device available, recording will not capture any audio")
            self.recording = True
            return True
        self.recording = True
        self.recording_data = []
        self.audio_queue = queue.Queue()
        self.recording_thread = threading.Thread(target=self._record_loop, daemon=True)
        self.recording_thread.start()
        return True

    def _record_loop(self) -> None:
        """Worker-thread body: open the input stream and collect queued blocks."""
        total_frames = 0  # running total; avoids re-summing every chunk per block
        stream = None
        try:
            stream = sd.InputStream(
                samplerate=self.sample_rate,
                channels=self.channels,
                callback=self._callback,
                dtype='float32',
                device=self.device_index,
                blocksize=1024,
            )
            self.stream = stream
            with stream:
                while self.recording:
                    try:
                        # The timeout lets the loop notice `recording` going False.
                        data = self.audio_queue.get(timeout=0.5)
                    except queue.Empty:
                        continue
                    if data is not None and len(data) > 0:
                        self.recording_data.append(data)
                        total_frames += len(data)
                        print(f"Collected {len(data)} frames, total: {total_frames}")
        except Exception as e:
            print(f"Error in audio stream: {e}")
            self.recording = False
        finally:
            # The `with` block has already closed the stream; drop the reference
            # so stop_recording() does not stop/close it a second time.
            if stream is not None and self.stream is stream:
                self.stream = None

    def _drain_queue(self) -> list:
        """Return every non-empty block still waiting in the queue, without blocking."""
        drained = []
        while True:
            try:
                data = self.audio_queue.get_nowait()
            except queue.Empty:
                return drained
            if data is not None and len(data) > 0:
                drained.append(data)
                print(f"Got {len(data)} frames from queue")

    @staticmethod
    def _combine_chunks(chunks) -> Optional[np.ndarray]:
        """Join array-like chunks along axis 0; return None if none are usable.

        Chunks are NOT trimmed to a common length: concatenation along axis 0
        only needs the remaining dimensions to agree, and trimming would
        silently discard recorded frames.
        """
        valid_chunks = [chunk for chunk in chunks
                        if chunk is not None and hasattr(chunk, 'shape') and len(chunk.shape) > 0]
        if not valid_chunks:
            return None
        return np.concatenate(valid_chunks, axis=0)

    @staticmethod
    def _print_chunk_summary(chunks) -> None:
        """Print type and shape of each chunk to help diagnose a processing error."""
        if not chunks:
            return
        print(f"Number of chunks: {len(chunks)}")
        for i, chunk in enumerate(chunks):
            chunk_type = type(chunk)
            chunk_shape = chunk.shape if hasattr(chunk, 'shape') else 'No shape'
            print(f"Chunk {i}: type={chunk_type}, shape={chunk_shape}")

    def stop_recording(self) -> Optional[np.ndarray]:
        """Stop recording and return the recorded audio data.

        The result is also stored in ``final_audio_data`` so that
        :meth:`save_recording` can be called afterwards.

        Returns:
            Optional[np.ndarray]: The concatenated audio, a silent buffer when
            no audio device was available, or None if nothing was recording,
            nothing was captured, or the captured data could not be processed.
        """
        if not self.recording:
            print("Not currently recording")
            return None
        print("Stopping recording...")
        self.recording = False

        if not self.has_audio_device:
            print("No audio device was available during recording")
            # Return a small silent audio buffer
            self.final_audio_data = np.zeros((44100, self.channels), dtype=np.float32)
            return self.final_audio_data

        # Let the worker finish first: it is the queue's consumer and owns the
        # stream's `with` block, so joining it avoids racing with it.
        worker = self.recording_thread
        if worker is not None and worker.is_alive():
            print("Waiting for recording thread to finish...")
            worker.join(timeout=2.0)
            if worker.is_alive():
                print("Warning: Recording thread did not finish cleanly")

        # Only reached with a live stream if the worker failed to release it.
        stream = self.stream
        if stream is not None:
            try:
                print("Stopping audio stream...")
                stream.stop()
                stream.close()
            except Exception as e:
                print(f"Error stopping stream: {e}")
            finally:
                self.stream = None

        # Pick up blocks the callback queued after the worker's last read.
        leftovers = self._drain_queue()
        if leftovers:
            print(f"Adding {len(leftovers)} chunks from queue")
            self.recording_data.extend(leftovers)

        try:
            if not self.recording_data:
                print("No audio data was recorded")
                self.final_audio_data = None
                return None
            print(f"Processing {len(self.recording_data)} audio chunks...")
            combined = self._combine_chunks(self.recording_data)
            if combined is None:
                print("No valid audio chunks found in recording data")
            else:
                print(f"Final audio data shape: {combined.shape}")
            # Store the final data for saving
            self.final_audio_data = combined
            return combined
        except Exception as e:
            print(f"Error processing audio data: {e}")
            self._print_chunk_summary(self.recording_data)
            self.final_audio_data = None
            return None
        finally:
            # Release the raw chunks, but keep final_audio_data: it is what
            # save_recording() writes out.
            self.recording_data = []
            self.audio_queue = queue.Queue()

    def save_recording(self, filename: Optional[str] = None, format: str = 'wav') -> str:
        """
        Save the recorded audio to a file under the ``recordings`` directory.

        Args:
            filename (str, optional): Output filename. If None, generates a timestamped filename.
            format (str): Output format ('wav' or 'npy'), case-insensitive

        Returns:
            str: Path to the saved file

        Raises:
            ValueError: If no recording data is available, the data cannot be
                processed, or the format is unsupported
        """
        # Use the final_audio_data if available, otherwise try to process recording_data
        if self.final_audio_data is not None:
            audio_data = self.final_audio_data
            print(f"Using pre-processed audio data with shape: {audio_data.shape}")
        elif not self.recording_data:
            error_msg = "No recording data available to save"
            print(error_msg)
            print(f"Recording state: {'Active' if self.recording else 'Inactive'}")
            print(f"Number of audio chunks: {len(self.recording_data)}")
            if self.stream is not None:
                print("Warning: Stream is still active")
            raise ValueError(error_msg)
        else:
            # Snapshot the list: the worker thread may still be appending.
            chunks = list(self.recording_data)
            try:
                print(f"Processing {len(chunks)} audio chunks...")
                audio_data = self._combine_chunks(chunks)
                if audio_data is None:
                    raise ValueError("No valid audio chunks found in recording data")
                print(f"Audio data shape: {audio_data.shape}")
                # Cache the processed data
                self.final_audio_data = audio_data
            except Exception as e:
                error_msg = f"Error processing audio data: {e}"
                print(error_msg)
                self._print_chunk_summary(chunks)
                raise ValueError(error_msg) from e

        # Reject bad formats before touching the filesystem.
        fmt = format.lower()
        if fmt not in ('wav', 'npy'):
            raise ValueError(f"Unsupported format: {format}")

        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"recording_{timestamp}.{format}"
        filepath = os.path.join("recordings", filename)
        if fmt == 'npy' and not filepath.endswith('.npy'):
            # np.save appends ".npy" itself; mirror that so the returned path exists.
            filepath += '.npy'
        os.makedirs(os.path.dirname(filepath), exist_ok=True)

        if fmt == 'wav':
            # Convert float32 to int16 for WAV format; clip first so samples
            # outside [-1, 1] saturate instead of wrapping around.
            pcm = (np.clip(audio_data, -1.0, 1.0) * 32767).astype(np.int16)
            with wave.open(filepath, 'wb') as wf:
                wf.setnchannels(self.channels)
                wf.setsampwidth(2)  # 16-bit
                wf.setframerate(self.sample_rate)
                wf.writeframes(pcm.tobytes())
        else:
            np.save(filepath, audio_data)
        return filepath

    def _callback(self, indata, frames, time, status):
        """Callback function for audio input stream.

        Queues a copy of ``indata`` while recording is active. The copy is
        presumably needed because the backend reuses the buffer after the
        callback returns; confirm against the sounddevice docs.
        """
        if status:
            print(f"Audio input status: {status}")
        try:
            if self.recording:  # Only add data if we're still recording
                self.audio_queue.put(indata.copy())
        except Exception as e:
            print(f"Error in audio callback: {e}")
def list_audio_devices() -> list:
    """
    Enumerate audio input devices, degrading gracefully when none can be found.

    Devices reporting at least one input channel are returned. If the query
    fails or yields no input device, a single 'Default Device' entry is
    returned instead; if even that cannot be built, a 'Fallback Device' entry.

    Returns:
        list: Dictionaries with the keys 'id', 'name', 'input_channels'
        and 'default_samplerate'
    """
    def _placeholder(device_id, label):
        # Stand-in entry used when no real input device can be reported.
        return {
            'id': device_id,
            'name': label,
            'input_channels': 1,
            'default_samplerate': 44100
        }

    try:
        usable = []
        try:
            for idx, info in enumerate(sd.query_devices()):
                try:
                    if not info.get('max_input_channels', 0) > 0:
                        continue
                    usable.append({
                        'id': idx,
                        'name': str(info.get('name', f'Device {idx}')),  # Ensure name is string
                        'input_channels': int(info.get('max_input_channels', 1)),
                        'default_samplerate': int(info.get('default_samplerate', 44100))
                    })
                except (KeyError, TypeError, ValueError) as err:
                    # A malformed entry is skipped; the remaining devices are still listed.
                    print(f"Warning: Error processing device {idx}: {err}")
        except Exception as err:
            print(f"Warning: Could not query audio devices: {err}")
        else:
            if usable:
                return usable

        # Reached when the query failed or produced no input-capable device.
        print("No valid input devices found, using default configuration")
        backend_has_default = hasattr(sd, 'default') and hasattr(sd.default, 'device')
        default_id = sd.default.device[0] if backend_has_default else 0
        return [_placeholder(default_id, 'Default Device')]
    except Exception as err:
        print(f"Error in list_audio_devices: {err}")
        # Final fallback in case anything else goes wrong
        return [_placeholder(0, 'Fallback Device')]