# NOTE: The lines originally at the top of this file were web file-viewer
# residue, not program text. They are preserved here as comments so the
# module can be parsed:
#   Spaces: Running / Running
#   File size: 7,010 Bytes
#   Commit hashes: 5fe16b1 15d0727 5fe16b1 187ec28 4d25f3d 5fe16b1 187ec28
#     5fe16b1 187ec28 5fe16b1 187ec28 5fe16b1 885ea0a 5fe16b1 187ec28 5fe16b1
#     41484d1 5fe16b1 187ec28 5fe16b1 187ec28 5fe16b1 187ec28 885ea0a 5fe16b1
#     41484d1 187ec28 885ea0a 41484d1 bf07215 4d25f3d
#   Line-number gutter: 1 through 154
import asyncio
import base64
import hashlib
import json
import os
import re
import shutil
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import edge_tts
import httpx
from openai import OpenAI
from pydub import AudioSegment
@dataclass
class ConversationConfig:
    """Settings consumed by ``URLToAudioConverter``."""

    # Upper bound on whitespace-separated words kept from fetched page text
    # before it is sent to the LLM (applied in ``url_to_audio``).
    max_words: int = 3000
    # String prepended to the target URL when fetching page text
    # (see ``fetch_text``).
    prefix_url: str = "https://r.jina.ai/"
    # Model identifier passed to the chat-completions call.
    model_name: str = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo"
