Skip to content

Commit

Permalink
adding email queue
Browse files Browse the repository at this point in the history
  • Loading branch information
nrdagar committed Feb 4, 2025
1 parent e4ffaa1 commit 1bb72d1
Show file tree
Hide file tree
Showing 3 changed files with 251 additions and 11 deletions.
35 changes: 24 additions & 11 deletions fastapi_mail/fastmail.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from email.message import EmailMessage, Message
from email.utils import formataddr
from typing import Any, Dict, Optional, Union
from datetime import datetime

import blinker
from jinja2 import Environment, Template
Expand All @@ -12,6 +13,8 @@
from fastapi_mail.errors import PydanticClassRequired
from fastapi_mail.msg import MailMsg
from fastapi_mail.schemas import MessageSchema
from fastapi_mail.queue import EmailQueue
from fastapi_mail.signals import email_dispatched


class _MailMixin:
Expand Down Expand Up @@ -49,6 +52,13 @@ class FastMail(_MailMixin):

def __init__(self, config: ConnectionConfig) -> None:
self.config = config
self._queue: Optional[EmailQueue] = None

@property
def queue(self) -> EmailQueue:
if self._queue is None:
self._queue = EmailQueue(self)
return self._queue

async def get_mail_template(
self, env_path: Environment, template_name: str
Expand Down Expand Up @@ -92,25 +102,28 @@ async def __sender(self) -> Union[EmailStr, str]:
return sender

async def send_message(
self, message: MessageSchema, template_name: Optional[str] = None
) -> None:
self,
message: MessageSchema,
template_name: Optional[str] = None,
queue: bool = False,
schedule_time: Optional[datetime] = None
) -> Optional[str]:
if not queue:
await self._send_message(message, template_name)
return None
return await self.queue.add_to_queue(message, template_name, schedule_time)

async def _send_message(self, message: MessageSchema, template_name: Optional[str] = None) -> None:
if not isinstance(message, MessageSchema):
raise PydanticClassRequired(
"Message schema should be provided from MessageSchema class"
)

raise PydanticClassRequired("Message schema should be provided from MessageSchema class")
if self.config.TEMPLATE_FOLDER and template_name:
template = await self.get_mail_template(
self.config.template_engine(), template_name
)
template = await self.get_mail_template(self.config.template_engine(), template_name)
msg = await self.__prepare_message(message, template)
else:
msg = await self.__prepare_message(message)

async with Connection(self.config) as session:
if not self.config.SUPPRESS_SEND:
await session.session.send_message(msg)

email_dispatched.send(msg)


Expand Down
107 changes: 107 additions & 0 deletions fastapi_mail/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from typing import Optional, Dict
from enum import Enum
import asyncio
from datetime import datetime
from uuid import uuid4

from .schemas import MessageSchema
from .fastmail import FastMail

class EmailStatus(Enum):
QUEUED = "queued"
SENDING = "sending"
SENT = "sent"
FAILED = "failed"

class QueuedEmail:
def __init__(self, message: MessageSchema, template_name: Optional[str] = None):
self.id = str(uuid4())
self.message = message
self.template_name = template_name
self.status = EmailStatus.QUEUED
self.created_at = datetime.now()
self.updated_at = datetime.now()
self.retry_count = 0
self.error = None
self.scheduled_time = None

class EmailQueue:
def __init__(self, fastmail: FastMail, max_retries: int = 3):
self.fastmail = fastmail
self.max_retries = max_retries
self.queue: Dict[str, QueuedEmail] = {}
self._processing = False
self._task = None

async def add_to_queue(self, message: MessageSchema, template_name: Optional[str] = None, schedule_time: Optional[datetime] = None) -> str:
queued_email = QueuedEmail(message, template_name)
if schedule_time:
queued_email.scheduled_time = schedule_time
self.queue[queued_email.id] = queued_email
return queued_email.id

async def process_queue(self):
self._processing = True
while self._processing:
now = datetime.now()
for email in list(self.queue.values()):
if email.status == EmailStatus.QUEUED:
if email.scheduled_time and email.scheduled_time > now:
continue
try:
email.status = EmailStatus.SENDING
email.updated_at = datetime.now()
await self.fastmail._send_message(email.message, email.template_name)
email.status = EmailStatus.SENT
email.updated_at = datetime.now()
except Exception as e:
email.error = str(e)
email.retry_count += 1
if email.retry_count >= self.max_retries:
email.status = EmailStatus.FAILED
else:
email.status = EmailStatus.QUEUED
email.updated_at = datetime.now()
await asyncio.sleep(1)

def start_processing(self):
if not self._task:
self._task = asyncio.create_task(self.process_queue())

def stop_processing(self):
self._processing = False
if self._task:
self._task.cancel()
self._task = None

def get_queue_status(self) -> Dict[str, int]:
status = {
"total": len(self.queue),
"queued": 0,
"sending": 0,
"sent": 0,
"failed": 0
}
for email in self.queue.values():
status[email.status.value] += 1
return status

def get_email_status(self, email_id: str) -> Optional[QueuedEmail]:
return self.queue.get(email_id)

def cancel_email(self, email_id: str) -> bool:
if email_id in self.queue and self.queue[email_id].status == EmailStatus.QUEUED:
del self.queue[email_id]
return True
return False

