Spaces:
Running
Running
#!/usr/bin/env python3
"""
Mixed-endpoint fix for Dexcom OAuth.

Login endpoint: developer.dexcom.com/sandbox-login (confirmed working: it returned an authorization code)
Token endpoint: sandbox-api.dexcom.com/v2/oauth2/token (taken from the official docs)
"""
import os | |
import requests | |
import urllib.parse | |
import json | |
import secrets | |
import webbrowser | |
from datetime import datetime, timedelta | |
from typing import Dict, List, Optional | |
import logging | |
# Logging is configured once, at import time, so every message from this
# module carries a timestamp and level.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Credentials: read from the environment when available.
# SECURITY NOTE(review): the fallback literals below are credentials committed
# to source control. They are kept only so existing callers keep working;
# rotate them and supply DEXCOM_CLIENT_ID / DEXCOM_CLIENT_SECRET instead.
CLIENT_ID = os.getenv("DEXCOM_CLIENT_ID", "mLElKHKRwRDVUrAOPBzktFGY7qkTc7Zm")
CLIENT_SECRET = os.getenv("DEXCOM_CLIENT_SECRET", "HmFpgyVweuwKrQpf")

# The redirect URI is sent in both the login URL and the token request.
# It is overridable through DEXCOM_REDIRECT_URI so the same code can run
# somewhere other than localhost; the default is the original value.
REDIRECT_URI = os.getenv("DEXCOM_REDIRECT_URI", "http://localhost:7860/callback")

