#!/usr/bin/env python3
###############################################################################
#
# This program applies email rules to an IMAP mail account. The
# rules are given in a rule file, and the config file or command line
# arguments specify the server and account information. Passwords
# can be fetched from the system keychain or keyring. For details, run
# the command line program with the "-h" argument:
#
# usage: IMAPrules [-h] [--config-file FILE] [--rule-file FILE]
# [--server SERVER] [--port PORT] [--smtp SMTP]
# [--smtp-port SMTP_PORT] [--account ACCOUNT]
# [--smtp-account SMTP_ACCOUNT] [--pw PASSWORD]
# [--smtp-pw PASSWORD] [--dump-path DUMP_PATH]
# [--log {0,1,2}]
#
# Perform IMAP rules.
#
# optional arguments:
# -h, --help show this help message and exit
# --config-file FILE the config file
# --rule-file FILE the file with the rules
# --server SERVER the IMAP server
# --port PORT the IMAP server port (default used if not given)
# --smtp SMTP the SMTP server (default is the IMAP server)
# --smtp-port SMTP_PORT the SMTP server port (default used if not given)
# --account ACCOUNT the IMAP account
# --smtp-account SMTP_ACCOUNT
# the SMTP account (default is the IMAP account)
# --pw PASSWORD the IMAP account password (not from keychain)
# --smtp-pw PASSWORD the SMTP account password (not from keychain)
# --dump-path DUMP_PATH the path to where content of emails are dumped
# (default is ".")
# --log {0,1,2} print log messages to stdout (0, 1 or 2)
#
# And even better, read the following blog post:
#
# https://blog.pg12.org/server-side-imap-rules
#
# Copyright (c) 2021-2025, Anders Andersen, UiT The Arctic University
# of Norway. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# - Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#
# - Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
#
# - Neither the name of the organizations nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
###############################################################################
#------------------------------------------------------------------------------
# Main external modules used
#------------------------------------------------------------------------------
# From the Python Standard Library
import sys, os, re, datetime, argparse, configparser
Arguments = argparse.Namespace
# For type checking
from typing import Any, Optional, Union, Literal
from typing import TypedDict, Callable, TextIO
# For path manipulation
from pathlib import PurePath
# Actually perform type checking (see https://typeguard.readthedocs.io/)
from typeguard import typechecked
# The imap_tools (https://github.com/ikvk/imap_tools)
from imap_tools import MailBox, AND, OR, NOT, Header, UidRange
from imap_tools.consts import MailMessageFlags
# To be able to redirect messages
from smtplib import SMTP, SMTP_SSL
# Fetch password from the system keychain or keyring
import keyring
# Safe file names
from sanitize_filename import sanitize
# Ensure that the file name of dumped content is unique
def dump_fname(fpath: str, ext: str = ""):
    """Return a path built from `fpath` and `ext` that does not exist yet.

    `fpath + ext` is returned unchanged when nothing with that name
    exists.  Otherwise a space and a two-digit counter (" 01", " 02",
    ...) are inserted between `fpath` and `ext` until an unused name
    is found.
    """
    candidate = fpath + ext
    counter = 0
    while os.path.exists(candidate):
        counter += 1
        candidate = f"{fpath} {counter:02d}{ext}"
    return candidate
#------------------------------------------------------------------------------
# Configurable variables
#------------------------------------------------------------------------------
# Class used to create an SMTP session object (connect_smtp() calls it
# with the SMTP server name and, if given, the port)
# NOTE(review): `Literal[...]` is meant for literal values; something
# like `type[SMTP]` would probably describe "one of these classes"
# better -- confirm before changing.
SMTPSessionClass: Literal[SMTP, SMTP_SSL] = SMTP_SSL
# The amount of log messages to be printed (0, 1 or 2, where 0 is silent);
# parse_args() overwrites this with the value of --log
logMsgLevel: int = 0
# Print error messages (to stderr)?
errMsgOn: bool = True
# Comment out the next line to turn on run-time type checking (off by
# default): it replaces the `typechecked` decorator imported from
# typeguard with a decorator that returns the function unchanged
typechecked = lambda f: f
#------------------------------------------------------------------------------
# Help functions
#------------------------------------------------------------------------------
# Write message
@typechecked
def amsg(msg: str, pre: str = PurePath(sys.argv[0]).name,
         file: TextIO = sys.stdout):
    """Print `msg` to `file`, prefixed with "`pre`: " unless `pre` is empty.

    The default prefix is the file name part of sys.argv[0] and the
    default file is sys.stdout (both evaluated when the function is
    defined).
    """
    text = f"{pre}: {msg}" if pre else msg
    print(text, file=file)
