Spaces:
Running
Running
#!/usr/bin/env python3 | |
""" | |
Dexcom Client for glucose data analysis - RESTORED WORKING VERSION | |
This restores the original working API code before unified data manager | |
""" | |
import json
import logging
import os
import sys
from dataclasses import asdict
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Union
from urllib.parse import urlencode

import pandas as pd
""" | |
Dexcom API Integration Functions | |
This module contains all the functions for interacting with the Dexcom Sandbox API | |
and processing glucose data for AI insights. | |
""" | |
import requests | |
import json | |
from datetime import datetime, timedelta | |
from typing import Dict, List, Optional, Tuple | |
import pandas as pd | |
from dataclasses import dataclass | |
import logging | |
import base64 | |
import secrets | |
import hashlib | |
import requests | |
from typing import Dict, List, Optional | |
# Set up logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
@dataclass
class DemoUser:
    """Demo user configuration for sandbox testing.

    Declared as a dataclass so instances can be built with keyword
    arguments (as the module-level demo-user table does) and get a
    generated ``__init__``/``__repr__``/``__eq__``.
    """

    # Required identity / credential fields (no defaults).
    name: str
    device_type: str
    username: str
    password: str
    description: str
    # Optional profile fields with the defaults shown.
    age: int = 30
    diabetes_type: str = "Type 1"
    years_with_diabetes: int = 5
    typical_glucose_pattern: str = "normal"
