"""Admin blueprint: dashboard, moderation and analytics views.

All views are registered on the ``admin`` blueprint (URL prefix ``/admin``)
and are wrapped in ``admin_required``.
"""

import json
import os
from datetime import datetime, timedelta

from flask import (
    Blueprint,
    current_app,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    url_for,
)
from sqlalchemy import desc, extract, func, or_, text

# Project-local imports keep their original relative order (models, auth,
# security) so that any import-time coupling between them is unchanged.
from models import (
    db,
    User,
    Model,
    Vote,
    EloHistory,
    ModelType,
    CoordinatedVotingCampaign,
    CampaignParticipant,
    UserTimeout,
    get_user_timeouts,
    get_coordinated_campaigns,
    resolve_campaign,
    create_user_timeout,
    cancel_user_timeout,
    check_user_timeout,
)
from auth import admin_required
from security import check_user_security_score

admin = Blueprint("admin", __name__, url_prefix="/admin")

DATE_FORMAT = "%Y-%m-%d"
HOUR_FORMAT = "%Y-%m-%d %H:00"


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _recent_days(days_back=30):
    """Return the UTC dates from ``days_back`` days ago through today.

    The list is ordered oldest first and has ``days_back + 1`` entries.
    """
    today = datetime.utcnow().date()
    return [today - timedelta(days=offset) for offset in range(days_back, -1, -1)]


