# Switchboard.py   [plain text]


# Copyright (C) 2001-2003 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

"""Reading and writing message objects and message metadata.
"""

# enqueue() and dequeue() are not symmetric.  enqueue() takes a Message
# object.  dequeue() returns an email.Message object tree.
#
# Message metadata is represented internally as a Python dictionary.  Keys and
# values must be strings.  When written to a queue directory, the metadata is
# written into an externally represented format, as defined here.  Because
# components of the Mailman system may be written in something other than
# Python, the external interchange format should be chosen based on what those
# other components can read and write.
#
# Most efficient, and recommended if everything is Python, is Python marshal
# format.  Also supported by default is Berkeley db format (using the default
# bsddb module compiled into your Python executable -- usually Berkeley db
# 2), and rfc822 style plain text.  You can write your own if you have other
# needs.

import os
import time
import sha
import marshal
import errno
import cPickle

import email

from Mailman import mm_cfg
from Mailman import Utils
from Mailman import Message
from Mailman.Logging.Syslog import syslog

# 20 bytes of all bits set, maximum sha.digest() value
shamax = 0xffffffffffffffffffffffffffffffffffffffffL

try:
    True, False
except NameError:
    True = 1
    False = 0

# This flag causes messages to be written as pickles (when True) or text files
# (when False).  Pickles are more efficient because the message doesn't need
# to be re-parsed every time it's unqueued, but pickles are not human readable.
SAVE_MSGS_AS_PICKLES = True



