Prospea / app.py
Pranav0111's picture
Update app.py
af686c2 verified
raw
history blame
17.9 kB
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st

from render_ai_assistant import render_ai_assistant
# --- Data Processing Class ---
class DataProcessor:
    """Load a CSV file into a DataFrame and expose basic stats and Plotly charts.

    After a successful :meth:`load_data` call:

    * ``data`` holds the parsed ``pandas.DataFrame`` (``None`` before that);
    * ``numeric_columns``, ``date_columns`` and ``categorical_columns`` hold
      the column names sorted into those three buckets.
    """

    def __init__(self):
        self.data = None
        self.numeric_columns = []
        self.categorical_columns = []
        self.date_columns = []

    def load_data(self, file) -> bool:
        """Read *file* with ``pd.read_csv`` and classify its columns.

        Args:
            file: Anything ``pd.read_csv`` accepts (path, buffer, uploaded file).

        Returns:
            ``True`` on success; ``False`` after reporting the problem through
            ``st.error`` when reading or classifying fails.
        """
        try:
            self.data = pd.read_csv(file)
            self._classify_columns()
            return True
        except Exception as e:
            # UI boundary: surface any load problem to the user instead of crashing.
            st.error(f"Error loading data: {str(e)}")
            return False

    def _classify_columns(self):
        """Sort the columns of ``self.data`` into numeric / date / categorical lists."""
        # Start from empty lists so loading a second file does not accumulate
        # (and duplicate) column names from the previous one.
        self.numeric_columns = []
        self.categorical_columns = []
        self.date_columns = []

        for col in self.data.columns:
            series = self.data[col]
            if pd.api.types.is_numeric_dtype(series):
                self.numeric_columns.append(col)
            elif pd.api.types.is_datetime64_any_dtype(series):
                self.date_columns.append(col)
            else:
                # Anything that parses as datetimes counts as a date column;
                # everything else is treated as categorical.
                try:
                    pd.to_datetime(series)
                except (ValueError, TypeError, OverflowError):
                    self.categorical_columns.append(col)
                else:
                    self.date_columns.append(col)

    def get_basic_stats(self) -> Dict[str, Any]:
        """Return summary statistics for the loaded data.

        Returns:
            An empty dict when nothing is loaded; otherwise a dict with keys
            ``summary`` (``describe()`` of the numeric columns, or an empty
            DataFrame when there are none), ``missing_values`` (null count per
            column), ``row_count`` and ``column_count``.
        """
        if self.data is None:
            return {}

        # describe() raises on a frame with zero columns, so guard that case.
        if self.numeric_columns:
            summary = self.data[self.numeric_columns].describe()
        else:
            summary = pd.DataFrame()

        return {
            'summary': summary,
            'missing_values': self.data.isnull().sum(),
            'row_count': len(self.data),
            'column_count': len(self.data.columns),
        }

    def create_visualization(self, chart_type: str, x_col: str, y_col: str,
                             color_col: Optional[str] = None) -> "go.Figure":
        """Build a Plotly Express figure from the loaded data.

        Args:
            chart_type: One of "Line Plot", "Bar Plot", "Scatter Plot" or
                "Box Plot"; any other value produces a histogram of *x_col*.
            x_col: Column used for the x axis.
            y_col: Column used for the y axis (ignored for histograms).
            color_col: Optional column used to colour the marks.

        Returns:
            The figure created by the matching ``plotly.express`` function.
        """
        xy_plotters = {
            "Line Plot": px.line,
            "Bar Plot": px.bar,
            "Scatter Plot": px.scatter,
            "Box Plot": px.box,
        }
        plotter = xy_plotters.get(chart_type)
        if plotter is None:
            return px.histogram(self.data, x=x_col, color=color_col)
        return plotter(self.data, x=x_col, y=y_col, color=color_col)
