gnosticdev commited on
Commit
b41ca3b
·
verified ·
1 Parent(s): 0f9b00e

Update conver.py

Browse files
Files changed (1) hide show
  1. conver.py +54 -71
conver.py CHANGED
@@ -1,5 +1,5 @@
1
  from dataclasses import dataclass
2
- from typing import List, Tuple, Dict
3
  import os
4
  import json
5
  import httpx
@@ -50,17 +50,11 @@ class URLToAudioConverter:
50
  response_content = chat_completion.choices[0].message.content
51
  json_str = response_content.strip()
52
  if not json_str.startswith('{'):
53
- start = json_str.find('{')
54
- if start != -1:
55
- json_str = json_str[start:]
56
  if not json_str.endswith('}'):
57
- end = json_str.rfind('}')
58
- if end != -1:
59
- json_str = json_str[:end+1]
60
  return json.loads(json_str)
61
  except Exception as e:
62
- print(f"Error en extract_conversation: {str(e)}")
63
- print(f"Respuesta del modelo: {response_content}")
64
  raise RuntimeError(f"Failed to extract conversation: {str(e)}")
65
 
66
  async def text_to_speech(self, conversation_json: Dict, voice_1: str, voice_2: str) -> Tuple[List[str], str]:
@@ -79,105 +73,94 @@ class URLToAudioConverter:
79
  except Exception as e:
80
  raise RuntimeError(f"Failed to convert text to speech: {e}")
81
 
82
- async def _generate_audio(self, text: str, voice: str, rate: int = 0, pitch: int = 0) -> Tuple[str, str]:
83
  if not text.strip():
84
  return None, "Text cannot be empty"
85
- if not voice:
86
- return None, "Voice cannot be empty"
87
  voice_short_name = voice.split(" - ")[0]
88
- rate_str = f"{rate:+d}%"
89
- pitch_str = f"{pitch:+d}Hz"
90
- communicate = edge_tts.Communicate(text, voice_short_name, rate=rate_str, pitch=pitch_str)
 
 
 
91
  with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
92
  tmp_path = tmp_file.name
93
  await communicate.save(tmp_path)
94
  return tmp_path, None
95
 
96
  def _create_output_directory(self) -> str:
97
- random_bytes = os.urandom(8)
98
- folder_name = base64.urlsafe_b64encode(random_bytes).decode("utf-8")
99
  os.makedirs(folder_name, exist_ok=True)
100
  return folder_name
101
 
102
  def combine_audio_files(self, filenames: List[str]) -> AudioSegment:
103
  if not filenames:
104
  raise ValueError("No input files provided")
105
- try:
106
- combined = AudioSegment.empty()
107
- for filename in filenames:
108
- audio_segment = AudioSegment.from_file(filename, format="mp3")
109
- combined += audio_segment
110
- return combined
111
- except Exception as e:
112
- raise RuntimeError(f"Failed to combine audio files: {e}")
113
 
114
  def add_background_music_and_tags(
115
- self, speech_audio: AudioSegment, music_file: str, tags_files: List[str]
 
 
 
116
  ) -> AudioSegment:
117
  music = AudioSegment.from_file(music_file)
118
  if len(music) < len(speech_audio):
