"""Miscellaneous essential routines.
This includes actual message transmission routines, address checking and
message and address munging, a handy-dandy routine to map a function on all
the mailing lists, and whatever else doesn't belong elsewhere.
"""
from __future__ import nested_scopes
import os
import re
import random
import urlparse
import sha
import errno
import time
import cgi
import htmlentitydefs
import email.Iterators
from types import UnicodeType
from string import whitespace, digits
try:
from string import ascii_letters
except ImportError:
_lower = 'abcdefghijklmnopqrstuvwxyz'
ascii_letters = _lower + _lower.upper()
from Mailman import mm_cfg
from Mailman import Errors
from Mailman import Site
from Mailman.SafeDict import SafeDict
try:
True, False
except NameError:
True = 1
False = 0
EMPTYSTRING = ''
NL = '\n'
DOT = '.'
IDENTCHARS = ascii_letters + digits + '_'
cre = re.compile(r'%\(([_a-z]\w*?)\)s?', re.IGNORECASE)
dre = re.compile(r'(\${2})|\$([_a-z]\w*)|\${([_a-z]\w*)}', re.IGNORECASE)
def list_exists(listname):
"""Return true iff list `listname' exists."""
basepath = Site.get_listpath(listname)
for ext in ('.pck', '.pck.last', '.db', '.db.last'):
dbfile = os.path.join(basepath, 'config' + ext)
if os.path.exists(dbfile):
return True
return False
def list_names():
"""Return the names of all lists in default list directory."""
return Site.get_listnames()
def wrap(text, column=70, honor_leading_ws=True):
"""Wrap and fill the text to the specified column.
Wrapping is always in effect, although if it is not possible to wrap a
line (because some word is longer than `column' characters) the line is
broken at the next available whitespace boundary. Paragraphs are also
always filled, unless honor_leading_ws is true and the line begins with
whitespace. This is the algorithm that the Python FAQ wizard uses, and
seems like a good compromise.
"""
wrapped = ''
paras = re.split('\n\n', text)
for para in paras:
lines = []
fillprev = False
for line in para.split(NL):
if not line:
lines.append(line)
continue
if honor_leading_ws and line[0] in whitespace:
fillthis = False
else:
fillthis = True
if fillprev and fillthis:
lines[-1] = lines[-1].rstrip() + ' ' + line
else:
lines.append(line)
fillprev = fillthis
for text in lines:
while text:
if len(text) <= column:
line = text
text = ''
else:
bol = column
while bol > 0 and text[bol] not in whitespace:
bol -= 1
eol = bol
while eol > 0 and text[eol] in whitespace:
eol -= 1
if eol == 0:
eol = column
while eol < len(text) and \
text[eol] not in whitespace:
eol += 1
bol = eol
while bol < len(text) and \
text[bol] in whitespace:
bol += 1
bol -= 1
line = text[:eol+1] + '\n'
bol += 1
while bol < len(text) and text[bol] in whitespace:
bol += 1
text = text[bol:]
wrapped += line
wrapped += '\n'
wrapped += '\n'
return wrapped[:-2]
def QuotePeriods(text):
JOINER = '\n .\n'
SEP = '\n.\n'
return JOINER.join(text.split(SEP))
def ParseEmail(email):
user = None
domain = None
email = email.lower()
at_sign = email.find('@')
if at_sign < 1:
return email, None
user = email[:at_sign]
rest = email[at_sign+1:]
domain = rest.split('.')
return user, domain
def LCDomain(addr):
"returns the address with the domain part lowercased"
atind = addr.find('@')
if atind == -1: return addr
return addr[:atind] + '@' + addr[atind+1:].lower()
_badchars = re.compile(r'[][()<>|;^,/\200-\377]')
def ValidateEmail(s):
"""Verify that the an email address isn't grossly evil."""
if not s or s.count(' ') > 0:
raise Errors.MMBadEmailError
if _badchars.search(s) or s[0] == '-':
raise Errors.MMHostileAddress, s
user, domain_parts = ParseEmail(s)
if not domain_parts:
raise Errors.MMBadEmailError, s
if len(domain_parts) < 2:
raise Errors.MMBadEmailError, s
def GetPathPieces(envar='PATH_INFO'):
path = os.environ.get(envar)
if path:
return [p for p in path.split('/') if p]
return None
def ScriptURL(target, web_page_url=None, absolute=False):
"""target - scriptname only, nothing extra
web_page_url - the list's configvar of the same name
absolute - a flag which if set, generates an absolute url
"""
if web_page_url is None:
web_page_url = mm_cfg.DEFAULT_URL_PATTERN % get_domain()
if web_page_url[-1] <> '/':
web_page_url = web_page_url + '/'
fullpath = os.environ.get('REQUEST_URI')
if fullpath is None:
fullpath = os.environ.get('SCRIPT_NAME', '') + \
os.environ.get('PATH_INFO', '')
baseurl = urlparse.urlparse(web_page_url)[2]
if not absolute and fullpath.endswith(baseurl):
fullpath = fullpath[len(baseurl):]
i = fullpath.find('?')
if i > 0:
count = fullpath.count('/', 0, i)
else:
count = fullpath.count('/')
path = ('../' * count) + target
else:
path = web_page_url + target
return path + mm_cfg.CGIEXT
def GetPossibleMatchingAddrs(name):
"""returns a sorted list of addresses that could possibly match
a given name.
For Example, given scott@pobox.com, return ['scott@pobox.com'],
given scott@blackbox.pobox.com return ['scott@blackbox.pobox.com',
'scott@pobox.com']"""
name = name.lower()
user, domain = ParseEmail(name)
res = [name]
if domain:
domain = domain[1:]
while len(domain) >= 2:
res.append("%s@%s" % (user, DOT.join(domain)))
domain = domain[1:]
return res
def List2Dict(L, foldcase=False):
"""Return a dict keyed by the entries in the list passed to it."""
d = {}
if foldcase:
for i in L:
d[i.lower()] = True
else:
for i in L:
d[i] = True
return d
_vowels = ('a', 'e', 'i', 'o', 'u')
_consonants = ('b', 'c', 'd', 'f', 'g', 'h', 'k', 'm', 'n',
'p', 'r', 's', 't', 'v', 'w', 'x', 'z')
_syllables = []
for v in _vowels:
for c in _consonants:
_syllables.append(c+v)
_syllables.append(v+c)
del c, v
def MakeRandomPassword(length=6):
syls = []
while len(syls) * 2 < length:
syls.append(random.choice(_syllables))
return EMPTYSTRING.join(syls)[:length]
def GetRandomSeed():
chr1 = int(random.random() * 52)
chr2 = int(random.random() * 52)
def mkletter(c):
if 0 <= c < 26:
c += 65
if 26 <= c < 52:
c += 71
return c
return "%c%c" % tuple(map(mkletter, (chr1, chr2)))
def set_global_password(pw, siteadmin=True):
if siteadmin:
filename = mm_cfg.SITE_PW_FILE
else:
filename = mm_cfg.LISTCREATOR_PW_FILE
omask = os.umask(026)
try:
fp = open(filename, 'w')
fp.write(sha.new(pw).hexdigest() + '\n')
fp.close()
finally:
os.umask(omask)
def get_global_password(siteadmin=True):
if siteadmin:
filename = mm_cfg.SITE_PW_FILE
else:
filename = mm_cfg.LISTCREATOR_PW_FILE
try:
fp = open(filename)
challenge = fp.read()[:-1] fp.close()
except IOError, e:
if e.errno <> errno.ENOENT: raise
return None
return challenge
def check_global_password(response, siteadmin=True):
challenge = get_global_password(siteadmin)
if challenge is None:
return None
return challenge == sha.new(response).hexdigest()
def websafe(s):
return cgi.escape(s, quote=1)
def ObscureEmail(addr, for_text=False):
"""Make email address unrecognizable to web spiders, but invertable.
When for_text option is set (not default), make a sentence fragment
instead of a token."""
if for_text:
return addr.replace('@', ' at ')
else:
return addr.replace('@', '--at--')
def UnobscureEmail(addr):
"""Invert ObscureEmail() conversion."""
return addr.replace('--at--', '@')
def maketext(templatefile, dict=None, raw=False, lang=None, mlist=None):
languages = []
if lang is not None:
languages.append(lang)
if mlist is not None:
languages.append(mlist.preferred_language)
languages.append(mm_cfg.DEFAULT_SERVER_LANGUAGE)
searchdirs = []
if mlist is not None:
searchdirs.append(mlist.fullpath())
searchdirs.append(os.path.join(mm_cfg.TEMPLATE_DIR, mlist.host_name))
searchdirs.append(os.path.join(mm_cfg.TEMPLATE_DIR, 'site'))
searchdirs.append(mm_cfg.TEMPLATE_DIR)
quickexit = 'quickexit'
fp = None
try:
for lang in languages:
for dir in searchdirs:
filename = os.path.join(dir, lang, templatefile)
try:
fp = open(filename)
raise quickexit
except IOError, e:
if e.errno <> errno.ENOENT: raise
fp = None
except quickexit:
pass
if fp is None:
try:
fp = open(os.path.join(mm_cfg.TEMPLATE_DIR, 'en', templatefile))
except IOError, e:
if e.errno <> errno.ENOENT: raise
raise IOError(errno.ENOENT, 'No template file found', templatefile)
template = fp.read()
fp.close()
text = template
if dict is not None:
try:
sdict = SafeDict(dict)
try:
text = sdict.interpolate(template)
except UnicodeError:
utemplate = unicode(template, GetCharSet(lang), 'replace')
text = sdict.interpolate(utemplate)
except (TypeError, ValueError), e:
from Mailman.Logging.Syslog import syslog
syslog('error', 'broken template: %s\n%s', filename, e)
pass
if raw:
return text
return wrap(text)
ADMINDATA = {
'confirm': (1, 1),
'help': (0, 0),
'info': (0, 0),
'lists': (0, 0),
'options': (0, 0),
'password': (2, 2),
'remove': (0, 0),
'set': (3, 3),
'subscribe': (0, 3),
'unsubscribe': (0, 1),
'who': (0, 0),
}
def is_administrivia(msg):
linecnt = 0
lines = []
for line in email.Iterators.body_line_iterator(msg):
if line == '-- ':
break
if line.strip():
linecnt += 1
if linecnt > mm_cfg.DEFAULT_MAIL_COMMANDS_MAX_LINES:
return False
lines.append(line)
bodytext = NL.join(lines)
if ADMINDATA.has_key(bodytext.strip().lower()):
return True
bodylines = lines[:5]
subject = str(msg.get('subject', ''))
bodylines.append(subject)
for line in bodylines:
if not line.strip():
continue
words = [word.lower() for word in line.split()]
minargs, maxargs = ADMINDATA.get(words[0], (None, None))
if minargs is None and maxargs is None:
continue
if minargs <= len(words[1:]) <= maxargs:
if words[0] == 'set' and words[2] not in ('on', 'off'):
continue
return True
return False
def GetRequestURI(fallback=None, escape=True):
"""Return the full virtual path this CGI script was invoked with.
Newer web servers seems to supply this info in the REQUEST_URI
environment variable -- which isn't part of the CGI/1.1 spec.
Thus, if REQUEST_URI isn't available, we concatenate SCRIPT_NAME
and PATH_INFO, both of which are part of CGI/1.1.
Optional argument `fallback' (default `None') is returned if both of
the above methods fail.
The url will be cgi escaped to prevent cross-site scripting attacks,
unless `escape' is set to 0.
"""
url = fallback
if os.environ.has_key('REQUEST_URI'):
url = os.environ['REQUEST_URI']
elif os.environ.has_key('SCRIPT_NAME') and os.environ.has_key('PATH_INFO'):
url = os.environ['SCRIPT_NAME'] + os.environ['PATH_INFO']
if escape:
return websafe(url)
return url
def reap(kids, func=None, once=False):
while kids:
if func:
func()
try:
pid, status = os.waitpid(-1, os.WNOHANG)
except OSError, e:
if e.errno <> errno.ECHILD:
raise
kids.clear()
break
if pid <> 0:
try:
del kids[pid]
except KeyError:
pass
if once:
break
def GetLanguageDescr(lang):
return mm_cfg.LC_DESCRIPTIONS[lang][0]
def GetCharSet(lang):
return mm_cfg.LC_DESCRIPTIONS[lang][1]
def IsLanguage(lang):
return mm_cfg.LC_DESCRIPTIONS.has_key(lang)
def get_domain():
host = os.environ.get('HTTP_HOST', os.environ.get('SERVER_NAME'))
port = os.environ.get('SERVER_PORT')
if port and host.endswith(':' + port):
host = host[:-len(port)-1]
if mm_cfg.VIRTUAL_HOST_OVERVIEW and host:
return host.lower()
else:
hostname = mm_cfg.DEFAULT_HOST_NAME or mm_cfg.DEFAULT_EMAIL_HOST
return hostname.lower()
def get_site_email(hostname=None, extra=None):
if hostname is None:
hostname = mm_cfg.VIRTUAL_HOSTS.get(get_domain(), get_domain())
if extra is None:
return '%s@%s' % (mm_cfg.MAILMAN_SITE_LIST, hostname)
return '%s-%s@%s' % (mm_cfg.MAILMAN_SITE_LIST, extra, hostname)
_serial = 0
def unique_message_id(mlist):
global _serial
msgid = '<mailman.%d.%d.%d.%s@%s>' % (
_serial, time.time(), os.getpid(),
mlist.internal_name(), mlist.host_name)
_serial += 1
return msgid
def midnight(date=None):
if date is None:
date = time.localtime()[:3]
return time.mktime(date + (0,)*5 + (-1,))
def to_dollar(s):
"""Convert from %-strings to $-strings."""
s = s.replace('$', '$$').replace('%%', '%')
parts = cre.split(s)
for i in range(1, len(parts), 2):
if parts[i+1] and parts[i+1][0] in IDENTCHARS:
parts[i] = '${' + parts[i] + '}'
else:
parts[i] = '$' + parts[i]
return EMPTYSTRING.join(parts)
def to_percent(s):
"""Convert from $-strings to %-strings."""
s = s.replace('%', '%%').replace('$$', '$')
parts = dre.split(s)
for i in range(1, len(parts), 4):
if parts[i] is not None:
parts[i] = '$'
elif parts[i+1] is not None:
parts[i+1] = '%(' + parts[i+1] + ')s'
else:
parts[i+2] = '%(' + parts[i+2] + ')s'
return EMPTYSTRING.join(filter(None, parts))
def dollar_identifiers(s):
"""Return the set (dictionary) of identifiers found in a $-string."""
d = {}
for name in filter(None, [b or c or None for a, b, c in dre.findall(s)]):
d[name] = True
return d
def percent_identifiers(s):
"""Return the set (dictionary) of identifiers found in a %-string."""
d = {}
for name in cre.findall(s):
d[name] = True
return d
def canonstr(s, lang=None):
newparts = []
parts = re.split(r'&(?P<ref>[^;]+);', s)
def appchr(i):
if i < 256:
newparts.append(chr(i))
else:
newparts.append(unichr(i))
while True:
newparts.append(parts.pop(0))
if not parts:
break
ref = parts.pop(0)
if ref.startswith('#'):
try:
appchr(int(ref[1:]))
except ValueError:
newparts.append('&'+ref+';')
else:
c = htmlentitydefs.entitydefs.get(ref, '?')
if c.startswith('#') and c.endswith(';'):
appchr(int(ref[1:-1]))
else:
newparts.append(c)
newstr = EMPTYSTRING.join(newparts)
if isinstance(newstr, UnicodeType):
return newstr
if lang is None:
charset = 'iso-8859-1'
else:
charset = GetCharSet(lang)
if charset == 'us-ascii':
charset = 'iso-8859-1'
return unicode(newstr, charset, 'replace')
def uncanonstr(s, lang=None):
if s is None:
s = u''
if lang is None:
charset = 'us-ascii'
else:
charset = GetCharSet(lang)
try:
if isinstance(s, UnicodeType):
return s.encode(charset)
else:
u = unicode(s, charset)
return s
except UnicodeError:
return uquote(s)
def uquote(s):
a = []
for c in s:
o = ord(c)
if o > 127:
a.append('&#%3d;' % o)
else:
a.append(c)
return str(EMPTYSTRING.join(a))