import os import ssl import smtplib from datetime import datetime from email.message import EmailMessage from typing import Optional, List, Union from pydantic import BaseModel, EmailStr, Field # === ENV SETTINGS === EMAIL_SENDER = os.getenv("EMAIL_SENDER", "your@email.com") EMAIL_PASSWORD = os.getenv("EMAIL_PASSWORD", "yourpassword") SMTP_SERVER = os.getenv("SMTP_SERVER", "smtp.gmail.com") SMTP_PORT = int(os.getenv("SMTP_PORT", 465)) USE_SSL = os.getenv("USE_SSL", "true").lower() in ("true", "1", "yes") EMAIL_TEST_MODE = os.getenv("EMAIL_TEST_MODE", "false").lower() in ("true", "1", "yes") # === Models === class EmailInput(BaseModel): to: Union[EmailStr, List[EmailStr]] = Field(..., description="Recipient(s) email address") subject: str = Field(..., description="Email subject line") body: str = Field(..., description="Plaintext or HTML body") html: Optional[bool] = Field(default=False, description="Send as HTML?") attachments: Optional[List[str]] = Field(default=None, description="List of file paths to attach") # === Email Body Generator === def generate_email(recipient_name: str, product_name: str, discount: float) -> str: return f"""\ Dear {recipient_name}, 🎉 Great news! We're offering you an exclusive {discount}% discount on {product_name}. Visit our site today and don't miss out! Best regards, Your AI Assistant """ # === Email Sending Core === def send_email(email_input: EmailInput) -> dict: msg = EmailMessage() msg["From"] = EMAIL_SENDER msg["To"] = ", ".join(email_input.to) if isinstance(email_input.to, list) else email_input.to msg["Subject"] = email_input.subject if email_input.html: msg.add_alternative(email_input.body, subtype="html") else: msg.set_content(email_input.body) # Add attachments if email_input.attachments: for path in email_input.attachments: if os.path.exists(path): with open(path, "rb") as f: data = f.read() filename = os.path.basename(path) msg.add_attachment(data, maintype="application", subtype="octet-stream", filename=filename) # Early exit in test mode if EMAIL_TEST_MODE: return { "status": "✅ TEST MODE — email not actually sent", "to": msg["To"], "subject": msg["Subject"], "timestamp": datetime.utcnow().isoformat() } # Send email via SMTP try: if USE_SSL: context = ssl.create_default_context() with smtplib.SMTP_SSL(SMTP_SERVER, SMTP_PORT, context=context) as smtp: smtp.login(EMAIL_SENDER, EMAIL_PASSWORD) smtp.send_message(msg) else: with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as smtp: smtp.ehlo() smtp.starttls(context=ssl.create_default_context()) smtp.ehlo() smtp.login(EMAIL_SENDER, EMAIL_PASSWORD) smtp.send_message(msg) return { "status": "✅ Email sent successfully", "to": msg["To"], "subject": msg["Subject"], "timestamp": datetime.utcnow().isoformat() } except Exception as e: return { "status": f"❌ Failed to send email: {e}", "to": msg["To"], "subject": msg["Subject"], "timestamp": datetime.utcnow().isoformat() } # === Example usage === if __name__ == "__main__": email_body = generate_email("Alice", "Super Serum", 25) email_data = EmailInput( to=["alice@example.com", "bob@example.com"], subject="🎁 Special Deal Just For You!", body=email_body, html=False, attachments=["/path/to/brochure.pdf"] ) result = send_email(email_data) print(result)