class BrainstormManager:
    """Collect product ideas through a Streamlit form and score them heuristically."""

    def __init__(self):
        # Products live in the Streamlit session so they survive script reruns.
        if 'products' not in st.session_state:
            st.session_state.products = {}

    def generate_product_form(self) -> Tuple[Dict, Dict, bool]:
        """Render the product form and return what the user entered.

        Returns:
            A ``(basic_info, market_analysis, submitted)`` tuple: two dicts of
            widget values and the state of the "Save Product" submit button.
        """
        with st.form("product_form"):
            basic_info = {
                "name": st.text_input("Product Name"),
                "category": st.selectbox("Category", ["Digital", "Physical", "Service"]),
                "description": st.text_area("Description"),
                "target_audience": st.multiselect(
                    "Target Audience",
                    ["Students", "Professionals", "Businesses", "Seniors", "Youth"],
                ),
                "price_range": st.slider("Price Range ($)", 0, 1000, (50, 200)),
                "launch_date": st.date_input("Expected Launch Date"),
            }
            st.subheader("Market Analysis")
            market_analysis = {
                "competitors": st.text_area("Main Competitors (one per line)"),
                "unique_features": st.text_area("Unique Selling Points"),
                "market_size": st.selectbox(
                    "Market Size",
                    ["Small", "Medium", "Large", "Enterprise"],
                ),
                "growth_potential": st.slider("Growth Potential", 1, 10),
            }
            submitted = st.form_submit_button("Save Product")
        return basic_info, market_analysis, submitted

    def analyze_product(self, product_data: Dict) -> Dict:
        """Return the heuristic insights for one product.

        Args:
            product_data: Merged form values (see :meth:`generate_product_form`).

        Returns:
            Dict with keys ``market_opportunity``, ``suggested_price``,
            ``risk_factors`` and ``next_steps``.
        """
        return {
            "market_opportunity": self._calculate_opportunity_score(product_data),
            "suggested_price": self._suggest_price(product_data),
            "risk_factors": self._identify_risks(product_data),
            "next_steps": self._generate_next_steps(product_data),
        }

    def _calculate_opportunity_score(self, data: Dict) -> int:
        """Score the opportunity: +3 large market, +2 for 2+ audiences, +2 growth > 7."""
        score = 0
        if data.get("market_size") == "Large":
            score += 3
        if len(data.get("target_audience") or []) >= 2:
            score += 2
        if (data.get("growth_potential") or 0) > 7:
            score += 2
        # The score is shown as "x/10", so never report more than 10.
        return min(score, 10)

    def _suggest_price(self, data: Dict) -> float:
        """Suggest the midpoint of the price range, with a 1.5x uplift for Enterprise."""
        base_price = sum(data.get("price_range") or (0, 0)) / 2
        if data.get("market_size") == "Enterprise":
            base_price *= 1.5
        return round(base_price, 2)

    def _identify_risks(self, data: Dict) -> List[str]:
        """List risk notes derived from the competitors and target-audience fields."""
        risks = []
        competitors = data.get("competitors")
        if isinstance(competitors, str):
            # A text area holding only blanks/newlines names no competitor.
            competitors = competitors.strip()
        if competitors:
            risks.append("Competitive market - differentiation crucial")
        if len(data.get("target_audience") or []) < 2:
            risks.append("Narrow target audience - consider expansion")
        return risks

    def _generate_next_steps(self, data: Dict) -> List[str]:
        """Return the standard next steps, plus a sales step for Enterprise markets."""
        steps = [
            "Create detailed product specification",
            "Develop MVP timeline",
            "Plan marketing strategy",
        ]
        if data.get("market_size") == "Enterprise":
            steps.append("Prepare enterprise sales strategy")
        return steps
# --- Sample Data Generation ---
def generate_sample_data(start='2024-01-01', end='2024-01-31', seed=None):
    """Build a DataFrame of random daily business metrics.

    Args:
        start: First day of the range (anything ``pd.date_range`` accepts).
        end: Last day of the range, inclusive.
        seed: Optional integer seed. When given, the values are reproducible
            and the global NumPy random state is left untouched; when ``None``
            the values are drawn from the global ``np.random`` state.

    Returns:
        DataFrame with one row per day and the columns ``Date``, ``Revenue``,
        ``Users``, ``Engagement`` and ``Category``.
    """
    # np.random and RandomState expose the same sampling functions, so the
    # unseeded path keeps drawing from the global generator.
    rng = np.random if seed is None else np.random.RandomState(seed)
    dates = pd.date_range(start=start, end=end, freq='D')
    n_days = len(dates)
    return pd.DataFrame({
        'Date': dates,
        'Revenue': rng.normal(1000, 100, n_days),
        'Users': rng.randint(100, 200, n_days),
        'Engagement': rng.uniform(0.5, 0.9, n_days),
        'Category': rng.choice(['A', 'B', 'C'], n_days),
    })
