Spaces:
Sleeping
Sleeping
#25-08-05 | |
import gradio as gr | |
import pandas as pd | |
import numpy as np | |
import os | |
import re | |
import tempfile | |
import shutil | |
from datetime import datetime | |
from typing import List, Tuple, Dict, Any | |
import json | |
from io import BytesIO | |
class CSVBinaryTransformer: | |
def __init__(self): | |
self.df = None | |
self.original_filename = None | |
self.processed_df = None | |
def load_csv(self, file_path: str) -> Tuple[pd.DataFrame, List[str]]: | |
"""Load CSV file and return dataframe and column list""" | |
try: | |
self.df = pd.read_csv(file_path) | |
self.original_filename = os.path.splitext(os.path.basename(file_path))[0] | |
# Create checkbox options for columns | |
column_choices = [(col, col) for col in self.df.columns.tolist()] | |
return self.df.head(10), column_choices | |
except Exception as e: | |
raise gr.Error(f"Error loading CSV: {str(e)}") | |
def select_all_columns(self) -> List[str]: | |
"""Return all column names for select all functionality""" | |
if self.df is not None: | |
return self.df.columns.tolist() | |
return [] | |
def deselect_all_columns(self) -> List[str]: | |
"""Return empty list for deselect all functionality""" | |
return [] | |
def process_absa_columns(self, df: pd.DataFrame) -> pd.DataFrame: | |
"""Process ABSA prefixed columns to create sentiment and topic columns""" | |
absa_columns = [col for col in df.columns if col.startswith('ABSA')] | |
if not absa_columns: | |
return df | |
# Create verbatim sentiment columns | |
df['Verbatim_Positive'] = 0 | |
df['Verbatim_Neutral'] = 0 | |
df['Verbatim_Negative'] = 0 | |
# Dictionary to store topic-sentiment combinations | |
topic_sentiment_combinations = set() | |
for col in absa_columns: | |
for idx, value in df[col].items(): | |
if pd.isna(value): | |
continue | |
value_str = str(value) | |
# Split by pipe delimiter to get individual topic::sentiment pairs | |
pairs = [pair.strip() for pair in value_str.split('|') if pair.strip()] | |
for pair in pairs: | |
if '::' in pair: | |
topic_part, sentiment_part = pair.split('::', 1) | |
topic = topic_part.strip() | |
sentiment = sentiment_part.strip() | |
if topic and sentiment: | |
topic_sentiment_combinations.add((topic, sentiment)) | |
# Update verbatim sentiment columns based on sentiment | |
sentiment_lower = sentiment.lower() | |
if 'positive' in sentiment_lower: | |
df.at[idx, 'Verbatim_Positive'] = 1 | |
elif 'negative' in sentiment_lower: | |
df.at[idx, 'Verbatim_Negative'] = 1 | |
elif 'neutral' in sentiment_lower: | |
df.at[idx, 'Verbatim_Neutral'] = 1 | |
# Create columns for topic-sentiment combinations | |
for topic, sentiment in topic_sentiment_combinations: | |
# Clean topic name for column naming | |
safe_topic = re.sub(r'[^\w]', '_', topic).strip('_') | |
col_name = f"S_{safe_topic}" | |
if col_name not in df.columns: | |
df[col_name] = "" | |
# Fill the S_ columns with sentiment values | |
for col in absa_columns: | |
for idx, value in df[col].items(): | |
if pd.isna(value): | |
continue | |
value_str = str(value) | |
# Split by pipe delimiter to get individual topic::sentiment pairs | |
pairs = [pair.strip() for pair in value_str.split('|') if pair.strip()] | |
for pair in pairs: | |
if '::' in pair: | |
topic_part, sentiment_part = pair.split('::', 1) | |
topic = topic_part.strip() | |
sentiment = sentiment_part.strip() | |
if topic and sentiment: | |
# Clean topic name for column naming | |
safe_topic = re.sub(r'[^\w]', '_', topic).strip('_') | |
col_name = f"S_{safe_topic}" | |
if col_name in df.columns: | |
df.at[idx, col_name] = sentiment | |
return df | |
def process_categories_columns(self, df: pd.DataFrame) -> pd.DataFrame: | |
"""Process Categories prefixed columns to create binary category columns""" | |
category_columns = [col for col in df.columns if col.startswith('Categories')] | |
if not category_columns: | |
return df | |
# Collect all unique categories | |
all_categories = set() | |
for col in category_columns: | |
for value in df[col].dropna(): | |
if isinstance(value, str): | |
# Split by common delimiters | |
categories = re.split(r'[,;|]', value) | |
for cat in categories: | |
cat = cat.strip() | |
if cat: | |
all_categories.add(cat) | |
# Create binary columns for each category | |
for category in all_categories: | |
col_name = f"C_{category}" | |
df[col_name] = 0 | |
for col in category_columns: | |
for idx, value in df[col].items(): | |
if pd.isna(value): | |
continue | |
if isinstance(value, str) and category.lower() in value.lower(): | |
df.at[idx, col_name] = 1 | |
return df | |
def process_topics_column(self, df: pd.DataFrame, topics_column: str) -> pd.DataFrame: | |
"""Process specified topics column to create binary topic columns""" | |
if not topics_column or topics_column not in df.columns: | |
return df | |
# Collect all unique topics | |
all_topics = set() | |
for value in df[topics_column].dropna(): | |
if isinstance(value, str): | |
# Split by common delimiters | |
topics = re.split(r'[,;|]', value) | |
for topic in topics: | |
topic = topic.strip() | |
if topic: | |
all_topics.add(topic) | |
# Create binary columns for each topic | |
for topic in all_topics: | |
col_name = f"T_{topic}" | |
df[col_name] = 0 | |
for idx, value in df[topics_column].items(): | |
if pd.isna(value): | |
continue | |
if isinstance(value, str) and topic.lower() in value.lower(): | |
df.at[idx, col_name] = 1 | |
return df | |
def save_transformed_data(self, output_format='xlsx'): | |
"""Save the transformed data and return the file path - using exact same method as working version""" | |
if self.processed_df is None: | |
raise ValueError("No transformed data to save") | |
# Create filename with original filename prefix and timestamp | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
prefix = self.original_filename if self.original_filename else 'transformed_data' | |
if output_format == 'xlsx': | |
filename = f"{prefix}_BinaryTransformation_{timestamp}.xlsx" | |
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') | |
self.processed_df.to_excel(temp_file.name, index=False) | |
temp_file.close() | |
final_path = os.path.join(tempfile.gettempdir(), filename) | |
if os.path.exists(final_path): | |
os.remove(final_path) | |
os.rename(temp_file.name, final_path) | |
else: # csv | |
filename = f"{prefix}_BinaryTransformation_{timestamp}.csv" | |
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.csv') | |
self.processed_df.to_csv(temp_file.name, index=False) | |
temp_file.close() | |
final_path = os.path.join(tempfile.gettempdir(), filename) | |
if os.path.exists(final_path): | |
os.remove(final_path) | |
os.rename(temp_file.name, final_path) | |
if not os.path.exists(final_path): | |
raise ValueError(f"Failed to create output file: {final_path}") | |
return final_path | |
# Initialize transformer | |
transformer = CSVBinaryTransformer() | |
def handle_file_upload(file): | |
"""Handle CSV file upload""" | |
if file is None: | |
return None, gr.update(choices=[], value=[]), "Please upload a CSV file" | |
try: | |
preview_df, column_choices = transformer.load_csv(file.name) | |
preview_html = preview_df.to_html(classes="table table-striped", table_id="upload-preview") | |
# Update the checkbox group with new choices and make all columns visible | |
column_names = [col for col, _ in column_choices] | |
return ( | |
preview_html, | |
gr.update(choices=column_choices, value=[], visible=True), | |
f"β Successfully loaded CSV with {len(transformer.df)} rows and {len(transformer.df.columns)} columns" | |
) | |
except Exception as e: | |
return None, gr.update(choices=[], value=[], visible=False), f"β Error: {str(e)}" | |
def select_all(): | |
"""Select all columns""" | |
if transformer.df is not None: | |
return gr.update(value=transformer.select_all_columns()) | |
return gr.update(value=[]) | |
def deselect_all(): | |
"""Deselect all columns""" | |
return gr.update(value=transformer.deselect_all_columns()) | |
def process_transformation(selected_columns, topics_column, export_format): | |
"""Process the transformation - using exact same pattern as working version""" | |
try: | |
if transformer.df is None: | |
return None, None, "β Error: No CSV file loaded" | |
if not selected_columns: | |
return None, None, "β Error: Please select at least one column" | |
# Create a copy of the dataframe with selected columns | |
processed_df = transformer.df[selected_columns].copy() | |
# Process ABSA columns | |
processed_df = transformer.process_absa_columns(processed_df) | |
# Process Categories columns | |
processed_df = transformer.process_categories_columns(processed_df) | |
# Process Topics column | |
processed_df = transformer.process_topics_column(processed_df, topics_column) | |
# Store processed data | |
transformer.processed_df = processed_df | |
# Generate preview | |
preview_html = processed_df.head(20).to_html(classes="table table-striped", table_id="preview-table") | |
# Save file using the exact same method as working version | |
output_file = transformer.save_transformed_data(export_format.lower().replace(' (.', '').replace(')', '')) | |
success_msg = f"β Transformation completed! Generated file: {os.path.basename(output_file)}" | |
success_msg += f"\nπ Processed {len(transformer.processed_df)} rows with {len(transformer.processed_df.columns)} columns" | |
success_msg += f"\nπΎ File saved successfully" | |
success_msg += f"\nπ₯ File download should start automatically" | |
return preview_html, output_file, success_msg | |
except Exception as e: | |
import traceback | |
error_msg = f"β Error during transformation: {str(e)}\n\n{traceback.format_exc()}" | |
return None, None, error_msg | |
# Create Gradio interface - using similar structure to working version | |
with gr.Blocks(title="CSV Binary Transformation Tool", theme=gr.themes.Soft()) as app: | |
gr.Markdown(""" | |
# π CSV Binary Transformation Tool | |
This tool transforms CSV files by creating binary columns for sentiment analysis, categories, and topics. | |
## Features: | |
- **ABSA Processing**: Creates sentiment columns and topic-sentiment combinations | |
- **Category Processing**: Creates binary columns for each category | |
- **Topic Processing**: Creates binary columns for each topic | |
- **Flexible Export**: Support for CSV and Excel formats | |
""") | |
with gr.Row(): | |
with gr.Column(scale=1): | |
# File upload section | |
gr.Markdown("### 1. Upload CSV File") | |
file_input = gr.File( | |
label="Upload Input File", | |
file_types=[".csv"], | |
type="filepath" | |
) | |
upload_status = gr.Textbox( | |
label="Upload Status", | |
interactive=False, | |
lines=2 | |
) | |
# Column selection section | |
gr.Markdown("### 2. Select Columns") | |
gr.Markdown("*Choose which columns from your CSV to include in the output file*") | |
with gr.Row(): | |
select_all_btn = gr.Button("β Select All", size="sm", variant="secondary") | |
deselect_all_btn = gr.Button("β Deselect All", size="sm", variant="secondary") | |
column_selector = gr.CheckboxGroup( | |
label="Choose columns to include in output", | |
choices=[], | |
value=[], | |
interactive=True, | |
visible=False, | |
info="Select the columns you want to include in the transformed output file" | |
) | |
# Topics column input | |
gr.Markdown("### 3. Specify Topics Column") | |
topics_column_input = gr.Textbox( | |
label="Topics Column Name", | |
placeholder="Enter the name of the column containing topics", | |
info="This column will be used to create T_<topic> binary columns" | |
) | |
# Export options | |
gr.Markdown("### 4. Export Settings") | |
export_format = gr.Radio( | |
label="Output Format", | |
choices=["CSV (.csv)", "Excel (.xlsx)"], | |
value="Excel (.xlsx)" | |
) | |
# Process button | |
process_btn = gr.Button( | |
"π Transform Data", | |
variant="primary", | |
size="lg" | |
) | |
with gr.Column(scale=2): | |
# Preview sections | |
gr.Markdown("### File Preview") | |
with gr.Tabs(): | |
with gr.Tab("Original Data"): | |
original_preview = gr.HTML( | |
label="Original Data Preview (First 10 rows)", | |
value="<p>No file uploaded yet</p>" | |
) | |
with gr.Tab("Transformed Data"): | |
transformed_preview = gr.HTML( | |
label="Transformed Data Preview (First 20 rows)", | |
value="<p>No transformation performed yet</p>" | |
) | |
# Status and download | |
process_status = gr.Textbox( | |
label="Processing Status", | |
interactive=False, | |
lines=6 | |
) | |
# Download section - using exact same setup as working version | |
gr.Markdown("### π₯ Download Status") | |
gr.Markdown("Please click on the link inside the output file size value to download the transformed file (the number value on the right hand side below). You may need to right click and select Save Link As (or something similar)") | |
output_file = gr.File( | |
label="Transformed Binary Matrix (Auto-Download)", | |
interactive=False, | |
visible=True | |
) | |
# Event handlers - same pattern as working version | |
file_input.change( | |
fn=handle_file_upload, | |
inputs=[file_input], | |
outputs=[original_preview, column_selector, upload_status] | |
) | |
select_all_btn.click( | |
fn=select_all, | |
outputs=[column_selector] | |
) | |
deselect_all_btn.click( | |
fn=deselect_all, | |
outputs=[column_selector] | |
) | |
process_btn.click( | |
fn=process_transformation, | |
inputs=[column_selector, topics_column_input, export_format], | |
outputs=[transformed_preview, output_file, process_status] | |
) | |
# Add custom CSS for better table styling | |
app.load(js=""" | |
function() { | |
const style = document.createElement('style'); | |
style.textContent = ` | |
.table { font-size: 12px; } | |
.table th, .table td { padding: 4px 8px; } | |
#upload-preview, #preview-table { max-height: 400px; overflow-y: auto; } | |
`; | |
document.head.appendChild(style); | |
} | |
""") | |
if __name__ == "__main__": | |
app.launch( | |
) |