import gradio as gr
import pandas as pd
import numpy as np
import os
import traceback
from typing import Tuple, Dict, Any, Optional
import tempfile
import io
import datetime
class FeedbackTransformer:
    """
    A class to transform feedback data with topic and sentiment columns
    into a binary format where each topic is a separate column.
    """

    def __init__(self,
                 topic_prefix="TOPIC_",
                 sentiment_prefix="SENTIMENT_",
                 category_prefix="Categories:",
                 text_column="TEXT",
                 recommendation_column="Q4_Weiterempfehlung"):
        """
        Initialize the FeedbackTransformer with column specifications.
        """
        self.topic_prefix = topic_prefix
        self.sentiment_prefix = sentiment_prefix
        self.category_prefix = category_prefix
        self.text_column = text_column
        self.recommendation_column = recommendation_column
        self.data = None
        self.transformed_data = None
        self.topic_cols = []
        self.sentiment_cols = []
        self.category_cols = []
        self.unique_topics = set()
        self.file_name = None
        self.original_filename = None
        self.selected_columns = []  # Store columns selected for inclusion

    def load_data(self, file_obj):
        """
        Load data from the uploaded file object.
        """
        if file_obj is None:
            raise ValueError("No file uploaded")

        # Get the file extension and store the original filename
        file_name = file_obj if isinstance(file_obj, str) else (file_obj.name if hasattr(file_obj, 'name') else 'unknown')
        self.original_filename = os.path.splitext(os.path.basename(file_name))[0]
        _, file_ext = os.path.splitext(file_name)

        # Read the data based on file type
        try:
            if file_ext.lower() in ['.xlsx', '.xls']:
                self.data = pd.read_excel(file_obj)
            elif file_ext.lower() == '.csv':
                # Try comma delimiter first
                try:
                    self.data = pd.read_csv(file_obj, encoding='utf-8')
                except Exception:
                    # If comma fails, try tab delimiter
                    self.data = pd.read_csv(file_obj, sep='\t', encoding='utf-8')
            else:
                # Default to tab-delimited
                self.data = pd.read_csv(file_obj, sep='\t', encoding='utf-8')
        except Exception as e:
            raise ValueError(f"Error reading file: {str(e)}")

        return len(self.data), len(self.data.columns)

    def identify_columns(self):
        """
        Identify topic, category, and sentiment columns in the data.
        """
        if self.data is None:
            raise ValueError("Data not loaded")

        # Extract columns based on prefixes
        self.topic_cols = [col for col in self.data.columns if self.topic_prefix in col]
        self.sentiment_cols = [col for col in self.data.columns if self.sentiment_prefix in col]
        self.category_cols = [col for col in self.data.columns if col.startswith(self.category_prefix)]

        # All columns are also returned so they can be offered for manual selection
        all_cols = list(self.data.columns)
        return {
            'topic_cols': self.topic_cols,
            'sentiment_cols': self.sentiment_cols,
            'category_cols': self.category_cols,
            'all_columns': all_cols
        }

    def extract_unique_topics(self):
        """
        Extract all unique topics from the topic columns.
        """
        self.unique_topics = set()

        # Extract from topic columns
        for col in self.topic_cols:
            self.unique_topics.update(self.data[col].dropna().unique())

        # Also extract from category columns if they exist
        for col in self.category_cols:
            self.unique_topics.update(self.data[col].dropna().unique())

        # Remove empty topics
        self.unique_topics = {t for t in self.unique_topics if isinstance(t, str) and t.strip()}
        return len(self.unique_topics)

    def create_column_name(self, topic):
        """
        Create a standardized column name from a topic string.
        """
        # Remove special characters and standardize
        topic_clean = str(topic).strip()
        # Remove brackets and special characters
        topic_clean = topic_clean.replace('[', '').replace(']', '').replace('(', '').replace(')', '')
        topic_clean = topic_clean.replace('**', '').replace('*', '')
        topic_clean = topic_clean.replace('.', '_').replace(' ', '_').replace('&', 'and')
        topic_clean = topic_clean.replace(':', '_').replace('-', '_').replace('/', '_')
        # Collapse multiple underscores
        while '__' in topic_clean:
            topic_clean = topic_clean.replace('__', '_')
        return topic_clean.lower().strip('_')
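
    # Illustrative examples only (hypothetical topic values, not taken from the original data):
    #   create_column_name("Beratung & Service")  -> "beratung_and_service"
    #   create_column_name("Preis / Leistung")     -> "preis_leistung"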
    def set_selected_columns(self, selected_columns):
        """
        Set which original columns should be included in the output.
        """
        self.selected_columns = selected_columns if selected_columns else []

    def transform_data(self):
        """
        Transform the data into binary topic columns with sentiment values.
        """
        if not self.unique_topics:
            self.extract_unique_topics()

        # Create the output dataframe starting with feedback_id
        self.transformed_data = pd.DataFrame({'feedback_id': range(1, len(self.data) + 1)})

        # Add selected original columns first (right after feedback_id)
        for col in self.selected_columns:
            if col in self.data.columns:
                self.transformed_data[col] = self.data[col]

        # Initialize all topic columns to 0
        for topic in sorted(self.unique_topics):
            topic_col = self.create_column_name(topic)
            self.transformed_data[topic_col] = 0
            self.transformed_data[f'{topic_col}_sentiment'] = None

        # Fill in the data from the topic columns
        for idx, row in self.data.iterrows():
            # Process topic columns with sentiments
            for i, t_col in enumerate(self.topic_cols):
                topic = row.get(t_col)
                # Find the corresponding sentiment column
                if i < len(self.sentiment_cols):
                    sentiment = row.get(self.sentiment_cols[i])
                else:
                    sentiment = None
                if pd.notna(topic) and isinstance(topic, str) and topic.strip():
                    topic_col = self.create_column_name(topic)
                    if topic_col in self.transformed_data.columns:
                        self.transformed_data.loc[idx, topic_col] = 1
                        # Convert the sentiment label to a numeric value
                        if pd.notna(sentiment) and isinstance(sentiment, str):
                            sentiment_lower = sentiment.lower()
                            if 'positive' in sentiment_lower:
                                self.transformed_data.loc[idx, f'{topic_col}_sentiment'] = 1
                            elif 'negative' in sentiment_lower:
                                self.transformed_data.loc[idx, f'{topic_col}_sentiment'] = 0
                            elif 'neutral' in sentiment_lower:
                                self.transformed_data.loc[idx, f'{topic_col}_sentiment'] = 0.5

            # Process category columns (these typically don't have sentiments)
            for c_col in self.category_cols:
                category = row.get(c_col)
                if pd.notna(category) and isinstance(category, str) and category.strip():
                    category_col = self.create_column_name(category)
                    if category_col in self.transformed_data.columns:
                        self.transformed_data.loc[idx, category_col] = 1

        return self.transformed_data.shape
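
    # Illustrative only (hypothetical data, not from the original file): an input row
    # with TOPIC_1 = "Preis / Leistung" and SENTIMENT_1 = "positive" ends up with
    # preis_leistung = 1 and preis_leistung_sentiment = 1 in the output; topics that do
    # not occur in that row stay 0 and their sentiment column remains None.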
    def analyze_data(self):
        """
        Analyze the transformed data to provide insights.
        """
        if self.transformed_data is None:
            raise ValueError("No transformed data to analyze")

        # Identify topic columns (exclude feedback_id, selected original columns, and sentiment columns)
        excluded_cols = ['feedback_id'] + self.selected_columns
        topic_cols = [col for col in self.transformed_data.columns
                      if col not in excluded_cols and not col.endswith('_sentiment')]

        # Count occurrences of each topic
        topic_counts = {}
        for topic in topic_cols:
            topic_counts[topic] = self.transformed_data[topic].sum()

        # Sort topics by frequency
        sorted_topics = sorted(topic_counts.items(), key=lambda x: x[1], reverse=True)

        # Prepare the analysis summary
        analysis_text = "**Analysis Results**\n\n"
        analysis_text += f"Total feedbacks: {len(self.transformed_data)}\n"
        analysis_text += f"Selected original columns: {len(self.selected_columns)}\n"
        analysis_text += f"Unique topics: {len(topic_cols)}\n\n"
        if self.selected_columns:
            analysis_text += f"**Included Original Columns:** {', '.join(self.selected_columns)}\n\n"

        analysis_text += "**Top 10 Most Frequent Topics:**\n"
        for topic, count in sorted_topics[:10]:
            analysis_text += f"- {topic}: {count} occurrences\n"

        # Calculate sentiment distributions for the top topics
        analysis_text += "\n**Sentiment Distributions for Top 5 Topics:**\n"
        for topic, _ in sorted_topics[:5]:
            sentiment_col = f"{topic}_sentiment"
            if sentiment_col in self.transformed_data.columns:
                # Filter rows where the topic is present
                topic_rows = self.transformed_data[self.transformed_data[topic] == 1]
                positive = (topic_rows[sentiment_col] == 1.0).sum()
                negative = (topic_rows[sentiment_col] == 0.0).sum()
                neutral = (topic_rows[sentiment_col] == 0.5).sum()
                total = positive + negative + neutral
                if total > 0:
                    analysis_text += f"\n{topic} ({total} occurrences):\n"
                    analysis_text += f"  - Positive: {positive} ({positive/total*100:.1f}%)\n"
                    analysis_text += f"  - Negative: {negative} ({negative/total*100:.1f}%)\n"
                    analysis_text += f"  - Neutral: {neutral} ({neutral/total*100:.1f}%)\n"

        # Calculate the number of topics per feedback
        self.transformed_data['topic_count'] = self.transformed_data[topic_cols].sum(axis=1)
        avg_topics = self.transformed_data['topic_count'].mean()
        max_topics = self.transformed_data['topic_count'].max()
        analysis_text += "\n**Topics per Feedback:**\n"
        analysis_text += f"- Average: {avg_topics:.2f}\n"
        analysis_text += f"- Maximum: {max_topics}\n"

        # Remove the temporary topic_count column
        self.transformed_data.drop('topic_count', axis=1, inplace=True)
        return analysis_text

    def save_transformed_data(self, output_format='xlsx'):
        """
        Save the transformed data and return the file path.
        Modified to work properly with Hugging Face Spaces downloads.
        """
        if self.transformed_data is None:
            raise ValueError("No transformed data to save")

        # Create a filename with the original filename as prefix plus a timestamp
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        # Fall back to 'transformed_feedback' if the original filename is not available
        prefix = self.original_filename if self.original_filename else 'transformed_feedback'

        if output_format == 'xlsx':
            filename = f"{prefix}_transformed_{timestamp}.xlsx"
            # Create a temporary file that Gradio can handle
            temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx')
            self.transformed_data.to_excel(temp_file.name, index=False)
            temp_file.close()
            # Rename the temporary file to a meaningful name
            final_path = os.path.join(tempfile.gettempdir(), filename)
            if os.path.exists(final_path):
                os.remove(final_path)
            os.rename(temp_file.name, final_path)
        else:  # csv
            filename = f"{prefix}_transformed_{timestamp}.csv"
            # Create a temporary file that Gradio can handle
            temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.csv')
            self.transformed_data.to_csv(temp_file.name, index=False)
            temp_file.close()
            # Rename the temporary file to a meaningful name
            final_path = os.path.join(tempfile.gettempdir(), filename)
            if os.path.exists(final_path):
                os.remove(final_path)
            os.rename(temp_file.name, final_path)

        # Verify the file was created and is readable
        if not os.path.exists(final_path):
            raise ValueError(f"Failed to create output file: {final_path}")
        return final_path
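

# Minimal usage sketch outside the Gradio UI (illustrative only; the file name and
# prefixes below are hypothetical and must match your own data):
#
#     transformer = FeedbackTransformer(topic_prefix="TOPIC_", sentiment_prefix="SENTIMENT_")
#     transformer.load_data("feedback.xlsx")
#     transformer.identify_columns()
#     transformer.set_selected_columns(["TEXT"])
#     transformer.transform_data()
#     print(transformer.analyze_data())
#     path = transformer.save_transformed_data("xlsx")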
# Gradio interface functions
def get_column_selector(file_obj):
    """
    Get a combined column preview and selector interface.
    """
    try:
        if file_obj is None:
            return gr.CheckboxGroup(
                choices=[],
                value=[],
                label="Select Columns to Include",
                info="Upload a file first to see available columns"
            )

        # Read the first few rows to get the column names
        file_name = file_obj if isinstance(file_obj, str) else (file_obj.name if hasattr(file_obj, 'name') else 'unknown')
        _, file_ext = os.path.splitext(file_name)
        if file_ext.lower() in ['.xlsx', '.xls']:
            df = pd.read_excel(file_obj, nrows=5)
        elif file_ext.lower() == '.csv':
            try:
                df = pd.read_csv(file_obj, nrows=5)
            except Exception:
                df = pd.read_csv(file_obj, sep='\t', nrows=5)
        else:
            df = pd.read_csv(file_obj, sep='\t', nrows=5)

        columns = list(df.columns)
        # Create the column display with indices for easier reference
        column_choices = [f"{i+1:2d}. {col}" for i, col in enumerate(columns)]

        # Return an updated CheckboxGroup with numbered columns on individual rows
        return gr.CheckboxGroup(
            choices=column_choices,
            value=[],  # No columns selected by default
            label=f"Select Columns to Include ({len(columns)} available)",
            info="Choose which original columns to include in the transformed file (in addition to feedback_id). Columns are numbered for easy reference.",
            elem_classes=["column-selector"]  # CSS class for styling
        )
    except Exception as e:
        return gr.CheckboxGroup(
            choices=[],
            value=[],
            label="Select Columns to Include",
            info=f"Error reading file: {str(e)}"
        )


def extract_column_names(selected_display_names):
    """
    Extract the actual column names from the numbered display format.
    """
    if not selected_display_names:
        return []
    actual_names = []
    for display_name in selected_display_names:
        # Remove the number prefix (e.g., "1. Column Name" -> "Column Name")
        if '. ' in display_name:
            actual_name = display_name.split('. ', 1)[1]
            actual_names.append(actual_name)
        else:
            actual_names.append(display_name)
    return actual_names
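
# Note (illustrative): the checkbox labels are built as f"{i+1:2d}. {col}", so single-digit
# indices carry a leading space (e.g. " 3. Q4_Weiterempfehlung"). split('. ', 1) still
# recovers the original column name because it splits only at the first ". " occurrence.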
def process_file(file_obj, topic_prefix, sentiment_prefix, category_prefix,
                 text_column, recommendation_column, output_format, analyze_data, selected_columns):
    """
    Main processing function for the Gradio interface.
    Modified to return the file path for automatic download.
    """
    try:
        # Extract the actual column names from the display format
        actual_column_names = extract_column_names(selected_columns)

        # Initialize the transformer
        transformer = FeedbackTransformer(
            topic_prefix=topic_prefix,
            sentiment_prefix=sentiment_prefix,
            category_prefix=category_prefix,
            text_column=text_column,
            recommendation_column=recommendation_column
        )

        # Load data
        rows, cols = transformer.load_data(file_obj)
        status_msg = f"Loaded {rows} rows and {cols} columns\n"

        # Set selected columns for inclusion
        transformer.set_selected_columns(actual_column_names)
        status_msg += f"Selected {len(actual_column_names)} original columns for inclusion\n"
        if actual_column_names:
            status_msg += f"  Selected columns: {', '.join(actual_column_names)}\n"

        # Identify columns
        col_info = transformer.identify_columns()
        status_msg += "\nFound columns:\n"
        status_msg += f"- Topic columns: {len(col_info['topic_cols'])}\n"
        status_msg += f"- Sentiment columns: {len(col_info['sentiment_cols'])}\n"
        status_msg += f"- Category columns: {len(col_info['category_cols'])}\n"

        # Extract unique topics
        num_topics = transformer.extract_unique_topics()
        status_msg += f"\nFound {num_topics} unique topics\n"

        # Transform data
        shape = transformer.transform_data()
        status_msg += f"\nTransformed data shape: {shape[0]} rows × {shape[1]} columns\n"

        # Analyze if requested
        analysis_result = ""
        if analyze_data:
            analysis_result = transformer.analyze_data()

        # Save the transformed data
        output_file = transformer.save_transformed_data(output_format)
        status_msg += f"\nFile saved successfully: {os.path.basename(output_file)}\n"
        status_msg += "File download should start automatically\n"

        # Return the file path directly for automatic download
        return status_msg, analysis_result, output_file
    except Exception as e:
        error_msg = f"Error: {str(e)}\n\n{traceback.format_exc()}"
        return error_msg, "", None

# Create the Gradio interface
with gr.Blocks(title="Feedback Topic & Sentiment Transformer", css="""
    .column-selector .form-check {
        display: block !important;
        margin-bottom: 8px !important;
    }
    .column-selector .form-check-input {
        margin-right: 8px !important;
    }
""") as demo:
    gr.Markdown("""
# Feedback Topic & Sentiment Transformer

Transform feedback data with topic and sentiment columns into a binary matrix format.
Each unique topic becomes a separate column with 0/1 values and associated sentiment scores.

### Instructions:
1. Upload your Excel, CSV, or tab-delimited text file
2. Select which original columns to include in the output
3. Configure the column prefixes (or use the defaults)
4. Click "Transform Data" to process and automatically download the result
""")
    with gr.Row():
        with gr.Column(scale=1):
            # File upload
            gr.Markdown("### 1. Source file upload")
            input_file = gr.File(
                label="Upload Input File",
                file_types=[".xlsx", ".xls", ".csv", ".txt"],
                type="filepath"
            )

            # Combined column selector (replaces both preview and checkboxes)
            gr.Markdown("### 2. Column Selection")
            column_selector = gr.CheckboxGroup(
                choices=[],
                value=[],
                label="Select Columns to Include",
                info="Upload a file first to see available columns"
            )

        with gr.Column(scale=1):
            # Configuration parameters
            gr.Markdown("### 3. Configuration of column prefixes")
            topic_prefix = gr.Textbox(
                label="Topic Column Prefix",
                value="[**WORKSHOP] SwissLife Taxonomy",
                info="Prefix to identify topic columns"
            )
            sentiment_prefix = gr.Textbox(
                label="Sentiment Column Prefix",
                value="ABSA:",
                info="Prefix to identify sentiment columns"
            )
            category_prefix = gr.Textbox(
                label="Category Column Prefix",
                value="Categories:",
                info="Prefix to identify category columns"
            )
            text_column = gr.Textbox(
                label="Text Column Name",
                value="TEXT",
                info="Column containing original feedback text (for reference only)"
            )
            recommendation_column = gr.Textbox(
                label="Recommendation Column Name",
                value="Q4_Weiterempfehlung",
                info="Column containing recommendation scores (for reference only)"
            )
            output_format = gr.Radio(
                label="Output Format",
                choices=["xlsx", "csv"],
                value="xlsx"
            )
            analyze_checkbox = gr.Checkbox(
                label="Analyze transformed data",
                value=True
            )

    # Transform button
    transform_btn = gr.Button("4. Transform Data & Download", variant="primary", size="lg")

    # Output sections
    with gr.Row():
        with gr.Column():
            status_output = gr.Textbox(
                label="Processing Status",
                lines=10,
                interactive=False
            )
        with gr.Column():
            analysis_output = gr.Markdown(
                label="Data Analysis"
            )

    # Download component for automatic download
    with gr.Row():
        with gr.Column():
            gr.Markdown("### Download Status")
            gr.Markdown("The transformed file will download automatically after processing completes.")
            # File component that triggers the automatic download
            output_file = gr.File(
                label="Transformed File (Auto-Download)",
                interactive=False,
                visible=True
            )
    # Event handlers
    input_file.change(
        fn=get_column_selector,
        inputs=[input_file],
        outputs=[column_selector]
    )

    # Modified to ensure automatic download
    transform_btn.click(
        fn=process_file,
        inputs=[
            input_file,
            topic_prefix,
            sentiment_prefix,
            category_prefix,
            text_column,
            recommendation_column,
            output_format,
            analyze_checkbox,
            column_selector
        ],
        outputs=[status_output, analysis_output, output_file]
    )

    # Examples section
    gr.Markdown("""
### Example Column Formats:
- **Topic columns**: `[**WORKSHOP] SwissLife Taxonomy(Kommentar) 1`, `[**WORKSHOP] SwissLife Taxonomy(Kommentar) 2`
- **Category columns**: `Categories:Topic1`, `Categories:Topic2`
- **Sentiment columns**: `ABSA:Sentiment1`, `ABSA:Sentiment2`

### Output Format:
- **feedback_id**: Unique identifier for each row
- **Selected original columns**: Any columns you selected from the original file
- **Topic columns**: Each unique topic becomes a column with values 0 (absent) or 1 (present)
- **Sentiment columns**: Each topic has an associated `_sentiment` column with values:
  - 1.0 = Positive
  - 0.5 = Neutral
  - 0.0 = Negative
- **Output filename**: `[original_filename]_transformed_[timestamp].[format]`

### Tips:
- Use the numbered column list to easily identify and select columns
- The text and recommendation column names in the configuration are for reference only
- To include them in the output, select them using the column checkboxes
- **The file will download automatically** after the transformation completes
- Look for the file in your browser's download folder
""")

# Launch the app
if __name__ == "__main__":
    demo.launch()