# spss_formatting / app.py
# Hugging Face Space file view: author geekgirl3, commit 812a545 (verified),
# commit message "Update app.py", file size 23.7 kB.
import gradio as gr
import pandas as pd
import numpy as np
import os
import traceback
from typing import Tuple, Dict, Any, Optional
import tempfile
import io
import datetime
class FeedbackTransformer:
    """
    Transform feedback data with topic and sentiment columns
    into a binary format where each topic is a separate column.

    Typical call order: ``load_data`` -> ``identify_columns`` ->
    ``extract_unique_topics`` -> ``transform_data`` -> (optionally)
    ``analyze_data`` -> ``save_transformed_data``.
    """

    # One-pass character mapping used by create_column_name: brackets and
    # asterisks are dropped, separators become underscores, '&' becomes 'and'.
    _COLUMN_NAME_TABLE = str.maketrans({
        '[': None, ']': None, '(': None, ')': None, '*': None,
        '.': '_', ' ': '_', ':': '_', '-': '_', '/': '_',
        '&': 'and',
    })

    def __init__(self,
                 topic_prefix="TOPIC_",
                 sentiment_prefix="SENTIMENT_",
                 category_prefix="Categories:",
                 text_column="TEXT",
                 recommendation_column="Q4_Weiterempfehlung"):
        """
        Initialize the FeedbackTransformer with column specifications.

        Args:
            topic_prefix: Substring that marks a column as a topic column.
            sentiment_prefix: Substring that marks a column as a sentiment column.
            category_prefix: Leading text that marks a column as a category column.
            text_column: Name of the feedback text column (stored, not used here).
            recommendation_column: Name of the recommendation column
                (stored, not used here).
        """
        self.topic_prefix = topic_prefix
        self.sentiment_prefix = sentiment_prefix
        self.category_prefix = category_prefix
        self.text_column = text_column
        self.recommendation_column = recommendation_column
        self.data = None
        self.transformed_data = None
        self.topic_cols = []
        self.sentiment_cols = []
        self.category_cols = []
        self.unique_topics = set()
        self.file_name = None
        self.original_filename = None
        self.selected_columns = []  # Store columns selected for inclusion

    @staticmethod
    def _rewind(file_obj):
        """Move a file-like object back to its start; paths are left alone."""
        seek = getattr(file_obj, 'seek', None)
        if callable(seek):
            seek(0)

    def load_data(self, file_obj):
        """
        Load data from the uploaded file object.

        Args:
            file_obj: A path string or a file-like object (its ``name``
                attribute, when present, supplies the file name).

        Returns:
            Tuple ``(row_count, column_count)`` of the loaded data.

        Raises:
            ValueError: If no file was given or the file cannot be read.
        """
        if file_obj is None:
            raise ValueError("No file uploaded")

        # Get file extension and store original filename
        if isinstance(file_obj, str):
            file_name = file_obj
        else:
            file_name = str(getattr(file_obj, 'name', 'unknown'))
        self.original_filename, file_ext = os.path.splitext(os.path.basename(file_name))
        file_ext = file_ext.lower()

        # Read the data based on file type
        try:
            if file_ext in ('.xlsx', '.xls'):
                self.data = pd.read_excel(file_obj)
            elif file_ext == '.csv':
                # Try comma delimiter first
                try:
                    self.data = pd.read_csv(file_obj, encoding='utf-8')
                except Exception:
                    # If comma fails, try tab delimiter. A file-like object
                    # was partly consumed by the failed attempt, so rewind it.
                    self._rewind(file_obj)
                    self.data = pd.read_csv(file_obj, sep='\t', encoding='utf-8')
            else:
                # Default to tab-delimited
                self.data = pd.read_csv(file_obj, sep='\t', encoding='utf-8')
        except Exception as e:
            raise ValueError(f"Error reading file: {str(e)}") from e

        return len(self.data), len(self.data.columns)

    def identify_columns(self):
        """
        Identify topic, category, and sentiment columns in the data.

        Topic and sentiment columns are matched by substring, category
        columns by leading text. Column labels are compared as text so that
        non-string labels (e.g. integer headers) do not raise, and an empty
        prefix matches nothing instead of matching every column.

        Returns:
            Dict with keys ``topic_cols``, ``sentiment_cols``,
            ``category_cols`` and ``all_columns``.

        Raises:
            ValueError: If no data has been loaded.
        """
        if self.data is None:
            raise ValueError("Data not loaded")

        all_cols = list(self.data.columns)

        # Extract columns based on prefixes
        self.topic_cols = [
            col for col in all_cols
            if self.topic_prefix and self.topic_prefix in str(col)
        ]
        self.sentiment_cols = [
            col for col in all_cols
            if self.sentiment_prefix and self.sentiment_prefix in str(col)
        ]
        self.category_cols = [
            col for col in all_cols
            if self.category_prefix and str(col).startswith(self.category_prefix)
        ]

        # all_columns is returned as well so callers can offer manual selection
        return {
            'topic_cols': self.topic_cols,
            'sentiment_cols': self.sentiment_cols,
            'category_cols': self.category_cols,
            'all_columns': all_cols
        }

    def extract_unique_topics(self):
        """
        Extract all unique topics from the topic and category columns.

        Only non-blank string values are kept.

        Returns:
            Number of unique topics found.
        """
        found = set()
        for col in (*self.topic_cols, *self.category_cols):
            found.update(self.data[col].dropna().unique())

        # Remove empty and non-string topics
        self.unique_topics = {t for t in found if isinstance(t, str) and t.strip()}
        return len(self.unique_topics)

    @staticmethod
    def create_column_name(topic):
        """
        Create a standardized column name from a topic string.

        Brackets, parentheses and asterisks are removed; dots, spaces,
        colons, hyphens and slashes become underscores; ``&`` becomes
        ``and``. Runs of underscores are collapsed, and the result is
        lower-cased with leading/trailing underscores stripped.
        """
        topic_clean = str(topic).strip().translate(FeedbackTransformer._COLUMN_NAME_TABLE)
        # Remove multiple underscores
        while '__' in topic_clean:
            topic_clean = topic_clean.replace('__', '_')
        return topic_clean.lower().strip('_')

    def set_selected_columns(self, selected_columns):
        """
        Set which original columns should be included in the output.

        A falsy argument (``None`` or empty) clears the selection.
        """
        self.selected_columns = list(selected_columns) if selected_columns else []

    @staticmethod
    def _sentiment_score(sentiment):
        """Map a sentiment label to 1 / 0 / 0.5, or None when unrecognised."""
        if not isinstance(sentiment, str):
            return None
        label = sentiment.lower()
        if 'positive' in label:
            return 1
        if 'negative' in label:
            return 0
        if 'neutral' in label:
            return 0.5
        return None

    def _column_values(self, col):
        """Return a source column as a plain list (all None if it is absent)."""
        if col in self.data.columns:
            return self.data[col].tolist()
        return [None] * len(self.data)

    def transform_data(self):
        """
        Transform the data into binary topic columns with sentiment values.

        The output has ``feedback_id`` (1..n), the selected original columns,
        and for each unique topic a 0/1 flag column plus a ``<topic>_sentiment``
        column (1 positive, 0 negative, 0.5 neutral, None when unknown).
        The i-th topic column is paired with the i-th sentiment column.

        Rows are addressed by position, so the result is correct even when
        the loaded data does not carry a default 0..n-1 index.

        Returns:
            Shape ``(rows, columns)`` of the transformed data.

        Raises:
            ValueError: If no data has been loaded.
        """
        if self.data is None:
            raise ValueError("Data not loaded")
        if not self.unique_topics:
            self.extract_unique_topics()

        n_rows = len(self.data)

        # Collect every output column first and build the frame once at the
        # end; this avoids per-cell .loc writes and repeated column inserts.
        columns = {'feedback_id': range(1, n_rows + 1)}

        # Add selected original columns first (right after feedback_id)
        for col in self.selected_columns:
            if col in self.data.columns:
                columns[col] = self.data[col].reset_index(drop=True)

        # Initialize all topic columns to 0 and their sentiments to None
        flag_names = set()
        for topic in sorted(self.unique_topics):
            topic_col = self.create_column_name(topic)
            columns[topic_col] = np.zeros(n_rows, dtype=np.int64)
            columns[f'{topic_col}_sentiment'] = np.full(n_rows, None, dtype=object)
            flag_names.add(topic_col)

        name_cache = {}

        def flag_column_for(value):
            """Return the flag column for a cell value, or None to skip it."""
            if not (isinstance(value, str) and value.strip()):
                return None
            if value not in name_cache:
                name_cache[value] = self.create_column_name(value)
            name = name_cache[value]
            return name if name in flag_names else None

        # Process topic columns with sentiments. Columns are visited in
        # order, so for one row a later topic column still overrides an
        # earlier one that maps to the same output column.
        for i, t_col in enumerate(self.topic_cols):
            topics = self._column_values(t_col)
            if i < len(self.sentiment_cols):
                sentiments = self._column_values(self.sentiment_cols[i])
            else:
                sentiments = [None] * n_rows

            for pos, (topic, sentiment) in enumerate(zip(topics, sentiments)):
                topic_col = flag_column_for(topic)
                if topic_col is None:
                    continue
                columns[topic_col][pos] = 1
                score = self._sentiment_score(sentiment)
                if score is not None:
                    columns[f'{topic_col}_sentiment'][pos] = score

        # Process category columns (these typically don't have sentiments)
        for c_col in self.category_cols:
            for pos, category in enumerate(self._column_values(c_col)):
                category_col = flag_column_for(category)
                if category_col is not None:
                    columns[category_col][pos] = 1

        self.transformed_data = pd.DataFrame(columns)
        return self.transformed_data.shape

    def analyze_data(self):
        """
        Analyze the transformed data to provide insights.

        The transformed data itself is left untouched.

        Returns:
            Markdown-formatted summary text.

        Raises:
            ValueError: If ``transform_data`` has not produced any output yet.
        """
        if self.transformed_data is None:
            raise ValueError("No transformed data to analyze")

        frame = self.transformed_data

        # Identify topic columns (exclude feedback_id, selected original
        # columns, and sentiment columns)
        excluded_cols = {'feedback_id', *self.selected_columns}
        topic_cols = [
            col for col in frame.columns
            if col not in excluded_cols and not str(col).endswith('_sentiment')
        ]

        # Count occurrences of each topic and sort by frequency
        topic_counts = {topic: frame[topic].sum() for topic in topic_cols}
        sorted_topics = sorted(topic_counts.items(), key=lambda x: x[1], reverse=True)

        # Prepare analysis summary
        parts = [
            "**Analysis Results**\n\n",
            f"Total feedbacks: {len(frame)}\n",
            f"Selected original columns: {len(self.selected_columns)}\n",
            f"Unique topics: {len(topic_cols)}\n\n",
        ]
        if self.selected_columns:
            included = ', '.join(str(col) for col in self.selected_columns)
            parts.append(f"**Included Original Columns:** {included}\n\n")

        parts.append("**Top 10 Most Frequent Topics:**\n")
        for topic, count in sorted_topics[:10]:
            parts.append(f"- {topic}: {count} occurrences\n")

        # Calculate sentiment distributions for top topics
        parts.append("\n**Sentiment Distributions for Top 5 Topics:**\n")
        for topic, _ in sorted_topics[:5]:
            sentiment_col = f"{topic}_sentiment"
            if sentiment_col not in frame.columns:
                continue
            # Only rows where the topic is present contribute
            scores = frame.loc[frame[topic] == 1, sentiment_col]
            positive = (scores == 1.0).sum()
            negative = (scores == 0.0).sum()
            neutral = (scores == 0.5).sum()
            total = positive + negative + neutral
            if total > 0:
                parts.append(f"\n{topic} ({total} occurrences):\n")
                parts.append(f" - Positive: {positive} ({positive/total*100:.1f}%)\n")
                parts.append(f" - Negative: {negative} ({negative/total*100:.1f}%)\n")
                parts.append(f" - Neutral: {neutral} ({neutral/total*100:.1f}%)\n")

        # Number of topics per feedback, computed on a separate Series so no
        # temporary column is added to (or dropped from) the output frame.
        topics_per_feedback = frame[topic_cols].sum(axis=1)
        avg_topics = topics_per_feedback.mean()
        max_topics = topics_per_feedback.max()
        parts.append("\n**Topics per Feedback:**\n")
        parts.append(f"- Average: {avg_topics:.2f}\n")
        parts.append(f"- Maximum: {max_topics}\n")

        return "".join(parts)

    def save_transformed_data(self, output_format='xlsx'):
        """
        Save the transformed data and return the file path.

        The file is written under the system temp directory as
        ``<original name>_transformed_<timestamp>.<ext>``.

        Args:
            output_format: ``'xlsx'`` for Excel; any other value writes CSV.

        Returns:
            Path of the written file.

        Raises:
            ValueError: If there is no transformed data, or the output file
                is missing after writing.
        """
        if self.transformed_data is None:
            raise ValueError("No transformed data to save")

        # Create filename with original filename prefix and timestamp
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        # Use original filename as prefix, or fallback to 'transformed_feedback'
        prefix = self.original_filename if self.original_filename else 'transformed_feedback'
        extension = 'xlsx' if output_format == 'xlsx' else 'csv'
        filename = f"{prefix}_transformed_{timestamp}.{extension}"
        final_path = os.path.join(tempfile.gettempdir(), filename)

        # Write to a uniquely named temp file, then move it into place in one
        # step. The descriptor is closed first so the writer can reopen the
        # path on every platform.
        fd, temp_path = tempfile.mkstemp(suffix=f'.{extension}')
        os.close(fd)
        try:
            if extension == 'xlsx':
                self.transformed_data.to_excel(temp_path, index=False)
            else:
                self.transformed_data.to_csv(temp_path, index=False)
            # os.replace overwrites an existing target atomically
            os.replace(temp_path, final_path)
        except Exception:
            if os.path.exists(temp_path):
                os.remove(temp_path)
            raise

        # Verify file was created and is readable
        if not os.path.exists(final_path):
            raise ValueError(f"Failed to create output file: {final_path}")
        return final_path
