# Prospea / app.py
# Hugging Face file-viewer residue (not part of the program):
#   Pranav0111's picture
#   Update app.py
#   68070f2 verified
#   raw / history blame
#   16.5 kB
import streamlit as st
import plotly.express as px
import plotly.graph_objects as go
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List, Any
# --- Data Processing Class ---
class DataProcessor:
    """Load a CSV into a DataFrame and expose basic statistics and charts.

    Attributes:
        data: DataFrame produced by the last ``load_data`` call, or ``None``
            when nothing has been loaded yet.
        numeric_columns: Names of columns with a numeric dtype.
        categorical_columns: Names of columns that are neither numeric nor
            parseable as dates.
        date_columns: Names of columns that are datetime-typed or whose values
            ``pd.to_datetime`` can parse.
    """

    def __init__(self):
        self.data = None
        self.numeric_columns = []
        self.categorical_columns = []
        self.date_columns = []

    def load_data(self, file) -> bool:
        """Read ``file`` with ``pd.read_csv`` and classify its columns.

        Args:
            file: Anything ``pd.read_csv`` accepts (path or file-like object).

        Returns:
            ``True`` on success. On any failure the error is reported through
            ``st.error`` and ``False`` is returned.
        """
        try:
            self.data = pd.read_csv(file)
            self._classify_columns()
            return True
        except Exception as e:
            # UI boundary: report the problem to the user instead of crashing.
            st.error(f"Error loading data: {str(e)}")
            return False

    def _classify_columns(self):
        """Sort every column of ``self.data`` into numeric / date / categorical."""
        # Start from empty lists so loading a second file does not append
        # duplicates (or stale names) to the results of the previous load.
        self.numeric_columns = []
        self.categorical_columns = []
        self.date_columns = []

        for col in self.data.columns:
            series = self.data[col]
            if pd.api.types.is_numeric_dtype(series):
                self.numeric_columns.append(col)
            elif pd.api.types.is_datetime64_any_dtype(series):
                self.date_columns.append(col)
            else:
                # Probe only: the column is not converted, we just check
                # whether its values can be parsed as dates.
                try:
                    pd.to_datetime(series)
                except (ValueError, TypeError, OverflowError):
                    self.categorical_columns.append(col)
                else:
                    self.date_columns.append(col)

    def get_basic_stats(self) -> Dict[str, Any]:
        """Return summary statistics for the loaded data.

        Returns:
            An empty dict when no data is loaded; otherwise a dict with keys
            ``'summary'`` (``describe()`` of the numeric columns, or an empty
            DataFrame when there are none), ``'missing_values'`` (null count
            per column), ``'row_count'`` and ``'column_count'``.
        """
        if self.data is None:
            return {}

        # describe() raises on a frame with no columns, so guard the case of
        # a CSV that has no numeric columns at all.
        if self.numeric_columns:
            summary = self.data[self.numeric_columns].describe()
        else:
            summary = pd.DataFrame()

        return {
            'summary': summary,
            'missing_values': self.data.isnull().sum(),
            'row_count': len(self.data),
            'column_count': len(self.data.columns),
        }

    # The return annotation is quoted so it is not evaluated when the class
    # body is executed.
    def create_visualization(self, chart_type: str, x_col: str, y_col: str, color_col: str = None) -> "go.Figure":
        """Build a Plotly Express figure from the loaded data.

        Args:
            chart_type: ``"Line Plot"``, ``"Bar Plot"``, ``"Scatter Plot"`` or
                ``"Box Plot"``; any other value produces a histogram of
                ``x_col``.
            x_col: Column used for the x axis.
            y_col: Column used for the y axis (ignored for histograms).
            color_col: Optional column used to colour the marks.

        Returns:
            The figure created by the matching ``plotly.express`` function.
        """
        xy_plotters = {
            "Line Plot": px.line,
            "Bar Plot": px.bar,
            "Scatter Plot": px.scatter,
            "Box Plot": px.box,
        }
        plotter = xy_plotters.get(chart_type)
        if plotter is None:
            return px.histogram(self.data, x=x_col, color=color_col)
        return plotter(self.data, x=x_col, y=y_col, color=color_col)