class _Switchboard:
    def __init__(self, whichq, slice=None, numslices=1):
        self.__whichq = whichq
        # Create the directory if it doesn't yet exist.
        # FIXME
        omask = os.umask(0)                       # rwxrws---
        try:
            try:
                os.mkdir(self.__whichq, 0770)
            except OSError, e:
                if e.errno <> errno.EEXIST: raise
        finally:
            os.umask(omask)
        # Fast track for no slices
        self.__lower = None
        self.__upper = None
        # BAW: test performance and end-cases of this algorithm
        if numslices <> 1:
            self.__lower = ((shamax+1) * slice) / numslices
            self.__upper = (((shamax+1) * (slice+1)) / numslices) - 1

    def whichq(self):
        return self.__whichq

    def enqueue(self, _msg, _metadata={}, **_kws):
        # Calculate the SHA hexdigest of the message to get a unique base
        # filename.  We're also going to use the digest as a hash into the set
        # of parallel qrunner processes.
        data = _metadata.copy()
        data.update(_kws)
        listname = data.get('listname', '--nolist--')
        # Get some data for the input to the sha hash
        now = time.time()
        if SAVE_MSGS_AS_PICKLES and not data.get('_plaintext'):
            msgsave = cPickle.dumps(_msg, 1)
            ext = '.pck'
        else:
            msgsave = str(_msg)
            ext = '.msg'
        hashfood = msgsave + listname + `now`
        # Encode the current time into the file name for FIFO sorting in
        # files().  The file name consists of two parts separated by a `+':
        # the received time for this message (i.e. when it first showed up on
        # this system) and the sha hex digest.
        #rcvtime = data.setdefault('received_time', now)
        rcvtime = data.setdefault('received_time', now)
        filebase = `rcvtime` + '+' + sha.new(hashfood).hexdigest()
        # Figure out which queue files the message is to be written to.
        msgfile = os.path.join(self.__whichq, filebase + ext)
        dbfile = os.path.join(self.__whichq, filebase + '.db')
        # Always add the metadata schema version number
        data['version'] = mm_cfg.QFILE_SCHEMA_VERSION
        # Filter out volatile entries
        for k in data.keys():
            if k.startswith('_'):
                del data[k]
        # Now write the message text to one file and the metadata to another
        # file.  The metadata is always written second to avoid race
        # conditions with the various queue runners (which key off of the .db
        # filename).
        omask = os.umask(007)                     # -rw-rw----
        try:
            msgfp = open(msgfile, 'w')
        finally:
            os.umask(omask)
        msgfp.write(msgsave)
        msgfp.flush()
        os.fsync(msgfp.fileno())
        msgfp.close()
        # Now write the metadata using the appropriate external metadata
        # format.  We play rename-switcheroo here to further plug the race
        # condition holes.
        tmpfile = dbfile + '.tmp'
        self._ext_write(tmpfile, data)
        os.rename(tmpfile, dbfile)
        return filebase

    def dequeue(self, filebase):
        # Calculate the .db and .msg filenames from the given filebase.
        msgfile = os.path.join(self.__whichq, filebase + '.msg')
        pckfile = os.path.join(self.__whichq, filebase + '.pck')
        dbfile = os.path.join(self.__whichq, filebase + '.db')
        # Now we are going to read the message and metadata for the given
        # filebase.  We want to read things in this order: first, the metadata
        # file to find out whether the message is stored as a pickle or as
        # plain text.  Second, the actual message file.  However, we want to
        # first unlink the message file and then the .db file, because the
        # qrunner only cues off of the .db file
        msg = None
        try:
            data = self._ext_read(dbfile)
            os.unlink(dbfile)
        except EnvironmentError, e:
            if e.errno <> errno.ENOENT: raise
            data = {}
        # Between 2.1b4 and 2.1b5, the `rejection-notice' key in the metadata
        # was renamed to `rejection_notice', since dashes in the keys are not
        # supported in METAFMT_ASCII.
        if data.has_key('rejection-notice'):
            data['rejection_notice'] = data['rejection-notice']
            del data['rejection-notice']
        msgfp = None
        try:
            try:
                msgfp = open(pckfile)
                msg = cPickle.load(msgfp)
                os.unlink(pckfile)
            except EnvironmentError, e:
                if e.errno <> errno.ENOENT: raise
                msgfp = None
                try:
                    msgfp = open(msgfile)
                    msg = email.message_from_file(msgfp, Message.Message)
                    os.unlink(msgfile)
                except EnvironmentError, e:
                    if e.errno <> errno.ENOENT: raise
                except email.Errors.MessageParseError, e:
                    # This message was unparsable, most likely because its
                    # MIME encapsulation was broken.  For now, there's not
                    # much we can do about it.
                    syslog('error', 'message is unparsable: %s', filebase)
                    msgfp.close()
                    msgfp = None
                    if mm_cfg.QRUNNER_SAVE_BAD_MESSAGES:
                        # Cheapo way to ensure the directory exists w/ the
                        # proper permissions.
                        sb = Switchboard(mm_cfg.BADQUEUE_DIR)
                        os.rename(msgfile, os.path.join(
                            mm_cfg.BADQUEUE_DIR, filebase + '.txt'))
                    else:
                        os.unlink(msgfile)
                    msg = data = None
        finally:
            if msgfp:
                msgfp.close()
        return msg, data

    def files(self):
        times = {}
        lower = self.__lower
        upper = self.__upper
        for f in os.listdir(self.__whichq):
            # We only care about the file's base name (i.e. no extension).
            # Thus we'll ignore anything that doesn't end in .db.
            if not f.endswith('.db'):
                continue
            filebase = os.path.splitext(f)[0]
            when, digest = filebase.split('+')
            # Throw out any files which don't match our bitrange.  BAW: test
            # performance and end-cases of this algorithm.
            if not lower or (lower <= long(digest, 16) < upper):
                times[float(when)] = filebase
        # FIFO sort
        keys = times.keys()
        keys.sort()
        return [times[k] for k in keys]

    def _ext_write(self, tmpfile, data):
        raise NotImplementedError

    def _ext_read(self, dbfile):
        raise NotImplementedError