# Gradio interface functions
def _read_column_preview(file_obj, nrows=5):
    """Read the first few rows of an upload, choosing the parser by extension."""
    if isinstance(file_obj, str):
        file_name = file_obj
    else:
        file_name = str(getattr(file_obj, 'name', 'unknown'))
    file_ext = os.path.splitext(file_name)[1].lower()

    if file_ext in ('.xlsx', '.xls'):
        return pd.read_excel(file_obj, nrows=nrows)
    if file_ext == '.csv':
        try:
            return pd.read_csv(file_obj, nrows=nrows)
        except Exception:
            # Comma parsing failed: retry as tab-delimited. A file-like
            # object was partly consumed by the failed attempt, so rewind it.
            seek = getattr(file_obj, 'seek', None)
            if callable(seek):
                seek(0)
            return pd.read_csv(file_obj, sep='\t', nrows=nrows)
    # Anything else is treated as tab-delimited text
    return pd.read_csv(file_obj, sep='\t', nrows=nrows)


def get_column_selector(file_obj):
    """
    Get a combined column preview and selector interface.

    Args:
        file_obj: Uploaded file (path string or file-like object), or None.

    Returns:
        A ``gr.CheckboxGroup`` whose choices are the file's columns, numbered
        as ``" 1. name"``. With no file, or when the file cannot be read, the
        group is empty and its ``info`` text explains why.
    """
    if file_obj is None:
        return gr.CheckboxGroup(
            choices=[],
            value=[],
            label="📋 Select Columns to Include",
            info="Upload a file first to see available columns"
        )

    try:
        # Read first few rows to get column names
        df = _read_column_preview(file_obj)
    except Exception as e:
        return gr.CheckboxGroup(
            choices=[],
            value=[],
            label="📋 Select Columns to Include",
            info=f"Error reading file: {str(e)}"
        )

    columns = list(df.columns)
    # Create column display with indices for easier reference
    column_choices = [f"{i:2d}. {col}" for i, col in enumerate(columns, start=1)]

    # Return updated CheckboxGroup with numbered columns and individual rows
    return gr.CheckboxGroup(
        choices=column_choices,
        value=[],  # No columns selected by default
        label=f"📋 Select Columns to Include ({len(columns)} available)",
        info="Choose which original columns to include in the transformed file (in addition to feedback_id). Columns are numbered for easy reference.",
        elem_classes=["column-selector"]  # Add CSS class for styling
    )