class BrainstormManager:
    """Collect product ideas through a Streamlit form and derive simple insights."""

    def __init__(self):
        # Products are kept in the Streamlit session so they survive reruns.
        if 'products' not in st.session_state:
            st.session_state.products = {}

    def generate_product_form(self) -> tuple:
        """Render the product form and return what the user entered.

        Returns:
            A ``(basic_info, market_analysis, submitted)`` tuple: two dicts
            holding the current widget values and a bool that is ``True`` on
            the run in which "Save Product" was pressed.
        """
        with st.form("product_form"):
            basic_info = {
                "name": st.text_input("Product Name"),
                "category": st.selectbox("Category", ["Digital", "Physical", "Service"]),
                "description": st.text_area("Description"),
                "target_audience": st.multiselect(
                    "Target Audience",
                    ["Students", "Professionals", "Businesses", "Seniors", "Youth"],
                ),
                "price_range": st.slider("Price Range ($)", 0, 1000, (50, 200)),
                "launch_date": st.date_input("Expected Launch Date"),
            }
            st.subheader("Market Analysis")
            market_analysis = {
                "competitors": st.text_area("Main Competitors (one per line)"),
                "unique_features": st.text_area("Unique Selling Points"),
                "market_size": st.selectbox(
                    "Market Size",
                    ["Small", "Medium", "Large", "Enterprise"],
                ),
                "growth_potential": st.slider("Growth Potential", 1, 10),
            }
            submitted = st.form_submit_button("Save Product")
        return basic_info, market_analysis, submitted

    def analyze_product(self, product_data: Dict) -> Dict:
        """Compute all insights for one product.

        Args:
            product_data: Merged form values (see ``generate_product_form``).

        Returns:
            Dict with keys ``market_opportunity``, ``suggested_price``,
            ``risk_factors`` and ``next_steps``.
        """
        return {
            "market_opportunity": self._calculate_opportunity_score(product_data),
            "suggested_price": self._suggest_price(product_data),
            "risk_factors": self._identify_risks(product_data),
            "next_steps": self._generate_next_steps(product_data),
        }

    def _calculate_opportunity_score(self, data: Dict) -> int:
        """Score the opportunity: +3 for a "Large" market, +2 for two or more
        target audiences, +2 for growth potential above 7 (capped at 10).

        NOTE(review): the highest reachable score is 7 and an "Enterprise"
        market earns no market-size points -- confirm this is intended.
        """
        score = 0
        if data.get("market_size") == "Large":
            score += 3
        if len(data.get("target_audience", [])) >= 2:
            score += 2
        if data.get("growth_potential", 0) > 7:
            score += 2
        return min(score, 10)

    def _suggest_price(self, data: Dict) -> float:
        """Return the midpoint of ``price_range``, times 1.5 for Enterprise markets."""
        base_price = sum(data.get("price_range", (0, 0))) / 2
        if data.get("market_size") == "Enterprise":
            base_price *= 1.5
        return round(base_price, 2)

    def _identify_risks(self, data: Dict) -> List[str]:
        """List risk notes based on competitors and audience breadth."""
        risks = []
        # Strip first: a text area holding only spaces or newlines names no
        # competitor and should not be reported as a competitive market.
        competitors = str(data.get("competitors") or "").strip()
        if competitors:
            risks.append("Competitive market - differentiation crucial")
        if len(data.get("target_audience", [])) < 2:
            risks.append("Narrow target audience - consider expansion")
        return risks

    def _generate_next_steps(self, data: Dict) -> List[str]:
        """Return the standard next steps, plus a sales step for Enterprise markets."""
        steps = [
            "Create detailed product specification",
            "Develop MVP timeline",
            "Plan marketing strategy",
        ]
        if data.get("market_size") == "Enterprise":
            steps.append("Prepare enterprise sales strategy")
        return steps