def _recent_months(today, months=12):
    """Return ``(year, month)`` for the last ``months`` calendar months.

    The list is ordered oldest first and ends with the month of ``today``.
    Calendar arithmetic is used so no month is repeated or skipped.
    """
    last = today.year * 12 + (today.month - 1)
    return [(m // 12, m % 12 + 1) for m in range(last - months + 1, last + 1)]


def _counts_by_label(rows):
    """Map ``(label, count)`` result rows to ``{str(label): count}``.

    Rows are unpacked positionally instead of via ``row.count``: result rows
    also expose a tuple-style ``count()`` method on recent SQLAlchemy
    versions, which shadows a column labelled ``count``. Labels are
    stringified so text labels (SQLite ``date()``) and ``date`` objects
    (other backends) produce the same ``YYYY-MM-DD`` key.
    """
    return {str(label): count for label, count in rows}


def _daily_vote_rows(since, model_type=None):
    """Return ``(date, count)`` rows of votes per day on or after ``since``.

    When ``model_type`` is given, only votes of that type are counted.
    """
    day = func.date(Vote.vote_date)
    query = db.session.query(
        day.label("date"),
        func.count().label("count"),
    ).filter(Vote.vote_date >= since)
    if model_type is not None:
        query = query.filter(Vote.model_type == model_type)
    return query.group_by(day).order_by(day).all()


def _midnight_days_ago(days):
    """Return today's UTC midnight shifted back by ``days`` days."""
    midnight = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight - timedelta(days=days)


# ---------------------------------------------------------------------------
# Dashboard
# ---------------------------------------------------------------------------

@admin.route("/")
@admin_required
def index():
    """Admin dashboard homepage"""
    # Headline counters
    stats = {
        "total_users": User.query.count(),
        "total_votes": Vote.query.count(),
        "tts_votes": Vote.query.filter_by(model_type=ModelType.TTS).count(),
        "conversational_votes": Vote.query.filter_by(
            model_type=ModelType.CONVERSATIONAL
        ).count(),
        "tts_models": Model.query.filter_by(model_type=ModelType.TTS).count(),
        "conversational_models": Model.query.filter_by(
            model_type=ModelType.CONVERSATIONAL
        ).count(),
    }

    recent_votes = Vote.query.order_by(Vote.vote_date.desc()).limit(10).all()
    recent_users = User.query.order_by(User.join_date.desc()).limit(10).all()

    # Daily votes for the past 30 days, zero-filled for days without votes.
    vote_counts = _counts_by_label(_daily_vote_rows(_midnight_days_ago(30)))
    formatted_dates = [day.strftime(DATE_FORMAT) for day in _recent_days(30)]
    daily_votes_data = {
        "labels": formatted_dates,
        "counts": [vote_counts.get(label, 0) for label in formatted_dates],
    }

    # Top five models per type by current Elo
    top_tts_models = (
        Model.query.filter_by(model_type=ModelType.TTS)
        .order_by(Model.current_elo.desc())
        .limit(5)
        .all()
    )
    top_conversational_models = (
        Model.query.filter_by(model_type=ModelType.CONVERSATIONAL)
        .order_by(Model.current_elo.desc())
        .limit(5)
        .all()
    )

    return render_template(
        "admin/index.html",
        stats=stats,
        recent_votes=recent_votes,
        recent_users=recent_users,
        daily_votes_data=json.dumps(daily_votes_data),
        top_tts_models=top_tts_models,
        top_conversational_models=top_conversational_models,
    )


# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------

@admin.route("/models")
@admin_required
def models():
    """Manage models"""
    tts_models = (
        Model.query.filter_by(model_type=ModelType.TTS).order_by(Model.name).all()
    )
    conversational_models = (
        Model.query.filter_by(model_type=ModelType.CONVERSATIONAL)
        .order_by(Model.name)
        .all()
    )
    return render_template(
        "admin/models.html",
        tts_models=tts_models,
        conversational_models=conversational_models,
    )


# The view needs ``model_id``, so the rule must capture it from the URL.
# NOTE(review): ``int`` converter assumes integer primary keys - confirm.
@admin.route("/model/<int:model_id>", methods=["GET", "POST"])
@admin_required
def edit_model(model_id):
    """Edit a model

    GET renders the edit form; POST stores the submitted name, flags and URL
    and redirects to the model list. Responds 404 for an unknown ``model_id``.
    """
    model = Model.query.get_or_404(model_id)

    if request.method == "POST":
        model.name = request.form.get("name")
        # Checkboxes are only present in the form data when ticked.
        model.is_active = "is_active" in request.form
        model.is_open = "is_open" in request.form
        model.model_url = request.form.get("model_url")
        db.session.commit()
        flash(f"Model '{model.name}' updated successfully", "success")
        return redirect(url_for("admin.models"))

    return render_template("admin/edit_model.html", model=model)


# ---------------------------------------------------------------------------
# Users
# ---------------------------------------------------------------------------

@admin.route("/users")
@admin_required
def users():
    """Manage users

    Lists every user with a security score, lowest score first so that the
    most problematic accounts are at the top.
    """
    all_users = User.query.order_by(User.username).all()
    admin_users = [
        username.strip() for username in os.getenv("ADMIN_USERS", "").split(",")
    ]

    users_with_scores = []
    for user in all_users:
        score, factors = check_user_security_score(user.id)
        users_with_scores.append(
            {"user": user, "security_score": score, "security_factors": factors}
        )
    users_with_scores.sort(key=lambda entry: entry["security_score"])

    return render_template(
        "admin/users.html",
        users_with_scores=users_with_scores,
        admin_users=admin_users,
    )


# NOTE(review): ``int`` converter assumes integer primary keys - confirm.
@admin.route("/user/<int:user_id>")
@admin_required
def user_detail(user_id):
    """View user details

    Shows the security score, the 20 most recent votes, per-type vote counts
    and the five models this user picks most often relative to how often
    they appeared in the user's votes. Responds 404 for an unknown user.
    """
    user = User.query.get_or_404(user_id)
    security_score, security_factors = check_user_security_score(user_id)

    recent_votes = (
        Vote.query.filter_by(user_id=user_id)
        .order_by(Vote.vote_date.desc())
        .limit(20)
        .all()
    )
    tts_votes = Vote.query.filter_by(
        user_id=user_id, model_type=ModelType.TTS
    ).count()
    conversational_votes = Vote.query.filter_by(
        user_id=user_id, model_type=ModelType.CONVERSATIONAL
    ).count()

    # Bias analysis: how often each model was chosen vs. how often it appeared.
    model_stats = {}
    for vote in Vote.query.filter_by(user_id=user_id).all():
        for model_id in (vote.model_chosen, vote.model_rejected):
            model_stats.setdefault(
                model_id, {"chosen": 0, "appeared": 0, "name": None}
            )
        model_stats[vote.model_chosen]["chosen"] += 1
        model_stats[vote.model_chosen]["appeared"] += 1
        model_stats[vote.model_rejected]["appeared"] += 1

    model_bias_analysis = []
    known_ids = [model_id for model_id in model_stats if model_id is not None]
    if known_ids:
        # One query for all names instead of one query per model.
        names_by_id = {
            model.id: model.name
            for model in Model.query.filter(Model.id.in_(known_ids)).all()
        }
        for model_id, entry in model_stats.items():
            if model_id not in names_by_id:
                continue  # model no longer exists; left out of the ranking
            entry["name"] = names_by_id[model_id]
            entry["bias_ratio"] = (
                entry["chosen"] / entry["appeared"] if entry["appeared"] > 0 else 0
            )
            entry["model_id"] = model_id

        # Highest bias first, top five only.
        model_bias_analysis = sorted(
            (entry for entry in model_stats.values() if entry["name"] is not None),
            key=lambda entry: entry["bias_ratio"],
            reverse=True,
        )[:5]

    return render_template(
        "admin/user_detail.html",
        user=user,
        security_score=security_score,
        security_factors=security_factors,
        recent_votes=recent_votes,
        tts_votes=tts_votes,
        conversational_votes=conversational_votes,
        model_bias_analysis=model_bias_analysis,
        total_votes=tts_votes + conversational_votes,
    )


# ---------------------------------------------------------------------------
# Votes and statistics
# ---------------------------------------------------------------------------

@admin.route("/votes")
@admin_required
def votes():
    """View recent votes"""
    page = request.args.get("page", 1, type=int)
    per_page = 50

    votes_pagination = Vote.query.order_by(Vote.vote_date.desc()).paginate(
        page=page, per_page=per_page
    )
    return render_template(
        "admin/votes.html",
        votes=votes_pagination.items,
        pagination=votes_pagination,
    )


@admin.route("/statistics")
@admin_required
def statistics():
    """View detailed statistics

    Builds three chart datasets, passed to the template as one JSON string:
    daily votes per model type (last 30 days), new users per month (last 12
    months) and the Elo history of the five models with the most matches.
    """
    # --- Daily votes by model type -------------------------------------
    thirty_days_ago = _midnight_days_ago(30)
    tts_vote_counts = _counts_by_label(
        _daily_vote_rows(thirty_days_ago, ModelType.TTS)
    )
    conv_vote_counts = _counts_by_label(
        _daily_vote_rows(thirty_days_ago, ModelType.CONVERSATIONAL)
    )
    formatted_dates = [day.strftime(DATE_FORMAT) for day in _recent_days(30)]
    tts_counts = [tts_vote_counts.get(label, 0) for label in formatted_dates]
    conv_counts = [conv_vote_counts.get(label, 0) for label in formatted_dates]

    # --- Monthly new users ---------------------------------------------
    monthly_users = (
        db.session.query(
            extract("year", User.join_date).label("year"),
            extract("month", User.join_date).label("month"),
            func.count().label("count"),
        )
        .group_by("year", "month")
        .order_by("year", "month")
        .all()
    )
    user_counts = {}
    for year, month, count in monthly_users:
        if year is None or month is None:
            continue  # users without a join date cannot be placed on the chart
        # Cast to int so the key does not depend on the numeric type the
        # database driver returns for EXTRACT.
        user_counts[(int(year), int(month))] = count

    month_list = _recent_months(datetime.utcnow(), 12)
    monthly_labels = [f"{month}/{year}" for year, month in month_list]
    monthly_counts = [user_counts.get(key, 0) for key in month_list]

    # --- Elo history of the most-played models -------------------------
    top_models = Model.query.order_by(Model.match_count.desc()).limit(5).all()

    # The timeline always covers at least the last 30 days and is widened to
    # include every history entry of the selected models.
    now = datetime.utcnow()
    earliest = now - timedelta(days=30)
    latest = now
    history_by_model = {}
    for model in top_models:
        history = (
            EloHistory.query.filter_by(model_id=model.id)
            .order_by(EloHistory.timestamp)
            .all()
        )
        history_by_model[model.id] = history
        if history:
            earliest = min(earliest, history[0].timestamp)
            latest = max(latest, history[-1].timestamp)

    # Step over calendar dates (not datetimes) so the final day is always
    # included regardless of the time of day of the two end points.
    start_day = earliest.date()
    span_days = (latest.date() - start_day).days
    formatted_elo_dates = [
        (start_day + timedelta(days=offset)).strftime(DATE_FORMAT)
        for offset in range(span_days + 1)
    ]

    model_history = {}
    for model in top_models:
        history = history_by_model[model.id]
        if history:
            # Entries are ordered by timestamp, so the last one of each day wins.
            score_by_day = {
                entry.timestamp.strftime(DATE_FORMAT): entry.elo_score
                for entry in history
            }
            # Days before the first entry show the current Elo; afterwards
            # the most recent known score is carried forward.
            last_score = model.current_elo
            scores = []
            for label in formatted_elo_dates:
                last_score = score_by_day.get(label, last_score)
                scores.append(last_score)
        else:
            scores = [model.current_elo] * len(formatted_elo_dates)

        model_history[model.name] = {
            "timestamps": formatted_elo_dates,
            "scores": scores,
        }

    chart_data = {
        "dailyVotes": {
            "labels": formatted_dates,
            "ttsCounts": tts_counts,
            "convCounts": conv_counts,
        },
        "monthlyUsers": {"labels": monthly_labels, "counts": monthly_counts},
        "modelHistory": model_history,
    }
    return render_template(
        "admin/statistics.html", chart_data=json.dumps(chart_data)
    )


@admin.route("/activity")
@admin_required
def activity():
    """View recent text generations

    Shows the number of in-memory TTS / conversational sessions, the 20 most
    recent votes of each type and the vote count per hour for the last 24
    hours.
    """
    # Session stores are read from ``current_app`` when present, otherwise
    # from the ``app`` module (imported lazily inside this view).
    if hasattr(current_app, "tts_sessions"):
        tts_session_count = len(current_app.tts_sessions)
    else:
        from app import tts_sessions

        tts_session_count = len(tts_sessions)

    if hasattr(current_app, "conversational_sessions"):
        conversational_session_count = len(current_app.conversational_sessions)
    else:
        from app import conversational_sessions

        conversational_session_count = len(conversational_sessions)

    # Recent votes stand in for completed generations.
    recent_tts_votes = (
        Vote.query.filter_by(model_type=ModelType.TTS)
        .order_by(Vote.vote_date.desc())
        .limit(20)
        .all()
    )
    recent_conv_votes = (
        Vote.query.filter_by(model_type=ModelType.CONVERSATIONAL)
        .order_by(Vote.vote_date.desc())
        .limit(20)
        .all()
    )

    # Votes per hour for the last 24 hours (SQLite-compatible formatting).
    current_hour = datetime.utcnow().replace(minute=0, second=0, microsecond=0)
    last_24h = current_hour - timedelta(hours=24)
    hourly_votes = (
        db.session.query(
            func.strftime(HOUR_FORMAT, Vote.vote_date).label("hour"),
            func.count().label("count"),
        )
        .filter(Vote.vote_date >= last_24h)
        .group_by("hour")
        .order_by("hour")
        .all()
    )
    vote_counts = _counts_by_label(hourly_votes)

    hour_list = [
        (current_hour - timedelta(hours=offset)).strftime(HOUR_FORMAT)
        for offset in range(24, -1, -1)
    ]
    hourly_data = {
        "labels": hour_list,
        "counts": [vote_counts.get(hour, 0) for hour in hour_list],
    }

    return render_template(
        "admin/activity.html",
        tts_session_count=tts_session_count,
        conversational_session_count=conversational_session_count,
        recent_tts_votes=recent_tts_votes,
        recent_conv_votes=recent_conv_votes,
        hourly_data=json.dumps(hourly_data),
    )


@admin.route("/analytics")
@admin_required
def analytics():
    """View analytics data including session duration, IP addresses, etc.

    If any query fails, the error is flashed and the page is rendered with
    an empty ``analytics_stats`` dict.
    """
    analytics_stats = {}

    try:
        # Session duration statistics
        duration_stats = db.session.execute(text("""
            SELECT
                AVG(session_duration_seconds) as avg_duration,
                MIN(session_duration_seconds) as min_duration,
                MAX(session_duration_seconds) as max_duration,
                COUNT(session_duration_seconds) as total_with_duration
            FROM vote
            WHERE session_duration_seconds IS NOT NULL
        """)).fetchone()

        analytics_stats["duration"] = {
            "avg": round(duration_stats.avg_duration, 2) if duration_stats.avg_duration else 0,
            "min": round(duration_stats.min_duration, 2) if duration_stats.min_duration else 0,
            "max": round(duration_stats.max_duration, 2) if duration_stats.max_duration else 0,
            "total": duration_stats.total_with_duration or 0,
        }

        # Cache hit statistics
        cache_stats = db.session.execute(text("""
            SELECT
                cache_hit,
                COUNT(*) as count
            FROM vote
            WHERE cache_hit IS NOT NULL
            GROUP BY cache_hit
        """)).fetchall()

        cache_summary = {"hits": 0, "misses": 0, "total": 0}
        # Rows are unpacked positionally: ``row.count`` can resolve to the
        # row's tuple-style count() method rather than the column.
        for cache_hit, count in cache_stats:
            cache_summary["hits" if cache_hit else "misses"] = count
            cache_summary["total"] += count
        analytics_stats["cache"] = cache_summary

        # Top IP address regions (anonymized)
        ip_stats = db.session.execute(text("""
            SELECT
                ip_address_partial,
                COUNT(*) as count
            FROM vote
            WHERE ip_address_partial IS NOT NULL
            GROUP BY ip_address_partial
            ORDER BY count DESC
            LIMIT 10
        """)).fetchall()

        analytics_stats["top_ips"] = [
            {"ip": ip_partial, "count": count} for ip_partial, count in ip_stats
        ]

        # User agent statistics (top browsers/devices)
        ua_stats = db.session.execute(text("""
            SELECT
                CASE
                    WHEN user_agent LIKE '%Chrome%' THEN 'Chrome'
                    WHEN user_agent LIKE '%Firefox%' THEN 'Firefox'
                    WHEN user_agent LIKE '%Safari%' AND user_agent NOT LIKE '%Chrome%' THEN 'Safari'
                    WHEN user_agent LIKE '%Edge%' THEN 'Edge'
                    WHEN user_agent LIKE '%Mobile%' OR user_agent LIKE '%Android%' THEN 'Mobile'
                    ELSE 'Other'
                END as browser,
                COUNT(*) as count
            FROM vote
            WHERE user_agent IS NOT NULL
            GROUP BY browser
            ORDER BY count DESC
        """)).fetchall()

        analytics_stats["browsers"] = [
            {"browser": browser, "count": count} for browser, count in ua_stats
        ]

        # Recent votes with analytics data
        recent_analytics = db.session.execute(text("""
            SELECT
                v.id,
                v.vote_date,
                v.session_duration_seconds,
                v.ip_address_partial,
                v.cache_hit,
                v.model_type,
                u.username,
                m1.name as chosen_model,
                m2.name as rejected_model
            FROM vote v
            LEFT JOIN user u ON v.user_id = u.id
            LEFT JOIN model m1 ON v.model_chosen = m1.id
            LEFT JOIN model m2 ON v.model_rejected = m2.id
            WHERE v.session_duration_seconds IS NOT NULL
            ORDER BY v.vote_date DESC
            LIMIT 20
        """)).fetchall()

        analytics_stats["recent_votes"] = [
            {
                "id": vote.id,
                "vote_date": vote.vote_date,
                "duration": round(vote.session_duration_seconds, 2) if vote.session_duration_seconds else None,
                "ip": vote.ip_address_partial,
                "cache_hit": vote.cache_hit,
                "model_type": vote.model_type,
                "username": vote.username,
                "chosen_model": vote.chosen_model,
                "rejected_model": vote.rejected_model,
            }
            for vote in recent_analytics
        ]

    except Exception as e:
        # Leave the session usable for the rest of the request after a
        # failed statement, then degrade to an empty page.
        db.session.rollback()
        flash(f"Error retrieving analytics data: {str(e)}", "error")
        analytics_stats = {}

    return render_template("admin/analytics.html", analytics_stats=analytics_stats)


# ---------------------------------------------------------------------------
# Security monitoring
# ---------------------------------------------------------------------------

@admin.route("/security")
@admin_required
def security():
    """View security monitoring data and suspicious activity.

    On any failure the error is flashed and the admin is redirected to the
    dashboard.
    """
    try:
        from security import (
            detect_suspicious_voting_patterns,
            detect_coordinated_voting,
            check_user_security_score,
            detect_model_bias,
        )

        # Recently joined users with a low security score, worst first.
        recent_users = User.query.order_by(User.join_date.desc()).limit(50).all()
        suspicious_users = []
        for user in recent_users:
            score, factors = check_user_security_score(user.id)
            if score < 50:
                suspicious_users.append(
                    {"user": user, "score": score, "factors": factors}
                )
        suspicious_users.sort(key=lambda entry: entry["score"])

        # Coordinated voting on the ten highest-rated models.
        top_models = Model.query.order_by(Model.current_elo.desc()).limit(10).all()
        coordinated_campaigns = []
        for model in top_models:
            is_coordinated, user_count, vote_count, suspicious_users_list = (
                detect_coordinated_voting(model.id)
            )
            if is_coordinated:
                coordinated_campaigns.append({
                    "model": model,
                    "user_count": user_count,
                    "vote_count": vote_count,
                    "suspicious_users": suspicious_users_list,
                })

        # Users strongly biased towards one of the top models.
        biased_users = []
        for model in top_models:
            voters = (
                db.session.query(Vote.user_id)
                .filter(Vote.model_chosen == model.id)
                .distinct()
                .limit(20)
                .all()
            )
            for voter in voters:
                if not voter.user_id:
                    continue
                is_biased, bias_ratio, votes_for_model, total_votes = (
                    detect_model_bias(voter.user_id, model.id)
                )
                if not (is_biased and total_votes >= 5):
                    continue
                user = User.query.get(voter.user_id)
                if user:
                    biased_users.append({
                        "user": user,
                        "model": model,
                        "bias_ratio": bias_ratio,
                        "votes_for_model": votes_for_model,
                        "total_votes": total_votes,
                    })

        # Keep the first entry per (user, model) pair, highest bias first.
        seen_pairs = set()
        unique_biased_users = []
        for item in biased_users:
            pair = (item["user"].id, item["model"].id)
            if pair not in seen_pairs:
                seen_pairs.add(pair)
                unique_biased_users.append(item)
        unique_biased_users.sort(key=lambda entry: entry["bias_ratio"], reverse=True)

        # Placeholder: security blocks would have to be parsed from the
        # application logs, which is not implemented.
        security_blocks = []

        return render_template(
            "admin/security.html",
            suspicious_users=suspicious_users[:20],  # Limit to top 20
            coordinated_campaigns=coordinated_campaigns,
            biased_users=unique_biased_users[:20],  # Limit to top 20
            security_blocks=security_blocks,
        )

    except ImportError:
        flash("Security module not available", "error")
        return redirect(url_for("admin.index"))
    except Exception as e:
        flash(f"Error loading security data: {str(e)}", "error")
        return redirect(url_for("admin.index"))


# ---------------------------------------------------------------------------
# Timeouts
# ---------------------------------------------------------------------------

@admin.route("/timeouts")
@admin_required
def timeouts():
    """Manage user timeouts"""
    active_timeouts = get_user_timeouts(active_only=True, limit=100)

    # Recently expired or cancelled timeouts
    recent_inactive = (
        UserTimeout.query.filter(
            or_(
                UserTimeout.is_active == False,  # noqa: E712 - SQL expression
                UserTimeout.expires_at <= datetime.utcnow(),
            )
        )
        .order_by(UserTimeout.created_at.desc())
        .limit(50)
        .all()
    )

    # Coordinated campaigns, shown for context
    recent_campaigns = get_coordinated_campaigns(limit=20)

    return render_template(
        "admin/timeouts.html",
        active_timeouts=active_timeouts,
        recent_inactive=recent_inactive,
        recent_campaigns=recent_campaigns,
    )


@admin.route("/timeout/create", methods=["POST"])
@admin_required
def create_timeout():
    """Create a new user timeout

    Reads ``user_id``, ``reason``, ``timeout_type`` and ``duration_days``
    (1-365) from the form. Always redirects back to the timeout list; the
    outcome is reported through a flash message.
    """
    try:
        user_id = request.form.get("user_id", type=int)
        reason = request.form.get("reason", "").strip()
        timeout_type = request.form.get("timeout_type", "manual")
        duration_days = request.form.get("duration_days", type=int)

        if not all([user_id, reason, duration_days]):
            flash("All fields are required", "error")
            return redirect(url_for("admin.timeouts"))

        if duration_days < 1 or duration_days > 365:
            flash("Duration must be between 1 and 365 days", "error")
            return redirect(url_for("admin.timeouts"))

        user = User.query.get(user_id)
        if not user:
            flash("User not found", "error")
            return redirect(url_for("admin.timeouts"))

        # Refuse to stack a second timeout on top of an active one.
        is_timed_out, existing_timeout = check_user_timeout(user_id)
        if is_timed_out:
            flash(f"User {user.username} already has an active timeout until {existing_timeout.expires_at}", "error")
            return redirect(url_for("admin.timeouts"))

        from flask_login import current_user

        timeout = create_user_timeout(
            user_id=user_id,
            reason=reason,
            timeout_type=timeout_type,
            duration_days=duration_days,
            created_by=current_user.id if current_user.is_authenticated else None,
        )
        flash(f"Timeout created for {user.username} (expires: {timeout.expires_at})", "success")

    except Exception as e:
        flash(f"Error creating timeout: {str(e)}", "error")

    return redirect(url_for("admin.timeouts"))


# NOTE(review): ``int`` converter assumes integer primary keys - confirm.
@admin.route("/timeout/cancel/<int:timeout_id>", methods=["POST"])
@admin_required
def cancel_timeout(timeout_id):
    """Cancel an active timeout

    Requires a non-empty ``cancel_reason`` form field. Always redirects back
    to the timeout list with a flash message describing the outcome.
    """
    try:
        cancel_reason = request.form.get("cancel_reason", "").strip()
        if not cancel_reason:
            flash("Cancel reason is required", "error")
            return redirect(url_for("admin.timeouts"))

        from flask_login import current_user

        success, message = cancel_user_timeout(
            timeout_id=timeout_id,
            cancelled_by=current_user.id if current_user.is_authenticated else None,
            cancel_reason=cancel_reason,
        )
        flash(message, "success" if success else "error")

    except Exception as e:
        flash(f"Error cancelling timeout: {str(e)}", "error")

    return redirect(url_for("admin.timeouts"))


# ---------------------------------------------------------------------------
# Coordinated voting campaigns
# ---------------------------------------------------------------------------

@admin.route("/campaigns")
@admin_required
def campaigns():
    """View and manage coordinated voting campaigns

    The optional ``status`` query argument filters the list; ``all`` (the
    default) disables the filter.
    """
    status_filter = request.args.get("status", "all")

    if status_filter == "all":
        campaign_list = get_coordinated_campaigns(limit=100)
    else:
        campaign_list = get_coordinated_campaigns(status=status_filter, limit=100)

    stats = {
        "total": CoordinatedVotingCampaign.query.count(),
        "active": CoordinatedVotingCampaign.query.filter_by(status="active").count(),
        "resolved": CoordinatedVotingCampaign.query.filter_by(status="resolved").count(),
        "false_positive": CoordinatedVotingCampaign.query.filter_by(
            status="false_positive"
        ).count(),
    }

    return render_template(
        "admin/campaigns.html",
        campaigns=campaign_list,
        stats=stats,
        current_filter=status_filter,
    )


# NOTE(review): ``int`` converter assumes integer primary keys - confirm.
@admin.route("/campaign/<int:campaign_id>")
@admin_required
def campaign_detail(campaign_id):
    """View detailed information about a coordinated voting campaign

    Responds 404 for an unknown ``campaign_id``.
    """
    campaign = CoordinatedVotingCampaign.query.get_or_404(campaign_id)

    # (CampaignParticipant, User) pairs for this campaign
    participants = (
        db.session.query(CampaignParticipant, User)
        .join(User, CampaignParticipant.user_id == User.id)
        .filter(CampaignParticipant.campaign_id == campaign_id)
        .all()
    )

    related_timeouts = UserTimeout.query.filter_by(
        related_campaign_id=campaign_id
    ).all()

    return render_template(
        "admin/campaign_detail.html",
        campaign=campaign,
        participants=participants,
        related_timeouts=related_timeouts,
    )


# NOTE(review): ``int`` converter assumes integer primary keys - confirm.
@admin.route("/campaign/resolve/<int:campaign_id>", methods=["POST"])
@admin_required
def resolve_campaign_route(campaign_id):
    """Mark a campaign as resolved

    The ``status`` form field must be ``resolved`` or ``false_positive``.
    Always redirects back to the campaign detail page.
    """
    try:
        status = request.form.get("status")
        admin_notes = request.form.get("admin_notes", "").strip()

        if status not in ["resolved", "false_positive"]:
            flash("Invalid status", "error")
            return redirect(url_for("admin.campaign_detail", campaign_id=campaign_id))

        from flask_login import current_user

        success, message = resolve_campaign(
            campaign_id=campaign_id,
            resolved_by=current_user.id if current_user.is_authenticated else None,
            status=status,
            admin_notes=admin_notes,
        )
        if success:
            flash(f"Campaign marked as {status}", "success")
        else:
            flash(message, "error")

    except Exception as e:
        flash(f"Error resolving campaign: {str(e)}", "error")

    return redirect(url_for("admin.campaign_detail", campaign_id=campaign_id))


# ---------------------------------------------------------------------------
# JSON API
# ---------------------------------------------------------------------------

@admin.route("/api/user-search")
@admin_required
def user_search():
    """Search for users by username (for timeout creation)

    Returns a JSON list of at most 10 users whose username contains the
    ``q`` query argument (case-insensitive); an empty list when ``q`` is
    shorter than two characters.
    """
    query = request.args.get("q", "").strip()
    if len(query) < 2:
        return jsonify([])

    matches = User.query.filter(User.username.ilike(f"%{query}%")).limit(10).all()

    return jsonify([
        {
            "id": user.id,
            "username": user.username,
            "join_date": user.join_date.strftime("%Y-%m-%d") if user.join_date else "N/A",
        }
        for user in matches
    ])