"""Reading and writing message objects and message metadata.
"""
import os
import time
import sha
import marshal
import errno
import cPickle
import email
from Mailman import mm_cfg
from Mailman import Utils
from Mailman import Message
from Mailman.Logging.Syslog import syslog
shamax = 0xffffffffffffffffffffffffffffffffffffffffL
try:
True, False
except NameError:
True = 1
False = 0
SAVE_MSGS_AS_PICKLES = True
class _Switchboard:
def __init__(self, whichq, slice=None, numslices=1):
self.__whichq = whichq
omask = os.umask(0) try:
try:
os.mkdir(self.__whichq, 0770)
except OSError, e:
if e.errno <> errno.EEXIST: raise
finally:
os.umask(omask)
self.__lower = None
self.__upper = None
if numslices <> 1:
self.__lower = ((shamax+1) * slice) / numslices
self.__upper = (((shamax+1) * (slice+1)) / numslices) - 1
def whichq(self):
return self.__whichq
def enqueue(self, _msg, _metadata={}, **_kws):
data = _metadata.copy()
data.update(_kws)
listname = data.get('listname', '--nolist--')
now = time.time()
if SAVE_MSGS_AS_PICKLES and not data.get('_plaintext'):
msgsave = cPickle.dumps(_msg, 1)
ext = '.pck'
else:
msgsave = str(_msg)
ext = '.msg'
hashfood = msgsave + listname + `now`
rcvtime = data.setdefault('received_time', now)
filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest()
msgfile = os.path.join(self.__whichq, filebase + ext)
dbfile = os.path.join(self.__whichq, filebase + '.db')
data['version'] = mm_cfg.QFILE_SCHEMA_VERSION
for k in data.keys():
if k.startswith('_'):
del data[k]
omask = os.umask(007) try:
msgfp = open(msgfile, 'w')
finally:
os.umask(omask)
msgfp.write(msgsave)
msgfp.flush()
os.fsync(msgfp.fileno())
msgfp.close()
tmpfile = dbfile + '.tmp'
self._ext_write(tmpfile, data)
os.rename(tmpfile, dbfile)
return filebase
def dequeue(self, filebase):
msgfile = os.path.join(self.__whichq, filebase + '.msg')
pckfile = os.path.join(self.__whichq, filebase + '.pck')
dbfile = os.path.join(self.__whichq, filebase + '.db')
msg = None
try:
data = self._ext_read(dbfile)
os.unlink(dbfile)
except EnvironmentError, e:
if e.errno <> errno.ENOENT: raise
data = {}
if data.has_key('rejection-notice'):
data['rejection_notice'] = data['rejection-notice']
del data['rejection-notice']
msgfp = None
try:
try:
msgfp = open(pckfile)
msg = cPickle.load(msgfp)
os.unlink(pckfile)
except EnvironmentError, e:
if e.errno <> errno.ENOENT: raise
msgfp = None
try:
msgfp = open(msgfile)
msg = email.message_from_file(msgfp, Message.Message)
os.unlink(msgfile)
except EnvironmentError, e:
if e.errno <> errno.ENOENT: raise
except email.Errors.MessageParseError, e:
syslog('error', 'message is unparsable: %s', filebase)
msgfp.close()
msgfp = None
if mm_cfg.QRUNNER_SAVE_BAD_MESSAGES:
sb = Switchboard(mm_cfg.BADQUEUE_DIR)
os.rename(msgfile, os.path.join(
mm_cfg.BADQUEUE_DIR, filebase + '.txt'))
else:
os.unlink(msgfile)
msg = data = None
finally:
if msgfp:
msgfp.close()
return msg, data
def files(self):
times = {}
lower = self.__lower
upper = self.__upper
for f in os.listdir(self.__whichq):
if not f.endswith('.db'):
continue
filebase = os.path.splitext(f)[0]
when, digest = filebase.split('+')
if not lower or (lower <= long(digest, 16) < upper):
times[float(when)] = filebase
keys = times.keys()
keys.sort()
return [times[k] for k in keys]
def _ext_write(self, tmpfile, data):
raise NotImplementedError
def _ext_read(self, dbfile):
raise NotImplementedError
class MarshalSwitchboard(_Switchboard):
"""Python marshal format."""
FLOAT_ATTRIBUTES = ['received_time']
def _ext_write(self, filename, dict):
omask = os.umask(007) try:
fp = open(filename, 'w')
finally:
os.umask(omask)
for attr in self.FLOAT_ATTRIBUTES:
try:
fval = dict[attr]
except KeyError:
pass
else:
dict[attr] = repr(fval)
marshal.dump(dict, fp)
fp.flush()
if mm_cfg.SYNC_AFTER_WRITE:
os.fsync(fp.fileno())
fp.close()
def _ext_read(self, filename):
fp = open(filename)
dict = marshal.load(fp)
if dict.get('version', 0) == 2:
del dict['filebase']
for attr in self.FLOAT_ATTRIBUTES:
try:
sval = dict[attr]
except KeyError:
pass
else:
dict[attr] = eval(sval, {'__builtins__': {}})
fp.close()
return dict
class BSDDBSwitchboard(_Switchboard):
"""Native (i.e. compiled-in) Berkeley db format."""
def _ext_write(self, filename, dict):
import bsddb
omask = os.umask(0)
try:
hashfile = bsddb.hashopen(filename, 'n', 0660)
finally:
os.umask(omask)
for k, v in dict.items():
hashfile[k] = marshal.dumps(v)
hashfile.sync()
hashfile.close()
def _ext_read(self, filename):
import bsddb
dict = {}
hashfile = bsddb.hashopen(filename, 'r')
for k in hashfile.keys():
dict[k] = marshal.loads(hashfile[k])
hashfile.close()
return dict
class ASCIISwitchboard(_Switchboard):
"""Human readable .db file format.
key/value pairs are written as
key = value
as real Python code which can be execfile'd.
"""
def _ext_write(self, filename, dict):
omask = os.umask(007) try:
fp = open(filename, 'w')
finally:
os.umask(omask)
for k, v in dict.items():
print >> fp, '%s = %s' % (k, repr(v))
fp.flush()
if mm_cfg.SYNC_AFTER_WRITE:
os.fsync(fp.fileno())
fp.close()
def _ext_read(self, filename):
dict = {'__builtins__': {}}
execfile(filename, dict)
del dict['__builtins__']
return dict
# Bind the public name `Switchboard' to the implementation matching the
# configured metadata format.  An unrecognized setting is logged and the
# marshal implementation is used instead.
if mm_cfg.METAFMT_MARSHAL == mm_cfg.METADATA_FORMAT:
    Switchboard = MarshalSwitchboard
elif mm_cfg.METAFMT_BSDDB_NATIVE == mm_cfg.METADATA_FORMAT:
    Switchboard = BSDDBSwitchboard
elif mm_cfg.METAFMT_ASCII == mm_cfg.METADATA_FORMAT:
    Switchboard = ASCIISwitchboard
else:
    syslog(
        'error',
        'Undefined metadata format: %d (using marshals)',
        mm_cfg.METADATA_FORMAT)
    Switchboard = MarshalSwitchboard
class DumperSwitchboard(Switchboard):
    """Switchboard used only to read individual metadata files.

    It is not attached to a queue directory.
    """

    def __init__(self):
        """Do nothing; the base class constructor is deliberately not
        called, so no queue directory is created or recorded."""

    def read(self, filename):
        """Return the metadata stored in the file `filename'."""
        metadata = self._ext_read(filename)
        return metadata