File size: 10,646 Bytes
5fe16b1
2247844
5fe16b1
 
587b534
5fe16b1
 
 
 
 
 
03c080b
 
d1f45a0
5fe16b1
 
 
26a40ee
5fe16b1
03c080b
9e11cdd
5fe16b1
 
 
 
 
 
26a40ee
d1f45a0
 
 
 
5fe16b1
03c080b
 
 
 
 
 
e99d513
03c080b
 
e99d513
03c080b
 
e99d513
 
03c080b
5fe16b1
e99d513
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9e11cdd
 
 
3b3add5
9e11cdd
a2a2610
e99d513
 
a2a2610
 
e99d513
a2a2610
3b3add5
2fb6f64
3b3add5
 
 
 
d1f45a0
3b3add5
 
049c2dc
 
 
3b3add5
2fb6f64
 
 
3b3add5
 
 
 
 
 
 
 
 
 
 
2fb6f64
e99d513
 
 
 
 
97cd35a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e99d513
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5fe16b1
587b534
00b6284
 
 
9e11cdd
cc36c27
 
587b534
d1f45a0
 
 
 
 
 
 
 
cc36c27
 
 
 
 
 
 
 
 
 
 
d1f45a0
 
cc36c27
64bc311
 
 
 
 
 
 
 
00b6284
bf07215
9e11cdd
e99d513
 
 
 
9e11cdd
e99d513
9e11cdd
e99d513
9e11cdd
e99d513
9e11cdd
f560070
9e11cdd
e99d513
 
 
 
 
9e11cdd
 
e99d513
37a6fed
e99d513
 
 
2fb6f64
 
cc36c27
 
e99d513
 
 
 
 
 
 
 
 
d1f45a0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import base64
import json
import os
import re
import shutil
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path
from threading import Thread
from typing import Dict, List, Optional, Tuple

import edge_tts
import httpx
from openai import OpenAI
from pydub import AudioSegment


@dataclass
class ConversationConfig:
    """Settings for the URL/text → podcast-dialogue pipeline.

    Attributes:
        max_words: Hard cap on the number of input words sent to the LLM.
        prefix_url: Reader-proxy prefix prepended to every fetched URL.
        model_name: Chat model identifier on the Together AI endpoint.
        custom_prompt_template: Optional prompt with a ``{text}`` placeholder;
            when None, the built-in Spanish podcast prompt is used.
    """
    max_words: int = 3000
    prefix_url: str = "https://r.jina.ai/"
    model_name: str = "meta-llama/Llama-3-8b-chat-hf"
    # Fix: the default is None, so the annotation must be Optional[str], not str.
    custom_prompt_template: Optional[str] = None

