from typing import TypedDict, List, Dict, Optional, Any from typing_extensions import List, TypedDict from dotenv import load_dotenv import chainlit as cl import os import asyncio import base64 import requests import time import datetime import random import string import fpdf from pathlib import Path # Re-enable the Tavily search tool from langchain_community.tools.tavily_search import TavilySearchResults from langchain_core.documents import Document from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage, AIMessage from langchain_openai import ChatOpenAI # from langchain_core.language_models import FakeListLLM # Add FakeListLLM for testing from langgraph.graph import StateGraph, END from openai import OpenAI, AsyncOpenAI # Import InsightFlow components from insight_state import InsightFlowState from utils.persona import PersonaFactory, PersonaReasoning # Load environment variables load_dotenv() # Initialize OpenAI client for DALL-E openai_client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY")) # --- INITIALIZE CORE COMPONENTS --- # Re-enable search tool initialization tavily_tool = TavilySearchResults(max_results=3) # Initialize LLMs with optimized settings for speed llm_planner = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.1, request_timeout=20) llm_analytical = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2, request_timeout=20) llm_scientific = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.3, request_timeout=20) llm_philosophical = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.4, request_timeout=20) llm_factual = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.3, request_timeout=20) llm_metaphorical = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.6, request_timeout=20) llm_futuristic = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.5, request_timeout=20) llm_synthesizer = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.2, request_timeout=20) # Direct mode LLM with slightly higher quality llm_direct = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.3, request_timeout=25) # --- SYSTEM PROMPTS --- PLANNER_SYSPROMPT = """You are an expert planner agent that coordinates research across multiple personas. Given a user query, your task is to create a research plan with specific sub-tasks for each selected persona. Break down complex queries into specific tasks that leverage each persona's unique perspective. """ SYNTHESIZER_SYSPROMPT = """You are a synthesis expert that combines multiple perspectives into a coherent response. Given different persona perspectives on the same query, create a unified response that: 1. Highlights unique insights from each perspective 2. Notes areas of agreement and divergence 3. Organizes information logically for the user Present the final response in a cohesive format that integrates all perspectives. """ DIRECT_SYSPROMPT = """You are a highly intelligent AI assistant that provides clear, direct, and helpful answers. Your responses should be accurate, concise, and well-reasoned. """ # --- LANGGRAPH NODES FOR INSIGHTFLOW AI --- async def run_planner_agent(state: InsightFlowState) -> InsightFlowState: """Plan the research approach for multiple personas""" query = state["query"] selected_personas = state["selected_personas"] # For the MVP implementation, we'll use a simplified planning approach # that just assigns the same query to each selected persona # In a full implementation, the planner would create custom tasks for each persona print(f"Planning research for query: {query}") print(f"Selected personas: {selected_personas}") state["current_step_name"] = "execute_persona_tasks" return state async def execute_persona_tasks(state: InsightFlowState) -> InsightFlowState: """Execute tasks for each selected persona""" query = state["query"] selected_personas = state["selected_personas"] persona_factory = cl.user_session.get("persona_factory") # Initialize responses dict if not exists if "persona_responses" not in state: state["persona_responses"] = {} print(f"Executing persona tasks for {len(selected_personas)} personas") # Get progress message if it exists progress_msg = cl.user_session.get("progress_msg") total_personas = len(selected_personas) # Process each persona with timeout safety # Using asyncio.gather to run multiple persona tasks in parallel for speed persona_tasks = [] # First, create all personas and tasks for persona_id in selected_personas: persona = persona_factory.create_persona(persona_id) if persona: # Add progress message for user feedback await cl.Message(content=f"Generating insights from {persona_id} perspective...").send() # Create task to run in parallel task = generate_perspective_with_timeout(persona, query) persona_tasks.append((persona_id, task)) # Run all perspective generations in parallel completed = 0 for persona_id, task in persona_tasks: try: # Update dynamic progress if progress message exists if progress_msg: percent_done = 40 + int((completed / total_personas) * 40) await update_message( progress_msg, f"⏳ Generating perspective from {persona_id} ({percent_done}%)..." ) response = await task state["persona_responses"][persona_id] = response print(f"Perspective generated for {persona_id}") # Increment completed count completed += 1 except Exception as e: print(f"Error getting {persona_id} perspective: {e}") state["persona_responses"][persona_id] = f"Could not generate perspective: {str(e)}" # Still increment completed count completed += 1 state["current_step_name"] = "synthesize_responses" return state async def generate_perspective_with_timeout(persona, query): """Generate a perspective with timeout handling""" try: # Set a timeout for each perspective generation response = await asyncio.wait_for( cl.make_async(persona.generate_perspective)(query), timeout=30 # 30-second timeout (reduced for speed) ) return response except asyncio.TimeoutError: # Handle timeout by providing a simplified response return f"The perspective generation timed out. This may be due to high API traffic or complexity of the query." except Exception as e: # Handle other errors return f"Error generating perspective: {str(e)}" async def synthesize_responses(state: InsightFlowState) -> InsightFlowState: """Combine perspectives from different personas""" query = state["query"] persona_responses = state["persona_responses"] if not persona_responses: state["synthesized_response"] = "No persona perspectives were generated." state["current_step_name"] = "present_results" return state print(f"Synthesizing responses from {len(persona_responses)} personas") # Add progress message for user feedback await cl.Message(content="Synthesizing insights from all perspectives...").send() # Prepare input for synthesizer perspectives_text = "" for persona_id, response in persona_responses.items(): perspectives_text += f"\n\n{persona_id.capitalize()} Perspective:\n{response}" # Use LLM to synthesize with timeout messages = [ SystemMessage(content=SYNTHESIZER_SYSPROMPT), HumanMessage(content=f"Query: {query}\n\nPerspectives:{perspectives_text}\n\nPlease synthesize these perspectives into a coherent response.") ] try: # Set a timeout for the synthesis synthesizer_response = await asyncio.wait_for( llm_synthesizer.ainvoke(messages), timeout=30 # 30-second timeout (reduced for speed) ) state["synthesized_response"] = synthesizer_response.content print("Synthesis complete") except asyncio.TimeoutError: # Handle timeout for synthesis state["synthesized_response"] = "The synthesis of perspectives timed out. Here are the individual perspectives instead." print("Synthesis timed out") except Exception as e: print(f"Error synthesizing perspectives: {e}") state["synthesized_response"] = f"Error synthesizing perspectives: {str(e)}" state["current_step_name"] = "generate_visualization" return state async def generate_dalle_image(prompt: str) -> Optional[str]: """Generate a DALL-E image and return the URL""" try: # Create a detailed prompt for hand-drawn style visualization full_prompt = f"Create a hand-drawn style visual note or sketch that represents: {prompt}. Make it look like a thoughtful drawing with annotations and key concepts highlighted. Include multiple perspectives connected together in a coherent visualization. Style: thoughtful hand-drawn sketch, notebook style with labels." # Call DALL-E to generate the image response = await openai_client.images.generate( model="dall-e-3", prompt=full_prompt, size="1024x1024", quality="standard", n=1 ) # Return the URL of the generated image return response.data[0].url except Exception as e: print(f"DALL-E image generation failed: {e}") return None async def generate_visualization(state: InsightFlowState) -> InsightFlowState: """Generate a Mermaid diagram from the multiple perspectives""" # Get progress message if available and update it progress_msg = cl.user_session.get("progress_msg") if progress_msg: await update_message(progress_msg, "⏳ Generating visual representation (90%)...") # Skip if no synthesized response or no personas if not state.get("synthesized_response") or not state.get("persona_responses"): state["current_step_name"] = "present_results" return state # Get visualization settings show_visualization = cl.user_session.get("show_visualization", True) visual_only_mode = cl.user_session.get("visual_only_mode", False) # Determine if we should generate visualizations (either mode is on) should_visualize = show_visualization or visual_only_mode # Generate mermaid diagram if visualizations are enabled if should_visualize: try: # Create the absolute simplest Mermaid diagram possible query = state.get("query", "Query") query_short = query[:20] + "..." if len(query) > 20 else query # Generate the most basic diagram structure mermaid_text = f"""graph TD Q["{query_short}"] S["Synthesized View"]""" # Add each persona with a simple connection for i, persona in enumerate(state.get("persona_responses", {}).keys()): persona_short = persona.capitalize() node_id = f"P{i+1}" mermaid_text += f""" {node_id}["{persona_short}"] Q --> {node_id} {node_id} --> S""" # Store the simplified mermaid code state["visualization_code"] = mermaid_text print("Visualization generation complete with simplified diagram") except Exception as e: print(f"Error generating visualization: {e}") state["visualization_code"] = None # Generate DALL-E image if visualizations are enabled try: # Update progress message if progress_msg: await update_message(progress_msg, "⏳ Generating hand-drawn visualization (92%)...") # Create a prompt from the synthesized response image_prompt = state.get("synthesized_response", "") if len(image_prompt) > 500: image_prompt = image_prompt[:500] # Limit prompt length # Add the query for context image_prompt = f"Query: {state.get('query', '')}\n\nSynthesis: {image_prompt}" # Generate the image image_url = await generate_dalle_image(image_prompt) state["visualization_image_url"] = image_url print("DALL-E visualization generated successfully") except Exception as e: print(f"Error generating DALL-E image: {e}") state["visualization_image_url"] = None state["current_step_name"] = "present_results" return state async def present_results(state: InsightFlowState) -> InsightFlowState: """Present the final results to the user""" synthesized_response = state.get("synthesized_response", "No synthesized response available.") print("Presenting results to user") # Ensure progress is at 100% before showing results progress_msg = cl.user_session.get("progress_msg") if progress_msg: await update_message(progress_msg, "✅ Process complete (100%)") # Get visualization settings visual_only_mode = cl.user_session.get("visual_only_mode", False) show_visualization = cl.user_session.get("show_visualization", True) # Check if either visualization mode is enabled visualization_enabled = visual_only_mode or show_visualization # Determine panel mode panel_mode = "Research Assistant" if state["panel_type"] == "research" else "Multi-Persona Discussion" # Check if we have visualizations available has_mermaid = state.get("visualization_code") is not None has_dalle_image = state.get("visualization_image_url") is not None has_any_visualization = has_mermaid or has_dalle_image # Send text response if we're not in visual-only mode OR if no visualizations are available if not visual_only_mode or (visual_only_mode and not has_any_visualization): panel_indicator = f"**{panel_mode} Insights:**\n\n" # In visual-only mode with no visualizations, add an explanation if visual_only_mode and not has_any_visualization: panel_indicator = f"**{panel_mode} Insights (No visualizations available):**\n\n" await cl.Message(content=panel_indicator + synthesized_response).send() # Display DALL-E generated image if available and visualizations are enabled if has_dalle_image and visualization_enabled: try: # Add a title for the image if visual_only_mode: image_title = f"**Hand-drawn Visualization of {panel_mode} Insights:**" else: image_title = "**Hand-drawn Visualization:**" # Send the title await cl.Message(content=image_title).send() # Send the image URL as markdown image_url = state["visualization_image_url"] image_markdown = f"![DALL-E Visualization]({image_url})" await cl.Message(content=image_markdown).send() except Exception as e: print(f"Error displaying DALL-E image: {e}") # If in visual-only mode and image fails but we have no other visualization or text shown if visual_only_mode and not has_mermaid and state.get("text_fallback_shown", False) is not True: panel_indicator = f"**{panel_mode} Insights (Image generation failed):**\n\n" await cl.Message(content=panel_indicator + synthesized_response).send() state["text_fallback_shown"] = True # Display Mermaid diagram if available and visualizations are enabled if has_mermaid and visualization_enabled: try: # Add a brief summary in visual-only mode if visual_only_mode: diagram_title = f"**Concept Map of {panel_mode} Insights:**" else: diagram_title = "**Concept Map:**" # First send a title message await cl.Message(content=diagram_title).send() # Try to render the mermaid diagram try: # Ensure the diagram is extremely simple and valid mermaid_code = state['visualization_code'] # Fallback to a guaranteed working diagram if rendering fails if not mermaid_code or len(mermaid_code) < 10: mermaid_code = """graph TD A[Query] --> B[Analysis] B --> C[Result]""" # Create the mermaid block with proper syntax # Each line needs to be separate without extra indentation mermaid_block = "```mermaid\n" for line in mermaid_code.split('\n'): mermaid_block += line.strip() + "\n" mermaid_block += "```" # Send the diagram as its own message await cl.Message(content=mermaid_block).send() except Exception as diagram_err: print(f"Error rendering diagram: {diagram_err}") # Try an ultra-simple fallback diagram ultra_simple = """```mermaid graph TD A[Start] --> B[End] ```""" await cl.Message(content=ultra_simple).send() # Send the footer only if we have visualizations if has_any_visualization: await cl.Message(content="_Visualizations represent the key relationships between concepts from different perspectives._").send() except Exception as e: print(f"Error displaying visualization: {e}") # If in visual-only mode and visualization fails but no image shown yet and no text shown yet if visual_only_mode and not has_dalle_image and state.get("text_fallback_shown", False) is not True: panel_indicator = f"**{panel_mode} Insights (Visualization failed):**\n\n" await cl.Message(content=panel_indicator + synthesized_response).send() # Mark that we showed the fallback text to avoid duplicates state["text_fallback_shown"] = True # Check if user wants to see individual perspectives (not in visual-only mode) if cl.user_session.get("show_perspectives", True) and not visual_only_mode: # Show individual perspectives as separate messages instead of expandable elements for persona_id, response in state["persona_responses"].items(): persona_name = persona_id.capitalize() # Get proper display name from config if available persona_factory = cl.user_session.get("persona_factory") if persona_factory: config = persona_factory.get_config(persona_id) if config and "name" in config: persona_name = config["name"] # Just send the perspective as a message with a header perspective_message = f"**{persona_name}'s Perspective:**\n\n{response}" await cl.Message(content=perspective_message).send() state["current_step_name"] = "END" return state # --- LANGGRAPH SETUP FOR INSIGHTFLOW AI --- # Now define the graph with the functions we've defined above insight_graph_builder = StateGraph(InsightFlowState) # Add all nodes insight_graph_builder.add_node("planner_agent", run_planner_agent) insight_graph_builder.add_node("execute_persona_tasks", execute_persona_tasks) insight_graph_builder.add_node("synthesize_responses", synthesize_responses) insight_graph_builder.add_node("generate_visualization", generate_visualization) insight_graph_builder.add_node("present_results", present_results) # Add edges insight_graph_builder.add_edge("planner_agent", "execute_persona_tasks") insight_graph_builder.add_edge("execute_persona_tasks", "synthesize_responses") insight_graph_builder.add_edge("synthesize_responses", "generate_visualization") insight_graph_builder.add_edge("generate_visualization", "present_results") insight_graph_builder.add_edge("present_results", END) # Set entry point insight_graph_builder.set_entry_point("planner_agent") # Compile the graph insight_flow_graph = insight_graph_builder.compile() print("InsightFlow graph compiled successfully") # --- DIRECT QUERY FUNCTION --- async def direct_query(query: str): """Process a direct query without using multiple personas""" messages = [ SystemMessage(content=DIRECT_SYSPROMPT), HumanMessage(content=query) ] try: # Direct query to LLM with streaming async for chunk in llm_direct.astream(messages): if chunk.content: # Yield chunk for streaming UI updates yield chunk.content except Exception as e: error_msg = f"Error processing direct query: {str(e)}" yield error_msg # Helper function to display help information async def display_help(): """Display all available commands""" help_text = """ # InsightFlow AI Commands **Persona Management:** - `/add persona_name` - Add a persona to your research team (e.g., `/add factual`) - `/remove persona_name` - Remove a persona from your team (e.g., `/remove philosophical`) - `/list` - Show all available personas - `/team` - Show your current team and settings **Speed and Mode Options:** - `/direct on|off` - Toggle direct LLM mode (bypasses multi-persona system) - `/quick on|off` - Toggle quick mode (uses fewer personas) - `/perspectives on|off` - Toggle showing individual perspectives - `/visualization on|off` - Toggle showing visualizations (Mermaid diagrams & DALL-E images) - `/visual_only on|off` - Show only visualizations without text (faster) **Export Options:** - `/export_md` - Export the current insight analysis to a markdown file - `/export_pdf` - Export the current insight analysis to a PDF file **System Commands:** - `/help` - Show this help message **Available Personas:** - analytical - Logical problem-solving - scientific - Evidence-based reasoning - philosophical - Meaning and implications - factual - Practical information - metaphorical - Creative analogies - futuristic - Forward-looking possibilities """ await cl.Message(content=help_text).send() # Export functions async def generate_random_id(length=8): """Generate a random ID for export filenames""" return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length)) async def export_to_markdown(state: InsightFlowState): """Export the current insight analysis to a markdown file""" if not state.get("synthesized_response"): return None, "No analysis available to export. Please run a query first." # Create exports directory if it doesn't exist Path("./exports").mkdir(exist_ok=True) # Generate a unique filename with timestamp timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") random_id = await generate_random_id() filename = f"exports/insightflow_analysis_{timestamp}_{random_id}.md" # Prepare content query = state.get("query", "No query specified") synthesized = state.get("synthesized_response", "No synthesized response") panel_mode = "Research Assistant" if state["panel_type"] == "research" else "Multi-Persona Discussion" # Create markdown content md_content = f"""# InsightFlow AI Analysis *Generated on: {datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}* ## Query {query} ## {panel_mode} Insights {synthesized} """ # Add perspectives if available if state.get("persona_responses"): md_content += "## Individual Perspectives\n\n" for persona_id, response in state["persona_responses"].items(): persona_name = persona_id.capitalize() md_content += f"### {persona_name}'s Perspective\n{response}\n\n" # Add visualization section header md_content += "## Visualizations\n\n" # Add DALL-E image if available if state.get("visualization_image_url"): md_content += f"### Hand-drawn Visual Representation\n\n" md_content += f"![InsightFlow Visualization]({state['visualization_image_url']})\n\n" # Add visualization if available if state.get("visualization_code"): md_content += "### Concept Map\n\n```mermaid\n" for line in state["visualization_code"].split('\n'): md_content += line.strip() + "\n" md_content += "```\n\n" md_content += "*Note: The mermaid diagram will render in applications that support mermaid syntax, like GitHub or VS Code with appropriate extensions.*\n\n" # Add footer md_content += "---\n*Generated by InsightFlow AI*" # Write to file try: with open(filename, "w", encoding="utf-8") as f: f.write(md_content) return filename, None except Exception as e: return None, f"Error exporting to markdown: {str(e)}" async def export_to_pdf(state: InsightFlowState): """Export the current insight analysis to a PDF file""" if not state.get("synthesized_response"): return None, "No analysis available to export. Please run a query first." # Create exports directory if it doesn't exist Path("./exports").mkdir(exist_ok=True) # Generate a unique filename with timestamp timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") random_id = await generate_random_id() filename = f"exports/insightflow_analysis_{timestamp}_{random_id}.pdf" try: # Create PDF pdf = fpdf.FPDF() pdf.add_page() # Add title pdf.set_font('Arial', 'B', 16) pdf.cell(0, 10, 'InsightFlow AI Analysis', 0, 1, 'C') pdf.set_font('Arial', 'I', 10) pdf.cell(0, 10, f"Generated on: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", 0, 1, 'C') pdf.ln(10) # Add query pdf.set_font('Arial', 'B', 12) pdf.cell(0, 10, 'Query:', 0, 1) pdf.set_font('Arial', '', 11) query = state.get("query", "No query specified") pdf.multi_cell(0, 10, query) pdf.ln(5) # Add synthesized insights panel_mode = "Research Assistant" if state["panel_type"] == "research" else "Multi-Persona Discussion" pdf.set_font('Arial', 'B', 12) pdf.cell(0, 10, f'{panel_mode} Insights:', 0, 1) pdf.set_font('Arial', '', 11) synthesized = state.get("synthesized_response", "No synthesized response") pdf.multi_cell(0, 10, synthesized) pdf.ln(10) # Add perspectives if available if state.get("persona_responses"): pdf.set_font('Arial', 'B', 12) pdf.cell(0, 10, 'Individual Perspectives:', 0, 1) pdf.ln(5) for persona_id, response in state["persona_responses"].items(): persona_name = persona_id.capitalize() pdf.set_font('Arial', 'B', 11) pdf.cell(0, 10, f"{persona_name}'s Perspective:", 0, 1) pdf.set_font('Arial', '', 11) pdf.multi_cell(0, 10, response) pdf.ln(5) # Add visualizations section pdf.add_page() pdf.set_font('Arial', 'B', 14) pdf.cell(0, 10, 'Visualizations', 0, 1, 'C') pdf.ln(5) # Add DALL-E image if available if state.get("visualization_image_url"): try: # Add header for the visualization pdf.set_font('Arial', 'B', 12) pdf.cell(0, 10, 'Hand-drawn Visual Representation:', 0, 1) pdf.ln(5) # Download the image image_url = state.get("visualization_image_url") image_path = f"exports/temp_image_{timestamp}_{random_id}.jpg" # Download the image using requests response = requests.get(image_url, stream=True) if response.status_code == 200: with open(image_path, 'wb') as img_file: for chunk in response.iter_content(1024): img_file.write(chunk) # Add the image to PDF with proper sizing pdf.image(image_path, x=10, y=None, w=190) pdf.ln(5) # Remove the temporary image os.remove(image_path) else: pdf.multi_cell(0, 10, "Could not download the visualization image.") except Exception as img_error: pdf.multi_cell(0, 10, f"Error including visualization image: {str(img_error)}") # Add mermaid diagram if available if state.get("visualization_code"): pdf.ln(10) pdf.set_font('Arial', 'B', 12) pdf.cell(0, 10, 'Concept Map Structure:', 0, 1) pdf.ln(5) # Extract relationships from the mermaid code mermaid_code = state.get("visualization_code", "") pdf.set_font('Arial', 'I', 10) pdf.multi_cell(0, 10, "Below is a text representation of the concept relationships:") pdf.ln(5) # Add a text representation of the diagram try: # Parse the mermaid code to extract relationships relationships = [] for line in mermaid_code.split('\n'): line = line.strip() if '-->' in line: parts = line.split('-->') if len(parts) == 2: source = parts[0].strip() target = parts[1].strip() relationships.append(f"• {source} connects to {target}") if relationships: pdf.set_font('Arial', '', 10) for rel in relationships: pdf.multi_cell(0, 8, rel) else: # Add a simplified representation of the concept map pdf.multi_cell(0, 10, "The concept map shows relationships between the query and multiple perspectives, leading to a synthesized view.") except Exception as diagram_error: pdf.multi_cell(0, 10, f"Error parsing concept map: {str(diagram_error)}") pdf.multi_cell(0, 10, "The concept map shows the relationships between different perspectives on the topic.") # Add footer pdf.set_y(-15) pdf.set_font('Arial', 'I', 8) pdf.cell(0, 10, 'Generated by InsightFlow AI', 0, 0, 'C') # Output PDF pdf.output(filename) return filename, None except Exception as e: return None, f"Error exporting to PDF: {str(e)}" # --- CHAINLIT INTEGRATION --- # Super simplified version with command-based persona selection @cl.on_chat_start async def start_chat(): """Initialize the InsightFlow AI session""" print("InsightFlow AI chat started: Initializing session...") # Initialize persona factory and load configs persona_factory = PersonaFactory(config_dir="persona_configs") cl.user_session.set("persona_factory", persona_factory) # Initialize state with default personas initial_state = InsightFlowState( panel_type="research", query="", selected_personas=["analytical", "scientific", "philosophical"], persona_responses={}, synthesized_response=None, current_step_name="awaiting_query", error_message=None ) # Initialize LangGraph cl.user_session.set("insight_state", initial_state) cl.user_session.set("insight_graph", insight_flow_graph) # Set default options cl.user_session.set("direct_mode", False) # Default to InsightFlow mode cl.user_session.set("show_perspectives", True) # Default to showing all perspectives cl.user_session.set("quick_mode", False) # Default to normal speed cl.user_session.set("show_visualization", True) # Default to showing visualizations cl.user_session.set("visual_only_mode", False) # Default to showing both text and visuals # Welcome message with command instructions welcome_message = """ # Welcome to InsightFlow AI This assistant provides multiple perspectives on your questions using specialized personas. **Your current research team:** - Analytical reasoning - Scientific reasoning - Philosophical reasoning Type `/help` to see all available commands. """ await cl.Message(content=welcome_message).send() # Display help initially await display_help() # Update function for Chainlit 2.5.5 compatibility async def update_message(message, new_content): """Update a message in a way that's compatible with Chainlit 2.5.5""" try: # First try the direct content update method (newer versions) await message.update(content=new_content) except TypeError: # Fall back to older method for Chainlit 2.5.5 message.content = new_content await message.update() @cl.on_message async def handle_message(message: cl.Message): """Handle user messages""" state = cl.user_session.get("insight_state") graph = cl.user_session.get("insight_graph") if not state or not graph: await cl.Message(content="Session error. Please refresh the page.").send() return # Check for commands to change personas or settings msg_content = message.content.strip() # Handle commands if msg_content.startswith('/'): parts = msg_content.split() command = parts[0].lower() if command == '/help': # Show help text await display_help() return elif command == '/list': # List available personas persona_list = """ **Available personas:** - analytical - Logical problem-solving - scientific - Evidence-based reasoning - philosophical - Meaning and implications - factual - Practical information - metaphorical - Creative analogies - futuristic - Forward-looking possibilities """ await cl.Message(content=persona_list).send() return elif command == '/team': # Show current team team_list = ", ".join([p.capitalize() for p in state["selected_personas"]]) direct_mode = "ON" if cl.user_session.get("direct_mode", False) else "OFF" quick_mode = "ON" if cl.user_session.get("quick_mode", False) else "OFF" show_perspectives = "ON" if cl.user_session.get("show_perspectives", True) else "OFF" show_visualization = "ON" if cl.user_session.get("show_visualization", True) else "OFF" visual_only_mode = "ON" if cl.user_session.get("visual_only_mode", False) else "OFF" status = f""" **Your current settings:** - Research team: {team_list} - Direct mode: {direct_mode} - Quick mode: {quick_mode} - Show perspectives: {show_perspectives} - Show visualizations: {show_visualization} - Visual-only mode: {visual_only_mode} (Mermaid diagrams & DALL-E images) """ await cl.Message(content=status).send() return elif command == '/add' and len(parts) > 1: # Add persona persona_id = parts[1].lower() persona_factory = cl.user_session.get("persona_factory") if persona_factory and persona_factory.get_config(persona_id): if persona_id not in state["selected_personas"]: state["selected_personas"].append(persona_id) cl.user_session.set("insight_state", state) await cl.Message(content=f"Added {persona_id} to your research team.").send() else: await cl.Message(content=f"{persona_id} is already in your research team.").send() else: await cl.Message(content=f"Unknown persona: {persona_id}. Use /list to see available personas.").send() return elif command == '/remove' and len(parts) > 1: # Remove persona persona_id = parts[1].lower() if persona_id in state["selected_personas"]: if len(state["selected_personas"]) > 1: # Don't remove the last persona state["selected_personas"].remove(persona_id) cl.user_session.set("insight_state", state) await cl.Message(content=f"Removed {persona_id} from your research team.").send() else: await cl.Message(content="Cannot remove the last persona. You need at least one for analysis.").send() else: await cl.Message(content=f"{persona_id} is not in your research team.").send() return elif command == '/direct' and len(parts) > 1: # Toggle direct mode setting = parts[1].lower() if setting in ['on', 'true', '1', 'yes']: cl.user_session.set("direct_mode", True) await cl.Message(content="Direct mode enabled. Bypassing InsightFlow for faster responses.").send() elif setting in ['off', 'false', '0', 'no']: cl.user_session.set("direct_mode", False) await cl.Message(content="Direct mode disabled. Using full InsightFlow system.").send() else: await cl.Message(content="Invalid option. Use `/direct on` or `/direct off`.").send() return elif command == '/perspectives' and len(parts) > 1: # Toggle showing perspectives setting = parts[1].lower() if setting in ['on', 'true', '1', 'yes']: cl.user_session.set("show_perspectives", True) await cl.Message(content="Individual perspectives will be shown.").send() elif setting in ['off', 'false', '0', 'no']: cl.user_session.set("show_perspectives", False) await cl.Message(content="Individual perspectives will be hidden for concise output.").send() else: await cl.Message(content="Invalid option. Use `/perspectives on` or `/perspectives off`.").send() return elif command == '/quick' and len(parts) > 1: # Toggle quick mode setting = parts[1].lower() if setting in ['on', 'true', '1', 'yes']: cl.user_session.set("quick_mode", True) if len(state["selected_personas"]) > 2: # In quick mode, use max 2 personas state["selected_personas"] = state["selected_personas"][:2] cl.user_session.set("insight_state", state) await cl.Message(content="Quick mode enabled. Using fewer personas for faster responses.").send() elif setting in ['off', 'false', '0', 'no']: cl.user_session.set("quick_mode", False) await cl.Message(content="Quick mode disabled. Using your full research team.").send() else: await cl.Message(content="Invalid option. Use `/quick on` or `/quick off`.").send() return elif command == '/visualization' and len(parts) > 1: # Toggle showing Mermaid diagrams setting = parts[1].lower() if setting in ['on', 'true', '1', 'yes']: cl.user_session.set("show_visualization", True) await cl.Message(content="Visual diagrams will be shown to represent insights.").send() elif setting in ['off', 'false', '0', 'no']: cl.user_session.set("show_visualization", False) await cl.Message(content="Visual diagrams will be hidden.").send() else: await cl.Message(content="Invalid option. Use `/visualization on` or `/visualization off`.").send() return elif command == '/visual_only' and len(parts) > 1: # Toggle visual-only mode setting = parts[1].lower() if setting in ['on', 'true', '1', 'yes']: # When enabling visual-only mode, turn off other display options cl.user_session.set("visual_only_mode", True) cl.user_session.set("show_visualization", True) # Ensure visualization is on cl.user_session.set("show_perspectives", False) # Turn off perspective display await cl.Message(content="Visual-only mode enabled. Only visualizations (Mermaid diagrams & DALL-E images) will be shown. Individual perspectives have been disabled.").send() elif setting in ['off', 'false', '0', 'no']: cl.user_session.set("visual_only_mode", False) cl.user_session.set("show_perspectives", True) # Restore default when turning off await cl.Message(content="Visual-only mode disabled. Both text and visualizations will be shown.").send() else: await cl.Message(content="Invalid option. Use `/visual_only on` or `/visual_only off`.").send() return elif command == '/export_md': # Export to markdown state = cl.user_session.get("insight_state") if not state: await cl.Message(content="No analysis data available. Run a query first.").send() return await cl.Message(content="Exporting analysis to markdown...").send() filename, error = await export_to_markdown(state) if error: await cl.Message(content=f"Error: {error}").send() else: await cl.Message(content=f"Analysis exported to: `{filename}`").send() return elif command == '/export_pdf': # Export to PDF state = cl.user_session.get("insight_state") if not state: await cl.Message(content="No analysis data available. Run a query first.").send() return await cl.Message(content="Exporting analysis to PDF...").send() filename, error = await export_to_pdf(state) if error: await cl.Message(content=f"Error: {error}").send() else: await cl.Message(content=f"Analysis exported to: `{filename}`").send() return # Process query (either direct or through InsightFlow) # Create streaming message for results answer_msg = cl.Message(content="") await answer_msg.send() # Create progress message progress_msg = cl.Message(content="⏳ Processing your query (0%)...") await progress_msg.send() try: # Check if direct mode is enabled if cl.user_session.get("direct_mode", False): # Direct mode with streaming - bypass InsightFlow await update_message(progress_msg, "⏳ Processing in direct mode (20%)...") # Stream response directly full_response = "" async for chunk in direct_query(msg_content): full_response += chunk # Update the message with the new chunk await update_message(answer_msg, f"**Direct Answer:**\n\n{full_response}") # Complete the progress await update_message(progress_msg, "✅ Processing complete (100%)") return # Apply quick mode if enabled if cl.user_session.get("quick_mode", False) and len(state["selected_personas"]) > 2: # Temporarily use just 2 personas for speed original_personas = state["selected_personas"].copy() state["selected_personas"] = state["selected_personas"][:2] await update_message(progress_msg, f"⏳ Using quick mode with personas: {', '.join(state['selected_personas'])} (10%)...") # Standard InsightFlow processing # Set query in state state["query"] = msg_content # Setup for progress tracking cl.user_session.set("progress_msg", progress_msg) cl.user_session.set("progress_steps", { "planner_agent": 10, "execute_persona_tasks": 40, "synthesize_responses": 80, "generate_visualization": 90, "present_results": 95, "END": 100 }) # Hook into state changes for progress async def state_monitor(): """Monitor state changes to update progress""" last_step = None while True: current_step = state.get("current_step_name") if current_step != last_step: progress_steps = cl.user_session.get("progress_steps", {}) if current_step in progress_steps: progress = progress_steps[current_step] status_messages = { "planner_agent": "Planning research approach", "execute_persona_tasks": "Generating persona perspectives", "synthesize_responses": "Synthesizing perspectives", "generate_visualization": "Generating visual representation", "present_results": "Finalizing results", "END": "Complete" } status = status_messages.get(current_step, current_step) await update_message(progress_msg, f"⏳ {status} ({progress}%)...") last_step = current_step # Check if we're done if current_step == "END": await update_message(progress_msg, f"✅ Process complete (100%)") break # Wait before checking again await asyncio.sleep(0.5) # Start the monitor in the background asyncio.create_task(state_monitor()) # Run the graph with timeout protection thread_id = cl.user_session.get("id", "default_thread_id") config = {"configurable": {"thread_id": thread_id}} # Set an overall timeout for the entire graph execution final_state = await asyncio.wait_for( graph.ainvoke(state, config), timeout=150 # 2.5 minute timeout ) cl.user_session.set("insight_state", final_state) # Update the answer message with the response panel_mode = "Research Assistant" if final_state["panel_type"] == "research" else "Multi-Persona Discussion" panel_indicator = f"**{panel_mode} Insights:**\n\n" await update_message(answer_msg, panel_indicator + final_state.get("synthesized_response", "No response generated.")) # Show individual perspectives if enabled if cl.user_session.get("show_perspectives", True): for persona_id, response in final_state["persona_responses"].items(): persona_name = persona_id.capitalize() # Get proper display name from config if available persona_factory = cl.user_session.get("persona_factory") if persona_factory: config = persona_factory.get_config(persona_id) if config and "name" in config: persona_name = config["name"] # Send perspective as a message perspective_message = f"**{persona_name}'s Perspective:**\n\n{response}" await cl.Message(content=perspective_message).send() # Restore original personas if in quick mode if cl.user_session.get("quick_mode", False) and 'original_personas' in locals(): state["selected_personas"] = original_personas cl.user_session.set("insight_state", state) except asyncio.TimeoutError: print("Overall graph execution timed out") await update_message(answer_msg, "The analysis took too long and timed out. Try using `/direct on` or `/quick on` for faster responses.") await update_message(progress_msg, "❌ Process timed out") except Exception as e: print(f"Error in query processing: {e}") await update_message(answer_msg, f"I encountered an error: {e}") await update_message(progress_msg, f"❌ Error: {str(e)}") print("InsightFlow AI setup complete. Ready to start.")