# --- Sample Data Generation ---
def generate_sample_data():
    """Build a random demo DataFrame with one row per day of January 2024.

    Columns: ``Date``, ``Revenue`` (normal, mean 1000, std 100), ``Users``
    (integers in [100, 200)), ``Engagement`` (uniform in [0.5, 0.9)) and
    ``Category`` (one of 'A', 'B', 'C').
    """
    days = pd.date_range(start='2024-01-01', end='2024-01-31', freq='D')
    n_days = len(days)

    # The random draws happen in column order so a seeded generator yields
    # the same frame every time.
    columns = {}
    columns['Date'] = days
    columns['Revenue'] = np.random.normal(1000, 100, n_days)
    columns['Users'] = np.random.randint(100, 200, n_days)
    columns['Engagement'] = np.random.uniform(0.5, 0.9, n_days)
    columns['Category'] = np.random.choice(['A', 'B', 'C'], n_days)
    return pd.DataFrame(columns)
# --- Page Rendering Functions ---
def render_dashboard():
    """Render the dashboard page: sidebar filters, KPI metrics, a revenue
    forecast chart, a per-category bar chart and two summary insights.

    All figures are computed from randomly generated demo data that is
    rebuilt on every call.
    """
    st.header("📊 Advanced Business Intelligence Dashboard")

    # Enhanced Data Generation with Predictive Elements
    def generate_predictive_data():
        """Create the random demo frame plus a scaled "prediction" column."""
        dates = pd.date_range(start='2024-01-01', end='2024-02-15', freq='D')
        base_data = pd.DataFrame({
            'Date': dates,
            'Revenue': np.random.normal(1000, 100, len(dates)),
            'Users': np.random.randint(100, 200, len(dates)),
            'Engagement': np.random.uniform(0.5, 0.9, len(dates)),
            'Category': np.random.choice(['Digital', 'Physical', 'Service'], len(dates))
        })
        # Simple predictive modeling: scale revenue by a factor rising 1.0 -> 1.2.
        base_data['Predicted_Revenue'] = base_data['Revenue'] * np.linspace(1, 1.2, len(dates))
        base_data['Revenue_Trend'] = np.where(
            base_data['Predicted_Revenue'] > base_data['Revenue'], 'Positive', 'Negative'
        )
        return base_data

    def pct_delta(series):
        """Return the mean row-to-row change as 'x.xx%', or None if undefined."""
        change = series.pct_change().mean()
        # With fewer than two rows there is no change to report; None makes
        # st.metric omit the delta instead of showing "nan%".
        return None if pd.isna(change) else f"{change * 100:.2f}%"

    # Data Preparation
    data = generate_predictive_data()

    # Sidebar Filters
    st.sidebar.header("Dashboard Filters")
    # Sorted so the option list does not change order between reruns (the
    # data is random, so order of first appearance is not stable).
    categories = sorted(data['Category'].unique().tolist())
    selected_categories = st.sidebar.multiselect(
        "Select Categories",
        options=categories,
        default=categories
    )
    date_range = st.sidebar.date_input(
        "Select Date Range",
        [data['Date'].min(), data['Date'].max()]
    )

    # While the user is still picking a range the widget can return fewer
    # than two dates; wait for both rather than indexing past the end.
    if not isinstance(date_range, (list, tuple)):
        date_range = (date_range,)
    if len(date_range) != 2:
        st.info("Select both a start and an end date to update the dashboard.")
        return
    # Convert to Timestamps: comparing a datetime64 column with plain
    # datetime.date objects can raise TypeError in pandas.
    start_date, end_date = (pd.Timestamp(day) for day in date_range)

    # Filter Data
    filtered_data = data[
        (data['Category'].isin(selected_categories)) &
        (data['Date'].between(start_date, end_date))
    ]
    if filtered_data.empty:
        # mode()[0] and idxmax() below would fail on an empty frame.
        st.warning("No data matches the selected filters.")
        return

    # KPI Metrics
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        st.metric("Total Revenue",
                  f"${filtered_data['Revenue'].sum():,.2f}",
                  delta=pct_delta(filtered_data['Revenue']))
    with col2:
        st.metric("Total Users",
                  f"{filtered_data['Users'].sum():,}",
                  delta=pct_delta(filtered_data['Users']))
    with col3:
        st.metric("Avg Engagement",
                  f"{filtered_data['Engagement'].mean():.2%}")
    with col4:
        st.metric("Predicted Trend",
                  filtered_data['Revenue_Trend'].mode()[0])

    # Advanced Visualizations
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("Revenue Forecast")
        forecast_fig = go.Figure()
        forecast_fig.add_trace(go.Scatter(
            x=filtered_data['Date'],
            y=filtered_data['Revenue'],
            mode='lines',
            name='Actual Revenue',
            line=dict(color='blue')
        ))
        forecast_fig.add_trace(go.Scatter(
            x=filtered_data['Date'],
            y=filtered_data['Predicted_Revenue'],
            mode='lines',
            name='Predicted Revenue',
            line=dict(color='red', dash='dot')
        ))
        st.plotly_chart(forecast_fig, use_container_width=True)
    with col2:
        st.subheader("Category Performance")
        category_performance = filtered_data.groupby('Category').agg({
            'Revenue': ['sum', 'mean'],
            'Users': 'sum',
            'Engagement': 'mean'
        }).reset_index()
        # Flatten the two-level column index produced by the multi-function agg.
        category_performance.columns = [
            'Category', 'Total_Revenue', 'Avg_Revenue', 'Total_Users', 'Avg_Engagement'
        ]
        perf_fig = px.bar(
            category_performance,
            x='Category',
            y='Total_Revenue',
            color='Avg_Engagement',
            hover_data=['Total_Users', 'Avg_Revenue']
        )
        st.plotly_chart(perf_fig, use_container_width=True)

    # Predictive Insights
    st.subheader("Predictive Insights")
    col1, col2 = st.columns(2)
    with col1:
        top_category = category_performance.loc[category_performance['Total_Revenue'].idxmax()]
        st.metric("Top Revenue Category",
                  top_category['Category'],
                  delta=f"${top_category['Total_Revenue']:,.2f}")
    with col2:
        growth_prediction = filtered_data['Predicted_Revenue'].mean() / filtered_data['Revenue'].mean() - 1
        st.metric("Revenue Growth Prediction",
                  f"{growth_prediction:.2%}")