# Write log message to stdout (if turned on)
@typechecked
def logmsg(msg: str, pre: str = PurePath(sys.argv[0]).name,
           level: int = 1):
    """Print `msg` through amsg() if `level` is enabled.

    A message is printed only when `level` does not exceed the global
    `logMsgLevel`; with the default `logMsgLevel` of 0 nothing is
    printed.
    """
    if level > logMsgLevel:
        return
    amsg(msg, pre)
# Write error message to stderr (if turned on)
@typechecked
def errmsg(msg: str, pre: str = PurePath(sys.argv[0]).name):
    """Print `msg` to stderr through amsg() unless `errMsgOn` is false."""
    if not errMsgOn:
        return
    amsg(msg, pre, file=sys.stderr)
# Decode number of messages operated on (res is result from operation)
@typechecked
def _decode_num_msgs(res: object) -> int:
    """Return the number of messages reported in the result `res`.

    A "response" below is a tuple whose second element is a list,
    i.e. `(status, [data, ...])`.  Three result shapes are recognised,
    tried in this order:

    1. `(first, (response, ...))` -> length of the data list of the
       first response inside the second element
    2. `(response, ...)`          -> length of the data list of the
       first element
    3. `response`                 -> length of its own data list

    Anything else (including None, an empty tuple or a non-tuple)
    gives 0.

    NOTE(review): shape 1 presumably matches what imap_tools'
    `MailBox.move()` returned as `(copy_result, delete_result)`; newer
    imap_tools versions may return a list instead, which is counted
    as 0 here -- confirm against the installed version.
    """
    def is_response(item: object) -> bool:
        # The length guard keeps short tuples from raising IndexError
        return (isinstance(item, tuple) and len(item) > 1
                and isinstance(item[1], list))

    if not isinstance(res, tuple) or not res:
        return 0
    if (len(res) > 1 and isinstance(res[1], tuple) and res[1]
            and is_response(res[1][0])):
        return len(res[1][0][1])
    if is_response(res[0]):
        return len(res[0][1])
    if is_response(res):
        return len(res[1])
    return 0
#------------------------------------------------------------------------------
# Implementation of a cache of the IMAP folders and mailboxes (to
# limit the number of operations towards the IMAP server).
# ------------------------------------------------------------------------------
# The delimiter of folder paths used by us (e.g. in the rule file)
# This can be different from the delimiter used in the IMAP folders;
# _mk_cache_key() replaces the server's delimiter with this one
_local_delim: str = "/"
# Save results to prevent unnecessary operations on the IMAP servers.
# Filled by _update_all_folders_cache() with the layout
#   server host -> local folder path (cache key) -> folder info dict
# where the folder info dict has "name", "delim" and "flags" entries
# when it is built from an imap_tools FolderInfo object.
# NOTE(review): the "flags" value is copied from imap_tools and is
# probably not a str, so the innermost value type may be too narrow
# -- confirm.
_all_folders_cache: dict[str, dict[str, dict[str, str]]] = {}
# Make cache key
@typechecked
def _mk_cache_key(f: dict) -> str:
    """Return the local folder path for the folder info dict `f`.

    The server's folder name `f["name"]` is rewritten so that the
    server's delimiter `f["delim"]` becomes `_local_delim`.
    """
    imap_name = f["name"]
    return imap_name.replace(f["delim"], _local_delim)
# Make folder path (from cache key)
@typechecked
def _mk_imap_folder_path(mailbox: MailBox, cache_key: str) -> str:
    """Return the server-side folder name for the local path `cache_key`.

    The name is looked up in the folder cache of the server that
    `mailbox` is connected to, so the cache should have been filled
    with _update_all_folders_cache() first.

    Raises:
        KeyError: if the folder is not in the cache of this server
            (also when nothing has been cached for the server yet).
    """
    # An uncached server is treated like an unknown folder so that the
    # caller gets the descriptive message instead of KeyError(<host>)
    folders = _all_folders_cache.get(mailbox._host, {})
    entry = folders.get(cache_key)
    if entry is None:
        raise KeyError(f"Folder \"{cache_key}\" does not exist.")
    return entry["name"]
