import gradio as gr
import torch
import numpy as np
import librosa
import soundfile as sf
import threading
import time
import queue
import warnings
from typing import Optional, List, Dict, Tuple
from dataclasses import dataclass
from collections import deque
import psutil
import gc

# Import models
from dia.model import Dia
from transformers import pipeline
import webrtcvad

warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
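
# A minimal dependency sketch for this Space (package names assumed from the
# imports above, not taken from an actual requirements.txt; the Dia package is
# installed from the nari-labs/dia GitHub repo):
#   gradio, torch, numpy, librosa, soundfile, psutil, transformers,
#   webrtcvad, git+https://github.com/nari-labs/dia.git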

@dataclass
class ConversationTurn:
    """One user/AI exchange, kept in per-session history."""
    user_audio: np.ndarray
    user_text: str
    ai_response_text: str
    ai_response_audio: np.ndarray
    timestamp: float
    emotion: str
    speaker_id: str

class EmotionRecognizer:
    def __init__(self):
        self.emotion_pipeline = pipeline(
            "audio-classification",
            model="ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition",
            device=0 if torch.cuda.is_available() else -1
        )

    def detect_emotion(self, audio: np.ndarray, sample_rate: int = 16000) -> str:
        try:
            result = self.emotion_pipeline({"array": audio, "sampling_rate": sample_rate})
            return result[0]["label"] if result else "neutral"
        except Exception as e:
            print(f"Emotion detection error: {e}")
            return "neutral"

class VADProcessor:
    def __init__(self, aggressiveness: int = 2):
        self.vad = webrtcvad.Vad(aggressiveness)
        self.sample_rate = 16000
        self.frame_duration = 30  # ms; webrtcvad accepts 10/20/30 ms frames
        self.frame_size = int(self.sample_rate * self.frame_duration / 1000)

    def is_speech(self, audio: np.ndarray) -> bool:
        try:
            # Convert float audio in [-1, 1] to 16-bit PCM
            audio_int16 = (audio * 32767).astype(np.int16)
            # Process in fixed-size frames
            frames = []
            for i in range(0, len(audio_int16) - self.frame_size, self.frame_size):
                frame = audio_int16[i:i + self.frame_size].tobytes()
                frames.append(self.vad.is_speech(frame, self.sample_rate))
            if not frames:  # clip shorter than one frame
                return True
            # Treat the clip as speech if more than 30% of frames are voiced
            return sum(frames) > len(frames) * 0.3
        except Exception:
            return True  # Default to treating as speech
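
# webrtcvad only accepts 16-bit mono PCM at 8/16/32/48 kHz in 10/20/30 ms
# frames, which is why the class fixes 16 kHz / 30 ms above. A quick sketch of
# the frame math and expected behavior (illustrative, not from the source):
#   vad = VADProcessor(aggressiveness=2)
#   assert vad.frame_size == 480  # 16000 samples/s * 0.030 s
#   vad.is_speech(np.zeros(16000, dtype=np.float32))  # pure silence -> False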

class ConversationManager:
    def __init__(self, max_exchanges: int = 50):
        self.conversations: Dict[str, deque] = {}
        self.max_exchanges = max_exchanges
        self.lock = threading.RLock()

    def add_turn(self, session_id: str, turn: ConversationTurn):
        with self.lock:
            if session_id not in self.conversations:
                self.conversations[session_id] = deque(maxlen=self.max_exchanges)
            self.conversations[session_id].append(turn)

    def get_context(self, session_id: str, last_n: int = 5) -> List[ConversationTurn]:
        with self.lock:
            if session_id not in self.conversations:
                return []
            return list(self.conversations[session_id])[-last_n:]

    def clear_session(self, session_id: str):
        with self.lock:
            if session_id in self.conversations:
                del self.conversations[session_id]

class SupernaturalAI:
    def __init__(self):
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.models_loaded = False
        self.processing_queue = queue.Queue()
        self.conversation_manager = ConversationManager()
        self.emotion_recognizer = None
        self.vad_processor = VADProcessor()
        # Models
        self.ultravox_model = None
        self.dia_model = None
        # Performance tracking
        self.active_sessions = set()
        self.processing_times = deque(maxlen=100)
        print("Initializing Supernatural AI...")
        self._initialize_models()
    def _initialize_models(self):
        try:
            print("Loading Ultravox model...")
            # Ultravox ships custom pipeline code, hence trust_remote_code=True
            self.ultravox_model = pipeline(
                'automatic-speech-recognition',
                model='fixie-ai/ultravox-v0_2',
                trust_remote_code=True,
                device=0 if torch.cuda.is_available() else -1,
                torch_dtype=torch.float16
            )
            print("Loading Dia TTS model...")
            self.dia_model = Dia.from_pretrained(
                "nari-labs/Dia-1.6B",
                compute_dtype="float16"
            )
            print("Loading emotion recognition...")
            self.emotion_recognizer = EmotionRecognizer()
            self.models_loaded = True
            print("✅ All models loaded successfully!")
            # Memory cleanup
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
        except Exception as e:
            print(f"❌ Error loading models: {e}")
            self.models_loaded = False
    def _get_memory_usage(self) -> Dict[str, object]:
        """Get current memory usage statistics."""
        memory = psutil.virtual_memory()
        gpu_memory = {}
        if torch.cuda.is_available():
            for i in range(torch.cuda.device_count()):
                gpu_memory[f"GPU_{i}"] = {
                    "allocated": torch.cuda.memory_allocated(i) / 1024**3,  # GiB
                    "cached": torch.cuda.memory_reserved(i) / 1024**3       # GiB
                }
        return {
            "RAM": memory.percent,
            "GPU": gpu_memory
        }
    def _generate_contextual_prompt(self,
                                    user_text: str,
                                    emotion: str,
                                    context: List[ConversationTurn]) -> str:
        """Generate a contextual prompt with emotion and conversation history."""
        # Build context from previous turns
        context_text = ""
        if context:
            for turn in context[-3:]:  # Last 3 exchanges
                context_text += f"[S1] {turn.user_text} [S2] {turn.ai_response_text} "
        # Emotion-aware response generation. Note: the emotion model's labels
        # (e.g. "fearful", "surprised") may not match these keys exactly, in
        # which case the modifier falls back to "".
        emotion_modifiers = {
            "happy": "(cheerful)",
            "sad": "(sympathetic)",
            "angry": "(calming)",
            "fear": "(reassuring)",
            "surprise": "(excited)",
            "neutral": ""
        }
        modifier = emotion_modifiers.get(emotion.lower(), "")
        # Create supernatural AI personality
        prompt = f"{context_text}[S1] {user_text} [S2] {modifier} As a supernatural AI with deep emotional understanding, I sense your {emotion} energy. "
        return prompt
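
    # For illustration, a prompt built by the method above might look like the
    # following (texts hypothetical; [S1]/[S2] are Dia's two-speaker tags):
    #   "[S1] How are you today? [S2] (cheerful) As a supernatural AI with
    #    deep emotional understanding, I sense your happy energy. "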
    def process_audio_input(self,
                            audio_data: Tuple[int, np.ndarray],
                            session_id: str) -> Tuple[Optional[Tuple[int, np.ndarray]], str, str]:
        """Main processing pipeline for audio input."""
        if not self.models_loaded:
            return None, "❌ Models not loaded", "Please wait for initialization"
        if audio_data is None:
            return None, "❌ No audio received", "Please record some audio"
        start_time = time.time()
        try:
            sample_rate, audio = audio_data
            # Ensure audio is mono
            if len(audio.shape) > 1:
                audio = np.mean(audio, axis=1)
            # Normalize audio
            audio = audio.astype(np.float32)
            if np.max(np.abs(audio)) > 0:
                audio = audio / np.max(np.abs(audio)) * 0.95
            # Resample to 16 kHz first: both the VAD and the ASR model expect it
            if sample_rate != 16000:
                audio = librosa.resample(audio, orig_sr=sample_rate, target_sr=16000)
                sample_rate = 16000
            # Voice Activity Detection
            if not self.vad_processor.is_speech(audio):
                return None, "🔇 No speech detected", "Please speak clearly"
            # Speech Recognition with Ultravox
            try:
                speech_result = self.ultravox_model({
                    'array': audio,
                    'sampling_rate': sample_rate
                })
                user_text = speech_result.get('text', '').strip()
                if not user_text:
                    return None, "❌ Could not understand speech", "Please speak more clearly"
            except Exception as e:
                print(f"ASR Error: {e}")
                return None, f"❌ Speech recognition failed: {str(e)}", "Please try again"
            # Emotion Recognition
            emotion = self.emotion_recognizer.detect_emotion(audio, sample_rate)
            # Get conversation context
            context = self.conversation_manager.get_context(session_id)
            # Generate contextual response
            prompt = self._generate_contextual_prompt(user_text, emotion, context)
            # Generate speech with Dia TTS
            try:
                with torch.no_grad():
                    audio_output = self.dia_model.generate(
                        prompt,
                        use_torch_compile=False,  # Better stability
                        verbose=False
                    )
                # Ensure audio output is a NumPy array
                if isinstance(audio_output, torch.Tensor):
                    audio_output = audio_output.cpu().numpy()
                # Normalize output
                if len(audio_output) > 0:
                    max_val = np.max(np.abs(audio_output))
                    if max_val > 1.0:
                        audio_output = audio_output / max_val * 0.95
            except Exception as e:
                print(f"TTS Error: {e}")
                return None, f"❌ Speech generation failed: {str(e)}", "Please try again"
            # Extract AI response text (remove speaker tags and modifiers)
            ai_response = prompt.split('[S2]')[-1].strip()
            for tag in ('(cheerful)', '(sympathetic)', '(calming)', '(reassuring)', '(excited)'):
                ai_response = ai_response.replace(tag, '')
            ai_response = ai_response.strip()
            # Store conversation turn
            turn = ConversationTurn(
                user_audio=audio,
                user_text=user_text,
                ai_response_text=ai_response,
                ai_response_audio=audio_output,
                timestamp=time.time(),
                emotion=emotion,
                speaker_id=session_id
            )
            self.conversation_manager.add_turn(session_id, turn)
            # Track performance
            processing_time = time.time() - start_time
            self.processing_times.append(processing_time)
            # Memory cleanup
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            gc.collect()
            status = f"✅ Processed in {processing_time:.2f}s | Emotion: {emotion} | Users: {len(self.active_sessions)}"
            # Dia generates 44.1 kHz audio
            return (44100, audio_output), status, f"**You said:** {user_text}\n\n**AI Response:** {ai_response}"
        except Exception as e:
            print(f"Processing error: {e}")
            return None, f"❌ Processing failed: {str(e)}", "Please try again"
    def get_conversation_history(self, session_id: str) -> str:
        """Get formatted conversation history."""
        context = self.conversation_manager.get_context(session_id, last_n=10)
        if not context:
            return "No conversation history yet."
        history = "## Conversation History\n\n"
        for i, turn in enumerate(context, 1):
            history += f"**Turn {i}:**\n"
            history += f"- **You:** {turn.user_text}\n"
            history += f"- **AI:** {turn.ai_response_text}\n"
            history += f"- **Emotion Detected:** {turn.emotion}\n\n"
        return history

    def clear_conversation(self, session_id: str) -> str:
        """Clear conversation history for a session."""
        self.conversation_manager.clear_session(session_id)
        return "Conversation history cleared."
    def get_system_status(self) -> str:
        """Get system status information."""
        memory = self._get_memory_usage()
        avg_processing = np.mean(self.processing_times) if self.processing_times else 0
        status = f"""## System Status

**Performance:**
- Average Processing Time: {avg_processing:.2f}s
- Active Sessions: {len(self.active_sessions)}
- Total Conversations: {len(self.conversation_manager.conversations)}

**Memory Usage:**
- RAM: {memory['RAM']:.1f}%
- GPU Memory: {memory.get('GPU', {})}

**Models Status:**
- Models Loaded: {"✅" if self.models_loaded else "❌"}
- Device: {self.device}
"""
        return status

# Initialize the AI system
print("Starting Supernatural AI system...")
ai_system = SupernaturalAI()

# Gradio Interface
def process_audio_interface(audio, session_id):
    """Interface function for Gradio."""
    if not session_id:
        session_id = f"user_{int(time.time())}"
    ai_system.active_sessions.add(session_id)
    result = ai_system.process_audio_input(audio, session_id)
    return result + (session_id,)

def get_history_interface(session_id):
    """Get conversation history interface."""
    if not session_id:
        return "No session ID provided"
    return ai_system.get_conversation_history(session_id)

def clear_history_interface(session_id):
    """Clear history interface."""
    if not session_id:
        return "No session ID provided"
    return ai_system.clear_conversation(session_id)

# Create Gradio interface
with gr.Blocks(title="Supernatural Conversational AI", theme=gr.themes.Soft()) as demo:
    gr.HTML("""
    <div style="text-align: center; padding: 20px;">
        <h1>🧙‍♂️ Supernatural Conversational AI</h1>
        <p style="font-size: 18px; color: #666;">
            Advanced Speech-to-Speech AI with Emotional Intelligence
        </p>
        <p style="color: #888;">
            Powered by Ultravox + Dia TTS | Optimized for 4x L4 GPUs
        </p>
    </div>
    """)
    with gr.Row():
        with gr.Column(scale=2):
            # Audio input/output
            audio_input = gr.Audio(
                label="🎤 Speak to the AI",
                sources=["microphone"],
                type="numpy",
                streaming=False
            )
            audio_output = gr.Audio(
                label="🔊 AI Response",
                type="numpy",
                autoplay=True
            )
            # Session management
            session_id = gr.Textbox(
                label="Session ID",
                placeholder="Auto-generated if empty",
                value="",
                interactive=True
            )
            # Process button
            process_btn = gr.Button("🎯 Process Audio", variant="primary", size="lg")
        with gr.Column(scale=1):
            # Status and conversation
            status_display = gr.Textbox(
                label="📊 Status",
                interactive=False,
                lines=3
            )
            conversation_display = gr.Markdown(
                label="💬 Conversation",
                value="Start speaking to begin..."
            )
            # History management
            with gr.Row():
                history_btn = gr.Button("📜 Show History", size="sm")
                clear_btn = gr.Button("🗑️ Clear History", size="sm")
                status_btn = gr.Button("⚡ System Status", size="sm")
            # History and status display
            history_display = gr.Markdown(
                label="📜 Conversation History",
                value="No history yet."
            )
    # Event handlers
    process_btn.click(
        fn=process_audio_interface,
        inputs=[audio_input, session_id],
        outputs=[audio_output, status_display, conversation_display, session_id]
    )
    history_btn.click(
        fn=get_history_interface,
        inputs=[session_id],
        outputs=[history_display]
    )
    clear_btn.click(
        fn=clear_history_interface,
        inputs=[session_id],
        outputs=[history_display]
    )
    status_btn.click(
        fn=lambda: ai_system.get_system_status(),
        outputs=[history_display]
    )
    # Auto-process on audio input
    audio_input.change(
        fn=process_audio_interface,
        inputs=[audio_input, session_id],
        outputs=[audio_output, status_display, conversation_display, session_id]
    )
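
    # Note: both the "Process Audio" button and this `change` listener invoke
    # process_audio_interface, so a recording followed by a button click can be
    # processed twice; dropping one of the two triggers would avoid duplicate
    # conversation turns.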
    # Usage instructions
    gr.HTML("""
    <div style="margin-top: 20px; padding: 15px; background: #f0f8ff; border-radius: 8px;">
        <h3>💡 Usage Instructions:</h3>
        <ul>
            <li><strong>Record Audio:</strong> Click the microphone and speak naturally</li>
            <li><strong>Emotional AI:</strong> The AI detects and responds to your emotions</li>
            <li><strong>Conversation Memory:</strong> Up to 50 exchanges are remembered</li>
            <li><strong>Session Management:</strong> Use Session ID to maintain separate conversations</li>
            <li><strong>Performance:</strong> Optimized for sub-500ms latency</li>
        </ul>
        <p><strong>Supported Features:</strong> Emotion recognition, voice activity detection,
        contextual responses, conversation history, concurrent users (15-20), memory management</p>
    </div>
    """)

# Configure the queue for concurrent users. Gradio 4 (which this app's
# `sources=["microphone"]` syntax requires) replaced `concurrency_count`
# with `default_concurrency_limit`.
demo.queue(
    default_concurrency_limit=20,  # Support 20 concurrent users
    max_size=100,
    api_open=False
)

if __name__ == "__main__":
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
        quiet=False,
        max_threads=40  # `enable_queue` was removed from launch() in Gradio 4
    )