diff --git a/poetry.lock b/poetry.lock index 46a6b0e..0f9074b 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1771,4 +1771,4 @@ test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools", [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "0862d43be83826d540d263abb4a5ae526904ac1a047cd3c5ce155042c5bcdfa8" +content-hash = "1641beb55bac3d47aa608785834060e68c40fcf0c01f2a702c2808ee34c3e049" diff --git a/pyproject.toml b/pyproject.toml index 5b233b8..eb45a57 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ setuptools = "^70.2.0" wheel = "^0.43.0" twine = "^5.1.1" sphinx-rtd-theme = "^2.0.0" +requests = "^2.32.3" [tool.poetry.group.dev.dependencies] black = "^24.4.2" diff --git a/sage_imap/services/email.py b/sage_imap/services/email.py index 96834b5..5597aa1 100644 --- a/sage_imap/services/email.py +++ b/sage_imap/services/email.py @@ -1,5 +1,6 @@ import logging import os +import re import smtplib import socket from email import encoders @@ -7,6 +8,7 @@ from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.utils import formatdate, make_msgid +from pathlib import Path from typing import Any, Dict, List, Optional import requests @@ -20,29 +22,34 @@ SpamResult, ) +EmailAddress = str +HeaderDict = Dict[str, str] +AttachmentList = List[Path] + class SmartEmailMessage: def __init__( self, subject: str, body: str, - from_email: Optional[str] = None, - to: Optional[List[str]] = None, - cc: Optional[List[str]] = None, - bcc: Optional[List[str]] = None, - extra_headers: Optional[Dict[str, str]] = None, + from_email: Optional[EmailAddress] = None, + to: Optional[List[EmailAddress]] = None, + cc: Optional[List[EmailAddress]] = None, + bcc: Optional[List[EmailAddress]] = None, + extra_headers: Optional[HeaderDict] = None, + body_html: Optional[str] = None, # Add body_html parameter **kwargs: Any, ): logging.info("Initializing SmartEmailMessage") self.subject = subject self.body = body + self.body_html = body_html # Store HTML body self.from_email = from_email - self.to = to or [] - self.cc = cc or [] - self.bcc = bcc or [] + self.to = self._sanitize_email_list(to) + self.cc = self._sanitize_email_list(cc) + self.bcc = self._sanitize_email_list(bcc) - # Combine additional attributes into dictionaries - self.attachments = kwargs.get("attachments", []) + self.attachments: AttachmentList = kwargs.get("attachments", []) self.message_id = self._generate_message_id() self.date = self._generate_date() @@ -53,8 +60,7 @@ def __init__( self.update_attachment_status() self.update_content_type_and_encoding() - # Default headers - self.default_headers = { + self.default_headers: HeaderDict = { "MIME-Version": "1.0", "Content-Type": self.content_type, "Content-Transfer-Encoding": self.content_transfer_encoding, @@ -67,14 +73,14 @@ def __init__( "References": "", "Reply-To": "", "X-Originating-IP": self.originating_ip, - "X-Priority": Priority.NORMAL.value, + "X-Priority": Priority.NORMAL, "X-MS-Has-Attach": self.has_attach, "X-Report-Abuse-To": "", - "X-Spamd-Result": SpamResult.DEFAULT.value, - "X-Auto-Response-Suppress": AutoResponseSuppress.ALL.value, + "X-Spamd-Result": SpamResult.DEFAULT, + "X-Auto-Response-Suppress": AutoResponseSuppress.ALL, } - self.extra_headers = self.merge_headers(extra_headers or {}) + self.extra_headers: HeaderDict = self.merge_headers(extra_headers or {}) self.validate_headers() def _generate_message_id(self) -> str: @@ -88,7 +94,7 @@ def _generate_date(self) -> str: def _get_originating_ip(self) -> str: logging.debug("Getting originating IP") try: - ip = requests.get("https://api.ipify.org", timeout=5).text + ip = requests.get("https://api.ipify.org", timeout=5, verify=True).text logging.info(f"Originating IP: {ip}") return ip except requests.RequestException as e: @@ -102,11 +108,14 @@ def update_attachment_status(self) -> None: def update_content_type_and_encoding(self) -> None: logging.debug("Updating content type and encoding") if self.attachments: - self.content_type = ContentType.MULTIPART.value - self.content_transfer_encoding = ContentTransferEncoding.BASE64.value + self.content_type = "multipart/mixed" + self.content_transfer_encoding = ContentTransferEncoding.BASE64 + elif self.body_html: + self.content_type = "multipart/alternative" + self.content_transfer_encoding = ContentTransferEncoding.SEVEN_BIT else: - self.content_type = ContentType.PLAIN.value - self.content_transfer_encoding = ContentTransferEncoding.SEVEN_BIT.value + self.content_type = ContentType.PLAIN + self.content_transfer_encoding = ContentTransferEncoding.SEVEN_BIT def _generate_received_header(self) -> str: logging.debug("Generating received header") @@ -128,7 +137,7 @@ def _generate_received_header(self) -> str: ) return header - def merge_headers(self, extra_headers: Dict[str, str]) -> Dict[str, str]: + def merge_headers(self, extra_headers: HeaderDict) -> HeaderDict: logging.debug("Merging extra headers with default headers") headers = self.default_headers.copy() headers.update(extra_headers) @@ -137,19 +146,19 @@ def merge_headers(self, extra_headers: Dict[str, str]) -> Dict[str, str]: def validate_headers(self) -> None: logging.debug("Validating headers") priority = self.extra_headers.get("X-Priority") - if priority and priority not in Priority._value2member_map_: + if priority and priority != Priority.NORMAL: logging.error(f"Invalid X-Priority header value: {priority}") raise EmailException(f"Invalid X-Priority header value: {priority}") spamd_result = self.extra_headers.get("X-Spamd-Result") - if spamd_result and spamd_result not in SpamResult._value2member_map_: + if spamd_result and spamd_result != SpamResult.DEFAULT: logging.error(f"Invalid X-Spamd-Result header value: {spamd_result}") raise EmailException(f"Invalid X-Spamd-Result header value: {spamd_result}") auto_response_suppress = self.extra_headers.get("X-Auto-Response-Suppress") if ( auto_response_suppress - and auto_response_suppress not in AutoResponseSuppress._value2member_map_ + and auto_response_suppress != AutoResponseSuppress.ALL ): logging.error( f"Invalid X-Auto-Response-Suppress header value: {auto_response_suppress}" @@ -159,15 +168,23 @@ def validate_headers(self) -> None: ) content_type = self.extra_headers.get("Content-Type") - if content_type and content_type not in ContentType._value2member_map_: + valid_content_types = [ + ContentType.PLAIN, + "multipart/mixed", + "multipart/alternative", + ] + if content_type and content_type not in valid_content_types: logging.error(f"Invalid Content-Type header value: {content_type}") raise EmailException(f"Invalid Content-Type header value: {content_type}") content_transfer_encoding = self.extra_headers.get("Content-Transfer-Encoding") + valid_encodings = [ + ContentTransferEncoding.SEVEN_BIT, + ContentTransferEncoding.BASE64, + ] if ( content_transfer_encoding - and content_transfer_encoding - not in ContentTransferEncoding._value2member_map_ + and content_transfer_encoding not in valid_encodings ): logging.error( f"Invalid Content-Transfer-Encoding header value: {content_transfer_encoding}" @@ -182,12 +199,16 @@ def send( smtp_port: int, smtp_user: str, smtp_password: str, - use_tls: bool = False, + use_tls: bool = True, # Default to TLS use_ssl: bool = False, ) -> None: logging.info("Sending email") try: - msg = MIMEMultipart() + if self.body_html: + msg = MIMEMultipart("alternative") + else: + msg = MIMEMultipart() + msg["Subject"] = self.subject msg["From"] = self.from_email msg["To"] = ", ".join(self.to) @@ -200,16 +221,23 @@ def send( msg.add_header(header, value) msg.attach(MIMEText(self.body, "plain")) + if self.body_html: + msg.attach(MIMEText(self.body_html, "html")) for attachment_path in self.attachments: + if not os.path.isfile(attachment_path): + logging.error(f"Attachment file does not exist: {attachment_path}") + raise EmailException( + f"Attachment file does not exist: {attachment_path}" + ) + with open(attachment_path, "rb") as file: part = MIMEBase("application", "octet-stream") part.set_payload(file.read()) encoders.encode_base64(part) part.add_header( "Content-Disposition", - "attachment", - filename=os.path.basename(attachment_path), + f'attachment; filename="{os.path.basename(attachment_path)}"', ) msg.attach(part) @@ -232,3 +260,18 @@ def send( except Exception as e: logging.error(f"An unexpected error occurred: {e}") raise EmailException(f"An unexpected error occurred: {e}") + + def _sanitize_email_list( + self, email_list: Optional[List[EmailAddress]] + ) -> List[EmailAddress]: + email_regex = re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$") + if not email_list: + return [] + sanitized_list: List[EmailAddress] = [] + for email in email_list: + sanitized_email = email.strip() + if not email_regex.match(sanitized_email): + logging.error(f"Invalid email address: {sanitized_email}") + raise EmailException(f"Invalid email address: {sanitized_email}") + sanitized_list.append(sanitized_email) + return sanitized_list diff --git a/tests/services/test_email.py b/tests/services/test_email.py index e3f8253..40982c3 100644 --- a/tests/services/test_email.py +++ b/tests/services/test_email.py @@ -1,16 +1,12 @@ import pytest import requests import smtplib -from unittest.mock import patch, MagicMock +from unittest.mock import patch from sage_imap.exceptions import EmailException from sage_imap.helpers.email import ( - AutoResponseSuppress, ContentTransferEncoding, ContentType, - Priority, - SpamResult, ) -from email.mime.multipart import MIMEMultipart from sage_imap.services.email import SmartEmailMessage # Correct import path # Sample data for testing @@ -70,14 +66,14 @@ def test_update_attachment_status(email_message): def test_update_content_type_and_encoding_with_attachments(email_message): email_message.update_content_type_and_encoding() - assert email_message.content_type == ContentType.MULTIPART.value - assert email_message.content_transfer_encoding == ContentTransferEncoding.BASE64.value + assert email_message.content_type == "multipart/mixed" + assert email_message.content_transfer_encoding == ContentTransferEncoding.BASE64 def test_update_content_type_and_encoding_without_attachments(): email_message = SmartEmailMessage(subject=sample_subject, body=sample_body) email_message.update_content_type_and_encoding() - assert email_message.content_type == ContentType.PLAIN.value - assert email_message.content_transfer_encoding == ContentTransferEncoding.SEVEN_BIT.value + assert email_message.content_type == ContentType.PLAIN + assert email_message.content_transfer_encoding == ContentTransferEncoding.SEVEN_BIT def test_merge_headers(email_message): merged_headers = email_message.merge_headers({"X-New-Header": "NewValue"})