|
''' |
|
pip install datasets soundfile huggingface_hub librosa |
|
|
|
|
|
from datasets import load_dataset |
|
import soundfile as sf |
|
import os |
|
from collections import defaultdict |
|
import io |
|
|
|
def download_voices_with_dynamic_counting(output_folder='genshin_voices_sample_5', max_files_per_speaker=5): |
|
"""动态统计并下载所有speaker的音频和转录文件(兼容bytes/path格式)""" |
|
# 加载数据集(流式模式) |
|
dataset = load_dataset('simon3000/genshin-voice', split='train', streaming=True) |
|
|
|
# 过滤条件:中文、有转录、类型为对话 |
|
filtered_data = dataset.filter( |
|
lambda x: ( |
|
x['language'] == 'Chinese' and |
|
x['transcription'] != '' and |
|
x['type'] == 'Dialog' |
|
) |
|
) |
|
|
|
# 动态统计speaker计数和文件下载 |
|
speaker_counts = defaultdict(int) |
|
speaker_file_indices = defaultdict(int) |
|
|
|
os.makedirs(output_folder, exist_ok=True) |
|
|
|
for voice in filtered_data: |
|
speaker = voice['speaker'] |
|
|
|
# 如果该speaker已下载足够文件,跳过 |
|
if speaker_counts[speaker] >= max_files_per_speaker: |
|
continue |
|
|
|
# 更新speaker计数 |
|
speaker_counts[speaker] += 1 |
|
file_num = str(speaker_file_indices[speaker] + 1).zfill(5) # 从00001开始 |
|
|
|
# 创建speaker子文件夹 |
|
speaker_folder = os.path.join(output_folder, speaker) |
|
os.makedirs(speaker_folder, exist_ok=True) |
|
|
|
# 构建文件路径 |
|
audio_path = os.path.join(speaker_folder, f'{speaker}_{file_num}.wav') |
|
transcription_path = os.path.join(speaker_folder, f'{speaker}_{file_num}.txt') |
|
|
|
# 处理音频数据(兼容bytes或path格式) |
|
audio_data = voice['audio'] |
|
try: |
|
if 'bytes' in audio_data and audio_data['bytes'] is not None: |
|
# 从bytes直接读取音频 |
|
with io.BytesIO(audio_data['bytes']) as audio_bytes: |
|
data, samplerate = sf.read(audio_bytes) |
|
sf.write(audio_path, data, samplerate) |
|
# Disabled fallback (reads the audio from a file path instead of in-memory bytes):
# elif 'path' in audio_data and os.path.exists(audio_data['path']):
#     data, samplerate = sf.read(audio_data['path'])
#     sf.write(audio_path, data, samplerate)
|
else: |
|
print(f"警告: {speaker}的音频数据格式不支持,跳过") |
|
speaker_counts[speaker] -= 1 # 回滚计数 |
|
continue |
|
except Exception as e: |
|
print(f"处理{speaker}的音频时出错: {str(e)}") |
|
speaker_counts[speaker] -= 1 |
|
continue |
|
|
|
# 保存转录文件 |
|
with open(transcription_path, 'w', encoding='utf-8') as f: |
|
f.write(voice['transcription']) |
|
|
|
speaker_file_indices[speaker] += 1 |
|
print( |
|
f"[下载进度] {speaker}_{file_num} | " |
|
f"进度: {speaker_counts[speaker]}/{max_files_per_speaker}" |
|
) |
|
|
|
# 打印最终统计 |
|
print("\n=== 下载结果 ===") |
|
for speaker, count in speaker_counts.items(): |
|
print(f"{speaker}: {count}个文件") |
|
|
|
if __name__ == '__main__': |
|
download_voices_with_dynamic_counting() |
|
|
|
|
|
from gradio_client import Client, handle_file |
|
|
|
client = Client("http://localhost:7860") |
|
result = client.predict( |
|
prompt=handle_file('genshin_voices_sample_5/Ahangar/Ahangar_00001.wav'), |
|
text="偷窃者没有好下场", |
|
api_name="/gen_single" |
|
) |
|
print(result) |
|
from shutil import copy2 |
|
copy2(result["value"], result["value"].split("/")[-1]) |
|
|
|
''' |
|
|
|
import spaces |
|
import os |
|
import shutil |
|
import threading |
|
import time |
|
import sys |
|
|
|
from huggingface_hub import snapshot_download |
|
|
|
# Make this file's directory and its "indextts" sub-directory importable,
# so the project imports that follow this setup can be resolved.
current_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.extend([current_dir, os.path.join(current_dir, "indextts")])
|
|
|
import gradio as gr |
|
from indextts.infer import IndexTTS |
|
from tools.i18n.i18n import I18nAuto |
|
|
|
# Localisation helper, configured for the zh_CN locale.
i18n = I18nAuto(language="zh_CN")
MODE = 'local'