def extract_column_names(selected_display_names):
    """
    Extract actual column names from the numbered display format.

    A display name looks like ``" 1. Column Name"``. The numeric prefix is
    removed only when the text before the first ``". "`` really is a number,
    so a plain column name that merely contains ``". "`` (for example
    ``"Q1. Satisfaction"``) is returned unchanged.

    Args:
        selected_display_names: Iterable of display names, or None/empty.

    Returns:
        List of column names in the same order.
    """
    if not selected_display_names:
        return []

    actual_names = []
    for display_name in selected_display_names:
        number, separator, remainder = display_name.partition('. ')
        if separator and number.strip().isdigit():
            # "12. Column Name" -> "Column Name"
            actual_names.append(remainder)
        else:
            actual_names.append(display_name)
    return actual_names
def process_file(file_obj, topic_prefix, sentiment_prefix, category_prefix,
                 text_column, recommendation_column, output_format, analyze_data, selected_columns):
    """
    Main processing function for Gradio interface.

    Loads the upload, transforms it, optionally analyzes it and saves the
    result.

    Args:
        file_obj: Uploaded file (path string or file-like object).
        topic_prefix: Text identifying topic columns.
        sentiment_prefix: Text identifying sentiment columns.
        category_prefix: Text identifying category columns.
        text_column: Feedback text column name (passed through to the transformer).
        recommendation_column: Recommendation column name (passed through).
        output_format: ``"xlsx"`` or ``"csv"``.
        analyze_data: Whether to produce the analysis text.
        selected_columns: Numbered display names chosen in the column selector.

    Returns:
        Tuple ``(status_message, analysis_text, output_file_path)``. On any
        failure the status holds the error and traceback, the analysis is
        empty and the path is None.
    """
    try:
        # Extract actual column names from display format
        actual_column_names = extract_column_names(selected_columns)

        # Initialize transformer
        transformer = FeedbackTransformer(
            topic_prefix=topic_prefix,
            sentiment_prefix=sentiment_prefix,
            category_prefix=category_prefix,
            text_column=text_column,
            recommendation_column=recommendation_column
        )

        # Load data
        rows, cols = transformer.load_data(file_obj)
        status_parts = [f"✅ Loaded {rows} rows and {cols} columns\n"]

        # The selector hands names back as text. Map them onto the real
        # column labels so non-string labels (e.g. integer Excel headers)
        # are still found in the data.
        label_by_text = {str(label): label for label in transformer.data.columns}
        actual_column_names = [label_by_text.get(name, name) for name in actual_column_names]

        # Set selected columns for inclusion
        transformer.set_selected_columns(actual_column_names)
        status_parts.append(f"📋 Selected {len(actual_column_names)} original columns for inclusion\n")
        if actual_column_names:
            listed = ', '.join(str(name) for name in actual_column_names)
            status_parts.append(f" Selected columns: {listed}\n")

        # Identify columns
        col_info = transformer.identify_columns()
        status_parts.append("\n📊 Found columns:\n")
        status_parts.append(f"- Topic columns: {len(col_info['topic_cols'])}\n")
        status_parts.append(f"- Sentiment columns: {len(col_info['sentiment_cols'])}\n")
        status_parts.append(f"- Category columns: {len(col_info['category_cols'])}\n")

        # Extract unique topics
        num_topics = transformer.extract_unique_topics()
        status_parts.append(f"\n🎯 Found {num_topics} unique topics\n")

        # Transform data
        shape = transformer.transform_data()
        status_parts.append(f"\n✨ Transformed data shape: {shape[0]} rows × {shape[1]} columns\n")

        # Analyze if requested
        analysis_result = transformer.analyze_data() if analyze_data else ""

        # Save transformed data
        output_file = transformer.save_transformed_data(output_format)
        status_parts.append(f"\n💾 File saved successfully: {os.path.basename(output_file)}\n")

        return "".join(status_parts), analysis_result, output_file
    except Exception as e:
        # Top-level UI boundary: report the failure in the status box
        # instead of letting the handler crash.
        error_msg = f"❌ Error: {str(e)}\n\n{traceback.format_exc()}"
        return error_msg, "", None
