gnosticdev committed
Commit 64bc311 · verified · 1 Parent(s): 00b6284

Update conver.py

Files changed (1)
  1. conver.py +48 -95
conver.py CHANGED
@@ -9,7 +9,6 @@ import tempfile
 from pydub import AudioSegment
 import base64
 from pathlib import Path
-import numpy as np
 
 @dataclass
 class ConversationConfig:
@@ -24,7 +23,6 @@ class URLToAudioConverter:
         self.llm_out = None
 
     def fetch_text(self, url: str) -> str:
-        """Fetch text from a URL"""
         if not url:
             raise ValueError("URL cannot be empty")
         full_url = f"{self.config.prefix_url}{url}"
@@ -36,7 +34,6 @@ class URLToAudioConverter:
             raise RuntimeError(f"Failed to fetch URL: {e}")
 
     def extract_conversation(self, text: str) -> Dict:
-        """Convert plain text into a conversation structure"""
         if not text:
             raise ValueError("Input text cannot be empty")
         try:
@@ -56,7 +53,6 @@ class URLToAudioConverter:
             raise RuntimeError(f"Failed to extract conversation: {str(e)}")
 
     async def text_to_speech(self, conversation_json: Dict, voice_1: str, voice_2: str) -> Tuple[List[str], str]:
-        """Convert conversation JSON into audio files"""
         output_dir = Path(self._create_output_directory())
         filenames = []
         try:
@@ -71,10 +67,8 @@ class URLToAudioConverter:
             raise RuntimeError(f"Text-to-speech failed: {e}")
 
     async def _generate_audio(self, text: str, voice: str) -> str:
-        """Generate a temporary audio file with edge-tts"""
         if not text.strip():
             raise ValueError("Text cannot be empty")
-
         communicate = edge_tts.Communicate(
             text,
             voice.split(" - ")[0],
@@ -86,123 +80,82 @@ class URLToAudioConverter:
         return tmp_file.name
 
     def _create_output_directory(self) -> str:
-        """Create a unique directory for the output files"""
         folder_name = base64.urlsafe_b64encode(os.urandom(8)).decode("utf-8")
         os.makedirs(folder_name, exist_ok=True)
         return folder_name
 
     def combine_audio_files(self, filenames: List[str]) -> AudioSegment:
-        """Combine audio segments"""
         if not filenames:
             raise ValueError("No audio files provided")
-
         combined = AudioSegment.empty()
         for filename in filenames:
             combined += AudioSegment.from_file(filename, format="mp3")
         return combined
 
-    def _detect_silences(self, audio: AudioSegment, min_len: int = 500, thresh: int = -40) -> List[Tuple[int, int]]:
-        """Detect silent ranges in the audio"""
-        silent_ranges = []
-        start = None
-
-        samples = np.array(audio.get_array_of_samples())
-        window_size = int(min_len * audio.frame_rate / 1000)
-
-        for i in range(0, len(samples) - window_size, window_size):
-            window = samples[i:i+window_size]
-            if np.max(window) < thresh:
-                if start is None:
-                    start = i
-            else:
-                if start is not None:
-                    silent_ranges.append((start, i))
-                    start = None
-
-        return silent_ranges
-
     def add_background_music_and_tags(
         self,
         speech_audio: AudioSegment,
         music_path: str,
         tags_paths: List[str]
     ) -> AudioSegment:
-        """Mix in background music and tags intelligently"""
-        # 1. Load and adjust the music
-        music = AudioSegment.from_file(music_path).fade_out(2000)
-        music = music - 25  # Lower the volume
-
-        # 2. Smart loop (only if needed)
+        music = AudioSegment.from_file(music_path).fade_out(2000) - 25
         if len(music) < len(speech_audio):
-            loops = (len(speech_audio) // len(music)) + 1
-            music = music * loops
+            music = music * ((len(speech_audio) // len(music)) + 1)
         music = music[:len(speech_audio)]
+        mixed = speech_audio.overlay(music)
 
-        # 3. Mix voice and music
-        mixed = speech_audio.overlay(music, position=0)
-
-        # 4. Insert tags
         tag_intro = AudioSegment.from_file(tags_paths[0]) - 10
-        tag_transition = AudioSegment.from_file(tags_paths[1]) - 10
-
-        # Intro tag
+        tag_trans = AudioSegment.from_file(tags_paths[1]) - 10
         final_audio = tag_intro + mixed
 
-        # Tags in pauses (optional)
-        silences = self._detect_silences(speech_audio)
-        for start, end in reversed(silences):
-            if (end - start) > len(tag_transition):
-                final_audio = final_audio.overlay(
-                    tag_transition,
-                    position=start + 100  # Small margin
-                )
+        silent_ranges = []
+        for i in range(0, len(speech_audio) - 500, 100):
+            chunk = speech_audio[i:i+500]
+            if chunk.dBFS < -40:
+                silent_ranges.append((i, i + 500))
+
+        for start, end in reversed(silent_ranges):
+            if (end - start) >= len(tag_trans):
+                final_audio = final_audio.overlay(tag_trans, position=start + 50)
 
         return final_audio
 
-    async def process_content(
+    async def url_to_audio(self, url: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
+        text = self.fetch_text(url)
+        if len(words := text.split()) > self.config.max_words:
+            text = " ".join(words[:self.config.max_words])
+        conversation = self.extract_conversation(text)
+        return await self._process_to_audio(conversation, voice_1, voice_2)
+
+    async def text_to_audio(self, text: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
+        conversation = self.extract_conversation(text)
+        return await self._process_to_audio(conversation, voice_1, voice_2)
+
+    async def raw_text_to_audio(self, text: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
+        conversation = {"conversation": [{"speaker": "Narrator", "text": text}]}
+        return await self._process_to_audio(conversation, voice_1, voice_2)
+
+    async def _process_to_audio(
         self,
-        content: str,
+        conversation: Dict,
         voice_1: str,
-        voice_2: str,
-        is_url: bool = False
+        voice_2: str
     ) -> Tuple[str, str]:
-        """Process content (a URL or raw text) into the final audio"""
-        try:
-            # 1. Get structured text
-            if is_url:
-                text = self.fetch_text(content)
-                if len(words := text.split()) > self.config.max_words:
-                    text = " ".join(words[:self.config.max_words])
-                conversation = self.extract_conversation(text)
-            else:
-                conversation = self.extract_conversation(content)
-
-            # 2. Generate audio
-            audio_files, folder_name = await self.text_to_speech(conversation, voice_1, voice_2)
-            combined = self.combine_audio_files(audio_files)
-
-            # 3. Mix with music and tags
-            final_audio = self.add_background_music_and_tags(
-                combined,
-                "musica.mp3",
-                ["tag.mp3", "tag2.mp3"]
-            )
-
-            # 4. Export
-            output_path = os.path.join(folder_name, "podcast_final.mp3")
-            final_audio.export(output_path, format="mp3")
-
-            # 5. Cleanup
-            for f in audio_files:
-                os.remove(f)
-
-            # Conversation text
-            conversation_text = "\n".join(
-                f"{turn['speaker']}: {turn['text']}"
-                for turn in conversation["conversation"]
-            )
-
-            return output_path, conversation_text
+        audio_files, folder_name = await self.text_to_speech(conversation, voice_1, voice_2)
+        combined = self.combine_audio_files(audio_files)
+        final_audio = self.add_background_music_and_tags(
+            combined,
+            "musica.mp3",
+            ["tag.mp3", "tag2.mp3"]
+        )
+        output_path = os.path.join(folder_name, "output.mp3")
+        final_audio.export(output_path, format="mp3")
+
+        for f in audio_files:
+            os.remove(f)
 
-        except Exception as e:
-            raise RuntimeError(f"Processing failed: {str(e)}")
+        text_output = "\n".join(
+            f"{turn['speaker']}: {turn['text']}"
+            for turn in conversation["conversation"]
+        )
+        return output_path, text_output
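For orientation (not part of the commit), a minimal driver sketch for the refactored entry points. The ConversationConfig field names and the constructor signature are assumptions inferred from self.config.prefix_url and self.config.max_words in the diff; the voice strings follow the "Name - Label" shape implied by voice.split(" - ")[0]; and musica.mp3, tag.mp3, and tag2.mp3 must exist in the working directory, since _process_to_audio hard-codes those paths.

    import asyncio

    from conver import ConversationConfig, URLToAudioConverter

    async def main() -> None:
        # Assumed config shape: only prefix_url and max_words are
        # referenced in the diff; any other fields are unknown here.
        config = ConversationConfig(prefix_url="", max_words=3000)
        converter = URLToAudioConverter(config)

        # raw_text_to_audio skips LLM-based conversation extraction and
        # wraps the text in a single "Narrator" turn. Each entry point
        # returns (path_to_final_mp3, plain-text transcript).
        path, transcript = await converter.raw_text_to_audio(
            "Welcome to the show.",
            "en-US-JennyNeural - Female",
            "en-US-GuyNeural - Male",
        )
        print(path)        # <random-folder>/output.mp3
        print(transcript)  # "Speaker: text", one line per turn

    asyncio.run(main())

url_to_audio and text_to_audio follow the same call shape; they differ only in running fetch_text and/or extract_conversation before handing off to _process_to_audio.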