# Fetch the IndexTTS-1.5 repository into ./checkpoints, then build the
# model object from that directory and its config file.
snapshot_download(repo_id="IndexTeam/IndexTTS-1.5", local_dir="checkpoints")
tts = IndexTTS(cfg_path="checkpoints/config.yaml", model_dir="checkpoints")

# Working directories used by the app (generated audio and prompt files).
os.makedirs("prompts", exist_ok=True)
os.makedirs("outputs/tasks", exist_ok=True)
|
|
|
@spaces.GPU
def infer(voice, text, output_path=None):
    """Synthesize ``text`` with the loaded TTS model and return the output path.

    Args:
        voice: Reference audio, forwarded unchanged to ``tts.infer`` (the UI
            in this file supplies it as a file path).
        text: Text to synthesize, forwarded unchanged to ``tts.infer``.
        output_path: Destination file. When falsy, a fresh path under
            ``outputs/`` is generated.

    Returns:
        The path that was handed to ``tts.infer`` as the output location.

    Raises:
        RuntimeError: If the module-level ``tts`` object is falsy.
    """
    import uuid  # local import so this block does not depend on file-level imports

    if not tts:
        # RuntimeError is still caught by callers that catch Exception.
        raise RuntimeError("Model not loaded")
    if not output_path:
        # A second-resolution timestamp alone is not unique: two calls within
        # the same second would share a path and the later one would overwrite
        # the earlier result. The random suffix keeps each output distinct.
        file_name = f"spk_{int(time.time())}_{uuid.uuid4().hex[:8]}.wav"
        output_path = os.path.join("outputs", file_name)
    tts.infer(voice, text, output_path)
    return output_path
|
|
|
def tts_api(voice, text):
    """Run ``infer`` and return the result as a ``(status, headers, body)`` tuple.

    On success the tuple is ``(200, {}, <bytes of the generated file>)``.
    Any exception raised while generating or reading the file is reported
    as ``(500, {"error": <message>}, None)`` instead of propagating.
    """
    try:
        wav_path = infer(voice, text)
        with open(wav_path, "rb") as wav_file:
            payload = wav_file.read()
    except Exception as exc:
        return 500, {"error": str(exc)}, None
    return 200, {}, payload
|
|
|
def gen_single(prompt, text):
    """Generate speech for ``text`` using ``prompt`` as the reference audio.

    Returns a Gradio update that sets the component's value to the generated
    file path and makes the component visible.
    """
    return gr.update(value=infer(prompt, text), visible=True)
|
|
|
def update_prompt_audio():
    """Return a Gradio update that marks the target component as interactive."""
    return gr.update(interactive=True)
|
|
|
# NOTE(review): the Tab/Row nesting below is reconstructed; the source had
# lost its indentation. Confirm the layout against the running app.
with gr.Blocks() as demo:
    # Lock created together with the UI; nothing in this block acquires it.
    mutex = threading.Lock()

    # Page header: project title and a badge linking to the arXiv paper.
    gr.HTML('''
<h2><center>IndexTTS: An Industrial-Level Controllable and Efficient Zero-Shot Text-To-Speech System</h2>

<p align="center">
<a href='https://arxiv.org/abs/2502.05512'><img src='https://img.shields.io/badge/ArXiv-2502.05512-red'></a>
''')

    with gr.Tab("音频生成"):
        with gr.Row():
            os.makedirs("prompts", exist_ok=True)

            # Reference audio, delivered to callbacks as a file path.
            prompt_audio = gr.Audio(
                label="请上传参考音频",
                key="prompt_audio",
                sources=["upload", "microphone"],
                type="filepath",
            )

            # First entry of ./prompts (or '' when the folder is empty);
            # not consumed anywhere else in this block.
            prompt_list = os.listdir("prompts")
            default = prompt_list[0] if prompt_list else ''

            input_text_single = gr.Textbox(
                label="请输入目标文本",
                key="input_text_single",
            )
            gen_button = gr.Button(
                "生成语音",
                key="gen_button",
                interactive=True,
            )
            # Hidden until gen_single returns an update with visible=True.
            output_audio = gr.Audio(
                label="生成结果",
                visible=False,
                key="output_audio",
            )

    # Uploading a prompt refreshes the generate button's interactive state.
    prompt_audio.upload(
        update_prompt_audio,
        inputs=[],
        outputs=[gen_button],
    )

    # Clicking the button synthesizes speech and shows it in output_audio.
    gen_button.click(
        gen_single,
        inputs=[prompt_audio, input_text_single],
        outputs=[output_audio],
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main():
    """Call ``tts.load_normalizer()`` and then launch the Gradio app.

    The server binds to all interfaces on port 7860 and requests a share link.
    """
    tts.load_normalizer()
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": True,
    }
    demo.launch(**launch_options)


# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|
|
|