# 25-08-05
"""CSV Binary Transformation Tool.

Gradio app that loads a CSV and derives binary indicator columns from:
  * ABSA* columns holding "topic::sentiment|topic::sentiment" pairs,
  * Categories* columns holding delimited category lists,
  * a user-named topics column holding delimited topic lists,
then exports the transformed table as CSV or Excel.
"""
import gradio as gr
import pandas as pd
import numpy as np
import os
import re
import tempfile
import shutil
from collections import defaultdict
from datetime import datetime
from typing import List, Tuple, Dict, Any, Optional
import json
from io import BytesIO


def _clean_token(text: str) -> str:
    """Turn an arbitrary topic name into a safe column-name fragment."""
    return re.sub(r'[^\w]', '_', text).strip('_')


def _split_multi(value: str) -> List[str]:
    """Split a delimited cell value (comma/semicolon/pipe) into trimmed, non-empty tokens."""
    return [tok.strip() for tok in re.split(r'[,;|]', value) if tok.strip()]


class CSVBinaryTransformer:
    """Holds the loaded CSV and implements each transformation step."""

    def __init__(self) -> None:
        self.df: Optional[pd.DataFrame] = None            # raw uploaded data
        self.original_filename: Optional[str] = None      # basename without extension
        self.processed_df: Optional[pd.DataFrame] = None  # last transformation result

    def load_csv(self, file_path: str) -> Tuple[pd.DataFrame, List[Tuple[str, str]]]:
        """Load a CSV file; return a 10-row preview and (label, value) column choices.

        Raises:
            gr.Error: if the file cannot be read or parsed.
        """
        try:
            self.df = pd.read_csv(file_path)
            self.original_filename = os.path.splitext(os.path.basename(file_path))[0]
            # (label, value) tuples for the CheckboxGroup choices.
            column_choices = [(col, col) for col in self.df.columns.tolist()]
            return self.df.head(10), column_choices
        except Exception as e:
            raise gr.Error(f"Error loading CSV: {str(e)}")

    def select_all_columns(self) -> List[str]:
        """Return all column names (backs the 'Select All' button)."""
        if self.df is not None:
            return self.df.columns.tolist()
        return []

    def deselect_all_columns(self) -> List[str]:
        """Return an empty selection (backs the 'Deselect All' button)."""
        return []

    @staticmethod
    def _iter_absa_pairs(value):
        """Yield (topic, sentiment) pairs parsed from a 'topic::sentiment|...' cell."""
        if pd.isna(value):
            return
        for pair in str(value).split('|'):
            pair = pair.strip()
            if '::' not in pair:
                continue
            topic_part, sentiment_part = pair.split('::', 1)
            topic, sentiment = topic_part.strip(), sentiment_part.strip()
            if topic and sentiment:
                yield topic, sentiment

    def process_absa_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Derive sentiment flags and per-topic S_ columns from ABSA* columns.

        Adds Verbatim_Positive/Neutral/Negative 0-1 flags per row, plus one
        S_<topic> text column per distinct topic holding its sentiment label.
        Returns the (mutated) dataframe.
        """
        absa_columns = [col for col in df.columns if col.startswith('ABSA')]
        if not absa_columns:
            return df

        df['Verbatim_Positive'] = 0
        df['Verbatim_Neutral'] = 0
        df['Verbatim_Negative'] = 0

        # Single pass (the original parsed every cell twice): set the verbatim
        # flags and record the sentiment per (S_ column, row) for filling below.
        topic_values: Dict[str, Dict[Any, str]] = {}
        for col in absa_columns:
            for idx, value in df[col].items():
                for topic, sentiment in self._iter_absa_pairs(value):
                    sentiment_lower = sentiment.lower()
                    if 'positive' in sentiment_lower:
                        df.at[idx, 'Verbatim_Positive'] = 1
                    elif 'negative' in sentiment_lower:
                        df.at[idx, 'Verbatim_Negative'] = 1
                    elif 'neutral' in sentiment_lower:
                        df.at[idx, 'Verbatim_Neutral'] = 1
                    col_name = f"S_{_clean_token(topic)}"
                    # Last pair for the same topic in a row wins, as before.
                    topic_values.setdefault(col_name, {})[idx] = sentiment

        # Sorted for deterministic column order (the original iterated a set,
        # so output column order varied between runs).
        for col_name in sorted(topic_values):
            if col_name not in df.columns:
                df[col_name] = ""
            for idx, sentiment in topic_values[col_name].items():
                df.at[idx, col_name] = sentiment
        return df

    def process_categories_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Create one C_<category> 0/1 column per distinct category in Categories* columns.

        BUG FIX: rows are matched on exact (case-insensitive) delimited tokens
        instead of substring containment, so e.g. category "Food" no longer
        flags a row whose value is "Seafood".
        """
        category_columns = [col for col in df.columns if col.startswith('Categories')]
        if not category_columns:
            return df

        # Pass 1: collect every distinct category token.
        all_categories: set = set()
        for col in category_columns:
            for value in df[col].dropna():
                if isinstance(value, str):
                    all_categories.update(_split_multi(value))

        # Create all binary columns up front, in deterministic order.
        for category in sorted(all_categories):
            df[f"C_{category}"] = 0

        # Case-insensitive token -> list of target columns (handles categories
        # that differ only in case).
        lookup: Dict[str, List[str]] = defaultdict(list)
        for category in all_categories:
            lookup[category.lower()].append(f"C_{category}")

        # Pass 2: flag exact token matches per row.
        for col in category_columns:
            for idx, value in df[col].items():
                if pd.isna(value) or not isinstance(value, str):
                    continue
                for token in _split_multi(value):
                    for col_name in lookup.get(token.lower(), ()):
                        df.at[idx, col_name] = 1
        return df

    def process_topics_column(self, df: pd.DataFrame, topics_column: str) -> pd.DataFrame:
        """Create one T_<topic> 0/1 column per distinct topic in the named column.

        Uses the same exact token matching as the category processing
        (substring matching previously produced false positives).
        """
        if not topics_column or topics_column not in df.columns:
            return df

        all_topics: set = set()
        for value in df[topics_column].dropna():
            if isinstance(value, str):
                all_topics.update(_split_multi(value))

        for topic in sorted(all_topics):  # sorted => deterministic column order
            df[f"T_{topic}"] = 0

        lookup: Dict[str, List[str]] = defaultdict(list)
        for topic in all_topics:
            lookup[topic.lower()].append(f"T_{topic}")

        for idx, value in df[topics_column].items():
            if pd.isna(value) or not isinstance(value, str):
                continue
            for token in _split_multi(value):
                for col_name in lookup.get(token.lower(), ()):
                    df.at[idx, col_name] = 1
        return df

    def save_transformed_data(self, output_format: str = 'xlsx') -> str:
        """Write processed_df to a timestamped file in the temp dir; return its path.

        Args:
            output_format: 'xlsx' for Excel, anything else for CSV.
        Raises:
            ValueError: if there is no transformed data or the file was not written.
        """
        if self.processed_df is None:
            raise ValueError("No transformed data to save")

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        prefix = self.original_filename if self.original_filename else 'transformed_data'
        ext = 'xlsx' if output_format == 'xlsx' else 'csv'
        filename = f"{prefix}_BinaryTransformation_{timestamp}.{ext}"

        # Write to an anonymous temp file first, then move into place, so a
        # half-written file never sits at the final download path.
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=f'.{ext}')
        try:
            if ext == 'xlsx':
                self.processed_df.to_excel(temp_file.name, index=False)
            else:
                self.processed_df.to_csv(temp_file.name, index=False)
        finally:
            temp_file.close()

        final_path = os.path.join(tempfile.gettempdir(), filename)
        if os.path.exists(final_path):
            os.remove(final_path)
        os.rename(temp_file.name, final_path)

        if not os.path.exists(final_path):
            raise ValueError(f"Failed to create output file: {final_path}")
        return final_path


