""" Unified Data Manager for GlycoAI - MODIFIED VERSION Sarah now has unstable glucose values for demonstration """ import logging from typing import Dict, Any, Optional, Tuple import pandas as pd from datetime import datetime, timedelta from dataclasses import asdict import numpy as np import random from apifunctions import ( DexcomAPI, GlucoseAnalyzer, DEMO_USERS, DemoUser ) logger = logging.getLogger(__name__) class UnifiedDataManager: """ MODIFIED: Unified data manager with Sarah having unstable glucose patterns """ def __init__(self): self.dexcom_api = DexcomAPI() self.analyzer = GlucoseAnalyzer() logger.info(f"UnifiedDataManager initialized - Sarah will have unstable glucose patterns") # Single source of truth for all data self.current_user: Optional[DemoUser] = None self.raw_glucose_data: Optional[list] = None self.processed_glucose_data: Optional[pd.DataFrame] = None self.calculated_stats: Optional[Dict] = None self.identified_patterns: Optional[Dict] = None # Metadata self.data_loaded_at: Optional[datetime] = None self.data_source: str = "none" # "dexcom_api", "mock", or "none" def load_user_data(self, user_key: str, force_reload: bool = False) -> Dict[str, Any]: """ MODIFIED: Load glucose data with Sarah having unstable patterns """ # Check if we already have data for this user and it's recent if (not force_reload and self.current_user and self.current_user == DEMO_USERS.get(user_key) and self.data_loaded_at and (datetime.now() - self.data_loaded_at).seconds < 300): # 5 minutes cache logger.info(f"Using cached data for {user_key}") return self._build_success_response() try: if user_key not in DEMO_USERS: return { "success": False, "message": f"❌ Invalid user key '{user_key}'. Available: {', '.join(DEMO_USERS.keys())}" } logger.info(f"Loading data for user: {user_key}") # Set current user self.current_user = DEMO_USERS[user_key] # Call API EXACTLY as it was working before try: logger.info(f"Attempting Dexcom API authentication for {user_key}") # ORIGINAL WORKING METHOD: Use the simulate_demo_login exactly as before access_token = self.dexcom_api.simulate_demo_login(user_key) logger.info(f"Dexcom authentication result: {bool(access_token)}") if access_token: # ORIGINAL WORKING METHOD: Get data with 14-day range end_date = datetime.now() start_date = end_date - timedelta(days=14) # Call get_egv_data EXACTLY as it was working before self.raw_glucose_data = self.dexcom_api.get_egv_data( start_date.isoformat(), end_date.isoformat() ) if self.raw_glucose_data and len(self.raw_glucose_data) > 0: self.data_source = "dexcom_api" logger.info(f"✅ Successfully loaded {len(self.raw_glucose_data)} readings from Dexcom API") else: logger.warning("Dexcom API returned empty data - falling back to mock data") raise Exception("Empty data from Dexcom API") else: logger.warning("Failed to get access token - falling back to mock data") raise Exception("Authentication failed") except Exception as api_error: logger.warning(f"Dexcom API failed ({str(api_error)}) - using mock data fallback") self.raw_glucose_data = self._generate_realistic_mock_data(user_key) self.data_source = "mock" # Process the raw data (same processing for everyone) self.processed_glucose_data = self.analyzer.process_egv_data(self.raw_glucose_data) if self.processed_glucose_data is None or self.processed_glucose_data.empty: return { "success": False, "message": "❌ Failed to process glucose data" } # Calculate statistics (single source of truth) self.calculated_stats = self._calculate_unified_stats() # Identify patterns 
    def get_stats_for_ui(self) -> Dict[str, Any]:
        """Get statistics formatted for the UI display"""
        if not self.calculated_stats:
            return {}

        return {
            **self.calculated_stats,
            "data_source": self.data_source,
            "loaded_at": self.data_loaded_at.isoformat() if self.data_loaded_at else None,
            "user_name": self.current_user.name if self.current_user else None
        }

    def get_context_for_agent(self) -> Dict[str, Any]:
        """Get context formatted for the AI agent"""
        if not self.current_user or not self.calculated_stats:
            return {"error": "No user data loaded"}

        # Build agent context with the SAME data as UI
        context = {
            "user": {
                "name": self.current_user.name,
                "age": self.current_user.age,
                "diabetes_type": self.current_user.diabetes_type,
                "device_type": self.current_user.device_type,
                "years_with_diabetes": self.current_user.years_with_diabetes,
                "typical_pattern": getattr(self.current_user, 'typical_glucose_pattern', 'normal')
            },
            "statistics": self._safe_convert_for_json(self.calculated_stats),
            "patterns": self._safe_convert_for_json(self.identified_patterns),
            "data_points": len(self.processed_glucose_data) if self.processed_glucose_data is not None else 0,
            "recent_readings": self._get_recent_readings_for_agent(),
            "data_metadata": {
                "source": self.data_source,
                "loaded_at": self.data_loaded_at.isoformat() if self.data_loaded_at else None,
                "data_age_minutes": int((datetime.now() - self.data_loaded_at).total_seconds() / 60) if self.data_loaded_at else None
            }
        }

        return context

    def get_chart_data(self) -> Optional[pd.DataFrame]:
        """Get processed data for chart display"""
        return self.processed_glucose_data

    def _calculate_unified_stats(self) -> Dict[str, Any]:
        """Calculate statistics using a single, consistent method"""
        if self.processed_glucose_data is None or self.processed_glucose_data.empty:
            return {"error": "No data available"}

        try:
            # Get glucose values
            glucose_values = self.processed_glucose_data['value'].dropna()

            if len(glucose_values) == 0:
                return {"error": "No valid glucose values"}

            # Convert to a numpy array for consistent calculations
            values = np.array(glucose_values.tolist(), dtype=float)

            # Calculate basic statistics
            avg_glucose = float(np.mean(values))
            min_glucose = float(np.min(values))
            max_glucose = float(np.max(values))
            std_glucose = float(np.std(values))
            total_readings = int(len(values))

            # Calculate time in ranges - CONSISTENT METHOD
            in_range_mask = (values >= 70) & (values <= 180)
            below_range_mask = values < 70
            above_range_mask = values > 180

            in_range_count = int(np.sum(in_range_mask))
            below_range_count = int(np.sum(below_range_mask))
            above_range_count = int(np.sum(above_range_mask))

            # Calculate percentages
            time_in_range = (in_range_count / total_readings) * 100 if total_readings > 0 else 0
            time_below_70 = (below_range_count / total_readings) * 100 if total_readings > 0 else 0
            time_above_180 = (above_range_count / total_readings) * 100 if total_readings > 0 else 0

            # Calculate additional metrics
            gmi = 3.31 + (0.02392 * avg_glucose)  # Glucose Management Indicator
            cv = (std_glucose / avg_glucose) * 100 if avg_glucose > 0 else 0  # Coefficient of Variation

            stats = {
                "average_glucose": avg_glucose,
                "min_glucose": min_glucose,
                "max_glucose": max_glucose,
                "std_glucose": std_glucose,
                "time_in_range_70_180": time_in_range,
                "time_below_70": time_below_70,
                "time_above_180": time_above_180,
                "total_readings": total_readings,
                "gmi": gmi,
                "cv": cv,
                "in_range_count": in_range_count,
                "below_range_count": below_range_count,
                "above_range_count": above_range_count
            }

            # Log for debugging
            logger.info(f"Calculated stats - TIR: {time_in_range:.1f}%, Total: {total_readings}, In range: {in_range_count}")

            return stats

        except Exception as e:
            logger.error(f"Error calculating unified stats: {e}")
            return {"error": f"Statistics calculation failed: {str(e)}"}
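    # Worked example for the derived metrics above (illustrative numbers):
    # an average of 154 mg/dL gives GMI = 3.31 + 0.02392 * 154 ≈ 7.0%, and a
    # std of 50 mg/dL gives CV = (50 / 154) * 100 ≈ 32.5%; a CV above ~36% is
    # commonly read as high glycemic variability.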
    def _generate_realistic_mock_data(self, user_key: str) -> list:
        """Generate realistic mock data with SARAH having UNSTABLE patterns"""
        # MODIFIED: Sarah now has unstable glucose patterns
        pattern_map = {
            "sarah_g7": "unstable_high_variability",  # CHANGED: Sarah now unstable
            "marcus_one": "dawn_phenomenon",
            "jennifer_g6": "normal",
            "robert_receiver": "dawn_phenomenon"
        }

        user_pattern = pattern_map.get(user_key, "normal")

        # Generate 14 days of data with specific patterns
        if user_key == "sarah_g7":
            # Generate UNSTABLE data for Sarah
            mock_data = self._generate_unstable_glucose_data()
            logger.info(f"Generated {len(mock_data)} UNSTABLE mock data points for Sarah")
        else:
            # Use normal patterns for other users
            mock_data = self._create_realistic_pattern(days=14, user_type=user_pattern)
            logger.info(f"Generated {len(mock_data)} mock data points for {user_key} with pattern {user_pattern}")

        return mock_data

    def _generate_unstable_glucose_data(self) -> list:
        """Generate highly variable, unstable glucose data for Sarah"""
        readings = []
        now = datetime.now()

        # Generate 14 days of unstable data (every 5 minutes)
        total_minutes = 14 * 24 * 60
        interval_minutes = 5
        total_readings = total_minutes // interval_minutes

        logger.info(f"Generating {total_readings} unstable glucose readings for Sarah")

        for i in range(total_readings):
            timestamp = now - timedelta(minutes=total_minutes - (i * interval_minutes))

            # Create highly variable glucose patterns
            hour = timestamp.hour
            day_of_week = timestamp.weekday()

            # Base glucose with high variability
            if 6 <= hour <= 8:
                # Morning - dawn phenomenon + high variability
                base_glucose = random.uniform(140, 220)
                variability = random.uniform(-40, 60)
            elif 12 <= hour <= 14:
                # Lunch - post-meal spikes
                base_glucose = random.uniform(120, 280)
                variability = random.uniform(-30, 80)
            elif 18 <= hour <= 20:
                # Dinner - high spikes
                base_glucose = random.uniform(130, 300)
                variability = random.uniform(-50, 70)
            elif hour >= 22 or hour <= 4:
                # Night - unpredictable lows and highs
                base_glucose = random.uniform(60, 200)
                variability = random.uniform(-30, 50)
            else:
                # Other times - still unstable
                base_glucose = random.uniform(80, 220)
                variability = random.uniform(-40, 60)

            # Add weekend effect (even more unstable)
            if day_of_week >= 5:  # Weekend
                base_glucose += random.uniform(-20, 40)
                variability += random.uniform(-20, 30)

            # Add random noise for high variability
            noise = random.uniform(-25, 25)
            glucose_value = base_glucose + variability + noise

            # Ensure realistic bounds but allow extreme values
            glucose_value = max(40, min(400, glucose_value))

            # Add some random severe lows and highs
            if random.random() < 0.05:  # 5% chance of severe events
                if random.random() < 0.5:
                    glucose_value = random.uniform(45, 65)  # Severe low
                else:
                    glucose_value = random.uniform(280, 350)  # Severe high

            # Determine trend based on glucose change
            if i > 0:
                prev_glucose = readings[-1]['value']
                glucose_change = glucose_value - prev_glucose

                if glucose_change > 15:
                    trend = "rising_rapidly"
                elif glucose_change > 5:
                    trend = "rising"
                elif glucose_change < -15:
                    trend = "falling_rapidly"
                elif glucose_change < -5:
                    trend = "falling"
                else:
                    trend = "flat"
            else:
                trend = "flat"

            reading = {
                "systemTime": timestamp.isoformat(),
                "displayTime": timestamp.isoformat(),
                "value": round(glucose_value, 1),
                "trend": trend,
                "realtimeValue": round(glucose_value, 1),
                "smoothedValue": round(glucose_value * 0.9 + random.uniform(-5, 5), 1)
            }
            readings.append(reading)

        # Log statistics of generated data
        values = [r['value'] for r in readings]
        avg_glucose = np.mean(values)
        std_glucose = np.std(values)
        cv = (std_glucose / avg_glucose) * 100

        in_range = sum(1 for v in values if 70 <= v <= 180)
        below_range = sum(1 for v in values if v < 70)
        above_range = sum(1 for v in values if v > 180)

        tir = (in_range / len(values)) * 100
        tbr = (below_range / len(values)) * 100
        tar = (above_range / len(values)) * 100

        logger.info("Sarah's UNSTABLE data generated:")
        logger.info(f"  Average: {avg_glucose:.1f} mg/dL")
        logger.info(f"  CV: {cv:.1f}% (VERY HIGH)")
        logger.info(f"  TIR: {tir:.1f}% (LOW)")
        logger.info(f"  TBR: {tbr:.1f}% (HIGH)")
        logger.info(f"  TAR: {tar:.1f}% (HIGH)")

        return readings
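    # Shape of each generated reading, mirroring the Dexcom EGV fields this
    # module consumes (values illustrative):
    #
    #     {"systemTime": "2024-01-01T08:00:00", "displayTime": "2024-01-01T08:00:00",
    #      "value": 182.4, "trend": "rising", "realtimeValue": 182.4,
    #      "smoothedValue": 165.9}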
    def _create_realistic_pattern(self, days: int = 14, user_type: str = "normal") -> list:
        """Create realistic glucose patterns for non-Sarah users"""
        readings = []
        now = datetime.now()

        # Generate data every 5 minutes
        total_minutes = days * 24 * 60
        interval_minutes = 5
        total_readings = total_minutes // interval_minutes

        for i in range(total_readings):
            timestamp = now - timedelta(minutes=total_minutes - (i * interval_minutes))
            hour = timestamp.hour

            # Base patterns for different user types
            if user_type == "dawn_phenomenon":
                if 6 <= hour <= 8:
                    # Dawn phenomenon
                    base_glucose = random.uniform(150, 190)
                elif 12 <= hour <= 14:
                    # Post lunch
                    base_glucose = random.uniform(140, 180)
                elif 18 <= hour <= 20:
                    # Post dinner
                    base_glucose = random.uniform(130, 170)
                else:
                    base_glucose = random.uniform(90, 140)
            else:
                # Normal pattern
                if 12 <= hour <= 14:
                    # Post lunch
                    base_glucose = random.uniform(120, 160)
                elif 18 <= hour <= 20:
                    # Post dinner
                    base_glucose = random.uniform(110, 150)
                else:
                    base_glucose = random.uniform(80, 120)

            # Add moderate variability
            glucose_value = base_glucose + random.uniform(-15, 15)
            glucose_value = max(70, min(250, glucose_value))

            reading = {
                "systemTime": timestamp.isoformat(),
                "displayTime": timestamp.isoformat(),
                "value": round(glucose_value, 1),
                "trend": "flat",
                "realtimeValue": round(glucose_value, 1),
                "smoothedValue": round(glucose_value, 1)
            }
            readings.append(reading)

        return readings

    def _get_recent_readings_for_agent(self, count: int = 5) -> list:
        """Get recent readings formatted for agent context"""
        if self.processed_glucose_data is None or self.processed_glucose_data.empty:
            return []

        try:
            recent_df = self.processed_glucose_data.tail(count)
            readings = []

            for _, row in recent_df.iterrows():
                display_time = row.get('displayTime') or row.get('systemTime')
                glucose_value = row.get('value')
                trend_value = row.get('trend', 'flat')

                if pd.notna(display_time):
                    if isinstance(display_time, str):
                        time_str = display_time
                    else:
                        time_str = pd.to_datetime(display_time).isoformat()
                else:
                    time_str = datetime.now().isoformat()

                if pd.notna(glucose_value):
                    glucose_clean = self._safe_convert_for_json(glucose_value)
                else:
                    glucose_clean = None

                trend_clean = str(trend_value) if pd.notna(trend_value) else 'flat'

                readings.append({
                    "time": time_str,
                    "glucose": glucose_clean,
                    "trend": trend_clean
                })

            return readings

        except Exception as e:
            logger.error(f"Error getting recent readings: {e}")
            return []
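    # Shape of the list handed to the agent (values illustrative):
    #
    #     [{"time": "2024-01-01T08:00:00", "glucose": 112.0, "trend": "flat"},
    #      {"time": "2024-01-01T08:05:00", "glucose": 118.0, "trend": "rising"}]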
    def _safe_convert_for_json(self, obj):
        """Safely convert objects for JSON serialization"""
        if obj is None:
            return None
        elif isinstance(obj, (np.integer, np.int64, np.int32)):
            return int(obj)
        elif isinstance(obj, (np.floating, np.float64, np.float32)):
            if np.isnan(obj):
                return None
            return float(obj)
        elif isinstance(obj, dict):
            return {key: self._safe_convert_for_json(value) for key, value in obj.items()}
        elif isinstance(obj, list):
            return [self._safe_convert_for_json(item) for item in obj]
        elif isinstance(obj, pd.Timestamp):
            return obj.isoformat()
        else:
            return obj

    def _build_success_response(self) -> Dict[str, Any]:
        """Build a consistent success response"""
        data_points = len(self.processed_glucose_data) if self.processed_glucose_data is not None else 0
        avg_glucose = self.calculated_stats.get('average_glucose', 0)
        time_in_range = self.calculated_stats.get('time_in_range_70_180', 0)

        return {
            "success": True,
            "message": f"✅ Successfully loaded data for {self.current_user.name}",
            "user": asdict(self.current_user),
            "data_points": data_points,
            "stats": self.calculated_stats,
            "data_source": self.data_source,
            "summary": f"📊 {data_points} readings | Avg: {avg_glucose:.1f} mg/dL | TIR: {time_in_range:.1f}% | Source: {self.data_source}"
        }

    def validate_data_consistency(self) -> Dict[str, Any]:
        """Validate that all components are using consistent data"""
        if not self.calculated_stats:
            return {"valid": False, "message": "No data loaded"}

        validation = {
            "valid": True,
            "data_source": self.data_source,
            "data_age_minutes": int((datetime.now() - self.data_loaded_at).total_seconds() / 60) if self.data_loaded_at else None,
            "total_readings": self.calculated_stats.get('total_readings', 0),
            "time_in_range": self.calculated_stats.get('time_in_range_70_180', 0),
            "average_glucose": self.calculated_stats.get('average_glucose', 0),
            "user": self.current_user.name if self.current_user else None
        }

        logger.info(f"Data consistency check: {validation}")
        return validation


# ADDITIONAL: Debug function to test the API connection as it was working before
def test_original_api_method():
    """Test the API exactly as it was working before the unified data manager"""
    from apifunctions import DexcomAPI, DEMO_USERS

    print("🔍 Testing API exactly as it was working before...")

    api = DexcomAPI()

    # Test with sarah_g7 as it was working before
    user_key = "sarah_g7"
    user = DEMO_USERS[user_key]

    print(f"Testing with {user.name} ({user.username}) - NOW WITH UNSTABLE GLUCOSE")

    try:
        # Call simulate_demo_login exactly as before
        access_token = api.simulate_demo_login(user_key)
        print(f"✅ Authentication: {bool(access_token)}")

        if access_token:
            # Call get_egv_data exactly as before
            end_date = datetime.now()
            start_date = end_date - timedelta(days=14)

            egv_data = api.get_egv_data(
                start_date.isoformat(),
                end_date.isoformat()
            )

            print(f"✅ EGV Data: {len(egv_data)} readings")

            if egv_data:
                print("✅ SUCCESS! API is working as before (with Sarah's unstable patterns)")
                sample = egv_data[0]
                print(f"Sample reading: {sample}")
                return True
            else:
                print("⚠️ API authenticated but returned no data")
                return False
        else:
            print("❌ Authentication failed")
            return False

    except Exception as e:
        print(f"❌ Error: {e}")
        return False
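# Minimal usage sketch of the manager's three consumer surfaces (UI, agent,
# chart). A hypothetical helper for illustration only; it is not wired into
# the app and not called by default. The "sarah_g7" key comes from DEMO_USERS.
def example_unified_flow():
    """Load one demo user and show how each consumer reads the same data."""
    manager = UnifiedDataManager()
    result = manager.load_user_data("sarah_g7")

    if result["success"]:
        print(result["summary"])                         # one-line stats summary
        ui_stats = manager.get_stats_for_ui()            # dict for display widgets
        agent_context = manager.get_context_for_agent()  # JSON-safe dict for the agent
        chart_df = manager.get_chart_data()              # DataFrame for plotting

        print(f"UI TIR: {ui_stats.get('time_in_range_70_180', 0):.1f}%")
        print(f"Agent data points: {agent_context.get('data_points', 0)}")
        print(f"Chart rows: {len(chart_df) if chart_df is not None else 0}")
    else:
        print(result["message"])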
if __name__ == "__main__":
    # Test the original API method
    test_original_api_method()