# --- Page Rendering Functions ---
def render_dashboard():
    """Render the demo dashboard: KPI row, two trend charts and category insights.

    All numbers come from ``generate_sample_data()`` plus a random profit
    column, so they change whenever the script reruns.
    """
    st.header("📊 Comprehensive Business Performance Dashboard")

    data = generate_sample_data()
    # Despite its name this column holds absolute profit per day:
    # revenue multiplied by a random rate between 10 % and 30 %.
    data['Profit_Margin'] = data['Revenue'] * np.random.uniform(0.1, 0.3, len(data))
    # The actual margin is profit relative to revenue; this is what the
    # percentage-formatted KPI below must show.
    margin_ratio = data['Profit_Margin'] / data['Revenue']

    def mean_change(series):
        """Format the mean day-over-day change of *series* as a percentage."""
        return f"{series.pct_change().mean()*100:.2f}%"

    # Top-level KPI section
    kpi_revenue, kpi_users, kpi_engagement, kpi_margin = st.columns(4)
    with kpi_revenue:
        st.metric("Total Revenue",
                  f"${data['Revenue'].sum():,.2f}",
                  delta=mean_change(data['Revenue']))
    with kpi_users:
        st.metric("Total Users",
                  f"{data['Users'].sum():,}",
                  delta=mean_change(data['Users']))
    with kpi_engagement:
        st.metric("Avg Engagement",
                  f"{data['Engagement'].mean():.2%}",
                  delta=mean_change(data['Engagement']))
    with kpi_margin:
        st.metric("Profit Margin",
                  f"{margin_ratio.mean():.2%}",
                  delta=mean_change(margin_ratio))

    # Visualization grid
    left, right = st.columns(2)
    with left:
        st.subheader("Revenue & Profit Trends")
        fig_revenue = go.Figure()
        fig_revenue.add_trace(go.Scatter(
            x=data['Date'],
            y=data['Revenue'],
            mode='lines',
            name='Revenue',
            line=dict(color='blue')
        ))
        fig_revenue.add_trace(go.Scatter(
            x=data['Date'],
            y=data['Profit_Margin'],
            mode='lines',
            name='Profit Margin',
            line=dict(color='green')
        ))
        fig_revenue.update_layout(height=350)
        st.plotly_chart(fig_revenue, use_container_width=True)
    with right:
        st.subheader("User Engagement Analysis")
        fig_engagement = px.scatter(
            data,
            x='Users',
            y='Engagement',
            color='Category',
            size='Revenue',
            hover_data=['Date'],
            title='User Engagement Dynamics'
        )
        fig_engagement.update_layout(height=350)
        st.plotly_chart(fig_engagement, use_container_width=True)

    # Category performance
    st.subheader("Category Performance Breakdown")
    category_performance = data.groupby('Category').agg({
        'Revenue': 'sum',
        'Users': 'sum',
        'Engagement': 'mean'
    }).reset_index()
    fig_category = px.bar(
        category_performance,
        x='Category',
        y='Revenue',
        color='Engagement',
        title='Revenue by Category with Engagement Overlay'
    )
    st.plotly_chart(fig_category, use_container_width=True)

    # Bottom summary: the categories with the highest revenue / engagement
    st.subheader("Quick Insights")
    insights_col1, insights_col2 = st.columns(2)
    with insights_col1:
        top_revenue_row = category_performance['Revenue'].idxmax()
        st.metric("Top Performing Category",
                  category_performance.loc[top_revenue_row, 'Category'])
    with insights_col2:
        top_engagement_row = category_performance['Engagement'].idxmax()
        st.metric("Highest Engagement Category",
                  category_performance.loc[top_engagement_row, 'Category'])