def render_analytics():
    """Render the analytics page: upload a CSV, then preview it, summarise it,
    chart it and show per-column metrics in four tabs.

    Nothing beyond the uploader is shown until a file has been uploaded and
    loaded successfully.
    """
    st.header("🔍 Data Analytics")
    processor = DataProcessor()
    uploaded_file = st.file_uploader("Upload your CSV data", type=['csv'])
    if uploaded_file is None or not processor.load_data(uploaded_file):
        return

    st.success("Data loaded successfully!")
    tabs = st.tabs(["Data Preview", "Statistics", "Visualization", "Metrics"])

    with tabs[0]:
        st.subheader("Data Preview")
        st.dataframe(processor.data.head())
        st.info(f"Total rows: {len(processor.data)}, Total columns: {len(processor.data.columns)}")

    with tabs[1]:
        st.subheader("Basic Statistics")
        # A CSV without numeric columns has nothing to describe; asking for
        # the summary anyway would raise.
        if processor.numeric_columns:
            st.write(processor.get_basic_stats()['summary'])
        else:
            st.info("No numeric columns to summarise.")
        st.subheader("Missing Values")
        st.write(processor.data.isnull().sum())

    with tabs[2]:
        st.subheader("Create Visualization")
        col1, col2, col3 = st.columns(3)
        with col1:
            chart_type = st.selectbox(
                "Select Chart Type",
                ["Line Plot", "Bar Plot", "Scatter Plot", "Box Plot", "Histogram"]
            )
        with col2:
            x_col = st.selectbox("Select X-axis", processor.data.columns)
        with col3:
            # Histograms only need an x column, so no y selector is shown.
            y_col = st.selectbox("Select Y-axis", processor.numeric_columns) if chart_type != "Histogram" else None
        color_col = st.selectbox("Select Color Variable (optional)",
                                 ['None'] + processor.categorical_columns)
        color_col = None if color_col == 'None' else color_col
        fig = processor.create_visualization(
            chart_type,
            x_col,
            y_col if y_col else x_col,  # fall back to x when no y is chosen
            color_col
        )
        st.plotly_chart(fig, use_container_width=True)

    with tabs[3]:
        st.subheader("Column Metrics")
        if not processor.numeric_columns:
            # The selectbox would return None and data[None] would fail.
            st.info("No numeric columns available for metrics.")
        else:
            selected_col = st.selectbox("Select column", processor.numeric_columns)
            column = processor.data[selected_col]
            metrics = {
                'Mean': column.mean(),
                'Median': column.median(),
                'Std Dev': column.std(),
                'Min': column.min(),
                'Max': column.max()
            }
            cols = st.columns(len(metrics))
            for col, (metric, value) in zip(cols, metrics.items()):
                col.metric(metric, f"{value:.2f}")
