gnosticdev committed
Commit 00b6284 · verified · 1 Parent(s): 08ce0cf

Update conver.py

Files changed (1)
  1. conver.py +128 -86
conver.py CHANGED
@@ -9,12 +9,13 @@ import tempfile
 from pydub import AudioSegment
 import base64
 from pathlib import Path
+import numpy as np

 @dataclass
 class ConversationConfig:
     max_words: int = 3000
     prefix_url: str = "https://r.jina.ai/"
-    model_name: str = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo"
+    model_name: str = "meta-llama/Meta-Llama-3-8B-Instruct"

 class URLToAudioConverter:
     def __init__(self, config: ConversationConfig, llm_api_key: str):
@@ -23,6 +24,7 @@ class URLToAudioConverter:
         self.llm_out = None

     def fetch_text(self, url: str) -> str:
+        """Fetch text from a URL"""
         if not url:
             raise ValueError("URL cannot be empty")
         full_url = f"{self.config.prefix_url}{url}"
@@ -34,133 +36,173 @@ class URLToAudioConverter:
             raise RuntimeError(f"Failed to fetch URL: {e}")

     def extract_conversation(self, text: str) -> Dict:
+        """Convert plain text into a conversation structure"""
         if not text:
             raise ValueError("Input text cannot be empty")
         try:
             prompt = (
-                f"{text}\nConvert the provided text into a short informative podcast conversation "
-                f"between two experts. Return ONLY a JSON object with the following structure:\n"
-                '{"conversation": [{"speaker": "Speaker1", "text": "..."}, {"speaker": "Speaker2", "text": "..."}]}'
+                f"{text}\nConvert this text into a podcast conversation between two hosts. "
+                "Return ONLY JSON with this structure:\n"
+                '{"conversation": [{"speaker": "Host1", "text": "..."}, {"speaker": "Host2", "text": "..."}]}'
             )
-            chat_completion = self.llm_client.chat.completions.create(
+            response = self.llm_client.chat.completions.create(
                 messages=[{"role": "user", "content": prompt}],
                 model=self.config.model_name,
                 response_format={"type": "json_object"}
             )
-            response_content = chat_completion.choices[0].message.content
-            json_str = response_content.strip()
-            if not json_str.startswith('{'):
-                json_str = json_str[json_str.find('{'):]
-            if not json_str.endswith('}'):
-                json_str = json_str[:json_str.rfind('}')+1]
-            return json.loads(json_str)
+            json_str = response.choices[0].message.content.strip()
+            return json.loads(json_str[json_str.find('{'):json_str.rfind('}')+1])
         except Exception as e:
             raise RuntimeError(f"Failed to extract conversation: {str(e)}")

     async def text_to_speech(self, conversation_json: Dict, voice_1: str, voice_2: str) -> Tuple[List[str], str]:
+        """Convert conversation JSON into per-turn audio files"""
         output_dir = Path(self._create_output_directory())
         filenames = []
         try:
             for i, turn in enumerate(conversation_json["conversation"]):
-                filename = output_dir / f"output_{i}.mp3"
-                voice = voice_1 if i % 2 == 0 else voice_2
-                tmp_path, error = await self._generate_audio(turn["text"], voice)
-                if error:
-                    raise RuntimeError(f"Text-to-speech failed: {error}")
+                filename = output_dir / f"segment_{i}.mp3"
+                voice = voice_1 if turn["speaker"] == "Host1" else voice_2
+                tmp_path = await self._generate_audio(turn["text"], voice)
                 os.rename(tmp_path, filename)
                 filenames.append(str(filename))
             return filenames, str(output_dir)
         except Exception as e:
-            raise RuntimeError(f"Failed to convert text to speech: {e}")
+            raise RuntimeError(f"Text-to-speech failed: {e}")

-    async def _generate_audio(self, text: str, voice: str, rate: int = 0, pitch: int = 0) -> Tuple[str, Optional[str]]:
+    async def _generate_audio(self, text: str, voice: str) -> str:
+        """Generate temporary audio with edge-tts"""
         if not text.strip():
-            return None, "Text cannot be empty"
-        voice_short_name = voice.split(" - ")[0]
+            raise ValueError("Text cannot be empty")
+
         communicate = edge_tts.Communicate(
-            text,
-            voice_short_name,
-            rate=f"{rate:+d}%",
-            pitch=f"{pitch:+d}Hz"
+            text,
+            voice.split(" - ")[0],
+            rate="+0%",
+            pitch="+0Hz"
         )
         with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
-            tmp_path = tmp_file.name
-            await communicate.save(tmp_path)
-            return tmp_path, None
+            await communicate.save(tmp_file.name)
+            return tmp_file.name

     def _create_output_directory(self) -> str:
+        """Create a unique directory for the generated files"""
         folder_name = base64.urlsafe_b64encode(os.urandom(8)).decode("utf-8")
         os.makedirs(folder_name, exist_ok=True)
         return folder_name

     def combine_audio_files(self, filenames: List[str]) -> AudioSegment:
+        """Concatenate the audio segments"""
         if not filenames:
-            raise ValueError("No input files provided")
+            raise ValueError("No audio files provided")
+
         combined = AudioSegment.empty()
         for filename in filenames:
             combined += AudioSegment.from_file(filename, format="mp3")
         return combined

+    def _detect_silences(self, audio: AudioSegment, min_len: int = 500, thresh: int = -40) -> List[Tuple[int, int]]:
+        """Detect silent intervals in the audio (start/end in milliseconds)"""
+        silent_ranges = []
+        start = None
+
+        samples = np.array(audio.get_array_of_samples()).astype(np.float64)
+        samples_per_ms = audio.frame_rate * audio.channels / 1000
+        window_size = int(min_len * samples_per_ms)
+        max_amp = float(1 << (8 * audio.sample_width - 1))
+
+        for i in range(0, len(samples) - window_size, window_size):
+            window = samples[i:i + window_size]
+            # Window loudness in dBFS; treat pure digital silence as -120 dB
+            rms = np.sqrt(np.mean(window ** 2))
+            dbfs = 20 * np.log10(rms / max_amp) if rms > 0 else -120.0
+            ms = int(i / samples_per_ms)
+            if dbfs < thresh:
+                if start is None:
+                    start = ms
+            else:
+                if start is not None:
+                    silent_ranges.append((start, ms))
+                    start = None
+
+        return silent_ranges
+
     def add_background_music_and_tags(
-        self,
-        speech_audio: AudioSegment,
-        music_file: str,
-        tags_files: List[str]
+        self,
+        speech_audio: AudioSegment,
+        music_path: str,
+        tags_paths: List[str]
     ) -> AudioSegment:
-        music = AudioSegment.from_file(music_file)
+        """Mix in background music and insert tags at natural pauses"""
+        # 1. Load and level the music bed
+        music = AudioSegment.from_file(music_path).fade_out(2000)
+        music = music - 25  # lower the volume
+
+        # 2. Loop the bed only when it is shorter than the speech
         if len(music) < len(speech_audio):
-            music = music * (len(speech_audio) // len(music) + 1)
-        music = music[:len(speech_audio)] - 20
-        mixed = speech_audio.overlay(music)
-        for tag_path in tags_files:
-            tag_audio = AudioSegment.from_file(tag_path) - 5
-            mixed = tag_audio + mixed
-        return mixed
-
-    async def url_to_audio(self, url: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
-        text = self.fetch_text(url)
-        if len(words := text.split()) > self.config.max_words:
-            text = " ".join(words[:self.config.max_words])
-        conversation_json = self.extract_conversation(text)
-        conversation_text = "\n".join(
-            f"{turn['speaker']}: {turn['text']}"
-            for turn in conversation_json["conversation"]
-        )
-        return await self._process_audio(conversation_json, voice_1, voice_2, conversation_text)
-
-    async def text_to_audio(self, structured_text: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
-        """For text ALREADY structured as conversation JSON."""
-        conversation_json = self.extract_conversation(structured_text)
-        conversation_text = "\n".join(
-            f"{turn['speaker']}: {turn['text']}"
-            for turn in conversation_json["conversation"]
-        )
-        return await self._process_audio(conversation_json, voice_1, voice_2, conversation_text)
-
-    async def raw_text_to_audio(self, raw_text: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
-        """For direct plain text (no dialogue structure)."""
-        fake_conversation = {"conversation": [{"speaker": "Narrador", "text": raw_text}]}
-        return await self._process_audio(fake_conversation, voice_1, voice_2, raw_text)

-    async def _process_audio(
-        self,
-        conversation_json: Dict,
-        voice_1: str,
-        voice_2: str,
-        text: str
+    async def process_content(
+        self,
+        content: str,
+        voice_1: str,
+        voice_2: str,
+        is_url: bool = False
     ) -> Tuple[str, str]:
-        """Internal helper for the shared processing."""
-        audio_files, folder_name = await self.text_to_speech(conversation_json, voice_1, voice_2)
-        combined_audio = self.combine_audio_files(audio_files)
-        final_audio = self.add_background_music_and_tags(
-            combined_audio,
-            "musica.mp3",
-            ["tag.mp3", "tag2.mp3"]
-        )
-        output_file = os.path.join(folder_name, "output.mp3")
-        final_audio.export(output_file, format="mp3")
-
-        for f in audio_files:
-            os.remove(f)
+            loops = (len(speech_audio) // len(music)) + 1
+            music = music * loops
+        music = music[:len(speech_audio)]
+
+        # 3. Overlay the speech on the music bed
+        mixed = speech_audio.overlay(music, position=0)
+
+        # 4. Load the tags
+        tag_intro = AudioSegment.from_file(tags_paths[0]) - 10
+        tag_transition = AudioSegment.from_file(tags_paths[1]) - 10
+
+        # Opening tag
+        final_audio = tag_intro + mixed
+
+        # Transition tags at pauses (optional)
+        silences = self._detect_silences(speech_audio)
+        for start, end in reversed(silences):
+            if (end - start) > len(tag_transition):
+                final_audio = final_audio.overlay(
+                    tag_transition,
+                    position=len(tag_intro) + start + 100  # offset past the intro tag, small margin
+                )
+
+        return final_audio
+
+        """Process content (a URL or raw text) into the final audio"""
+        try:
+            # 1. Get structured text
+            if is_url:
+                text = self.fetch_text(content)
+                if len(words := text.split()) > self.config.max_words:
+                    text = " ".join(words[:self.config.max_words])
+                conversation = self.extract_conversation(text)
+            else:
+                conversation = self.extract_conversation(content)
+
+            # 2. Generate the audio
+            audio_files, folder_name = await self.text_to_speech(conversation, voice_1, voice_2)
+            combined = self.combine_audio_files(audio_files)
+
+            # 3. Mix with music and tags
+            final_audio = self.add_background_music_and_tags(
+                combined,
+                "musica.mp3",
+                ["tag.mp3", "tag2.mp3"]
+            )
+
+            # 4. Export
+            output_path = os.path.join(folder_name, "podcast_final.mp3")
+            final_audio.export(output_path, format="mp3")

-        return output_file, text
+            # 5. Clean up the per-turn segments
+            for f in audio_files:
+                os.remove(f)
+
+            # Conversation transcript
+            conversation_text = "\n".join(
+                f"{turn['speaker']}: {turn['text']}"
+                for turn in conversation["conversation"]
+            )
+
+            return output_path, conversation_text
+
+        except Exception as e:
+            raise RuntimeError(f"Processing failed: {str(e)}")
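For reference, a minimal driver sketch for the new process_content entry point. It assumes the module is importable as conver, that musica.mp3, tag.mp3, and tag2.mp3 exist in the working directory (the commit hardcodes those paths), that the constructor wires llm_api_key into an OpenAI-compatible client, and that a key is available in a TOGETHER_API_KEY environment variable; the script and the voice choices are illustrative, not part of the commit.

# Hypothetical usage sketch, not part of the commit.
import asyncio
import os

from conver import ConversationConfig, URLToAudioConverter


async def main() -> None:
    config = ConversationConfig()  # defaults: max_words=3000, Jina reader prefix
    converter = URLToAudioConverter(config, os.environ["TOGETHER_API_KEY"])
    # Any two edge-tts voices work; "Name - description" suffixes are stripped internally.
    output_path, transcript = await converter.process_content(
        "https://example.com/article",
        voice_1="en-US-GuyNeural",
        voice_2="en-US-JennyNeural",
        is_url=True,
    )
    print(f"Podcast written to {output_path}")
    print(transcript)


if __name__ == "__main__":
    asyncio.run(main())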