"""SPF (Sender-Permitted From) test script.
Copyright (c) 2003, Terence Way
This module is free software, and you may redistribute it and/or modify
it under the same terms as Python itself, so long as this copyright message
and disclaimer are retained in their original form.
IN NO EVENT SHALL THE AUTHOR BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,
SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF
THIS CODE, EVEN IF THE AUTHOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
DAMAGE.
THE AUTHOR SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE. THE CODE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS,
AND THERE IS NO OBLIGATION WHATSOEVER TO PROVIDE MAINTENANCE,
SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
"""
# Module metadata.
__author__ = "Terence Way"
__email__ = "terry@wayforward.net"
__version__ = "1.0: December 9, 2003"
MODULE = 'spf'
# DNS is not a standard-library module; presumably the PyDNS package --
# TODO confirm which distribution provides it.
import DNS
# All 32 bits set (written as a Python 2 long literal).  cidr() shifts and
# inverts this value to build a network mask.
MASK = 0xFFFFFFFFL
def get_spf(domainname):
    """Look up the SPF record published in DNS for a domain name.

    Sends a TXT query for domainname and scans every string of every
    answer, returning the first one that begins with 'v=spf1'.

    Returns None when no answer carries such a string.
    """
    answers = DNS.DnsRequest(domainname, qtype='txt').req().answers
    for answer in answers:
        for text in answer['data']:
            if text.startswith('v=spf1'):
                return text
    # Every TXT string was inspected and none was an SPF record.
    return None
def parse_mechanism(str, domainname):
    """Break one SPF mechanism into a (name, argument, cidr-length) tuple.

    Arguments:
    str -- the mechanism text, e.g. 'a', 'mx:bar.com' or 'ip4:1.2.3.0/24'
           (this function does not strip qualifiers such as '-')
    domainname -- returned as the argument when the mechanism has none

    The CIDR length is taken from the text after '/' when the mechanism
    contains exactly one '/', and is 32 otherwise.  Only the first ':'
    separates the name from its argument, so an argument that itself
    contains ':' (an IPv6 literal, for instance) is kept in one piece.

    Raises ValueError if the text after '/' is not an integer.

    Examples:
    >>> parse_mechanism('a', 'foo.com')
    ('a', 'foo.com', 32)
    >>> parse_mechanism('a:bar.com', 'foo.com')
    ('a', 'bar.com', 32)
    >>> parse_mechanism('a/24', 'foo.com')
    ('a', 'foo.com', 24)
    >>> parse_mechanism('a:bar.com/16', 'foo.com')
    ('a', 'bar.com', 16)
    >>> parse_mechanism('ip6:1080::8:800:200C:417A/96', 'foo.com')
    ('ip6', '1080::8:800:200C:417A', 96)
    """
    pieces = str.split('/')
    if len(pieces) == 2:
        body, cidr_length = pieces[0], int(pieces[1])
    else:
        body, cidr_length = str, 32

    # Split on the first ':' only; later colons belong to the argument.
    fields = body.split(':', 1)
    if len(fields) == 2:
        return fields[0], fields[1], cidr_length
    return body, domainname, cidr_length
def check(ipaddr, domainname):
    """Check a message that claims to come from domainname and that
    arrived from the address ipaddr.

    Fetches the domain's SPF record with get_spf() and hands it to
    check_spf(), whose result tuple is returned unchanged.
    """
    spf_record = get_spf(domainname)
    return check_spf(ipaddr, domainname, spf_record)
