#!/usr/bin/env python3
# Updated for Hugging Face Space compatibility
"""
Enhanced Track 3: Gmail AI Agent with Advanced Behaviors
Sophisticated agent decision-making and workflow automation
"""
# ⚠️ IMPORTANT NOTICE ⚠️
# On first run, the app may take 120-200 seconds to respond to queries
# This is because the Qwen LLM is hosted on Modal and requires time for cold start
# Subsequent requests will be much faster after the initial cold start
# Please be patient during the initial interaction with the AI assistant
import gradio as gr
import json
import logging
import os
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
import pandas as pd
from datetime import datetime, timedelta
import re
import requests # Added for Modal API requests
import time # Added for retry logic
from collections import Counter
# Import Google API libraries for authentication
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import InstalledAppFlow
# Import existing modules
from dotenv import load_dotenv
load_dotenv()
from mcp_client import GmailMCPClientSync
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("enhanced-gmail-agent")
# Get environment variables from Hugging Face Secrets
GMAIL_CLIENT_ID = os.getenv("GMAIL_CLIENT_ID")
GMAIL_CLIENT_SECRET = os.getenv("GMAIL_CLIENT_SECRET")
GMAIL_TOKEN_JSON = os.getenv("GMAIL_TOKEN_JSON")
MODAL_API_URL = os.getenv("MODAL_API_URL")
# Log environment variable status (without revealing values)
logger.info(f"GMAIL_CLIENT_ID available: {GMAIL_CLIENT_ID is not None}")
logger.info(f"GMAIL_CLIENT_SECRET available: {GMAIL_CLIENT_SECRET is not None}")
logger.info(f"GMAIL_TOKEN_JSON available: {GMAIL_TOKEN_JSON is not None}")
logger.info(f"MODAL_API_URL available: {MODAL_API_URL is not None}")
# Create credentials.json from environment variables if not present
if GMAIL_CLIENT_ID and GMAIL_CLIENT_SECRET:
credentials_data = {
"installed": {
"client_id": GMAIL_CLIENT_ID,
"client_secret": GMAIL_CLIENT_SECRET,
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"redirect_uris": ["urn:ietf:wg:oauth:2.0:oob", "http://localhost"]
}
}
# Write credentials.json file for Gmail MCP server
with open("credentials.json", "w") as f:
json.dump(credentials_data, f)
logger.info("Created credentials.json from environment variables")
# Create token.json from environment variable if not present
if GMAIL_TOKEN_JSON:
with open("token.json", "w") as f:
f.write(GMAIL_TOKEN_JSON)
logger.info("Created token.json from environment variable")
class QwenClient:
"""Client for the Modal-hosted Qwen model - Updated to properly handle thinking model"""
def __init__(self, api_url: str = None):
self.api_url = api_url
self.first_request = True
logger.info(f"Initializing QwenClient with API URL: {self.api_url}")
def _strip_thinking_tags(self, text: str) -> str:
"""Strip <think> sections from the response to get clean output"""
import re
# Find and remove content between <think> and </think> or end of string
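# Illustrative example (hypothetical strings): "<think>weighing tone...</think>Here is the draft"
# becomes "Here is the draft"; an unterminated "<think>..." with no closing tag is removed entirely.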
return re.sub(r'<think>.*?(?:</think>|$)', '', text, flags=re.DOTALL).strip()
def generate_content(self, prompt: str, max_tokens: int = 2048,
temperature: float = 0.7, strip_thinking: bool = True,
retries: int = 3, retry_delay: float = 2.0,
timeout: float = 180.0) -> Any:
"""Generate content using the Modal-hosted Qwen model with proper thinking model support"""
# Check if prompt is too long - truncate if needed
if len(prompt) > 4000:
logger.warning(f"Prompt too long ({len(prompt)} chars), truncating to 4000 chars")
prompt = prompt[:4000] + "... [truncated]"
# Show extra message for first request
if self.first_request:
logger.info("β οΈ First request might take longer due to cold start...")
self.first_request = False
for attempt in range(retries):
try:
payload = {
"message": prompt,
"max_tokens": max_tokens,
"temperature": temperature,
"top_p": 0.9,
"strip_thinking": strip_thinking
}
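# The Modal endpoint is assumed to return JSON shaped roughly like
# {"response": "<generated text>", "tokens_used": 123, "input_tokens": 456};
# only the "response" field is strictly required by the parsing below.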
logger.info(f"Sending request to Qwen API (attempt {attempt+1}/{retries}) with prompt length: {len(prompt)}")
start_time = time.time()
response = requests.post(
self.api_url,
json=payload,
headers={"Content-Type": "application/json"},
timeout=timeout
)
duration = time.time() - start_time
logger.info(f"β±οΈ Response received in {duration:.1f} seconds")
# Check for successful response
response.raise_for_status()
result = response.json()
response_text = result.get("response", "")
logger.info(f"Received response from Qwen API: {len(response_text)} chars")
# Only return if we actually got content
if response_text and len(response_text.strip()) > 0:
# Create a response object that mimics Gemini's response format
class QwenResponse:
def __init__(self, text):
self.text = text
# The API is expected to already strip thinking when strip_thinking=True.
# If the caller asked to keep thinking (strip_thinking=False) and <think> tags are
# present, return the text untouched; otherwise clean locally as a safeguard.
if not strip_thinking and '<think>' in response_text:
# User wants to see thinking, so keep everything
final_text = response_text
else:
# Clean any remaining thinking tags
final_text = self._strip_thinking_tags(response_text)
# If after cleaning we have very little content, try to extract from original
if len(final_text.strip()) < 10 and '<think>' in response_text:
# Extract content after </think> tag
parts = response_text.split('</think>')
if len(parts) > 1:
final_text = parts[-1].strip()
else:
# No closing tag, take content after last >
final_text = response_text.split('>')[-1].strip()
# Log token usage if available
if 'tokens_used' in result:
logger.info(f"π Token usage: {result.get('input_tokens', 'N/A')} input, {result['tokens_used']} output")
return QwenResponse(final_text)
else:
logger.warning(f"Empty response received from Qwen API (attempt {attempt+1}/{retries})")
if attempt < retries - 1:
logger.info(f"Retrying in {retry_delay} seconds...")
time.sleep(retry_delay)
except requests.exceptions.Timeout:
logger.error(f"Timeout calling Qwen API (attempt {attempt+1}/{retries}) - request took too long")
if attempt < retries - 1:
next_delay = retry_delay * (attempt + 1)
logger.info(f"Retrying in {next_delay} seconds...")
time.sleep(next_delay)
else:
# Return a helpful fallback response on last attempt
class QwenResponse:
def __init__(self, text):
self.text = text
return QwenResponse("I apologize, but I'm having difficulty accessing enhanced AI capabilities right now due to timeout. Please try again later when the service may be more responsive.")
except requests.RequestException as e:
logger.error(f"Error calling Qwen API (attempt {attempt+1}/{retries}): {str(e)}")
if hasattr(e, 'response') and e.response is not None:
logger.error(f"Response: {e.response.text}")
if attempt < retries - 1:
next_delay = retry_delay * (attempt + 1)
logger.info(f"Retrying in {next_delay} seconds...")
time.sleep(next_delay)
logger.error("All retry attempts failed for Qwen API request")
# Create a fallback response when all attempts fail
class QwenResponse:
def __init__(self, text):
self.text = text
return QwenResponse("I apologize, but I can't access my advanced AI capabilities right now. Let me provide a simplified response based on your request. You might want to try again later when the network connection to the AI service is more stable.")
@dataclass
class AgentMemory:
"""Memory system for the agent to track context and user patterns"""
user_preferences: Dict[str, Any]
conversation_context: List[Dict[str, Any]]
email_patterns: Dict[str, Any]
workflow_history: List[Dict[str, Any]]
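# A hypothetical memory snapshot, for illustration of the fields above:
# AgentMemory(
#     user_preferences={"digest_time": "09:00"},
#     conversation_context=[{"user_message": "summarize today", "intent": "email_summary",
#                            "confidence": 0.9, "timestamp": "2024-01-01T09:00:00"}],
#     email_patterns={"2024-01-01T09:00:00": {"total_emails": 20, "unread_count": 5,
#                                              "top_senders": {"alice@example.com": 4}}},
#     workflow_history=[],
# )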
class EnhancedGmailAgent:
"""Enhanced Gmail AI Agent with sophisticated behaviors"""
def __init__(self, mcp_server_path: str = "gmail_mcp_server.py", modal_api_url: str = None):
self.mcp_client = GmailMCPClientSync(mcp_server_path)
self.memory = AgentMemory(
user_preferences={},
conversation_context=[],
email_patterns={},
workflow_history=[]
)
# Initialize Qwen model - Use environment variable if no URL provided
self.modal_api_url = modal_api_url or MODAL_API_URL
try:
logger.info("Initializing Qwen model client from Modal")
if not self.modal_api_url:
logger.warning("No Modal API URL provided, Qwen functionality will be limited")
self.model = None
self.model_status = "Not configured"
else:
self.model = QwenClient(api_url=self.modal_api_url)
logger.info("Qwen model client initialized successfully")
self.model_status = "Initialized"
except Exception as e:
logger.error(f"Error initializing Qwen model: {str(e)}")
self.model = None
self.model_status = "Error during initialization"
# Initialize agent state
self.last_analysis = {}
self.active_workflows = []
def intelligent_email_triage(self, max_emails: int = 20) -> Dict[str, Any]:
"""
Sophisticated email triage with AI-powered categorization and priority scoring - SINGLE API CALL VERSION
"""
logger.info("π§ Starting intelligent email triage...")
try:
# Fetch recent emails
emails = self.mcp_client.fetch_emails(query="newer_than:3d", max_results=max_emails)
if not emails or 'emails' not in emails:
return {"error": "No emails to analyze"}
email_list = emails['emails']
logger.info(f"Fetched {len(email_list)} emails for analysis")
# Categorize and prioritize emails using AI - SINGLE API CALL
triage_results = {
"high_priority": [],
"meetings_and_calls": [],
"action_required": [],
"newsletters_and_updates": [],
"personal": [],
"low_priority": [],
"analysis_summary": "",
"recommendations": []
}
if self.model and len(email_list) > 0:
logger.info(f"Processing ALL {len(email_list)} emails in SINGLE API call")
# Create a comprehensive prompt for ALL emails at once
email_summaries = []
for i, email in enumerate(email_list):
# Create compact representations of emails
subject = email.get('subject', 'No subject')[:80] # Limit subject length
sender = self._clean_sender(email.get('sender', 'Unknown'))[:50] # Limit sender length
snippet = email.get('snippet', 'No preview')[:60] # Limit snippet length
is_unread = email.get('is_unread', False)
email_summaries.append(f"Email {i+1}: Subject=\"{subject}\" | From={sender} | Preview=\"{snippet}\" | Unread={is_unread}")
# Create the master prompt for all emails - IMPROVED FOR BETTER ANALYSIS
master_prompt = f"""You are an expert email analyst and productivity coach. Analyze ALL {len(email_list)} emails below and provide detailed, actionable insights.
EMAILS TO ANALYZE:
{chr(10).join(email_summaries)}
TASK: For each email, determine:
1. Category (select exactly one):
- high_priority: Urgent emails needing immediate attention (deadlines, emergencies)
- meetings_and_calls: Calendar items, meeting invites, calls
- action_required: Emails requiring specific action but not urgent
- newsletters_and_updates: Marketing, newsletters, product updates
- personal: Personal communications
- low_priority: Everything else with minimal importance
2. Priority score: 0.0 to 1.0 (0.0 = lowest, 1.0 = highest priority)
- 0.9-1.0: Critical/urgent - requires immediate attention
- 0.7-0.8: Important - handle today
- 0.5-0.6: Moderate - handle within 48 hours
- 0.3-0.4: Low - handle when convenient
- 0.1-0.2: Very low - can be archived or ignored
3. Detailed reasoning: Explain WHY you categorized this way (deadline mentions, sender importance, action words, etc.)
RESPONSE FORMAT - Respond with ONLY this JSON array (no extra text):
[
{{
"email_num": 1,
"category": "category_name",
"priority_score": 0.8,
"reasoning": "Detailed reason with specific insights from email content"
}},
{{
"email_num": 2,
"category": "category_name",
"priority_score": 0.6,
"reasoning": "Detailed reason with specific insights from email content"
}},
...continue for all {len(email_list)} emails...
]
IMPORTANT ANALYSIS GUIDELINES:
- Look for urgency indicators: words like "urgent", "ASAP", "deadline", "today", "overdue"
- Consider sender importance: work contacts vs marketing emails
- Identify action verbs: "confirm", "review", "approve", "respond", "complete"
- Check for meeting details: times, dates, calendar invites
- Detect personal communication markers: friendly tone, personal questions
- Evaluate if the email requires a response or action
- Consider unread status as potentially more important
Respond with ONLY the JSON array - no introduction, explanation or additional text.
"""
try:
# Single API call for all emails
logger.info(f"Sending SINGLE request to Qwen API for ALL {len(email_list)} emails")
response = self.model.generate_content(
master_prompt,
max_tokens=2048, # Increased for longer response
temperature=0.2, # Lower temperature for more consistent JSON
strip_thinking=True, # We want clean JSON output
timeout=240.0 # Longer timeout for processing many emails
)
if response and hasattr(response, 'text') and response.text:
response_text = response.text.strip()
logger.info(f"Received comprehensive response: {len(response_text)} chars")
# Extract JSON from response
json_start = response_text.find('[')
json_end = response_text.rfind(']') + 1
if json_start >= 0 and json_end > json_start:
try:
json_text = response_text[json_start:json_end]
results = json.loads(json_text)
logger.info(f"Successfully parsed JSON with {len(results)} email analyses")
# Process results and assign to categories
valid_categories = ['high_priority', 'meetings_and_calls', 'action_required',
'newsletters_and_updates', 'personal', 'low_priority']
for result in results:
try:
email_num = int(result.get('email_num', 1)) - 1 # Convert to 0-based index
if 0 <= email_num < len(email_list):
email = email_list[email_num]
# Validate and clean category
category = result.get('category', '').lower()
if category not in valid_categories:
category = 'newsletters_and_updates' # Default fallback
# Validate priority score
try:
priority_score = float(result.get('priority_score', 0.5))
if not (0.0 <= priority_score <= 1.0):
priority_score = 0.5
except:
priority_score = 0.5
reasoning = result.get('reasoning', 'AI analysis completed')
# Create enhanced email data
email_analysis = {
**email,
"priority_score": priority_score,
"category": category,
"ai_reasoning": reasoning,
"suggested_actions": self._suggest_email_actions(email, category, priority_score)
}
triage_results[category].append(email_analysis)
logger.info(f"β
Email {email_num+1}: '{email.get('subject', '')[:40]}...' β {category} (score: {priority_score})")
except Exception as e:
logger.warning(f"Error processing email result {result}: {str(e)}")
# Use fallback for this email
if 0 <= email_num < len(email_list):
self._apply_fallback_categorization(email_list[email_num], triage_results)
# Check if we processed all emails, add fallback for any missing
processed_count = sum(len(emails) for category, emails in triage_results.items()
if category not in ['analysis_summary', 'recommendations'])
if processed_count < len(email_list):
logger.warning(f"Only processed {processed_count}/{len(email_list)} emails, using fallback for remaining")
# Add fallback for unprocessed emails
for i, email in enumerate(email_list):
# Check if this email was already processed
email_found = False
for category in ['high_priority', 'meetings_and_calls', 'action_required',
'newsletters_and_updates', 'personal', 'low_priority']:
if any(e.get('id') == email.get('id') for e in triage_results[category]):
email_found = True
break
if not email_found:
logger.info(f"Adding fallback categorization for email {i+1}")
self._apply_fallback_categorization(email, triage_results)
logger.info(f"π Successfully analyzed {len(email_list)} emails with AI in single API call!")
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse JSON from AI response: {e}")
logger.warning(f"Response was: {response_text[:500]}...")
# Use fallback for all emails
for email in email_list:
self._apply_fallback_categorization(email, triage_results)
else:
logger.warning("Could not find valid JSON array in AI response")
logger.warning(f"Response was: {response_text[:500]}...")
# Use fallback for all emails
for email in email_list:
self._apply_fallback_categorization(email, triage_results)
else:
logger.warning("Empty or invalid response from AI")
# Use fallback for all emails
for email in email_list:
self._apply_fallback_categorization(email, triage_results)
except Exception as e:
logger.error(f"Error in AI analysis: {str(e)}")
# Use fallback for all emails
for email in email_list:
self._apply_fallback_categorization(email, triage_results)
else:
# No AI model available, use fallback for all emails
logger.info("No AI model available, using fallback categorization for all emails")
for email in email_list:
self._apply_fallback_categorization(email, triage_results)
# Generate intelligent summary and recommendations
triage_results["analysis_summary"] = self._generate_triage_summary(triage_results)
triage_results["recommendations"] = self._generate_workflow_recommendations(triage_results)
# Update agent memory
self._update_email_patterns(email_list)
# Format results for beautiful display
logger.info("π¨ Formatting triage results for display")
formatted_output = self._display_triage_results(triage_results)
# Return both raw results and formatted display
triage_results["formatted_display"] = formatted_output
return triage_results
except Exception as e:
logger.error(f"Error in intelligent triage: {e}")
return {"error": str(e)}
def _apply_fallback_categorization(self, email: Dict, triage_results: Dict):
"""Apply fallback categorization and add to triage results"""
category, priority_score, reasoning = self._fallback_categorization(email)
email_analysis = {
**email,
"priority_score": priority_score,
"category": category,
"ai_reasoning": reasoning,
"suggested_actions": self._suggest_email_actions(email, category, priority_score)
}
triage_results[category].append(email_analysis)
def _analyze_email_batch_with_ai(self, emails: List[Dict]) -> List[Tuple[str, float, str]]:
"""Use AI to analyze a batch of emails at once"""
if not self.model or not emails:
return [self._fallback_categorization(email) for email in emails]
try:
# Create a more efficient batch prompt for analyzing multiple emails at once
# Simplify and shorten the prompt to prevent timeouts
email_prompts = []
for i, email in enumerate(emails):
# Limit snippet length to avoid timeouts
snippet = email.get('snippet', 'No preview')
snippet = snippet[:50] if snippet else 'No preview'
# Simpler prompt format
email_prompts.append(f"""Email #{i+1}: Subject: "{email.get('subject', 'No subject')}" | From: {email.get('sender', 'Unknown')} | Preview: "{snippet}" | Unread: {email.get('is_unread', False)}""")
batch_prompt = f"""
You are an email categorization expert. Analyze each email below and categorize them.
Respond ONLY with a JSON array containing one object per email.
Emails to analyze:
{chr(10).join(email_prompts)}
Categories: high_priority, meetings_and_calls, action_required, newsletters_and_updates, personal, low_priority
Response format (JSON array only):
[
{{"category": "category_name", "priority_score": 0.1_to_1.0, "reasoning": "brief reason"}}
]
"""
# Attempt to get response with increased timeout for batch processing
response = self.model.generate_content(
batch_prompt,
max_tokens=512, # Reduced token count to speed up generation
temperature=0.2, # Lower temperature for more deterministic results
timeout=120.0
)
# Validate response exists and has content
if not response or not hasattr(response, 'text') or not response.text:
logger.warning("Empty response from Qwen model for batch analysis")
return [self._fallback_categorization(email) for email in emails]
response_text = response.text.strip()
logger.debug(f"Qwen batch response: {response_text}")
# Try to extract JSON from response (in case there's extra text)
json_start = response_text.find('[')
json_end = response_text.rfind(']') + 1
if json_start == -1 or json_end == 0:
logger.warning("No valid JSON array found in batch response")
return [self._fallback_categorization(email) for email in emails]
json_text = response_text[json_start:json_end]
try:
# Parse JSON with validation
results = json.loads(json_text)
if not isinstance(results, list) or len(results) == 0:
logger.warning("JSON response is not a valid array")
return [self._fallback_categorization(email) for email in emails]
valid_categories = [
'high_priority', 'meetings_and_calls', 'action_required',
'newsletters_and_updates', 'personal', 'low_priority'
]
# Process and validate each result
processed_results = []
for i, result in enumerate(results):
if i >= len(emails): # Skip extra results
break
# Validate required fields
if not all(key in result for key in ['category', 'priority_score', 'reasoning']):
logger.warning(f"Missing required fields in JSON for email #{i+1}")
processed_results.append(self._fallback_categorization(emails[i]))
continue
category = result['category']
if category not in valid_categories:
logger.warning(f"Invalid category '{category}' returned for email #{i+1}")
processed_results.append(self._fallback_categorization(emails[i]))
continue
# Validate priority score is a number between 0 and 1
try:
priority_score = float(result['priority_score'])
if not 0.0 <= priority_score <= 1.0:
priority_score = 0.5 # Default if out of range
except (ValueError, TypeError):
priority_score = 0.5
reasoning = str(result.get('reasoning', 'AI analysis completed'))
logger.info(f"AI categorized email '{emails[i].get('subject', '')[:50]}...' as {category} (score: {priority_score})")
processed_results.append((category, priority_score, reasoning))
# Fill in any missing results with fallback categorization
while len(processed_results) < len(emails):
missing_idx = len(processed_results)
processed_results.append(self._fallback_categorization(emails[missing_idx]))
return processed_results
except json.JSONDecodeError as e:
logger.warning(f"JSON parsing failed for batch: {e}. Response was: {json_text}")
return [self._fallback_categorization(email) for email in emails]
except Exception as e:
logger.warning(f"Batch AI analysis failed with error: {e}")
return [self._fallback_categorization(email) for email in emails]
def _analyze_email_with_ai(self, email: Dict) -> Tuple[str, float, str]:
"""Use AI to analyze email and determine category, priority, and reasoning"""
if not self.model:
logger.info("No Qwen model available, using rule-based categorization")
return self._fallback_categorization(email)
try:
# Create a more detailed, explicit prompt for the model
prompt = f"""
You are an email categorization expert. Your task is to analyze the email details below and categorize it.
You MUST respond with ONLY a valid JSON object in the exact format requested at the end.
Email Details:
- Subject: {email.get('subject', 'No subject')}
- From: {email.get('sender', 'Unknown sender')}
- Content Preview: {email.get('snippet', 'No preview')}
- Is Unread: {email.get('is_unread', False)}
- Date: {email.get('date', 'Unknown date')}
Available Categories (choose exactly one):
1. high_priority: For urgent emails that need immediate attention
2. meetings_and_calls: For meeting invites, call schedules, or appointment-related emails
3. action_required: For emails that require a specific action or response, but aren't urgent
4. newsletters_and_updates: For subscription emails, product updates, marketing content
5. personal: For emails that are personal in nature but not urgent
6. low_priority: For emails that can be handled later or are low importance
Your response MUST be ONLY a JSON object in this exact format:
{{"category": "one_of_the_categories_above", "priority_score": 0.1_to_1.0, "reasoning": "Brief explanation of your categorization"}}
Examples of valid responses:
{{"category": "high_priority", "priority_score": 0.9, "reasoning": "Contains urgent deadline requiring immediate action"}}
{{"category": "newsletters_and_updates", "priority_score": 0.3, "reasoning": "Marketing newsletter with no action required"}}
"""
# Attempt to get response with retry logic built into the client
response = self.model.generate_content(
prompt,
max_tokens=512,
temperature=0.3,
retries=3
)
# Validate response exists and has content
if not response or not hasattr(response, 'text') or not response.text:
logger.warning("Empty response from Qwen model after retries")
return self._fallback_categorization(email)
response_text = response.text.strip()
logger.debug(f"Qwen response: {response_text}")
# Try to extract JSON from response (in case there's extra text)
json_start = response_text.find('{')
json_end = response_text.rfind('}') + 1
if json_start == -1 or json_end == 0:
logger.warning("No valid JSON found in response")
# Try to parse in a more lenient way if possible
try:
import re
# Look for patterns like "category": "value"
category_match = re.search(r'"category"\s*:\s*"([^"]+)"', response_text)
score_match = re.search(r'"priority_score"\s*:\s*([\d.]+)', response_text)
reasoning_match = re.search(r'"reasoning"\s*:\s*"([^"]+)"', response_text)
if category_match and score_match:
category = category_match.group(1)
priority_score = float(score_match.group(1))
reasoning = reasoning_match.group(1) if reasoning_match else "AI analysis"
valid_categories = [
'high_priority', 'meetings_and_calls', 'action_required',
'newsletters_and_updates', 'personal', 'low_priority'
]
if category in valid_categories and 0 <= priority_score <= 1:
logger.info(f"Recovered partial JSON data from malformed response")
return category, priority_score, reasoning
except Exception as e:
logger.warning(f"Could not recover data from malformed response: {e}")
return self._fallback_categorization(email)
json_text = response_text[json_start:json_end]
try:
# Parse JSON with validation
result = json.loads(json_text)
# Validate required fields
if not all(key in result for key in ['category', 'priority_score', 'reasoning']):
logger.warning("Missing required fields in JSON response")
return self._fallback_categorization(email)
# Validate category is valid
valid_categories = [
'high_priority', 'meetings_and_calls', 'action_required',
'newsletters_and_updates', 'personal', 'low_priority'
]
category = result['category']
if category not in valid_categories:
logger.warning(f"Invalid category '{category}' returned")
return self._fallback_categorization(email)
# Validate priority score is a number between 0 and 1
try:
priority_score = float(result['priority_score'])
if not 0.0 <= priority_score <= 1.0:
priority_score = 0.5 # Default if out of range
except (ValueError, TypeError):
priority_score = 0.5
reasoning = str(result.get('reasoning', 'AI analysis completed'))
logger.info(f"AI categorized email '{email.get('subject', '')[:50]}...' as {category} (score: {priority_score})")
return category, priority_score, reasoning
except json.JSONDecodeError as e:
logger.warning(f"JSON parsing failed: {e}. Response was: {json_text}")
return self._fallback_categorization(email)
except Exception as e:
logger.warning(f"AI analysis failed with error: {e}")
return self._fallback_categorization(email)
def _fallback_categorization(self, email: Dict) -> Tuple[str, float, str]:
"""Fallback categorization when AI is not available"""
subject = email.get('subject', '').lower()
sender = email.get('sender', '').lower()
is_unread = email.get('is_unread', False)
# Priority keywords
if any(word in subject for word in ['urgent', 'asap', 'critical', 'emergency']):
return "high_priority", 0.9, "Contains urgent keywords"
elif any(word in subject for word in ['meeting', 'call', 'zoom', 'appointment']):
return "meetings_and_calls", 0.7, "Meeting or call related"
elif any(word in subject for word in ['action', 'required', 'todo', 'task']):
return "action_required", 0.6, "Appears to require action"
elif any(word in subject for word in ['newsletter', 'digest', 'update', 'notification']):
return "newsletters_and_updates", 0.3, "Newsletter or update"
elif is_unread:
return "personal", 0.5, "Unread personal email"
else:
return "low_priority", 0.2, "Standard email"
def _suggest_email_actions(self, email: Dict, category: str, priority_score: float) -> List[str]:
"""Suggest specific actions for each email"""
actions = []
if category == "high_priority":
actions.extend(["π Call sender immediately", "β‘ Respond within 1 hour", "π Add to priority list"])
elif category == "meetings_and_calls":
actions.extend(["π
Add to calendar", "β
Send confirmation", "π Prepare agenda"])
elif category == "action_required":
actions.extend(["βοΈ Create task", "β° Set reminder", "π Draft response"])
elif category == "newsletters_and_updates":
actions.extend(["π Schedule reading time", "ποΈ Archive after reading"])
elif priority_score > 0.6:
actions.extend(["π Review carefully", "π Respond today"])
else:
actions.extend(["π Archive if not important", "ποΈ Quick scan"])
return actions
def _generate_triage_summary(self, triage_results: Dict) -> str:
"""Generate intelligent summary of triage results based on actual email content"""
total_emails = sum(len(emails) for key, emails in triage_results.items()
if key not in ['analysis_summary', 'recommendations'])
if total_emails == 0:
return "π **No emails to analyze**\n\nYour inbox is empty or no emails match the search criteria."
# Get specific counts
high_priority_count = len(triage_results.get('high_priority', []))
action_required_count = len(triage_results.get('action_required', []))
meetings_count = len(triage_results.get('meetings_and_calls', []))
newsletters_count = len(triage_results.get('newsletters_and_updates', []))
personal_count = len(triage_results.get('personal', []))
# Get specific email subjects for personalized insights
urgent_subjects = [email['subject'][:30] + "..." for email in triage_results.get('high_priority', [])[:2]]
action_subjects = [email['subject'][:30] + "..." for email in triage_results.get('action_required', [])[:2]]
# Extract senders for more personalized recommendations
all_senders = []
for category in triage_results:
if category in ['analysis_summary', 'recommendations']:
continue
for email in triage_results[category]:
all_senders.append(self._clean_sender(email.get('sender', 'Unknown')))
top_senders = Counter(all_senders).most_common(3)
# Calculate unread percentage
unread_count = sum(1 for cat in triage_results.keys()
if cat not in ['analysis_summary', 'recommendations']
for email in triage_results[cat]
if email.get('is_unread', False))
unread_percentage = (unread_count / total_emails * 100) if total_emails > 0 else 0
# Generate personalized summary
summary = f"""
📧 **Intelligent Email Triage Analysis**
📊 **Overview**: Analyzed {total_emails} emails from the last 3 days
"""
# Personalized urgent section
if high_priority_count > 0:
summary += f"π¨ **Immediate Attention**: {high_priority_count} high-priority emails require urgent response\n"
if urgent_subjects:
summary += " β’ " + "\n β’ ".join(f'"{subject}"' for subject in urgent_subjects)
if high_priority_count > len(urgent_subjects):
summary += f"\n β’ ...and {high_priority_count - len(urgent_subjects)} more"
summary += "\n\n"
else:
summary += "β
**No urgent emails requiring immediate attention**\n\n"
# Personalized action section
if action_required_count > 0:
summary += f"β
**Action Items**: {action_required_count} emails need specific actions\n"
if action_subjects:
summary += " β’ " + "\n β’ ".join(f'"{subject}"' for subject in action_subjects)
if action_required_count > len(action_subjects):
summary += f"\n β’ ...and {action_required_count - len(action_subjects)} more"
summary += "\n\n"
# Add meeting info
if meetings_count > 0:
summary += f"π
**Calendar Items**: {meetings_count} meeting-related emails\n\n"
# Add distribution info
summary += f"π¦ **Email Distribution**:\n"
if newsletters_count > 0:
summary += f" β’ Newsletters/Updates: {newsletters_count} ({newsletters_count/total_emails*100:.0f}%)\n"
if personal_count > 0:
summary += f" β’ Personal: {personal_count} ({personal_count/total_emails*100:.0f}%)\n"
if action_required_count > 0:
summary += f" β’ Action Required: {action_required_count} ({action_required_count/total_emails*100:.0f}%)\n"
if high_priority_count > 0:
summary += f" β’ High Priority: {high_priority_count} ({high_priority_count/total_emails*100:.0f}%)\n"
# Add unread status
summary += f"\n㪠**Inbox Status**: {unread_count} unread emails ({unread_percentage:.0f}% of analyzed emails)\n"
# Add top senders if available
if top_senders:
summary += f"\nπ₯ **Top Senders**:\n"
for sender, count in top_senders:
summary += f" β’ {sender}: {count} emails\n"
# Add AI insights based on actual data
summary += f"""
💡 **AI Insights**:
- {'High' if unread_percentage > 70 else 'Moderate' if unread_percentage > 30 else 'Low'} unread email ratio ({unread_percentage:.0f}%)
- {'High' if high_priority_count > 3 else 'Normal'} priority workload ({high_priority_count} urgent emails)
- {'Consider' if action_required_count > 5 else 'Manageable'} task delegation for action items ({action_required_count} tasks)
- {top_senders[0][0] if top_senders else 'No single sender'} is your most frequent correspondent ({top_senders[0][1] if top_senders else 0} emails)
"""
return summary
def _generate_workflow_recommendations(self, triage_results: Dict) -> List[str]:
"""Generate intelligent workflow recommendations"""
recommendations = []
high_priority_count = len(triage_results.get('high_priority', []))
action_count = len(triage_results.get('action_required', []))
if high_priority_count > 0:
recommendations.append(f"π¨ Handle {high_priority_count} urgent emails first")
if action_count > 5:
recommendations.append("π Consider batching similar action items")
recommendations.append("β° Set aside 2-3 hour block for email processing")
recommendations.extend([
"π
Schedule 15-min email review sessions",
"π Set up automated filters for newsletters",
"π± Enable smart notifications for high-priority senders"
])
return recommendations
def _update_email_patterns(self, emails: List[Dict]):
"""Update agent memory with email patterns"""
# Simple pattern tracking
current_time = datetime.now().isoformat()
self.memory.email_patterns[current_time] = {
"total_emails": len(emails),
"unread_count": sum(1 for e in emails if e.get('is_unread', False)),
"top_senders": self._get_sender_stats(emails)
}
def _get_sender_stats(self, emails: List[Dict]) -> Dict[str, int]:
"""Get sender statistics"""
senders = {}
for email in emails:
sender = email.get('sender', 'Unknown')
senders[sender] = senders.get(sender, 0) + 1
return dict(sorted(senders.items(), key=lambda x: x[1], reverse=True)[:5])
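# Example output (illustrative addresses): {"alice@example.com": 4, "news@example.org": 3}
# i.e. the five most frequent senders, highest count first.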
def proactive_assistant_chat(self, user_message: str, chat_history: List) -> Tuple[str, List]:
"""Enhanced chat with proactive suggestions and agent reasoning"""
try:
# Analyze user intent
intent, confidence = self._analyze_user_intent(user_message)
# Add to chat history first to show user message immediately
chat_history.append((user_message, None))
# Try to generate smart response with better timeout handling
try:
# Use a smaller query for recent emails with a shorter timeout
logger.info("Fetching recent emails for chat context")
recent_emails = self.mcp_client.fetch_emails(query="newer_than:3d", max_results=10)
if recent_emails and 'emails' in recent_emails:
# Use emails for context-aware response
response = self._generate_smart_response_batch(user_message, intent, confidence, recent_emails['emails'][:5])
else:
# Fallback if email fetching fails
response = self._handle_intent_fallback(user_message, intent, confidence)
# Update conversation context
self.memory.conversation_context.append({
"user_message": user_message,
"intent": intent,
"confidence": confidence,
"timestamp": datetime.now().isoformat()
})
except requests.exceptions.Timeout:
logger.error(f"Timeout when processing chat response")
response = f"π§ **AI Analysis**\nπ *Understanding: {intent.replace('_', ' ').title()}*\n\nI apologize, but I'm having trouble connecting to the advanced AI service at the moment. Here's a simplified response based on your query:\n\n{self._handle_intent_fallback(user_message, intent, confidence)}"
except Exception as e:
logger.error(f"Error generating chat response: {e}")
response = f"π§ **AI Analysis**\nπ *Understanding: {intent.replace('_', ' ').title()}*\n\nI apologize, but I encountered an error while analyzing your emails. Let me provide a simple response instead:\n\n{self._handle_intent_fallback(user_message, intent, confidence)}"
# Replace the temporary thinking message with the actual response
chat_history[-1] = (user_message, response)
return "", chat_history
except Exception as e:
error_response = f"π§ **Agent Analysis**\n\nβ Error: {str(e)}\n\nI apologize for the inconvenience. Please try a different question or check the email connection status."
# Handle the case where chat history might be empty or invalid
if not chat_history:
chat_history = []
# Add user message and error response
chat_history.append((user_message, error_response))
return "", chat_history
def _generate_smart_response_batch(self, message: str, intent: str, confidence: float, emails: List[Dict]) -> str:
"""Generate intelligent response based on intent analysis and email processing - SINGLE API CALL"""
# Add agent header without confidence score
response_header = f"π§ **AI Analysis**\n"
response_header += f"<span class='detected-intent'>Understanding: {intent.replace('_', ' ').title()}</span>\n\n"
try:
if self.model and emails:
logger.info(f"Generating smart response with email context ({len(emails)} emails)")
# Create simplified email context for chat
email_summaries = []
for i, email in enumerate(emails[:5]): # Limit to 5 emails for chat context
sender = self._clean_sender(email.get('sender', 'Unknown'))
subject = email.get('subject', 'No subject')[:60] # Limit subject length
is_unread = "unread" if email.get('is_unread', False) else "read"
email_summaries.append(f"Email {i+1}: \"{subject}\" from {sender} ({is_unread})")
# Create an improved chat prompt with better instructions
chat_prompt = f"""You are an intelligent email assistant with access to the user's actual emails. The user asked: "{message}"
I've detected their intent as: {intent}
Here are their recent emails for context:
{chr(10).join(email_summaries)}
IMPORTANT GUIDELINES:
1. Be concise but informative - keep responses under 250 words
2. Reference specific emails by subject or sender when relevant
3. Format your response with clear sections and bullet points when appropriate
4. Provide actionable advice based on the actual emails
5. Use a friendly, helpful tone
6. For search queries, mention specific matching emails
7. For summaries, group similar emails together
8. For workflow questions, suggest specific organization strategies
Make your response personalized to their actual emails. Be direct and helpful without unnecessary explanations.
"""
logger.info(f"Sending chat request to Qwen API with {len(emails)} email context")
response = self.model.generate_content(
chat_prompt,
max_tokens=512,
temperature=0.7,
strip_thinking=True, # Clean response for chat
timeout=120.0
)
if response and hasattr(response, 'text') and response.text:
ai_response = response.text.strip()
logger.info(f"Received chat response: {len(ai_response)} chars")
# Make sure we have actual content
if len(ai_response) > 10:
# Add agent analysis header
final_response = response_header + ai_response
return final_response
else:
logger.warning("AI response too short, using fallback")
return response_header + self._handle_intent_fallback(message, intent, confidence)
else:
# If we got no response, fall back to simpler intent handling
logger.warning("Empty response from Qwen for chat, using fallback")
return response_header + self._handle_intent_fallback(message, intent, confidence)
else:
# No model or emails available
logger.info("Using fallback response (no model or emails available)")
return response_header + self._handle_intent_fallback(message, intent, confidence)
except requests.exceptions.Timeout:
logger.warning(f"Timeout in Qwen response generation - using fallback")
return response_header + "I'll help with your request using my basic capabilities:\n\n" + self._handle_intent_fallback(message, intent, confidence)
except Exception as e:
logger.warning(f"Error generating smart response: {e}")
return response_header + self._handle_intent_fallback(message, intent, confidence)
def _analyze_user_intent(self, message: str) -> Tuple[str, float]:
"""Analyze user intent and confidence level"""
message_lower = message.lower()
# Intent patterns with confidence scores
intent_patterns = {
"email_search": (["find", "search", "look for", "show me"], 0.8),
"email_summary": (["summarize", "summary", "overview", "brief"], 0.9),
"workflow_automation": (["automate", "organize", "cleanup", "triage"], 0.7),
"meeting_prep": (["meeting", "call", "appointment", "schedule"], 0.8),
"priority_focus": (["urgent", "important", "priority", "critical"], 0.9),
"general_help": (["help", "how to", "what can", "assistance"], 0.6)
}
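# Worked example of the scoring below (hypothetical message): "give me a brief summary"
# matches two email_summary keywords, so confidence = min(0.9 + (2 - 1) * 0.1, 1.0) = 1.0.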
best_intent = "general_help"
best_confidence = 0.3
for intent, (keywords, base_confidence) in intent_patterns.items():
matches = sum(1 for keyword in keywords if keyword in message_lower)
if matches > 0:
confidence = min(base_confidence + (matches - 1) * 0.1, 1.0)
if confidence > best_confidence:
best_intent = intent
best_confidence = confidence
return best_intent, best_confidence
def _generate_smart_response(self, message: str, intent: str, confidence: float) -> str:
"""Generate intelligent response based on intent analysis using real email data"""
# Add header without confidence
response_header = f"π§ **AI Analysis**\n"
response_header += f"<span class='detected-intent'>Understanding: {intent.replace('_', ' ').title()}</span>\n\n"
try:
# Get real email context for better responses
recent_emails = self.mcp_client.fetch_emails(query="newer_than:7d", max_results=15)
if self.model and recent_emails and 'emails' in recent_emails:
# Use Qwen with real email context
return self._generate_qwen_response(message, intent, confidence, recent_emails['emails'])
else:
# Fallback to intent-based handling with real data
if intent == "email_search" and confidence > 0.7:
return response_header + self._handle_search_intent(message)
elif intent == "email_summary" and confidence > 0.8:
return response_header + self._handle_summary_intent(message)
elif intent == "workflow_automation":
return response_header + self._handle_automation_intent(message)
elif intent == "meeting_prep":
return response_header + self._handle_meeting_intent(message)
elif intent == "priority_focus":
return response_header + self._handle_priority_intent(message)
else:
return response_header + self._handle_general_intent(message)
except Exception as e:
return f"Error generating smart response: {str(e)}"
def _handle_search_intent(self, message: str) -> str:
"""Handle search-related queries"""
# Extract search terms from message
search_terms = self._extract_search_terms(message)
if search_terms:
# Execute search
results = self.mcp_client.search_emails(
subject_contains=" OR ".join(search_terms),
max_results=10
)
if results and 'emails' in results:
count = len(results['emails'])
return f"π **Search Results**: Found {count} emails matching your criteria.\n\n" + \
f"**Search Terms Used**: {', '.join(search_terms)}\n\n" + \
"π§ **Top Matches**:\n" + \
"\n".join([f"β’ {email['subject']} (from {self._clean_sender(email['sender'])})"
for email in results['emails'][:3]])
return "π I can help you search your emails! Try being more specific, like:\n" + \
"β’ 'Find emails from John about the project'\n" + \
"β’ 'Search for meeting emails from last week'\n" + \
"β’ 'Show me emails with attachments'"
def _handle_summary_intent(self, message: str) -> str:
"""Handle summary-related queries"""
# Determine time period
if "week" in message.lower():
days = 7
elif "month" in message.lower():
days = 30
elif "today" in message.lower():
days = 1
else:
days = 3 # Default
summary_result = self.mcp_client.summarize_emails(days=days, include_body=True)
if summary_result:
return f"π **Email Summary - Last {days} Days**\n\n" + \
f"π§ Total: {summary_result.get('total_emails', 0)} emails\n" + \
f"π΄ Unread: {summary_result.get('unread_count', 0)} emails\n\n" + \
"π‘ **Agent Recommendation**: Focus on unread emails first for maximum efficiency."
return f"π No emails found for the last {days} days."
def _handle_automation_intent(self, message: str) -> str:
"""Handle workflow automation queries"""
return """π€ **Email Organization Options**:
π **Email Analysis** - Get insights about your emails
π― **Priority Focus** - Focus on high-priority emails
π§Ή **Inbox Management** - Get organization suggestions
π
**Meeting Preparation** - Find meeting-related emails
π‘ **Try asking**: "Help me organize my inbox" or "Analyze my emails and provide recommendations"
"""
def _handle_meeting_intent(self, message: str) -> str:
"""Handle meeting-related queries"""
meeting_workflow = self._meeting_preparation_workflow()
return f"π
**Meeting Assistant Activated**\n\n{meeting_workflow}"
def _handle_priority_intent(self, message: str) -> str:
"""Handle priority/urgent email queries"""
priority_workflow = self._priority_focus_workflow()
return f"π¨ **Priority Mode Activated**\n\n{priority_workflow}"
def _handle_general_intent(self, message: str) -> str:
"""Handle general queries with proactive suggestions"""
return """π€ **How can I help you today?**
I'm your intelligent email assistant with advanced capabilities:
π§ **Smart Features**:
β’ Intent recognition and context awareness
β’ Automated email triage and categorization
β’ Intelligent recommendations
β’ Email pattern analysis
π¬ **Try asking me**:
β’ "Show me my most important emails"
β’ "Help me organize my inbox"
β’ "What emails do I have from [specific sender]?"
β’ "Summarize emails from this week"
π― **Proactive Suggestion**: Let me analyze your emails to provide personalized recommendations!"""
def _extract_search_terms(self, message: str) -> List[str]:
"""Extract search terms from user message"""
# Simple keyword extraction
words = message.lower().split()
stop_words = {'find', 'search', 'show', 'me', 'my', 'from', 'about', 'with', 'for', 'the', 'a', 'an'}
return [word for word in words if word not in stop_words and len(word) > 2]
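# Example: "find emails about invoices from acme" -> ["emails", "invoices", "acme"]
# ("find", "about", and "from" are stop words; words of 2 characters or fewer are dropped).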
def _clean_sender(self, sender: str) -> str:
"""Clean sender email for display"""
if '<' in sender:
email_match = re.search(r'<([^>]+)>', sender)
if email_match:
return email_match.group(1)
return sender
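# Example: "Alice Example <alice@example.com>" -> "alice@example.com";
# a bare address without angle brackets is returned unchanged.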
def _generate_qwen_response(self, message: str, intent: str, confidence: float, emails: List[Dict]) -> str:
"""Generate response using Qwen with real email context"""
try:
# Prepare email context for Qwen - MINIMIZE CONTENT TO AVOID TIMEOUT
email_context = []
# Include at most 3 emails to avoid timeout issues
for email in emails[:2]: # Further reduced from 3 to 2
email_context.append({
'subject': email.get('subject', 'No subject')[:20], # Further limit subject length
'sender': self._clean_sender(email.get('sender', 'Unknown')),
'is_unread': email.get('is_unread', False)
# Removed more fields to reduce prompt size
})
# Create an even simpler system prompt to avoid timeouts
system_prompt = f"""
You are an email assistant. Answer the user's query about their emails briefly.
USER QUERY: "{message}"
INTENT: {intent}
EMAIL SAMPLE: {len(email_context)} emails
"""
# Use a shorter max_tokens and higher temperature for faster responses
response = self.model.generate_content(
system_prompt,
max_tokens=512, # Reduced from 1024 to 512
temperature=0.7,
strip_thinking=True,
retries=2
)
if response and hasattr(response, 'text') and response.text:
ai_response = response.text.strip()
# Add header without confidence
final_response = f"π§ **AI Analysis**\n"
final_response += f"<span class='detected-intent'>Understanding: {intent.replace('_', ' ').title()}</span>\n\n"
final_response += ai_response
return final_response
else:
# If we got no response, fall back to simpler intent handling
logger.warning("Empty response from Qwen, using fallback")
return self._handle_intent_fallback(message, intent, confidence)
except requests.exceptions.Timeout:
logger.warning(f"Timeout in Qwen response generation - using fallback")
response_header = f"π§ **AI Analysis**\n"
response_header += f"<span class='detected-intent'>Understanding: {intent.replace('_', ' ').title()}</span>\n\n"
response_header += "I'll help with your request using my basic capabilities:\n\n"
return response_header + self._handle_intent_fallback(message, intent, confidence)
except Exception as e:
logger.warning(f"Qwen response generation failed: {e}")
return self._handle_intent_fallback(message, intent, confidence)
def _handle_intent_fallback(self, message: str, intent: str, confidence: float) -> str:
"""Fallback intent handling when Qwen is not available"""
response_header = f"π§ **AI Analysis**\n"
response_header += f"<span class='detected-intent'>Understanding: {intent.replace('_', ' ').title()}</span>\n\n"
if intent == "email_search" and confidence > 0.7:
return response_header + self._handle_search_intent(message)
elif intent == "email_summary" and confidence > 0.8:
return response_header + self._handle_summary_intent(message)
elif intent == "workflow_automation":
return response_header + self._handle_automation_intent(message)
elif intent == "meeting_prep":
return response_header + self._handle_meeting_intent(message)
elif intent == "priority_focus":
return response_header + self._handle_priority_intent(message)
else:
return response_header + self._handle_general_intent(message)
def get_connection_status(self) -> Tuple[str, str]:
"""Get the current connection status of the MCP client"""
try:
# Test connection by attempting to fetch a small number of emails
test_result = self.mcp_client.fetch_emails(query="newer_than:1d", max_results=1)
if test_result and 'emails' in test_result:
total_count = test_result.get('total_count', 0)
return f"Connected β
({total_count} emails accessible)", "success"
else:
return "Connected but no emails found π", "warning"
except Exception as e:
return f"Connection Error β ({str(e)[:50]}...)", "error"
def _display_triage_results(self, triage_results: Dict) -> str:
"""Format triage results for beautiful, readable display in Gradio"""
if "error" in triage_results:
return f"β **Error:** {triage_results['error']}"
# Start with a clean, formatted header
output = "# π§ **AI Email Analysis Results**\n\n"
# Add the analysis summary with better formatting
summary = triage_results.get("analysis_summary", "")
if summary:
output += summary + "\n\n"
output += "---\n\n" # Add separator
# Count emails by category for overview
categories = [
("high_priority", "🚨 **High Priority**", "Urgent attention needed"),
("meetings_and_calls", "📅 **Meetings & Calls**", "Calendar items"),
("action_required", "✅ **Action Required**", "Tasks to complete"),
("newsletters_and_updates", "📰 **Newsletters**", "Updates & info"),
("personal", "👤 **Personal**", "Personal messages"),
("low_priority", "📉 **Low Priority**", "Can wait")
]
# Count total emails processed
total_processed = sum(len(triage_results.get(cat, [])) for cat, _, _ in categories)
if total_processed == 0:
return output + "π **No emails found to analyze.**"
# Add quick stats overview with visual bar chart
output += f"## π **Quick Overview** ({total_processed} emails analyzed)\n\n"
# Create a visual bar chart representation
output += "| Category | Count | Distribution |\n"
output += "|----------|-------|-------------|\n"
for category_key, category_icon, category_desc in categories:
count = len(triage_results.get(category_key, []))
if total_processed > 0:
percentage = (count / total_processed * 100)
# Create a visual bar using emoji
bar_length = int(percentage / 5) # 5% per character
bar = "β" * bar_length if bar_length > 0 else ""
output += f"| {category_icon} | {count} | {bar} {percentage:.0f}% |\n"
output += "\n---\n\n"
# Show detailed breakdown for important categories
priority_categories = ["high_priority", "meetings_and_calls", "action_required"]
for category_key, category_title, category_desc in categories:
emails = triage_results.get(category_key, [])
if not emails:
continue
# Only show full details for priority categories, summary for others
show_full_details = category_key in priority_categories
output += f"## {category_title} ({len(emails)} emails)\n"
output += f"*{category_desc}*\n\n"
if show_full_details:
# Show detailed view for important emails
for i, email in enumerate(emails[:5]): # Increased from 3 to 5 emails
priority_score = email.get('priority_score', 0.5)
priority_icon = "π₯" if priority_score > 0.8 else "β οΈ" if priority_score > 0.6 else "π"
subject = email.get('subject', 'No subject')
sender = self._clean_sender(email.get('sender', 'Unknown'))
# Truncate for readability
if len(subject) > 60:
subject = subject[:57] + "..."
if len(sender) > 35:
sender = sender[:32] + "..."
output += f"### {priority_icon} {subject}\n"
output += f"π€ **From:** {sender} | β **Priority:** {priority_score:.1f}/1.0\n\n"
# Add AI reasoning if available - improved formatting
reasoning = email.get('ai_reasoning', '')
if reasoning and len(reasoning) > 5:
output += f"π§ **AI Analysis:** {reasoning}\n\n"
# Add suggested actions with better formatting
actions = email.get('suggested_actions', [])
if actions:
output += "π‘ **Suggested Actions:**\n"
for action in actions[:3]: # Limit to 3 actions
output += f"- {action}\n"
output += "\n"
output += "---\n\n"
# Show count if there are more emails
if len(emails) > 5:
output += f"*...and {len(emails) - 5} more emails in this category*\n\n"
else:
# Show summary view for less important categories
output += "<div class='email-summary-table'>\n\n"
output += "| Subject | From | Priority |\n"
output += "|---------|------|----------|\n"
for email in emails[:5]: # Show up to 5 emails
subject = email.get('subject', 'No subject')
if len(subject) > 40:
subject = subject[:37] + "..."
sender = self._clean_sender(email.get('sender', 'Unknown'))
if len(sender) > 25:
sender = sender[:22] + "..."
priority = email.get('priority_score', 0.5)
priority_icon = "π₯" if priority > 0.8 else "β οΈ" if priority > 0.6 else "π"
output += f"| {subject} | {sender} | {priority_icon} {priority:.1f} |\n"
output += "\n</div>\n\n"
if len(emails) > 5:
output += f"*...and {len(emails) - 5} more emails in this category*\n\n"
# Add recommendations section with better formatting
recommendations = triage_results.get("recommendations", [])
if recommendations:
output += "---\n\n"
output += "## π‘ **AI Recommendations**\n\n"
for i, rec in enumerate(recommendations[:5], 1): # Limit to 5 recommendations
output += f"{i}. {rec}\n"
output += "\n"
# Add concise next steps with better visual separation
high_count = len(triage_results.get('high_priority', []))
action_count = len(triage_results.get('action_required', []))
output += "---\n\n"
output += "## π― **What to Do Next**\n\n"
output += "<div class='next-steps'>\n\n"
if high_count > 0:
output += f"1. π¨ **URGENT:** Handle {high_count} high-priority emails first\n\n"
if action_count > 0:
output += f"2. β
**TODAY:** Complete {action_count} action items\n\n"
output += f"3. π
**SCHEDULE:** Review meeting emails and update calendar\n\n"
output += f"4. ποΈ **ORGANIZE:** Archive newsletters and low-priority items\n\n"
output += "</div>\n\n"
output += "---\n\n"
output += "*π€ Analysis powered by Qwen AI β’ Results based on your actual email content*"
return output
# Create the enhanced Gradio interface
def create_enhanced_gradio_interface(modal_api_url: str = None):
"""Create the enhanced Gradio interface for the Gmail AI Agent"""
# Use environment variable if no URL provided
modal_api_url = modal_api_url or MODAL_API_URL
# Initialize the agent
agent = EnhancedGmailAgent(modal_api_url=modal_api_url)
# Check if we have valid credentials
mcp_status, model_status = agent.get_connection_status()
# Theme configuration
theme = gr.Theme(
primary_hue="indigo",
secondary_hue="blue",
neutral_hue="slate",
)
# Custom CSS for better styling
custom_css = """
/* ... existing code ... */
"""
with gr.Blocks(theme=theme, css=custom_css, title="Enhanced Gmail AI Agent - Track 3") as app:
# Enhanced Header
gr.HTML("""
<div class="main-header">
<h1>🤖 Enhanced Gmail AI Agent</h1>
<p>Track 3: Sophisticated Agent Behaviors & Workflow Automation</p>
<p style="font-size: 1.1rem; margin-top: 10px;">Intelligent Decision-Making • Workflow Automation • Proactive Assistance</p>
</div>
""")
# Warning about initial response time
gr.HTML("""
<div style="background-color: #FF9800; color: #000000; padding: 15px; border-radius: 5px; margin-bottom: 20px; border: 2px solid #FF5722; box-shadow: 0 2px 5px rgba(0,0,0,0.2);">
<h3 style="margin-top: 0; font-size: 1.3rem; color: #000000;">β οΈ IMPORTANT: Initial Loading Time</h3>
<p style="font-size: 1.1rem; font-weight: 500; margin: 10px 0;">On first run, AI responses may take <strong style="color: #D32F2F; font-size: 1.2rem;">120-200 seconds</strong> because the Qwen LLM is hosted on Modal and requires time for cold start.</p>
<p style="font-size: 1.1rem;">Subsequent requests will be much faster after the initial cold start. Please be patient during your first interaction.</p>
</div>
""")
# Status section with Qwen model status
with gr.Row():
with gr.Column():
status_text = gr.HTML(elem_classes=["status-card"])
def update_status():
email_status, email_status_type = agent.get_connection_status()
# Add Qwen model status
qwen_status = agent.model_status if hasattr(agent, 'model_status') else "Unknown"
qwen_class = "status-connected" if agent.model else "status-error"
return f'''<div class="status-card" style="background-color: #1A237E; color: white; padding: 12px; border-radius: 5px; margin-bottom: 15px; box-shadow: 0 2px 5px rgba(0,0,0,0.2);">
<div style="font-size: 1.1rem; font-weight: bold; margin-bottom: 5px;">π§ Qwen Model Status: <span style="color: {'#4CAF50' if agent.model else '#F44336'};">{qwen_status}</span></div>
<p style="margin: 5px 0; font-size: 0.9rem;">Note: The Qwen API will be tested when you first use an AI feature</p>
</div>'''
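# Note: the status HTML is computed once here at interface build time; no event re-invokes update_status, so it is not refreshed automatically.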
status_text.value = update_status()
with gr.Tabs() as tabs:
# Intelligent Triage Tab with loading
with gr.Tab("π§ Intelligent Triage"):
with gr.Column(elem_classes=["content-card"]):
gr.HTML('<h2 class="section-header">π§ AI-Powered Email Triage & Analysis</h2>')
gr.HTML("""
<div class="info-box">
<h3>✨ Smart Email Analysis</h3>
<p>The agent analyzes your emails using AI to categorize, prioritize, and suggest specific actions. Each email gets a priority score and reasoning.</p>
<p><strong>⏱️ Expected Time:</strong> 20-30 seconds for thorough analysis</p>
</div>
""")
# First-run warning
gr.HTML("""
<div style="background-color: #FF9800; color: #000000; padding: 12px; border-radius: 5px; margin-bottom: 15px; border: 2px solid #FF5722; box-shadow: 0 2px 5px rgba(0,0,0,0.2);">
<p style="margin: 0; font-weight: 600; font-size: 1rem;"><span style="font-size: 1.2rem;">β οΈ</span> <strong>IMPORTANT:</strong> First-time AI operations may take <strong style="color: #D32F2F;">120-200 seconds</strong> due to Modal cold start.</p>
</div>
""")
with gr.Row():
max_emails_triage = gr.Slider(
minimum=10, maximum=50, value=20, step=5,
label="π Emails to Analyze",
info="More emails = more thorough analysis but longer wait time"
)
triage_btn = gr.Button("π§ Run Intelligent Triage", variant="primary", size="lg")
# Loading state for triage
triage_loading = gr.HTML(visible=False)
# Updated to use a scrollable container with proper styling
triage_output = gr.Markdown(
label="π― Triage Results",
value="Click 'Run Intelligent Triage' to see AI-powered email analysis with specific recommendations",
elem_classes=["triage-results"],
show_label=True
)
def run_triage_with_loading(max_emails):
loading_html = f"""
<div class="loading-container">
<div class="loading-spinner"></div>
<h3>🧠 Analyzing Your {max_emails} Most Recent Emails...</h3>
<p>AI is categorizing, prioritizing, and generating recommendations</p>
<div class="progress-bar">
<div class="progress-fill"></div>
</div>
<p style="color: #666; font-size: 14px;">⏱️ This usually takes 20-30 seconds • Please wait patiently</p>
<p style="color: #888; font-size: 12px;">🔄 Processing: Fetch emails → AI analysis → Priority scoring → Recommendations</p>
</div>
"""
return (
gr.update(value=loading_html, visible=True),
gr.update(visible=False),
gr.update(interactive=False)
)
def run_triage_complete(max_emails):
results = agent.intelligent_email_triage(max_emails)
if "error" in results:
output = f"β **Error:** {results['error']}"
else:
# Use the beautifully formatted display instead of manual formatting
output = results.get("formatted_display", "Analysis complete but no formatted output available.")
return (
gr.update(visible=False),
gr.update(value=output, visible=True),
gr.update(interactive=True)
)
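# Two-step event chain: .click() first shows the loading spinner and disables the button,
# then .then() runs the slow triage call and swaps the formatted results back in.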
triage_btn.click(
run_triage_with_loading,
inputs=[max_emails_triage],
outputs=[triage_loading, triage_output, triage_btn]
).then(
run_triage_complete,
inputs=[max_emails_triage],
outputs=[triage_loading, triage_output, triage_btn]
)
# Proactive AI Assistant Tab
with gr.Tab("π€ Proactive Assistant"):
with gr.Column(elem_classes=["content-card"]):
gr.HTML('<h2 class="section-header">π€ Intelligent AI Assistant</h2>')
gr.HTML("""
<div class="info-box">
<h3>π§ Advanced AI Capabilities</h3>
<p>This AI assistant analyzes your <strong>actual email data</strong> using Qwen model to provide personalized, specific recommendations:</p>
<ul>
<li><strong>Intent Recognition:</strong> Understands what you want with confidence scoring</li>
<li><strong>Real Email Analysis:</strong> Uses your actual recent emails for context</li>
<li><strong>Specific Recommendations:</strong> Mentions actual email subjects and senders</li>
<li><strong>Proactive Suggestions:</strong> Based on your email patterns and priorities</li>
</ul>
</div>
""")
# First-run warning
gr.HTML("""
<div style="background-color: #FF9800; color: #000000; padding: 12px; border-radius: 5px; margin-bottom: 15px; border: 2px solid #FF5722; box-shadow: 0 2px 5px rgba(0,0,0,0.2);">
<p style="margin: 0; font-weight: 600; font-size: 1rem;"><span style="font-size: 1.2rem;">β οΈ</span> <strong>IMPORTANT:</strong> First-time AI operations may take <strong style="color: #D32F2F;">120-200 seconds</strong> due to Modal cold start.</p>
</div>
""")
# Improved chatbot with proper scrolling and styling
with gr.Column(elem_classes=["chat-container"]):
chatbot = gr.Chatbot(
height=500,
container=True,
value=[(None, "π€ **Welcome to your Enhanced Email Assistant!**\n\nI'm powered by advanced AI and can analyze your actual emails to provide specific, personalized recommendations.\n\nβ οΈ **IMPORTANT: First-time responses may take 120-200 seconds** due to the Qwen model's cold start on Modal. Please be patient!\n\nπ‘ **Try asking me:**\nβ’ \"What emails should I focus on today?\"\nβ’ \"Help me organize my inbox based on my recent emails\"\nβ’ \"Show me my most important unread emails\"\nβ’ \"Summarize my emails from this week\"\n\nI'll analyze your real email data and give you specific advice! β¨")],
elem_classes=["enhanced-chatbot", "chat-messages"]
)
with gr.Row(elem_classes=["chat-input-container"]):
chat_input = gr.Textbox(
placeholder="Ask me about your emails or request email analysis...",
scale=4,
lines=2,
max_lines=8,
elem_classes=["enhanced-input"],
autofocus=True,
show_label=False
)
chat_send = gr.Button("π€ Send", variant="primary", scale=1, elem_classes=["send-button"])
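# Both the Send button click and pressing Enter in the textbox route to the same chat handler below.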
chat_send.click(
agent.proactive_assistant_chat,
inputs=[chat_input, chatbot],
outputs=[chat_input, chatbot]
)
chat_input.submit(
agent.proactive_assistant_chat,
inputs=[chat_input, chatbot],
outputs=[chat_input, chatbot]
)
# Enhanced Footer
gr.HTML("""
<div class="enhanced-footer">
<h3>🚀 Track 3: Enhanced Agentic Demo Features</h3>
<p>
✨ <strong>Specific Email Recommendations</strong> •
📊 <strong>Real Data Analysis</strong> •
🧠 <strong>Qwen-Powered Intelligence</strong>
</p>
<p style="font-size: 14px;">
Enhanced with real-time email analysis, smart loading states, and beautiful UI design
</p>
</div>
""")
return app
if __name__ == "__main__":
print("π Starting Enhanced Gmail AI Agent (Track 3)")
print("π§ Features: Intelligent Triage | Smart Workflows | Proactive Assistant")
# Get Modal API URL from environment variable
modal_api_url = MODAL_API_URL
if modal_api_url:
print(f"π‘ Using Modal API URL from environment: {modal_api_url}")
else:
print("β οΈ No Modal API URL found in environment variables. Some features may be limited.")
app = create_enhanced_gradio_interface(modal_api_url=modal_api_url)
app.launch(
server_name="0.0.0.0", # Use 0.0.0.0 to allow external connections (needed for Hugging Face Spaces)
server_port=7860, # Use port 7860 for Hugging Face Spaces
share=False,
show_error=True
)
# Create the Gradio app for Hugging Face Spaces
app = create_enhanced_gradio_interface(modal_api_url=MODAL_API_URL) |