# Podcastking2 / conver.py
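"""Turn a web page or raw text into a short two-speaker podcast.

Pipeline: fetch the page text through the Jina Reader proxy, have an LLM
(served via Together's OpenAI-compatible API) rewrite it as a two-expert
conversation in JSON, synthesize each turn with edge-tts, and concatenate
the clips with pydub.
"""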
import asyncio
import base64
import hashlib
import json
import os
import shutil
import tempfile
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import edge_tts
import httpx
from openai import OpenAI
from pydub import AudioSegment

@dataclass
class ConversationConfig:
    max_words: int = 3000
    # Jina Reader proxy; prefixing a URL with it returns the page as readable text.
    prefix_url: str = "https://r.jina.ai/"
    model_name: str = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo"

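# Example (illustrative): trade input length for model quality by overriding the
# defaults. The 70B model name below is an assumption; any chat model served by
# the Together endpoint works here.
# config = ConversationConfig(max_words=1500, model_name="meta-llama/Meta-Llama-3.1-70B-Instruct-Turbo")
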
class URLToAudioConverter:
    def __init__(self, config: ConversationConfig, llm_api_key: str):
        self.config = config
        # The OpenAI client is pointed at Together's OpenAI-compatible endpoint.
        self.llm_client = OpenAI(api_key=llm_api_key, base_url="https://api.together.xyz/v1")
        self.llm_out = None  # last conversation JSON returned by the LLM

    def fetch_text(self, url: str) -> str:
        """Fetch the readable text of a page via the Jina Reader proxy."""
        if not url:
            raise ValueError("URL cannot be empty")
        response = httpx.get(f"{self.config.prefix_url}{url}", timeout=60.0)
        response.raise_for_status()
        return response.text

    def extract_conversation(self, text: str) -> Dict:
        """Ask the LLM to rewrite `text` as a two-speaker conversation in JSON."""
        prompt = (
            f"{text}\nConvert the provided text into a short informative podcast conversation "
            f"between two experts. Return ONLY a JSON object with the following structure:\n"
            '{"conversation": [{"speaker": "Speaker1", "text": "..."}, {"speaker": "Speaker2", "text": "..."}]}'
        )
        chat_completion = self.llm_client.chat.completions.create(
            messages=[{"role": "user", "content": prompt}],
            model=self.config.model_name,
            response_format={"type": "json_object"},
        )
        response_content = chat_completion.choices[0].message.content
        # Trim anything outside the outermost braces, in case the model wraps
        # the JSON in prose or markdown fences despite the instructions.
        json_str = response_content.strip()
        if not json_str.startswith("{"):
            json_str = json_str[json_str.find("{"):]
        if not json_str.endswith("}"):
            json_str = json_str[: json_str.rfind("}") + 1]
        return json.loads(json_str)

    async def text_to_speech(self, conversation_json: Dict, voice_1: str, voice_2: str) -> Tuple[List[str], str]:
        """Synthesize each turn with alternating voices; return the clip paths and folder."""
        output_dir = Path(self._create_output_directory())
        filenames = []
        for i, turn in enumerate(conversation_json["conversation"]):
            voice = voice_1 if i % 2 == 0 else voice_2
            tmp_path, error = await self._generate_audio(turn["text"], voice)
            if error:
                raise RuntimeError(f"Text-to-speech failed: {error}")
            filename = output_dir / f"output_{i}.mp3"
            # shutil.move works across filesystems, unlike os.rename, which
            # fails when the temp dir is on a different mount.
            shutil.move(tmp_path, filename)
            filenames.append(str(filename))
        return filenames, str(output_dir)

    async def _generate_audio(self, text: str, voice: str, rate: int = 0, pitch: int = 0) -> Tuple[str, Optional[str]]:
        # edge-tts expects signed offset strings such as "+0%" and "+0Hz".
        voice_short_name = voice.split(" - ")[0]
        rate_str = f"{rate:+d}%"
        pitch_str = f"{pitch:+d}Hz"
        communicate = edge_tts.Communicate(text, voice_short_name, rate=rate_str, pitch=pitch_str)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
            tmp_path = tmp_file.name
        await communicate.save(tmp_path)
        return tmp_path, None

    def _create_output_directory(self) -> str:
        random_bytes = os.urandom(8)
        folder_name = base64.urlsafe_b64encode(random_bytes).decode("utf-8")
        os.makedirs(folder_name, exist_ok=True)
        return folder_name

    def combine_audio_files(self, filenames: List[str], output_file: str) -> None:
        """Concatenate the per-turn clips into one MP3, then delete the clips."""
        combined = AudioSegment.empty()
        for filename in filenames:
            combined += AudioSegment.from_file(filename, format="mp3")
        combined.export(output_file, format="mp3")
        # Remove only the per-turn clips; the directory must survive because
        # output_file (combined_output.mp3) lives in the same folder.
        for filename in filenames:
            os.remove(filename)

    async def url_to_audio(self, url: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
        text = self.fetch_text(url)
        # Truncate long pages so the prompt stays within the model's context.
        words = text.split()
        if len(words) > self.config.max_words:
            text = " ".join(words[: self.config.max_words])
        conversation_json = self.extract_conversation(text)
        conversation_text = "\n".join(f"{t['speaker']}: {t['text']}" for t in conversation_json["conversation"])
        self.llm_out = conversation_json
        audio_files, folder_name = await self.text_to_speech(conversation_json, voice_1, voice_2)
        final_output = os.path.join(folder_name, "combined_output.mp3")
        self.combine_audio_files(audio_files, final_output)
        return final_output, conversation_text

    async def text_to_audio(self, text: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
        conversation_json = self.extract_conversation(text)
        conversation_text = "\n".join(f"{t['speaker']}: {t['text']}" for t in conversation_json["conversation"])
        audio_files, folder_name = await self.text_to_speech(conversation_json, voice_1, voice_2)
        final_output = os.path.join(folder_name, "combined_output.mp3")
        self.combine_audio_files(audio_files, final_output)
        return final_output, conversation_text

    async def raw_text_to_audio(self, text: str, voice_1: str, voice_2: str) -> Tuple[str, Optional[str]]:
        # NOTE: only voice_1 is used here; the raw text is read by a single voice.
        try:
            print("\n=== DEBUG START (raw_text_to_audio) ===")
            print(f"Input text: {text[:200]}...")  # sanity-check the input
            # Use an absolute path under /tmp (compatible with Spaces).
            output_dir = "/tmp/podcast_outputs"
            os.makedirs(output_dir, exist_ok=True)
            hash_name = hashlib.md5(text.encode()).hexdigest()[:8]
            output_file = os.path.join(output_dir, f"podcast_{hash_name}.mp3")
            print(f"Output path: {output_file}")
            # List the available voices (DEBUG).
            voices = await edge_tts.list_voices()
            voice_names = [v["Name"] for v in voices]
            print(f"Available voices (first 5): {voice_names[:5]}...")
            # Extract the short voice name (e.g. "en-US-AvaMultilingualNeural").
            voice_short = voice_1.split(" - ")[0] if " - " in voice_1 else voice_1
            print(f"Voice to use: {voice_short}")
            # Generate the audio.
            communicate = edge_tts.Communicate(text, voice_short)
            print("Generating audio...")
            await communicate.save(output_file)
            print("Audio generated.")
            # Check that the file exists and is not empty.
            if not os.path.exists(output_file):
                print("ERROR: file was not created.")
                return "Error: file was not generated", None
            elif os.path.getsize(output_file) == 0:
                print("ERROR: empty file.")
                return "Error: empty audio file", None
            print(f"=== DEBUG END (valid file: {output_file}) ===")
            return text, output_file
        except Exception as e:
            print(f"CRITICAL ERROR: {str(e)}")
            return f"Error: {str(e)}", None
