"""Maildir pre-queue runner.
Most MTAs can be configured to deliver messages to a `Maildir'[1]. This
runner will read messages from a maildir's new/ directory and inject them into
Mailman's qfiles/in directory for processing in the normal pipeline. This
delivery mechanism contrasts with mail program delivery, where incoming
messages end up in qfiles/in via the MTA executing the scripts/post script
(and likewise for the other -aliases for each mailing list).
The advantage to Maildir delivery is that it is more efficient; there's no
need to fork an intervening program just to take the message from the MTA's
standard output, to the qfiles/in directory.
[1] http://cr.yp.to/proto/maildir.html
We're going to use the :info flag == 1, experimental status flag for our own
purposes. The :1 can be followed by one of these letters:
- P means that MaildirRunner's in the process of parsing and enqueuing the
message. If successful, it will delete the file.
- X means something failed during the parse/enqueue phase. An error message
will be logged to log/error and the file will be renamed <filename>:1,X.
MaildirRunner will never automatically return to this file, but once the
problem is fixed, you can manually move the file back to the new/ directory
and MaildirRunner will attempt to re-process it. At some point we may do
this automatically.
See the variable USE_MAILDIR in Defaults.py.in for enabling this delivery
mechanism.
"""
import os
import re
import errno
from email.Parser import Parser
from email.Utils import parseaddr
from Mailman import mm_cfg
from Mailman import Utils
from Mailman.Message import Message
from Mailman.Queue.Runner import Runner
from Mailman.Queue.sbcache import get_switchboard
from Mailman.Logging.Syslog import syslog
lre = re.compile(r"""
^ # start of string
(?P<listname>[^-@]+) # listname@ or listname-subq@
(?: # non-grouping
- # dash separator
(?P<subq>[^-+@]+) # everything up to + or - or @
)? # if it exists
""", re.VERBOSE | re.IGNORECASE)
class MaildirRunner(Runner):
def __init__(self, slice=None, numslices=1):
self._stop = 0
self._dir = os.path.join(mm_cfg.MAILDIR_DIR, 'new')
self._cur = os.path.join(mm_cfg.MAILDIR_DIR, 'cur')
self._parser = Parser(Message)
def _oneloop(self):
listnames = Utils.list_names()
try:
files = os.listdir(self._dir)
except OSError, e:
if e.errno <> errno.ENOENT: raise
return 0
for file in files:
srcname = os.path.join(self._dir, file)
dstname = os.path.join(self._cur, file + ':1,P')
xdstname = os.path.join(self._cur, file + ':1,X')
try:
os.rename(srcname, dstname)
except OSError, e:
if e.errno == errno.ENOENT:
continue
syslog('error', 'Could not rename maildir file: %s', srcname)
raise
try:
fp = open(dstname)
try:
msg = self._parser.parse(fp)
finally:
fp.close()
vals = []
for header in ('delivered-to', 'envelope-to', 'apparently-to'):
vals.extend(msg.get_all(header, []))
for field in vals:
to = parseaddr(field)[1]
if not to:
continue
mo = lre.match(to)
if not mo:
continue
listname, subq = mo.group('listname', 'subq')
if listname in listnames:
break
else:
syslog('error', 'Message apparently not for any list: %s',
xdstname)
os.rename(dstname, xdstname)
continue
msgdata = {'listname': listname}
if subq in ('bounces', 'admin'):
queue = get_switchboard(mm_cfg.BOUNCEQUEUE_DIR)
elif subq == 'confirm':
msgdata['toconfirm'] = 1
queue = get_switchboard(mm_cfg.CMDQUEUE_DIR)
elif subq in ('join', 'subscribe'):
msgdata['tojoin'] = 1
queue = get_switchboard(mm_cfg.CMDQUEUE_DIR)
elif subq in ('leave', 'unsubscribe'):
msgdata['toleave'] = 1
queue = get_switchboard(mm_cfg.CMDQUEUE_DIR)
elif subq == 'owner':
msgdata.update({
'toowner': 1,
'envsender': Utils.get_site_email(extra='bounces'),
'pipeline': mm_cfg.OWNER_PIPELINE,
})
queue = get_switchboard(mm_cfg.INQUEUE_DIR)
elif subq is None:
msgdata['tolist'] = 1
queue = get_switchboard(mm_cfg.INQUEUE_DIR)
elif subq == 'request':
msgdata['torequest'] = 1
queue = get_switchboard(mm_cfg.CMDQUEUE_DIR)
else:
syslog('error', 'Unknown sub-queue: %s', subq)
os.rename(dstname, xdstname)
continue
queue.enqueue(msg, msgdata)
os.unlink(dstname)
except Exception, e:
os.rename(dstname, xdstname)
syslog('error', str(e))
def _cleanup(self):
pass