import gradio as gr import torch import numpy as np import librosa import soundfile as sf import threading import time import queue import warnings from typing import Optional, List, Dict, Tuple from dataclasses import dataclass from collections import deque import psutil import gc # Import models from dia.model import Dia from transformers import pipeline import webrtcvad warnings.filterwarnings("ignore", category=FutureWarning) warnings.filterwarnings("ignore", category=UserWarning) @dataclass class ConversationTurn: user_audio: np.ndarray user_text: str ai_response_text: str ai_response_audio: np.ndarray timestamp: float emotion: str speaker_id: str class EmotionRecognizer: def __init__(self): self.emotion_pipeline = pipeline( "audio-classification", model="ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition", device=0 if torch.cuda.is_available() else -1 ) def detect_emotion(self, audio: np.ndarray, sample_rate: int = 16000) -> str: try: result = self.emotion_pipeline({"array": audio, "sampling_rate": sample_rate}) return result[0]["label"] if result else "neutral" except Exception as e: print(f"Emotion detection error: {e}") return "neutral" class VADProcessor: def __init__(self, aggressiveness: int = 2): self.vad = webrtcvad.Vad(aggressiveness) self.sample_rate = 16000 self.frame_duration = 30 # ms self.frame_size = int(self.sample_rate * self.frame_duration / 1000) def is_speech(self, audio: np.ndarray) -> bool: try: # Convert to 16-bit PCM audio_int16 = (audio * 32767).astype(np.int16) # Process in frames frames = [] for i in range(0, len(audio_int16) - self.frame_size, self.frame_size): frame = audio_int16[i:i + self.frame_size].tobytes() frames.append(self.vad.is_speech(frame, self.sample_rate)) # Return True if majority of frames contain speech return sum(frames) > len(frames) * 0.3 except Exception: return True # Default to treating as speech class ConversationManager: def __init__(self, max_exchanges: int = 50): self.conversations: Dict[str, deque] = {} self.max_exchanges = max_exchanges self.lock = threading.RLock() def add_turn(self, session_id: str, turn: ConversationTurn): with self.lock: if session_id not in self.conversations: self.conversations[session_id] = deque(maxlen=self.max_exchanges) self.conversations[session_id].append(turn) def get_context(self, session_id: str, last_n: int = 5) -> List[ConversationTurn]: with self.lock: if session_id not in self.conversations: return [] return list(self.conversations[session_id])[-last_n:] def clear_session(self, session_id: str): with self.lock: if session_id in self.conversations: del self.conversations[session_id] class SupernaturalAI: def __init__(self): self.device = "cuda" if torch.cuda.is_available() else "cpu" self.models_loaded = False self.processing_queue = queue.Queue() self.conversation_manager = ConversationManager() self.emotion_recognizer = None self.vad_processor = VADProcessor() # Models self.ultravox_model = None self.dia_model = None # Performance tracking self.active_sessions = set() self.processing_times = deque(maxlen=100) print("Initializing Supernatural AI...") self._initialize_models() def _initialize_models(self): try: print("Loading Ultravox model...") self.ultravox_model = pipeline( 'automatic-speech-recognition', model='fixie-ai/ultravox-v0_2', trust_remote_code=True, device=0 if torch.cuda.is_available() else -1, torch_dtype=torch.float16 ) print("Loading Dia TTS model...") self.dia_model = Dia.from_pretrained( "nari-labs/Dia-1.6B", compute_dtype="float16" ) print("Loading emotion recognition...") self.emotion_recognizer = EmotionRecognizer() self.models_loaded = True print("✅ All models loaded successfully!") # Memory cleanup if torch.cuda.is_available(): torch.cuda.empty_cache() except Exception as e: print(f"❌ Error loading models: {e}") self.models_loaded = False def _get_memory_usage(self) -> Dict[str, float]: """Get current memory usage statistics""" memory = psutil.virtual_memory() gpu_memory = {} if torch.cuda.is_available(): for i in range(torch.cuda.device_count()): gpu_memory[f"GPU_{i}"] = { "allocated": torch.cuda.memory_allocated(i) / 1024**3, "cached": torch.cuda.memory_reserved(i) / 1024**3 } return { "RAM": memory.percent, "GPU": gpu_memory } def _generate_contextual_prompt(self, user_text: str, emotion: str, context: List[ConversationTurn]) -> str: """Generate contextual prompt with emotion and conversation history""" # Build context from previous turns context_text = "" if context: for turn in context[-3:]: # Last 3 exchanges context_text += f"[S1] {turn.user_text} [S2] {turn.ai_response_text} " # Emotion-aware response generation emotion_modifiers = { "happy": "(cheerful)", "sad": "(sympathetic)", "angry": "(calming)", "fear": "(reassuring)", "surprise": "(excited)", "neutral": "" } modifier = emotion_modifiers.get(emotion.lower(), "") # Create supernatural AI personality prompt = f"{context_text}[S1] {user_text} [S2] {modifier} As a supernatural AI with deep emotional understanding, I sense your {emotion} energy. " return prompt def process_audio_input(self, audio_data: Tuple[int, np.ndarray], session_id: str) -> Tuple[Optional[Tuple[int, np.ndarray]], str, str]: """Main processing pipeline for audio input""" if not self.models_loaded: return None, "❌ Models not loaded", "Please wait for initialization" if audio_data is None: return None, "❌ No audio received", "Please record some audio" start_time = time.time() try: sample_rate, audio = audio_data # Ensure audio is mono and proper format if len(audio.shape) > 1: audio = np.mean(audio, axis=1) # Normalize audio audio = audio.astype(np.float32) if np.max(np.abs(audio)) > 0: audio = audio / np.max(np.abs(audio)) * 0.95 # Voice Activity Detection if not self.vad_processor.is_speech(audio): return None, "🔇 No speech detected", "Please speak clearly" # Resample if needed if sample_rate != 16000: audio = librosa.resample(audio, orig_sr=sample_rate, target_sr=16000) sample_rate = 16000 # Speech Recognition with Ultravox try: speech_result = self.ultravox_model({ 'array': audio, 'sampling_rate': sample_rate }) user_text = speech_result.get('text', '').strip() if not user_text: return None, "❌ Could not understand speech", "Please speak more clearly" except Exception as e: print(f"ASR Error: {e}") return None, f"❌ Speech recognition failed: {str(e)}", "Please try again" # Emotion Recognition emotion = self.emotion_recognizer.detect_emotion(audio, sample_rate) # Get conversation context context = self.conversation_manager.get_context(session_id) # Generate contextual response prompt = self._generate_contextual_prompt(user_text, emotion, context) # Generate speech with Dia TTS try: with torch.no_grad(): audio_output = self.dia_model.generate( prompt, use_torch_compile=False, # Better stability verbose=False ) # Ensure audio output is proper format if isinstance(audio_output, torch.Tensor): audio_output = audio_output.cpu().numpy() # Normalize output if len(audio_output) > 0: max_val = np.max(np.abs(audio_output)) if max_val > 1.0: audio_output = audio_output / max_val * 0.95 except Exception as e: print(f"TTS Error: {e}") return None, f"❌ Speech generation failed: {str(e)}", "Please try again" # Extract AI response text (remove speaker tags and modifiers) ai_response = prompt.split('[S2]')[-1].strip() ai_response = ai_response.replace('(cheerful)', '').replace('(sympathetic)', '') ai_response = ai_response.replace('(calming)', '').replace('(reassuring)', '') ai_response = ai_response.replace('(excited)', '').strip() # Store conversation turn turn = ConversationTurn( user_audio=audio, user_text=user_text, ai_response_text=ai_response, ai_response_audio=audio_output, timestamp=time.time(), emotion=emotion, speaker_id=session_id ) self.conversation_manager.add_turn(session_id, turn) # Track performance processing_time = time.time() - start_time self.processing_times.append(processing_time) # Memory cleanup if torch.cuda.is_available(): torch.cuda.empty_cache() gc.collect() status = f"✅ Processed in {processing_time:.2f}s | Emotion: {emotion} | Users: {len(self.active_sessions)}" return (44100, audio_output), status, f"**You said:** {user_text}\n\n**AI Response:** {ai_response}" except Exception as e: print(f"Processing error: {e}") return None, f"❌ Processing failed: {str(e)}", "Please try again" def get_conversation_history(self, session_id: str) -> str: """Get formatted conversation history""" context = self.conversation_manager.get_context(session_id, last_n=10) if not context: return "No conversation history yet." history = "## Conversation History\n\n" for i, turn in enumerate(context, 1): history += f"**Turn {i}:**\n" history += f"- **You:** {turn.user_text}\n" history += f"- **AI:** {turn.ai_response_text}\n" history += f"- **Emotion Detected:** {turn.emotion}\n\n" return history def clear_conversation(self, session_id: str) -> str: """Clear conversation history for session""" self.conversation_manager.clear_session(session_id) return "Conversation history cleared." def get_system_status(self) -> str: """Get system status information""" memory = self._get_memory_usage() avg_processing = np.mean(self.processing_times) if self.processing_times else 0 status = f"""## System Status **Performance:** - Average Processing Time: {avg_processing:.2f}s - Active Sessions: {len(self.active_sessions)} - Total Conversations: {len(self.conversation_manager.conversations)} **Memory Usage:** - RAM: {memory['RAM']:.1f}% - GPU Memory: {memory.get('GPU', {})} **Models Status:** - Models Loaded: {"✅" if self.models_loaded else "❌"} - Device: {self.device} """ return status # Initialize the AI system print("Starting Supernatural AI system...") ai_system = SupernaturalAI() # Gradio Interface def process_audio_interface(audio, session_id): """Interface function for Gradio""" if not session_id: session_id = f"user_{int(time.time())}" ai_system.active_sessions.add(session_id) result = ai_system.process_audio_input(audio, session_id) return result + (session_id,) def get_history_interface(session_id): """Get conversation history interface""" if not session_id: return "No session ID provided" return ai_system.get_conversation_history(session_id) def clear_history_interface(session_id): """Clear history interface""" if not session_id: return "No session ID provided" return ai_system.clear_conversation(session_id) # Create Gradio interface with gr.Blocks(title="Supernatural Conversational AI", theme=gr.themes.Soft()) as demo: gr.HTML("""
Advanced Speech-to-Speech AI with Emotional Intelligence
Powered by Ultravox + Dia TTS | Optimized for 4x L4 GPUs
Supported Features: Emotion recognition, voice activity detection, contextual responses, conversation history, concurrent users (15-20), memory management