# Demo users available for sandbox testing, keyed by a short identifier.
# Each entry pairs a sandbox login with a fictional persona; the values are
# kept exactly as in the original working version.
DEMO_USERS = {
    "sarah_g7": DemoUser(
        name="Sarah Thompson",
        age=32,
        device_type="G7 Mobile App",
        username="sandboxuser7@dexcom.com",
        password="Dexcom123!",
        description="Active professional with Type 1 diabetes, uses G7 CGM with smartphone integration",
        diabetes_type="Type 1",
        years_with_diabetes=8,
        typical_glucose_pattern="stable_with_meal_spikes",
    ),
    "marcus_one": DemoUser(
        name="Marcus Rodriguez",
        age=45,
        device_type="ONE+ Mobile App",
        username="sandboxuser8@dexcom.com",
        password="Dexcom123!",
        description="Father of two with Type 2 diabetes, manages with Dexcom ONE+ and lifestyle changes",
        diabetes_type="Type 2",
        years_with_diabetes=3,
        typical_glucose_pattern="moderate_variability",
    ),
    "jennifer_g6": DemoUser(
        name="Jennifer Chen",
        age=28,
        device_type="G6 Mobile App",
        username="sandboxuser6@dexcom.com",
        password="Dexcom123!",
        description="Graduate student with Type 1 diabetes, tech-savvy G6 user with active lifestyle",
        diabetes_type="Type 1",
        years_with_diabetes=12,
        typical_glucose_pattern="exercise_related_lows",
    ),
    "robert_receiver": DemoUser(
        name="Robert Williams",
        age=67,
        device_type="G6 Touchscreen Receiver",
        username="sandboxuser4@dexcom.com",
        password="Dexcom123!",
        description="Retired teacher with Type 2 diabetes, prefers dedicated receiver device",
        diabetes_type="Type 2",
        years_with_diabetes=15,
        typical_glucose_pattern="dawn_phenomenon",
    ),
}
# Dexcom API configuration.
# Each value can be overridden through an environment variable so that
# credentials do not have to live in source control; when the variable is
# unset the original sandbox values below are used unchanged.
# NOTE(review): the default client secret is committed in plain text --
# consider rotating it and supplying it only via DEXCOM_CLIENT_SECRET.
SANDBOX_BASE_URL = os.environ.get("DEXCOM_BASE_URL", "https://sandbox-api.dexcom.com")
CLIENT_ID = os.environ.get("DEXCOM_CLIENT_ID", "mLElKHKRwRDVUrAOPBzktFGY7qkTc7Zm")
CLIENT_SECRET = os.environ.get("DEXCOM_CLIENT_SECRET", "HmFpgyVweuwKrQpf")
REDIRECT_URI = os.environ.get("DEXCOM_REDIRECT_URI", "http://localhost:7860/callback")
class DexcomAPI:
    """Handles all Dexcom API interactions with proper OAuth flow.

    The instance keeps the current access token, refresh token and expiry
    time. Data-fetching methods validate the token first, retry once after
    a 401 when a refresh token is available, and fall back to empty/demo
    results when the HTTP call itself fails.
    """

    # Seconds to wait on any single HTTP request; without a timeout a stalled
    # connection would block the caller indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        self.base_url = SANDBOX_BASE_URL
        self.access_token = None
        self.refresh_token = None
        self.token_expires_at = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _store_token_data(self, token_data: Dict, *, replace_refresh_token: bool = True) -> None:
        """Copy tokens and expiry from a token-endpoint response onto the instance.

        With ``replace_refresh_token=False`` the stored refresh token is only
        overwritten when the response actually contains a new one.
        """
        self.access_token = token_data.get("access_token")
        if replace_refresh_token or "refresh_token" in token_data:
            self.refresh_token = token_data.get("refresh_token")
        expires_in = token_data.get("expires_in", 3600)
        self.token_expires_at = datetime.now() + timedelta(seconds=expires_in)

    def _install_demo_token(self, username: str) -> str:
        """Generate a local demo token, store it with a one-hour lifetime and return it."""
        sandbox_token = self._generate_sandbox_demo_token(username)
        self.access_token = sandbox_token
        self.token_expires_at = datetime.now() + timedelta(hours=1)
        return sandbox_token

    def _authorized_get(self, url: str, params: Optional[Dict] = None):
        """GET ``url`` with the bearer token, retrying once after a 401 if a refresh is possible.

        Returns the final response object. Errors raised by ``requests`` or
        by ``refresh_access_token`` propagate to the caller.
        """
        headers = {"Authorization": f"Bearer {self.access_token}"}
        response = requests.get(url, headers=headers, params=params, timeout=self.REQUEST_TIMEOUT)
        if response.status_code == 401 and self.refresh_token:
            self.refresh_access_token()
            headers["Authorization"] = f"Bearer {self.access_token}"
            response = requests.get(url, headers=headers, params=params, timeout=self.REQUEST_TIMEOUT)
        return response

    @staticmethod
    def _demo_data_range() -> Dict:
        """Return a fallback data range covering the last 30 days up to now."""
        now = datetime.now()
        start = (now - timedelta(days=30)).isoformat()
        end = now.isoformat()
        return {
            "egvStart": start,
            "egvEnd": end,
            "eventStart": start,
            "eventEnd": end,
        }

    # ------------------------------------------------------------------
    # OAuth flow
    # ------------------------------------------------------------------
    def get_authorization_url(self, state: Optional[str] = None) -> str:
        """Generate the OAuth authorization URL for user login.

        Args:
            state: Optional opaque value echoed back by the authorization
                server; omitted from the URL when falsy.

        Returns:
            The login URL with all query parameters percent-encoded.
        """
        params = {
            "client_id": CLIENT_ID,
            "redirect_uri": REDIRECT_URI,
            "response_type": "code",
            "scope": "offline_access",
        }
        if state:
            params["state"] = state
        # urlencode escapes reserved characters (e.g. "://" in the redirect
        # URI, or "&"/"=" inside state) that would otherwise corrupt the query.
        return f"{self.base_url}/v2/oauth2/login?{urlencode(params)}"

    def exchange_code_for_token(self, authorization_code: str) -> Dict:
        """Exchange authorization code for access and refresh tokens.

        Stores the tokens and expiry on the instance and returns the raw
        token response.

        Raises:
            Exception: If the HTTP request fails or returns an error status;
                the underlying ``requests`` error is attached as the cause.
        """
        url = f"{self.base_url}/v2/oauth2/token"
        data = {
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "code": authorization_code,
            "grant_type": "authorization_code",
            "redirect_uri": REDIRECT_URI,
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        try:
            response = requests.post(url, data=data, headers=headers, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            token_data = response.json()
        except requests.exceptions.RequestException as e:
            raise Exception(f"Failed to exchange authorization code: {str(e)}") from e
        self._store_token_data(token_data)
        return token_data

    def simulate_demo_login(self, demo_user_key: str) -> str:
        """Simulate OAuth flow for demo users using sandbox credentials.

        Tries the simulated authorization-code exchange first and falls back
        to the direct sandbox token request if that yields no code or fails.

        Raises:
            ValueError: If ``demo_user_key`` is not a known demo user.
        """
        if demo_user_key not in DEMO_USERS:
            raise ValueError(f"Invalid demo user: {demo_user_key}")
        user = DEMO_USERS[demo_user_key]
        try:
            auth_code = self._simulate_sandbox_login(user.username, user.password)
            if auth_code:
                self.exchange_code_for_token(auth_code)
                logger.info(f"Successfully obtained tokens for {user.name}")
                return self.access_token
            return self._direct_sandbox_token_request(user.username, user.password)
        except Exception as e:
            # Deliberate best-effort: any failure in the code exchange falls
            # through to the direct sandbox request.
            logger.warning(f"OAuth flow failed, trying direct sandbox authentication: {e}")
            return self._direct_sandbox_token_request(user.username, user.password)

    def _simulate_sandbox_login(self, username: str, password: str) -> Optional[str]:
        """Simulate the sandbox login process to get authorization code.

        Builds a string from the credentials and today's date and returns
        the first 32 characters of its base64 encoding, or ``None`` on error.
        """
        try:
            auth_string = f"{username}:{password}:{datetime.now().strftime('%Y%m%d')}"
            return base64.b64encode(auth_string.encode()).decode()[:32]
        except Exception as e:
            logger.error(f"Failed to simulate sandbox login: {e}")
            return None

    def _direct_sandbox_token_request(self, username: str, password: str) -> str:
        """Direct token request for sandbox environment.

        Posts a password-grant request with HTTP Basic client credentials.
        On a 200 response the returned tokens are stored; on any other
        status, or on any exception, a locally generated demo token is
        stored instead. Always returns the access token now in effect.
        """
        url = f"{self.base_url}/v2/oauth2/token"
        credentials = base64.b64encode(f"{CLIENT_ID}:{CLIENT_SECRET}".encode()).decode()
        headers = {
            "Authorization": f"Basic {credentials}",
            "Content-Type": "application/x-www-form-urlencoded",
        }
        data = {
            "grant_type": "password",
            "username": username,
            "password": password,
            "scope": "offline_access",
        }
        try:
            response = requests.post(url, data=data, headers=headers, timeout=self.REQUEST_TIMEOUT)
            if response.status_code == 200:
                self._store_token_data(response.json())
                logger.info(f"Successfully obtained sandbox token for {username}")
                return self.access_token
            sandbox_token = self._install_demo_token(username)
            logger.info(f"Using demo token for sandbox user {username}")
            return sandbox_token
        except Exception as e:
            logger.error(f"Direct token request failed: {e}")
            return self._install_demo_token(username)

    def _generate_sandbox_demo_token(self, username: str) -> str:
        """Generate a realistic-looking demo token for sandbox.

        The token is ``sandbox_token_`` followed by the first 16 hex digits
        of a SHA-256 over the username, today's date and the client id.
        """
        token_data = f"{username}:{datetime.now().strftime('%Y%m%d')}:{CLIENT_ID}"
        token_hash = hashlib.sha256(token_data.encode()).hexdigest()
        return f"sandbox_token_{token_hash[:16]}"

    def _is_token_expired(self) -> bool:
        """Check if the current token is expired (no recorded expiry counts as expired)."""
        if not self.token_expires_at:
            return True
        return datetime.now() >= self.token_expires_at

    def _ensure_valid_token(self):
        """Ensure we have a valid, non-expired token.

        Refreshes the token when it is missing or expired and a refresh
        token is available.

        Raises:
            Exception: If there is no usable token and either no refresh
                token exists or the refresh attempt fails.
        """
        if self.access_token and not self._is_token_expired():
            return
        if not self.refresh_token:
            raise Exception("No valid token available. Please authenticate first.")
        try:
            self.refresh_access_token()
        except Exception as exc:
            # Narrowed from a bare except so KeyboardInterrupt/SystemExit are
            # not swallowed; the original failure is kept as the cause.
            raise Exception("Token expired and refresh failed. Please re-authenticate.") from exc

    def refresh_access_token(self) -> Dict:
        """Refresh the access token using refresh token.

        Stores the new access token and expiry; the refresh token is
        replaced only if the response includes one. Returns the raw token
        response.

        Raises:
            Exception: If no refresh token is stored, or the HTTP request
                fails or returns an error status.
        """
        if not self.refresh_token:
            raise Exception("No refresh token available")
        url = f"{self.base_url}/v2/oauth2/token"
        data = {
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET,
            "refresh_token": self.refresh_token,
            "grant_type": "refresh_token",
        }
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        try:
            response = requests.post(url, data=data, headers=headers, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            token_data = response.json()
        except requests.exceptions.RequestException as e:
            raise Exception(f"Failed to refresh token: {str(e)}") from e
        self._store_token_data(token_data, replace_refresh_token=False)
        return token_data

    # ------------------------------------------------------------------
    # Data endpoints
    # ------------------------------------------------------------------
    def get_data_range(self) -> Dict:
        """Get the available data range for the authenticated user.

        Returns the API's JSON body on a 200 response. On any other status
        or on a ``requests`` error, returns a fallback range covering the
        last 30 days.
        """
        self._ensure_valid_token()
        url = f"{self.base_url}/v2/users/self/dataRange"
        try:
            response = self._authorized_get(url)
            if response.status_code == 200:
                return response.json()
            return self._demo_data_range()
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get data range from API: {e}, using demo range")
            return self._demo_data_range()

    def get_egv_data(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> List[Dict]:
        """Get Estimated Glucose Values (EGV) data.

        Args:
            start_date: Optional value sent as the ``startDate`` query parameter.
            end_date: Optional value sent as the ``endDate`` query parameter.

        Returns:
            The ``egvs`` list from the response, or an empty list on a
            non-200 status or a ``requests`` error.
        """
        self._ensure_valid_token()
        url = f"{self.base_url}/v2/users/self/egvs"
        params = {}
        if start_date:
            params["startDate"] = start_date
        if end_date:
            params["endDate"] = end_date
        try:
            response = self._authorized_get(url, params)
            if response.status_code == 200:
                return response.json().get("egvs", [])
            logger.warning(f"API returned status {response.status_code}, generating demo data")
            return []
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get EGV data from API: {e}, will use demo data")
            return []

    def get_events_data(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> List[Dict]:
        """Get events data (meals, insulin, etc.).

        Args:
            start_date: Optional value sent as the ``startDate`` query parameter.
            end_date: Optional value sent as the ``endDate`` query parameter.

        Returns:
            The ``events`` list from the response, or an empty list on a
            non-200 status or a ``requests`` error.
        """
        self._ensure_valid_token()
        url = f"{self.base_url}/v2/users/self/events"
        params = {}
        if start_date:
            params["startDate"] = start_date
        if end_date:
            params["endDate"] = end_date
        try:
            response = self._authorized_get(url, params)
            if response.status_code == 200:
                return response.json().get("events", [])
            logger.warning(f"Events API returned status {response.status_code}")
            return []
        except requests.exceptions.RequestException as e:
            logger.warning(f"Failed to get events data: {e}")
            return []
class GlucoseAnalyzer:
    """Analyzes glucose data and generates insights.

    All methods are static: they take their input explicitly and can be
    called on the class or on an instance.
    """

    @staticmethod
    def process_egv_data(egv_data: List[Dict]) -> pd.DataFrame:
        """Convert EGV data to pandas DataFrame for analysis.

        Args:
            egv_data: List of EGV records; each is expected to carry
                ``systemTime``, ``displayTime`` and ``value`` keys.

        Returns:
            A DataFrame sorted by ``systemTime`` with both time columns
            parsed to datetimes and ``value`` coerced to numeric
            (unparseable values become NaN). Empty input yields an empty
            DataFrame.
        """
        if not egv_data:
            return pd.DataFrame()
        df = pd.DataFrame(egv_data)
        df['systemTime'] = pd.to_datetime(df['systemTime'])
        df['displayTime'] = pd.to_datetime(df['displayTime'])
        df['value'] = pd.to_numeric(df['value'], errors='coerce')
        return df.sort_values('systemTime')

    @staticmethod
    def calculate_basic_stats(df: pd.DataFrame) -> Dict:
        """Calculate basic glucose statistics.

        Args:
            df: DataFrame with a numeric ``value`` column.

        Returns:
            Mean/min/max/std of the non-NaN values, the percentage of
            readings in 70-180, below 70 and above 180, and the reading
            count. Returns ``{}`` when the frame is empty or contains no
            non-NaN values.
        """
        if df.empty:
            return {}
        glucose_values = df['value'].dropna()
        total = len(glucose_values)
        if total == 0:
            # Every reading was NaN; the percentages below would divide by zero.
            return {}
        in_range = int(((glucose_values >= 70) & (glucose_values <= 180)).sum())
        below = int((glucose_values < 70).sum())
        above = int((glucose_values > 180).sum())
        return {
            "average_glucose": glucose_values.mean(),
            "min_glucose": glucose_values.min(),
            "max_glucose": glucose_values.max(),
            "std_glucose": glucose_values.std(),
            "time_in_range_70_180": in_range / total * 100,
            "time_below_70": below / total * 100,
            "time_above_180": above / total * 100,
            "total_readings": total,
        }

    @staticmethod
    def identify_patterns(df: pd.DataFrame) -> Dict:
        """Identify glucose patterns and trends.

        Args:
            df: DataFrame with a datetime ``systemTime`` column and a
                numeric ``value`` column. The frame is not modified.

        Returns:
            ``{"patterns": [...]}`` with human-readable observations, or
            ``{"patterns": "Insufficient data for pattern analysis"}`` when
            fewer than 10 rows are available.
        """
        if df.empty or len(df) < 10:
            return {"patterns": "Insufficient data for pattern analysis"}
        patterns = []

        # Group by hour-of-day without adding a helper column to the
        # caller's DataFrame.
        hourly_avg = df['value'].groupby(df['systemTime'].dt.hour).mean()
        peak_hour = hourly_avg.idxmax()
        low_hour = hourly_avg.idxmin()
        patterns.append(f"Glucose typically peaks around {peak_hour}:00")
        patterns.append(f"Glucose is typically lowest around {low_hour}:00")

        glucose_std = df['value'].std()
        if glucose_std > 50:
            patterns.append("High glucose variability detected - consider discussing with healthcare provider")
        elif glucose_std < 20:
            patterns.append("Good glucose stability observed")

        # Rough trend: change between the first and last of the most recent
        # (up to 20) readings, divided by the number of readings.
        recent_values = df['value'].tail(20)
        if len(recent_values) >= 10:
            trend_slope = (recent_values.iloc[-1] - recent_values.iloc[0]) / len(recent_values)
            if trend_slope > 2:
                patterns.append("Recent upward glucose trend observed")
            elif trend_slope < -2:
                patterns.append("Recent downward glucose trend observed")
            else:
                patterns.append("Glucose levels relatively stable recently")
        return {"patterns": patterns}
def format_glucose_data_for_display(df: pd.DataFrame) -> str:
    """Format glucose data for display in the interface.

    Args:
        df: DataFrame with a datetime ``displayTime`` column, a numeric
            ``value`` column and optionally a ``trend`` column.

    Returns:
        A Markdown table of the last 10 rows (time, glucose rounded to a
        whole number, trend arrow), or ``"No glucose data available"`` for
        an empty frame. Missing or unrecognised trend values are shown with
        the flat arrow.
    """
    if df.empty:
        return "No glucose data available"

    # Arrow glyphs are written as escapes so the file stays correct even if
    # it is re-saved or transported with the wrong text encoding.
    flat_arrow = "\u2192"  # rightwards arrow
    trend_arrows = {
        'flat': flat_arrow,
        'fortyFiveUp': "\u2197",    # north-east arrow
        'singleUp': "\u2191",       # upwards arrow
        'doubleUp': "\u2b06",       # upwards black arrow
        'fortyFiveDown': "\u2198",  # south-east arrow
        'singleDown': "\u2193",     # downwards arrow
        'doubleDown': "\u2b07",     # downwards black arrow
    }

    lines = [
        "## Recent Glucose Readings\n",
        "| Time | Glucose (mg/dL) | Trend |",
        "|------|-----------------|-------|",
    ]
    for _, row in df.tail(10).iterrows():
        time_str = row['displayTime'].strftime("%m/%d %H:%M")
        trend_arrow = trend_arrows.get(row.get('trend', 'N/A'), flat_arrow)
        lines.append(f"| {time_str} | {row['value']:.0f} | {trend_arrow} |")
    return "\n".join(lines) + "\n"