# Update folder cache
@typechecked
def _update_all_folders_cache(mailbox: MailBox):
    """Fill the folder cache for the server of `mailbox` (once per server).

    If the server already has a cache nothing is done.  Otherwise all
    folders are listed on the server and stored under their local
    folder path (see _mk_cache_key()); the first folder seen for a
    given path wins.
    """
    server = mailbox._host
    # Already cached: no need to ask the IMAP server again
    if server in _all_folders_cache:
        return
    folders = _all_folders_cache[server] = {}
    listing = mailbox.folder.list()
    logmsg(f"update folder cache for {server}")
    for info in listing:
        # Older imap-tools versions return dicts, newer ones return
        # imap_tools.folder.FolderInfo objects; normalize to a dict
        if type(info) is not dict:
            info = {"name": info.name, "delim": info.delim,
                    "flags": info.flags}
        # setdefault() keeps an entry that is already present
        folders.setdefault(_mk_cache_key(info), info)
# Other things to cache: the server connections, which connect_imap()
# and connect_smtp() store here after the first successful login and
# which clean_up() closes
Cache = TypedDict(
    "Cache", {"imap": Optional[MailBox], "smtp": Optional[SMTP]})
_cache: Cache = {
    "imap": None, # Keep the imap connection (None until connected)
    "smtp": None, # Keep the smtp connection (None until connected)
}
#------------------------------------------------------------------------------
# Argument parsing
#------------------------------------------------------------------------------
# Parse arguments (and optionally, fetch password from keychain)
@typechecked
def parse_args() -> Arguments:
    """Parse the config file and the command line and return the settings.

    Parsing is done in two steps: a first parser only picks up
    `-c/--config-file`; the `[Defaults]` section of that file (together
    with the built-in defaults) then provides the default values for
    the second parser, which handles all other arguments.  Command
    line arguments therefore override config file values.

    After parsing:
    - the global `logMsgLevel` is set from `--log` (if non-zero),
    - a missing IMAP password is fetched from the keychain/keyring
      (service = server, user = account),
    - the SMTP server/account default to the IMAP ones, and a missing
      SMTP password is copied from the IMAP password when server and
      account are the same, otherwise fetched from the keychain.

    Returns:
        The argument namespace (option names with "-" replaced by "_").

    The program is terminated with exit status 1 (after an error
    message) if the config file cannot be read or has no `[Defaults]`
    section, if server or account is missing, or if a password cannot
    be obtained.
    """
    global logMsgLevel

    # Need this for config and rule files
    full_path = os.path.abspath(os.path.dirname(__file__))

    # Create application config file parser
    conf_parser = argparse.ArgumentParser(
        description="Perform IMAP rules.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        add_help=False)

    # Parse the argument specifying the config file (remaining arguments saved)
    conf_parser.add_argument(
        "-c", "--config-file", default=(full_path + "/IMAPrules.config"),
        metavar="FILE", help="the config file")
    args, remaining_argv = conf_parser.parse_known_args()

    # Default values of arguments and config file values
    defaults = {
        "rule-file": (full_path + "/IMAPrules.rules"),
        "log": 0,
        "dump-path": "." }

    # Parse config file (if given)
    if args.config_file:
        config = configparser.ConfigParser()
        try:
            config.read([args.config_file])
            defaults.update(dict(config.items("Defaults")))
        except Exception as err:
            errmsg(f"\nUnable to read config file {args.config_file}:\n\n" +
                   f"{err}\n")
            sys.exit(1)

    # Create application argument parser
    parser = argparse.ArgumentParser(
        description="Perform IMAP rules.",
        parents=[conf_parser])

    # Fix default keys (replace "-" by "_")
    defaults_fixed = {k.replace("-", "_"): v for (k, v) in defaults.items()}

    # Add defaults and values from config file to default args
    # (has to be done before the arguments are added, so that string
    # values from the config file get the argument's type conversion)
    parser.set_defaults(**defaults_fixed)

    # Add all arguments (except config-file)
    parser.add_argument(
        "-r", "--rule-file", metavar="FILE", #required=True,
        help="the file with the rules")
    parser.add_argument(
        "--server", #required=True,
        help="the IMAP server")
    parser.add_argument(
        "--port", type=int,
        help="the IMAP server port (default used if not given)")
    parser.add_argument(
        "--smtp",
        help="the SMTP server (default is the IMAP server)")
    parser.add_argument(
        "--smtp-port", type=int,
        help="the SMTP server port (default used if not given)")
    parser.add_argument(
        "--account", #required=True,
        help="the IMAP account")
    parser.add_argument(
        "--smtp-account",
        help="the SMTP account (default is the IMAP account)")
    parser.add_argument(
        "--pw", metavar="PASSWORD",
        help="the IMAP account password (not from keychain)")
    parser.add_argument(
        "--smtp-pw", metavar="PASSWORD",
        help="the SMTP account password (not from keychain)")
    parser.add_argument(
        "--dump-path",
        help=("the path to where content of emails are dumped " +
              "(default is \".\")"))
    parser.add_argument(
        "--log", type=int, choices=[0, 1, 2],
        help="print log messages to stdout (0, 1 or 2)")

    # Parse arguments.  The namespace from the first step is reused so
    # that a --config-file given by the user is not replaced by the
    # default value (the option was removed from remaining_argv).
    args = parser.parse_args(remaining_argv, namespace=args)

    # Required arguments
    if not (args.server and args.account):
        errmsg('"server" and "account" have to be given a value in the con' +
               'fig file\n\tor by command line arguments (see -h for details).')
        sys.exit(1)

    # Log messages
    if args.log:
        logMsgLevel = args.log

    # Password not provided by user (no --pw argument)
    if not args.pw:

        # Password from keychain
        try:
            args.pw = keyring.get_password(args.server, args.account)
        except Exception as err:
            errmsg(f"\nUnable to access keychain:\n\n {err}\n")
            sys.exit(1)
        if not args.pw:
            errmsg(f"\nNo password found in keychain for {args.account} " +
                   f"({args.server}).\n")
            errmsg(f"On a Mac, add password for the account {args.account} " +
                   f"({args.server}) with this command:\n", pre="")
            errmsg(f" security add-generic-password -a {args.account}" +
                   f" -s {args.server} -w\n", pre="")
            sys.exit(1)

    # SMTP settings default to the IMAP settings
    if not args.smtp:
        args.smtp = args.server
    if not args.smtp_account:
        args.smtp_account = args.account
    if not args.smtp_pw:
        if args.smtp == args.server and args.smtp_account == args.account:
            # Same server and account: reuse the IMAP password
            args.smtp_pw = args.pw
        else:
            try:
                args.smtp_pw = keyring.get_password(
                    args.smtp, args.smtp_account)
            except Exception as err:
                errmsg(f"\nUnable to access keychain:\n\n {err}\n")
                sys.exit(1)
            if not args.smtp_pw:
                errmsg("\nNo password found in keychain for " +
                       f"{args.smtp_account} ({args.smtp}).\n")
                errmsg("On a Mac, add password for the account " +
                       f"{args.smtp_account} " +
                       f"({args.smtp})\nwith this command:\n", pre="")
                errmsg(" security add-generic-password -a " +
                       f"{args.smtp_account} -s {args.smtp} -w\n",
                       pre="")
                sys.exit(1)

    # Return result
    return args
#------------------------------------------------------------------------------
# Connect and interact with IMAP (and SMTP) server
#------------------------------------------------------------------------------
# Connect to IMAP account
@typechecked
def connect_imap(args: Arguments) -> MailBox:
    """Return the IMAP connection, logging in on first use.

    The connection is kept in `_cache["imap"]` and reused by later
    calls.  If connecting or logging in fails, an error message is
    written and the program exits with status 1.
    """
    cached = _cache["imap"]
    if cached:
        return cached
    try:
        # Only pass the port when one was given, so that MailBox uses
        # its own default otherwise
        where = (args.server, args.port) if args.port else (args.server,)
        mailbox = MailBox(*where)
        mailbox.login(args.account, args.pw)
    except Exception as err:
        errmsg(f"\nUnable to connect to IMAP account {args.account} "
               f"({args.server}):\n\n {err}\n")
        sys.exit(1)
    logmsg(f"connected to IMAP server {args.server}")
    _cache["imap"] = mailbox
    return mailbox
# Connect to SMTP account
@typechecked
def connect_smtp(args: Arguments) -> SMTP:
    """Return the SMTP session, logging in on first use.

    The session is created with `SMTPSessionClass`, kept in
    `_cache["smtp"]` and reused by later calls.  If connecting or
    logging in fails, an error message is written and the program
    exits with status 1.
    """
    cached = _cache["smtp"]
    if cached:
        return cached
    try:
        # Only pass the port when one was given, so that the session
        # class uses its own default otherwise
        where = (args.smtp, args.smtp_port) if args.smtp_port else (args.smtp,)
        smtp = SMTPSessionClass(*where)
        smtp.login(args.smtp_account, args.smtp_pw)
    except Exception as err:
        errmsg(f"\nUnable to connect to SMTP account {args.smtp_account}"
               f" ({args.smtp}):\n\n {err}\n")
        sys.exit(1)
    logmsg(f"connected to SMTP server {args.smtp}")
    _cache["smtp"] = smtp
    return smtp
# Create a dictionary of all folders and mailboxes
@typechecked
def list_folders(args: Arguments, folder: str = "") -> dict:
    """Return the cached folders of the IMAP account.

    The result maps local folder paths to folder info dicts.  With a
    non-empty `folder` only the entries whose local path starts with
    `folder` are returned (as a new dict); otherwise the cache dict of
    the server itself is returned.
    """
    mailbox = connect_imap(args)
    _update_all_folders_cache(mailbox)
    cached = _all_folders_cache[mailbox._host]
    if not folder:
        logmsg("list all folders and mailboxes")
        return cached
    logmsg(f"list folders and mailboxes in {folder}")
    return {path: info for path, info in cached.items()
            if path.startswith(folder)}
# Get messages
@typechecked
def fetch(args: Arguments, rn: str, expr: object,
          frm: str="INBOX") -> list[str]:
    """Return the UIDs of the messages in folder `frm` matching `expr`.

    `rn` is the rule name (only used in log messages), `expr` is the
    search expression handed to `mailbox.fetch()` and `frm` is the
    local path of the folder to search.  The folder is selected on the
    server first if it is not the current one.

    NOTE(review): `mailbox.fetch()` yields whole message objects of
    which only the uid is used here; depending on the imap_tools
    defaults this may also mark the messages as seen -- confirm.
    """
    mailbox = connect_imap(args)
    _update_all_folders_cache(mailbox)
    imap_folder = _mk_imap_folder_path(mailbox, frm)
    if imap_folder != mailbox.folder.get():
        mailbox.folder.set(imap_folder)
    logmsg(f"{rn} -> fetch from {frm}: {expr}", level=2)
    uids = [message.uid for message in mailbox.fetch(expr)]
    logmsg(f"{rn} -> fetch result: {uids}", level=2)
    return uids
# Move messages to folder
@typechecked
def move(args: Arguments, rn: str, messages: str | list[str], folder: str):
    """Move the messages with the given UIDs to `folder`.

    `folder` is a local folder path; it is translated to the server's
    folder name before the move.  `rn` is the rule name used in the
    log message, which also reports the number of moved messages as
    decoded by _decode_num_msgs().
    """
    mailbox = connect_imap(args)
    _update_all_folders_cache(mailbox)
    result = mailbox.move(messages, _mk_imap_folder_path(mailbox, folder))
    count = _decode_num_msgs(result)
    noun = "message" if count == 1 else "messages"
    logmsg(f"{rn} -> moved {count} {noun} to folder {folder}")
# Copy messages to folder
@typechecked
def copy(args: Arguments, rn: str, messages: str | list[str], folder: str):
    """Copy the messages with the given UIDs to `folder`.

    `folder` is a local folder path; it is translated to the server's
    folder name before the copy.  `rn` is the rule name used in the
    log message.
    """
    mailbox = connect_imap(args)
    _update_all_folders_cache(mailbox)
    mailbox.copy(messages, _mk_imap_folder_path(mailbox, folder))
    logmsg(f"{rn} -> messages copied to folder {folder}")
# Redirect messages to the given address
@typechecked
def redirect(args: Arguments, rn: str, messages: str | list[str], adr: str):
    """Send each of the given messages unchanged to the address `adr`.

    `messages` is a list of UIDs or a comma-separated string of UIDs.
    Each message is fetched from the IMAP server and its full text is
    sent over SMTP with the SMTP account as envelope sender.  `rn` is
    the rule name used in the log messages.
    """
    # See https://stackoverflow.com/questions/2717196/forwarding-an-email-with-python-smtplib
    # and https://docs.python.org/3/library/smtplib.html
    mailbox = connect_imap(args)
    smtp = connect_smtp(args)
    uids = messages.split(",") if type(messages) is str else messages
    for uid in uids:
        # next() takes the first (expected: only) message with this uid
        original = next(mailbox.fetch(AND(uid=uid)))
        sender = args.smtp_account
        smtp.sendmail(sender, adr, original.obj.as_string())
        logmsg(f"{rn} -> redirect message {uid} from {sender} to {adr}")
# Mark message as seen
@typechecked
def seen(args: Arguments, rn: str, messages: str | list[str], val: bool=True):
    """Set (or, with `val=False`, clear) the SEEN flag on the messages.

    `rn` is the rule name used in the log message; the log text is the
    same for both values of `val`.
    """
    connect_imap(args).flag(messages, MailMessageFlags.SEEN, val)
    logmsg(f"{rn} -> messages marked seen")
# Dump content of message in file
@typechecked
def dump(args: Arguments, rn: str, messages: str | list[str], folder: str,
         sub: Optional[tuple[str, str]] = None):
    """Write the content of the given messages to files below the dump path.

    For every message the (sanitized) subject is used as base name in
    the directory `args.dump_path + "/" + folder`:

    - the HTML part, if any, is written to "<name>.html",
    - the text part, if any, is written to "<name>.txt",
    - attachments, if any, are written into a new directory "<name>".

    Existing files are never overwritten; dump_fname() picks a free
    name instead.

    Args:
        args: the parsed arguments (uses `dump_path`).
        rn: the rule name (used in log and error messages).
        messages: list of UIDs or a comma-separated string of UIDs.
        folder: sub-directory of the dump path to write to; it has to
            exist and be writable, otherwise the program exits with
            status 1.
        sub: optional `(pattern, replacement)` pair applied to the
            subject with re.sub() before it is used as file name.
    """
    mailbox = connect_imap(args)
    if type(messages) is str:
        messages = messages.split(",")
    fullpath = args.dump_path + "/" + folder
    if not os.access(fullpath, os.W_OK):
        errmsg(f"{rn} -> unable to dump content to folder {fullpath}")
        sys.exit(1)
    for msgid in messages:
        # next() takes the first (expected: only) message with this uid
        msg = next(mailbox.fetch(AND(uid=msgid)))
        subject = re.sub(*sub, msg.subject) if sub else msg.subject
        # The sanitized subject can be empty; avoid files named ".html"
        fname = sanitize(subject) or "no subject"
        base = fullpath + "/" + fname
        # Text is written as UTF-8 so that the result does not depend
        # on (or fail because of) the platform's default encoding
        if msg.html:
            with open(dump_fname(base, ".html"), "w",
                      encoding="utf-8") as out:
                out.write(msg.html)
        if msg.text:
            with open(dump_fname(base, ".txt"), "w",
                      encoding="utf-8") as out:
                out.write(msg.text)
        if attachments := msg.attachments:
            dname = dump_fname(base)
            os.mkdir(dname)
            for a in attachments:
                # The attachment name comes from the email and is not
                # trusted: sanitize it so it cannot leave `dname`
                aname = sanitize(a.filename or "") or "attachment"
                # Keep the extension last when a counter is needed
                stem, ext = os.path.splitext(aname)
                with open(dump_fname(dname + "/" + stem, ext), "wb") as out:
                    out.write(a.payload)
        logmsg(f'{rn} -> message "{fname}" dumped to "{fullpath}"')
# Do operations from rule file
@typechecked
def do_ops(args: Arguments, rn: str, ops: dict):
    """Perform one rule: fetch the matching messages and apply the actions.

    `ops` is the rule with the keys "ENABLED", "FETCH" and "ACTIONS".
    Nothing is done for a disabled rule.  "FETCH" holds the arguments
    for fetch() (a single value is wrapped in a tuple, and the wrapped
    value is stored back in `ops`).  Each action is either a function
    or a tuple `(function, extra arguments...)`; it is called with
    `args`, the rule name `rn`, the fetched UIDs and the extra
    arguments.  No action is called when nothing was fetched.
    """
    if not ops["ENABLED"]:
        return
    if type(ops["FETCH"]) is not tuple:
        ops["FETCH"] = (ops["FETCH"],)
    messages = fetch(args, rn, *ops["FETCH"])
    if not messages:
        return
    for action in ops["ACTIONS"]:
        step = action if type(action) is tuple else (action,)
        step[0](args, rn, messages, *step[1:])
# Name space for rule file evaluation: eval_rules() passes this dict
# as the local namespace (with the builtins disabled), so these are
# the names the expression in the rule file can refer to
ns: dict[str, Callable[..., Any]] = {
    # Imported from imap_tools
    "AND": AND,
    "OR": OR,
    "NOT": NOT,
    "Header": Header,
    "UidRange": UidRange,
    # datetime.date from the standard library
    "Date": datetime.date,
    # The actions implemented in this file
    "move": move,
    "copy": copy,
    "redirect": redirect,
    "seen": seen,
    "dump": dump
}
# Read rule file and evaluate (in restricted namespace) its content
@typechecked
def eval_rules(rule_file: str) -> dict:
    """Read `rule_file` and return the dictionary of rules it evaluates to.

    The content of the file is evaluated as a single Python expression
    with the builtins disabled and `ns` as the only available names.

    If the file cannot be read or evaluated, or if the result is not
    a dictionary, an error message is written and the program exits
    with status 1.
    """
    logmsg(f"read rule file {rule_file}")
    try:
        # Close the file as soon as it is read
        with open(rule_file) as rules:
            source = rules.read()
        # NOTE(security): eval() runs the rule file as Python code.
        # Removing the builtins limits what it can reach but is not a
        # sandbox -- only use rule files from a trusted source.
        result = eval(source, {"__builtins__": None}, ns)
    except Exception as err:
        errmsg(f"Unable to evaluate rule file {rule_file}:\n")
        errmsg(f" {err}\n", pre="")
        sys.exit(1)
    # The caller iterates over the rules of a dict; fail early with a
    # readable message instead of an AttributeError later on
    if not isinstance(result, dict):
        errmsg(f"Unable to evaluate rule file {rule_file}:\n")
        errmsg(" the rule file must evaluate to a dictionary\n", pre="")
        sys.exit(1)
    return result
# Clean up
@typechecked
def clean_up(args: Arguments):
    """Close the cached SMTP and IMAP connections (those that were opened).

    `args` is only used for the server names in the log messages.
    """
    smtp = _cache["smtp"]
    if smtp:
        smtp.quit()
        logmsg(f"disconnect SMTP server {args.smtp}")
    imap = _cache["imap"]
    if imap:
        imap.logout()
        logmsg(f"disconnect IMAP server {args.server}")
#------------------------------------------------------------------------------
# The module is run as a script (a program)
#------------------------------------------------------------------------------
if __name__ == '__main__':

    # Current version needs Python 3.10 (or newer).  The version is
    # compared as a tuple; testing major and minor separately would
    # reject e.g. a future 4.0.
    if sys.version_info < (3, 10):
        errmsg("Needs Python 3.10 or newer.")
        sys.exit(1)

    # Parse arguments
    args = parse_args()

    # Read and evaluate rule file
    rule_dict = eval_rules(args.rule_file)

    try:

        # Do the operations in the rule file (rn is the rule name)
        for rn, ops in rule_dict.items():
            do_ops(args, rn, ops)

        # Clean up
        clean_up(args)

    except Exception as err:

        # Handle errors
        errmsg("Unable to perform IMAP action:\n")
        errmsg(f" {err}\n", pre="")
        sys.exit(1)