# Create Gradio interface
# Build the Gradio UI. The markdown and CSS literals are kept flush-left on
# purpose: leading spaces inside a triple-quoted markdown string would be
# rendered as a code block.
with gr.Blocks(title="Feedback Topic & Sentiment Transformer", css="""
.column-selector .form-check {
display: block !important;
margin-bottom: 8px !important;
}
.column-selector .form-check-input {
margin-right: 8px !important;
}
""") as demo:
    gr.Markdown("""
# 📊 Feedback Topic & Sentiment Transformer
Transform feedback data with topic and sentiment columns into a binary matrix format.
Each unique topic becomes a separate column with 0/1 values and associated sentiment scores.
### 📋 Instructions:
1. Upload your Excel, CSV, or tab-delimited text file
2. Select which original columns to include in the output
3. Configure column prefixes (or use defaults)
4. Click "Transform Data" to process
5. Download the transformed file
""")

    with gr.Row():
        with gr.Column(scale=1):
            # File upload
            gr.Markdown("### 📋 1. Source file upload")
            input_file = gr.File(
                label="Upload Input File",
                file_types=[".xlsx", ".xls", ".csv", ".txt"],
                type="filepath"
            )

            # Combined column selector (replaces both preview and checkboxes)
            gr.Markdown("### 📋 2. Column Selection")
            column_selector = gr.CheckboxGroup(
                choices=[],
                value=[],
                label="Select Columns to Include",
                info="Upload a file first to see available columns"
            )

        with gr.Column(scale=1):
            # Configuration parameters
            gr.Markdown("### ⚙️ 3. Configuration of column prefixes ")
            topic_prefix = gr.Textbox(
                label="Topic Column Prefix",
                value="[**WORKSHOP] SwissLife Taxonomy",
                info="Prefix to identify topic columns"
            )
            sentiment_prefix = gr.Textbox(
                label="Sentiment Column Prefix",
                value="ABSA:",
                info="Prefix to identify sentiment columns"
            )
            category_prefix = gr.Textbox(
                label="Category Column Prefix",
                value="Categories:",
                info="Prefix to identify category columns"
            )
            text_column = gr.Textbox(
                label="Text Column Name",
                value="TEXT",
                info="Column containing original feedback text (for reference only)"
            )
            recommendation_column = gr.Textbox(
                label="Recommendation Column Name",
                value="Q4_Weiterempfehlung",
                info="Column containing recommendation scores (for reference only)"
            )
            output_format = gr.Radio(
                label="Output Format",
                choices=["xlsx", "csv"],
                value="xlsx"
            )
            analyze_checkbox = gr.Checkbox(
                label="Analyze transformed data",
                value=True
            )

    # Transform button
    transform_btn = gr.Button("🔄 4. Transform Data", variant="primary", size="lg")

    # Output sections
    with gr.Row():
        with gr.Column():
            status_output = gr.Textbox(
                label="Processing Status",
                lines=10,
                interactive=False
            )
        with gr.Column():
            analysis_output = gr.Markdown(
                label="Data Analysis"
            )

    # Download section - Modified for better download functionality
    with gr.Row():
        with gr.Column():
            gr.Markdown("### 📥 5. Download Transformed File")
            output_file = gr.File(
                label="Transformed File",
                interactive=False,
                visible=True
            )

    # Event handlers
    # Refresh the column checklist whenever a different file is uploaded
    input_file.change(
        fn=get_column_selector,
        inputs=[input_file],
        outputs=[column_selector]
    )

    # The inputs list must stay in the same order as process_file's parameters
    transform_btn.click(
        fn=process_file,
        inputs=[
            input_file,
            topic_prefix,
            sentiment_prefix,
            category_prefix,
            text_column,
            recommendation_column,
            output_format,
            analyze_checkbox,
            column_selector
        ],
        outputs=[status_output, analysis_output, output_file]
    )

    # Examples section
    gr.Markdown("""
### 📝 Example Column Formats:
- **Topic columns**: `[**WORKSHOP] SwissLife Taxonomy(Kommentar) 1`, `[**WORKSHOP] SwissLife Taxonomy(Kommentar) 2`
- **Category columns**: `Categories:Topic1`, `Categories:Topic2`
- **Sentiment columns**: `ABSA:Sentiment1`, `ABSA:Sentiment2`
### 🎯 Output Format:
- **feedback_id**: Unique identifier for each row
- **Selected original columns**: Any columns you selected from the original file
- **Topic columns**: Each unique topic becomes a column with values 0 (absent) or 1 (present)
- **Sentiment columns**: Each topic has an associated `_sentiment` column with values:
  - 1.0 = Positive
  - 0.5 = Neutral
  - 0.0 = Negative
- **Output filename**: `[original_filename]_transformed_[timestamp].[format]`
### 💡 Tips:
- Use the numbered column list to easily identify and select columns
- The text and recommendation column names in configuration are now for reference only
- To include them in output, select them using the column checkboxes
- Click on the download button that appears after processing to download the file
""")

# Launch the app
if __name__ == "__main__":
    demo.launch()