""" | |
Unified Data Manager for GlycoAI - MODIFIED VERSION | |
Sarah now has unstable glucose values for demonstration | |
""" | |

import logging
from typing import Dict, Any, Optional, Tuple
import pandas as pd
from datetime import datetime, timedelta
from dataclasses import asdict
import numpy as np
import random

from apifunctions import (
    DexcomAPI,
    GlucoseAnalyzer,
    DEMO_USERS,
    DemoUser
)

logger = logging.getLogger(__name__)


class UnifiedDataManager:
    """
    MODIFIED: Unified data manager with Sarah having unstable glucose patterns
    """

    def __init__(self):
        self.dexcom_api = DexcomAPI()
        self.analyzer = GlucoseAnalyzer()
        logger.info("UnifiedDataManager initialized - Sarah will have unstable glucose patterns")

        # Single source of truth for all data
        self.current_user: Optional[DemoUser] = None
        self.raw_glucose_data: Optional[list] = None
        self.processed_glucose_data: Optional[pd.DataFrame] = None
        self.calculated_stats: Optional[Dict] = None
        self.identified_patterns: Optional[Dict] = None

        # Metadata
        self.data_loaded_at: Optional[datetime] = None
        self.data_source: str = "none"  # "dexcom_api", "mock", or "none"

    def load_user_data(self, user_key: str, force_reload: bool = False) -> Dict[str, Any]:
        """
        MODIFIED: Load glucose data with Sarah having unstable patterns
        """
        # Check if we already have recent data for this user (5-minute cache).
        # total_seconds() is used so the check stays correct even if more than a day has passed.
        if (not force_reload and
                self.current_user and
                self.current_user == DEMO_USERS.get(user_key) and
                self.data_loaded_at and
                (datetime.now() - self.data_loaded_at).total_seconds() < 300):
            logger.info(f"Using cached data for {user_key}")
            return self._build_success_response()

        try:
            if user_key not in DEMO_USERS:
                return {
                    "success": False,
                    "message": f"❌ Invalid user key '{user_key}'. Available: {', '.join(DEMO_USERS.keys())}"
                }

            logger.info(f"Loading data for user: {user_key}")

            # Set current user
            self.current_user = DEMO_USERS[user_key]

            # Call the API EXACTLY as it was working before
            try:
                logger.info(f"Attempting Dexcom API authentication for {user_key}")

                # ORIGINAL WORKING METHOD: use simulate_demo_login exactly as before
                access_token = self.dexcom_api.simulate_demo_login(user_key)
                logger.info(f"Dexcom authentication result: {bool(access_token)}")

                if access_token:
                    # ORIGINAL WORKING METHOD: get data with a 14-day range
                    end_date = datetime.now()
                    start_date = end_date - timedelta(days=14)

                    # Call get_egv_data exactly as it was working before
                    self.raw_glucose_data = self.dexcom_api.get_egv_data(
                        start_date.isoformat(),
                        end_date.isoformat()
                    )

                    if self.raw_glucose_data and len(self.raw_glucose_data) > 0:
                        self.data_source = "dexcom_api"
                        logger.info(f"✅ Successfully loaded {len(self.raw_glucose_data)} readings from Dexcom API")
                    else:
                        logger.warning("Dexcom API returned empty data - falling back to mock data")
                        raise Exception("Empty data from Dexcom API")
                else:
                    logger.warning("Failed to get access token - falling back to mock data")
                    raise Exception("Authentication failed")

            except Exception as api_error:
                logger.warning(f"Dexcom API failed ({str(api_error)}) - using mock data fallback")
                self.raw_glucose_data = self._generate_realistic_mock_data(user_key)
                self.data_source = "mock"

            # Process the raw data (same processing for everyone)
            self.processed_glucose_data = self.analyzer.process_egv_data(self.raw_glucose_data)

            if self.processed_glucose_data is None or self.processed_glucose_data.empty:
                return {
                    "success": False,
                    "message": "❌ Failed to process glucose data"
                }

            # Calculate statistics (single source of truth)
            self.calculated_stats = self._calculate_unified_stats()

            # Identify patterns
            self.identified_patterns = self.analyzer.identify_patterns(self.processed_glucose_data)

            # Mark when data was loaded
            self.data_loaded_at = datetime.now()

            logger.info(f"Successfully loaded and processed data for {self.current_user.name}")
            logger.info(f"Data source: {self.data_source}, Readings: {len(self.processed_glucose_data)}")
            logger.info(f"TIR: {self.calculated_stats.get('time_in_range_70_180', 0):.1f}%")

            return self._build_success_response()

        except Exception as e:
            logger.error(f"Failed to load user data: {e}")
            return {
                "success": False,
                "message": f"❌ Failed to load user data: {str(e)}"
            }

    def get_stats_for_ui(self) -> Dict[str, Any]:
        """Get statistics formatted for the UI display"""
        if not self.calculated_stats:
            return {}

        return {
            **self.calculated_stats,
            "data_source": self.data_source,
            "loaded_at": self.data_loaded_at.isoformat() if self.data_loaded_at else None,
            "user_name": self.current_user.name if self.current_user else None
        }

    def get_context_for_agent(self) -> Dict[str, Any]:
        """Get context formatted for the AI agent"""
        if not self.current_user or not self.calculated_stats:
            return {"error": "No user data loaded"}

        # Build agent context with the SAME data as the UI
        context = {
            "user": {
                "name": self.current_user.name,
                "age": self.current_user.age,
                "diabetes_type": self.current_user.diabetes_type,
                "device_type": self.current_user.device_type,
                "years_with_diabetes": self.current_user.years_with_diabetes,
                "typical_pattern": getattr(self.current_user, 'typical_glucose_pattern', 'normal')
            },
            "statistics": self._safe_convert_for_json(self.calculated_stats),
            "patterns": self._safe_convert_for_json(self.identified_patterns),
            "data_points": len(self.processed_glucose_data) if self.processed_glucose_data is not None else 0,
            "recent_readings": self._get_recent_readings_for_agent(),
            "data_metadata": {
                "source": self.data_source,
                "loaded_at": self.data_loaded_at.isoformat() if self.data_loaded_at else None,
                "data_age_minutes": int((datetime.now() - self.data_loaded_at).total_seconds() / 60) if self.data_loaded_at else None
            }
        }

        return context

    def get_chart_data(self) -> Optional[pd.DataFrame]:
        """Get processed data for chart display"""
        return self.processed_glucose_data

    def _calculate_unified_stats(self) -> Dict[str, Any]:
        """Calculate statistics using a single, consistent method"""
        if self.processed_glucose_data is None or self.processed_glucose_data.empty:
            return {"error": "No data available"}

        try:
            # Get glucose values
            glucose_values = self.processed_glucose_data['value'].dropna()

            if len(glucose_values) == 0:
                return {"error": "No valid glucose values"}

            # Convert to a numpy array for consistent calculations (np is imported at module level)
            values = np.array(glucose_values.tolist(), dtype=float)

            # Calculate basic statistics
            avg_glucose = float(np.mean(values))
            min_glucose = float(np.min(values))
            max_glucose = float(np.max(values))
            std_glucose = float(np.std(values))
            total_readings = int(len(values))

            # Calculate time in ranges - CONSISTENT METHOD
            in_range_mask = (values >= 70) & (values <= 180)
            below_range_mask = values < 70
            above_range_mask = values > 180

            in_range_count = int(np.sum(in_range_mask))
            below_range_count = int(np.sum(below_range_mask))
            above_range_count = int(np.sum(above_range_mask))

            # Calculate percentages
            time_in_range = (in_range_count / total_readings) * 100 if total_readings > 0 else 0
            time_below_70 = (below_range_count / total_readings) * 100 if total_readings > 0 else 0
            time_above_180 = (above_range_count / total_readings) * 100 if total_readings > 0 else 0

            # Calculate additional metrics
            gmi = 3.31 + (0.02392 * avg_glucose)  # Glucose Management Indicator
            cv = (std_glucose / avg_glucose) * 100 if avg_glucose > 0 else 0  # Coefficient of Variation
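            # Background on the two derived metrics above (published reference values, not
            # project-specific code):
            #   GMI (%) ≈ 3.31 + 0.02392 × mean glucose in mg/dL (Bergenstal et al., 2018);
            #     e.g. an average of 154 mg/dL gives GMI ≈ 3.31 + 0.02392 * 154 ≈ 7.0%.
            #   CV (%) = (std / mean) × 100; CGM consensus guidance commonly treats
            #     CV ≥ 36% as high glycemic variability.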

            stats = {
                "average_glucose": avg_glucose,
                "min_glucose": min_glucose,
                "max_glucose": max_glucose,
                "std_glucose": std_glucose,
                "time_in_range_70_180": time_in_range,
                "time_below_70": time_below_70,
                "time_above_180": time_above_180,
                "total_readings": total_readings,
                "gmi": gmi,
                "cv": cv,
                "in_range_count": in_range_count,
                "below_range_count": below_range_count,
                "above_range_count": above_range_count
            }

            # Log for debugging
            logger.info(f"Calculated stats - TIR: {time_in_range:.1f}%, Total: {total_readings}, In range: {in_range_count}")

            return stats

        except Exception as e:
            logger.error(f"Error calculating unified stats: {e}")
            return {"error": f"Statistics calculation failed: {str(e)}"}

    def _generate_realistic_mock_data(self, user_key: str) -> list:
        """Generate realistic mock data with SARAH having UNSTABLE patterns"""
        # MODIFIED: Sarah now has unstable glucose patterns
        pattern_map = {
            "sarah_g7": "unstable_high_variability",  # CHANGED: Sarah now unstable
            "marcus_one": "dawn_phenomenon",
            "jennifer_g6": "normal",
            "robert_receiver": "dawn_phenomenon"
        }

        user_pattern = pattern_map.get(user_key, "normal")

        # Generate 14 days of data with user-specific patterns
        if user_key == "sarah_g7":
            # Generate UNSTABLE data for Sarah
            mock_data = self._generate_unstable_glucose_data()
            logger.info(f"Generated {len(mock_data)} UNSTABLE mock data points for Sarah")
        else:
            # Use normal patterns for other users
            mock_data = self._create_realistic_pattern(days=14, user_type=user_pattern)
            logger.info(f"Generated {len(mock_data)} mock data points for {user_key} with pattern {user_pattern}")

        return mock_data

    def _generate_unstable_glucose_data(self) -> list:
        """Generate highly variable, unstable glucose data for Sarah"""
        readings = []
        now = datetime.now()

        # Generate 14 days of unstable data (one reading every 5 minutes)
        total_minutes = 14 * 24 * 60
        interval_minutes = 5
        total_readings = total_minutes // interval_minutes
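        # Worked out: 14 days * 24 h * 60 min = 20,160 minutes, so at 5-minute
        # intervals this yields 20,160 // 5 = 4,032 readings.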
        logger.info(f"Generating {total_readings} unstable glucose readings for Sarah")

        for i in range(total_readings):
            timestamp = now - timedelta(minutes=total_minutes - (i * interval_minutes))

            # Create highly variable glucose patterns
            hour = timestamp.hour
            day_of_week = timestamp.weekday()

            # Base glucose with high variability
            if 6 <= hour <= 8:  # Morning - dawn phenomenon + high variability
                base_glucose = random.uniform(140, 220)
                variability = random.uniform(-40, 60)
            elif 12 <= hour <= 14:  # Lunch - post-meal spikes
                base_glucose = random.uniform(120, 280)
                variability = random.uniform(-30, 80)
            elif 18 <= hour <= 20:  # Dinner - high spikes
                base_glucose = random.uniform(130, 300)
                variability = random.uniform(-50, 70)
            elif hour >= 22 or hour <= 4:  # Night - unpredictable lows and highs
                base_glucose = random.uniform(60, 200)
                variability = random.uniform(-30, 50)
            else:  # Other times - still unstable
                base_glucose = random.uniform(80, 220)
                variability = random.uniform(-40, 60)

            # Add weekend effect (even more unstable)
            if day_of_week >= 5:  # Weekend
                base_glucose += random.uniform(-20, 40)
                variability += random.uniform(-20, 30)

            # Add random noise for high variability
            noise = random.uniform(-25, 25)
            glucose_value = base_glucose + variability + noise

            # Keep within realistic sensor bounds but allow extreme values
            glucose_value = max(40, min(400, glucose_value))

            # Add some random severe lows and highs
            if random.random() < 0.05:  # 5% chance of a severe event
                if random.random() < 0.5:
                    glucose_value = random.uniform(45, 65)  # Severe low
                else:
                    glucose_value = random.uniform(280, 350)  # Severe high

            # Determine trend based on the glucose change since the previous reading
            if i > 0:
                prev_glucose = readings[-1]['value']
                glucose_change = glucose_value - prev_glucose
                if glucose_change > 15:
                    trend = "rising_rapidly"
                elif glucose_change > 5:
                    trend = "rising"
                elif glucose_change < -15:
                    trend = "falling_rapidly"
                elif glucose_change < -5:
                    trend = "falling"
                else:
                    trend = "flat"
            else:
                trend = "flat"
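            # Note (assumption, not taken from Dexcom documentation): the ±15 and ±5 mg/dL
            # cut-offs above are per 5-minute reading, i.e. roughly ±3 and ±1 mg/dL per minute,
            # which is in the same ballpark as typical CGM trend-arrow rate categories.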

            reading = {
                "systemTime": timestamp.isoformat(),
                "displayTime": timestamp.isoformat(),
                "value": round(glucose_value, 1),
                "trend": trend,
                "realtimeValue": round(glucose_value, 1),
                "smoothedValue": round(glucose_value * 0.9 + random.uniform(-5, 5), 1)
            }
            readings.append(reading)

        # Log statistics of the generated data
        values = [r['value'] for r in readings]
        avg_glucose = np.mean(values)
        std_glucose = np.std(values)
        cv = (std_glucose / avg_glucose) * 100
        in_range = sum(1 for v in values if 70 <= v <= 180)
        below_range = sum(1 for v in values if v < 70)
        above_range = sum(1 for v in values if v > 180)
        tir = (in_range / len(values)) * 100
        tbr = (below_range / len(values)) * 100
        tar = (above_range / len(values)) * 100

        logger.info("Sarah's UNSTABLE data generated:")
        logger.info(f"  Average: {avg_glucose:.1f} mg/dL")
        logger.info(f"  CV: {cv:.1f}% (VERY HIGH)")
        logger.info(f"  TIR: {tir:.1f}% (LOW)")
        logger.info(f"  TBR: {tbr:.1f}% (HIGH)")
        logger.info(f"  TAR: {tar:.1f}% (HIGH)")

        return readings

    def _create_realistic_pattern(self, days: int = 14, user_type: str = "normal") -> list:
        """Create realistic glucose patterns for non-Sarah users"""
        readings = []
        now = datetime.now()

        # Generate data every 5 minutes
        total_minutes = days * 24 * 60
        interval_minutes = 5
        total_readings = total_minutes // interval_minutes

        for i in range(total_readings):
            timestamp = now - timedelta(minutes=total_minutes - (i * interval_minutes))
            hour = timestamp.hour

            # Base patterns for different user types
            if user_type == "dawn_phenomenon":
                if 6 <= hour <= 8:  # Dawn phenomenon
                    base_glucose = random.uniform(150, 190)
                elif 12 <= hour <= 14:  # Post lunch
                    base_glucose = random.uniform(140, 180)
                elif 18 <= hour <= 20:  # Post dinner
                    base_glucose = random.uniform(130, 170)
                else:
                    base_glucose = random.uniform(90, 140)
            else:  # Normal pattern
                if 12 <= hour <= 14:  # Post lunch
                    base_glucose = random.uniform(120, 160)
                elif 18 <= hour <= 20:  # Post dinner
                    base_glucose = random.uniform(110, 150)
                else:
                    base_glucose = random.uniform(80, 120)

            # Add moderate variability
            glucose_value = base_glucose + random.uniform(-15, 15)
            glucose_value = max(70, min(250, glucose_value))

            reading = {
                "systemTime": timestamp.isoformat(),
                "displayTime": timestamp.isoformat(),
                "value": round(glucose_value, 1),
                "trend": "flat",
                "realtimeValue": round(glucose_value, 1),
                "smoothedValue": round(glucose_value, 1)
            }
            readings.append(reading)

        return readings

    def _get_recent_readings_for_agent(self, count: int = 5) -> list:
        """Get recent readings formatted for agent context"""
        if self.processed_glucose_data is None or self.processed_glucose_data.empty:
            return []

        try:
            recent_df = self.processed_glucose_data.tail(count)
            readings = []

            for _, row in recent_df.iterrows():
                display_time = row.get('displayTime') or row.get('systemTime')
                glucose_value = row.get('value')
                trend_value = row.get('trend', 'flat')

                if pd.notna(display_time):
                    if isinstance(display_time, str):
                        time_str = display_time
                    else:
                        time_str = pd.to_datetime(display_time).isoformat()
                else:
                    time_str = datetime.now().isoformat()

                if pd.notna(glucose_value):
                    glucose_clean = self._safe_convert_for_json(glucose_value)
                else:
                    glucose_clean = None

                trend_clean = str(trend_value) if pd.notna(trend_value) else 'flat'

                readings.append({
                    "time": time_str,
                    "glucose": glucose_clean,
                    "trend": trend_clean
                })

            return readings

        except Exception as e:
            logger.error(f"Error getting recent readings: {e}")
            return []

    def _safe_convert_for_json(self, obj):
        """Safely convert objects for JSON serialization"""
        if obj is None:
            return None
        elif isinstance(obj, (np.integer, np.int64, np.int32)):
            return int(obj)
        elif isinstance(obj, (np.floating, np.float64, np.float32)):
            if np.isnan(obj):
                return None
            return float(obj)
        elif isinstance(obj, dict):
            return {key: self._safe_convert_for_json(value) for key, value in obj.items()}
        elif isinstance(obj, list):
            return [self._safe_convert_for_json(item) for item in obj]
        elif isinstance(obj, pd.Timestamp):
            return obj.isoformat()
        else:
            return obj
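
    # Illustrative example of the conversion above (not executed anywhere in this module):
    #   self._safe_convert_for_json({"tir": np.float64(52.3), "loaded": pd.Timestamp("2024-01-01")})
    #   -> {"tir": 52.3, "loaded": "2024-01-01T00:00:00"}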

    def _build_success_response(self) -> Dict[str, Any]:
        """Build a consistent success response"""
        data_points = len(self.processed_glucose_data) if self.processed_glucose_data is not None else 0
        avg_glucose = self.calculated_stats.get('average_glucose', 0)
        time_in_range = self.calculated_stats.get('time_in_range_70_180', 0)

        return {
            "success": True,
            "message": f"✅ Successfully loaded data for {self.current_user.name}",
            "user": asdict(self.current_user),
            "data_points": data_points,
            "stats": self.calculated_stats,
            "data_source": self.data_source,
            "summary": f"📊 {data_points} readings | Avg: {avg_glucose:.1f} mg/dL | TIR: {time_in_range:.1f}% | Source: {self.data_source}"
        }

    def validate_data_consistency(self) -> Dict[str, Any]:
        """Validate that all components are using consistent data"""
        if not self.calculated_stats:
            return {"valid": False, "message": "No data loaded"}

        validation = {
            "valid": True,
            "data_source": self.data_source,
            "data_age_minutes": int((datetime.now() - self.data_loaded_at).total_seconds() / 60) if self.data_loaded_at else None,
            "total_readings": self.calculated_stats.get('total_readings', 0),
            "time_in_range": self.calculated_stats.get('time_in_range_70_180', 0),
            "average_glucose": self.calculated_stats.get('average_glucose', 0),
            "user": self.current_user.name if self.current_user else None
        }

        logger.info(f"Data consistency check: {validation}")
        return validation
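

# Minimal usage sketch (assumes the demo user keys defined in apifunctions, e.g. "sarah_g7"):
#   manager = UnifiedDataManager()
#   result = manager.load_user_data("sarah_g7")
#   if result["success"]:
#       ui_stats = manager.get_stats_for_ui()        # stats for the dashboard
#       agent_ctx = manager.get_context_for_agent()  # same numbers, formatted for the agent
#       chart_df = manager.get_chart_data()          # processed DataFrame for plotting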


# ADDITIONAL: Debug function to test the API connection as it was working before
def test_original_api_method():
    """Test the API exactly as it was working before the unified data manager"""
    from apifunctions import DexcomAPI, DEMO_USERS

    print("🔍 Testing API exactly as it was working before...")

    api = DexcomAPI()

    # Test with sarah_g7 as it was working before
    user_key = "sarah_g7"
    user = DEMO_USERS[user_key]
    print(f"Testing with {user.name} ({user.username}) - NOW WITH UNSTABLE GLUCOSE")

    try:
        # Call simulate_demo_login exactly as before
        access_token = api.simulate_demo_login(user_key)
        print(f"✅ Authentication: {bool(access_token)}")

        if access_token:
            # Call get_egv_data exactly as before
            end_date = datetime.now()
            start_date = end_date - timedelta(days=14)

            egv_data = api.get_egv_data(
                start_date.isoformat(),
                end_date.isoformat()
            )
            print(f"✅ EGV Data: {len(egv_data)} readings")

            if egv_data:
                print("✅ SUCCESS! API is working as before (with Sarah's unstable patterns)")
                sample = egv_data[0] if egv_data else {}
                print(f"Sample reading: {sample}")
                return True
            else:
                print("⚠️ API authenticated but returned no data")
                return False
        else:
            print("❌ Authentication failed")
            return False

    except Exception as e:
        print(f"❌ Error: {e}")
        return False


if __name__ == "__main__":
    # Test the original API method
    test_original_api_method()