def render_brainstorm_page():
    """Render the brainstorm page.

    The sidebar switches between creating a product (form plus computed
    insights, stored in ``st.session_state.products``) and viewing the
    products saved so far.
    """
    st.title("Product Brainstorm Hub")
    manager = BrainstormManager()
    action = st.sidebar.radio("Action", ["View Products", "Create New Product"])

    if action == "Create New Product":
        basic_info, market_analysis, submitted = manager.generate_product_form()
        if submitted:
            product_data = {**basic_info, **market_analysis}
            if not product_data["name"].strip():
                # A nameless product would show up as an unlabeled entry in
                # the product list, so refuse to save it.
                st.error("Please enter a product name before saving.")
            else:
                insights = manager.analyze_product(product_data)
                product_id = f"prod_{len(st.session_state.products)}"
                st.session_state.products[product_id] = {
                    "data": product_data,
                    "insights": insights,
                    "created_at": str(datetime.now())
                }
                st.success("Product added! View insights in the Products tab.")
        return

    if not st.session_state.products:
        st.info("No products yet. Create one to get started!")
        return

    for product in st.session_state.products.values():
        details = product['data']
        insights = product['insights']
        with st.expander(f"🎯 {details['name']}"):
            col1, col2 = st.columns(2)
            with col1:
                st.subheader("Product Details")
                st.write(f"Category: {details['category']}")
                st.write(f"Target: {', '.join(details['target_audience'])}")
                st.write(f"Description: {details['description']}")
            with col2:
                st.subheader("Insights")
                st.metric("Opportunity Score", f"{insights['market_opportunity']}/10")
                # Two fixed decimals so prices read as currency.
                st.metric("Suggested Price", f"${insights['suggested_price']:,.2f}")
            st.write("**Risk Factors:**")
            for risk in insights['risk_factors']:
                st.write(f"- {risk}")
            st.write("**Next Steps:**")
            for step in insights['next_steps']:
                st.write(f"- {step}")
def render_chat():
    """Render the chat page.

    The conversation is kept in ``st.session_state.messages`` and replayed on
    every run. Each new prompt is answered with a fixed placeholder reply; no
    model is called.
    """
    st.header("💬 Business Assistant")
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Replay the stored conversation so it persists across reruns.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    if prompt := st.chat_input("Ask about your business..."):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)
        response = f"Thank you for your question about '{prompt}'. The LLM integration will be implemented soon."
        with st.chat_message("assistant"):
            st.markdown(response)
        st.session_state.messages.append({"role": "assistant", "content": response})
def main():
    """Configure the Streamlit page, draw the sidebar navigation and render
    the page the user selected."""
    st.set_page_config(
        page_title="Prospira",
        page_icon="📊",
        layout="wide",
        initial_sidebar_state="expanded"
    )

    # Navigation label -> function that renders the page.
    pages = {
        "Dashboard": render_dashboard,
        "Analytics": render_analytics,
        "Brainstorm": render_brainstorm_page,
        "Chat": render_chat,
    }

    with st.sidebar:
        st.title("Prospira")
        st.subheader("Data-Driven Solutions")
        page = st.radio(
            "Navigation",
            list(pages)
        )

    render_page = pages.get(page)
    if render_page is not None:
        render_page()
# Start the app only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()