spss_formatting / app.py
geekgirl3's picture
Update app.py
e49d959 verified
raw
history blame
17.2 kB
#25-08-05
import gradio as gr
import pandas as pd
import numpy as np
import os
import re
import tempfile
import shutil
from datetime import datetime
from typing import List, Tuple, Dict, Any
import json
from io import BytesIO
class CSVBinaryTransformer:
    """Load a CSV and derive indicator columns from ABSA, category and topic fields.

    The ``process_*`` methods add columns to the dataframe they are given
    (in place) and also return it.
    """

    # Separators accepted between labels in ``Categories*`` and topics cells.
    _LABEL_DELIMITERS = re.compile(r'[,;|]')
    # Characters replaced by "_" when a topic is turned into an ``S_`` column name.
    _UNSAFE_COLUMN_CHARS = re.compile(r'[^\w]')

    def __init__(self):
        self.df = None                 # dataframe from the last successful load_csv()
        self.original_filename = None  # base name of the loaded file, without extension
        self.processed_df = None       # dataframe exported by save_transformed_data()

    def load_csv(self, file_path: str) -> Tuple[pd.DataFrame, List[Tuple[str, str]]]:
        """Read ``file_path`` into ``self.df``.

        Returns:
            The first 10 rows, and a ``(label, value)`` pair per column
            (both set to the column name).

        Raises:
            gr.Error: if the file cannot be read or parsed.
        """
        try:
            self.df = pd.read_csv(file_path)
            self.original_filename = os.path.splitext(os.path.basename(file_path))[0]
            column_choices = [(col, col) for col in self.df.columns.tolist()]
            return self.df.head(10), column_choices
        except Exception as e:
            raise gr.Error(f"Error loading CSV: {str(e)}") from e

    def select_all_columns(self) -> List[str]:
        """Return every column name of the loaded CSV, or ``[]`` when none is loaded."""
        if self.df is not None:
            return self.df.columns.tolist()
        return []

    def deselect_all_columns(self) -> List[str]:
        """Return an empty selection."""
        return []

    @classmethod
    def _split_labels(cls, value) -> List[str]:
        """Split a delimited cell into stripped, non-empty labels (non-strings give [])."""
        if not isinstance(value, str):
            return []
        return [part.strip() for part in cls._LABEL_DELIMITERS.split(value) if part.strip()]

    @staticmethod
    def _parse_absa_pairs(value) -> List[Tuple[str, str]]:
        """Parse ``"topic::sentiment | topic::sentiment"`` into (topic, sentiment) pairs."""
        if pd.isna(value):
            return []
        pairs = []
        for chunk in str(value).split('|'):
            chunk = chunk.strip()
            if '::' not in chunk:
                continue
            topic, sentiment = (part.strip() for part in chunk.split('::', 1))
            if topic and sentiment:
                pairs.append((topic, sentiment))
        return pairs

    def _add_label_indicators(self, df: pd.DataFrame, source_columns: List[str],
                              prefix: str) -> pd.DataFrame:
        """Add one 0/1 column ``<prefix><label>`` per distinct label in ``source_columns``.

        A row is flagged when one of its labels equals the label
        case-insensitively. Whole labels are compared, so "Art" is not flagged
        on a row that only contains "Smart". Columns are added in first-seen
        order, which keeps the output layout stable between runs.
        """
        row_labels = [set() for _ in range(len(df))]
        ordered_labels: Dict[str, None] = {}
        for col in source_columns:
            for pos, value in enumerate(df[col].tolist()):
                for label in self._split_labels(value):
                    ordered_labels.setdefault(label, None)
                    row_labels[pos].add(label.lower())

        for label in ordered_labels:
            wanted = label.lower()
            df[f"{prefix}{label}"] = [int(wanted in labels) for labels in row_labels]
        return df

    def process_absa_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Expand columns whose name starts with ``ABSA``.

        Adds ``Verbatim_Positive`` / ``Verbatim_Neutral`` / ``Verbatim_Negative``
        (0/1 per row) and one ``S_<topic>`` column per topic holding the
        sentiment text for that row ("" when the topic is absent). When a topic
        occurs more than once in a row, the last occurrence wins.
        Returns ``df`` unchanged when it has no ABSA columns.
        """
        absa_columns = [col for col in df.columns if str(col).startswith('ABSA')]
        if not absa_columns:
            return df

        row_count = len(df)
        positive = np.zeros(row_count, dtype="int64")
        neutral = np.zeros(row_count, dtype="int64")
        negative = np.zeros(row_count, dtype="int64")
        topic_values: Dict[str, list] = {}

        for col in absa_columns:
            for pos, value in enumerate(df[col].tolist()):
                for topic, sentiment in self._parse_absa_pairs(value):
                    sentiment_lower = sentiment.lower()
                    # Only the first matching keyword counts for this pair.
                    if 'positive' in sentiment_lower:
                        positive[pos] = 1
                    elif 'negative' in sentiment_lower:
                        negative[pos] = 1
                    elif 'neutral' in sentiment_lower:
                        neutral[pos] = 1

                    safe_topic = self._UNSAFE_COLUMN_CHARS.sub('_', topic).strip('_')
                    col_name = f"S_{safe_topic}"
                    if col_name not in topic_values:
                        # Keep the values of an S_ column that already exists.
                        if col_name in df.columns:
                            topic_values[col_name] = df[col_name].tolist()
                        else:
                            topic_values[col_name] = [""] * row_count
                    topic_values[col_name][pos] = sentiment

        df['Verbatim_Positive'] = positive
        df['Verbatim_Neutral'] = neutral
        df['Verbatim_Negative'] = negative
        for col_name, values in topic_values.items():
            df[col_name] = values
        return df

    def process_categories_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Add a 0/1 ``C_<category>`` column per label found in ``Categories*`` columns.

        Cells are split on ``,``, ``;`` and ``|``. Returns ``df`` unchanged when
        it has no column whose name starts with ``Categories``.
        """
        category_columns = [col for col in df.columns if str(col).startswith('Categories')]
        if not category_columns:
            return df
        return self._add_label_indicators(df, category_columns, "C_")

    def process_topics_column(self, df: pd.DataFrame, topics_column: str) -> pd.DataFrame:
        """Add a 0/1 ``T_<topic>`` column per label found in ``topics_column``.

        Cells are split on ``,``, ``;`` and ``|``. Returns ``df`` unchanged when
        ``topics_column`` is empty or not a column of ``df``.
        """
        if not topics_column or topics_column not in df.columns:
            return df
        return self._add_label_indicators(df, [topics_column], "T_")

    def save_transformed_data(self, output_format='xlsx'):
        """Write ``self.processed_df`` to the temp directory and return the path.

        Args:
            output_format: any string containing ``"xlsx"`` (case-insensitive)
                selects Excel; everything else selects CSV.

        Raises:
            ValueError: if there is no processed data, or the output file is
                missing after writing.
        """
        if self.processed_df is None:
            raise ValueError("No transformed data to save")

        extension = 'xlsx' if 'xlsx' in str(output_format).lower() else 'csv'
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        prefix = self.original_filename if self.original_filename else 'transformed_data'
        filename = f"{prefix}_BinaryTransformation_{timestamp}.{extension}"
        final_path = os.path.join(tempfile.gettempdir(), filename)

        # Reserve a unique path, then close the handle before pandas writes to
        # it: writing to a still-open temporary file fails on Windows.
        with tempfile.NamedTemporaryFile(delete=False, suffix=f'.{extension}') as temp_file:
            temp_path = temp_file.name
        try:
            if extension == 'xlsx':
                self.processed_df.to_excel(temp_path, index=False)
            else:
                self.processed_df.to_csv(temp_path, index=False)
            # Overwrites an existing file of the same name.
            os.replace(temp_path, final_path)
        except Exception:
            # Do not leave a stray temp file behind when writing fails.
            if os.path.exists(temp_path):
                os.remove(temp_path)
            raise

        if not os.path.exists(final_path):
            raise ValueError(f"Failed to create output file: {final_path}")
        return final_path