def render_analytics():
    """Render the analytics page: CSV upload, preview, statistics, charts and metrics.

    Nothing below the uploader is shown until a CSV has been uploaded and
    loaded successfully.
    """
    st.header("🔍 Data Analytics")
    processor = DataProcessor()
    uploaded_file = st.file_uploader("Upload your CSV data", type=['csv'])
    if uploaded_file is None:
        return
    if not processor.load_data(uploaded_file):
        # load_data has already reported the error to the user.
        return

    st.success("Data loaded successfully!")
    has_numeric = bool(processor.numeric_columns)
    tabs = st.tabs(["Data Preview", "Statistics", "Visualization", "Metrics"])

    with tabs[0]:
        st.subheader("Data Preview")
        st.dataframe(processor.data.head())
        st.info(f"Total rows: {len(processor.data)}, Total columns: {len(processor.data.columns)}")

    with tabs[1]:
        st.subheader("Basic Statistics")
        if has_numeric:
            stats = processor.get_basic_stats()
            st.write(stats['summary'])
            missing_values = stats['missing_values']
        else:
            # Summary statistics need at least one numeric column; the
            # missing-value counts are still meaningful without one.
            st.info("No numeric columns found - summary statistics are unavailable.")
            missing_values = processor.data.isnull().sum()
        st.subheader("Missing Values")
        st.write(missing_values)

    with tabs[2]:
        st.subheader("Create Visualization")
        col1, col2, col3 = st.columns(3)
        with col1:
            chart_type = st.selectbox(
                "Select Chart Type",
                ["Line Plot", "Bar Plot", "Scatter Plot", "Box Plot", "Histogram"]
            )
        with col2:
            x_col = st.selectbox("Select X-axis", processor.data.columns)
        with col3:
            # Histograms only need an x column, so no y selector is shown.
            y_col = st.selectbox("Select Y-axis", processor.numeric_columns) if chart_type != "Histogram" else None
        color_col = st.selectbox("Select Color Variable (optional)",
                                 ['None'] + processor.categorical_columns)
        color_col = None if color_col == 'None' else color_col
        fig = processor.create_visualization(
            chart_type,
            x_col,
            y_col if y_col else x_col,
            color_col
        )
        st.plotly_chart(fig, use_container_width=True)

    with tabs[3]:
        st.subheader("Column Metrics")
        if not has_numeric:
            # Without this guard the selectbox yields None and the column
            # lookup below fails.
            st.info("No numeric columns found - column metrics are unavailable.")
        else:
            selected_col = st.selectbox("Select column", processor.numeric_columns)
            column = processor.data[selected_col]
            metrics = {
                'Mean': column.mean(),
                'Median': column.median(),
                'Std Dev': column.std(),
                'Min': column.min(),
                'Max': column.max()
            }
            cols = st.columns(len(metrics))
            for col, (metric, value) in zip(cols, metrics.items()):
                col.metric(metric, f"{value:.2f}")
def render_brainstorm_page():
    """Render the brainstorm page: create a product or browse the saved ones.

    Saved products are kept in ``st.session_state.products`` under keys of the
    form ``prod_<n>``; each entry stores the form data, the computed insights
    and a creation timestamp.
    """
    st.title("Product Brainstorm Hub")
    manager = BrainstormManager()
    action = st.sidebar.radio("Action", ["View Products", "Create New Product"])

    if action == "Create New Product":
        basic_info, market_analysis, submitted = manager.generate_product_form()
        if not submitted:
            return
        if not basic_info["name"].strip():
            # A product without a name would show up as an unlabeled entry.
            st.error("Please enter a product name.")
            return
        product_data = {**basic_info, **market_analysis}
        insights = manager.analyze_product(product_data)
        product_id = f"prod_{len(st.session_state.products)}"
        st.session_state.products[product_id] = {
            "data": product_data,
            "insights": insights,
            "created_at": str(datetime.now())
        }
        st.success("Product added! View insights in the Products tab.")
        return

    if not st.session_state.products:
        st.info("No products yet. Create one to get started!")
        return

    for product in st.session_state.products.values():
        details = product['data']
        insights = product['insights']
        with st.expander(f"🎯 {details['name']}"):
            col1, col2 = st.columns(2)
            with col1:
                st.subheader("Product Details")
                st.write(f"Category: {details['category']}")
                st.write(f"Target: {', '.join(details['target_audience'])}")
                st.write(f"Description: {details['description']}")
            with col2:
                st.subheader("Insights")
                st.metric("Opportunity Score", f"{insights['market_opportunity']}/10")
                st.metric("Suggested Price", f"${insights['suggested_price']}")
                st.write("**Risk Factors:**")
                for risk in insights['risk_factors']:
                    st.write(f"- {risk}")
                st.write("**Next Steps:**")
                for step in insights['next_steps']:
                    st.write(f"- {step}")
def render_chat():
    """Render the chat page: replay the stored conversation and answer new prompts.

    The conversation is kept in ``st.session_state.messages`` as a list of
    ``{"role", "content"}`` dicts. Replies are a fixed placeholder text; no
    language model is called here.
    """
    st.header("💬 Business Assistant")
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Replay the history first, because the page is redrawn on every rerun.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    prompt = st.chat_input("Ask about your business...")
    if not prompt:
        return

    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)

    response = f"Thank you for your question about '{prompt}'. The LLM integration will be implemented soon."
    with st.chat_message("assistant"):
        st.markdown(response)
    st.session_state.messages.append({"role": "assistant", "content": response})
