GlycoAI / dexcom_real_auth_system.py
Valentina9502's picture
fixes
d4f154f
#!/usr/bin/env python3
"""
MIXED ENDPOINTS FIX for Dexcom OAuth
Login endpoint: developer.dexcom.com/sandbox-login (WORKING - you got the code!)
Token endpoint: sandbox-api.dexcom.com/v2/oauth2/token (FROM OFFICIAL DOCS)
"""
import os
import requests
import urllib.parse
import json
import secrets
import webbrowser
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import logging
# Setup logging
# NOTE(review): basicConfig runs at import time, so merely importing this
# module configures the root logger for the importing application.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Credentials
# Read from the environment when set; otherwise the literal fallbacks below
# are used.
# NOTE(review): the fallbacks are a real-looking client id/secret committed to
# source control. Consider rotating them and requiring the environment
# variables instead — confirm with the owner before removing, since callers
# may rely on the defaults.
CLIENT_ID = os.getenv("DEXCOM_CLIENT_ID", "mLElKHKRwRDVUrAOPBzktFGY7qkTc7Zm")
CLIENT_SECRET = os.getenv("DEXCOM_CLIENT_SECRET", "HmFpgyVweuwKrQpf")
# OAuth callback; presumably must match the redirect URI registered for the
# client id — verify against the Dexcom developer console.
REDIRECT_URI = "http://localhost:7860/callback"
# MIXED ENDPOINTS CONFIGURATION
# Login: use the host that was observed to work
# Token: use the host given in the official documentation
# ✅ WORKED for login
SANDBOX_LOGIN_BASE = "https://developer.dexcom.com"
# ✅ OFFICIAL for token
SANDBOX_TOKEN_BASE = "https://sandbox-api.dexcom.com"
# ✅ OFFICIAL for production
PRODUCTION_BASE = "https://api.dexcom.com"
class DexcomRealAPI:
    """Dexcom OAuth2 client that uses separate hosts for login and for token/API calls.

    For ``environment == "sandbox"`` the login URL is built on
    ``SANDBOX_LOGIN_BASE`` while token exchange and data requests go to
    ``SANDBOX_TOKEN_BASE``.  Any other environment value uses
    ``PRODUCTION_BASE`` for all three.

    Token state (``access_token``, ``refresh_token``, ``token_expires_at``)
    is held on the instance only; nothing is persisted.
    """

    # Seconds to wait for any HTTP response. Without an explicit timeout,
    # `requests` may block indefinitely on a stalled connection.
    REQUEST_TIMEOUT = 30

    def __init__(self, client_id: Optional[str] = None, client_secret: Optional[str] = None,
                 environment: str = "sandbox"):
        """Configure credentials and endpoint bases.

        Args:
            client_id: OAuth client id; falls back to module-level ``CLIENT_ID``.
            client_secret: OAuth client secret; falls back to ``CLIENT_SECRET``.
            environment: ``"sandbox"`` selects the mixed sandbox hosts; any
                other value selects the production host.
        """
        self.client_id = client_id or CLIENT_ID
        self.client_secret = client_secret or CLIENT_SECRET
        self.redirect_uri = REDIRECT_URI
        self.environment = environment

        # MIXED ENDPOINT CONFIGURATION
        if environment == "sandbox":
            self.login_base = SANDBOX_LOGIN_BASE
            self.token_base = SANDBOX_TOKEN_BASE
            self.api_base = SANDBOX_TOKEN_BASE
        else:
            self.login_base = PRODUCTION_BASE
            self.token_base = PRODUCTION_BASE
            self.api_base = PRODUCTION_BASE

        self.access_token = None
        self.refresh_token = None
        self.token_expires_at = None
        # Most recent OAuth `state` sent to the login page (see generate_auth_url).
        self.last_state = None

        logger.info("✅ MIXED ENDPOINTS Dexcom API initialized")
        logger.info(f" Environment: {environment}")
        logger.info(f" Login base: {self.login_base}")
        logger.info(f" Token base: {self.token_base}")
        logger.info(f" API base: {self.api_base}")

    def generate_auth_url(self, state: Optional[str] = None) -> str:
        """Build the browser URL that starts the authorization-code flow.

        Args:
            state: Opaque anti-CSRF value; a random one is generated when
                omitted or empty.

        Returns:
            The full login URL including the encoded query string.
        """
        if not state:
            state = secrets.token_urlsafe(32)
        # Keep the value so the caller can compare it with the `state` echoed
        # back on the redirect; previously a generated state was discarded.
        self.last_state = state

        params = {
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'response_type': 'code',
            'scope': 'offline_access',
            'state': state,
        }
        query_string = urllib.parse.urlencode(params)

        # The sandbox login page lives on a different host and path than production.
        if self.environment == "sandbox":
            auth_url = f"{self.login_base}/sandbox-login?{query_string}"
        else:
            auth_url = f"{self.login_base}/v2/oauth2/login?{query_string}"

        logger.info(f"✅ Auth URL (proven working): {auth_url}")
        return auth_url

    def start_oauth_flow(self) -> bool:
        """Print instructions and open the login URL in the default browser.

        Returns:
            True when the URL was generated and handed to ``webbrowser``;
            False if anything raised along the way.
        """
        print("\n🔐 MIXED ENDPOINTS DEXCOM OAUTH")
        print("=" * 60)
        print(f"Environment: {self.environment}")
        print(f"Login endpoint: {self.login_base} ← PROVEN TO WORK FOR YOU")
        print(f"Token endpoint: {self.token_base} ← FROM OFFICIAL DOCS")
        try:
            auth_url = self.generate_auth_url()
            print("\n📋 OAUTH INSTRUCTIONS:")
            print(f"1. 🌐 Opening: {auth_url}")
            print("2. 📝 Log in (this endpoint worked before!)")
            if self.environment == "sandbox":
                print(" Sandbox: sandboxuser1@dexcom.com / Dexcom123!")
            print("3. ✅ Authorize application")
            print("4. 📋 Copy authorization code from callback")
            print("5. ⚡ Token exchange with CORRECTED endpoint")
            webbrowser.open(auth_url)
            print("\n✅ Browser opened to WORKING login endpoint")
            return True
        except Exception as e:
            logger.error(f"OAuth start error: {e}")
            return False

    def exchange_code_for_tokens(self, authorization_code: str) -> bool:
        """Exchange an authorization code for access/refresh tokens.

        The request goes to ``{token_base}/v2/oauth2/token`` (the API host),
        not to the login host.

        Args:
            authorization_code: The bare code, a ``code=...`` query fragment,
                or the full callback URL. ``None``/empty is rejected.

        Returns:
            True when tokens were obtained and stored on the instance;
            False on any failure (details are printed and logged).
        """
        auth_code = self._extract_auth_code(authorization_code)
        if not auth_code:
            logger.error("No valid authorization code provided")
            return False

        token_url = f"{self.token_base}/v2/oauth2/token"
        logger.info("🔄 TOKEN EXCHANGE WITH CORRECTED ENDPOINT")
        logger.info(f" Login was: {self.login_base} (worked!)")
        logger.info(f" Token is: {token_url} ← CORRECTED!")
        # NOTE(review): this logs most of a one-time credential; consider masking.
        logger.info(f" Auth code: {auth_code[:15]}...{auth_code[-8:]}")

        data = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'code': auth_code,
            'grant_type': 'authorization_code',
            'redirect_uri': self.redirect_uri,
        }
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json',
            'User-Agent': 'GlycoAI/1.0',
        }

        try:
            print("\n📡 Making token request to CORRECTED endpoint...")
            print(f" URL: {token_url}")
            print(" Data: client_id, client_secret, code, grant_type, redirect_uri")
            response = requests.post(token_url, data=data, headers=headers,
                                     timeout=self.REQUEST_TIMEOUT)
            print(f"📨 Response status: {response.status_code}")

            if response.status_code != 200:
                print(f"❌ Token exchange failed: {response.status_code}")
                self._print_token_error(response)
                return False

            token_data = response.json()
            # Validate before touching instance state so a malformed 200
            # response cannot leave the client half-updated.
            if not token_data.get('access_token'):
                raise ValueError("token response did not contain an access_token")

            expires_in = self._store_tokens(token_data)
            print("✅ TOKEN EXCHANGE SUCCESSFUL!")
            print(f" Access token: {self.access_token[:25]}...")
            print(f" Refresh token: {self.refresh_token[:25] if self.refresh_token else 'None'}...")
            print(f" Expires in: {expires_in} seconds ({expires_in/3600:.1f} hours)")
            print(f" Token type: {token_data.get('token_type', 'Bearer')}")
            logger.info("✅ TOKEN EXCHANGE SUCCESS WITH MIXED ENDPOINTS!")
            return True
        except requests.exceptions.RequestException as e:
            print(f"❌ Network error: {e}")
            logger.error(f"Network error during token exchange: {e}")
            return False
        except Exception as e:
            print(f"❌ Unexpected error: {e}")
            logger.error(f"Unexpected error during token exchange: {e}")
            return False

    def _store_tokens(self, token_data: Dict, keep_refresh_token: bool = False) -> int:
        """Copy a token-endpoint payload onto the instance and return ``expires_in``.

        With ``keep_refresh_token`` set, an existing refresh token is only
        replaced when the payload supplies a new one.
        """
        self.access_token = token_data.get('access_token')
        new_refresh_token = token_data.get('refresh_token')
        if new_refresh_token or not keep_refresh_token:
            self.refresh_token = new_refresh_token
        expires_in = token_data.get('expires_in', 7200)
        self.token_expires_at = datetime.now() + timedelta(seconds=expires_in)
        return expires_in

    def _print_token_error(self, response) -> None:
        """Print the error body of a failed token request plus a hint for known errors."""
        try:
            error_data = response.json()
            print("📋 Error details:")
            for key, value in error_data.items():
                print(f" {key}: {value}")
            # Specific guidance
            if response.status_code == 400:
                error_type = error_data.get('error', 'unknown')
                if error_type == 'invalid_grant':
                    print("\n💡 SOLUTION: Authorization code expired or invalid")
                    print(" → Get a fresh code (expires in 1 minute!)")
                elif error_type == 'invalid_client':
                    print("\n💡 SOLUTION: Check credentials")
                    print(" → Verify CLIENT_ID and CLIENT_SECRET")
                elif error_type == 'invalid_request':
                    print("\n💡 SOLUTION: Check redirect URI")
                    print(f" → Must match: {self.redirect_uri}")
        except (ValueError, AttributeError):
            # ValueError: body is not JSON. AttributeError: JSON is not an object.
            print(f"📋 Raw error response: {response.text}")

    def _extract_auth_code(self, input_text: str) -> str:
        """Pull the authorization code out of whatever the user pasted.

        Accepts a full callback URL, a query fragment such as
        ``code=abc&state=xyz`` or ``?code=abc``, or the bare code.

        Returns:
            The code, or ``""`` when none can be found.
        """
        try:
            text = (input_text or "").strip()
            if not text:
                return ""

            if text.startswith('http'):
                # Full callback URL
                query = urllib.parse.urlparse(text).query
                codes = urllib.parse.parse_qs(query).get('code')
                if not codes:
                    logger.error("No 'code' parameter in callback URL")
                    return ""
                code = codes[0]
                logger.info(f"Extracted code from URL: {code[:15]}...")
                return code

            # Query fragment: parse it properly so trailing parameters
            # (e.g. "&state=...") are not glued onto the code.
            fragment = text.lstrip('?')
            if fragment.startswith('code=') or '&code=' in fragment:
                code = urllib.parse.parse_qs(fragment).get('code', [''])[0]
            else:
                code = text
            logger.info(f"Using direct code input: {code[:15]}...")
            return code
        except Exception as e:
            logger.error(f"Error extracting auth code: {e}")
            return ""

    def refresh_access_token(self) -> bool:
        """Obtain a new access token using the stored refresh token.

        Returns:
            True when a new access token was stored; False when there is no
            refresh token, the request failed, or the response had no token.
        """
        if not self.refresh_token:
            logger.error("No refresh token available")
            return False

        token_url = f"{self.token_base}/v2/oauth2/token"
        data = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'refresh_token': self.refresh_token,
            'grant_type': 'refresh_token',
        }
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json',
        }

        try:
            response = requests.post(token_url, data=data, headers=headers,
                                     timeout=self.REQUEST_TIMEOUT)
            if response.status_code != 200:
                logger.error(f"Token refresh failed: {response.status_code} - {response.text}")
                return False

            token_data = response.json()
            if not token_data.get('access_token'):
                # Do not overwrite a token with None and then report success.
                logger.error("Error refreshing token: response did not contain an access_token")
                return False

            self._store_tokens(token_data, keep_refresh_token=True)
            logger.info("✅ Token refreshed successfully")
            return True
        except Exception as e:
            logger.error(f"Error refreshing token: {e}")
            return False

    def _ensure_valid_token(self):
        """Raise unless a usable access token is available, refreshing it if near expiry.

        Raises:
            Exception: No token is stored, or the token is within five
                minutes of expiry and the refresh attempt failed.
        """
        if not self.access_token:
            raise Exception("No access token. Please authenticate first.")
        # Refresh five minutes early so a request does not race the expiry.
        if self.token_expires_at and datetime.now() >= self.token_expires_at - timedelta(minutes=5):
            logger.info("Token expiring soon, refreshing...")
            if not self.refresh_access_token():
                raise Exception("Token expired and refresh failed. Please re-authenticate.")

    def get_auth_headers(self) -> Dict[str, str]:
        """Return request headers carrying a valid bearer token.

        Raises:
            Exception: Propagated from ``_ensure_valid_token``.
        """
        self._ensure_valid_token()
        return {
            'Authorization': f'Bearer {self.access_token}',
            'Accept': 'application/json',
            'User-Agent': 'GlycoAI/1.0',
        }

    def _api_get(self, path: str, error_label: str, description: str,
                 params: Optional[Dict] = None) -> Dict:
        """GET ``{api_base}{path}`` with auth headers and return the decoded JSON.

        ``error_label`` and ``description`` only feed the log/exception text.

        Raises:
            Exception: On a non-200 status or a network-level failure.
        """
        url = f"{self.api_base}{path}"
        headers = self.get_auth_headers()
        try:
            response = requests.get(url, headers=headers, params=params,
                                    timeout=self.REQUEST_TIMEOUT)
            if response.status_code != 200:
                logger.error(f"{error_label} API error: {response.status_code} - {response.text}")
                raise Exception(f"{error_label} API error: {response.status_code}")
            return response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Network error getting {description}: {e}")
            raise Exception(f"Network error: {e}") from e

    def get_data_range(self) -> Dict:
        """Fetch ``/v2/users/self/dataRange`` and return the decoded JSON body.

        Raises:
            Exception: Not authenticated, non-200 response, or network error.
        """
        data = self._api_get("/v2/users/self/dataRange", "Data range", "data range")
        logger.info(f"✅ Data range retrieved: {data}")
        return data

    def get_egv_data(self, start_date: str = None, end_date: str = None) -> List[Dict]:
        """Fetch ``/v2/users/self/egvs`` and return the ``egvs`` list.

        Args:
            start_date: Sent as ``startDate`` when given.
            end_date: Sent as ``endDate`` when given.

        Returns:
            The ``egvs`` list from the response, or ``[]`` when the key is absent.

        Raises:
            Exception: Not authenticated, non-200 response, or network error.
        """
        params = {}
        if start_date:
            params['startDate'] = start_date
        if end_date:
            params['endDate'] = end_date

        data = self._api_get("/v2/users/self/egvs", "EGV", "EGV data", params=params)
        egvs = data.get('egvs', [])
        logger.info(f"✅ Retrieved {len(egvs)} EGV readings")
        return egvs