119
- loops = (len(speech_audio) // len(music)) + 1
120
- music = music * loops
121
- music = music[:len(speech_audio)] - 20 # bajar volumen música
122
  mixed = speech_audio.overlay(music)
123
- for i, tag_path in enumerate(tags_files):
124
  tag_audio = AudioSegment.from_file(tag_path) - 5
125
- if i == 0:
126
- mixed = tag_audio + mixed
127
- else:
128
- mixed = mixed + tag_audio
129
  return mixed
130
 
131
  async def url_to_audio(self, url: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
132
  text = self.fetch_text(url)
133
- words = text.split()
134
- if len(words) > self.config.max_words:
135
  text = " ".join(words[:self.config.max_words])
136
  conversation_json = self.extract_conversation(text)
137
  conversation_text = "\n".join(
138
- f"{turn['speaker']}: {turn['text']}" for turn in conversation_json["conversation"]
 
139
  )
140
- self.llm_out = conversation_json
141
- audio_files, folder_name = await self.text_to_speech(conversation_json, voice_1, voice_2)
142
- combined_audio = self.combine_audio_files(audio_files)
143
- music_path = "musica.mp3"
144
- tags_paths = ["tag.mp3", "tag2.mp3"]
145
- final_audio = self.add_background_music_and_tags(combined_audio, music_path, tags_paths)
146
- final_output = os.path.join(folder_name, "combined_output_with_music.mp3")
147
- final_audio.export(final_output, format="mp3")
148
- for f in audio_files:
149
- os.remove(f)
150
- return final_output, conversation_text
151
 
152
- async def text_to_audio(self, text: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
153
- conversation_json = self.extract_conversation(text)
 
154
  conversation_text = "\n".join(
155
- f"{turn['speaker']}: {turn['text']}" for turn in conversation_json["conversation"]
 
156
  )
157
- audio_files, folder_name = await self.text_to_speech(conversation_json, voice_1, voice_2)
158
- combined_audio = self.combine_audio_files(audio_files)
159
- music_path = "musica.mp3"
160
- tags_paths = ["tag.mp3", "tag2.mp3"]
161
- final_audio = self.add_background_music_and_tags(combined_audio, music_path, tags_paths)
162
- final_output = os.path.join(folder_name, "combined_output_with_music.mp3")
163
- final_audio.export(final_output, format="mp3")
164
- for f in audio_files:
165
- os.remove(f)
166
- return final_output, conversation_text
167
 
168
- async def generate_podcast(self, conversation, voice_1, voice_2):
169
- audio_files, folder_name = await self.text_to_speech(conversation, voice_1, voice_2)
 
 
 
 
 
 
 
170
  combined_audio = self.combine_audio_files(audio_files)
171
- music_path = "musica.mp3"
172
- tags_paths = ["tag.mp3", "tag2.mp3"]
173
- final_audio = self.add_background_music_and_tags(combined_audio, music_path, tags_paths)
174
- output_file = os.path.join(folder_name, "raw_podcast_with_music.mp3")
 
 
175
  final_audio.export(output_file, format="mp3")
176
 
177
  for f in audio_files:
178
  os.remove(f)
179
 
180
- conversation_text = "\n".join(
181
- f"{turn['speaker']}: {turn['text']}" for turn in conversation["conversation"]
182
- )
183
- return conversation_text, output_file
 
1
  from dataclasses import dataclass
2
+ from typing import List, Tuple, Dict, Optional
3
  import os
4
  import json
5
  import httpx
 
50
  response_content = chat_completion.choices[0].message.content
51
  json_str = response_content.strip()
52
  if not json_str.startswith('{'):
53
+ json_str = json_str[json_str.find('{'):]
 
 
54
  if not json_str.endswith('}'):
55
+ json_str = json_str[:json_str.rfind('}')+1]
 
 
56
  return json.loads(json_str)
57
  except Exception as e:
 
 
58
  raise RuntimeError(f"Failed to extract conversation: {str(e)}")
59
 
60
  async def text_to_speech(self, conversation_json: Dict, voice_1: str, voice_2: str) -> Tuple[List[str], str]:
 
73
  except Exception as e:
74
  raise RuntimeError(f"Failed to convert text to speech: {e}")
75
 
76
+ async def _generate_audio(self, text: str, voice: str, rate: int = 0, pitch: int = 0) -> Tuple[str, Optional[str]]:
77
  if not text.strip():
78
  return None, "Text cannot be empty"
 
 
79
  voice_short_name = voice.split(" - ")[0]
80
+ communicate = edge_tts.Communicate(
81
+ text,
82
+ voice_short_name,
83
+ rate=f"{rate:+d}%",
84
+ pitch=f"{pitch:+d}Hz"
85
+ )
86
  with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
87
  tmp_path = tmp_file.name
88
  await communicate.save(tmp_path)
89
  return tmp_path, None
90
 
91
  def _create_output_directory(self) -> str:
92
+ folder_name = base64.urlsafe_b64encode(os.urandom(8)).decode("utf-8")
 
93
  os.makedirs(folder_name, exist_ok=True)
94
  return folder_name
95
 
96
  def combine_audio_files(self, filenames: List[str]) -> AudioSegment:
97
  if not filenames:
98
  raise ValueError("No input files provided")
99
+ combined = AudioSegment.empty()
100
+ for filename in filenames:
101
+ combined += AudioSegment.from_file(filename, format="mp3")
102
+ return combined
 
 
 
 
103
 
104
  def add_background_music_and_tags(
105
+ self,
106
+ speech_audio: AudioSegment,
107
+ music_file: str,
108
+ tags_files: List[str]
109
  ) -> AudioSegment:
110
  music = AudioSegment.from_file(music_file)
111
  if len(music) < len(speech_audio):
112
+ music = music * (len(speech_audio) // len(music) + 1)
113
+ music = music[:len(speech_audio)] - 20
 
114
  mixed = speech_audio.overlay(music)
115
+ for tag_path in tags_files:
116
  tag_audio = AudioSegment.from_file(tag_path) - 5
117
+ mixed = tag_audio + mixed
 
 
 
118
  return mixed
119
 
120
  async def url_to_audio(self, url: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
121
  text = self.fetch_text(url)
122
+ if len(words := text.split()) > self.config.max_words:
 
123
  text = " ".join(words[:self.config.max_words])
124
  conversation_json = self.extract_conversation(text)
125
  conversation_text = "\n".join(
126
+ f"{turn['speaker']}: {turn['text']}"
127
+ for turn in conversation_json["conversation"]
128
  )
129
+ return await self._process_audio(conversation_json, voice_1, voice_2, conversation_text)
 
 
 
 
 
 
 
 
 
 
130
 
131
+ async def text_to_audio(self, structured_text: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
132
+ """Para texto YA estructurado como JSON de conversación."""
133
+ conversation_json = self.extract_conversation(structured_text)
134
  conversation_text = "\n".join(
135
+ f"{turn['speaker']}: {turn['text']}"
136
+ for turn in conversation_json["conversation"]
137
  )
138
+ return await self._process_audio(conversation_json, voice_1, voice_2, conversation_text)
139
+
140
+ async def raw_text_to_audio(self, raw_text: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
141
+ """Para texto plano directo (sin estructura de diálogo)."""
142
+ fake_conversation = {"conversation": [{"speaker": "Narrador", "text": raw_text}]}
143
+ return await self._process_audio(fake_conversation, voice_1, voice_2, raw_text)
 
 
 
 
144
 
145
+ async def _process_audio(
146
+ self,
147
+ conversation_json: Dict,
148
+ voice_1: str,
149
+ voice_2: str,
150
+ text: str
151
+ ) -> Tuple[str, str]:
152
+ """Método interno para procesamiento común."""
153
+ audio_files, folder_name = await self.text_to_speech(conversation_json, voice_1, voice_2)
154
  combined_audio = self.combine_audio_files(audio_files)
155
+ final_audio = self.add_background_music_and_tags(
156
+ combined_audio,
157
+ "musica.mp3",
158
+ ["tag.mp3", "tag2.mp3"]
159
+ )
160
+ output_file = os.path.join(folder_name, "output.mp3")
161
  final_audio.export(output_file, format="mp3")
162
 
163
  for f in audio_files:
164
  os.remove(f)
165
 
166
+ return output_file, text