Skip to content

Commit

Permalink
Version 0.1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Evgeny Belousov committed Jan 20, 2021
1 parent 776c926 commit 19bf405
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 0 deletions.
Empty file added demailer/__init__.py
Empty file.
23 changes: 23 additions & 0 deletions demailer/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import environ


@environ.config(prefix="DEMAILER")
class Config:
poll_interval = environ.var(default=30, converter=int)
imap_host = environ.var()
imap_port = environ.var(default=993, converter=int)
imap_login = environ.var()
imap_password = environ.var()
tg_bot_token = environ.var()
tg_user_id = environ.var(converter=int)


config = environ.to_config(Config)

sender_stop_list = ("user@example.com",)

recipient_stop_list = ("all_users@my_company.com",)

subject_stop_list = ("Best deals of the year!",)

body_stop_list = ("You are receiving this email because you have subscribed to",)
64 changes: 64 additions & 0 deletions demailer/mail.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import email
from email.message import Message
from imaplib import IMAP4_SSL
from typing import Any, Generator, List, Tuple, Union

from demailer.config import config


def get_new_mail() -> Tuple[str, List[Any]]:
conn = IMAP4_SSL(host=config.imap_host, port=config.imap_port)
conn.login(user=config.imap_login, password=config.imap_password)

conn.select()

res = conn.search(None, "(NEW)")

conn.close()
conn.logout()

return res


def get_messages(message_id_list: bytes) -> Generator[dict, None, None]:
conn = IMAP4_SSL(host=config.imap_host, port=config.imap_port)
conn.login(user=config.imap_login, password=config.imap_password)

conn.select(readonly=True)

for message_id in message_id_list.split(b" "):
(_, raw_mail) = conn.fetch(message_id.decode("utf8"), "(RFC822)")

parsed_mail = email.message_from_string(raw_mail[0][1].decode("utf8")) # type: ignore

sender_name, sender_address = email.utils.parseaddr(parsed_mail["from"])
sender = (decode_utf8(sender_name), sender_address)

recipients = []
for recipient in parsed_mail["to"].split(","):
name, address = email.utils.parseaddr(recipient)
recipients.append((decode_utf8(name), address))

subject = decode_utf8(parsed_mail["subject"])
body = get_message_body(parsed_mail).decode("utf8")

yield {"from": sender, "to": recipients, "subject": subject, "body": body}


def decode_utf8(raw: str) -> str:
if raw:
data, encoding = email.header.decode_header(raw)[0]
if encoding:
return data.decode(encoding)
return data # type: ignore
return ""


def get_message_body(message: email.message.Message) -> bytes:
if message.is_multipart():
for part in message.get_payload():
if part.get_content_type() == "text/plain":
return part.get_payload(decode=True)
return b""
else:
return message.get_payload(decode=True)
69 changes: 69 additions & 0 deletions demailer/processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from email.utils import parseaddr

from demailer.config import (
body_stop_list,
recipient_stop_list,
sender_stop_list,
subject_stop_list,
)
from demailer.mail import get_messages, get_new_mail
from demailer.tg import send_message


def process_new_mail():
(_, messages) = get_new_mail()
if len(messages[0]) != 0:
for message in get_messages(messages[0]):
if not (
filter_mail_from(message)
or filter_mail_to(message)
or filter_subject(message)
or filter_body(message)
):
notify(message)


def filter_mail_from(message):
if message["from"][1] in sender_stop_list:
return True


def filter_mail_to(message):
for recipient in message["to"]:
if recipient[1] in recipient_stop_list:
return True


def filter_subject(message):
for stop_words in subject_stop_list:
if stop_words in message["subject"]:
return True


def filter_body(message):
for stop_words in body_stop_list:
if stop_words in message["body"]:
return True


def notify(message):
sender = (
f"{message['from'][0]} <{message['from'][1]}>"
if message["from"][0]
else message["from"][1]
)
recipients = []
for recipient in message["to"]:
if recipient[0]:
recipients.append(f"{recipient[0]} <{recipient[1]}>")
else:
recipients.append(recipient[1])

send_message(
f"""
New mail!
From: {sender}
To: {", ".join(recipients)}
Subject: {message["subject"]}
Body: {message["body"]}"""
)
10 changes: 10 additions & 0 deletions demailer/tg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from telebot import TeleBot
from telebot.types import Message

from demailer.config import config

bot = TeleBot(config.tg_bot_token)


def send_message(message: str) -> Message:
bot.send_message(config.tg_user_id, message)
12 changes: 12 additions & 0 deletions runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from datetime import timedelta

from timeloop import Timeloop

from demailer.config import config
from demailer.processor import process_new_mail

loop = Timeloop()
loop._add_job(process_new_mail, timedelta(seconds=config.poll_interval))

if __name__ == "__main__":
loop.start(block=True)

0 comments on commit 19bf405

Please # to comment.