def check_spf(ipaddr, domainname, spf):
    """Test an incoming email claiming to be from a domainname,
    passing in the SPF data.

    Arguments:
    ipaddr -- address of the connecting client, as a string
    domainname -- domain the sender claims to be from
    spf -- text of the SPF record, or a false value if there is none

    Returns a (passed, code, explanation) tuple.  Terms are evaluated
    left to right and the first one that matches decides the result;
    if none matches the result is (False, 250, 'SPF unknown').

    Examples:
    >>> check_spf('192.168.0.45', 'foo.com', None)
    (False, 200, 'no SPF record')
    >>> check_spf('192.168.0.45', 'foo.com', 'v=spf1 -all')
    (False, 550, 'access denied')
    >>> check_spf('192.168.0.45', 'foo.com', 'v=spf1 ?all')
    (False, 250, 'SPF unknown')
    >>> check_spf('192.168.0.45', 'foo.com', 'v=spf1')
    (False, 250, 'SPF unknown')
    """
    if not spf:
        return (False, 200, 'no SPF record')

    # The first whitespace-separated token is the version tag; skip it.
    for m in spf.split()[1:]:
        if m.startswith('include:'):
            # Recursively evaluate the included domain's own record and
            # accept only a positive outcome.
            result = check(ipaddr, m[len('include:'):])
            if result[0]:
                return result
            # An include term can match nothing further down, so move on
            # instead of feeding it to the generic mechanism parser.
            continue

        # A leading qualifier selects the result reported on a match;
        # no qualifier behaves like '+'.
        if m[0] == '-':
            m = m[1:]
            result = (False, 550, 'access denied')
        elif m[0] == '?' or m[0] == '~':
            m = m[1:]
            result = (False, 250, 'SPF unknown')
        elif m[0] == '+':
            m = m[1:]
            result = (True, 250, 'sender validated via SPF')
        else:
            result = (True, 250, 'sender validated via SPF')

        if m == 'all':
            return result

        if m.startswith('exists:'):
            if len(get_a(m[len('exists:'):])) > 0:
                return result

        m = parse_mechanism(m, domainname)

        if m[0] == 'a':
            if cidrmatch(ipaddr, get_a(m[1]), m[2]):
                return result
        elif m[0] == 'mx':
            if cidrmatch(ipaddr, get_mx(m[1]), m[2]):
                return result
        elif m[0] == 'ip4' and m[1] != domainname:
            # m[1] equals domainname when 'ip4' carried no address
            # argument, in which case there is nothing to compare.
            if cidrmatch(ipaddr, [m[1]], m[2]):
                return result
        elif m[0] == 'ptr':
            if domainmatch(validated_ptrdnames(ipaddr), m[1]):
                return result

    # No term matched.
    return (False, 250, 'SPF unknown')
def get_a(domainname):
    """Return the list of IP addresses found for a domainname.

    Issues a DNS request with the library's default query type
    (presumably an A lookup -- confirm against the DNS module) and
    collects the 'data' field of every answer.
    """
    response = DNS.DnsRequest(domainname).req()
    addresses = []
    for answer in response.answers:
        addresses.append(answer['data'])
    return addresses
def get_ptr(ipaddr):
    """Return the list of domain names registered for an IP address.

    The PTR query name is myreverse(ipaddr) with '.in-addr.arpa'
    appended, so myreverse() must yield a string here.
    """
    query_name = myreverse(ipaddr) + ".in-addr.arpa"
    response = DNS.DnsRequest(query_name, qtype='ptr').req()
    names = []
    for answer in response.answers:
        names.append(answer['data'])
    return names
def myreverse(thingy):
    """Return a reversed version of thingy without modifying it.

    A dotted string such as an IPv4 address has its dot-separated
    fields reversed and is returned as a string, which is the form
    needed to build an in-addr.arpa query name.  Any other sequence is
    returned as a new list holding its items in reverse order.

    Examples:
    >>> myreverse('192.168.0.45')
    '45.0.168.192'
    >>> myreverse([1, 2, 3])
    [3, 2, 1]
    """
    if hasattr(thingy, 'split'):
        # Strings have no reverse() method; reverse the dotted fields.
        fields = thingy.split('.')
        fields.reverse()
        return '.'.join(fields)
    # Work on a copy so the caller's sequence is left untouched.
    reversed_copy = list(thingy)
    reversed_copy.reverse()
    return reversed_copy