def load_huggingface_model(model_name="google/flan-t5-base"):
    """Load a pre-trained model and its tokenizer by Hugging Face identifier.

    Args:
        model_name (str): Hugging Face model identifier.

    Returns:
        tuple: ``(model, tokenizer)`` on success, ``(None, None)`` after the
        failure has been reported through ``st.error``.
    """
    # NOTE(review): AutoTokenizer and AutoModelForCausalLM are not imported in
    # the visible part of this file. Unless they are imported elsewhere, the
    # resulting NameError is caught below and every load is reported as failed.
    # NOTE(review): the default "google/flan-t5-base" is presumably a
    # sequence-to-sequence checkpoint; confirm it can be loaded through the
    # causal-LM class used here.
    loaded = (None, None)
    try:
        hf_tokenizer = AutoTokenizer.from_pretrained(model_name)
        hf_model = AutoModelForCausalLM.from_pretrained(model_name)
        loaded = (hf_model, hf_tokenizer)
    except Exception as exc:
        st.error(f"Error loading model: {exc}")
    return loaded
def generate_text(model, tokenizer, prompt, max_length=200):
    """Generate text for *prompt* with a loaded model.

    Args:
        model: Loaded model exposing ``generate(**inputs, ...)``.
        tokenizer: Matching tokenizer; called on the prompt and used to decode.
        prompt (str): Input text prompt.
        max_length (int): ``max_length`` value passed on to ``model.generate``.

    Returns:
        str: The first generated sequence, decoded with special tokens skipped.
    """
    # Imported here because the module's import block does not bring torch
    # into scope; without it the no_grad() line below raises NameError.
    import torch

    inputs = tokenizer(prompt, return_tensors="pt")
    # Inference only: no gradients are needed.
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_length=max_length,
            num_return_sequences=1,
            do_sample=True,
            temperature=0.7
        )
    return tokenizer.decode(outputs[0], skip_special_tokens=True)
def render_ai_assistant():
    """Render the AI assistant page: pick a model, enter a query, show the reply.

    Successfully loaded models are kept in ``st.session_state`` for the rest
    of the browser session, so widget interactions (which rerun the script)
    do not load the model again. Failed loads are not stored and are retried
    on the next rerun.
    """
    st.title("🤖 Business AI Assistant")

    # Model selection
    model_options = {
        "Google Flan-T5": "google/flan-t5-base",
        "DialoGPT": "microsoft/DialoGPT-medium",
        "GPT-2 Small": "gpt2"
    }
    selected_model = st.selectbox(
        "Choose AI Model",
        list(model_options.keys())
    )
    model_name = model_options[selected_model]

    # Load the selected model once per session.
    if "_loaded_models" not in st.session_state:
        st.session_state["_loaded_models"] = {}
    loaded_models = st.session_state["_loaded_models"]
    if model_name in loaded_models:
        model, tokenizer = loaded_models[model_name]
    else:
        model, tokenizer = load_huggingface_model(model_name)
        if model is not None and tokenizer is not None:
            loaded_models[model_name] = (model, tokenizer)

    # Compare against None explicitly: the truth value of a model or
    # tokenizer object says nothing about whether loading succeeded.
    if model is None or tokenizer is None:
        st.error("Failed to load model")
        return

    user_prompt = st.text_area(
        "Enter your business query",
        placeholder="Ask about business strategy, product analysis, etc."
    )
    if st.button("Generate Response"):
        if not user_prompt.strip():
            st.warning("Please enter a query first.")
            return
        with st.spinner("Generating response..."):
            response = generate_text(model, tokenizer, user_prompt)
        st.success("AI Response:")
        st.write(response)
def main():
    """Configure the page, draw the sidebar navigation and render the chosen page."""
    st.set_page_config(
        page_title="Prospira",
        page_icon="📊",
        layout="wide",
        initial_sidebar_state="expanded"
    )

    # Navigation label -> renderer; dict order is the order shown in the sidebar.
    pages = {
        "Dashboard": render_dashboard,
        "Analytics": render_analytics,
        "Brainstorm": render_brainstorm_page,
        "AI Assistant": render_ai_assistant,
        "Chat": render_chat,
    }

    with st.sidebar:
        st.title("Prospira")
        st.subheader("Data-Driven Solutions")
        page = st.radio("Navigation", list(pages))

    pages[page]()