#!/usr/bin/env python3 """ MIXED ENDPOINTS FIX for Dexcom OAuth Login endpoint: developer.dexcom.com/sandbox-login (WORKING - you got the code!) Token endpoint: sandbox-api.dexcom.com/v2/oauth2/token (FROM OFFICIAL DOCS) """ import os import requests import urllib.parse import json import secrets import webbrowser from datetime import datetime, timedelta from typing import Dict, List, Optional import logging # Setup logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Credentials CLIENT_ID = os.getenv("DEXCOM_CLIENT_ID", "mLElKHKRwRDVUrAOPBzktFGY7qkTc7Zm") CLIENT_SECRET = os.getenv("DEXCOM_CLIENT_SECRET", "HmFpgyVweuwKrQpf") REDIRECT_URI = "http://localhost:7860/callback" # MIXED ENDPOINTS CONFIGURATION # Login: Use what worked for you # Token: Use official documentation SANDBOX_LOGIN_BASE = "https://developer.dexcom.com" # ✅ WORKED for login SANDBOX_TOKEN_BASE = "https://sandbox-api.dexcom.com" # ✅ OFFICIAL for token PRODUCTION_BASE = "https://api.dexcom.com" # ✅ OFFICIAL for production class DexcomRealAPI: """ MIXED ENDPOINTS Dexcom API SOLUTION: ✅ Login: developer.dexcom.com/sandbox-login (YOU CONFIRMED THIS WORKS) ✅ Token: sandbox-api.dexcom.com/v2/oauth2/token (OFFICIAL DOCS) ✅ API: sandbox-api.dexcom.com/v2/... (OFFICIAL DOCS) """ def __init__(self, client_id: str = None, client_secret: str = None, environment: str = "sandbox"): self.client_id = client_id or CLIENT_ID self.client_secret = client_secret or CLIENT_SECRET self.redirect_uri = REDIRECT_URI self.environment = environment # MIXED ENDPOINT CONFIGURATION if environment == "sandbox": self.login_base = SANDBOX_LOGIN_BASE # https://developer.dexcom.com self.token_base = SANDBOX_TOKEN_BASE # https://sandbox-api.dexcom.com self.api_base = SANDBOX_TOKEN_BASE # https://sandbox-api.dexcom.com else: self.login_base = PRODUCTION_BASE # https://api.dexcom.com self.token_base = PRODUCTION_BASE # https://api.dexcom.com self.api_base = PRODUCTION_BASE # https://api.dexcom.com self.access_token = None self.refresh_token = None self.token_expires_at = None logger.info(f"✅ MIXED ENDPOINTS Dexcom API initialized") logger.info(f" Environment: {environment}") logger.info(f" Login base: {self.login_base}") logger.info(f" Token base: {self.token_base}") logger.info(f" API base: {self.api_base}") def generate_auth_url(self, state: str = None) -> str: """Generate OAuth URL using the login endpoint that WORKED for you""" if not state: state = secrets.token_urlsafe(32) params = { 'client_id': self.client_id, 'redirect_uri': self.redirect_uri, 'response_type': 'code', 'scope': 'offline_access', 'state': state } query_string = urllib.parse.urlencode(params) # Use the login endpoint that worked for you if self.environment == "sandbox": auth_url = f"{self.login_base}/sandbox-login?{query_string}" else: auth_url = f"{self.login_base}/v2/oauth2/login?{query_string}" logger.info(f"✅ Auth URL (proven working): {auth_url}") return auth_url def start_oauth_flow(self) -> bool: """Start OAuth with proven working login endpoint""" print(f"\n🔐 MIXED ENDPOINTS DEXCOM OAUTH") print("=" * 60) print(f"Environment: {self.environment}") print(f"Login endpoint: {self.login_base} ← PROVEN TO WORK FOR YOU") print(f"Token endpoint: {self.token_base} ← FROM OFFICIAL DOCS") try: auth_url = self.generate_auth_url() print(f"\n📋 OAUTH INSTRUCTIONS:") print(f"1. 🌐 Opening: {auth_url}") print(f"2. 📝 Log in (this endpoint worked before!)") if self.environment == "sandbox": print(f" Sandbox: sandboxuser1@dexcom.com / Dexcom123!") print(f"3. ✅ Authorize application") print(f"4. 📋 Copy authorization code from callback") print(f"5. ⚡ Token exchange with CORRECTED endpoint") webbrowser.open(auth_url) print(f"\n✅ Browser opened to WORKING login endpoint") return True except Exception as e: logger.error(f"OAuth start error: {e}") return False def exchange_code_for_tokens(self, authorization_code: str) -> bool: """ Exchange code for tokens using CORRECTED token endpoint KEY FIX: Use sandbox-api.dexcom.com for token exchange NOT developer.dexcom.com """ # Clean authorization code auth_code = self._extract_auth_code(authorization_code.strip()) if not auth_code: logger.error("No valid authorization code provided") return False # CORRECTED TOKEN ENDPOINT (from official docs) token_url = f"{self.token_base}/v2/oauth2/token" logger.info(f"🔄 TOKEN EXCHANGE WITH CORRECTED ENDPOINT") logger.info(f" Login was: {self.login_base} (worked!)") logger.info(f" Token is: {token_url} ← CORRECTED!") logger.info(f" Auth code: {auth_code[:15]}...{auth_code[-8:]}") # Request data data = { 'client_id': self.client_id, 'client_secret': self.client_secret, 'code': auth_code, 'grant_type': 'authorization_code', 'redirect_uri': self.redirect_uri } # Headers headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json', 'User-Agent': 'GlycoAI/1.0' } try: print(f"\n📡 Making token request to CORRECTED endpoint...") print(f" URL: {token_url}") print(f" Data: client_id, client_secret, code, grant_type, redirect_uri") response = requests.post(token_url, data=data, headers=headers, timeout=30) print(f"📨 Response status: {response.status_code}") if response.status_code == 200: token_data = response.json() self.access_token = token_data.get('access_token') self.refresh_token = token_data.get('refresh_token') expires_in = token_data.get('expires_in', 7200) self.token_expires_at = datetime.now() + timedelta(seconds=expires_in) print(f"✅ TOKEN EXCHANGE SUCCESSFUL!") print(f" Access token: {self.access_token[:25]}...") print(f" Refresh token: {self.refresh_token[:25] if self.refresh_token else 'None'}...") print(f" Expires in: {expires_in} seconds ({expires_in/3600:.1f} hours)") print(f" Token type: {token_data.get('token_type', 'Bearer')}") logger.info("✅ TOKEN EXCHANGE SUCCESS WITH MIXED ENDPOINTS!") return True else: print(f"❌ Token exchange failed: {response.status_code}") try: error_data = response.json() print(f"📋 Error details:") for key, value in error_data.items(): print(f" {key}: {value}") # Specific guidance if response.status_code == 400: error_type = error_data.get('error', 'unknown') if error_type == 'invalid_grant': print(f"\n💡 SOLUTION: Authorization code expired or invalid") print(f" → Get a fresh code (expires in 1 minute!)") elif error_type == 'invalid_client': print(f"\n💡 SOLUTION: Check credentials") print(f" → Verify CLIENT_ID and CLIENT_SECRET") elif error_type == 'invalid_request': print(f"\n💡 SOLUTION: Check redirect URI") print(f" → Must match: {self.redirect_uri}") except: print(f"📋 Raw error response: {response.text}") return False except requests.exceptions.RequestException as e: print(f"❌ Network error: {e}") logger.error(f"Network error during token exchange: {e}") return False except Exception as e: print(f"❌ Unexpected error: {e}") logger.error(f"Unexpected error during token exchange: {e}") return False def _extract_auth_code(self, input_text: str) -> str: """Extract authorization code from various formats""" try: if input_text.startswith('http'): # Full callback URL parsed_url = urllib.parse.urlparse(input_text) query_params = urllib.parse.parse_qs(parsed_url.query) if 'code' in query_params: code = query_params['code'][0] logger.info(f"Extracted code from URL: {code[:15]}...") return code else: logger.error("No 'code' parameter in callback URL") return "" else: # Direct code input if input_text.startswith('code='): code = input_text[5:] else: code = input_text.strip() logger.info(f"Using direct code input: {code[:15]}...") return code except Exception as e: logger.error(f"Error extracting auth code: {e}") return "" def refresh_access_token(self) -> bool: """Refresh token using corrected endpoint""" if not self.refresh_token: logger.error("No refresh token available") return False token_url = f"{self.token_base}/v2/oauth2/token" data = { 'client_id': self.client_id, 'client_secret': self.client_secret, 'refresh_token': self.refresh_token, 'grant_type': 'refresh_token' } headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json' } try: response = requests.post(token_url, data=data, headers=headers) if response.status_code == 200: token_data = response.json() self.access_token = token_data.get('access_token') new_refresh_token = token_data.get('refresh_token') if new_refresh_token: self.refresh_token = new_refresh_token expires_in = token_data.get('expires_in', 7200) self.token_expires_at = datetime.now() + timedelta(seconds=expires_in) logger.info("✅ Token refreshed successfully") return True else: logger.error(f"Token refresh failed: {response.status_code} - {response.text}") return False except Exception as e: logger.error(f"Error refreshing token: {e}") return False def _ensure_valid_token(self): """Ensure valid access token""" if not self.access_token: raise Exception("No access token. Please authenticate first.") if self.token_expires_at and datetime.now() >= self.token_expires_at - timedelta(minutes=5): logger.info("Token expiring soon, refreshing...") if not self.refresh_access_token(): raise Exception("Token expired and refresh failed. Please re-authenticate.") def get_auth_headers(self) -> Dict[str, str]: """Get authorization headers""" self._ensure_valid_token() return { 'Authorization': f'Bearer {self.access_token}', 'Accept': 'application/json', 'User-Agent': 'GlycoAI/1.0' } def get_data_range(self) -> Dict: """Get data range using corrected API endpoint""" url = f"{self.api_base}/v2/users/self/dataRange" headers = self.get_auth_headers() try: response = requests.get(url, headers=headers) if response.status_code == 200: data = response.json() logger.info(f"✅ Data range retrieved: {data}") return data else: logger.error(f"Data range API error: {response.status_code} - {response.text}") raise Exception(f"Data range API error: {response.status_code}") except requests.exceptions.RequestException as e: logger.error(f"Network error getting data range: {e}") raise Exception(f"Network error: {e}") def get_egv_data(self, start_date: str = None, end_date: str = None) -> List[Dict]: """Get EGV data using corrected API endpoint""" url = f"{self.api_base}/v2/users/self/egvs" headers = self.get_auth_headers() params = {} if start_date: params['startDate'] = start_date if end_date: params['endDate'] = end_date try: response = requests.get(url, headers=headers, params=params) if response.status_code == 200: data = response.json() egvs = data.get('egvs', []) logger.info(f"✅ Retrieved {len(egvs)} EGV readings") return egvs else: logger.error(f"EGV API error: {response.status_code} - {response.text}") raise Exception(f"EGV API error: {response.status_code}") except requests.exceptions.RequestException as e: logger.error(f"Network error getting EGV data: {e}") raise Exception(f"Network error: {e}") # Quick test function def test_mixed_endpoints(): """Test the mixed endpoint configuration""" print("🧪 TESTING MIXED ENDPOINTS CONFIGURATION") print("=" * 60) api = DexcomRealAPI(environment="sandbox") print(f"✅ Login base: {api.login_base}") print(f" → This worked for you before!") print(f"✅ Token base: {api.token_base}") print(f" → From official documentation") print(f"✅ API base: {api.api_base}") print(f" → From official documentation") auth_url = api.generate_auth_url() print(f"\n✅ Auth URL: {auth_url}") print(f" → Should work (you got code before)") print(f"\n🎯 READY TO TEST TOKEN EXCHANGE!") print(f"1. Run: api.start_oauth_flow()") print(f"2. Get fresh authorization code") print(f"3. Run: api.exchange_code_for_tokens('fresh_code')") print(f"4. Should get 200 OK with corrected token endpoint!") return api def quick_token_test(auth_code: str): """Quick test of token exchange with mixed endpoints""" api = DexcomRealAPI(environment="sandbox") success = api.exchange_code_for_tokens(auth_code) if success: print(f"\n🎉 SUCCESS! Mixed endpoints work!") try: data_range = api.get_data_range() print(f"✅ API test successful: {data_range}") except Exception as e: print(f"⚠️ Token works but API call failed: {e}") else: print(f"\n❌ Still failed. Let's debug further...") return api if __name__ == "__main__": test_mixed_endpoints()