gnosticdev committed
Commit 71a491a · verified · 1 Parent(s): 4d25f3d

Update conver.py

Files changed (1)
  1. conver.py +107 -95
conver.py CHANGED
@@ -10,8 +10,7 @@ import tempfile
 from pydub import AudioSegment
 import base64
 from pathlib import Path
-import hashlib
-import asyncio
+import time
 
 @dataclass
 class ConversationConfig:
@@ -28,43 +27,71 @@ class URLToAudioConverter:
     def fetch_text(self, url: str) -> str:
         if not url:
             raise ValueError("URL cannot be empty")
-        response = httpx.get(f"{self.config.prefix_url}{url}", timeout=60.0)
-        response.raise_for_status()
-        return response.text
+
+        full_url = f"{self.config.prefix_url}{url}"
+        try:
+            response = httpx.get(full_url, timeout=60.0)
+            response.raise_for_status()
+            return response.text
+        except httpx.HTTPError as e:
+            raise RuntimeError(f"Failed to fetch URL: {e}")
 
     def extract_conversation(self, text: str) -> Dict:
-        prompt = (
-            f"{text}\nConvert the provided text into a short informative podcast conversation "
-            f"between two experts. Return ONLY a JSON object with the following structure:\n"
-            '{"conversation": [{"speaker": "Speaker1", "text": "..."}, {"speaker": "Speaker2", "text": "..."}]}'
-        )
-        chat_completion = self.llm_client.chat.completions.create(
-            messages=[{"role": "user", "content": prompt}],
-            model=self.config.model_name,
-            response_format={"type": "json_object"}
-        )
-        response_content = chat_completion.choices[0].message.content
-        json_str = response_content.strip()
-        if not json_str.startswith("{"):
-            json_str = json_str[json_str.find("{"):]
-        if not json_str.endswith("}"):
-            json_str = json_str[: json_str.rfind("}") + 1]
-        return json.loads(json_str)
+        if not text:
+            raise ValueError("Input text cannot be empty")
+
+        try:
+            prompt = (
+                f"{text}\nConvert the provided text into a short informative podcast conversation "
+                f"between two experts. Return ONLY a JSON object with the following structure:\n"
+                '{"conversation": [{"speaker": "Speaker1", "text": "..."}, {"speaker": "Speaker2", "text": "..."}]}'
+            )
+
+            chat_completion = self.llm_client.chat.completions.create(
+                messages=[{"role": "user", "content": prompt}],
+                model=self.config.model_name,
+                response_format={"type": "json_object"}
+            )
+
+            response_content = chat_completion.choices[0].message.content
+            json_str = response_content.strip()
+
+            if not json_str.startswith('{'):
+                start = json_str.find('{')
+                if start != -1:
+                    json_str = json_str[start:]
+
+            if not json_str.endswith('}'):
+                end = json_str.rfind('}')
+                if end != -1:
+                    json_str = json_str[:end+1]
+
+            return json.loads(json_str)
+        except Exception as e:
+            print(f"Error in extract_conversation: {str(e)}")
+            raise RuntimeError(f"Failed to extract conversation: {str(e)}")
 
     async def text_to_speech(self, conversation_json: Dict, voice_1: str, voice_2: str) -> Tuple[List[str], str]:
-        output_dir = Path(self._create_output_directory())
+        output_dir = self._create_output_directory()
         filenames = []
-        for i, turn in enumerate(conversation_json["conversation"]):
-            voice = voice_1 if i % 2 == 0 else voice_2
-            tmp_path, error = await self._generate_audio(turn["text"], voice)
-            if error:
-                raise RuntimeError(f"Text-to-speech failed: {error}")
-            filename = output_dir / f"output_{i}.mp3"
-            os.rename(tmp_path, filename)
-            filenames.append(str(filename))
-        return filenames, str(output_dir)
+        try:
+            for i, turn in enumerate(conversation_json["conversation"]):
+                filename = os.path.join(output_dir, f"output_{i}.mp3")
+                voice = voice_1 if i % 2 == 0 else voice_2
+                tmp_path, error = await self._generate_audio(turn["text"], voice)
+                if error:
+                    raise RuntimeError(f"Text-to-speech failed: {error}")
+                os.rename(tmp_path, filename)
+                filenames.append(filename)
+            return filenames, output_dir
+        except Exception as e:
+            raise RuntimeError(f"Failed to convert text to speech: {e}")
 
     async def _generate_audio(self, text: str, voice: str, rate: int = 0, pitch: int = 0) -> Tuple[str, str]:
+        if not text.strip():
+            return None, "Text cannot be empty"
+        if not voice:
+            return None, "Voice cannot be empty"
         voice_short_name = voice.split(" - ")[0]
         rate_str = f"{rate:+d}%"
         pitch_str = f"{pitch:+d}Hz"
@@ -75,80 +102,65 @@ class URLToAudioConverter:
         return tmp_path, None
 
     def _create_output_directory(self) -> str:
-        random_bytes = os.urandom(8)
-        folder_name = base64.urlsafe_b64encode(random_bytes).decode("utf-8")
-        os.makedirs(folder_name, exist_ok=True)
-        return folder_name
+        os.makedirs("outputs", exist_ok=True)
+        return "outputs"
+
+    def clean_old_files(self, directory: str = "outputs", max_age_seconds: int = 86400):
+        now = time.time()
+        for filename in os.listdir(directory):
+            file_path = os.path.join(directory, filename)
+            if file_path.endswith(".mp3"):
+                file_age = now - os.path.getmtime(file_path)
+                if file_age > max_age_seconds:
+                    os.remove(file_path)
 
     def combine_audio_files(self, filenames: List[str], output_file: str) -> None:
-        combined = AudioSegment.empty()
-        for filename in filenames:
-            combined += AudioSegment.from_file(filename, format="mp3")
-        combined.export(output_file, format="mp3")
-        dir_path = os.path.dirname(filenames[0])
-        for file in os.listdir(dir_path):
-            os.remove(os.path.join(dir_path, file))
-        os.rmdir(dir_path)
+        if not filenames:
+            raise ValueError("No input files provided")
+        try:
+            combined = AudioSegment.empty()
+            for filename in filenames:
+                combined += AudioSegment.from_file(filename, format="mp3")
+            combined.export(output_file, format="mp3")
+        except Exception as e:
+            raise RuntimeError(f"Failed to combine audio files: {e}")
 
     async def url_to_audio(self, url: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
+        self.clean_old_files()
         text = self.fetch_text(url)
         words = text.split()
         if len(words) > self.config.max_words:
-            text = " ".join(words[: self.config.max_words])
+            text = " ".join(words[:self.config.max_words])
         conversation_json = self.extract_conversation(text)
-        conversation_text = "\n".join(f"{t['speaker']}: {t['text']}" for t in conversation_json["conversation"])
+        conversation_text = "\n".join(
+            f"{turn['speaker']}: {turn['text']}" for turn in conversation_json["conversation"]
+        )
         self.llm_out = conversation_json
-        audio_files, folder_name = await self.text_to_speech(conversation_json, voice_1, voice_2)
-        final_output = os.path.join(folder_name, "combined_output.mp3")
-        self.combine_audio_files(audio_files, final_output)
-        return final_output, conversation_text
+        audio_files, output_dir = await self.text_to_speech(conversation_json, voice_1, voice_2)
+        output_file = os.path.join(output_dir, f"combined_{int(time.time())}.mp3")
+        self.combine_audio_files(audio_files, output_file)
+        return output_file, conversation_text
 
     async def text_to_audio(self, text: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
+        self.clean_old_files()
         conversation_json = self.extract_conversation(text)
-        conversation_text = "\n".join(f"{t['speaker']}: {t['text']}" for t in conversation_json["conversation"])
-        audio_files, folder_name = await self.text_to_speech(conversation_json, voice_1, voice_2)
-        final_output = os.path.join(folder_name, "combined_output.mp3")
-        self.combine_audio_files(audio_files, final_output)
-        return final_output, conversation_text
+        conversation_text = "\n".join(
+            f"{turn['speaker']}: {turn['text']}" for turn in conversation_json["conversation"]
+        )
+        audio_files, output_dir = await self.text_to_speech(conversation_json, voice_1, voice_2)
+        output_file = os.path.join(output_dir, f"combined_{int(time.time())}.mp3")
+        self.combine_audio_files(audio_files, output_file)
+        return output_file, conversation_text
 
     async def raw_text_to_audio(self, text: str, voice_1: str, voice_2: str) -> Tuple[str, str]:
-        try:
-            print("\n=== DEBUG START (raw_text_to_audio) ===")
-            print(f"Received text: {text[:200]}...")  # check the input
-
-            # Use an absolute path under /tmp (compatible with Spaces)
-            output_dir = "/tmp/podcast_outputs"
-            os.makedirs(output_dir, exist_ok=True)
-            hash_name = hashlib.md5(text.encode()).hexdigest()[:8]
-            output_file = os.path.join(output_dir, f"podcast_{hash_name}.mp3")
-            print(f"Output path: {output_file}")
-
-            # Check available voices (DEBUG)
-            voices = await edge_tts.list_voices()
-            voice_names = [v['Name'] for v in voices]
-            print(f"Available voices (first 5): {voice_names[:5]}...")
-
-            # Extract the short voice name (e.g. "en-US-AvaMultilingualNeural")
-            voice_short = voice_1.split(" - ")[0] if " - " in voice_1 else voice_1
-            print(f"Voice to use: {voice_short}")
-
-            # Generate the audio
-            communicate = edge_tts.Communicate(text, voice_short)
-            print("Generating audio...")
-            await communicate.save(output_file)
-            print("Audio generated.")
-
-            # Check that the file exists and is not empty
-            if not os.path.exists(output_file):
-                print("ERROR: File not created.")
-                return "Error: File not generated", None
-            elif os.path.getsize(output_file) == 0:
-                print("ERROR: Empty file.")
-                return "Error: Empty audio file", None
-
-            print(f"=== DEBUG END (valid file: {output_file}) ===")
-            return text, output_file
-
-        except Exception as e:
-            print(f"CRITICAL ERROR: {str(e)}")
-            return f"Error: {str(e)}", None
+        self.clean_old_files()
+        conversation = {
+            "conversation": [
+                {"speaker": "Host", "text": text},
+                {"speaker": "Co-host", "text": "(Topic continues)"}
+            ]
+        }
+        audio_files, output_dir = await self.text_to_speech(conversation, voice_1, voice_2)
+        output_file = os.path.join(output_dir, f"raw_podcast_{int(time.time())}.mp3")
+        self.combine_audio_files(audio_files, output_file)
+        return text, output_file
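
For reference, below is a minimal standalone sketch of the edge-tts call pattern that _generate_audio wraps (short voice name plus signed rate/pitch strings). The sample text, voice name, and output filename are illustrative assumptions, not values taken from this commit.

import asyncio
import edge_tts

async def demo():
    # Same call shape as _generate_audio: short voice name plus "+0%" / "+0Hz" strings.
    communicate = edge_tts.Communicate(
        "Hello from the podcast converter.",   # sample text (assumption)
        "en-US-AvaMultilingualNeural",         # short voice name (assumption)
        rate="+0%",
        pitch="+0Hz",
    )
    await communicate.save("demo_output.mp3")  # illustrative output path

asyncio.run(demo())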