gate_news   [plain text]


#! @PYTHON@
#
# Copyright (C) 1998-2008 by the Free Software Foundation, Inc.
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

"""Poll the NNTP servers for messages to be gatewayed to mailing lists.

Usage: gate_news [options]

Where options are

    --help
    -h
        Print this text and exit.

"""

import sys
import os
import time
import getopt
import socket
import nntplib

import paths
# Import this /after/ paths so that the sys.path is properly hacked
import email.Errors
from email.Parser import Parser

from Mailman import mm_cfg
from Mailman import MailList
from Mailman import Utils
from Mailman import Message
from Mailman import LockFile
from Mailman.i18n import _
from Mailman.Queue.sbcache import get_switchboard
from Mailman.Logging.Utils import LogStdErr
from Mailman.Logging.Syslog import syslog

# Work around known problems with some RedHat cron daemons
import signal
signal.signal(signal.SIGCHLD, signal.SIG_DFL)

GATENEWS_LOCK_FILE = os.path.join(mm_cfg.LOCK_DIR, 'gate_news.lock')

LogStdErr('error', 'gate_news', manual_reprime=0)

LOCK_LIFETIME = mm_cfg.hours(2)
NL = '\n'

# Continues inside try: block are not allowed in Python versions before 2.1.
# This exception is used to work around that.
class _ContinueLoop(Exception):
    pass


try:
    True, False
except NameError:
    True = 1
    False = 0



def usage(status, msg=''):
    if code:
        fd = sys.stderr
    else:
        fd = sys.stdout
    print >> fd, _(__doc__)
    if msg:
        print >> fd, msg
    sys.exit(code)



_hostcache = {}

def open_newsgroup(mlist):
    # Split host:port if given
    nntp_host, nntp_port = Utils.nntpsplit(mlist.nntp_host)
    # Open up a "mode reader" connection to nntp server.  This will be shared
    # for all the gated lists having the same nntp_host.
    conn = _hostcache.get(mlist.nntp_host)
    if conn is None:
        try:
            conn = nntplib.NNTP(nntp_host, nntp_port,
                                readermode=True,
                                user=mm_cfg.NNTP_USERNAME,
                                password=mm_cfg.NNTP_PASSWORD)
        except (socket.error, nntplib.NNTPError, IOError), e:
            syslog('fromusenet',
                   'error opening connection to nntp_host: %s\n%s',
                   mlist.nntp_host, e)
            raise
        _hostcache[mlist.nntp_host] = conn
    # Get the GROUP information for the list, but we're only really interested
    # in the first article number and the last article number
    r,c,f,l,n = conn.group(mlist.linked_newsgroup)
    return conn, int(f), int(l)


def clearcache():
    reverse = {}
    for conn in _hostcache.values():
        reverse[conn] = 1
    for conn in reverse.keys():
        conn.quit()
    _hostcache.clear()



# This function requires the list to be locked.
def poll_newsgroup(mlist, conn, first, last, glock):
    listname = mlist.internal_name()
    # NEWNEWS is not portable and has synchronization issues.
    for num in range(first, last):
        glock.refresh()
        try:
            headers = conn.head(`num`)[3]
            # I don't know how this happens, but skip an empty message.
            if not headers:
                raise _ContinueLoop
            found_to = 0
            beenthere = 0
            for header in headers:
                i = header.find(':')
                value = header[:i].lower()
                if i > 0 and value == 'to':
                    found_to = 1
                if value <> 'x-beenthere':
                    continue
                if header[i:] == ': %s' % mlist.GetListEmail():
                    beenthere = 1
                    break
            if not beenthere:
                body = conn.body(`num`)[3]
                # Usenet originated messages will not have a Unix envelope
                # (i.e. "From " header).  This breaks Pipermail archiving, so
                # we will synthesize one.  Be sure to use the format searched
                # for by mailbox.UnixMailbox._isrealfromline().  BAW: We use
                # the -bounces address here in case any downstream clients use
                # the envelope sender for bounces; I'm not sure about this,
                # but it's the closest to the old semantics.
                lines = ['From %s  %s' % (mlist.GetBouncesEmail(),
                                          time.ctime(time.time()))]
                lines.extend(headers)
                lines.append('')
                lines.extend(body)
                lines.append('')
                p = Parser(Message.Message)
                try:
                    msg = p.parsestr(NL.join(lines))
                except email.Errors.MessageError, e:
                    syslog('fromusenet',
                           'email package exception for %s:%d\n%s',
                           mlist.linked_newsgroup, num, e)
                    raise _ContinueLoop
                if found_to:
                    del msg['X-Originally-To']
                    msg['X-Originally-To'] = msg['To']
                    del msg['To']
                msg['To'] = mlist.GetListEmail()
                # Post the message to the locked list
                inq = get_switchboard(mm_cfg.INQUEUE_DIR)
                inq.enqueue(msg,
                            listname = mlist.internal_name(),
                            fromusenet = 1)
                syslog('fromusenet',
                       'posted to list %s: %7d' % (listname, num))
        except nntplib.NNTPError, e:
            syslog('fromusenet',
                   'NNTP error for list %s: %7d' % (listname, num))
            syslog('fromusenet', str(e))
        except _ContinueLoop:
            continue
    # Even if we don't post the message because it was seen on the
    # list already, or if we skipped it as unparseable or empty,
    # update the watermark. Note this used to be in the 'for' block
    # but if the last message(s) raised _ContinueLoop, they wouldn't
    # update the watermark.
    mlist.usenet_watermark = num