# Shared transformer instance backing all Gradio callbacks.
transformer = CSVBinaryTransformer()


def handle_file_upload(file):
    """Load the uploaded CSV and refresh the preview, column selector, and status."""
    if file is None:
        return None, gr.update(choices=[], value=[]), "Please upload a CSV file"
    try:
        preview_df, column_choices = transformer.load_csv(file.name)
        preview_html = preview_df.to_html(classes="table table-striped", table_id="upload-preview")
        return (
            preview_html,
            gr.update(choices=column_choices, value=[], visible=True),
            f"✅ Successfully loaded CSV with {len(transformer.df)} rows and {len(transformer.df.columns)} columns"
        )
    except Exception as e:
        return None, gr.update(choices=[], value=[], visible=False), f"❌ Error: {str(e)}"


def select_all():
    """Select every column in the checkbox group."""
    if transformer.df is not None:
        return gr.update(value=transformer.select_all_columns())
    return gr.update(value=[])


def deselect_all():
    """Clear the checkbox group selection."""
    return gr.update(value=transformer.deselect_all_columns())


def process_transformation(selected_columns, topics_column, export_format):
    """Run all transformation steps on the selected columns and save the result.

    Returns (preview_html, output_file_path, status_message); the first two are
    None on error, with the error detail in the status message.
    """
    try:
        if transformer.df is None:
            return None, None, "❌ Error: No CSV file loaded"
        if not selected_columns:
            return None, None, "❌ Error: Please select at least one column"

        # Work on a copy of only the selected columns.
        processed_df = transformer.df[selected_columns].copy()
        processed_df = transformer.process_absa_columns(processed_df)
        processed_df = transformer.process_categories_columns(processed_df)
        processed_df = transformer.process_topics_column(processed_df, topics_column)
        transformer.processed_df = processed_df

        preview_html = processed_df.head(20).to_html(classes="table table-striped", table_id="preview-table")

        # BUG FIX: the previous string mangling ("Excel (.xlsx)" -> "excelxlsx")
        # never equalled 'xlsx', so choosing Excel silently produced a CSV file.
        output_format = 'xlsx' if 'xlsx' in export_format.lower() else 'csv'
        output_file = transformer.save_transformed_data(output_format)

        success_msg = f"✅ Transformation completed! Generated file: {os.path.basename(output_file)}"
        success_msg += f"\n📊 Processed {len(transformer.processed_df)} rows with {len(transformer.processed_df.columns)} columns"
        success_msg += "\n💾 File saved successfully"
        success_msg += "\n📥 File download should start automatically"
        return preview_html, output_file, success_msg
    except Exception as e:
        import traceback
        error_msg = f"❌ Error during transformation: {str(e)}\n\n{traceback.format_exc()}"
        return None, None, error_msg