# Module-level transformer instance used by the handler functions below.
# It holds the loaded dataframe, the source file name and the last processed
# result between callbacks.
# NOTE(review): as a module global this state is presumably shared by every
# user session of the app - confirm that is acceptable.
transformer = CSVBinaryTransformer()
def handle_file_upload(file):
    """Load the uploaded CSV and refresh the preview and column selector.

    Args:
        file: the value delivered by the upload component - either a path
            string or an object exposing the path as ``.name``; ``None`` when
            the upload was cleared.

    Returns:
        A 3-tuple ``(preview_html, column_selector_update, status_message)``.
        Errors are reported through the status message, not raised.
    """
    if file is None:
        return None, gr.update(choices=[], value=[]), "Please upload a CSV file"
    try:
        # The component may deliver a plain path string or a file-like object
        # with a ``.name`` attribute; accept both.
        file_path = file if isinstance(file, str) else file.name
        preview_df, column_choices = transformer.load_csv(file_path)
        preview_html = preview_df.to_html(classes="table table-striped", table_id="upload-preview")
        # Offer every column, nothing pre-selected, and reveal the selector.
        return (
            preview_html,
            gr.update(choices=column_choices, value=[], visible=True),
            f"✅ Successfully loaded CSV with {len(transformer.df)} rows and {len(transformer.df.columns)} columns"
        )
    except Exception as e:
        return None, gr.update(choices=[], value=[], visible=False), f"❌ Error: {str(e)}"
