Wan2GP / wan /utils /notification_sound.py
zxymimi23451's picture
Upload 258 files
78360e7 verified
"""Add commentMore actions
Notification sounds for Wan2GP video generation application
Pure Python audio notification system with multiple backend support
"""
import os
import sys
import threading
import time
import numpy as np
def generate_notification_beep(volume=50, sample_rate=44100):
"""Generate pleasant C major chord notification sound"""
if volume == 0:
return np.array([])
volume = max(0, min(100, volume))
# Volume curve mapping: 25%->50%, 50%->75%, 75%->100%, 100%->105%
if volume <= 25:
volume_mapped = (volume / 25.0) * 0.5
elif volume <= 50:
volume_mapped = 0.5 + ((volume - 25) / 25.0) * 0.25
elif volume <= 75:
volume_mapped = 0.75 + ((volume - 50) / 25.0) * 0.25
else:
volume_mapped = 1.0 + ((volume - 75) / 25.0) * 0.05 # Only 5% boost instead of 15%
volume = volume_mapped
# C major chord frequencies
freq_c = 261.63 # C4
freq_e = 329.63 # E4
freq_g = 392.00 # G4
duration = 0.8
t = np.linspace(0, duration, int(sample_rate * duration), False)
# Generate chord components
wave_c = np.sin(freq_c * 2 * np.pi * t) * 0.4
wave_e = np.sin(freq_e * 2 * np.pi * t) * 0.3
wave_g = np.sin(freq_g * 2 * np.pi * t) * 0.2
wave = wave_c + wave_e + wave_g
# Prevent clipping
max_amplitude = np.max(np.abs(wave))
if max_amplitude > 0:
wave = wave / max_amplitude * 0.8
# ADSR envelope
def apply_adsr_envelope(wave_data):
length = len(wave_data)
attack_time = int(0.2 * length)
decay_time = int(0.1 * length)
release_time = int(0.5 * length)
envelope = np.ones(length)
if attack_time > 0:
envelope[:attack_time] = np.power(np.linspace(0, 1, attack_time), 3)
if decay_time > 0:
start_idx = attack_time
end_idx = attack_time + decay_time
envelope[start_idx:end_idx] = np.linspace(1, 0.85, decay_time)
if release_time > 0:
start_idx = length - release_time
envelope[start_idx:] = 0.85 * np.exp(-4 * np.linspace(0, 1, release_time))
return wave_data * envelope
wave = apply_adsr_envelope(wave)
# Simple low-pass filter
def simple_lowpass_filter(signal, cutoff_ratio=0.8):
window_size = max(3, int(len(signal) * 0.001))
if window_size % 2 == 0:
window_size += 1
kernel = np.ones(window_size) / window_size
padded = np.pad(signal, window_size//2, mode='edge')
filtered = np.convolve(padded, kernel, mode='same')
return filtered[window_size//2:-window_size//2]
wave = simple_lowpass_filter(wave)
# Add reverb effect
if len(wave) > sample_rate // 4:
delay_samples = int(0.12 * sample_rate)
reverb = np.zeros_like(wave)
reverb[delay_samples:] = wave[:-delay_samples] * 0.08
wave = wave + reverb
# Apply volume first, then normalize to prevent clipping
wave = wave * volume * 0.5
# Final normalization with safety margin
max_amplitude = np.max(np.abs(wave))
if max_amplitude > 0.85: # If approaching clipping threshold
wave = wave / max_amplitude * 0.85 # More conservative normalization
return wave
def play_audio_with_pygame(audio_data, sample_rate=44100):
"""Play audio using pygame backend"""
try:
import pygame
# Initialize pygame mixer only if not already initialized
if not pygame.mixer.get_init():
pygame.mixer.pre_init(frequency=sample_rate, size=-16, channels=2, buffer=1024)
pygame.mixer.init()
else:
# Reinitialize with new settings if needed
current_freq, current_size, current_channels = pygame.mixer.get_init()
if current_freq != sample_rate or current_channels != 2:
pygame.mixer.quit()
pygame.mixer.pre_init(frequency=sample_rate, size=-16, channels=2, buffer=1024)
pygame.mixer.init()
audio_int16 = (audio_data * 32767).astype(np.int16)
# Convert mono to stereo
if len(audio_int16.shape) == 1:
stereo_data = np.column_stack((audio_int16, audio_int16))
else:
stereo_data = audio_int16
sound = pygame.sndarray.make_sound(stereo_data)
sound.play()
pygame.time.wait(int(len(audio_data) / sample_rate * 1000) + 100)
# Don't quit mixer - this can interfere with Gradio server
# pygame.mixer.quit()
return True
except ImportError:
return False
except Exception as e:
print(f"Pygame error: {e}")
return False
def play_audio_with_sounddevice(audio_data, sample_rate=44100):
"""Play audio using sounddevice backend"""
try:
import sounddevice as sd
sd.play(audio_data, sample_rate)
sd.wait()
return True
except ImportError:
return False
except Exception as e:
print(f"Sounddevice error: {e}")
return False
def play_audio_with_winsound(audio_data, sample_rate=44100):
"""Play audio using winsound backend (Windows only)"""
if sys.platform != "win32":
return False
try:
import winsound
import wave
import tempfile
import uuid
temp_dir = tempfile.gettempdir()
temp_filename = os.path.join(temp_dir, f"notification_{uuid.uuid4().hex}.wav")
try:
with wave.open(temp_filename, 'w') as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2)
wav_file.setframerate(sample_rate)
audio_int16 = (audio_data * 32767).astype(np.int16)
wav_file.writeframes(audio_int16.tobytes())
winsound.PlaySound(temp_filename, winsound.SND_FILENAME)
finally:
# Clean up temp file
for _ in range(3):
try:
if os.path.exists(temp_filename):
os.unlink(temp_filename)
break
except:
time.sleep(0.1)
return True
except ImportError:
return False
except Exception as e:
print(f"Winsound error: {e}")
return False
def play_notification_sound(volume=50):
"""Play notification sound with specified volume"""
if volume == 0:
return
audio_data = generate_notification_beep(volume=volume)
if len(audio_data) == 0:
return
# Try audio backends in order
audio_backends = [
play_audio_with_pygame,
play_audio_with_sounddevice,
play_audio_with_winsound,
]
for backend in audio_backends:
try:
if backend(audio_data):
return
except Exception as e:
continue
# Fallback: terminal beep
print(f"All audio backends failed, using terminal beep")
print('\a')
def play_notification_async(volume=50):
"""Play notification sound asynchronously (non-blocking)"""
def play_sound():
try:
play_notification_sound(volume)
except Exception as e:
print(f"Error playing notification sound: {e}")
sound_thread = threading.Thread(target=play_sound, daemon=True)
sound_thread.start()
def notify_video_completion(video_path=None, volume=50):
"""Notify about completed video generation"""
play_notification_async(volume)
if __name__ == "__main__":
print("Testing notification sounds with different volumes...")
print("Auto-detecting available audio backends...")
volumes = [25, 50, 75, 100]
for vol in volumes:
print(f"Testing volume {vol}%:")
play_notification_sound(vol)
time.sleep(2)
print("Test completed!")