# Create Gradio interface.
with gr.Blocks(title="CSV Binary Transformation Tool", theme=gr.themes.Soft()) as app:
    gr.Markdown("""
    # 📊 CSV Binary Transformation Tool

    This tool transforms CSV files by creating binary columns for sentiment analysis, categories, and topics.

    ## Features:
    - **ABSA Processing**: Creates sentiment columns and topic-sentiment combinations
    - **Category Processing**: Creates binary columns for each category
    - **Topic Processing**: Creates binary columns for each topic
    - **Flexible Export**: Support for CSV and Excel formats
    """)

    with gr.Row():
        with gr.Column(scale=1):
            # File upload section
            gr.Markdown("### 1. Upload CSV File")
            file_input = gr.File(
                label="Upload Input File",
                file_types=[".csv"],
                type="filepath"
            )
            upload_status = gr.Textbox(
                label="Upload Status",
                interactive=False,
                lines=2
            )

            # Column selection section
            gr.Markdown("### 2. Select Columns")
            gr.Markdown("*Choose which columns from your CSV to include in the output file*")
            with gr.Row():
                select_all_btn = gr.Button("✓ Select All", size="sm", variant="secondary")
                deselect_all_btn = gr.Button("✗ Deselect All", size="sm", variant="secondary")
            column_selector = gr.CheckboxGroup(
                label="Choose columns to include in output",
                choices=[],
                value=[],
                interactive=True,
                visible=False,
                info="Select the columns you want to include in the transformed output file"
            )

            # Topics column input
            gr.Markdown("### 3. Specify Topics Column")
            topics_column_input = gr.Textbox(
                label="Topics Column Name",
                placeholder="Enter the name of the column containing topics",
                info="This column will be used to create T_ binary columns"
            )

            # Export options
            gr.Markdown("### 4. Export Settings")
            export_format = gr.Radio(
                label="Output Format",
                choices=["CSV (.csv)", "Excel (.xlsx)"],
                value="Excel (.xlsx)"
            )

            # Process button
            process_btn = gr.Button(
                "🚀 Transform Data",
                variant="primary",
                size="lg"
            )

        with gr.Column(scale=2):
            # Preview sections
            gr.Markdown("### File Preview")
            with gr.Tabs():
                with gr.Tab("Original Data"):
                    original_preview = gr.HTML(
                        label="Original Data Preview (First 10 rows)",
                        value="<div style='text-align: center; padding: 20px; color: #888;'>No file uploaded yet</div>"
                    )
                with gr.Tab("Transformed Data"):
                    transformed_preview = gr.HTML(
                        label="Transformed Data Preview (First 20 rows)",
                        value="<div style='text-align: center; padding: 20px; color: #888;'>No transformation performed yet</div>"
                    )

            # Status and download
            process_status = gr.Textbox(
                label="Processing Status",
                interactive=False,
                lines=6
            )
            gr.Markdown("### 📥 Download Status")
            gr.Markdown("Please click on the link inside the output file size value to download the transformed file (the number value on the right hand side below). You may need to right click and select Save Link As (or something similar)")
            output_file = gr.File(
                label="Transformed Binary Matrix (Auto-Download)",
                interactive=False,
                visible=True
            )

    # Event handlers
    file_input.change(
        fn=handle_file_upload,
        inputs=[file_input],
        outputs=[original_preview, column_selector, upload_status]
    )
    select_all_btn.click(
        fn=select_all,
        outputs=[column_selector]
    )
    deselect_all_btn.click(
        fn=deselect_all,
        outputs=[column_selector]
    )
    process_btn.click(
        fn=process_transformation,
        inputs=[column_selector, topics_column_input, export_format],
        outputs=[transformed_preview, output_file, process_status]
    )

    # Custom CSS for better table styling.
    app.load(js="""
    function() {
        const style = document.createElement('style');
        style.textContent = `
            .table { font-size: 12px; }
            .table th, .table td { padding: 4px 8px; }
            #upload-preview, #preview-table { max-height: 400px; overflow-y: auto; }
        `;
        document.head.appendChild(style);
    }
    """)


if __name__ == "__main__":
    app.launch(
        share=True,
        max_file_size="50mb"
    )