spss_formatting / app.py
geekgirl3's picture
Update app.py
94571e6 verified
#18-08-05
# -*- coding: utf-8 -*-
"""BinaryMatrix_Production.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/14s_B8s3ANig3-51wDw2PpBXnCCmBnbRo
"""
import gradio as gr
import pandas as pd
import numpy as np
import os
import re
import tempfile
import shutil
from datetime import datetime
from typing import List, Tuple, Dict, Any
import json
from io import BytesIO
class CSVBinaryTransformer:
def __init__(self):
self.df = None
self.original_filename = None
self.processed_df = None
def load_csv(self, file_path: str) -> Tuple[pd.DataFrame, List[str], str]:
"""Load CSV file and return dataframe, column list, and default topics column"""
try:
self.df = pd.read_csv(file_path)
self.original_filename = os.path.splitext(os.path.basename(file_path))[0]
# Create checkbox options for columns
column_choices = [(col, col) for col in self.df.columns.tolist()]
# Find default topics column (column before the one starting with "Categories")
default_topics_column = ""
columns = self.df.columns.tolist()
for i, col in enumerate(columns):
if col.startswith('Categories') and i > 0:
default_topics_column = columns[i-1]
break
return self.df.head(10), column_choices, default_topics_column
except Exception as e:
raise gr.Error(f"Error loading CSV: {str(e)}")
def select_all_columns(self) -> List[str]:
"""Return all column names for select all functionality"""
if self.df is not None:
return self.df.columns.tolist()
return []
def deselect_all_columns(self) -> List[str]:
"""Return empty list for deselect all functionality"""
return []
def process_absa_columns(self, df: pd.DataFrame) -> pd.DataFrame:
"""Process ABSA prefixed columns to create sentiment and topic columns"""
absa_columns = [col for col in df.columns if col.startswith('ABSA')]
if not absa_columns:
return df
# Create verbatim sentiment columns
df['Verbatim_Positive'] = 0
df['Verbatim_Neutral'] = 0
df['Verbatim_Negative'] = 0
# Dictionary to store topic-sentiment combinations
topic_sentiment_combinations = set()
for col in absa_columns:
for idx, value in df[col].items():
if pd.isna(value):
continue
value_str = str(value)
# Split by pipe delimiter to get individual topic::sentiment pairs
pairs = [pair.strip() for pair in value_str.split('|') if pair.strip()]
for pair in pairs:
if '::' in pair:
topic_part, sentiment_part = pair.split('::', 1)
topic = topic_part.strip()
sentiment = sentiment_part.strip()
if topic and sentiment:
topic_sentiment_combinations.add((topic, sentiment))
# Update verbatim sentiment columns based on sentiment
sentiment_lower = sentiment.lower()
if 'positive' in sentiment_lower:
df.at[idx, 'Verbatim_Positive'] = 1
elif 'negative' in sentiment_lower:
df.at[idx, 'Verbatim_Negative'] = 1
elif 'neutral' in sentiment_lower:
df.at[idx, 'Verbatim_Neutral'] = 1
# Create columns for topic-sentiment combinations
for topic, sentiment in topic_sentiment_combinations:
# Clean topic name for column naming
safe_topic = re.sub(r'[^\w]', '_', topic).strip('_')
col_name = f"S_{safe_topic}"
if col_name not in df.columns:
df[col_name] = ""
# Fill the S_ columns with sentiment values
for col in absa_columns:
for idx, value in df[col].items():
if pd.isna(value):
continue
value_str = str(value)
# Split by pipe delimiter to get individual topic::sentiment pairs
pairs = [pair.strip() for pair in value_str.split('|') if pair.strip()]
for pair in pairs:
if '::' in pair:
topic_part, sentiment_part = pair.split('::', 1)
topic = topic_part.strip()
sentiment = sentiment_part.strip()
if topic and sentiment:
# Clean topic name for column naming
safe_topic = re.sub(r'[^\w]', '_', topic).strip('_')
col_name = f"S_{safe_topic}"
if col_name in df.columns:
df.at[idx, col_name] = sentiment
return df
def process_categories_columns(self, df: pd.DataFrame) -> pd.DataFrame:
"""Process Categories prefixed columns to create binary category columns"""
category_columns = [col for col in df.columns if col.startswith('Categories')]
if not category_columns:
return df
# Collect all unique categories
all_categories = set()
for col in category_columns:
for value in df[col].dropna():
if isinstance(value, str):
# Split by common delimiters
categories = re.split(r'[,;|]', value)
for cat in categories:
cat = cat.strip()
if cat:
all_categories.add(cat)
# Create binary columns for each category
category_columns_created = []
for category in all_categories:
col_name = f"C_{category}"
df[col_name] = 0
category_columns_created.append(col_name)
for col in category_columns:
for idx, value in df[col].items():
if pd.isna(value):
continue
if isinstance(value, str) and category.lower() in value.lower():
df.at[idx, col_name] = 1
# Sort C_ columns alphabetically by the text after C_
if category_columns_created:
category_columns_created.sort(key=lambda x: x[2:]) # Sort by text after "C_"
# Reorder the columns in the dataframe
other_cols = [col for col in df.columns if not col.startswith('C_') or col not in category_columns_created]
df = df[other_cols + category_columns_created]
return df
def process_topics_column(self, df: pd.DataFrame, topics_column: str) -> pd.DataFrame:
"""Process specified topics column to create binary topic columns"""
if not topics_column or topics_column not in df.columns:
return df
# Collect all unique topics
all_topics = set()
for value in df[topics_column].dropna():
if isinstance(value, str):
# Split only by pipe delimiter (|) to preserve commas within topics
topics = [topic.strip() for topic in value.split('|') if topic.strip()]
for topic in topics:
if topic:
all_topics.add(topic)
# Create binary columns for each topic
topic_columns_created = []
for topic in all_topics:
col_name = f"T_{topic}"
df[col_name] = 0
topic_columns_created.append(col_name)
for idx, value in df[topics_column].items():
if pd.isna(value):
continue
if isinstance(value, str):
# Split by pipe delimiter and check for exact match
topics_in_row = [t.strip() for t in value.split('|')]
if topic in topics_in_row:
df.at[idx, col_name] = 1
# Sort T_ columns alphabetically by the text after T_
if topic_columns_created:
topic_columns_created.sort(key=lambda x: x[2:]) # Sort by text after "T_"
# Reorder the columns in the dataframe
other_cols = [col for col in df.columns if not col.startswith('T_') or col not in topic_columns_created]
df = df[other_cols + topic_columns_created]
return df
def save_transformed_data(self, output_format='xlsx'):
"""Save the transformed data and return the file path - FIXED Excel export"""
if self.processed_df is None:
raise ValueError("No transformed data to save")
# Create filename with original filename prefix and timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
prefix = self.original_filename if self.original_filename else 'transformed_data'
if output_format.lower() in ['xlsx', 'excel', 'excel (.xlsx)']:
filename = f"{prefix}_BinaryTransformation_{timestamp}.xlsx"
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx')
temp_file.close() # Close the file handle before writing
# Use pandas to_excel with proper engine
with pd.ExcelWriter(temp_file.name, engine='openpyxl') as writer:
self.processed_df.to_excel(writer, index=False)
final_path = os.path.join(tempfile.gettempdir(), filename)
if os.path.exists(final_path):
os.remove(final_path)
os.rename(temp_file.name, final_path)
else: # csv
filename = f"{prefix}_BinaryTransformation_{timestamp}.csv"
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.csv')
temp_file.close() # Close the file handle before writing
self.processed_df.to_csv(temp_file.name, index=False)
final_path = os.path.join(tempfile.gettempdir(), filename)
if os.path.exists(final_path):
os.remove(final_path)
os.rename(temp_file.name, final_path)
if not os.path.exists(final_path):
raise ValueError(f"Failed to create output file: {final_path}")
return final_path
# Initialize transformer
transformer = CSVBinaryTransformer()
def handle_file_upload(file):
"""Handle CSV file upload - UPDATED to return default topics column and preselect all columns"""
if file is None:
return None, gr.update(choices=[], value=[]), "", "Please upload a CSV file"
try:
preview_df, column_choices, default_topics_column = transformer.load_csv(file.name)
preview_html = preview_df.to_html(classes="table table-striped", table_id="upload-preview")
# Preselect all columns
all_columns = [col for col, _ in column_choices]
return (
preview_html,
gr.update(choices=column_choices, value=all_columns, visible=True), # Preselect all
default_topics_column, # Set default topics column
f"βœ… Successfully loaded CSV with {len(transformer.df)} rows and {len(transformer.df.columns)} columns"
)
except Exception as e:
return None, gr.update(choices=[], value=[], visible=False), "", f"❌ Error: {str(e)}"
def select_all():
"""Select all columns"""
if transformer.df is not None:
return gr.update(value=transformer.select_all_columns())
return gr.update(value=[])
def deselect_all():
"""Deselect all columns"""
return gr.update(value=transformer.deselect_all_columns())
def process_transformation(selected_columns, topics_column, export_format):
"""Process the transformation - UPDATED to validate topics column"""
try:
if transformer.df is None:
return None, None, "❌ Error: No CSV file loaded"
if not selected_columns:
return None, None, "❌ Error: Please select at least one column"
# Validate topics column is provided and exists
if not topics_column or topics_column.strip() == "":
return None, None, "❌ Error: Topics Column Name is mandatory. Please specify a topics column."
if topics_column not in transformer.df.columns:
return None, None, f"❌ Error: Topics column '{topics_column}' not found in the data. Available columns: {', '.join(transformer.df.columns.tolist())}"
# Create a copy of the dataframe with selected columns
processed_df = transformer.df[selected_columns].copy()
# Process ABSA columns
processed_df = transformer.process_absa_columns(processed_df)
# Process Categories columns
processed_df = transformer.process_categories_columns(processed_df)
# Process Topics column
processed_df = transformer.process_topics_column(processed_df, topics_column)
# Store processed data
transformer.processed_df = processed_df
# Generate preview
preview_html = processed_df.head(20).to_html(classes="table table-striped", table_id="preview-table")
# Save file with proper format detection
format_clean = export_format.lower().replace(' (', '').replace(')', '').replace('.', '')
output_file = transformer.save_transformed_data(format_clean)
success_msg = f"βœ… Transformation completed! Generated file: {os.path.basename(output_file)}"
success_msg += f"\nπŸ“Š Processed {len(transformer.processed_df)} rows with {len(transformer.processed_df.columns)} columns"
success_msg += f"\nπŸ’Ύ File saved successfully as {format_clean.upper()}"
success_msg += f"\nπŸ“₯ File download should start automatically"
return preview_html, output_file, success_msg
except Exception as e:
import traceback
error_msg = f"❌ Error during transformation: {str(e)}\n\n{traceback.format_exc()}"
return None, None, error_msg
# Create Gradio interface - UPDATED with colored buttons and mandatory field
with gr.Blocks(title="CSV Binary Transformation Tool", theme=gr.themes.Soft()) as app:
gr.Markdown("""
# πŸ“Š CSV Binary Transformation Tool
This tool transforms CSV files by creating binary columns for sentiment analysis, categories, and topics.
## Features:
- **ABSA Processing**: Creates sentiment columns and topic-sentiment combinations
- **Category Processing**: Creates binary columns for each category
- **Topic Processing**: Creates binary columns for each topic (using "|" delimiter)
- **Flexible Export**: Support for CSV and Excel formats
""")
with gr.Row():
with gr.Column(scale=1):
# File upload section
gr.Markdown("### 1. Upload CSV File")
file_input = gr.File(
label="Upload Input File",
file_types=[".csv"],
type="filepath"
)
upload_status = gr.Textbox(
label="Upload Status",
interactive=False,
lines=2
)
# Column selection section
gr.Markdown("### 2. Select Columns")
gr.Markdown("*Choose which columns from your CSV to include in the output file*")
with gr.Row():
# Colored buttons as requested
select_all_btn = gr.Button("βœ“ Select All", size="sm", variant="primary", elem_classes="green-button")
deselect_all_btn = gr.Button("βœ— Deselect All", size="sm", variant="secondary", elem_classes="red-button")
column_selector = gr.CheckboxGroup(
label="Choose columns to include in output",
choices=[],
value=[],
interactive=True,
visible=False,
info="Select the columns you want to include in the transformed output file"
)
# Topics column input - MADE MANDATORY
gr.Markdown("### 3. Specify Topics Column **(MANDATORY)**")
gr.Markdown("⚠️ **This field is required** - Enter the name of the column containing topics")
gr.Markdown("**Note**: Topics should be separated by '|' delimiter in your data")
topics_column_input = gr.Textbox(
label="Topics Column Name (Required)",
placeholder="This field will be auto-populated when you upload a CSV",
info="This column will be used to create T_<topic> binary columns (using '|' as delimiter)",
interactive=True
)
# Export options
gr.Markdown("### 4. Export Settings")
export_format = gr.Radio(
label="Output Format",
choices=["CSV (.csv)", "Excel (.xlsx)"],
value="Excel (.xlsx)"
)
# Process button
process_btn = gr.Button(
"πŸš€ Transform Data",
variant="primary",
size="lg"
)
with gr.Column(scale=2):
# Preview sections
gr.Markdown("### File Preview")
with gr.Tabs():
with gr.Tab("Original Data"):
original_preview = gr.HTML(
label="Original Data Preview (First 10 rows)",
value="<p>No file uploaded yet</p>"
)
with gr.Tab("Transformed Data"):
transformed_preview = gr.HTML(
label="Transformed Data Preview (First 20 rows)",
value="<p>No transformation performed yet</p>"
)
# Status and download
process_status = gr.Textbox(
label="Processing Status",
interactive=False,
lines=6
)
# Download section
gr.Markdown("### πŸ“₯ Download Status")
gr.Markdown("Please click on the link inside the output file size value to download the transformed file (the number value on the right hand side below). You may need to right click and select Save Link As (or something similar)")
output_file = gr.File(
label="Transformed Binary Matrix (Auto-Download)",
interactive=False,
visible=True
)
# Event handlers - UPDATED to include topics column default
file_input.change(
fn=handle_file_upload,
inputs=[file_input],
outputs=[original_preview, column_selector, topics_column_input, upload_status]
)
select_all_btn.click(
fn=select_all,
outputs=[column_selector]
)
deselect_all_btn.click(
fn=deselect_all,
outputs=[column_selector]
)
process_btn.click(
fn=process_transformation,
inputs=[column_selector, topics_column_input, export_format],
outputs=[transformed_preview, output_file, process_status]
)
# Add custom CSS for better styling and colored buttons
app.load(js="""
function() {
const style = document.createElement('style');
style.textContent = `
.table { font-size: 12px; }
.table th, .table td { padding: 4px 8px; }
#upload-preview, #preview-table { max-height: 400px; overflow-y: auto; }
/* Green button for Select All */
.green-button button {
background-color: #22c55e !important;
border-color: #16a34a !important;
color: white !important;
}
.green-button button:hover {
background-color: #16a34a !important;
border-color: #15803d !important;
}
/* Pale red button for Deselect All */
.red-button button {
background-color: #fca5a5 !important;
border-color: #f87171 !important;
color: #7f1d1d !important;
}
.red-button button:hover {
background-color: #f87171 !important;
border-color: #ef4444 !important;
color: #7f1d1d !important;
}
`;
document.head.appendChild(style);
}
""")
if __name__ == "__main__":
app.launch(
)