class URLToAudioConverter:
    """Turn a web page or free text into a two-voice, podcast-style MP3.

    The pipeline is: fetch page text (``fetch_text``), ask an LLM to rewrite
    it as a two-speaker conversation (``extract_conversation``), synthesize
    each turn with edge-tts (``text_to_speech``) and concatenate the turns
    with pydub (``combine_audio_files``).

    Attributes:
        config: The ``ConversationConfig`` given to the constructor.
        llm_client: OpenAI client pointed at ``https://api.together.xyz/v1``.
        llm_out: The most recent parsed conversation dict, or ``None`` if no
            conversation has been rendered yet.
    """

    # The annotation is quoted so the class body does not need
    # ``ConversationConfig`` to be bound at definition time.
    def __init__(self, config: "ConversationConfig", llm_api_key: str):
        """Store the configuration and build the LLM client.

        Args:
            config: Conversation settings (word limit, URL prefix, model).
            llm_api_key: API key handed to the OpenAI client.
        """
        self.config = config
        self.llm_client = OpenAI(api_key=llm_api_key, base_url="https://api.together.xyz/v1")
        self.llm_out = None

    def fetch_text(self, url: str) -> str:
        """Fetch the text for ``url`` through the configured prefix service.

        Args:
            url: Target page URL; it is appended to ``config.prefix_url``.

        Returns:
            The response body as text.

        Raises:
            ValueError: If ``url`` is empty.
            httpx.HTTPStatusError: If the service answers with an error status
                (raised by ``raise_for_status``).
        """
        if not url:
            raise ValueError("URL cannot be empty")
        response = httpx.get(f"{self.config.prefix_url}{url}", timeout=60.0)
        response.raise_for_status()
        return response.text

    def extract_conversation(self, text: str) -> Dict:
        """Ask the LLM to convert ``text`` into a two-speaker conversation.

        Args:
            text: Source material placed at the start of the prompt.

        Returns:
            The parsed JSON object from the model's reply. The prompt requests
            the shape ``{"conversation": [{"speaker": ..., "text": ...}, ...]}``
            but the reply is not validated against it here.

        Raises:
            json.JSONDecodeError: If the reply is empty, contains no JSON
                object, or the object is malformed.
        """
        prompt = (
            f"{text}\nConvert the provided text into a short informative podcast conversation "
            f"between two experts. Return ONLY a JSON object with the following structure:\n"
            '{"conversation": [{"speaker": "Speaker1", "text": "..."}, {"speaker": "Speaker2", "text": "..."}]}'
        )
        chat_completion = self.llm_client.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model=self.config.model_name,
            response_format={"type": "json_object"},
        )
        return self._parse_json_object(chat_completion.choices[0].message.content)

    @staticmethod
    def _parse_json_object(response_content: Optional[str]) -> Dict:
        """Parse the outermost ``{...}`` span of ``response_content`` as JSON.

        Any text before the first ``{`` or after the last ``}`` is discarded,
        so a reply that wraps the object in extra prose still parses.

        Raises:
            json.JSONDecodeError: If there is no ``{...}`` span or it is not
                valid JSON. (``JSONDecodeError`` is a ``ValueError``.)
        """
        content = response_content or ""
        start = content.find("{")
        end = content.rfind("}")
        if start == -1 or end < start:
            # Fail explicitly instead of slicing with a -1 index.
            raise json.JSONDecodeError("No JSON object found in LLM response", content, 0)
        return json.loads(content[start : end + 1])

    async def text_to_speech(
        self, conversation_json: Dict, voice_1: str, voice_2: str
    ) -> Tuple[List[str], str]:
        """Synthesize every conversation turn into its own MP3 file.

        Voices alternate by turn index: even turns use ``voice_1``, odd turns
        use ``voice_2`` (the ``speaker`` field is not consulted).

        Args:
            conversation_json: Dict with a ``"conversation"`` list whose items
                each have a ``"text"`` entry.
            voice_1: Voice for even-indexed turns.
            voice_2: Voice for odd-indexed turns.

        Returns:
            ``(filenames, output_dir)``: the per-turn MP3 paths in order, and
            the freshly created directory that holds them.

        Raises:
            RuntimeError: If ``_generate_audio`` reports an error string.
            Exception: Anything raised by synthesis propagates; the partially
                filled output directory is removed first.
        """
        output_dir = Path(self._create_output_directory())
        filenames = []
        try:
            for i, turn in enumerate(conversation_json["conversation"]):
                voice = voice_1 if i % 2 == 0 else voice_2
                tmp_path, error = await self._generate_audio(turn["text"], voice)
                if error:
                    raise RuntimeError(f"Text-to-speech failed: {error}")
                filename = output_dir / f"output_{i}.mp3"
                # shutil.move falls back to copy+delete when the temp dir and
                # the output dir are on different filesystems, where a plain
                # os.rename would fail.
                shutil.move(tmp_path, str(filename))
                filenames.append(str(filename))
        except BaseException:
            # Do not leave a half-populated folder behind on failure.
            shutil.rmtree(output_dir, ignore_errors=True)
            raise
        return filenames, str(output_dir)

    async def _generate_audio(
        self, text: str, voice: str, rate: int = 0, pitch: int = 0
    ) -> Tuple[str, Optional[str]]:
        """Synthesize ``text`` with edge-tts into a temporary MP3 file.

        Args:
            text: Text to speak.
            voice: Voice label; only the part before the first ``" - "`` is
                passed to edge-tts.
            rate: Rate adjustment, formatted as a signed percentage.
            pitch: Pitch adjustment, formatted as a signed Hz value.

        Returns:
            ``(tmp_path, None)``. The second slot is an error message slot and
            is always ``None``; failures are raised instead.
        """
        voice_short_name = voice.split(" - ")[0]
        rate_str = f"{rate:+d}%"
        pitch_str = f"{pitch:+d}Hz"
        communicate = edge_tts.Communicate(text, voice_short_name, rate=rate_str, pitch=pitch_str)
        # Reserve a unique path; the handle is closed before edge-tts writes.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
            tmp_path = tmp_file.name
        try:
            await communicate.save(tmp_path)
        except BaseException:
            # delete=False means nobody else will clean this file up.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
        return tmp_path, None

    def _create_output_directory(self) -> str:
        """Create a randomly named directory (relative path) and return its name."""
        random_bytes = os.urandom(8)
        folder_name = base64.urlsafe_b64encode(random_bytes).decode("utf-8")
        os.makedirs(folder_name, exist_ok=True)
        return folder_name

    def combine_audio_files(self, filenames: List[str], output_file: str) -> None:
        """Concatenate MP3 segments into ``output_file`` and delete the segments.

        Only the files named in ``filenames`` are deleted. Their directory is
        removed only if that leaves it empty, so an ``output_file`` written
        into the same directory survives.

        Args:
            filenames: Segment paths, in playback order.
            output_file: Destination path of the combined MP3.

        Raises:
            ValueError: If ``filenames`` is empty.
        """
        if not filenames:
            raise ValueError("No audio files to combine")
        combined = AudioSegment.empty()
        for filename in filenames:
            combined += AudioSegment.from_file(filename, format="mp3")
        combined.export(output_file, format="mp3")
        self._remove_segment_files(filenames, keep=output_file)

    @staticmethod
    def _remove_segment_files(filenames: List[str], keep: str) -> None:
        """Delete ``filenames`` (never ``keep``) and prune emptied directories."""
        keep_path = os.path.abspath(keep)
        parent_dirs = set()
        for filename in filenames:
            if os.path.abspath(filename) == keep_path:
                continue
            parent_dirs.add(os.path.dirname(filename))
            try:
                os.remove(filename)
            except FileNotFoundError:
                pass
        for dir_path in parent_dirs:
            if not dir_path:
                continue  # segment lived in the current directory
            try:
                os.rmdir(dir_path)
            except OSError:
                # Directory still holds files (e.g. the combined output) or
                # is already gone; either way there is nothing to do.
                pass

    async def _conversation_to_audio(
        self, conversation_json: Dict, voice_1: str, voice_2: str
    ) -> Tuple[str, str]:
        """Render a parsed conversation to one MP3 and a text transcript.

        Records ``conversation_json`` in ``self.llm_out`` before synthesis.

        Returns:
            ``(final_output, conversation_text)``: path of the combined MP3
            and a ``"speaker: text"`` transcript, one turn per line.
        """
        conversation_text = "\n".join(
            f"{t['speaker']}: {t['text']}" for t in conversation_json["conversation"]
        )
        self.llm_out = conversation_json
        audio_files, folder_name = await self.text_to_speech(conversation_json, voice_1, voice_2)
        final_output = os.path.join(folder_name, "combined_output.mp3")
        self.combine_audio_files(audio_files, final_output)
        return final_output, conversation_text

    async def url_to_audio(self, url: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
        """Fetch ``url``, turn its text into a conversation and render it.

        The fetched text is cut to ``config.max_words`` whitespace-separated
        words before it is sent to the LLM.

        Returns:
            ``(final_output, conversation_text)``: combined MP3 path and the
            transcript.
        """
        text = self.fetch_text(url)
        words = text.split()
        if len(words) > self.config.max_words:
            text = " ".join(words[: self.config.max_words])
        conversation_json = self.extract_conversation(text)
        return await self._conversation_to_audio(conversation_json, voice_1, voice_2)

    async def text_to_audio(self, text: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
        """Turn ``text`` into a conversation via the LLM and render it.

        Unlike ``url_to_audio`` the text is not truncated to
        ``config.max_words``.

        Returns:
            ``(final_output, conversation_text)``: combined MP3 path and the
            transcript.
        """
        conversation_json = self.extract_conversation(text)
        return await self._conversation_to_audio(conversation_json, voice_1, voice_2)

    async def raw_text_to_audio(
        self, text: str, voice_1: str, voice_2: str
    ) -> Tuple[str, Optional[str]]:
        """Speak ``text`` verbatim with a single voice, bypassing the LLM.

        Args:
            text: Text to synthesize as-is.
            voice_1: Voice label; the part before ``" - "`` is used if present.
            voice_2: Accepted for signature parity; not used.

        Returns:
            ``(text, output_file)`` on success. On any failure the tuple is
            ``("Error: ...", None)``; no exception escapes. Note the order is
            text first, path second, the reverse of ``url_to_audio``.
        """
        try:
            print("\n=== DEBUG INICIO (raw_text_to_audio) ===")
            print(f"Texto recibido: {text[:200]}...")  # echo the input

            # Use an absolute path under /tmp (compatible with Spaces).
            output_dir = "/tmp/podcast_outputs"
            os.makedirs(output_dir, exist_ok=True)
            # File name is derived from the text, so identical text maps to
            # the same output path.
            hash_name = hashlib.md5(text.encode()).hexdigest()[:8]
            output_file = os.path.join(output_dir, f"podcast_{hash_name}.mp3")
            print(f"Ruta de salida: {output_file}")

            # List available voices (debug output only).
            voices = await edge_tts.list_voices()
            voice_names = [v['Name'] for v in voices]
            print(f"Voces disponibles (primeras 5): {voice_names[:5]}...")

            # Extract the short voice name (e.g. "en-US-AvaMultilingualNeural").
            voice_short = voice_1.split(" - ")[0] if " - " in voice_1 else voice_1
            print(f"Voz a usar: {voice_short}")

            # Generate the audio.
            communicate = edge_tts.Communicate(text, voice_short)
            print("Generando audio...")
            await communicate.save(output_file)
            print("Audio generado.")

            # Check that the file exists and is not empty.
            if not os.path.exists(output_file):
                print("ERROR: Archivo no creado.")
                return "Error: Archivo no generado", None
            elif os.path.getsize(output_file) == 0:
                print("ERROR: Archivo vacío.")
                return "Error: Archivo de audio vacío", None

            print(f"=== DEBUG FIN (Archivo válido: {output_file}) ===")
            return text, output_file
        except Exception as e:
            # Deliberate boundary: callers receive an error string, not a raise.
            print(f"ERROR CRÍTICO: {str(e)}")
            return f"Error: {str(e)}", None