# Mixed endpoint configuration:
#   login -> the host that was observed to work for the sandbox login page
#   token -> the host given by the official documentation
SANDBOX_LOGIN_BASE = "https://developer.dexcom.com"    # sandbox login page
SANDBOX_TOKEN_BASE = "https://sandbox-api.dexcom.com"  # sandbox token + data API
PRODUCTION_BASE = "https://api.dexcom.com"             # production (login, token, data)
class DexcomRealAPI:
    """
    Dexcom OAuth2 and data client that deliberately mixes endpoint hosts.

    In the "sandbox" environment the browser login goes to
    developer.dexcom.com/sandbox-login, while the token exchange and every
    data call go to sandbox-api.dexcom.com/v2/... . Any other environment
    value uses api.dexcom.com for login, token and data.

    Typical use:
        api.start_oauth_flow()
        api.exchange_code_for_tokens(code_or_callback_url)
        api.get_data_range() / api.get_egv_data(...)

    NOTE(review): the leading glyphs in many printed/logged messages look
    like mis-decoded emoji; they are reproduced unchanged here.
    """

    # Upper bound, in seconds, for every HTTP request this class makes, so a
    # stalled connection cannot block the caller forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, client_id: Optional[str] = None, client_secret: Optional[str] = None,
                 environment: str = "sandbox"):
        """
        Configure credentials and endpoint hosts.

        Args:
            client_id: OAuth client id; falls back to the module CLIENT_ID.
            client_secret: OAuth client secret; falls back to CLIENT_SECRET.
            environment: "sandbox" selects the mixed sandbox hosts; every
                other value selects the production host.
        """
        self.client_id = client_id or CLIENT_ID
        self.client_secret = client_secret or CLIENT_SECRET
        self.redirect_uri = REDIRECT_URI
        self.environment = environment

        if environment == "sandbox":
            self.login_base = SANDBOX_LOGIN_BASE
            self.token_base = SANDBOX_TOKEN_BASE
            self.api_base = SANDBOX_TOKEN_BASE
        else:
            self.login_base = PRODUCTION_BASE
            self.token_base = PRODUCTION_BASE
            self.api_base = PRODUCTION_BASE

        self.access_token = None
        self.refresh_token = None
        self.token_expires_at = None
        # Last `state` value placed in an authorization URL; kept so a caller
        # can compare it with the `state` echoed back on the callback.
        self.oauth_state = None

        logger.info("β MIXED ENDPOINTS Dexcom API initialized")
        logger.info(f" Environment: {environment}")
        logger.info(f" Login base: {self.login_base}")
        logger.info(f" Token base: {self.token_base}")
        logger.info(f" API base: {self.api_base}")

    @staticmethod
    def _redact(secret: str, visible: int = 6) -> str:
        """Return a log-safe preview: at most `visible` leading characters
        (and never more than a quarter of the value) followed by '...'."""
        if not secret:
            return "<empty>"
        shown = min(visible, len(secret) // 4)
        return f"{secret[:shown]}..."

    def generate_auth_url(self, state: Optional[str] = None) -> str:
        """
        Build the browser login/authorization URL.

        Args:
            state: opaque anti-CSRF value; a random one is generated when
                omitted or empty.

        Returns:
            The full authorization URL. The state that was used is stored on
            ``self.oauth_state``.
        """
        if not state:
            state = secrets.token_urlsafe(32)
        # Remember the state instead of discarding it, otherwise the value
        # returned on the callback can never be checked.
        self.oauth_state = state

        params = {
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'response_type': 'code',
            'scope': 'offline_access',
            'state': state
        }
        query_string = urllib.parse.urlencode(params)

        if self.environment == "sandbox":
            auth_url = f"{self.login_base}/sandbox-login?{query_string}"
        else:
            auth_url = f"{self.login_base}/v2/oauth2/login?{query_string}"

        logger.info(f"β Auth URL (proven working): {auth_url}")
        return auth_url

    def start_oauth_flow(self) -> bool:
        """
        Print the OAuth instructions and try to open the login page.

        Returns:
            True once the instructions (including the URL) were shown, even
            if no browser could be launched; False if an exception occurred.
        """
        print("\nπ MIXED ENDPOINTS DEXCOM OAUTH")
        print("=" * 60)
        print(f"Environment: {self.environment}")
        print(f"Login endpoint: {self.login_base} β PROVEN TO WORK FOR YOU")
        print(f"Token endpoint: {self.token_base} β FROM OFFICIAL DOCS")
        try:
            auth_url = self.generate_auth_url()
            print("\nπ OAUTH INSTRUCTIONS:")
            print(f"1. π Opening: {auth_url}")
            print("2. π Log in (this endpoint worked before!)")
            if self.environment == "sandbox":
                print(" Sandbox: sandboxuser1@dexcom.com / Dexcom123!")
            print("3. β Authorize application")
            print("4. π Copy authorization code from callback")
            print("5. β‘ Token exchange with CORRECTED endpoint")

            # webbrowser.open() reports whether a browser was launched; do
            # not claim success when it was not (e.g. headless host).
            if webbrowser.open(auth_url):
                print("\nβ Browser opened to WORKING login endpoint")
            else:
                logger.warning("Could not launch a browser automatically")
                print("\nCould not open a browser automatically; open the URL from step 1 manually.")
            return True
        except Exception as e:
            logger.error(f"OAuth start error: {e}")
            return False

    def exchange_code_for_tokens(self, authorization_code: str) -> bool:
        """
        Exchange an authorization code for access/refresh tokens.

        The request goes to ``{token_base}/v2/oauth2/token``.

        Args:
            authorization_code: the bare code, a ``code=...`` fragment, or
                the full callback URL.

        Returns:
            True when tokens were obtained and stored on the instance; False
            on any failure (instance token state is then left untouched).
        """
        # Tolerate None/empty input instead of crashing on .strip().
        auth_code = self._extract_auth_code((authorization_code or "").strip())
        if not auth_code:
            logger.error("No valid authorization code provided")
            return False

        token_url = f"{self.token_base}/v2/oauth2/token"
        logger.info("π TOKEN EXCHANGE WITH CORRECTED ENDPOINT")
        logger.info(f" Login was: {self.login_base} (worked!)")
        logger.info(f" Token is: {token_url} β CORRECTED!")
        # Only a short prefix is logged; the code is a one-time credential.
        logger.info(f" Auth code: {self._redact(auth_code)}")

        data = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'code': auth_code,
            'grant_type': 'authorization_code',
            'redirect_uri': self.redirect_uri
        }
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json',
            'User-Agent': 'GlycoAI/1.0'
        }

        try:
            print("\nπ‘ Making token request to CORRECTED endpoint...")
            print(f" URL: {token_url}")
            print(" Data: client_id, client_secret, code, grant_type, redirect_uri")
            response = requests.post(token_url, data=data, headers=headers,
                                     timeout=self.REQUEST_TIMEOUT)
            print(f"π¨ Response status: {response.status_code}")

            if response.status_code != 200:
                self._report_token_error(response)
                return False

            token_data = response.json()
            access_token = token_data.get('access_token')
            if not access_token:
                # A 200 without a token must not be reported as success.
                logger.error("Token response did not include an access_token")
                print("Token exchange failed: response did not include an access_token")
                return False

            # Compute everything that can fail before touching instance
            # state, so a bad payload cannot leave it half-updated.
            expires_in = token_data.get('expires_in', 7200)
            expires_at = datetime.now() + timedelta(seconds=expires_in)

            self.access_token = access_token
            self.refresh_token = token_data.get('refresh_token')
            self.token_expires_at = expires_at

            print("β TOKEN EXCHANGE SUCCESSFUL!")
            print(f" Access token: {self.access_token[:25]}...")
            print(f" Refresh token: {self.refresh_token[:25] if self.refresh_token else 'None'}...")
            print(f" Expires in: {expires_in} seconds ({expires_in/3600:.1f} hours)")
            print(f" Token type: {token_data.get('token_type', 'Bearer')}")
            logger.info("β TOKEN EXCHANGE SUCCESS WITH MIXED ENDPOINTS!")
            return True
        except requests.exceptions.RequestException as e:
            print(f"β Network error: {e}")
            logger.error(f"Network error during token exchange: {e}")
            return False
        except Exception as e:
            print(f"β Unexpected error: {e}")
            logger.error(f"Unexpected error during token exchange: {e}")
            return False

    def _report_token_error(self, response) -> None:
        """Print the details of a non-200 token response plus a hint for the
        common OAuth error codes returned with HTTP 400."""
        print(f"β Token exchange failed: {response.status_code}")
        try:
            error_data = response.json()
        except ValueError:
            # Body is not JSON (requests' JSON errors derive from ValueError).
            error_data = None

        if not isinstance(error_data, dict):
            print(f"π Raw error response: {response.text}")
            return

        print("π Error details:")
        for key, value in error_data.items():
            print(f" {key}: {value}")

        if response.status_code == 400:
            error_type = error_data.get('error', 'unknown')
            if error_type == 'invalid_grant':
                print("\nπ‘ SOLUTION: Authorization code expired or invalid")
                print(" β Get a fresh code (expires in 1 minute!)")
            elif error_type == 'invalid_client':
                print("\nπ‘ SOLUTION: Check credentials")
                print(" β Verify CLIENT_ID and CLIENT_SECRET")
            elif error_type == 'invalid_request':
                print("\nπ‘ SOLUTION: Check redirect URI")
                print(f" β Must match: {self.redirect_uri}")

    def _extract_auth_code(self, input_text: str) -> str:
        """
        Pull the authorization code out of whatever the user pasted.

        Accepted forms: a full callback URL, a scheme-less callback such as
        ``localhost:7860/callback?code=...``, a query fragment such as
        ``code=...&state=...``, or the bare code itself.

        Returns:
            The code, or "" when none could be found.
        """
        text = (input_text or "").strip()
        if not text:
            return ""
        try:
            if text.startswith('http'):
                query = urllib.parse.urlparse(text).query
                is_query = True
            else:
                # Keep only what follows '?' (if any), then decide whether
                # it is a query string or a bare code.
                query = text.partition('?')[2] if '?' in text else text
                is_query = query.startswith('code=') or '&code=' in query

            if not is_query:
                logger.info(f"Using direct code input: {self._redact(text)}")
                return text

            # parse_qs splits on '&', so extra parameters such as `state`
            # no longer end up glued to the code.
            codes = urllib.parse.parse_qs(query).get('code')
            if not codes:
                logger.error("No 'code' parameter in callback URL")
                return ""
            logger.info(f"Extracted code from URL: {self._redact(codes[0])}")
            return codes[0]
        except Exception as e:
            logger.error(f"Error extracting auth code: {e}")
            return ""

    def refresh_access_token(self) -> bool:
        """
        Obtain a new access token using the stored refresh token.

        Returns:
            True when a new access token was stored; False when there is no
            refresh token, the server refused, or the request failed.
        """
        if not self.refresh_token:
            logger.error("No refresh token available")
            return False

        token_url = f"{self.token_base}/v2/oauth2/token"
        data = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'refresh_token': self.refresh_token,
            'grant_type': 'refresh_token'
        }
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json'
        }

        try:
            response = requests.post(token_url, data=data, headers=headers,
                                     timeout=self.REQUEST_TIMEOUT)
            if response.status_code != 200:
                logger.error(f"Token refresh failed: {response.status_code} - {response.text}")
                return False

            token_data = response.json()
            access_token = token_data.get('access_token')
            if not access_token:
                # Never overwrite a token with None and call it success.
                logger.error("Token refresh failed: response did not include an access_token")
                return False

            expires_in = token_data.get('expires_in', 7200)
            expires_at = datetime.now() + timedelta(seconds=expires_in)

            self.access_token = access_token
            # The server may or may not rotate the refresh token; keep the
            # old one when no new one is supplied.
            new_refresh_token = token_data.get('refresh_token')
            if new_refresh_token:
                self.refresh_token = new_refresh_token
            self.token_expires_at = expires_at
            logger.info("β Token refreshed successfully")
            return True
        except Exception as e:
            logger.error(f"Error refreshing token: {e}")
            return False

    def _ensure_valid_token(self):
        """
        Make sure an access token exists and is not about to expire.

        Refreshes the token when it expires within five minutes.

        Raises:
            Exception: when no token is present or the refresh fails.
        """
        if not self.access_token:
            raise Exception("No access token. Please authenticate first.")
        if self.token_expires_at and datetime.now() >= self.token_expires_at - timedelta(minutes=5):
            logger.info("Token expiring soon, refreshing...")
            if not self.refresh_access_token():
                raise Exception("Token expired and refresh failed. Please re-authenticate.")

    def get_auth_headers(self) -> Dict[str, str]:
        """Return the bearer-token headers for a data call, refreshing the
        token first if needed (see ``_ensure_valid_token`` for errors)."""
        self._ensure_valid_token()
        return {
            'Authorization': f'Bearer {self.access_token}',
            'Accept': 'application/json',
            'User-Agent': 'GlycoAI/1.0'
        }

    def get_data_range(self) -> Dict:
        """
        Fetch ``/v2/users/self/dataRange`` and return the decoded JSON.

        Raises:
            Exception: on a non-200 response, a network failure, or a
                missing/expired token.
        """
        url = f"{self.api_base}/v2/users/self/dataRange"
        headers = self.get_auth_headers()
        try:
            response = requests.get(url, headers=headers, timeout=self.REQUEST_TIMEOUT)
            if response.status_code == 200:
                data = response.json()
                logger.info(f"β Data range retrieved: {data}")
                return data
            logger.error(f"Data range API error: {response.status_code} - {response.text}")
            raise Exception(f"Data range API error: {response.status_code}")
        except requests.exceptions.RequestException as e:
            logger.error(f"Network error getting data range: {e}")
            raise Exception(f"Network error: {e}") from e

    def get_egv_data(self, start_date: Optional[str] = None, end_date: Optional[str] = None) -> List[Dict]:
        """
        Fetch ``/v2/users/self/egvs`` and return its ``egvs`` list.

        Args:
            start_date: sent as the ``startDate`` query parameter when given.
            end_date: sent as the ``endDate`` query parameter when given.
                (Expected date format is defined by the Dexcom API and is not
                validated here -- TODO confirm against the API docs.)

        Returns:
            The list under the ``egvs`` key, or [] when the key is absent.

        Raises:
            Exception: on a non-200 response, a network failure, or a
                missing/expired token.
        """
        url = f"{self.api_base}/v2/users/self/egvs"
        headers = self.get_auth_headers()
        params = {}
        if start_date:
            params['startDate'] = start_date
        if end_date:
            params['endDate'] = end_date
        try:
            response = requests.get(url, headers=headers, params=params,
                                    timeout=self.REQUEST_TIMEOUT)
            if response.status_code == 200:
                data = response.json()
                egvs = data.get('egvs', [])
                logger.info(f"β Retrieved {len(egvs)} EGV readings")
                return egvs
            logger.error(f"EGV API error: {response.status_code} - {response.text}")
            raise Exception(f"EGV API error: {response.status_code}")
        except requests.exceptions.RequestException as e:
            logger.error(f"Network error getting EGV data: {e}")
            raise Exception(f"Network error: {e}") from e
