"""Outgoing queue runner."""
import os
import sys
import copy
import time
import socket
import email
from Mailman import mm_cfg
from Mailman import Message
from Mailman import Errors
from Mailman import LockFile
from Mailman.Queue.Runner import Runner
from Mailman.Queue.Switchboard import Switchboard
from Mailman.Logging.Syslog import syslog
# Number of _doperiodic() calls that must accumulate before
# OutgoingRunner flushes its queued permanent failures: _doperiodic()
# bumps a counter on every call and only invokes _handle_permfailures()
# once the counter is no longer below this value.
DEAL_WITH_PERMFAILURES_EVERY = 10

# Compatibility shim: if the interpreter does not provide the True/False
# names (the bare reference raises NameError), define integer stand-ins
# at module level so the rest of this file can use them.
try:
    True, False
except NameError:
    True = 1
    False = 0
class OutgoingRunner(Runner):
    """Queue runner that passes outgoing messages to the delivery module.

    The delivery function is the `process` attribute of the handler
    module named by mm_cfg.DELIVERY_MODULE.  Recipients that fail
    permanently are remembered per list and registered as bounces
    periodically; recipients that fail temporarily are re-queued on the
    retry switchboard (mm_cfg.RETRYQUEUE_DIR).
    """

    QDIR = mm_cfg.OUTQUEUE_DIR

    def __init__(self, slice=None, numslices=1):
        """Set up the runner and resolve the delivery function.

        `slice` and `numslices` are passed straight to Runner.__init__.
        """
        Runner.__init__(self, slice, numslices)
        # Maps a mailing list object to a list of (recipient, message copy)
        # pairs awaiting registerBounce().
        self._permfailures = {}
        # Counts _doperiodic() calls since the last flush of the above.
        self._permfail_counter = 0
        # Keep the module name on the instance: _dispose() needs it for its
        # failsafe log message.
        self._modname = 'Mailman.Handlers.' + mm_cfg.DELIVERY_MODULE
        # __import__ of a dotted name returns the top-level package, so the
        # handler module itself is looked up in sys.modules.
        __import__(self._modname)
        self._func = getattr(sys.modules[self._modname], 'process')
        # True once an SMTP connection failure has been logged, so that
        # repeated failures do not flood the error log.
        self.__logged = False
        self.__retryq = Switchboard(mm_cfg.RETRYQUEUE_DIR)

    def _dispose(self, mlist, msg, msgdata):
        """Attempt delivery of `msg` for `mlist`.

        Returns True when delivery is not attempted yet (msgdata's
        'deliver_after' lies in the future) or the SMTP connection failed,
        and False otherwise.
        NOTE(review): presumably True asks the base Runner to keep the
        message queued and False marks it as handled -- confirm against
        Runner.
        """
        deliver_after = msgdata.get('deliver_after', 0)
        if time.time() < deliver_after:
            # Too early to (re)try this message.
            return True
        # Refresh the list state before delivering.
        mlist.Load()
        try:
            pid = os.getpid()
            self._func(mlist, msg, msgdata)
            # Failsafe: if the pid changed, we are running in a process
            # other than the one that entered the delivery function.  Log
            # it and exit hard.
            if pid != os.getpid():
                syslog('error', 'child process leaked thru: %s',
                       self._modname)
                os._exit(1)
            # Delivery worked, so re-arm the connection-failure log message.
            self.__logged = False
        except socket.error:
            port = mm_cfg.SMTPPORT
            if port == 0:
                port = 'smtp'
            # Log the connection problem only once per outage.
            if not self.__logged:
                syslog('error', 'Cannot connect to SMTP server %s on port %s',
                       mm_cfg.SMTPHOST, port)
                self.__logged = True
            return True
        except Errors.SomeRecipientsFailed:
            # sys.exc_info() instead of the `except X, e` spelling, which
            # newer interpreters reject.
            e = sys.exc_info()[1]
            if e.permfailures:
                # Remember every permanently failing recipient together
                # with one shared copy of the message; they are registered
                # as bounces later by _handle_permfailures().
                msgcopy = copy.deepcopy(msg)
                pairs = [(recip, msgcopy) for recip in e.permfailures]
                self._permfailures.setdefault(mlist, []).extend(pairs)
            if e.tempfailures:
                now = time.time()
                recips = e.tempfailures
                last_recip_count = msgdata.get('last_recip_count', 0)
                deliver_until = msgdata.get('deliver_until', now)
                if len(recips) == last_recip_count:
                    # Same number of failing recipients as last time: give
                    # up once the retry window has passed.
                    if now > deliver_until:
                        return False
                else:
                    # The count changed, so start a fresh retry window.
                    deliver_until = now + mm_cfg.DELIVERY_RETRY_PERIOD
                msgdata['last_recip_count'] = len(recips)
                msgdata['deliver_until'] = deliver_until
                # Only the still-failing recipients are retried.
                msgdata['recips'] = recips
                self.__retryq.enqueue(msg, msgdata)
        return False

    def _doperiodic(self):
        """Flush permanent failures on every Nth call.

        N is DEAL_WITH_PERMFAILURES_EVERY; the counter is reset by
        _handle_permfailures().
        """
        self._permfail_counter += 1
        if self._permfail_counter < DEAL_WITH_PERMFAILURES_EVERY:
            return
        self._handle_permfailures()

    def _handle_permfailures(self):
        """Register the remembered permanent failures as bounces.

        Each list is locked, updated, saved and unlocked in turn.  If a
        lock cannot be acquired within mm_cfg.LIST_LOCK_TIMEOUT the method
        returns immediately; that list and any not yet visited keep their
        entries for a later call.
        """
        self._permfail_counter = 0
        # Iterate over a snapshot of the keys because entries are deleted
        # inside the loop.
        for mlist in list(self._permfailures.keys()):
            try:
                mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
            except LockFile.TimeOutError:
                return
            try:
                for recip, msg in self._permfailures[mlist]:
                    mlist.registerBounce(recip, msg)
                del self._permfailures[mlist]
                mlist.Save()
            finally:
                # Release the lock even if registering or saving failed.
                mlist.Unlock()

    def _cleanup(self):
        """Flush pending permanent failures, then run Runner._cleanup."""
        self._handle_permfailures()
        Runner._cleanup(self)