def process_lists(glock):
    for listname in Utils.list_names():
        glock.refresh()
        # Open the list unlocked just to check to see if it is gating news to
        # mail.  If not, we're done with the list.  Otherwise, lock the list
        # and gate the group.
        mlist = MailList.MailList(listname, lock=0)
        if not mlist.gateway_to_mail:
            continue
        # Get the list's watermark, i.e. the last article number that we gated
        # from news to mail.  `None' means that this list has never polled its
        # newsgroup and that we should do a catch up.
        watermark = getattr(mlist, 'usenet_watermark', None)
        # Open the newsgroup, but let most exceptions percolate up.
        try:
            conn, first, last = open_newsgroup(mlist)
        except (socket.error, nntplib.NNTPError), e:
            syslog('fromusenet',
                   "%s: couldn't open newsgroup %s: skipping\n%s",
                   listname, mlist.linked_newsgroup, e)
            continue
        syslog('fromusenet', '%s: [%d..%d]' % (listname, first, last))
        try:
            try:
                if watermark is None:
                    mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
                    # This is the first time we've tried to gate this
                    # newsgroup.  We essentially do a mass catch-up, otherwise
                    # we'd flood the mailing list.
                    mlist.usenet_watermark = last
                    syslog('fromusenet', '%s caught up to article %d' %
                           (listname, last))
                else:
                    # The list has been polled previously, so now we simply
                    # grab all the messages on the newsgroup that have not
                    # been seen by the mailing list.  The first such article
                    # is the maximum of the lowest article available in the
                    # newsgroup and the watermark.  It's possible that some
                    # articles have been expired since the last time gate_news
                    # has run.  Not much we can do about that.
                    start = max(watermark+1, first)
                    if start > last:
                        syslog('fromusenet', 'nothing new for list %s' %
                               listname)
                    else:
                        mlist.Lock(timeout=mm_cfg.LIST_LOCK_TIMEOUT)
                        syslog('fromusenet', 'gating %s articles [%d..%d]' %
                               (listname, start, last))
                        # Use last+1 because poll_newsgroup() employes a for
                        # loop over range, and this will not include the last
                        # element in the list.
                        poll_newsgroup(mlist, conn, start, last+1, glock)
            except LockFile.TimeOutError:
                syslog('fromusenet', 'Could not acquire list lock: %s' %
                       listname)
        finally:
            if mlist.Locked():
                mlist.Save()
                mlist.Unlock()
        syslog('fromusenet', '%s watermark: %d' %
               (listname, mlist.usenet_watermark))



def main():
    lock = LockFile.LockFile(GATENEWS_LOCK_FILE,
                             # it's okay to hijack this
                             lifetime=LOCK_LIFETIME)
    try:
        lock.lock(timeout=0.5)
    except LockFile.TimeOutError:
        syslog('fromusenet', 'Could not acquire gate_news lock')
        return
    try:
        process_lists(lock)
    finally:
        clearcache()
        lock.unlock(unconditionally=1)



if __name__ == '__main__':
    try:
        opts, args = getopt.getopt(sys.argv[1:], 'h', ['help'])
    except getopt.error, msg:
        usage(1, msg)

    if args:
        usage(1, 'No args are expected')

    for opt, arg in opts:
        if opt in ('-h', '--help'):
            usage(0)

    main()