def select_all():
    """Tick every column of the loaded CSV (an empty selection if none is loaded)."""
    # select_all_columns() itself returns [] when no dataframe is loaded.
    every_column = transformer.select_all_columns()
    return gr.update(value=every_column)
def deselect_all():
    """Clear the column selection."""
    cleared = transformer.deselect_all_columns()
    return gr.update(value=cleared)
def process_transformation(selected_columns, topics_column, export_format):
    """Run the full transformation and export the result.

    Args:
        selected_columns: column names to carry into the output.
        topics_column: name of the column to expand into ``T_`` columns
            (may be empty).
        export_format: the radio label, e.g. ``"CSV (.csv)"`` or
            ``"Excel (.xlsx)"``.

    Returns:
        A 3-tuple ``(preview_html, output_file_path, status_message)``. On
        failure the first two are ``None`` and the message carries the error
        and traceback.
    """
    try:
        if transformer.df is None:
            return None, None, "❌ Error: No CSV file loaded"
        if not selected_columns:
            return None, None, "❌ Error: Please select at least one column"

        # Work on a copy restricted to the selected columns.
        processed_df = transformer.df[selected_columns].copy()
        processed_df = transformer.process_absa_columns(processed_df)
        processed_df = transformer.process_categories_columns(processed_df)

        # Try the name exactly as typed first, then without stray whitespace.
        topics_name = topics_column or ""
        if topics_name not in processed_df.columns:
            topics_name = topics_name.strip()
        topics_missing = bool(topics_name) and topics_name not in processed_df.columns
        processed_df = transformer.process_topics_column(processed_df, topics_name)

        transformer.processed_df = processed_df
        preview_html = processed_df.head(20).to_html(classes="table table-striped", table_id="preview-table")

        # Map the radio label to the format keyword. The previous string
        # stripping produced "excelxlsx" / "csvcsv", which never matched
        # 'xlsx', so Excel exports were silently written as CSV.
        output_format = 'xlsx' if 'xlsx' in str(export_format).lower() else 'csv'
        output_file = transformer.save_transformed_data(output_format)

        success_msg = f"✅ Transformation completed! Generated file: {os.path.basename(output_file)}"
        success_msg += f"\n📊 Processed {len(transformer.processed_df)} rows with {len(transformer.processed_df.columns)} columns"
        success_msg += f"\n💾 File saved successfully"
        success_msg += f"\n📥 File download should start automatically"
        if topics_missing:
            success_msg += (
                f"\n⚠️ Topics column '{topics_name}' is not among the selected columns; "
                "no T_ columns were created"
            )
        return preview_html, output_file, success_msg
    except Exception as e:
        import traceback
        error_msg = f"❌ Error during transformation: {str(e)}\n\n{traceback.format_exc()}"
        return None, None, error_msg
