File size: 16,133 Bytes
37a9836
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dd5487b
 
 
37a9836
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7c1c351
37a9836
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
from typing import List, Tuple, Optional, Dict, Any
import traceback
import torch
import gradio as gr
import numpy as np
import time
import os
import re
import wave
import contextlib
import logging
import pandas as pd
import gc



from core.data_model import AudioFile
from core.bark.voice_clone import create_bark_prompt
from core.bark.generate_audio import generate_audio
from core.data_model import BarkPrompt, BarkGenerationConfig
from core.utils.audio import save_audio_file
from config import *

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)


# return list of available devices and the best device to be used as default for all inference
def get_available_torch_devices() -> Tuple[List[str], str]:
    devices = ["cpu"]
    best_device = "cpu"
    # if torch.backend.mps.is_available():
    #     devices.append("mps")
    #     best_device = "mps"
    if torch.cuda.is_available():
        devices.append("cuda")
        best_device = "cuda"

    return devices, best_device


# --- Helper Functions ---
# (Keep get_wav_duration, load_existing_audio, get_safe_filename,
#  generate_sine_wave, save_audio, parse_text_prompts, get_available_prompts,
#  create_audio_prompt as they are, they are mostly backend logic)
def get_wav_duration(filepath):
    """Gets the duration of a WAV file in seconds."""
    try:
        with contextlib.closing(wave.open(filepath, "r")) as f:
            frames = f.getnframes()
            rate = f.getframerate()
            if rate > 0:
                duration = frames / float(rate)
                return duration
            else:
                logger.info(f"Warning: Framerate is 0 for {filepath}")
                return 0
    except wave.Error as e:
        logger.info(f"Warning: Could not read wave file header for {filepath}: {e}")
        return 0
    except Exception as e:
        logger.info(f"Warning: Could not get duration for {filepath}: {e}")
        return 0


def load_existing_audio() -> List[Dict[str, Any]]:
    """Scans the audio directory and loads metadata for existing WAV files."""
    logger.info("\n--- Loading Existing Audio Files ---")
    existing_files_metadata = []
    if not os.path.isdir(GENERATED_AUDIO_DIR):
        logger.info(f"Directory not found: {GENERATED_AUDIO_DIR}")
        return []

    try:
        for filename in os.listdir(GENERATED_AUDIO_DIR):
            if filename.lower().endswith(".wav"):
                filepath = os.path.join(GENERATED_AUDIO_DIR, filename)
                if not os.path.isfile(filepath):
                    continue

                match = re.match(r"^(.*)_(\d{13})\.wav$", filename)
                text_guess = "Unknown (from filename)"
                timestamp_ms = 0
                if match:
                    text_guess = match.group(1).replace("_", " ")
                    try:
                        timestamp_ms = int(match.group(2))
                    except ValueError:
                        timestamp_ms = 0
                else:
                    text_guess = os.path.splitext(filename)[0].replace("_", " ")

                timestamp_sec = (
                    timestamp_ms / 1000.0
                    if timestamp_ms > 0
                    else os.path.getmtime(filepath)
                )
                duration = get_wav_duration(filepath)

                metadata = {
                    "text": text_guess,
                    "path": filepath,
                    "duration": duration,
                    "timestamp": timestamp_sec,
                }
                existing_files_metadata.append(metadata)

    except Exception as e:
        logger.error(f"Error loading existing audio files: {e}")

    existing_files_metadata.sort(key=lambda x: x.get("timestamp", 0))
    logger.info(
        f"--- Finished Loading {len(existing_files_metadata)} Existing Files ---"
    )
    return existing_files_metadata


def get_safe_filename(base_name: str, extension: str, directory: str) -> str:
    """Creates a safe and unique filename in the target directory."""
    safe_base = "".join(
        c if c.isalnum() or c in ["_", "-"] else "_" for c in base_name[:50]
    )
    timestamp = int(time.time() * 1000)
    filename = f"{safe_base}_{timestamp}.{extension}"
    filepath = os.path.join(directory, filename)
    counter = 1
    while os.path.exists(filepath):
        filename = f"{safe_base}_{timestamp}_{counter}.{extension}"
        filepath = os.path.join(directory, filename)
        counter += 1
    return filepath