def get_mx(domainname):
    """Return the IP addresses of every MX exchange for a domainname.

    Each MX answer's 'data' entry is indexed at position 1 to obtain
    the exchange host (presumably the record is a (preference, host)
    pair -- confirm against the DNS module), which is then resolved
    with get_a().  The addresses are returned as one flat list.
    """
    response = DNS.DnsRequest(domainname, qtype='mx').req()
    addresses = []
    for record in response.answers:
        exchange = record['data'][1]
        addresses.extend(get_a(exchange))
    return addresses
def domainmatch(ptrdnames, domainsuffix):
    """Check a list of validated PTR domain names against a domain suffix.

    Returns True if any name equals domainsuffix or ends with
    '.' + domainsuffix, compared case-insensitively; False otherwise
    (including when ptrdnames is empty).

    Examples:
    >>> domainmatch(['foo.com'], 'foo.com')
    True
    >>> domainmatch(['moo.foo.com'], 'foo.com')
    True
    >>> domainmatch(['moo.bar.com'], 'foo.com')
    False
    >>> domainmatch(['notfoo.com'], 'foo.com')
    False
    """
    suffix = domainsuffix.lower()
    # Require a label boundary so 'notfoo.com' does not match 'foo.com'.
    dotted_suffix = "." + suffix
    for ptrdname in ptrdnames:
        name = ptrdname.lower()
        if name == suffix or name.endswith(dotted_suffix):
            return True
    return False
def validated_ptrdnames(ipaddr):
    """Figure out the validated PTR domain names for a given IP address.

    Looks up the PTR names for ipaddr with get_ptr(), then keeps only
    those names whose get_a() result contains ipaddr.

    Returns a list of domain name strings; empty if none validate.
    """
    validated = []
    for ptrdname in get_ptr(ipaddr):
        if ipaddr in get_a(ptrdname):
            # append() adds the name as one item; '+=' with a string
            # operand would add each of its characters separately.
            validated.append(ptrdname)
    return validated
def cidrmatch(ipaddr, ipaddrs, cidr_length=32):
    """Tell whether an IP address shares a network prefix with any
    address in a list.

    Both sides are masked to cidr_length bits with cidr() before being
    compared, so the default of 32 demands an exact address match.

    Examples:
    >>> cidrmatch('192.168.0.45', ['192.168.0.44', '192.168.0.45'])
    True
    >>> cidrmatch('192.168.0.43', ['192.168.0.44', '192.168.0.45'])
    False
    >>> cidrmatch('192.168.0.43', ['192.168.0.44', '192.168.0.45'], 24)
    True
    """
    wanted = cidr(ipaddr, cidr_length)
    for candidate in ipaddrs:
        if cidr(candidate, cidr_length) != wanted:
            continue
        return True
    return False
def cidr(addr, n):
    """Turn an IP address string into an integer with only its first
    n bits kept and the remaining bits cleared.

    Examples:
    >>> DNS.bin2addr(cidr('192.168.5.45', 32))
    '192.168.5.45'
    >>> DNS.bin2addr(cidr('192.168.5.45', 24))
    '192.168.5.0'
    >>> DNS.bin2addr(cidr('192.168.0.45', 8))
    '192.0.0.0'
    """
    # Bits below the prefix; inverting them within MASK gives the netmask.
    host_bits = MASK >> n
    netmask = ~host_bits & MASK
    return netmask & DNS.addr2bin(addr)
def _test():
    """Run the doctests of the spf module and return doctest.testmod's
    result."""
    import doctest
    import spf
    return doctest.testmod(spf)
if __name__ == '__main__':
import sys
if len(sys.argv) == 3:
DNS.DiscoverNameServers()
print check(sys.argv[1], sys.argv[2])
elif len(sys.argv) == 1:
_test()
else:
print "To check an incoming mail request:"
print " % python spf.py {ip-addr} {domain-name}"
print
print "To test this script:"
print " % python spf.py"