import json import os from enum import Enum from typing import List, Optional import pandas as pd import requests import streamlit as st from datasets import load_dataset from dotenv import load_dotenv from pydantic import BaseModel, Field # Load environment variables from .env file (for local development) load_dotenv() # Set up page config with custom styling st.set_page_config( page_title="🤖 LLM Data Analyst Agent", layout="wide", page_icon="🤖", initial_sidebar_state="expanded", ) # Custom CSS for styling st.markdown( """ """, unsafe_allow_html=True, ) # API configuration - works for both local and Hugging Face deployment api_key = os.environ.get("NEBIUS_API_KEY") if not api_key: st.markdown( """

🔑 API Key Configuration Required

For Local Development:

  1. Open the .env file in your project directory
  2. Replace your_api_key_here with your actual Nebius API key
  3. Save the file and restart the application

Example .env file:

NEBIUS_API_KEY=your_actual_api_key_here

For Hugging Face Spaces Deployment:

  1. Go to your Space settings
  2. Navigate to the "Variables and secrets" section
  3. Add a new secret: NEBIUS_API_KEY with your API key value
  4. Restart your Space

💡 The app will automatically detect the environment and use the appropriate method.

""", unsafe_allow_html=True, ) st.stop() # Set the API key in environment for consistency os.environ["OPENAI_API_KEY"] = api_key # Nebius API settings NEBIUS_API_URL = "https://api.studio.nebius.com/v1/chat/completions" MODEL_NAME = "Qwen/Qwen3-30B-A3B" # Function to call Nebius API def call_nebius_api(messages, response_format=None, thinking_mode=False): headers = {"Content-Type": "application/json", "Authorization": f"Bearer {api_key}"} payload = {"model": MODEL_NAME, "messages": messages} if response_format: payload["response_format"] = response_format # If in thinking mode, ask the model to show its reasoning if thinking_mode: # Add instruction to show thinking process last_message = messages[-1] enhanced_content = ( f"{last_message['content']}\n\n" f"Important: First explain your thinking process step by step, " f"then provide your final answer clearly labeled as 'FINAL ANSWER:'" ) messages[-1]["content"] = enhanced_content payload["messages"] = messages try: response = requests.post(NEBIUS_API_URL, headers=headers, json=payload) response.raise_for_status() return response.json() except Exception as e: st.error(f"API Error: {str(e)}") if hasattr(e, "response") and hasattr(e.response, "text"): st.error(f"Response: {e.response.text}") return None # Load Bitext dataset @st.cache_data def load_bitext_dataset(): try: dataset = load_dataset( "bitext/Bitext-customer-support-llm-chatbot-training-dataset" ) df = pd.DataFrame(dataset["train"]) return df except Exception as e: st.error(f"Error loading dataset: {e}") return None # Define enums for request types class AnalysisType(str, Enum): QUANTITATIVE = "quantitative" QUALITATIVE = "qualitative" OUT_OF_SCOPE = "out_of_scope" class ColumnType(str, Enum): CATEGORY = "category" INTENT = "intent" CUSTOMER = "customer" AGENT = "agent" # Define schema for agent requests class AgentRequest(BaseModel): question: str = Field(..., description="The user's question") analysis_type: AnalysisType = Field(..., description="Type of analysis to perform") target_columns: Optional[List[ColumnType]] = Field( None, description="Columns to analyze" ) is_follow_up: bool = Field( False, description="Whether this is a follow-up question" ) previous_context: Optional[str] = Field( None, description="Context from previous question" ) # Function to classify the user question def classify_question( question: str, previous_context: Optional[str] = None ) -> AgentRequest: """ Use the LLM to classify the question and determine the analysis type and target columns. """ system_prompt = """ You are a data analyst assistant that classifies user questions about a customer support dataset. The dataset contains customer support conversations with these columns: - category: The category of the customer query - intent: The specific intent of the customer query - customer: The customer's message - agent: The agent's response Classify the question into one of these types: - quantitative: Questions about statistics, frequencies, distributions, or examples of categories/intents - qualitative: Questions asking for summaries or insights about specific categories/intents - out_of_scope: Questions that cannot be answered using the dataset Also identify which columns are relevant to the question. Return a JSON object with the following fields: { "analysis_type": "quantitative" | "qualitative" | "out_of_scope", "target_columns": ["category", "intent", "customer", "agent"] } """ context_info = f"\nPrevious context: {previous_context}" if previous_context else "" user_prompt = f"Classify this question: {question}{context_info}" response = call_nebius_api( [ {"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}, ], response_format={"type": "json_object"}, ) if not response: # Fallback if API call fails return AgentRequest( question=question, analysis_type=AnalysisType.OUT_OF_SCOPE, target_columns=[], is_follow_up=bool(previous_context), previous_context=previous_context, ) try: content = ( response.get("choices", [{}])[0].get("message", {}).get("content", "{}") ) result = json.loads(content) # Convert string column names to ColumnType enum values target_columns = [] for col in result.get("target_columns", []): try: target_columns.append(ColumnType(col)) except ValueError: pass # Skip invalid column types return AgentRequest( question=question, analysis_type=AnalysisType(result.get("analysis_type", "out_of_scope")), target_columns=target_columns, is_follow_up=bool(previous_context), previous_context=previous_context, ) except (json.JSONDecodeError, ValueError) as e: st.warning(f"Error parsing API response: {str(e)}") return AgentRequest( question=question, analysis_type=AnalysisType.OUT_OF_SCOPE, target_columns=[], is_follow_up=bool(previous_context), previous_context=previous_context, ) # Function to generate a response to the user's question def generate_response(df: pd.DataFrame, request: AgentRequest) -> str: """ Generate a response to the user's question based on the request classification. """ # Get thinking mode setting from session state show_thinking = st.session_state.get("show_thinking", True) if request.analysis_type == AnalysisType.OUT_OF_SCOPE: return "I'm sorry, but I can't answer that question based on the available customer support data." # Prepare context with dataset information data_description = f"Dataset contains {len(df)} customer support conversations." if request.analysis_type == AnalysisType.QUANTITATIVE: # For quantitative questions, prepare relevant statistics stats_context = "" if ColumnType.CATEGORY in request.target_columns: category_counts = df["category"].value_counts().to_dict() stats_context += f"\nCategory distribution: {json.dumps(category_counts)}" if ColumnType.INTENT in request.target_columns: intent_counts = df["intent"].value_counts().to_dict() stats_context += f"\nIntent distribution: {json.dumps(intent_counts)}" # If specific examples are requested, include sample data if "example" in request.question.lower() or "show" in request.question.lower(): for col in request.target_columns: if col.value in df.columns: # Try to extract a specific value the user might be looking for search_terms = [term.lower() for term in df[col.value].unique()] for term in search_terms: if term in request.question.lower(): examples = ( df[df[col.value].str.lower() == term] .head(5) .to_dict("records") ) stats_context += f"\nExamples of {col.value}='{term}': {json.dumps(examples)}" break else: # QUALITATIVE stats_context = "" # For qualitative questions, prepare relevant data for summarization for col in request.target_columns: if col.value in df.columns: unique_values = df[col.value].unique().tolist() stats_context += ( f"\nUnique values for {col.value}: {json.dumps(unique_values)}" ) # If there's a specific category/intent mentioned in the question for value in unique_values: if value.lower() in request.question.lower(): filtered_data = ( df[df[col.value] == value].head(10).to_dict("records") ) stats_context += f"\nSample data for {col.value}='{value}': {json.dumps(filtered_data)}" break # Generate the response using LLM system_prompt = f""" You are a data analyst assistant that answers questions about a customer support dataset. {data_description} Use the following context to answer the question: {stats_context} Be concise and data-driven in your response. Mention specific numbers and patterns when appropriate. If there isn't enough information to fully answer the question, acknowledge that limitation. """ previous_context = "" if request.is_follow_up: previous_context = ( f"\nThis is a follow-up to previous context: {request.previous_context}" ) response = call_nebius_api( [ {"role": "system", "content": system_prompt}, { "role": "user", "content": f"Question: {request.question}{previous_context}", }, ], thinking_mode=show_thinking, ) if not response: return "I'm sorry, I encountered an error while processing your question. Please try again." return ( response.get("choices", [{}])[0] .get("message", {}) .get("content", "I couldn't generate a response. Please try again.") ) # Function to plan and execute approach based on mode def process_question( df: pd.DataFrame, question: str, mode: str, previous_context: Optional[str] = None ) -> str: """ Process the user question using the specified planning mode. """ # Add thinking indicator to the UI with custom styling thinking_placeholder = st.empty() thinking_placeholder.markdown( """
⚙️ Agent is thinking... Analyzing your question and preparing response.
""", unsafe_allow_html=True, ) # Get thinking mode setting from session state show_thinking = st.session_state.get("show_thinking", True) if mode == "pre_planning": # Pre-planning: First classify, then execute request = classify_question(question, previous_context) st.session_state.last_request = request # Show classification if thinking is enabled if show_thinking: thinking_placeholder.markdown( f"""
⚙️ Agent classified this as a {request.analysis_type} question
📊 Target columns: {[col.value for col in request.target_columns]}
""", unsafe_allow_html=True, ) result = generate_response(df, request) else: # reactive_planning # Reactive planning: Let the LLM decide approach dynamically system_prompt = """ You are a data analyst assistant that answers questions about a customer support dataset. The dataset contains customer support conversations with categories, intents, customer messages, and agent responses. Analyze the question and determine how to approach it: 1. Identify if it's asking for statistics, examples, summaries, or insights 2. Determine which aspects of the data are relevant 3. Generate a direct and concise response based on the data If the question cannot be answered with the customer support dataset, politely explain that it's outside your scope. """ # Prepare dataset information data_description = f"Dataset with {len(df)} records. " data_description += f"Sample of 5 records: {df.sample(5).to_dict('records')}" data_description += f"\nColumns: {df.columns.tolist()}" # Include full distributions for categories and intents # Check if the question is about distributions or frequencies question_lower = question.lower() include_distributions = any( term in question_lower for term in [ "distribution", "frequency", "count", "how many", "most frequent", "most common", "statistics", ] ) # Always include category values data_description += f"\nCategory values: {df['category'].unique().tolist()}" # Include full distribution data if the question appears to need it if include_distributions: if "category" in question_lower or "categories" in question_lower: category_counts = df["category"].value_counts().to_dict() data_description += ( f"\nCategory distribution: {json.dumps(category_counts)}" ) if "intent" in question_lower or "intents" in question_lower: intent_counts = df["intent"].value_counts().to_dict() data_description += ( f"\nIntent distribution: {json.dumps(intent_counts)}" ) else: # Just provide a sample of intents if not specifically asking about them data_description += f"\nIntent values sample: {df['intent'].sample(10).unique().tolist()}" else: # Just provide a sample of intents data_description += ( f"\nIntent values sample: {df['intent'].sample(10).unique().tolist()}" ) context_info = "" if previous_context: context_info = f"\nThis is a follow-up to: {previous_context}" response = call_nebius_api( [ {"role": "system", "content": system_prompt}, { "role": "user", "content": f"Question: {question}\n\nDataset information: {data_description}{context_info}", }, ], thinking_mode=show_thinking, ) if not response: thinking_placeholder.empty() return "I'm sorry, I encountered an error while processing your question. Please try again." result = ( response.get("choices", [{}])[0] .get("message", {}) .get("content", "I couldn't generate a response. Please try again.") ) # Clear the thinking indicator thinking_placeholder.empty() # Process the result to separate thinking from final answer if needed if show_thinking and "FINAL ANSWER:" in result: parts = result.split("FINAL ANSWER:") thinking = parts[0].strip() final_answer = parts[1].strip() # Display thinking and final answer with clear separation with st.expander("🧠 Agent's Thinking Process", expanded=True): st.markdown(thinking) return final_answer else: return result # Main app interface def main(): # Custom header st.markdown( """