def update_audio_list(
    newly_generated_metadata: List[Dict[str, Any]],
    current_audio_list: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Appends new metadata to the list and sorts it by timestamp."""
    logger.info(f"\n--- Updating Audio List State ---")
    if not isinstance(current_audio_list, list):
        logger.info("Current audio list was not a list, initializing.")
        current_audio_list = []
    if not isinstance(newly_generated_metadata, list):
        logger.info("Newly generated metadata is not a list, skipping update.")
        return current_audio_list

    logger.info(f"Current list size: {len(current_audio_list)}")
    logger.info(f"Adding {len(newly_generated_metadata)} new items.")
    updated_list = current_audio_list + newly_generated_metadata
    updated_list.sort(key=lambda x: x.get("timestamp", 0))
    logger.info(f"Updated list state size: {len(updated_list)}")
    logger.info("--- Finished Updating Audio List State ---")
    return updated_list


def format_audio_list_for_dataframe(audio_list: List[Dict[str, Any]]) -> pd.DataFrame:
    """Converts the list of audio metadata dicts into a pandas DataFrame for display."""
    logger.info("\n--- Formatting List for DataFrame ---")
    if not audio_list:
        logger.info("Audio list is empty, returning empty DataFrame.")
        # Return empty DataFrame with correct columns
        return pd.DataFrame(columns=["File", "Prompt", "Duration (s)"])

    display_data = []
    for item in audio_list:
        filepath = item.get("path", "N/A")
        filename = os.path.basename(filepath) if filepath != "N/A" else "N/A"
        # Truncate long text prompts for display in the table
        text_prompt = item.get("text", "N/A")
        display_text = (
            (text_prompt[:75] + "...") if len(text_prompt) > 75 else text_prompt
        )
        duration = item.get("duration", 0)
        display_data.append(
            {
                "File": filename,
                "Prompt": display_text,
                "Duration (s)": f"{duration:.2f}" if duration else "N/A",
                # Store the full path implicitly by list order, not shown in df
            }
        )

    df = pd.DataFrame(display_data)
    logger.info(f"Created DataFrame with {len(df)} rows.")
    logger.info("--- Finished Formatting List for DataFrame ---")
    return df


def handle_row_selection(
    audio_list: List[Dict[str, Any]], evt: gr.SelectData
) -> Tuple[Optional[str], int]:
    """
    Handles the selection event from the DataFrame.
    Updates the audio player with the selected file's path.
    Returns the filepath and the selected index.
    """
    logger.info("\n--- Handling Row Selection ---")
    selected_index = evt.index[0] if evt.index else None  # Get row index
    logger.info(f"DataFrame row selected. Event data: {evt}")

    if selected_index is not None and 0 <= selected_index < len(audio_list):
        selected_item = audio_list[selected_index]
        filepath = selected_item.get("path")
        logger.info(f"Selected item at index {selected_index}: {selected_item}")
        if filepath and os.path.exists(filepath):
            logger.info(f"Updating audio player with: {filepath}")
            logger.info("--- Finished Handling Row Selection (Success) ---")
            return filepath, selected_index
        else:
            logger.info(f"File not found for selected item: {filepath}")
            gr.Warning(
                f"File not found for selected row: {os.path.basename(filepath or 'N/A')}"
            )
            logger.info("--- Finished Handling Row Selection (File Not Found) ---")
            return None, selected_index  # Keep index, but clear player
    else:
        logger.info("Invalid selection index or empty list.")
        logger.info("--- Finished Handling Row Selection (Invalid Index) ---")
        return None, -1  # Clear player and indicate no valid selection


def handle_delete_selected(
    selected_index: int, current_audio_list: List[Dict[str, Any]]
) -> Tuple[List[Dict[str, Any]], int, Optional[str]]:
    """
    Deletes the audio file corresponding to the selected index.
    Updates the main audio list state.
    Clears the selection index and audio player.
    """
    logger.info("\n--- Handling Delete Selected ---")
    logger.info(f"Attempting deletion for selected index: {selected_index}")

    if (
        selected_index is None
        or selected_index < 0
        or selected_index >= len(current_audio_list)
    ):
        gr.Warning("No valid audio selected for deletion.")
        logger.info("No valid index provided.")
        # Return current list, clear index, clear player
        return current_audio_list, -1, None

    item_to_delete = current_audio_list[selected_index]
    filepath_to_delete = item_to_delete.get("path")
    logger.info(f"Item to delete: {item_to_delete}")

    # Create the new list excluding the item
    # Corrected slicing logic: include elements before and after the index
    new_audio_list = (
        current_audio_list[:selected_index] + current_audio_list[selected_index + 1 :]
    )
    logger.info(f"New list size after filtering: {len(new_audio_list)}")

    # Try to delete the file from disk
    deletion_successful_on_disk = False
    try:
        if filepath_to_delete and os.path.exists(filepath_to_delete):
            os.remove(filepath_to_delete)
            logger.info(f"Successfully deleted file: {filepath_to_delete}")
            gr.Info(f"Deleted {os.path.basename(filepath_to_delete)}")
            deletion_successful_on_disk = True
        elif filepath_to_delete:
            logger.info(f"File not found for deletion: {filepath_to_delete}")
            gr.Warning("Audio entry removed from list, but file was not found on disk.")
            deletion_successful_on_disk = True  # Consider list update successful
        else:
            logger.info("Invalid filepath in selected item.")
            gr.Warning("Could not delete: Invalid file path associated with selection.")
            # Revert list change if filepath was invalid from the start? Or keep it removed?
            # Let's keep it removed from the list for consistency.
            deletion_successful_on_disk = True  # Treat as success for list update

    except OSError as e:
        logger.info(f"Error deleting file {filepath_to_delete}: {e}")
        traceback.logger.info_exc()
        gr.Error(f"Error deleting file: {e}")
        # If file deletion fails, we still return the updated list (item removed).
        # If you want to revert the list change on OS error, return `current_audio_list` here.

    logger.info("--- Finished Deleting Selected Item ---")
    # Return the updated list, clear the selected index, clear the audio player
    return new_audio_list, -1, None


def get_available_prompts() -> List[str]:
    """Loads available prompt file names."""
    try:
        prompts = [
            f
            for f in os.listdir(PROMPT_DIR)
            if os.path.isfile(os.path.join(PROMPT_DIR, f))
            and f.lower().endswith((".npz", ".npy", ".json"))
        ]

        if len(prompts) == 0:
            gr.Info("No prompts found.", duration=3)

        return ["None"] + prompts
    except Exception as e:
        logger.info(f"Error loading prompts: {e}")
        gr.Info(f"Error loading prompts {e}", duration=3, title="Error")
        return ["None"]


def update_available_prompts() -> gr.update:
    try:
        prompts = [
            f
            for f in os.listdir(PROMPT_DIR)
            if os.path.isfile(os.path.join(PROMPT_DIR, f))
            and f.lower().endswith((".npz", ".npy", ".json"))
        ]

        if len(prompts) == 0:
            gr.Info("No prompts found.", duration=3)

        return gr.update(choices=["None"] + prompts)
    except Exception as e:
        logger.info(f"Error loading prompts: {e}")
        gr.Info(f"Error loading prompts {e}", duration=3, title="Error")
        return gr.update()


def generate_batch_audio(
    text: str,
    semantic_temp: float,
    coarse_temp: float,
    fine_temp: float,
    manual_seed: int,
    model_type: str,
    inference_device: str,
    selected_prompt_name: Optional[str],
) -> Tuple[List[Dict[str, Any]], str]:
    """
    Generates audio (sine wave) for each line of text input.
    Returns metadata for generated files.
    """
    gc.collect()

    torch.manual_seed(manual_seed)
    if not text:
        gr.Warning("No valid text prompts provided.")
        return []

    generated_metadata = []

    bark_prompt = None
    if selected_prompt_name != "None":
        gr.Info("Loading audio prompt...")
        prompt_path = os.path.join(PROMPT_DIR, selected_prompt_name)
        bark_prompt = BarkPrompt.load_prompt(
            prompt_path, torch.device(inference_device)
        )

    generation_config = BarkGenerationConfig(
        temperature=semantic_temp,
        generate_coarse_temperature=coarse_temp,
        generate_fine_temperature=fine_temp,
        use_small_model=True if model_type == "small" else False,
    )

    # split the text into sentences
    sentences = text.split("\n")

    gr.Info("Generating Audio....", duration=120)
    waves = generate_audio(
        texts=sentences,
        prompt=bark_prompt,
        generation_config=generation_config,
        silent=True,
    )
    audio = np.concat(waves, axis=-1)

    output_filepath = get_safe_filename(text, "wav", GENERATED_AUDIO_DIR)
    save_audio_file(audio, DEFAULT_AUDIO_SAMPLE_RATE, output_filepath)
    duration_sec = audio.shape[0] // DEFAULT_AUDIO_SAMPLE_RATE
    metadata = {
        "text": text,
        "path": output_filepath,
        "duration": duration_sec,
        "timestamp": time.time(),
    }
    generated_metadata.append(metadata)
    gr.Info("Done!", duration=5)
    return generated_metadata


def create_audio_prompt(
    uploaded_audio_file: Optional[str],
    device: str,
    progress: gr.Progress = gr.Progress(),
) -> gr.update:
    """Processes an uploaded audio file to create a voice prompt file (stub)."""
    logger.info("\n--- Starting Prompt Creation ---")
    if uploaded_audio_file is None or len(uploaded_audio_file) == 0:
        gr.Warning("No audio file uploaded!")
        return gr.update()

    logger.info(f"Processing uploaded file: {uploaded_audio_file}")

    try:
        progress(0, desc="Starting prompt creation...")
        new_prompt_filename = None
        progress(0.2, desc="Extracting prompt features...")
        audio_file = AudioFile(audio_file_path=uploaded_audio_file, max_duration=10)
        prompt = create_bark_prompt(
            audio_file=audio_file, temperature=1, eos_p=0.2, device=torch.device(device)
        )

        progress(0.8, desc="Saving prompt file...")
        original_basename = os.path.splitext(os.path.basename(uploaded_audio_file))[0]
        prompt_filepath = get_safe_filename(original_basename, "json", PROMPT_DIR)
        new_prompt_filename = os.path.basename(prompt_filepath)

        ok = prompt.save_prompt(prompt_filepath)
        if ok:
            progress(1.0, desc="Prompt creation complete.")

        else:
            progress(1.0, desc="Error when saving prompt")

        new_choices = get_available_prompts()

        return gr.update(choices=new_choices, value=new_prompt_filename)

    except Exception as e:
        logger.info(f"Error creating prompt: {e}")
        gr.Error(f"Prompt creation failed: {e}")
        return f"Error creating prompt: {e}", gr.update()