def retry_failed(self, email_id: str) -> bool:
if email_id in self.queue:
email = self.queue[email_id]
if email.status == EmailStatus.FAILED:
email.status = EmailStatus.QUEUED
email.retry_count = 0
email.error = None
email.updated_at = datetime.now()
return True
return False
120 changes: 120 additions & 0 deletions tests/test_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
import pytest
from datetime import datetime, timedelta
from fastapi_mail import FastMail, ConnectionConfig, MessageSchema, MessageType
from fastapi_mail.queue import EmailQueue, EmailStatus
import asyncio

@pytest.fixture
def queued_message():
return MessageSchema(
subject="Test subject",
recipients=["test@example.com"],
body="Test email body",
subtype=MessageType.plain
)

@pytest.fixture
def queue_config(mail_config):
mail_config["SUPPRESS_SEND"] = 1
return ConnectionConfig(**mail_config)

@pytest.fixture
def email_queue(queue_config):
fastmail = FastMail(queue_config)
return EmailQueue(fastmail)

@pytest.mark.asyncio
async def test_add_to_queue(email_queue, queued_message):
queue_id = await email_queue.add_to_queue(queued_message)
assert queue_id in email_queue.queue
assert email_queue.queue[queue_id].status.name == "QUEUED"
assert email_queue.queue[queue_id].message == queued_message
assert email_queue.queue[queue_id].retry_count == 0

@pytest.mark.asyncio
async def test_scheduled_email(email_queue, queued_message):
schedule_time = datetime.now() + timedelta(hours=1)
queue_id = await email_queue.add_to_queue(queued_message, schedule_time=schedule_time)
queued_email = email_queue.queue[queue_id]
assert queued_email.scheduled_time == schedule_time
assert queued_email.status.name == "QUEUED"

@pytest.mark.asyncio
async def test_process_queue(email_queue, queued_message):
queue_id = await email_queue.add_to_queue(queued_message)
email_queue.start_processing()
await asyncio.sleep(2)
email_queue.stop_processing()
processed_email = email_queue.queue[queue_id]
assert processed_email.status.name == "SENT"

@pytest.mark.asyncio
async def test_queue_status(email_queue, queued_message):
queue_id1 = await email_queue.add_to_queue(queued_message)
queue_id2 = await email_queue.add_to_queue(queued_message)
email_queue.queue[queue_id2].status = EmailStatus.FAILED
email_queue.queue[queue_id2].retry_count = 3
status = email_queue.get_queue_status()
assert status["total"] == 2
assert status["queued"] == 1
assert status["failed"] == 1

@pytest.mark.asyncio
async def test_cancel_email(email_queue, queued_message):
queue_id = await email_queue.add_to_queue(queued_message)
success = email_queue.cancel_email(queue_id)
assert success is True
assert queue_id not in email_queue.queue
assert email_queue.cancel_email("non-existent") is False

@pytest.mark.asyncio
async def test_retry_failed_email(email_queue, queued_message):
queue_id = await email_queue.add_to_queue(queued_message)
email_queue.queue[queue_id].status = EmailStatus.FAILED
email_queue.queue[queue_id].retry_count = 3
email_queue.queue[queue_id].error = "Error occurred"
success = email_queue.retry_failed(queue_id)
assert success is True
retried_email = email_queue.queue[queue_id]
assert retried_email.status.name == "QUEUED"
assert retried_email.retry_count == 0
assert retried_email.error is None

@pytest.mark.asyncio
async def test_get_email_status(email_queue, queued_message):
queue_id = await email_queue.add_to_queue(queued_message)
email_status = email_queue.get_email_status(queue_id)
assert email_status is not None
assert email_status.status.name == "QUEUED"
assert email_queue.get_email_status("non-existent") is None

@pytest.mark.asyncio
async def test_max_retries(email_queue, queued_message):
queue_id = await email_queue.add_to_queue(queued_message)
queued_email = email_queue.queue[queue_id]
for _ in range(email_queue.max_retries):
queued_email.status = EmailStatus.SENDING
queued_email.retry_count += 1
queued_email.status = EmailStatus.QUEUED
queued_email.status = EmailStatus.SENDING
queued_email.retry_count += 1
assert queued_email.retry_count > email_queue.max_retries
assert queued_email.status.name == "FAILED"

@pytest.mark.asyncio
async def test_fastmail_queue_integration(queue_config, queued_message):
fastmail = FastMail(queue_config)
queue_id = await fastmail.send_message(queued_message, queue=True)
assert queue_id is not None
immediate_result = await fastmail.send_message(queued_message, queue=False)
assert immediate_result is None

@pytest.mark.asyncio
async def test_scheduled_processing(email_queue, queued_message):
schedule_time = datetime.now() + timedelta(seconds=2)
queue_id = await email_queue.add_to_queue(queued_message, schedule_time=schedule_time)
email_queue.start_processing()
assert email_queue.queue[queue_id].status.name == "QUEUED"
await asyncio.sleep(3)
email_queue.stop_processing()
assert email_queue.queue[queue_id].status.name == "SENT"

0 comments on commit 1bb72d1

Please # to comment.