# Podcastking2 / conver.py
from dataclasses import dataclass
from typing import List, Tuple, Dict, Optional
import os
import json
import httpx
from openai import OpenAI
import edge_tts
import tempfile
import shutil
from pydub import AudioSegment
import base64
from pathlib import Path
import time
from threading import Thread
import re


@dataclass
class ConversationConfig:
    max_words: int = 3000
    prefix_url: str = "https://r.jina.ai/"
    model_name: str = "meta-llama/Llama-3-8b-chat-hf"
    custom_prompt_template: Optional[str] = None
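
# A custom template, when provided, must contain a "{text}" placeholder, since
# extract_conversation() calls .format(text=...) on it. Hypothetical example:
#   ConversationConfig(custom_prompt_template="Convierte en diálogo: {text}")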


class URLToAudioConverter:
    def __init__(self, config: ConversationConfig, llm_api_key: str):
        self.config = config
        # Together AI exposes an OpenAI-compatible API, so the OpenAI client works here
        self.llm_client = OpenAI(api_key=llm_api_key, base_url="https://api.together.xyz/v1")
        self.llm_out = None
        self.ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
        self.MUSICA_FONDO = os.path.join(self.ROOT_DIR, "musica.mp3")  # background music
        self.TAG1 = os.path.join(self.ROOT_DIR, "tag.mp3")   # outro tag
        self.TAG2 = os.path.join(self.ROOT_DIR, "tag2.mp3")  # transition tag
        # Start the cleaner only after the asset paths exist, so it can skip them
        self._start_cleaner()

    def _start_cleaner(self, max_age_hours: int = 24):
        """Hourly background sweep that deletes generated audio older than max_age_hours."""
        def cleaner():
            protected = {self.MUSICA_FONDO, self.TAG1, self.TAG2}
            while True:
                now = time.time()
                for root, _, files in os.walk("."):
                    for file in files:
                        if file.endswith((".mp3", ".wav")):
                            filepath = os.path.join(root, file)
                            # Never delete the bundled music/tag assets
                            if os.path.abspath(filepath) in protected:
                                continue
                            try:
                                if now - os.path.getmtime(filepath) > max_age_hours * 3600:
                                    os.remove(filepath)
                            except OSError:
                                pass
                time.sleep(3600)
        Thread(target=cleaner, daemon=True).start()

    def fetch_text(self, url: str) -> str:
        """Fetch the page behind `url` as text, via the configured reader prefix."""
        if not url:
            raise ValueError("URL cannot be empty")
        full_url = f"{self.config.prefix_url}{url}"
        try:
            response = httpx.get(full_url, timeout=60.0)
            response.raise_for_status()
            return response.text
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to fetch URL: {e}") from e

    def extract_conversation(self, text: str) -> Dict:
        """Ask the LLM to turn `text` into a two-host podcast dialogue, returned as JSON."""
        if not text:
            raise ValueError("Input text cannot be empty")
        try:
            # The default prompt asks for a Spanish dialogue between Anfitrión1 and Anfitrión2
            prompt = self.config.custom_prompt_template.format(text=text) if self.config.custom_prompt_template else (
                f"{text}\nConvierte el texto en un diálogo de podcast en español entre Anfitrión1 y Anfitrión2. "
                f"Genera una conversación extensa y natural con al menos 5 intercambios por hablante. "
                f"Devuelve SOLO un objeto JSON con la estructura: "
                f'{{"conversation": [{{"speaker": "Anfitrión1", "text": "..."}}, {{"speaker": "Anfitrión2", "text": "..."}}]}}'
            )
            response = self.llm_client.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                model=self.config.model_name,
                response_format={"type": "json_object"}
            )
            response_content = response.choices[0].message.content.strip()
            # Extract the outermost JSON object in case the model wrapped it in prose
            start_idx = response_content.find('{')
            end_idx = response_content.rfind('}') + 1
            if start_idx == -1 or end_idx == 0:
                raise ValueError("No valid JSON object found in response")
            json_str = response_content[start_idx:end_idx]
            # Parse as-is first; only apply repairs on failure, since blind
            # substitutions can corrupt otherwise valid JSON
            try:
                dialogue = json.loads(json_str)
            except json.JSONDecodeError:
                # Common model mistake: trailing commas before ] or }
                json_str = re.sub(r',\s*([\]}])', r'\1', json_str)
                try:
                    dialogue = json.loads(json_str)
                except json.JSONDecodeError:
                    # Last resort: truncate to the last complete array element
                    last_comma = json_str.rfind(',', 0, json_str.rfind(']'))
                    if last_comma == -1:
                        raise
                    json_str = json_str[:last_comma] + json_str[json_str.rfind(']'):]
                    dialogue = json.loads(json_str)
            if not dialogue.get("conversation") or not isinstance(dialogue["conversation"], list):
                raise ValueError("No valid conversation generated")
            return dialogue
        except Exception as e:
            raise RuntimeError(f"Failed to parse dialogue: {e}") from e

    async def text_to_speech(self, conversation_json: Dict, voice_1: str, voice_2: str) -> Tuple[List[str], str]:
        """Render each dialogue turn to its own MP3 segment; returns (files, folder)."""
        output_dir = Path(self._create_output_directory())
        filenames = []
        try:
            if not conversation_json["conversation"]:
                raise ValueError("No conversation data to process")
            for i, turn in enumerate(conversation_json["conversation"]):
                filename = output_dir / f"segment_{i}.mp3"
                # Anfitrión1 gets voice_1; any other speaker falls through to voice_2
                voice = voice_1 if turn["speaker"] == "Anfitrión1" else voice_2
                tmp_path = await self._generate_audio(turn["text"], voice)
                # shutil.move works across filesystems; os.rename fails if the
                # temp dir lives on a different device than the output folder
                shutil.move(tmp_path, filename)
                filenames.append(str(filename))
            if not filenames:
                raise ValueError("No audio files generated")
            return filenames, str(output_dir)
        except Exception as e:
            raise RuntimeError(f"Text-to-speech failed: {e}") from e

    async def _generate_audio(self, text: str, voice: str) -> str:
        if not text.strip():
            raise ValueError("Text cannot be empty")
        communicate = edge_tts.Communicate(
            text,
            voice.split(" - ")[0],  # drop any " - label" suffix from UI voice strings
            rate="+0%",
            pitch="+0Hz"
        )
        # Reserve a temp filename, then close the handle before edge-tts writes to it
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
            tmp_path = tmp_file.name
        await communicate.save(tmp_path)
        return tmp_path
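    # Available voice names can be listed with the edge-tts CLI
    # ("edge-tts --list-voices") or programmatically via edge_tts.list_voices().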

    def _create_output_directory(self) -> str:
        folder_name = base64.urlsafe_b64encode(os.urandom(8)).decode("utf-8")
        os.makedirs(folder_name, exist_ok=True)
        return folder_name

    def combine_audio_files(self, filenames: List[str]) -> AudioSegment:
        if not filenames:
            raise ValueError("No audio files provided")
        combined = AudioSegment.empty()
        for filename in filenames:
            combined += AudioSegment.from_file(filename, format="mp3")
        return combined

    def add_background_music_and_tags(
        self,
        speech_audio: AudioSegment,
        music_path: str,
        tags_paths: List[str],
        custom_music_path: Optional[str] = None,
        use_background_music: bool = True
    ) -> AudioSegment:
        tag_outro_file, tag_trans_file = tags_paths
        if not os.path.exists(tag_outro_file):
            raise FileNotFoundError(f"Tag file not found: {tag_outro_file}")
        if not os.path.exists(tag_trans_file):
            raise FileNotFoundError(f"Tag file not found: {tag_trans_file}")
        final_audio = speech_audio
        if use_background_music:
            # Prefer a user-supplied track; otherwise fall back to the given music_path
            music_file = custom_music_path if custom_music_path and os.path.exists(custom_music_path) else music_path
            if not os.path.exists(music_file):
                raise FileNotFoundError(f"Music file not found: {music_file}")
            music = AudioSegment.from_file(music_file).fade_out(2000) - 25  # duck by 25 dB
            if len(music) < len(speech_audio):
                music = music * ((len(speech_audio) // len(music)) + 1)  # loop to cover the speech
            music = music[:len(speech_audio)]
            final_audio = final_audio.overlay(music)
        tag_outro = AudioSegment.from_file(tag_outro_file) - 10
        tag_trans = AudioSegment.from_file(tag_trans_file) - 10
        final_audio = final_audio + tag_outro
        # Scan the speech for silences (500 ms windows, 100 ms step, below -40 dBFS)
        silent_ranges = []
        for i in range(0, len(speech_audio) - 500, 100):
            chunk = speech_audio[i:i + 500]
            if chunk.dBFS < -40:
                silent_ranges.append((i, i + 500))
        # Merge overlapping windows so each silence receives the tag at most once
        merged = []
        for start, end in silent_ranges:
            if merged and start <= merged[-1][1]:
                merged[-1] = (merged[-1][0], end)
            else:
                merged.append((start, end))
        for start, end in merged:
            if (end - start) >= len(tag_trans):
                final_audio = final_audio.overlay(tag_trans, position=start + 50)
        return final_audio

    async def url_to_audio(self, url: str, voice_1: str, voice_2: str, custom_music_path: Optional[str] = None) -> Tuple[str, str]:
        text = self.fetch_text(url)
        # Truncate to the configured word budget before prompting the LLM
        if len(words := text.split()) > self.config.max_words:
            text = " ".join(words[:self.config.max_words])
        conversation = self.extract_conversation(text)
        return await self._process_to_audio(conversation, voice_1, voice_2, custom_music_path)

    async def text_to_audio(self, text: str, voice_1: str, voice_2: str, custom_music_path: Optional[str] = None) -> Tuple[str, str]:
        conversation = self.extract_conversation(text)
        return await self._process_to_audio(conversation, voice_1, voice_2, custom_music_path)

    async def raw_text_to_audio(self, text: str, voice_1: str, voice_2: str, custom_music_path: Optional[str] = None) -> Tuple[str, str]:
        # Single-speaker mode: the raw text is read verbatim by voice_1
        conversation = {"conversation": [{"speaker": "Anfitrión1", "text": text}]}
        return await self._process_to_audio(conversation, voice_1, voice_2, custom_music_path)

    async def _process_to_audio(
        self,
        conversation: Dict,
        voice_1: str,
        voice_2: str,
        custom_music_path: Optional[str] = None
    ) -> Tuple[str, str]:
        audio_files, folder_name = await self.text_to_speech(conversation, voice_1, voice_2)
        combined = self.combine_audio_files(audio_files)
        final_audio = self.add_background_music_and_tags(
            combined,
            self.MUSICA_FONDO,
            [self.TAG1, self.TAG2],
            custom_music_path,
            # As written, background music is mixed in only when the caller
            # supplied a custom track
            use_background_music=custom_music_path is not None
        )
        output_path = os.path.join(folder_name, "podcast_final.mp3")
        final_audio.export(output_path, format="mp3")
        # Remove the per-turn segments once the final mix is exported
        for f in audio_files:
            os.remove(f)
        text_output = "\n".join(
            f"{turn['speaker']}: {turn['text']}"
            for turn in conversation["conversation"]
        )
        return output_path, text_output
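

# Minimal usage sketch. Assumptions: a Together AI key in a TOGETHER_API_KEY
# environment variable, the bundled musica.mp3 / tag.mp3 / tag2.mp3 assets next
# to this file, and two standard edge-tts Spanish voices.
if __name__ == "__main__":
    import asyncio

    converter = URLToAudioConverter(ConversationConfig(), os.environ["TOGETHER_API_KEY"])
    audio_path, transcript = asyncio.run(
        converter.url_to_audio(
            "https://example.com/articulo",  # placeholder URL
            "es-ES-AlvaroNeural",
            "es-ES-ElviraNeural",
        )
    )
    print(f"Podcast saved to {audio_path}")
    print(transcript)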