Maya-AI / app.py
Devakumar868's picture
Update app.py
cfde29f verified
raw
history blame
18.3 kB
import gradio as gr
import torch
import numpy as np
import librosa
import soundfile as sf
import threading
import time
import queue
import warnings
from typing import Optional, List, Dict, Tuple
from dataclasses import dataclass
from collections import deque
import psutil
import gc
# Import models
from dia.model import Dia
from transformers import pipeline
import webrtcvad
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.filterwarnings("ignore", category=UserWarning)
@dataclass
class ConversationTurn:
user_audio: np.ndarray
user_text: str
ai_response_text: str
ai_response_audio: np.ndarray
timestamp: float
emotion: str
speaker_id: str
class EmotionRecognizer:
def __init__(self):
self.emotion_pipeline = pipeline(
"audio-classification",
model="ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition",
device=0 if torch.cuda.is_available() else -1
)
def detect_emotion(self, audio: np.ndarray, sample_rate: int = 16000) -> str:
try:
result = self.emotion_pipeline({"array": audio, "sampling_rate": sample_rate})
return result[0]["label"] if result else "neutral"
except Exception as e:
print(f"Emotion detection error: {e}")
return "neutral"
class VADProcessor:
def __init__(self, aggressiveness: int = 2):
self.vad = webrtcvad.Vad(aggressiveness)
self.sample_rate = 16000
self.frame_duration = 30 # ms
self.frame_size = int(self.sample_rate * self.frame_duration / 1000)
def is_speech(self, audio: np.ndarray) -> bool:
try:
# Convert to 16-bit PCM
audio_int16 = (audio * 32767).astype(np.int16)
# Process in frames
frames = []
for i in range(0, len(audio_int16) - self.frame_size, self.frame_size):
frame = audio_int16[i:i + self.frame_size].tobytes()
frames.append(self.vad.is_speech(frame, self.sample_rate))
# Return True if majority of frames contain speech
return sum(frames) > len(frames) * 0.3
except Exception:
return True # Default to treating as speech
class ConversationManager:
def __init__(self, max_exchanges: int = 50):
self.conversations: Dict[str, deque] = {}
self.max_exchanges = max_exchanges
self.lock = threading.RLock()
def add_turn(self, session_id: str, turn: ConversationTurn):
with self.lock:
if session_id not in self.conversations:
self.conversations[session_id] = deque(maxlen=self.max_exchanges)
self.conversations[session_id].append(turn)
def get_context(self, session_id: str, last_n: int = 5) -> List[ConversationTurn]:
with self.lock:
if session_id not in self.conversations:
return []
return list(self.conversations[session_id])[-last_n:]
def clear_session(self, session_id: str):
with self.lock:
if session_id in self.conversations:
del self.conversations[session_id]
class SupernaturalAI:
def __init__(self):
self.device = "cuda" if torch.cuda.is_available() else "cpu"
self.models_loaded = False
self.processing_queue = queue.Queue()
self.conversation_manager = ConversationManager()
self.emotion_recognizer = None
self.vad_processor = VADProcessor()
# Models
self.ultravox_model = None
self.dia_model = None
# Performance tracking
self.active_sessions = set()
self.processing_times = deque(maxlen=100)
print("Initializing Supernatural AI...")
self._initialize_models()
def _initialize_models(self):
try:
print("Loading Ultravox model...")
self.ultravox_model = pipeline(
'automatic-speech-recognition',
model='fixie-ai/ultravox-v0_2',
trust_remote_code=True,
device=0 if torch.cuda.is_available() else -1,
torch_dtype=torch.float16
)
print("Loading Dia TTS model...")
self.dia_model = Dia.from_pretrained(
"nari-labs/Dia-1.6B",
compute_dtype="float16"
)
print("Loading emotion recognition...")
self.emotion_recognizer = EmotionRecognizer()
self.models_loaded = True
print("βœ… All models loaded successfully!")
# Memory cleanup
if torch.cuda.is_available():
torch.cuda.empty_cache()
except Exception as e:
print(f"❌ Error loading models: {e}")
self.models_loaded = False
def _get_memory_usage(self) -> Dict[str, float]:
"""Get current memory usage statistics"""
memory = psutil.virtual_memory()
gpu_memory = {}
if torch.cuda.is_available():
for i in range(torch.cuda.device_count()):
gpu_memory[f"GPU_{i}"] = {
"allocated": torch.cuda.memory_allocated(i) / 1024**3,
"cached": torch.cuda.memory_reserved(i) / 1024**3
}
return {
"RAM": memory.percent,
"GPU": gpu_memory
}
def _generate_contextual_prompt(self,
user_text: str,
emotion: str,
context: List[ConversationTurn]) -> str:
"""Generate contextual prompt with emotion and conversation history"""
# Build context from previous turns
context_text = ""
if context:
for turn in context[-3:]: # Last 3 exchanges
context_text += f"[S1] {turn.user_text} [S2] {turn.ai_response_text} "
# Emotion-aware response generation
emotion_modifiers = {
"happy": "(cheerful)",
"sad": "(sympathetic)",
"angry": "(calming)",
"fear": "(reassuring)",
"surprise": "(excited)",
"neutral": ""
}
modifier = emotion_modifiers.get(emotion.lower(), "")
# Create supernatural AI personality
prompt = f"{context_text}[S1] {user_text} [S2] {modifier} As a supernatural AI with deep emotional understanding, I sense your {emotion} energy. "
return prompt
def process_audio_input(self,
audio_data: Tuple[int, np.ndarray],
session_id: str) -> Tuple[Optional[Tuple[int, np.ndarray]], str, str]:
"""Main processing pipeline for audio input"""
if not self.models_loaded:
return None, "❌ Models not loaded", "Please wait for initialization"
if audio_data is None:
return None, "❌ No audio received", "Please record some audio"
start_time = time.time()
try:
sample_rate, audio = audio_data
# Ensure audio is mono and proper format
if len(audio.shape) > 1:
audio = np.mean(audio, axis=1)
# Normalize audio
audio = audio.astype(np.float32)
if np.max(np.abs(audio)) > 0:
audio = audio / np.max(np.abs(audio)) * 0.95
# Voice Activity Detection
if not self.vad_processor.is_speech(audio):
return None, "πŸ”‡ No speech detected", "Please speak clearly"
# Resample if needed
if sample_rate != 16000:
audio = librosa.resample(audio, orig_sr=sample_rate, target_sr=16000)
sample_rate = 16000
# Speech Recognition with Ultravox
try:
speech_result = self.ultravox_model({
'array': audio,
'sampling_rate': sample_rate
})
user_text = speech_result.get('text', '').strip()
if not user_text:
return None, "❌ Could not understand speech", "Please speak more clearly"
except Exception as e:
print(f"ASR Error: {e}")
return None, f"❌ Speech recognition failed: {str(e)}", "Please try again"
# Emotion Recognition
emotion = self.emotion_recognizer.detect_emotion(audio, sample_rate)
# Get conversation context
context = self.conversation_manager.get_context(session_id)
# Generate contextual response
prompt = self._generate_contextual_prompt(user_text, emotion, context)
# Generate speech with Dia TTS
try:
with torch.no_grad():
audio_output = self.dia_model.generate(
prompt,
use_torch_compile=False, # Better stability
verbose=False
)
# Ensure audio output is proper format
if isinstance(audio_output, torch.Tensor):
audio_output = audio_output.cpu().numpy()
# Normalize output
if len(audio_output) > 0:
max_val = np.max(np.abs(audio_output))
if max_val > 1.0:
audio_output = audio_output / max_val * 0.95
except Exception as e:
print(f"TTS Error: {e}")
return None, f"❌ Speech generation failed: {str(e)}", "Please try again"
# Extract AI response text (remove speaker tags and modifiers)
ai_response = prompt.split('[S2]')[-1].strip()
ai_response = ai_response.replace('(cheerful)', '').replace('(sympathetic)', '')
ai_response = ai_response.replace('(calming)', '').replace('(reassuring)', '')
ai_response = ai_response.replace('(excited)', '').strip()
# Store conversation turn
turn = ConversationTurn(
user_audio=audio,
user_text=user_text,
ai_response_text=ai_response,
ai_response_audio=audio_output,
timestamp=time.time(),
emotion=emotion,
speaker_id=session_id
)
self.conversation_manager.add_turn(session_id, turn)
# Track performance
processing_time = time.time() - start_time
self.processing_times.append(processing_time)
# Memory cleanup
if torch.cuda.is_available():
torch.cuda.empty_cache()
gc.collect()
status = f"βœ… Processed in {processing_time:.2f}s | Emotion: {emotion} | Users: {len(self.active_sessions)}"
return (44100, audio_output), status, f"**You said:** {user_text}\n\n**AI Response:** {ai_response}"
except Exception as e:
print(f"Processing error: {e}")
return None, f"❌ Processing failed: {str(e)}", "Please try again"
def get_conversation_history(self, session_id: str) -> str:
"""Get formatted conversation history"""
context = self.conversation_manager.get_context(session_id, last_n=10)
if not context:
return "No conversation history yet."
history = "## Conversation History\n\n"
for i, turn in enumerate(context, 1):
history += f"**Turn {i}:**\n"
history += f"- **You:** {turn.user_text}\n"
history += f"- **AI:** {turn.ai_response_text}\n"
history += f"- **Emotion Detected:** {turn.emotion}\n\n"
return history
def clear_conversation(self, session_id: str) -> str:
"""Clear conversation history for session"""
self.conversation_manager.clear_session(session_id)
return "Conversation history cleared."
def get_system_status(self) -> str:
"""Get system status information"""
memory = self._get_memory_usage()
avg_processing = np.mean(self.processing_times) if self.processing_times else 0
status = f"""## System Status
**Performance:**
- Average Processing Time: {avg_processing:.2f}s
- Active Sessions: {len(self.active_sessions)}
- Total Conversations: {len(self.conversation_manager.conversations)}
**Memory Usage:**
- RAM: {memory['RAM']:.1f}%
- GPU Memory: {memory.get('GPU', {})}
**Models Status:**
- Models Loaded: {"βœ…" if self.models_loaded else "❌"}
- Device: {self.device}
"""
return status
# Initialize the AI system
print("Starting Supernatural AI system...")
ai_system = SupernaturalAI()
# Gradio Interface
def process_audio_interface(audio, session_id):
"""Interface function for Gradio"""
if not session_id:
session_id = f"user_{int(time.time())}"
ai_system.active_sessions.add(session_id)
result = ai_system.process_audio_input(audio, session_id)
return result + (session_id,)
def get_history_interface(session_id):
"""Get conversation history interface"""
if not session_id:
return "No session ID provided"
return ai_system.get_conversation_history(session_id)
def clear_history_interface(session_id):
"""Clear history interface"""
if not session_id:
return "No session ID provided"
return ai_system.clear_conversation(session_id)
# Create Gradio interface
with gr.Blocks(title="Supernatural Conversational AI", theme=gr.themes.Soft()) as demo:
gr.HTML("""
<div style="text-align: center; padding: 20px;">
<h1>πŸ§™β€β™‚οΈ Supernatural Conversational AI</h1>
<p style="font-size: 18px; color: #666;">
Advanced Speech-to-Speech AI with Emotional Intelligence
</p>
<p style="color: #888;">
Powered by Ultravox + Dia TTS | Optimized for 4x L4 GPUs
</p>
</div>
""")
with gr.Row():
with gr.Column(scale=2):
# Audio input/output
audio_input = gr.Audio(
label="🎀 Speak to the AI",
sources=["microphone"],
type="numpy",
streaming=False
)
audio_output = gr.Audio(
label="πŸ”Š AI Response",
type="numpy",
autoplay=True
)
# Session management
session_id = gr.Textbox(
label="Session ID",
placeholder="Auto-generated if empty",
value="",
interactive=True
)
# Process button
process_btn = gr.Button("🎯 Process Audio", variant="primary", size="lg")
with gr.Column(scale=1):
# Status and conversation
status_display = gr.Textbox(
label="πŸ“Š Status",
interactive=False,
lines=3
)
conversation_display = gr.Markdown(
label="πŸ’¬ Conversation",
value="Start speaking to begin..."
)
# History management
with gr.Row():
history_btn = gr.Button("πŸ“œ Show History", size="sm")
clear_btn = gr.Button("πŸ—‘οΈ Clear History", size="sm")
status_btn = gr.Button("⚑ System Status", size="sm")
# History and status display
history_display = gr.Markdown(
label="πŸ“š Conversation History",
value="No history yet."
)
# Event handlers
process_btn.click(
fn=process_audio_interface,
inputs=[audio_input, session_id],
outputs=[audio_output, status_display, conversation_display, session_id]
)
history_btn.click(
fn=get_history_interface,
inputs=[session_id],
outputs=[history_display]
)
clear_btn.click(
fn=clear_history_interface,
inputs=[session_id],
outputs=[history_display]
)
status_btn.click(
fn=lambda: ai_system.get_system_status(),
outputs=[history_display]
)
# Auto-process on audio input
audio_input.change(
fn=process_audio_interface,
inputs=[audio_input, session_id],
outputs=[audio_output, status_display, conversation_display, session_id]
)
# Usage instructions
gr.HTML("""
<div style="margin-top: 20px; padding: 15px; background: #f0f8ff; border-radius: 8px;">
<h3>πŸ’‘ Usage Instructions:</h3>
<ul>
<li><strong>Record Audio:</strong> Click the microphone and speak naturally</li>
<li><strong>Emotional AI:</strong> The AI detects and responds to your emotions</li>
<li><strong>Conversation Memory:</strong> Up to 50 exchanges are remembered</li>
<li><strong>Session Management:</strong> Use Session ID to maintain separate conversations</li>
<li><strong>Performance:</strong> Optimized for sub-500ms latency</li>
</ul>
<p><strong>Supported Features:</strong> Emotion recognition, voice activity detection,
contextual responses, conversation history, concurrent users (15-20), memory management</p>
</div>
""")
# Configure for optimal performance
demo.queue(
concurrency_count=20, # Support 20 concurrent users
max_size=100,
api_open=False
)
if __name__ == "__main__":
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True,
quiet=False,
enable_queue=True,
max_threads=40
)