# Quick test function | |
def test_mixed_endpoints():
    """Print the sandbox endpoint configuration and the next manual steps.

    Builds a sandbox ``DexcomRealAPI``, reports its three base URLs and a
    freshly generated authorization URL, and returns the instance so the
    caller can continue the flow with it.
    """
    print("π§ͺ TESTING MIXED ENDPOINTS CONFIGURATION")
    print("=" * 60)

    api = DexcomRealAPI(environment="sandbox")

    # (label, base URL, remark) for each endpoint family, in display order.
    endpoint_report = (
        ("Login base", api.login_base, "This worked for you before!"),
        ("Token base", api.token_base, "From official documentation"),
        ("API base", api.api_base, "From official documentation"),
    )
    for label, base_url, remark in endpoint_report:
        print(f"β {label}: {base_url}")
        print(f" β {remark}")

    auth_url = api.generate_auth_url()
    print(f"\nβ Auth URL: {auth_url}")
    print(" β Should work (you got code before)")

    next_steps = (
        "\nπ― READY TO TEST TOKEN EXCHANGE!",
        "1. Run: api.start_oauth_flow()",
        "2. Get fresh authorization code",
        "3. Run: api.exchange_code_for_tokens('fresh_code')",
        "4. Should get 200 OK with corrected token endpoint!",
    )
    for step in next_steps:
        print(step)

    return api
def quick_token_test(auth_code: str):
    """Exchange ``auth_code`` on a fresh sandbox client and, on success,
    make one data-range call as a smoke test.

    Returns the ``DexcomRealAPI`` instance in every case so the caller can
    inspect or reuse it.
    """
    api = DexcomRealAPI(environment="sandbox")

    # Guard clause: nothing more to try when the exchange itself failed.
    if not api.exchange_code_for_tokens(auth_code):
        print("\nβ Still failed. Let's debug further...")
        return api

    print("\nπ SUCCESS! Mixed endpoints work!")
    try:
        data_range = api.get_data_range()
        print(f"β API test successful: {data_range}")
    except Exception as e:
        # The token was issued, so report the data call failure separately.
        print(f"β οΈ Token works but API call failed: {e}")
    return api
# Script entry point: when run directly, print the endpoint configuration
# check; importing the module does not trigger it.
if __name__ == "__main__":
    test_mixed_endpoints()