🤖 LLM-powered Data Analyst Agent

Intelligent Analysis of Bitext Customer Support Dataset

""", unsafe_allow_html=True, ) # Load dataset with st.spinner("🔄 Loading dataset..."): df = load_bitext_dataset() if df is None: st.markdown( """

❌ Dataset Loading Failed

Failed to load dataset. Please check your internet connection and try again.

""", unsafe_allow_html=True, ) return # Success message with dataset info st.markdown( f"""

✅ Dataset Loaded Successfully

Loaded {len(df):,} customer support records ready for analysis

""", unsafe_allow_html=True, ) # Sidebar configuration with st.sidebar: st.markdown("## ⚙️ Configuration") # Planning mode selection with styling st.markdown("### 🧠 Planning Mode") planning_mode = st.radio( "Select how the agent should approach questions:", ["pre_planning", "reactive_planning"], format_func=lambda x: ( "🎯 Pre-planning + Execution" if x == "pre_planning" else "⚡ Reactive Dynamic Planning" ), help="Choose between structured pre-analysis or dynamic reactive planning", ) # Display current mode with badge mode_class = ( "pre-planning" if planning_mode == "pre_planning" else "reactive-planning" ) mode_name = ( "Pre-Planning" if planning_mode == "pre_planning" else "Reactive Planning" ) st.markdown( f"""
{mode_name} Mode Active
""", unsafe_allow_html=True, ) st.markdown("---") # Thinking process toggle st.markdown("### 🧠 Agent Behavior") if "show_thinking" not in st.session_state: st.session_state.show_thinking = True show_thinking = st.checkbox( "🔍 Show Agent's Thinking Process", value=st.session_state.show_thinking, help="Display the agent's reasoning and analysis steps", ) st.session_state.show_thinking = show_thinking st.markdown("---") # Dataset stats in sidebar st.markdown("### 📊 Dataset Overview") col1, col2 = st.columns(2) with col1: st.metric("📝 Total Records", f"{len(df):,}") with col2: st.metric("📂 Categories", len(df["category"].unique())) st.metric("🎯 Unique Intents", len(df["intent"].unique())) # Main content area # Dataset information in an expandable section with st.expander("📊 Dataset Information", expanded=False): st.markdown("### Dataset Details") # Create metrics row metrics_col1, metrics_col2, metrics_col3, metrics_col4 = st.columns(4) with metrics_col1: st.metric("Total Records", f"{len(df):,}") with metrics_col2: st.metric("Columns", len(df.columns)) with metrics_col3: st.metric("Categories", len(df["category"].unique())) with metrics_col4: st.metric("Intents", len(df["intent"].unique())) st.markdown("### Sample Data") st.dataframe(df.head(), use_container_width=True) st.markdown("### Category Distribution") st.bar_chart(df["category"].value_counts()) # Initialize session state for conversation history if "conversation" not in st.session_state: st.session_state.conversation = [] if "last_request" not in st.session_state: st.session_state.last_request = None # User input section st.markdown("## 💬 Ask Your Question") # Create a more prominent input area user_question = st.text_input( "What would you like to know about the customer support data?", placeholder="e.g., What are the most common customer issues?", key="user_input", help="Ask questions about statistics, examples, or insights from the dataset", ) # Submit button with custom styling col1, col2, col3 = st.columns([1, 2, 1]) with col2: submit_clicked = st.button("🚀 Analyze Question", use_container_width=True) if submit_clicked and user_question: # Add user question to conversation st.session_state.conversation.append({"role": "user", "content": user_question}) # Get previous context if this might be a follow-up previous_context = None if len(st.session_state.conversation) > 2: # Get the previous assistant response previous_context = st.session_state.conversation[-3]["content"] # Process the question with enhanced loading indicator with st.spinner("🤖 Agent is analyzing your question..."): response = process_question( df, user_question, planning_mode, previous_context ) # Add response to conversation st.session_state.conversation.append({"role": "assistant", "content": response}) # Display conversation with styled messages if st.session_state.conversation: st.markdown("## 💭 Conversation History") for i, message in enumerate(st.session_state.conversation): if message["role"] == "user": st.markdown( f"""
👤 You: {message['content']}
""", unsafe_allow_html=True, ) else: st.markdown( f"""
🤖 Agent: {message['content']}
""", unsafe_allow_html=True, ) if i < len(st.session_state.conversation) - 1: # Not the last message st.markdown("---") # Clear conversation button if st.button("🗑️ Clear Conversation"): st.session_state.conversation = [] st.rerun() if __name__ == "__main__": main()