class URLToAudioConverter:
    """Turn a URL or raw text into a two-host Spanish podcast MP3.

    Pipeline: fetch page text through the configured reader proxy, ask an
    LLM (Together AI, OpenAI-compatible API) to rewrite it as a dialogue,
    synthesize each turn with edge-tts, then mix the segments with optional
    background music and station tags using pydub.
    """

    def __init__(self, config: ConversationConfig, llm_api_key: str):
        """Initialize the converter and start the background temp-file cleaner.

        Args:
            config: Generation settings (word cap, reader prefix, model,
                optional custom prompt template).
            llm_api_key: API key for the Together-hosted LLM endpoint.
        """
        self.config = config
        self.llm_client = OpenAI(api_key=llm_api_key, base_url="https://api.together.xyz/v1")
        self.llm_out = None  # not set anywhere in this file; kept for interface compatibility
        self._start_cleaner()
        # Bundled audio assets are expected next to this module.
        self.ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
        self.MUSICA_FONDO = os.path.join(self.ROOT_DIR, "musica.mp3")
        self.TAG1 = os.path.join(self.ROOT_DIR, "tag.mp3")
        self.TAG2 = os.path.join(self.ROOT_DIR, "tag2.mp3")

    def _start_cleaner(self, max_age_hours: int = 24):
        """Start a daemon thread that hourly deletes stale .mp3/.wav files.

        Walks the current working directory tree and removes audio files
        whose mtime is older than ``max_age_hours``. Deletion is best-effort.
        """
        def cleaner():
            while True:
                now = time.time()
                for root, _, files in os.walk("."):
                    for file in files:
                        if file.endswith((".mp3", ".wav")):
                            filepath = os.path.join(root, file)
                            try:
                                if now - os.path.getmtime(filepath) > max_age_hours * 3600:
                                    os.remove(filepath)
                            except OSError:
                                # Fix: was a bare `except:` — only swallow
                                # filesystem errors (file vanished/locked).
                                pass
                time.sleep(3600)
        Thread(target=cleaner, daemon=True).start()

    def fetch_text(self, url: str) -> str:
        """Fetch readable text for ``url`` via the configured reader proxy.

        Raises:
            ValueError: if ``url`` is empty.
            RuntimeError: if the HTTP request fails.
        """
        if not url:
            raise ValueError("URL cannot be empty")
        full_url = f"{self.config.prefix_url}{url}"
        try:
            response = httpx.get(full_url, timeout=60.0)
            response.raise_for_status()
            return response.text
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to fetch URL: {e}")

    def extract_conversation(self, text: str) -> Dict:
        """Ask the LLM to turn ``text`` into a two-speaker podcast dialogue.

        Returns:
            A dict shaped like
            ``{"conversation": [{"speaker": ..., "text": ...}, ...]}``.

        Raises:
            ValueError: if ``text`` is empty.
            RuntimeError: wrapping any response/parsing failure.
        """
        if not text:
            raise ValueError("Input text cannot be empty")
        try:
            prompt = self.config.custom_prompt_template.format(text=text) if self.config.custom_prompt_template else (
                f"{text}\nConvierte el texto en un diálogo de podcast en español entre Anfitrión1 y Anfitrión2. "
                f"Genera una conversación extensa y natural con al menos 5 intercambios por hablante. "
                f"Devuelve SOLO un objeto JSON con la estructura: "
                f'{{"conversation": [{{"speaker": "Anfitrión1", "text": "..."}}, {{"speaker": "Anfitrión2", "text": "..."}}]}}'
            )
            response = self.llm_client.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                model=self.config.model_name,
                response_format={"type": "json_object"}
            )
            response_content = response.choices[0].message.content.strip()
            # Isolate the outermost JSON object in case the model added prose.
            start_idx = response_content.find('{')
            end_idx = response_content.rfind('}') + 1
            if start_idx == -1 or end_idx == 0:
                raise ValueError("No valid JSON object found in response")
            dialogue = self._parse_dialogue_json(response_content[start_idx:end_idx])
            if not dialogue.get("conversation") or not isinstance(dialogue["conversation"], list):
                raise ValueError("No valid conversation generated")
            return dialogue
        except Exception as e:
            raise RuntimeError(f"Failed to parse dialogue: {str(e)}")

    @staticmethod
    def _parse_dialogue_json(json_str: str) -> Dict:
        """Parse the model's JSON payload, repairing common defects if needed.

        Fix: the previous code always rewrote the payload (stripping escaped
        quotes, force-quoting anything matching ``word:``), which corrupted
        well-formed JSON whose dialogue text contained colons or escaped
        quotes. A plain ``json.loads`` is tried first; the heuristic repairs
        run only when it fails.

        Raises:
            ValueError: if no repair strategy yields valid JSON.
        """
        try:
            return json.loads(json_str)
        except json.JSONDecodeError:
            pass
        # Heuristic repairs for typical malformed LLM output.
        json_str = re.sub(r',\s*([\]}])', r'\1', json_str)  # drop trailing commas
        json_str = re.sub(r'\s+', ' ', json_str)  # collapse whitespace runs
        json_str = json_str.replace('\\"', '"').replace('"{', '{').replace('}"', '}')
        json_str = re.sub(r'(\w+):', r'"\1":', json_str)  # quote bare keys
        try:
            return json.loads(json_str)
        except json.JSONDecodeError as e:
            # Last resort: cut a possibly truncated trailing array element.
            last_comma = json_str.rfind(',', 0, json_str.rfind(']'))
            if last_comma == -1:
                raise ValueError(f"JSON parsing failed: {str(e)}")
            json_str = json_str[:last_comma] + json_str[json_str.rfind(']'):]
            try:
                return json.loads(json_str)
            except json.JSONDecodeError as e2:
                raise ValueError(f"JSON parsing failed: {str(e2)}")

    async def text_to_speech(self, conversation_json: Dict, voice_1: str, voice_2: str) -> Tuple[List[str], str]:
        """Synthesize every dialogue turn into its own MP3 segment.

        Returns:
            (list of segment file paths, output directory path).

        Raises:
            RuntimeError: wrapping any synthesis or validation failure.
        """
        output_dir = Path(self._create_output_directory())
        filenames = []
        try:
            if not conversation_json["conversation"]:
                raise ValueError("No conversation data to process")
            for i, turn in enumerate(conversation_json["conversation"]):
                filename = output_dir / f"segment_{i}.mp3"
                # Anfitrión1 speaks with voice_1; every other speaker gets voice_2.
                voice = voice_1 if turn["speaker"] == "Anfitrión1" else voice_2
                tmp_path = await self._generate_audio(turn["text"], voice)
                # Fix: shutil.move instead of os.rename — the temp dir may
                # live on a different filesystem, where rename raises EXDEV.
                shutil.move(tmp_path, filename)
                filenames.append(str(filename))
            if not filenames:
                raise ValueError("No audio files generated")
            return filenames, str(output_dir)
        except Exception as e:
            raise RuntimeError(f"Text-to-speech failed: {e}")

    async def _generate_audio(self, text: str, voice: str) -> str:
        """Render ``text`` with edge-tts and return the temp MP3 path.

        ``voice`` may be formatted as "ShortName - description"; only the
        short name before " - " is passed to edge-tts.

        Raises:
            ValueError: if ``text`` is blank.
        """
        if not text.strip():
            raise ValueError("Text cannot be empty")
        communicate = edge_tts.Communicate(
            text,
            voice.split(" - ")[0],
            rate="+0%",
            pitch="+0Hz"
        )
        tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".mp3")
        # Fix: close our handle before edge-tts writes to the path — on
        # Windows an open NamedTemporaryFile cannot be reopened by name.
        tmp_file.close()
        await communicate.save(tmp_file.name)
        return tmp_file.name

    def _create_output_directory(self) -> str:
        """Create and return a randomly named output directory (in CWD)."""
        folder_name = base64.urlsafe_b64encode(os.urandom(8)).decode("utf-8")
        os.makedirs(folder_name, exist_ok=True)
        return folder_name

    def combine_audio_files(self, filenames: List[str]) -> AudioSegment:
        """Concatenate the given MP3 files, in order, into one AudioSegment.

        Raises:
            ValueError: if ``filenames`` is empty.
        """
        if not filenames:
            raise ValueError("No audio files provided")
        combined = AudioSegment.empty()
        for filename in filenames:
            combined += AudioSegment.from_file(filename, format="mp3")
        return combined

    def add_background_music_and_tags(
        self,
        speech_audio: AudioSegment,
        music_path: str,
        tags_paths: List[str],
        custom_music_path: str = None,
        use_background_music: bool = True
    ) -> AudioSegment:
        """Mix optional background music under the speech and add station tags.

        Appends the outro tag (TAG1) after the speech and overlays the
        transition tag (TAG2) on detected silent stretches of the speech.

        NOTE(review): ``music_path`` and ``tags_paths`` are accepted but never
        read — the instance attributes are used instead. Kept so the public
        signature stays backward compatible; confirm before removing.

        Raises:
            FileNotFoundError: if a tag or music asset is missing.
        """
        tag_outro_file = self.TAG1
        tag_trans_file = self.TAG2

        if not os.path.exists(tag_outro_file):
            raise FileNotFoundError(f"Tag file not found: {tag_outro_file}")
        if not os.path.exists(tag_trans_file):
            raise FileNotFoundError(f"Tag file not found: {tag_trans_file}")

        final_audio = speech_audio
        if use_background_music:
            # Prefer the caller-supplied track when it exists on disk.
            music_file = custom_music_path if custom_music_path and os.path.exists(custom_music_path) else self.MUSICA_FONDO
            if not os.path.exists(music_file):
                raise FileNotFoundError(f"Music file not found: {music_file}")
            music = AudioSegment.from_file(music_file).fade_out(2000) - 25  # 25 dB under the voices
            if len(music) < len(speech_audio):
                # Loop the track until it covers the whole speech, then trim.
                music = music * ((len(speech_audio) // len(music)) + 1)
            music = music[:len(speech_audio)]
            final_audio = final_audio.overlay(music)

        tag_outro = AudioSegment.from_file(tag_outro_file) - 10
        tag_trans = AudioSegment.from_file(tag_trans_file) - 10
        final_audio = final_audio + tag_outro
        # Scan the speech in 100 ms steps for 500 ms windows quieter than
        # -40 dBFS; the transition tag is dropped into each qualifying gap.
        silent_ranges = []
        for i in range(0, len(speech_audio) - 500, 100):
            chunk = speech_audio[i:i+500]
            if chunk.dBFS < -40:
                silent_ranges.append((i, i + 500))
        for start, end in reversed(silent_ranges):
            if (end - start) >= len(tag_trans):
                final_audio = final_audio.overlay(tag_trans, position=start + 50)
        return final_audio

    async def url_to_audio(self, url: str, voice_1: str, voice_2: str, custom_music_path: str = None) -> Tuple[str, str]:
        """Full pipeline from a URL: fetch, cap to max_words, dialogue, audio."""
        text = self.fetch_text(url)
        # Truncate to the configured word budget before prompting the LLM.
        if len(words := text.split()) > self.config.max_words:
            text = " ".join(words[:self.config.max_words])
        conversation = self.extract_conversation(text)
        return await self._process_to_audio(conversation, voice_1, voice_2, custom_music_path)

    async def text_to_audio(self, text: str, voice_1: str, voice_2: str, custom_music_path: str = None) -> Tuple[str, str]:
        """Full pipeline from raw text: LLM dialogue, then audio rendering."""
        conversation = self.extract_conversation(text)
        return await self._process_to_audio(conversation, voice_1, voice_2, custom_music_path)

    async def raw_text_to_audio(self, text: str, voice_1: str, voice_2: str, custom_music_path: str = None) -> Tuple[str, str]:
        """Render ``text`` verbatim as a single Anfitrión1 monologue (no LLM)."""
        conversation = {"conversation": [{"speaker": "Anfitrión1", "text": text}]}
        return await self._process_to_audio(conversation, voice_1, voice_2, custom_music_path)

    async def _process_to_audio(
        self,
        conversation: Dict,
        voice_1: str,
        voice_2: str,
        custom_music_path: str = None
    ) -> Tuple[str, str]:
        """Shared pipeline tail: synthesize, mix, export, clean up segments.

        Returns:
            (path to the final MP3, plain-text transcript of the dialogue).
        """
        audio_files, folder_name = await self.text_to_speech(conversation, voice_1, voice_2)
        combined = self.combine_audio_files(audio_files)
        # NOTE(review): background music is enabled only when a custom track
        # is supplied, so the bundled MUSICA_FONDO is never used as background
        # here. Behavior preserved as-is — confirm this is intended.
        final_audio = self.add_background_music_and_tags(
            combined,
            self.MUSICA_FONDO,
            [self.TAG1, self.TAG2],
            custom_music_path,
            use_background_music=custom_music_path is not None
        )
        output_path = os.path.join(folder_name, "podcast_final.mp3")
        final_audio.export(output_path, format="mp3")
        # Per-turn segments are no longer needed once the mix is exported.
        for f in audio_files:
            try:
                os.remove(f)
            except OSError:
                # Fix: a failed unlink must not discard a finished export.
                pass
        text_output = "\n".join(
            f"{turn['speaker']}: {turn['text']}"
            for turn in conversation["conversation"]
        )
        return output_path, text_output