# Build the Gradio interface: controls in the left column, previews, status and
# download in the right column.
# NOTE(review): the original nesting was lost in extraction; the placement of
# the status/download widgets in the right column is a reconstruction - confirm.
with gr.Blocks(title="CSV Binary Transformation Tool", theme=gr.themes.Soft()) as app:
    gr.Markdown("""
# 📊 CSV Binary Transformation Tool
This tool transforms CSV files by creating binary columns for sentiment analysis, categories, and topics.
## Features:
- **ABSA Processing**: Creates sentiment columns and topic-sentiment combinations
- **Category Processing**: Creates binary columns for each category
- **Topic Processing**: Creates binary columns for each topic
- **Flexible Export**: Support for CSV and Excel formats
""")
    with gr.Row():
        with gr.Column(scale=1):
            # Step 1: file upload
            gr.Markdown("### 1. Upload CSV File")
            file_input = gr.File(
                label="Upload Input File",
                file_types=[".csv"],
                type="filepath"
            )
            upload_status = gr.Textbox(
                label="Upload Status",
                interactive=False,
                lines=2
            )
            # Step 2: column selection
            gr.Markdown("### 2. Select Columns")
            gr.Markdown("*Choose which columns from your CSV to include in the output file*")
            with gr.Row():
                select_all_btn = gr.Button("✓ Select All", size="sm", variant="secondary")
                deselect_all_btn = gr.Button("✗ Deselect All", size="sm", variant="secondary")
            # Hidden until a CSV has been loaded successfully.
            column_selector = gr.CheckboxGroup(
                label="Choose columns to include in output",
                choices=[],
                value=[],
                interactive=True,
                visible=False,
                info="Select the columns you want to include in the transformed output file"
            )
            # Step 3: topics column name
            gr.Markdown("### 3. Specify Topics Column")
            topics_column_input = gr.Textbox(
                label="Topics Column Name",
                placeholder="Enter the name of the column containing topics",
                info="This column will be used to create T_<topic> binary columns"
            )
            # Step 4: export format
            gr.Markdown("### 4. Export Settings")
            export_format = gr.Radio(
                label="Output Format",
                choices=["CSV (.csv)", "Excel (.xlsx)"],
                value="Excel (.xlsx)"
            )
            process_btn = gr.Button(
                "🚀 Transform Data",
                variant="primary",
                size="lg"
            )
        with gr.Column(scale=2):
            # Previews of the input and of the transformed output
            gr.Markdown("### File Preview")
            with gr.Tabs():
                with gr.Tab("Original Data"):
                    original_preview = gr.HTML(
                        label="Original Data Preview (First 10 rows)",
                        value="<p>No file uploaded yet</p>"
                    )
                with gr.Tab("Transformed Data"):
                    transformed_preview = gr.HTML(
                        label="Transformed Data Preview (First 20 rows)",
                        value="<p>No transformation performed yet</p>"
                    )
            # Status and download
            process_status = gr.Textbox(
                label="Processing Status",
                interactive=False,
                lines=6
            )
            gr.Markdown("### 📥 Download Status")
            gr.Markdown("Please click on the link inside the output file size value to download the transformed file (the number value on the right hand side below). You may need to right click and select Save Link As (or something similar)")
            output_file = gr.File(
                label="Transformed Binary Matrix (Auto-Download)",
                interactive=False,
                visible=True
            )
    # Event wiring
    file_input.change(
        fn=handle_file_upload,
        inputs=[file_input],
        outputs=[original_preview, column_selector, upload_status]
    )
    select_all_btn.click(
        fn=select_all,
        outputs=[column_selector]
    )
    deselect_all_btn.click(
        fn=deselect_all,
        outputs=[column_selector]
    )
    process_btn.click(
        fn=process_transformation,
        inputs=[column_selector, topics_column_input, export_format],
        outputs=[transformed_preview, output_file, process_status]
    )
    # Inject table styling on page load by appending a <style> element.
    app.load(js="""
function() {
const style = document.createElement('style');
style.textContent = `
.table { font-size: 12px; }
.table th, .table td { padding: 4px 8px; }
#upload-preview, #preview-table { max-height: 400px; overflow-y: auto; }
`;
document.head.appendChild(style);
}
""")
# Start the app only when this file is run as a script.
if __name__ == "__main__":
    app.launch()