class MarshalSwitchboard(_Switchboard):
    """Python marshal format."""
    FLOAT_ATTRIBUTES = ['received_time']

    def _ext_write(self, filename, dict):
        omask = os.umask(007)                     # -rw-rw----
        try:
            fp = open(filename, 'w')
        finally:
            os.umask(omask)
        # Python's marshal, up to and including in Python 2.1, has a bug where
        # the full precision of floats was not stored.  We work around this
        # bug by hardcoding a list of float values we know about, repr()-izing
        # them ourselves, and doing the reverse conversion on _ext_read().
        for attr in self.FLOAT_ATTRIBUTES:
            # We use try/except because we expect a hitrate of nearly 100%
            try:
                fval = dict[attr]
            except KeyError:
                pass
            else:
                dict[attr] = repr(fval)
        marshal.dump(dict, fp)
        # Make damn sure that the data we just wrote gets flushed to disk
        fp.flush()
        if mm_cfg.SYNC_AFTER_WRITE:
            os.fsync(fp.fileno())
        fp.close()

    def _ext_read(self, filename):
        fp = open(filename)
        dict = marshal.load(fp)
        # Update from version 2 files
        if dict.get('version', 0) == 2:
            del dict['filebase']
        # Do the reverse conversion (repr -> float)
        for attr in self.FLOAT_ATTRIBUTES:
            try:
                sval = dict[attr]
            except KeyError:
                pass
            else:
                # Do a safe eval by setting up a restricted execution
                # environment.  This may not be strictly necessary since we
                # know they are floats, but it can't hurt.
                dict[attr] = eval(sval, {'__builtins__': {}})
        fp.close()
        return dict



class BSDDBSwitchboard(_Switchboard):
    """Native (i.e. compiled-in) Berkeley db format."""
    def _ext_write(self, filename, dict):
        import bsddb
        omask = os.umask(0)
        try:
            hashfile = bsddb.hashopen(filename, 'n', 0660)
        finally:
            os.umask(omask)
        # values must be strings
        for k, v in dict.items():
            hashfile[k] = marshal.dumps(v)
        hashfile.sync()
        hashfile.close()

    def _ext_read(self, filename):
        import bsddb
        dict = {}
        hashfile = bsddb.hashopen(filename, 'r')
        for k in hashfile.keys():
            dict[k] = marshal.loads(hashfile[k])
        hashfile.close()
        return dict



class ASCIISwitchboard(_Switchboard):
    """Human readable .db file format.

    key/value pairs are written as

        key = value

    as real Python code which can be execfile'd.
    """

    def _ext_write(self, filename, dict):
        omask = os.umask(007)                     # -rw-rw----
        try:
            fp = open(filename, 'w')
        finally:
            os.umask(omask)
        for k, v in dict.items():
            print >> fp, '%s = %s' % (k, repr(v))
        # Make damn sure that the data we just wrote gets flushed to disk
        fp.flush()
        if mm_cfg.SYNC_AFTER_WRITE:
            os.fsync(fp.fileno())
        fp.close()

    def _ext_read(self, filename):
        dict = {'__builtins__': {}}
        execfile(filename, dict)
        del dict['__builtins__']
        return dict



# Here are the various types of external file formats available.  The format
# to use is defined in the mm_cfg.py configuration file.
# Marshal is both the preferred format and the fallback for an unrecognized
# setting, so start from it and only override it for the other known formats.
Switchboard = MarshalSwitchboard
if mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_BSDDB_NATIVE:
    Switchboard = BSDDBSwitchboard
elif mm_cfg.METADATA_FORMAT == mm_cfg.METAFMT_ASCII:
    Switchboard = ASCIISwitchboard
elif mm_cfg.METADATA_FORMAT != mm_cfg.METAFMT_MARSHAL:
    # Not one of the formats we know about: complain, and keep the marshal
    # default chosen above.
    syslog('error', 'Undefined metadata format: %d (using marshals)',
           mm_cfg.METADATA_FORMAT)



# For bin/dumpdb
class DumperSwitchboard(Switchboard):
    """Metadata file reader which is not bound to a queue directory."""

    def __init__(self):
        # Deliberately does nothing: the base class constructor is not
        # called, so no queue directory is created or remembered.
        return

    def read(self, filename):
        """Return whatever the configured format's _ext_read() yields for
        the file `filename'."""
        metadata = self._ext_read(filename)
        return metadata