# Quick test function
def test_mixed_endpoints():
    """Print the sandbox endpoint configuration and next-step instructions.

    Builds a sandbox ``DexcomRealAPI``, prints its three base URLs and a
    freshly generated auth URL. No network request is made.

    Returns:
        The ``DexcomRealAPI`` instance, so the caller can continue the flow.
    """
    print("🧪 TESTING MIXED ENDPOINTS CONFIGURATION")
    print("=" * 60)
    api = DexcomRealAPI(environment="sandbox")
    print(f"✅ Login base: {api.login_base}")
    print(" → This worked for you before!")
    print(f"✅ Token base: {api.token_base}")
    print(" → From official documentation")
    print(f"✅ API base: {api.api_base}")
    print(" → From official documentation")
    auth_url = api.generate_auth_url()
    print(f"\n✅ Auth URL: {auth_url}")
    print(" → Should work (you got code before)")
    print("\n🎯 READY TO TEST TOKEN EXCHANGE!")
    print("1. Run: api.start_oauth_flow()")
    print("2. Get fresh authorization code")
    print("3. Run: api.exchange_code_for_tokens('fresh_code')")
    print("4. Should get 200 OK with corrected token endpoint!")
    return api
def quick_token_test(auth_code: str):
    """Exchange ``auth_code`` on a fresh sandbox client and probe the API once.

    Args:
        auth_code: Authorization code (or callback URL) passed straight to
            ``exchange_code_for_tokens``.

    Returns:
        The ``DexcomRealAPI`` instance, whether or not the exchange succeeded.
    """
    api = DexcomRealAPI(environment="sandbox")
    success = api.exchange_code_for_tokens(auth_code)
    if success:
        print("\n🎉 SUCCESS! Mixed endpoints work!")
        # A successful exchange does not prove the token is accepted by the
        # data API, so make one cheap authenticated call.
        try:
            data_range = api.get_data_range()
            print(f"✅ API test successful: {data_range}")
        except Exception as e:
            print(f"⚠️ Token works but API call failed: {e}")
    else:
        print("\n❌ Still failed. Let's debug further...")
    return api
# Script entry point: when this file is run directly, print the endpoint
# configuration self-check.
if __name__ == "__main__":
    test_mixed_endpoints()