Rename ipa-python directory to ipapython so it is a real python library

We used to install it as ipa, now installing it as ipapython. The rpm
is still ipa-python.
This commit is contained in:
Rob Crittenden
2009-02-05 15:03:08 -05:00
parent 58ae191a5a
commit 262ff2d731
42 changed files with 143 additions and 226 deletions

2
ipapython/MANIFEST.in Normal file
View File

@@ -0,0 +1,2 @@
include *.conf

28
ipapython/Makefile Normal file
View File

@@ -0,0 +1,28 @@
PYTHONLIBDIR ?= $(shell python -c "from distutils.sysconfig import *; print get_python_lib()")
PACKAGEDIR ?= $(DESTDIR)/$(PYTHONLIBDIR)/ipa
CONFIGDIR ?= $(DESTDIR)/etc/ipa
TESTS = $(wildcard test/*.py)
all: ;
install:
if [ "$(DESTDIR)" = "" ]; then \
python setup.py install; \
else \
python setup.py install --root $(DESTDIR); \
fi
clean:
rm -f *~ *.pyc
distclean: clean
rm -f setup.py ipa-python.spec version.py
maintainer-clean: distclean
rm -rf build
.PHONY: test
test: $(subst .py,.tst,$(TESTS))
%.tst: %.py
python $<

18
ipapython/README Normal file
View File

@@ -0,0 +1,18 @@
This is a set of libraries common to IPA clients and servers though mostly
geared currently towards command-line tools.
A brief overview:
config.py - identify the IPA server domain and realm. It uses dnsclient to
try to detect this information first and will fall back to
/etc/ipa/ipa.conf if that fails.
dnsclient.py - find IPA information via DNS
ipautil.py - helper functions
radius_util.py - helper functions for Radius
entity.py - entity is the main data type. User and Group extend this class
(but don't add anything currently).
ipavalidate.py - basic data validation routines

0
ipapython/__init__.py Normal file
View File

183
ipapython/config.py Normal file
View File

@@ -0,0 +1,183 @@
# Authors: Karl MacMillan <kmacmill@redhat.com>
#
# Copyright (C) 2007 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import ConfigParser
from optparse import OptionParser, IndentedHelpFormatter
import krbV
import socket
import ipapython.dnsclient
import re
class IPAConfigError(Exception):
def __init__(self, msg=''):
self.msg = msg
Exception.__init__(self, msg)
def __repr__(self):
return self.msg
__str__ = __repr__
class IPAFormatter(IndentedHelpFormatter):
"""Our own optparse formatter that indents multiple lined usage string."""
def format_usage(self, usage):
usage_string = "Usage:"
spacing = " " * len(usage_string)
lines = usage.split("\n")
ret = "%s %s\n" % (usage_string, lines[0])
for line in lines[1:]:
ret += "%s %s\n" % (spacing, line)
return ret
def verify_args(parser, args, needed_args = None):
"""Verify that we have all positional arguments we need, if not, exit."""
if needed_args:
needed_list = needed_args.split(" ")
else:
needed_list = []
len_need = len(needed_list)
len_have = len(args)
if len_have > len_need:
parser.error("too many arguments")
elif len_have < len_need:
parser.error("no %s specified" % needed_list[len_have])
class IPAConfig:
def __init__(self):
self.default_realm = None
self.default_server = []
self.default_domain = None
def get_realm(self):
if self.default_realm:
return self.default_realm
else:
raise IPAConfigError("no default realm")
def get_server(self):
if len(self.default_server):
return self.default_server
else:
raise IPAConfigError("no default server")
def get_domain(self):
if self.default_domain:
return self.default_domain
else:
raise IPAConfigError("no default domain")
# Global library config
config = IPAConfig()
def __parse_config(discover_server = True):
p = ConfigParser.SafeConfigParser()
p.read("/etc/ipa/ipa.conf")
try:
if not config.default_realm:
config.default_realm = p.get("defaults", "realm")
except:
pass
if discover_server:
try:
s = p.get("defaults", "server")
config.default_server.extend(re.sub("\s+", "", s).split(','))
except:
pass
try:
if not config.default_domain:
config.default_domain = p.get("defaults", "domain")
except:
pass
def __discover_config(discover_server = True):
rl = 0
try:
if not config.default_realm:
krbctx = krbV.default_context()
config.default_realm = krbctx.default_realm
if not config.default_realm:
return False
if not config.default_domain:
#try once with REALM -> domain
dom_name = config.default_realm.lower()
name = "_ldap._tcp."+dom_name+"."
rs = ipapython.dnsclient.query(name, ipapython.dnsclient.DNS_C_IN, ipapython.dnsclient.DNS_T_SRV)
rl = len(rs)
if rl == 0:
#try cycling on domain components of FQDN
dom_name = socket.getfqdn()
while rl == 0:
tok = dom_name.find(".")
if tok == -1:
return False
dom_name = dom_name[tok+1:]
name = "_ldap._tcp." + dom_name + "."
rs = ipapython.dnsclient.query(name, ipapython.dnsclient.DNS_C_IN, ipapython.dnsclient.DNS_T_SRV)
rl = len(rs)
config.default_domain = dom_name
if discover_server:
if rl == 0:
name = "_ldap._tcp."+config.default_domain+"."
rs = ipapython.dnsclient.query(name, ipapython.dnsclient.DNS_C_IN, ipapython.dnsclient.DNS_T_SRV)
for r in rs:
if r.dns_type == ipapython.dnsclient.DNS_T_SRV:
rsrv = r.rdata.server.rstrip(".")
config.default_server.append(rsrv)
except:
pass
def add_standard_options(parser):
parser.add_option("--realm", dest="realm", help="Override default IPA realm")
parser.add_option("--server", dest="server", help="Override default IPA server")
parser.add_option("--domain", dest="domain", help="Override default IPA DNS domain")
def init_config(options=None):
if options:
config.default_realm = options.realm
config.default_domain = options.domain
if options.server:
config.default_server.extend(options.server.split(","))
if len(config.default_server):
discover_server = False
else:
discover_server = True
__parse_config(discover_server)
__discover_config(discover_server)
# make sure the server list only contains unique items
new_server = []
for server in config.default_server:
if server not in new_server:
new_server.append(server)
config.default_server = new_server
if not config.default_realm:
raise IPAConfigError("IPA realm not found in DNS, in the config file (/etc/ipa/ipa.conf) or on the command line.")
if not config.default_server:
raise IPAConfigError("IPA server not found in DNS, in the config file (/etc/ipa/ipa.conf) or on the command line.")
if not config.default_domain:
raise IPAConfigError("IPA domain not found in the config file (/etc/ipa/ipa.conf) or on the command line.")

443
ipapython/dnsclient.py Normal file
View File

@@ -0,0 +1,443 @@
#
# Copyright 2001, 2005 Red Hat, Inc.
#
# This is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 only
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#
import struct
import socket
import sys
import acutil
DNS_C_IN = 1
DNS_C_CS = 2
DNS_C_CHAOS = 3
DNS_C_HS = 4
DNS_C_ANY = 255
DNS_T_A = 1
DNS_T_NS = 2
DNS_T_CNAME = 5
DNS_T_SOA = 6
DNS_T_NULL = 10
DNS_T_WKS = 11
DNS_T_PTR = 12
DNS_T_HINFO = 13
DNS_T_MX = 15
DNS_T_TXT = 16
DNS_T_SRV = 33
DNS_T_ANY = 255
DEBUG_DNSCLIENT = False
class DNSQueryHeader:
FORMAT = "!HBBHHHH"
def __init__(self):
self.dns_id = 0
self.dns_rd = 0
self.dns_tc = 0
self.dns_aa = 0
self.dns_opcode = 0
self.dns_qr = 0
self.dns_rcode = 0
self.dns_z = 0
self.dns_ra = 0
self.dns_qdcount = 0
self.dns_ancount = 0
self.dns_nscount = 0
self.dns_arcount = 0
def pack(self):
return struct.pack(DNSQueryHeader.FORMAT,
self.dns_id,
(self.dns_rd & 1) |
(self.dns_tc & 1) << 1 |
(self.dns_aa & 1) << 2 |
(self.dns_opcode & 15) << 3 |
(self.dns_qr & 1) << 7,
(self.dns_rcode & 15) |
(self.dns_z & 7) << 4 |
(self.dns_ra & 1) << 7,
self.dns_qdcount,
self.dns_ancount,
self.dns_nscount,
self.dns_arcount)
def unpack(self, data):
(self.dns_id, byte1, byte2, self.dns_qdcount, self.dns_ancount,
self.dns_nscount, self.dns_arcount) = struct.unpack(DNSQueryHeader.FORMAT, data[0:self.size()])
self.dns_rd = byte1 & 1
self.dns_tc = (byte1 >> 1) & 1
self.dns_aa = (byte1 >> 2) & 1
self.dns_opcode = (byte1 >> 3) & 15
self.dns_qr = (byte1 >> 7) & 1
self.dns_rcode = byte2 & 15
self.dns_z = (byte2 >> 4) & 7
self.dns_ra = (byte1 >> 7) & 1
def size(self):
return struct.calcsize(DNSQueryHeader.FORMAT)
def unpackQueryHeader(data):
header = DNSQueryHeader()
header.unpack(data)
return header
class DNSResult:
FORMAT = "!HHIH"
QFORMAT = "!HH"
def __init__(self):
self.dns_name = ""
self.dns_type = 0
self.dns_class = 0
self.dns_ttl = 0
self.dns_rlength = 0
self.rdata = None
def unpack(self, data):
(self.dns_type, self.dns_class, self.dns_ttl,
self.dns_rlength) = struct.unpack(DNSResult.FORMAT, data[0:self.size()])
def qunpack(self, data):
(self.dns_type, self.dns_class) = struct.unpack(DNSResult.QFORMAT, data[0:self.qsize()])
def size(self):
return struct.calcsize(DNSResult.FORMAT)
def qsize(self):
return struct.calcsize(DNSResult.QFORMAT)
class DNSRData:
def __init__(self):
pass
#typedef struct dns_rr_a {
# u_int32_t address;
#} dns_rr_a_t;
#
#typedef struct dns_rr_cname {
# const char *cname;
#} dns_rr_cname_t;
#
#typedef struct dns_rr_hinfo {
# const char *cpu, *os;
#} dns_rr_hinfo_t;
#
#typedef struct dns_rr_mx {
# u_int16_t preference;
# const char *exchange;
#} dns_rr_mx_t;
#
#typedef struct dns_rr_null {
# unsigned const char *data;
#} dns_rr_null_t;
#
#typedef struct dns_rr_ns {
# const char *nsdname;
#} dns_rr_ns_t;
#
#typedef struct dns_rr_ptr {
# const char *ptrdname;
#} dns_rr_ptr_t;
#
#typedef struct dns_rr_soa {
# const char *mname;
# const char *rname;
# u_int32_t serial;
# int32_t refresh;
# int32_t retry;
# int32_t expire;
# int32_t minimum;
#} dns_rr_soa_t;
#
#typedef struct dns_rr_txt {
# const char *data;
#} dns_rr_txt_t;
#
#typedef struct dns_rr_srv {
# const char *server;
# u_int16_t priority;
# u_int16_t weight;
# u_int16_t port;
#} dns_rr_srv_t;
def dnsNameToLabel(name):
out = ""
name = name.split(".")
for part in name:
out += chr(len(part)) + part
return out
def dnsFormatQuery(query, qclass, qtype):
header = DNSQueryHeader()
header.dns_id = 0 # FIXME: id = 0
header.dns_rd = 1 # don't know why the original code didn't request recursion for non SOA requests
header.dns_qr = 0 # query
header.dns_opcode = 0 # standard query
header.dns_qdcount = 1 # single query
qlabel = dnsNameToLabel(query)
if not qlabel:
return ""
out = header.pack() + qlabel
out += chr(qtype >> 8)
out += chr(qtype & 0xff)
out += chr(qclass >> 8)
out += chr(qclass & 0xff)
return out
def dnsParseLabel(label, base):
# returns (output, rest)
if not label:
return ("", None)
update = 1
rest = label
output = ""
skip = 0
try:
while ord(rest[0]):
if ord(rest[0]) & 0xc0:
rest = base[((ord(rest[0]) & 0x3f) << 8) + ord(rest[1]):]
if update:
skip += 2
update = 0
continue
output += rest[1:ord(rest[0]) + 1] + "."
if update:
skip += ord(rest[0]) + 1
rest = rest[ord(rest[0]) + 1:]
except IndexError:
return ("", None)
return (label[skip+update:], output)
def dnsParseA(data, base):
rdata = DNSRData()
if len(data) < 4:
rdata.address = 0
return None
rdata.address = (ord(data[0])<<24) | (ord(data[1])<<16) | (ord(data[2])<<8) | (ord(data[3])<<0)
if DEBUG_DNSCLIENT:
print "A = %d.%d.%d.%d." % (ord(data[0]), ord(data[1]), ord(data[2]), ord(data[3]))
return rdata
def dnsParseText(data):
if len(data) < 1:
return ("", None)
tlen = ord(data[0])
if len(data) < tlen + 1:
return ("", None)
return (data[tlen+1:], data[1:tlen+1])
def dnsParseNS(data, base):
rdata = DNSRData()
(rest, rdata.nsdname) = dnsParseLabel(data, base)
if DEBUG_DNSCLIENT:
print "NS DNAME = \"%s\"." % (rdata.nsdname)
return rdata
def dnsParseCNAME(data, base):
rdata = DNSRData()
(rest, rdata.cname) = dnsParseLabel(data, base)
if DEBUG_DNSCLIENT:
print "CNAME = \"%s\"." % (rdata.cname)
return rdata
def dnsParseSOA(data, base):
rdata = DNSRData()
format = "!IIIII"
(rest, rdata.mname) = dnsParseLabel(data, base)
if rdata.mname is None:
return None
(rest, rdata.rname) = dnsParseLabel(rest, base)
if rdata.rname is None:
return None
if len(rest) < struct.calcsize(format):
return None
(rdata.serial, rdata.refresh, rdata.retry, rdata.expire,
rdata.minimum) = struct.unpack(format, rest[:struct.calcsize(format)])
if DEBUG_DNSCLIENT:
print "SOA(mname) = \"%s\"." % rdata.mname
print "SOA(rname) = \"%s\"." % rdata.rname
print "SOA(serial) = %d." % rdata.serial
print "SOA(refresh) = %d." % rdata.refresh
print "SOA(retry) = %d." % rdata.retry
print "SOA(expire) = %d." % rdata.expire
print "SOA(minimum) = %d." % rdata.minimum
return rdata
def dnsParseNULL(data, base):
# um, yeah
return None
def dnsParseWKS(data, base):
return None
def dnsParseHINFO(data, base):
rdata = DNSRData()
(rest, rdata.cpu) = dnsParseText(data)
if rest:
(rest, rdata.os) = dnsParseText(rest)
if DEBUG_DNSCLIENT:
print "HINFO(cpu) = \"%s\"." % rdata.cpu
print "HINFO(os) = \"%s\"." % rdata.os
return rdata
def dnsParseMX(data, base):
rdata = DNSRData()
if len(data) < 2:
return None
rdata.preference = (ord(data[0]) << 8) | ord(data[1])
(rest, rdata.exchange) = dnsParseLabel(data[2:], base)
if DEBUG_DNSCLIENT:
print "MX(exchanger) = \"%s\"." % rdata.exchange
print "MX(preference) = %d." % rdata.preference
return rdata
def dnsParseTXT(data, base):
rdata = DNSRData()
(rest, rdata.data) = dnsParseText(data)
if DEBUG_DNSCLIENT:
print "TXT = \"%s\"." % rdata.data
return rdata
def dnsParsePTR(data, base):
rdata = DNSRData()
(rest, rdata.ptrdname) = dnsParseLabel(data, base)
if DEBUG_DNSCLIENT:
print "PTR = \"%s\"." % rdata.ptrdname
return rdata
def dnsParseSRV(data, base):
rdata = DNSRData()
format = "!HHH"
flen = struct.calcsize(format)
if len(data) < flen:
return None
(rdata.priority, rdata.weight, rdata.port) = struct.unpack(format, data[:flen])
(rest, rdata.server) = dnsParseLabel(data[flen:], base)
if DEBUG_DNSCLIENT:
print "SRV(server) = \"%s\"." % rdata.server
print "SRV(weight) = %d." % rdata.weight
print "SRV(priority) = %d." % rdata.priority
print "SRV(port) = %d." % rdata.port
return rdata
def dnsParseResults(results):
try:
header = unpackQueryHeader(results)
except struct.error:
return []
if header.dns_qr != 1: # should be a response
return []
if header.dns_rcode != 0: # should be no error
return []
rest = results[header.size():]
rrlist = []
for i in xrange(header.dns_qdcount):
if not rest:
return []
qq = DNSResult()
(rest, label) = dnsParseLabel(rest, results)
if label is None:
return []
if len(rest) < qq.qsize():
return []
qq.qunpack(rest)
rest = rest[qq.qsize():]
if DEBUG_DNSCLIENT:
print "Queried for '%s', class = %d, type = %d." % (label,
qq.dns_class, qq.dns_type)
for i in xrange(header.dns_ancount + header.dns_nscount + header.dns_arcount):
(rest, label) = dnsParseLabel(rest, results)
if label is None:
return []
rr = DNSResult()
rr.dns_name = label
if len(rest) < rr.size():
return []
rr.unpack(rest)
rest = rest[rr.size():]
if DEBUG_DNSCLIENT:
print "Answer %d for '%s', class = %d, type = %d, ttl = %d." % (i,
rr.dns_name, rr.dns_class, rr.dns_type,
rr.dns_ttl)
if len(rest) < rr.dns_rlength:
if DEBUG_DNSCLIENT:
print "Answer too short."
return []
fmap = { DNS_T_A: dnsParseA, DNS_T_NS: dnsParseNS,
DNS_T_CNAME: dnsParseCNAME, DNS_T_SOA: dnsParseSOA,
DNS_T_NULL: dnsParseNULL, DNS_T_WKS: dnsParseWKS,
DNS_T_PTR: dnsParsePTR, DNS_T_HINFO: dnsParseHINFO,
DNS_T_MX: dnsParseMX, DNS_T_TXT: dnsParseTXT,
DNS_T_SRV: dnsParseSRV}
if not rr.dns_type in fmap:
if DEBUG_DNSCLIENT:
print "Don't know how to parse RR type %d!" % rr.dns_type
else:
rr.rdata = fmap[rr.dns_type](rest[:rr.dns_rlength], results)
rest = rest[rr.dns_rlength:]
rrlist += [rr]
return rrlist
def query(query, qclass, qtype):
qdata = dnsFormatQuery(query, qclass, qtype)
if not qdata:
return []
answer = acutil.res_send(qdata)
if not answer:
return []
return dnsParseResults(answer)
if __name__ == '__main__':
DEBUG_DNSCLIENT = True
print "Sending query."
rr = query(len(sys.argv) > 1 and sys.argv[1] or "devserv.devel.redhat.com.",
DNS_C_IN, DNS_T_ANY)
sys.exit(0)

202
ipapython/entity.py Normal file
View File

@@ -0,0 +1,202 @@
# Copyright (C) 2007 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import ldap
import ldif
import re
import cStringIO
import copy
import ipapython.ipautil
def utf8_encode_value(value):
if isinstance(value,unicode):
return value.encode('utf-8')
return value
def utf8_encode_values(values):
if isinstance(values,list) or isinstance(values,tuple):
return map(utf8_encode_value, values)
else:
return utf8_encode_value(values)
def copy_CIDict(x):
"""Do a deep copy of a CIDict"""
y = {}
for key, value in x.iteritems():
y[copy.deepcopy(key)] = copy.deepcopy(value)
return y
class Entity:
"""This class represents an IPA user. An LDAP entry consists of a DN
and a list of attributes. Each attribute consists of a name and a list of
values. For the time being I will maintain this.
In python-ldap, entries are returned as a list of 2-tuples.
Instance variables:
dn - string - the string DN of the entry
data - CIDict - case insensitive dict of the attributes and values
orig_data - CIDict - case insentiive dict of the original attributes and values"""
def __init__(self,entrydata=None):
"""data is the raw data returned from the python-ldap result method,
which is a search result entry or a reference or None.
If creating a new empty entry, data is the string DN."""
if entrydata:
if isinstance(entrydata,tuple):
self.dn = entrydata[0]
self.data = ipapython.ipautil.CIDict(entrydata[1])
elif isinstance(entrydata,str) or isinstance(entrydata,unicode):
self.dn = entrydata
self.data = ipapython.ipautil.CIDict()
elif isinstance(entrydata,dict):
self.dn = entrydata['dn']
del entrydata['dn']
self.data = ipapython.ipautil.CIDict(entrydata)
else:
self.dn = ''
self.data = ipapython.ipautil.CIDict()
self.orig_data = ipapython.ipautil.CIDict(copy_CIDict(self.data))
def __nonzero__(self):
"""This allows us to do tests like if entry: returns false if there is no data,
true otherwise"""
return self.data != None and len(self.data) > 0
def hasAttr(self,name):
"""Return True if this entry has an attribute named name, False otherwise"""
return self.data and self.data.has_key(name)
def __setattr__(self,name,value):
"""One should use setValue() or setValues() to set values except for
dn and data which are special."""
if name != 'dn' and name != 'data' and name != 'orig_data':
raise KeyError, 'use setValue() or setValues()'
else:
self.__dict__[name] = value
def __getattr__(self,name):
"""If name is the name of an LDAP attribute, return the first value for that
attribute - equivalent to getValue - this allows the use of
entry.cn
instead of
entry.getValue('cn')
This also allows us to return None if an attribute is not found rather than
throwing an exception"""
return self.getValue(name)
def getValues(self,name):
"""Get the list (array) of values for the attribute named name"""
return self.data.get(name)
def getValue(self,name,default=None):
"""Get the first value for the attribute named name"""
value = self.data.get(name,default)
if isinstance(value,list) or isinstance(value,tuple):
return value[0]
else:
return value
def setValue(self,name,*value):
"""Value passed in may be a single value, several values, or a single sequence.
For example:
ent.setValue('name', 'value')
ent.setValue('name', 'value1', 'value2', ..., 'valueN')
ent.setValue('name', ['value1', 'value2', ..., 'valueN'])
ent.setValue('name', ('value1', 'value2', ..., 'valueN'))
Since *value is a tuple, we may have to extract a list or tuple from that
tuple as in the last two examples above"""
if (len(value) < 1):
return
if (len(value) == 1):
self.data[name] = utf8_encode_values(value[0])
else:
self.data[name] = utf8_encode_values(value)
setValues = setValue
def setValueNotEmpty(self,name,*value):
"""Similar to setValue() but will not set an empty field. This
is an attempt to avoid adding empty attributes."""
if (len(value) >= 1) and value[0] and len(value[0]) > 0:
if isinstance(value[0], list):
if len(value[0][0]) > 0:
self.setValue(name, *value)
return
else:
self.setValue(name, *value)
return
# At this point we have an empty incoming value. See if they are
# trying to erase the current value. If so we'll delete it so
# it gets marked as removed in the modlist.
v = self.getValues(name)
if v:
self.delValue(name)
return
def delValue(self,name):
"""Remove the attribute named name."""
if self.data.get(name,None):
del self.data[name]
def toTupleList(self):
"""Convert the attrs and values to a list of 2-tuples. The first element
of the tuple is the attribute name. The second element is either a
single value or a list of values."""
return self.data.items()
def toDict(self):
"""Convert the attrs and values to a dict. The dict is keyed on the
attribute name. The value is either single value or a list of values."""
result = ipapython.ipautil.CIDict(self.data)
result['dn'] = self.dn
return result
def attrList(self):
"""Return a list of all attributes in the entry"""
return self.data.keys()
def origDataDict(self):
"""Returns a dict of the original values of the user. Used for updates."""
result = ipapython.ipautil.CIDict(self.orig_data)
result['dn'] = self.dn
return result
# def __str__(self):
# """Convert the Entry to its LDIF representation"""
# return self.__repr__()
#
# # the ldif class base64 encodes some attrs which I would rather see in raw form - to
# # encode specific attrs as base64, add them to the list below
# ldif.safe_string_re = re.compile('^$')
# base64_attrs = ['nsstate', 'krbprincipalkey', 'krbExtraData']
#
# def __repr__(self):
# """Convert the Entry to its LDIF representation"""
# sio = cStringIO.StringIO()
# # what's all this then? the unparse method will currently only accept
# # a list or a dict, not a class derived from them. self.data is a
# # cidict, so unparse barfs on it. I've filed a bug against python-ldap,
# # but in the meantime, we have to convert to a plain old dict for printing
# # I also don't want to see wrapping, so set the line width really high (1000)
# newdata = {}
# newdata.update(self.data)
# ldif.LDIFWriter(sio,User.base64_attrs,1000).unparse(self.dn,newdata)
# return sio.getvalue()

3
ipapython/ipa.conf Normal file
View File

@@ -0,0 +1,3 @@
[defaults]
# realm = EXAMPLE.COM
# server = ipa.example.com

969
ipapython/ipautil.py Normal file
View File

@@ -0,0 +1,969 @@
# Authors: Simo Sorce <ssorce@redhat.com>
#
# Copyright (C) 2007 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
SHARE_DIR = "/usr/share/ipa/"
PLUGINS_SHARE_DIR = "/usr/share/ipa/plugins"
import string
import tempfile
import logging
import subprocess
import random
import os, sys, traceback, readline
import stat
import shutil
from ipapython import ipavalidate
from types import *
import re
import xmlrpclib
import datetime
from ipapython import config
try:
from subprocess import CalledProcessError
class CalledProcessError(subprocess.CalledProcessError):
def __init__(self, returncode, cmd):
super(CalledProcessError, self).__init__(returncode, cmd)
except ImportError:
# Python 2.4 doesn't implement CalledProcessError
class CalledProcessError(Exception):
"""This exception is raised when a process run by check_call() returns
a non-zero exit status. The exit status will be stored in the
returncode attribute."""
def __init__(self, returncode, cmd):
self.returncode = returncode
self.cmd = cmd
def __str__(self):
return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode)
def get_domain_name():
try:
config.init_config()
domain_name = config.config.get_domain()
except Exception, e:
return None
return domain_name
def realm_to_suffix(realm_name):
s = realm_name.split(".")
terms = ["dc=" + x.lower() for x in s]
return ",".join(terms)
def template_str(txt, vars):
return string.Template(txt).substitute(vars)
def template_file(infilename, vars):
txt = open(infilename).read()
return template_str(txt, vars)
def write_tmp_file(txt):
fd = tempfile.NamedTemporaryFile()
fd.write(txt)
fd.flush()
return fd
def run(args, stdin=None):
if stdin:
p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
stdout,stderr = p.communicate(stdin)
else:
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
stdout,stderr = p.communicate()
logging.info(stdout)
logging.info(stderr)
if p.returncode != 0:
raise CalledProcessError(p.returncode, ' '.join(args))
return (stdout, stderr)
def file_exists(filename):
try:
mode = os.stat(filename)[stat.ST_MODE]
if stat.S_ISREG(mode):
return True
else:
return False
except:
return False
def dir_exists(filename):
try:
mode = os.stat(filename)[stat.ST_MODE]
if stat.S_ISDIR(mode):
return True
else:
return False
except:
return False
def install_file(fname, dest):
if file_exists(dest):
os.rename(dest, dest + ".orig")
shutil.move(fname, dest)
def backup_file(fname):
if file_exists(fname):
os.rename(fname, fname + ".orig")
# uses gpg to compress and encrypt a file
def encrypt_file(source, dest, password, workdir = None):
if type(source) is not StringType or not len(source):
raise ValueError('Missing Source File')
#stat it so that we get back an exception if it does no t exist
os.stat(source)
if type(dest) is not StringType or not len(dest):
raise ValueError('Missing Destination File')
if type(password) is not StringType or not len(password):
raise ValueError('Missing Password')
#create a tempdir so that we can clean up with easily
tempdir = tempfile.mkdtemp('', 'ipa-', workdir)
gpgdir = tempdir+"/.gnupg"
try:
try:
#give gpg a fake dir so that we can leater remove all
#the cruft when we clean up the tempdir
os.mkdir(gpgdir)
args = ['/usr/bin/gpg', '--homedir', gpgdir, '--passphrase-fd', '0', '--yes', '--no-tty', '-o', dest, '-c', source]
run(args, password)
except:
raise
finally:
#job done, clean up
shutil.rmtree(tempdir, ignore_errors=True)
def decrypt_file(source, dest, password, workdir = None):
if type(source) is not StringType or not len(source):
raise ValueError('Missing Source File')
#stat it so that we get back an exception if it does no t exist
os.stat(source)
if type(dest) is not StringType or not len(dest):
raise ValueError('Missing Destination File')
if type(password) is not StringType or not len(password):
raise ValueError('Missing Password')
#create a tempdir so that we can clean up with easily
tempdir = tempfile.mkdtemp('', 'ipa-', workdir)
gpgdir = tempdir+"/.gnupg"
try:
try:
#give gpg a fake dir so that we can leater remove all
#the cruft when we clean up the tempdir
os.mkdir(gpgdir)
args = ['/usr/bin/gpg', '--homedir', gpgdir, '--passphrase-fd', '0', '--yes', '--no-tty', '-o', dest, '-d', source]
run(args, password)
except:
raise
finally:
#job done, clean up
shutil.rmtree(tempdir, ignore_errors=True)
class CIDict(dict):
"""
Case-insensitive but case-respecting dictionary.
This code is derived from python-ldap's cidict.py module,
written by stroeder: http://python-ldap.sourceforge.net/
This version extends 'dict' so it works properly with TurboGears.
If you extend UserDict, isinstance(foo, dict) returns false.
"""
def __init__(self,default=None):
super(CIDict, self).__init__()
self._keys = {}
self.update(default or {})
def __getitem__(self,key):
return super(CIDict,self).__getitem__(string.lower(key))
def __setitem__(self,key,value):
lower_key = string.lower(key)
self._keys[lower_key] = key
return super(CIDict,self).__setitem__(string.lower(key),value)
def __delitem__(self,key):
lower_key = string.lower(key)
del self._keys[lower_key]
return super(CIDict,self).__delitem__(string.lower(key))
def update(self,dict):
for key in dict.keys():
self[key] = dict[key]
def has_key(self,key):
return super(CIDict, self).has_key(string.lower(key))
def get(self,key,failobj=None):
try:
return self[key]
except KeyError:
return failobj
def keys(self):
return self._keys.values()
def items(self):
result = []
for k in self._keys.values():
result.append((k,self[k]))
return result
def copy(self):
copy = {}
for k in self._keys.values():
copy[k] = self[k]
return copy
def iteritems(self):
return self.copy().iteritems()
def iterkeys(self):
return self.copy().iterkeys()
def setdefault(self,key,value=None):
try:
return self[key]
except KeyError:
self[key] = value
return value
def pop(self, key, *args):
try:
value = self[key]
del self[key]
return value
except KeyError:
if len(args) == 1:
return args[0]
raise
def popitem(self):
(lower_key,value) = super(CIDict,self).popitem()
key = self._keys[lower_key]
del self._keys[lower_key]
return (key,value)
#
# The safe_string_re regexp and needs_base64 function are extracted from the
# python-ldap ldif module, which was
# written by Michael Stroeder <michael@stroeder.com>
# http://python-ldap.sourceforge.net
#
# It was extracted because ipaldap.py is naughtily reaching into the ldif
# module and squashing this regexp.
#
SAFE_STRING_PATTERN = '(^(\000|\n|\r| |:|<)|[\000\n\r\200-\377]+|[ ]+$)'
safe_string_re = re.compile(SAFE_STRING_PATTERN)
def needs_base64(s):
"""
returns 1 if s has to be base-64 encoded because of special chars
"""
return not safe_string_re.search(s) is None
def wrap_binary_data(data):
"""Converts all binary data strings into Binary objects for transport
back over xmlrpc."""
if isinstance(data, str):
if needs_base64(data):
return xmlrpclib.Binary(data)
else:
return data
elif isinstance(data, list) or isinstance(data,tuple):
retval = []
for value in data:
retval.append(wrap_binary_data(value))
return retval
elif isinstance(data, dict):
retval = {}
for (k,v) in data.iteritems():
retval[k] = wrap_binary_data(v)
return retval
else:
return data
def unwrap_binary_data(data):
"""Converts all Binary objects back into strings."""
if isinstance(data, xmlrpclib.Binary):
# The data is decoded by the xmlproxy, but is stored
# in a binary object for us.
return str(data)
elif isinstance(data, str):
return data
elif isinstance(data, list) or isinstance(data,tuple):
retval = []
for value in data:
retval.append(unwrap_binary_data(value))
return retval
elif isinstance(data, dict):
retval = {}
for (k,v) in data.iteritems():
retval[k] = unwrap_binary_data(v)
return retval
else:
return data
class GeneralizedTimeZone(datetime.tzinfo):
"""This class is a basic timezone wrapper for the offset specified
in a Generalized Time. It is dst-ignorant."""
def __init__(self,offsetstr="Z"):
super(GeneralizedTimeZone, self).__init__()
self.name = offsetstr
self.houroffset = 0
self.minoffset = 0
if offsetstr == "Z":
self.houroffset = 0
self.minoffset = 0
else:
if (len(offsetstr) >= 3) and re.match(r'[-+]\d\d', offsetstr):
self.houroffset = int(offsetstr[0:3])
offsetstr = offsetstr[3:]
if (len(offsetstr) >= 2) and re.match(r'\d\d', offsetstr):
self.minoffset = int(offsetstr[0:2])
offsetstr = offsetstr[2:]
if len(offsetstr) > 0:
raise ValueError()
if self.houroffset < 0:
self.minoffset *= -1
def utcoffset(self, dt):
return datetime.timedelta(hours=self.houroffset, minutes=self.minoffset)
def dst(self, dt):
return datetime.timedelta(0)
def tzname(self, dt):
return self.name
def parse_generalized_time(timestr):
"""Parses are Generalized Time string (as specified in X.680),
returning a datetime object. Generalized Times are stored inside
the krbPasswordExpiration attribute in LDAP.
This method doesn't attempt to be perfect wrt timezones. If python
can't be bothered to implement them, how can we..."""
if len(timestr) < 8:
return None
try:
date = timestr[:8]
time = timestr[8:]
year = int(date[:4])
month = int(date[4:6])
day = int(date[6:8])
hour = min = sec = msec = 0
tzone = None
if (len(time) >= 2) and re.match(r'\d', time[0]):
hour = int(time[:2])
time = time[2:]
if len(time) >= 2 and (time[0] == "," or time[0] == "."):
hour_fraction = "."
time = time[1:]
while (len(time) > 0) and re.match(r'\d', time[0]):
hour_fraction += time[0]
time = time[1:]
total_secs = int(float(hour_fraction) * 3600)
min, sec = divmod(total_secs, 60)
if (len(time) >= 2) and re.match(r'\d', time[0]):
min = int(time[:2])
time = time[2:]
if len(time) >= 2 and (time[0] == "," or time[0] == "."):
min_fraction = "."
time = time[1:]
while (len(time) > 0) and re.match(r'\d', time[0]):
min_fraction += time[0]
time = time[1:]
sec = int(float(min_fraction) * 60)
if (len(time) >= 2) and re.match(r'\d', time[0]):
sec = int(time[:2])
time = time[2:]
if len(time) >= 2 and (time[0] == "," or time[0] == "."):
sec_fraction = "."
time = time[1:]
while (len(time) > 0) and re.match(r'\d', time[0]):
sec_fraction += time[0]
time = time[1:]
msec = int(float(sec_fraction) * 1000000)
if (len(time) > 0):
tzone = GeneralizedTimeZone(time)
return datetime.datetime(year, month, day, hour, min, sec, msec, tzone)
except ValueError:
return None
def ipa_generate_password():
rndpwd = ''
r = random.SystemRandom()
for x in range(12):
rndpwd += chr(r.randint(32,126))
return rndpwd
def format_list(items, quote=None, page_width=80):
'''Format a list of items formatting them so they wrap to fit the
available width. The items will be sorted.
The items may optionally be quoted. The quote parameter may either be
a string, in which case it is added before and after the item. Or the
quote parameter may be a pair (either a tuple or list). In this case
quote[0] is left hand quote and quote[1] is the right hand quote.
'''
left_quote = right_quote = ''
num_items = len(items)
if not num_items: return ""
if quote is not None:
if type(quote) in StringTypes:
left_quote = right_quote = quote
elif type(quote) is TupleType or type(quote) is ListType:
left_quote = quote[0]
right_quote = quote[1]
max_len = max(map(len, items))
max_len += len(left_quote) + len(right_quote)
num_columns = (page_width + max_len) / (max_len+1)
num_rows = (num_items + num_columns - 1) / num_columns
items.sort()
rows = [''] * num_rows
i = row = col = 0
while i < num_items:
row = 0
if col == 0:
separator = ''
else:
separator = ' '
while i < num_items and row < num_rows:
rows[row] += "%s%*s" % (separator, -max_len, "%s%s%s" % (left_quote, items[i], right_quote))
i += 1
row += 1
col += 1
return '\n'.join(rows)
key_value_re = re.compile("(\w+)\s*=\s*(([^\s'\\\"]+)|(?P<quote>['\\\"])((?P=quote)|(.*?[^\\\])(?P=quote)))")
def parse_key_value_pairs(input):
''' Given a string composed of key=value pairs parse it and return
a dict of the key/value pairs. Keys must be a word, a key must be followed
by an equal sign (=) and a value. The value may be a single word or may be
quoted. Quotes may be either single or double quotes, but must be balanced.
Inside the quoted text the same quote used to start the quoted value may be
used if it is escaped by preceding it with a backslash (\).
White space between the key, the equal sign, and the value is ignored.
Values are always strings. Empty values must be specified with an empty
quoted string, it's value after parsing will be an empty string.
Example: The string
arg0 = '' arg1 = 1 arg2='two' arg3 = "three's a crowd" arg4 = "this is a \" quote"
will produce
arg0= arg1=1
arg2=two
arg3=three's a crowd
arg4=this is a " quote
'''
kv_dict = {}
for match in key_value_re.finditer(input):
key = match.group(1)
quote = match.group('quote')
if match.group(5):
value = match.group(6)
if value is None: value = ''
value = re.sub('\\\%s' % quote, quote, value)
else:
value = match.group(2)
kv_dict[key] = value
return kv_dict
def parse_items(text):
'''Given text with items separated by whitespace or comma, return a list of those items'''
split_re = re.compile('[ ,\t\n]+')
items = split_re.split(text)
for item in items[:]:
if not item: items.remove(item)
return items
def read_pairs_file(filename):
comment_re = re.compile('#.*$', re.MULTILINE)
if filename == '-':
fd = sys.stdin
else:
fd = open(filename)
text = fd.read()
text = comment_re.sub('', text) # kill comments
pairs = parse_key_value_pairs(text)
if fd != sys.stdin: fd.close()
return pairs
def read_items_file(filename):
comment_re = re.compile('#.*$', re.MULTILINE)
if filename == '-':
fd = sys.stdin
else:
fd = open(filename)
text = fd.read()
text = comment_re.sub('', text) # kill comments
items = parse_items(text)
if fd != sys.stdin: fd.close()
return items
def user_input(prompt, default = None, allow_empty = True):
if default == None:
while True:
ret = raw_input("%s: " % prompt)
if allow_empty or ret.strip():
return ret
if isinstance(default, basestring):
while True:
ret = raw_input("%s [%s]: " % (prompt, default))
if not ret and (allow_empty or default):
return default
elif ret.strip():
return ret
if isinstance(default, bool):
if default:
choice = "yes"
else:
choice = "no"
while True:
ret = raw_input("%s [%s]: " % (prompt, choice))
if not ret:
return default
elif ret.lower()[0] == "y":
return True
elif ret.lower()[0] == "n":
return False
if isinstance(default, int):
while True:
try:
ret = raw_input("%s [%s]: " % (prompt, default))
if not ret:
return default
ret = int(ret)
except ValueError:
pass
else:
return ret
def user_input_plain(prompt, default = None, allow_empty = True, allow_spaces = True):
while True:
ret = user_input(prompt, default, allow_empty)
if ipavalidate.Plain(ret, not allow_empty, allow_spaces):
return ret
def user_input_path(prompt, default = None, allow_empty = True):
if default != None and allow_empty:
prompt += " (enter \"none\" for empty)"
while True:
ret = user_input(prompt, default, allow_empty)
if allow_empty and ret.lower() == "none":
return ""
if ipavalidate.Path(ret, not allow_empty):
return ret
class AttributeValueCompleter:
'''
Gets input from the user in the form "lhs operator rhs"
TAB completes partial input.
lhs completes to a name in @lhs_names
The lhs is fully parsed if a lhs_delim delimiter is seen, then TAB will
complete to the operator and a default value.
Default values for a lhs value can specified as:
- a string, all lhs values will use this default
- a dict, the lhs value is looked up in the dict to return the default or None
- a function with a single arg, the lhs value, it returns the default or None
After creating the completer you must open it to set the terminal
up, Then get a line of input from the user by calling read_input()
which returns two values, the lhs and rhs, which might be None if
lhs or rhs was not parsed. After you are done getting input you
should close the completer to restore the terminal.
Example: (note this is essentially what the convenience function get_pairs() does)
This will allow the user to autocomplete foo & foobar, both have
defaults defined in a dict. In addition the foobar attribute must
be specified before the prompting loop will exit. Also, this
example show how to require that each attrbute entered by the user
is valid.
attrs = ['foo', 'foobar']
defaults = {'foo' : 'foo_default', 'foobar' : 'foobar_default'}
mandatory_attrs = ['foobar']
c = AttributeValueCompleter(attrs, defaults)
c.open()
mandatory_attrs_remaining = mandatory_attrs[:]
while True:
if mandatory_attrs_remaining:
attribute, value = c.read_input("Enter: ", mandatory_attrs_remaining[0])
try:
mandatory_attrs_remaining.remove(attribute)
except ValueError:
pass
else:
attribute, value = c.read_input("Enter: ")
if attribute is None:
# Are we done?
if mandatory_attrs_remaining:
print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining))
continue
else:
break
if attribute not in attrs:
print "ERROR: %s is not a valid attribute" % (attribute)
else:
print "got '%s' = '%s'" % (attribute, value)
c.close()
print "exiting..."
'''
def __init__(self, lhs_names, default_value=None, lhs_regexp=r'^\s*(?P<lhs>[^ =]+)', lhs_delims=' =',
operator='=', strip_rhs=True):
self.lhs_names = lhs_names
self.default_value = default_value
# lhs_regexp must have named group 'lhs' which returns the contents of the lhs
self.lhs_regexp = lhs_regexp
self.lhs_re = re.compile(self.lhs_regexp)
self.lhs_delims = lhs_delims
self.operator = operator
self.strip_rhs = strip_rhs
self.pairs = None
self._reset()
def _reset(self):
self.lhs = None
self.lhs_complete = False
self.operator_complete = False
self.rhs = None
def open(self):
# Save state
self.prev_completer = readline.get_completer()
self.prev_completer_delims = readline.get_completer_delims()
# Set up for ourself
readline.parse_and_bind("tab: complete")
readline.set_completer(self.complete)
readline.set_completer_delims(self.lhs_delims)
def close(self):
# Restore previous state
readline.set_completer_delims(self.prev_completer_delims)
readline.set_completer(self.prev_completer)
def parse_input(self):
'''We are looking for 3 tokens: <lhs,op,rhs>
Extract as much of each token as possible.
Set flags indicating if token is fully parsed.
'''
try:
self._reset()
buf_len = len(self.line_buffer)
pos = 0
lhs_match = self.lhs_re.search(self.line_buffer, pos)
if not lhs_match: return # no lhs content
self.lhs = lhs_match.group('lhs') # get lhs contents
pos = lhs_match.end('lhs') # new scanning position
if pos == buf_len: return # nothing after lhs, lhs incomplete
self.lhs_complete = True # something trails the lhs, lhs is complete
operator_beg = self.line_buffer.find(self.operator, pos) # locate operator
if operator_beg == -1: return # did not find the operator
self.operator_complete = True # operator fully parsed
operator_end = operator_beg + len(self.operator)
pos = operator_end # step over the operator
self.rhs = self.line_buffer[pos:]
except Exception, e:
traceback.print_exc()
print "Exception in %s.parse_input(): %s" % (self.__class__.__name__, e)
def get_default_value(self):
'''default_value can be a string, a dict, or a function.
If it's a string it's a global default for all attributes.
If it's a dict the default is looked up in the dict index by attribute.
If it's a function, the function is called with 1 parameter, the attribute
and it should return the default value for the attriubte or None'''
if not self.lhs_complete: raise ValueError("attribute not parsed")
# If the user previously provided a value let that override the supplied default
if self.pairs is not None:
prev_value = self.pairs.get(self.lhs)
if prev_value is not None: return prev_value
# No previous user provided value, query for a default
default_value_type = type(self.default_value)
if default_value_type is DictType:
return self.default_value.get(self.lhs, None)
elif default_value_type is FunctionType:
return self.default_value(self.lhs)
elif default_value_type is StringsType:
return self.default_value
else:
return None
def get_lhs_completions(self, text):
if text:
self.completions = [lhs for lhs in self.lhs_names if lhs.startswith(text)]
else:
self.completions = self.lhs_names
def complete(self, text, state):
self.line_buffer= readline.get_line_buffer()
self.parse_input()
if not self.lhs_complete:
# lhs is not complete, set up to complete the lhs
if state == 0:
beg = readline.get_begidx()
end = readline.get_endidx()
self.get_lhs_completions(self.line_buffer[beg:end])
if state >= len(self.completions): return None
return self.completions[state]
elif not self.operator_complete:
# lhs is complete, but the operator is not so we complete
# by inserting the operator manually.
# Also try to complete the default value at this time.
readline.insert_text('%s ' % self.operator)
default_value = self.get_default_value()
if default_value is not None:
readline.insert_text(default_value)
readline.redisplay()
return None
else:
# lhs and operator are complete, if the the rhs is blank
# (either empty or only only whitespace) then attempt
# to complete by inserting the default value, otherwise
# there is nothing we can complete to so we're done.
if self.rhs.strip():
return None
default_value = self.get_default_value()
if default_value is not None:
readline.insert_text(default_value)
readline.redisplay()
return None
def pre_input_hook(self):
readline.insert_text('%s %s ' % (self.initial_lhs, self.operator))
readline.redisplay()
def read_input(self, prompt, initial_lhs=None):
self.initial_lhs = initial_lhs
try:
self._reset()
if initial_lhs is None:
readline.set_pre_input_hook(None)
else:
readline.set_pre_input_hook(self.pre_input_hook)
self.line_buffer = raw_input(prompt).strip()
self.parse_input()
if self.strip_rhs and self.rhs is not None:
return self.lhs, self.rhs.strip()
else:
return self.lhs, self.rhs
except EOFError:
return None, None
def get_pairs(self, prompt, mandatory_attrs=None, validate_callback=None, must_match=True, value_required=True):
self.pairs = {}
if mandatory_attrs:
mandatory_attrs_remaining = mandatory_attrs[:]
else:
mandatory_attrs_remaining = []
print "Enter name = value"
print "Press <ENTER> to accept, a blank line terminates input"
print "Pressing <TAB> will auto completes name, assignment, and value"
print
while True:
if mandatory_attrs_remaining:
attribute, value = self.read_input(prompt, mandatory_attrs_remaining[0])
else:
attribute, value = self.read_input(prompt)
if attribute is None:
# Are we done?
if mandatory_attrs_remaining:
print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining))
continue
else:
break
if value is None:
if value_required:
print "ERROR: you must specify a value for %s" % attribute
continue
else:
if must_match and attribute not in self.lhs_names:
print "ERROR: %s is not a valid name" % (attribute)
continue
if validate_callback is not None:
if not validate_callback(attribute, value):
print "ERROR: %s is not valid for %s" % (value, attribute)
continue
try:
mandatory_attrs_remaining.remove(attribute)
except ValueError:
pass
self.pairs[attribute] = value
return self.pairs
class ItemCompleter:
'''
Prompts the user for items in a list of items with auto completion.
TAB completes partial input.
More than one item can be specifed during input, whitespace and/or comma's seperate.
Example:
possible_items = ['foo', 'bar']
c = ItemCompleter(possible_items)
c.open()
# Use read_input() to limit input to a single carriage return (e.g. <ENTER>)
#items = c.read_input("Enter: ")
# Use get_items to iterate until a blank line is entered.
items = c.get_items("Enter: ")
c.close()
print "items=%s" % (items)
'''
def __init__(self, items):
self.items = items
self.initial_input = None
self.item_delims = ' \t,'
self.split_re = re.compile('[%s]+' % self.item_delims)
def open(self):
# Save state
self.prev_completer = readline.get_completer()
self.prev_completer_delims = readline.get_completer_delims()
# Set up for ourself
readline.parse_and_bind("tab: complete")
readline.set_completer(self.complete)
readline.set_completer_delims(self.item_delims)
def close(self):
# Restore previous state
readline.set_completer_delims(self.prev_completer_delims)
readline.set_completer(self.prev_completer)
def get_item_completions(self, text):
if text:
self.completions = [lhs for lhs in self.items if lhs.startswith(text)]
else:
self.completions = self.items
def complete(self, text, state):
self.line_buffer= readline.get_line_buffer()
if state == 0:
beg = readline.get_begidx()
end = readline.get_endidx()
self.get_item_completions(self.line_buffer[beg:end])
if state >= len(self.completions): return None
return self.completions[state]
def pre_input_hook(self):
readline.insert_text('%s %s ' % (self.initial_input, self.operator))
readline.redisplay()
def read_input(self, prompt, initial_input=None):
items = []
self.initial_input = initial_input
try:
if initial_input is None:
readline.set_pre_input_hook(None)
else:
readline.set_pre_input_hook(self.pre_input_hook)
self.line_buffer = raw_input(prompt).strip()
items = self.split_re.split(self.line_buffer)
for item in items[:]:
if not item: items.remove(item)
return items
except EOFError:
return items
def get_items(self, prompt, must_match=True):
items = []
print "Enter name [name ...]"
print "Press <ENTER> to accept, blank line or control-D terminates input"
print "Pressing <TAB> auto completes name"
print
while True:
new_items = self.read_input(prompt)
if not new_items: break
for item in new_items:
if must_match:
if item not in self.items:
print "ERROR: %s is not valid" % (item)
continue
if item in items: continue
items.append(item)
return items
def get_gsserror(e):
"""A GSSError exception looks differently in python 2.4 than it does
in python 2.5, deal with it."""
try:
primary = e[0]
secondary = e[1]
except:
primary = e[0][0]
secondary = e[0][1]
return (primary[0], secondary[0])

137
ipapython/ipavalidate.py Normal file
View File

@@ -0,0 +1,137 @@
# Authors: Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2007 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import re
def Email(mail, notEmpty=True):
"""Do some basic validation of an e-mail address.
Return True if ok
Return False if not
If notEmpty is True the this will return an error if the field
is "" or None.
"""
usernameRE = re.compile(r"^[^ \t\n\r@<>()]+$", re.I)
domainRE = re.compile(r"^[a-z0-9][a-z0-9\.\-_]*\.[a-z]+$", re.I)
if not mail or mail is None:
if notEmpty is True:
return False
else:
return True
mail = mail.strip()
s = mail.split('@', 1)
try:
username, domain=s
except ValueError:
return False
if not usernameRE.search(username):
return False
if not domainRE.search(domain):
return False
return True
def Plain(text, notEmpty=False, allowSpaces=True):
"""Do some basic validation of a plain text field
Return True if ok
Return False if not
If notEmpty is True the this will return an error if the field
is "" or None.
"""
if (text is None) or (not text.strip()):
if notEmpty is True:
return False
else:
return True
if allowSpaces:
textRE = re.compile(r"^[a-zA-Z_\-0-9\'\ ]*$")
else:
textRE = re.compile(r"^[a-zA-Z_\-0-9\']*$")
if not textRE.search(text):
return False
return True
def String(text, notEmpty=False):
"""A string type. This is much looser in what it allows than plain"""
if text is None or not text.strip():
if notEmpty is True:
return False
else:
return True
return True
def Path(text, notEmpty=False):
"""Do some basic validation of a path
Return True if ok
Return False if not
If notEmpty is True the this will return an error if the field
is "" or None.
"""
textRE = re.compile(r"^[a-zA-Z_\-0-9\\ \.\/\\:]*$")
if not text and notEmpty is True:
return False
if text is None:
if notEmpty is True:
return False
else:
return True
if not textRE.search(text):
return False
return True
def GoodName(text, notEmpty=False):
"""From shadow-utils:
User/group names must match gnu e-regex:
[a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,30}[a-zA-Z0-9_.$-]?
as a non-POSIX, extension, allow "$" as the last char for
sake of Samba 3.x "add machine script"
Return True if ok
Return False if not
"""
textRE = re.compile(r"^[a-zA-Z0-9_.][a-zA-Z0-9_.-]{0,30}[a-zA-Z0-9_.$-]?$")
if not text and notEmpty is True:
return False
if text is None:
if notEmpty is True:
return False
else:
return True
m = textRE.match(text)
if not m or text != m.group(0):
return False
return True

366
ipapython/radius_util.py Normal file
View File

@@ -0,0 +1,366 @@
# Authors: John Dennis <jdennis@redhat.com>
#
# Copyright (C) 2007 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import sys
import os
import re
import ldap
import getpass
import ldap.filter
from ipapython import ipautil
from ipapython.entity import Entity
import ipapython.ipavalidate as ipavalidate
__all__ = [
'RADIUS_PKG_NAME',
'RADIUS_PKG_CONFIG_DIR',
'RADIUS_SERVICE_NAME',
'RADIUS_USER',
'RADIUS_IPA_KEYTAB_FILEPATH',
'RADIUS_LDAP_ATTR_MAP_FILEPATH',
'RADIUSD_CONF_FILEPATH',
'RADIUSD_CONF_TEMPLATE_FILEPATH',
'RADIUSD',
'RadiusClient',
'RadiusProfile',
'clients_container',
'radius_clients_basedn',
'radius_client_filter',
'radius_client_dn',
'profiles_container',
'radius_profiles_basedn',
'radius_profile_filter',
'radius_profile_dn',
'radius_client_ldap_attr_to_radius_attr',
'radius_client_attr_to_ldap_attr',
'radius_profile_ldap_attr_to_radius_attr',
'radius_profile_attr_to_ldap_attr',
'get_secret',
'validate_ip_addr',
'validate_secret',
'validate_name',
'validate_nastype',
'validate_desc',
'validate',
]
#------------------------------------------------------------------------------
RADIUS_PKG_NAME = 'freeradius'
RADIUS_PKG_CONFIG_DIR = '/etc/raddb'
RADIUS_SERVICE_NAME = 'radius'
RADIUS_USER = 'radiusd'
RADIUS_IPA_KEYTAB_FILEPATH = os.path.join(RADIUS_PKG_CONFIG_DIR, 'ipa.keytab')
RADIUS_LDAP_ATTR_MAP_FILEPATH = os.path.join(RADIUS_PKG_CONFIG_DIR, 'ldap.attrmap')
RADIUSD_CONF_FILEPATH = os.path.join(RADIUS_PKG_CONFIG_DIR, 'radiusd.conf')
RADIUSD_CONF_TEMPLATE_FILEPATH = os.path.join(ipautil.PLUGINS_SHARE_DIR, 'radius.radiusd.conf.template')
RADIUSD = '/usr/sbin/radiusd'
#------------------------------------------------------------------------------
dotted_octet_re = re.compile(r"^(\d+)\.(\d+)\.(\d+)\.(\d+)(/(\d+))?$")
dns_re = re.compile(r"^[a-zA-Z][a-zA-Z0-9.-]+$")
# secret, name, nastype all have 31 char max in freeRADIUS, max ip address len is 255
valid_secret_len = (1,31)
valid_name_len = (1,31)
valid_nastype_len = (1,31)
valid_ip_addr_len = (1,255)
valid_ip_addr_msg = '''\
IP address must be either a DNS name (letters,digits,dot,hyphen, beginning with
a letter),or a dotted octet followed by an optional mask (e.g 192.168.1.0/24)'''
valid_desc_msg = "Description must text string"
#------------------------------------------------------------------------------
class RadiusClient(Entity):
def __init2__(self):
pass
class RadiusProfile(Entity):
def __init2__(self):
pass
#------------------------------------------------------------------------------
def reverse_map_dict(src_dict):
reverse_dict = {}
for k,v in src_dict.items():
if reverse_dict.has_key(v):
raise ValueError("reverse_map_dict: collision on (%s) with values (%s),(%s)" % \
v, reverse_dict[v], src_dict[k])
reverse_dict[v] = k
return reverse_dict
#------------------------------------------------------------------------------
radius_client_ldap_attr_to_radius_attr = ipautil.CIDict({
'radiusClientIPAddress' : 'Client-IP-Address',
'radiusClientSecret' : 'Secret',
'radiusClientNASType' : 'NAS-Type',
'radiusClientShortName' : 'Name',
'description' : 'Description',
})
radius_client_attr_to_ldap_attr = reverse_map_dict(radius_client_ldap_attr_to_radius_attr)
#------------------------------------------------------------------------------
radius_profile_ldap_attr_to_radius_attr = ipautil.CIDict({
'uid' : 'UID',
'radiusArapFeatures' : 'Arap-Features',
'radiusArapSecurity' : 'Arap-Security',
'radiusArapZoneAccess' : 'Arap-Zone-Access',
'radiusAuthType' : 'Auth-Type',
'radiusCallbackId' : 'Callback-Id',
'radiusCallbackNumber' : 'Callback-Number',
'radiusCalledStationId' : 'Called-Station-Id',
'radiusCallingStationId' : 'Calling-Station-Id',
'radiusClass' : 'Class',
'radiusClientIPAddress' : 'Client-IP-Address',
'radiusExpiration' : 'Expiration',
'radiusFilterId' : 'Filter-Id',
'radiusFramedAppleTalkLink' : 'Framed-AppleTalk-Link',
'radiusFramedAppleTalkNetwork' : 'Framed-AppleTalk-Network',
'radiusFramedAppleTalkZone' : 'Framed-AppleTalk-Zone',
'radiusFramedCompression' : 'Framed-Compression',
'radiusFramedIPAddress' : 'Framed-IP-Address',
'radiusFramedIPNetmask' : 'Framed-IP-Netmask',
'radiusFramedIPXNetwork' : 'Framed-IPX-Network',
'radiusFramedMTU' : 'Framed-MTU',
'radiusFramedProtocol' : 'Framed-Protocol',
'radiusFramedRoute' : 'Framed-Route',
'radiusFramedRouting' : 'Framed-Routing',
'radiusGroupName' : 'Group-Name',
'radiusHint' : 'Hint',
'radiusHuntgroupName' : 'Huntgroup-Name',
'radiusIdleTimeout' : 'Idle-Timeout',
'radiusLoginIPHost' : 'Login-IP-Host',
'radiusLoginLATGroup' : 'Login-LAT-Group',
'radiusLoginLATNode' : 'Login-LAT-Node',
'radiusLoginLATPort' : 'Login-LAT-Port',
'radiusLoginLATService' : 'Login-LAT-Service',
'radiusLoginService' : 'Login-Service',
'radiusLoginTCPPort' : 'Login-TCP-Port',
'radiusLoginTime' : 'Login-Time',
'radiusNASIpAddress' : 'NAS-IP-Address',
'radiusPasswordRetry' : 'Password-Retry',
'radiusPortLimit' : 'Port-Limit',
'radiusProfileDn' : 'Profile-Dn',
'radiusPrompt' : 'Prompt',
'radiusProxyToRealm' : 'Proxy-To-Realm',
'radiusRealm' : 'Realm',
'radiusReplicateToRealm' : 'Replicate-To-Realm',
'radiusReplyMessage' : 'Reply-Message',
'radiusServiceType' : 'Service-Type',
'radiusSessionTimeout' : 'Session-Timeout',
'radiusSimultaneousUse' : 'Simultaneous-Use',
'radiusStripUserName' : 'Strip-User-Name',
'radiusTerminationAction' : 'Termination-Action',
'radiusTunnelAssignmentId' : 'Tunnel-Assignment-Id',
'radiusTunnelClientEndpoint' : 'Tunnel-Client-Endpoint',
'radiusTunnelMediumType' : 'Tunnel-Medium-Type',
'radiusTunnelPassword' : 'Tunnel-Password',
'radiusTunnelPreference' : 'Tunnel-Preference',
'radiusTunnelPrivateGroupId' : 'Tunnel-Private-Group-Id',
'radiusTunnelServerEndpoint' : 'Tunnel-Server-Endpoint',
'radiusTunnelType' : 'Tunnel-Type',
'radiusUserCategory' : 'User-Category',
'radiusVSA' : 'VSA',
})
radius_profile_attr_to_ldap_attr = reverse_map_dict(radius_profile_ldap_attr_to_radius_attr)
#------------------------------------------------------------------------------
clients_container = 'cn=clients,cn=radius'
def radius_clients_basedn(container, suffix):
if container is None: container = clients_container
return '%s,%s' % (container, suffix)
def radius_client_filter(ip_addr):
return "(&(radiusClientIPAddress=%s)(objectclass=radiusClientProfile))" % \
ldap.filter.escape_filter_chars(ip_addr)
def radius_client_dn(client, container, suffix):
if container is None: container = clients_container
return 'radiusClientIPAddress=%s,%s,%s' % (ldap.dn.escape_dn_chars(client), container, suffix)
# --
profiles_container = 'cn=profiles,cn=radius'
def radius_profiles_basedn(container, suffix):
if container is None: container = profiles_container
return '%s,%s' % (container, suffix)
def radius_profile_filter(uid):
return "(&(uid=%s)(objectclass=radiusprofile))" % \
ldap.filter.escape_filter_chars(uid)
def radius_profile_dn(uid, container, suffix):
if container is None: container = profiles_container
return 'uid=%s,%s,%s' % (ldap.dn.escape_dn_chars(uid), container, suffix)
#------------------------------------------------------------------------------
def get_ldap_attr_translations():
comment_re = re.compile('#.*$')
radius_attr_to_ldap_attr = {}
ldap_attr_to_radius_attr = {}
try:
f = open(LDAP_ATTR_MAP_FILEPATH)
for line in f.readlines():
line = comment_re.sub('', line).strip()
if not line: continue
attr_type, radius_attr, ldap_attr = line.split()
print 'type="%s" radius="%s" ldap="%s"' % (attr_type, radius_attr, ldap_attr)
radius_attr_to_ldap_attr[radius_attr] = {'ldap_attr':ldap_attr, 'attr_type':attr_type}
ldap_attr_to_radius_attr[ldap_attr] = {'radius_attr':radius_attr, 'attr_type':attr_type}
f.close()
except Exception, e:
logging.error('cold not read radius ldap attribute map file (%s): %s', LDAP_ATTR_MAP_FILEPATH, e)
pass # FIXME
#for k,v in radius_attr_to_ldap_attr.items():
# print '%s --> %s' % (k,v)
#for k,v in ldap_attr_to_radius_attr.items():
# print '%s --> %s' % (k,v)
def get_secret():
valid = False
while (not valid):
secret = getpass.getpass("Enter Secret: ")
confirm = getpass.getpass("Confirm Secret: ")
if (secret != confirm):
print "Secrets do not match"
continue
valid = True
return secret
#------------------------------------------------------------------------------
def valid_ip_addr(text):
# is it a dotted octet? If so there should be 4 integers seperated
# by a dot and each integer should be between 0 and 255
# there may be an optional mask preceded by a slash (e.g. 1.2.3.4/24)
match = dotted_octet_re.search(text)
if match:
# dotted octet notation
i = 1
while i <= 4:
octet = int(match.group(i))
if octet > 255: return False
i += 1
if match.group(5):
mask = int(match.group(6))
if mask <= 32:
return True
else:
return False
return True
else:
# DNS name, can contain letters, numbers, dot and hypen, must start with a letter
if dns_re.search(text): return True
return False
def validate_length(value, limits):
length = len(value)
if length < limits[0] or length > limits[1]:
return False
return True
def valid_length_msg(name, limits):
return "%s length must be at least %d and not more than %d" % (name, limits[0], limits[1])
def err_msg(variable, variable_name=None):
if variable_name is None: variable_name = 'value'
print "ERROR: %s = %s" % (variable_name, variable)
#------------------------------------------------------------------------------
def validate_ip_addr(ip_addr, variable_name=None):
if not validate_length(ip_addr, valid_ip_addr_len):
err_msg(ip_addr, variable_name)
print valid_length_msg('ip address', valid_ip_addr_len)
return False
if not valid_ip_addr(ip_addr):
err_msg(ip_addr, variable_name)
print valid_ip_addr_msg
return False
return True
def validate_secret(secret, variable_name=None):
if not validate_length(secret, valid_secret_len):
err_msg(secret, variable_name)
print valid_length_msg('secret', valid_secret_len)
return False
return True
def validate_name(name, variable_name=None):
if not validate_length(name, valid_name_len):
err_msg(name, variable_name)
print valid_length_msg('name', valid_name_len)
return False
return True
def validate_nastype(nastype, variable_name=None):
if not validate_length(nastype, valid_nastype_len):
err_msg(nastype, variable_name)
print valid_length_msg('NAS Type', valid_nastype_len)
return False
return True
def validate_desc(desc, variable_name=None):
if not ipavalidate.Plain(desc):
print valid_desc_msg
return False
return True
def validate(attribute, value):
if attribute == 'Client-IP-Address':
return validate_ip_addr(value, attribute)
if attribute == 'Secret':
return validate_secret(value, attribute)
if attribute == 'NAS-Type':
return validate_nastype(value, attribute)
if attribute == 'Name':
return validate_name(value, attribute)
if attribute == 'Description':
return validate_desc(value, attribute)
return True

77
ipapython/setup.py.in Normal file
View File

@@ -0,0 +1,77 @@
#!/usr/bin/env python
# Copyright (C) 2007 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
"""FreeIPA python support library
FreeIPA is a server for identity, policy, and audit.
"""
DOCLINES = __doc__.split("\n")
import os
import sys
import distutils.sysconfig
CLASSIFIERS = """\
Development Status :: 4 - Beta
Intended Audience :: System Environment/Base
License :: GPL
Programming Language :: Python
Operating System :: POSIX
Operating System :: Unix
"""
# BEFORE importing distutils, remove MANIFEST. distutils doesn't properly
# update it when the contents of directories change.
if os.path.exists('MANIFEST'): os.remove('MANIFEST')
def setup_package():
from distutils.core import setup
old_path = os.getcwd()
local_path = os.path.dirname(os.path.abspath(sys.argv[0]))
os.chdir(local_path)
sys.path.insert(0,local_path)
try:
setup(
name = "ipapython",
version = "__VERSION__",
license = "GPL",
author = "Karl MacMillan, et.al.",
author_email = "kmacmill@redhat.com",
maintainer = "freeIPA Developers",
maintainer_email = "freeipa-devel@redhat.com",
url = "http://www.freeipa.org/",
description = DOCLINES[0],
long_description = "\n".join(DOCLINES[2:]),
download_url = "http://www.freeipa.org/page/Downloads",
classifiers=filter(None, CLASSIFIERS.split('\n')),
platforms = ["Linux", "Solaris", "Unix"],
package_dir = {'ipapython': ''},
packages = [ "ipapython" ],
data_files = [('/etc/ipa', ['ipa.conf'])]
)
finally:
del sys.path[0]
os.chdir(old_path)
return
if __name__ == '__main__':
setup_package()

317
ipapython/sysrestore.py Normal file
View File

@@ -0,0 +1,317 @@
# Authors: Mark McLoughlin <markmc@redhat.com>
#
# Copyright (C) 2007 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
#
# This module provides a very simple API which allows
# ipa-xxx-install --uninstall to restore certain
# parts of the system configuration to the way it was
# before ipa-server-install was first run
import os
import os.path
import errno
import shutil
import logging
import ConfigParser
import random
import string
from ipapython import ipautil
SYSRESTORE_PATH = "/tmp"
SYSRESTORE_INDEXFILE = "sysrestore.index"
SYSRESTORE_STATEFILE = "sysrestore.state"
class FileStore:
"""Class for handling backup and restore of files"""
def __init__(self, path = SYSRESTORE_PATH):
"""Create a _StoreFiles object, that uses @path as the
base directory.
The file @path/sysrestore.index is used to store information
about the original location of the saved files.
"""
self._path = path
self._index = self._path + "/" + SYSRESTORE_INDEXFILE
self.random = random.Random()
self.files = {}
self._load()
def _load(self):
"""Load the file list from the index file. @files will
be an empty dictionary if the file doesn't exist.
"""
logging.debug("Loading Index file from '%s'", self._index)
self.files = {}
p = ConfigParser.SafeConfigParser()
p.read(self._index)
for section in p.sections():
if section == "files":
for (key, value) in p.items(section):
self.files[key] = value
def save(self):
"""Save the file list to @_index. If @files is an empty
dict, then @_index should be removed.
"""
logging.debug("Saving Index File to '%s'", self._index)
if len(self.files) == 0:
logging.debug(" -> no files, removing file")
if os.path.exists(self._index):
os.remove(self._index)
return
p = ConfigParser.SafeConfigParser()
p.add_section('files')
for (key, value) in self.files.items():
p.set('files', key, str(value))
f = file(self._index, "w")
p.write(f)
f.close()
def backup_file(self, path):
"""Create a copy of the file at @path - so long as a copy
does not already exist - which will be restored to its
original location by restore_files().
"""
logging.debug("Backing up system configuration file '%s'", path)
if not os.path.isabs(path):
raise ValueError("Absolute path required")
if not os.path.isfile(path):
logging.debug(" -> Not backing up - '%s' doesn't exist", path)
return
(reldir, file) = os.path.split(path)
filename = ""
for i in range(8):
h = "%02x" % self.random.randint(0,255)
filename += h
filename += "-"+file
backup_path = os.path.join(self._path, filename)
if os.path.exists(backup_path):
logging.debug(" -> Not backing up - already have a copy of '%s'", path)
return
shutil.copy2(path, backup_path)
stat = os.stat(path)
self.files[filename] = string.join([str(stat.st_mode),str(stat.st_uid),str(stat.st_gid),path], ',')
self.save()
def restore_file(self, path):
"""Restore the copy of a file at @path to its original
location and delete the copy.
Returns #True if the file was restored, #False if there
was no backup file to restore
"""
logging.debug("Restoring system configuration file '%s'", path)
if not os.path.isabs(path):
raise ValueError("Absolute path required")
mode = None
uid = None
gid = None
filename = None
for (key, value) in self.files.items():
(mode,uid,gid,filepath) = string.split(value, ',', 3)
if (filepath == path):
filename = key
break
if not filename:
raise ValueError("No such file name in the index")
backup_path = os.path.join(self._path, filename)
if not os.path.exists(backup_path):
logging.debug(" -> Not restoring - '%s' doesn't exist", backup_path)
return False
shutil.move(backup_path, path)
os.chown(path, int(uid), int(gid))
os.chmod(path, int(mode))
ipautil.run(["/sbin/restorecon", path])
del self.files[filename]
self.save()
return True
def restore_all_files(self):
"""Restore the files in the inbdex to their original
location and delete the copy.
Returns #True if the file was restored, #False if there
was no backup file to restore
"""
if len(self.files) == 0:
return False
for (filename, value) in self.files.items():
(mode,uid,gid,path) = string.split(value, ',', 3)
backup_path = os.path.join(self._path, filename)
if not os.path.exists(backup_path):
logging.debug(" -> Not restoring - '%s' doesn't exist", backup_path)
shutil.move(backup_path, path)
os.chown(path, int(uid), int(gid))
os.chmod(path, int(mode))
ipautil.run(["/sbin/restorecon", path])
#force file to be deleted
self.files = {}
self.save()
return True
class StateFile:
"""A metadata file for recording system state which can
be backed up and later restored. The format is something
like:
[httpd]
running=True
enabled=False
"""
def __init__(self, path = SYSRESTORE_PATH):
"""Create a StateFile object, loading from @path.
The dictionary @modules, a member of the returned object,
is where the state can be modified. @modules is indexed
using a module name to return another dictionary containing
key/value pairs with the saved state of that module.
The keys in these latter dictionaries are arbitrary strings
and the values may either be strings or booleans.
"""
self._path = path+"/"+SYSRESTORE_STATEFILE
self.modules = {}
self._load()
def _load(self):
"""Load the modules from the file @_path. @modules will
be an empty dictionary if the file doesn't exist.
"""
logging.debug("Loading StateFile from '%s'", self._path)
self.modules = {}
p = ConfigParser.SafeConfigParser()
p.read(self._path)
for module in p.sections():
self.modules[module] = {}
for (key, value) in p.items(module):
if value == str(True):
value = True
elif value == str(False):
value = False
self.modules[module][key] = value
def save(self):
"""Save the modules to @_path. If @modules is an empty
dict, then @_path should be removed.
"""
logging.debug("Saving StateFile to '%s'", self._path)
for module in self.modules.keys():
if len(self.modules[module]) == 0:
del self.modules[module]
if len(self.modules) == 0:
logging.debug(" -> no modules, removing file")
if os.path.exists(self._path):
os.remove(self._path)
return
p = ConfigParser.SafeConfigParser()
for module in self.modules.keys():
p.add_section(module)
for (key, value) in self.modules[module].items():
p.set(module, key, str(value))
f = file(self._path, "w")
p.write(f)
f.close()
def backup_state(self, module, key, value):
"""Backup an item of system state from @module, identified
by the string @key and with the value @value. @value may be
a string or boolean.
"""
if not (isinstance(value, str) or isinstance(value, bool)):
raise ValueError("Only strings or booleans supported")
if not self.modules.has_key(module):
self.modules[module] = {}
if not self.modules.has_key(key):
self.modules[module][key] = value
self.save()
def restore_state(self, module, key):
"""Return the value of an item of system state from @module,
identified by the string @key, and remove it from the backed
up system state.
If the item doesn't exist, #None will be returned, otherwise
the original string or boolean value is returned.
"""
if not self.modules.has_key(module):
return None
if not self.modules[module].has_key(key):
return None
value = self.modules[module][key]
del self.modules[module][key]
self.save()
return value

127
ipapython/test/test_aci.py Normal file
View File

@@ -0,0 +1,127 @@
#! /usr/bin/python -E
#
# Copyright (C) 2007 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import sys
sys.path.insert(0, ".")
import unittest
import aci
import urllib
class TestACI(unittest.TestCase):
acitemplate = ('(targetattr="%s")' +
'(targetfilter="(memberOf=%s)")' +
'(version 3.0;' +
'acl "%s";' +
'allow (write) ' +
'groupdn="ldap:///%s";)')
def setUp(self):
self.aci = aci.ACI()
def tearDown(self):
pass
def testExport(self):
self.aci.source_group = 'cn=foo, dc=freeipa, dc=org'
self.aci.dest_group = 'cn=bar, dc=freeipa, dc=org'
self.aci.name = 'this is a "name'
self.aci.attrs = ['field1', 'field2', 'field3']
exportaci = self.aci.export_to_string()
aci = TestACI.acitemplate % ('field1 || field2 || field3',
self.aci.dest_group,
'this is a "name',
self.aci.source_group)
self.assertEqual(aci, exportaci)
def testURLEncodedExport(self):
self.aci.source_group = 'cn=foo " bar, dc=freeipa, dc=org'
self.aci.dest_group = 'cn=bar, dc=freeipa, dc=org'
self.aci.name = 'this is a "name'
self.aci.attrs = ['field1', 'field2', 'field3']
exportaci = self.aci.export_to_string()
aci = TestACI.acitemplate % ('field1 || field2 || field3',
self.aci.dest_group,
'this is a "name',
urllib.quote(self.aci.source_group, "/=, "))
self.assertEqual(aci, exportaci)
def testSimpleParse(self):
attr_str = 'field3 || field4 || field5'
dest_dn = 'cn=dest\\"group, dc=freeipa, dc=org'
name = 'my name'
src_dn = 'cn=srcgroup, dc=freeipa, dc=org'
acistr = TestACI.acitemplate % (attr_str, dest_dn, name, src_dn)
self.aci.parse_acistr(acistr)
self.assertEqual(['field3', 'field4', 'field5'], self.aci.attrs)
self.assertEqual(dest_dn, self.aci.dest_group)
self.assertEqual(name, self.aci.name)
self.assertEqual(src_dn, self.aci.source_group)
def testUrlEncodedParse(self):
attr_str = 'field3 || field4 || field5'
dest_dn = 'cn=dest\\"group, dc=freeipa, dc=org'
name = 'my name'
src_dn = 'cn=src " group, dc=freeipa, dc=org'
acistr = TestACI.acitemplate % (attr_str, dest_dn, name,
urllib.quote(src_dn, "/=, "))
self.aci.parse_acistr(acistr)
self.assertEqual(['field3', 'field4', 'field5'], self.aci.attrs)
self.assertEqual(dest_dn, self.aci.dest_group)
self.assertEqual(name, self.aci.name)
self.assertEqual(src_dn, self.aci.source_group)
def testInvalidParse(self):
try:
self.aci.parse_acistr('foo bar')
self.fail('Should have failed to parse')
except SyntaxError:
pass
try:
self.aci.parse_acistr('')
self.fail('Should have failed to parse')
except SyntaxError:
pass
attr_str = 'field3 || field4 || field5'
dest_dn = 'cn=dest\\"group, dc=freeipa, dc=org'
name = 'my name'
src_dn = 'cn=srcgroup, dc=freeipa, dc=org'
acistr = TestACI.acitemplate % (attr_str, dest_dn, name, src_dn)
acistr += 'trailing garbage'
try:
self.aci.parse_acistr('')
self.fail('Should have failed to parse')
except SyntaxError:
pass
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,309 @@
#! /usr/bin/python -E
#
# Copyright (C) 2007 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import sys
sys.path.insert(0, ".")
import unittest
import datetime
import ipautil
class TestCIDict(unittest.TestCase):
def setUp(self):
self.cidict = ipautil.CIDict()
self.cidict["Key1"] = "val1"
self.cidict["key2"] = "val2"
self.cidict["KEY3"] = "VAL3"
def tearDown(self):
pass
def testLen(self):
self.assertEqual(3, len(self.cidict))
def test__GetItem(self):
self.assertEqual("val1", self.cidict["Key1"])
self.assertEqual("val1", self.cidict["key1"])
self.assertEqual("val2", self.cidict["KEY2"])
self.assertEqual("VAL3", self.cidict["key3"])
self.assertEqual("VAL3", self.cidict["KEY3"])
try:
self.cidict["key4"]
fail("should have raised KeyError")
except KeyError:
pass
def testGet(self):
self.assertEqual("val1", self.cidict.get("Key1"))
self.assertEqual("val1", self.cidict.get("key1"))
self.assertEqual("val2", self.cidict.get("KEY2"))
self.assertEqual("VAL3", self.cidict.get("key3"))
self.assertEqual("VAL3", self.cidict.get("KEY3"))
self.assertEqual("default", self.cidict.get("key4", "default"))
def test__SetItem(self):
self.cidict["key4"] = "val4"
self.assertEqual("val4", self.cidict["key4"])
self.cidict["KEY4"] = "newval4"
self.assertEqual("newval4", self.cidict["key4"])
def testDel(self):
self.assert_(self.cidict.has_key("Key1"))
del(self.cidict["Key1"])
self.failIf(self.cidict.has_key("Key1"))
self.assert_(self.cidict.has_key("key2"))
del(self.cidict["KEY2"])
self.failIf(self.cidict.has_key("key2"))
def testClear(self):
self.assertEqual(3, len(self.cidict))
self.cidict.clear()
self.assertEqual(0, len(self.cidict))
def testCopy(self):
"""A copy is no longer a CIDict, but should preserve the case of
the keys as they were inserted."""
copy = self.cidict.copy()
self.assertEqual(3, len(copy))
self.assert_(copy.has_key("Key1"))
self.assertEqual("val1", copy["Key1"])
self.failIf(copy.has_key("key1"))
def testHasKey(self):
self.assert_(self.cidict.has_key("KEY1"))
self.assert_(self.cidict.has_key("key2"))
self.assert_(self.cidict.has_key("key3"))
def testItems(self):
items = self.cidict.items()
self.assertEqual(3, len(items))
items_set = set(items)
self.assert_(("Key1", "val1") in items_set)
self.assert_(("key2", "val2") in items_set)
self.assert_(("KEY3", "VAL3") in items_set)
def testIterItems(self):
items = []
for (k,v) in self.cidict.iteritems():
items.append((k,v))
self.assertEqual(3, len(items))
items_set = set(items)
self.assert_(("Key1", "val1") in items_set)
self.assert_(("key2", "val2") in items_set)
self.assert_(("KEY3", "VAL3") in items_set)
def testIterKeys(self):
keys = []
for k in self.cidict.iterkeys():
keys.append(k)
self.assertEqual(3, len(keys))
keys_set = set(keys)
self.assert_("Key1" in keys_set)
self.assert_("key2" in keys_set)
self.assert_("KEY3" in keys_set)
def testIterValues(self):
values = []
for k in self.cidict.itervalues():
values.append(k)
self.assertEqual(3, len(values))
values_set = set(values)
self.assert_("val1" in values_set)
self.assert_("val2" in values_set)
self.assert_("VAL3" in values_set)
def testKeys(self):
keys = self.cidict.keys()
self.assertEqual(3, len(keys))
keys_set = set(keys)
self.assert_("Key1" in keys_set)
self.assert_("key2" in keys_set)
self.assert_("KEY3" in keys_set)
def testValues(self):
values = self.cidict.values()
self.assertEqual(3, len(values))
values_set = set(values)
self.assert_("val1" in values_set)
self.assert_("val2" in values_set)
self.assert_("VAL3" in values_set)
def testUpdate(self):
newdict = { "KEY2": "newval2",
"key4": "val4" }
self.cidict.update(newdict)
self.assertEqual(4, len(self.cidict))
items = self.cidict.items()
self.assertEqual(4, len(items))
items_set = set(items)
self.assert_(("Key1", "val1") in items_set)
# note the update "overwrites" the case of the key2
self.assert_(("KEY2", "newval2") in items_set)
self.assert_(("KEY3", "VAL3") in items_set)
self.assert_(("key4", "val4") in items_set)
def testSetDefault(self):
self.assertEqual("val1", self.cidict.setdefault("KEY1", "default"))
self.failIf(self.cidict.has_key("KEY4"))
self.assertEqual("default", self.cidict.setdefault("KEY4", "default"))
self.assert_(self.cidict.has_key("KEY4"))
self.assertEqual("default", self.cidict["key4"])
self.failIf(self.cidict.has_key("KEY5"))
self.assertEqual(None, self.cidict.setdefault("KEY5"))
self.assert_(self.cidict.has_key("KEY5"))
self.assertEqual(None, self.cidict["key5"])
def testPop(self):
self.assertEqual("val1", self.cidict.pop("KEY1", "default"))
self.failIf(self.cidict.has_key("key1"))
self.assertEqual("val2", self.cidict.pop("KEY2"))
self.failIf(self.cidict.has_key("key2"))
self.assertEqual("default", self.cidict.pop("key4", "default"))
try:
self.cidict.pop("key4")
fail("should have raised KeyError")
except KeyError:
pass
def testPopItem(self):
items = set(self.cidict.items())
self.assertEqual(3, len(self.cidict))
item = self.cidict.popitem()
self.assertEqual(2, len(self.cidict))
self.assert_(item in items)
items.discard(item)
item = self.cidict.popitem()
self.assertEqual(1, len(self.cidict))
self.assert_(item in items)
items.discard(item)
item = self.cidict.popitem()
self.assertEqual(0, len(self.cidict))
self.assert_(item in items)
items.discard(item)
class TestTimeParser(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def testSimple(self):
timestr = "20070803"
time = ipautil.parse_generalized_time(timestr)
self.assertEqual(2007, time.year)
self.assertEqual(8, time.month)
self.assertEqual(3, time.day)
self.assertEqual(0, time.hour)
self.assertEqual(0, time.minute)
self.assertEqual(0, time.second)
def testHourMinSec(self):
timestr = "20051213141205"
time = ipautil.parse_generalized_time(timestr)
self.assertEqual(2005, time.year)
self.assertEqual(12, time.month)
self.assertEqual(13, time.day)
self.assertEqual(14, time.hour)
self.assertEqual(12, time.minute)
self.assertEqual(5, time.second)
def testFractions(self):
timestr = "2003092208.5"
time = ipautil.parse_generalized_time(timestr)
self.assertEqual(2003, time.year)
self.assertEqual(9, time.month)
self.assertEqual(22, time.day)
self.assertEqual(8, time.hour)
self.assertEqual(30, time.minute)
self.assertEqual(0, time.second)
timestr = "199203301544,25"
time = ipautil.parse_generalized_time(timestr)
self.assertEqual(1992, time.year)
self.assertEqual(3, time.month)
self.assertEqual(30, time.day)
self.assertEqual(15, time.hour)
self.assertEqual(44, time.minute)
self.assertEqual(15, time.second)
timestr = "20060401185912,8"
time = ipautil.parse_generalized_time(timestr)
self.assertEqual(2006, time.year)
self.assertEqual(4, time.month)
self.assertEqual(1, time.day)
self.assertEqual(18, time.hour)
self.assertEqual(59, time.minute)
self.assertEqual(12, time.second)
self.assertEqual(800000, time.microsecond)
def testTimeZones(self):
timestr = "20051213141205Z"
time = ipautil.parse_generalized_time(timestr)
self.assertEqual(0, time.tzinfo.houroffset)
self.assertEqual(0, time.tzinfo.minoffset)
offset = time.tzinfo.utcoffset(None)
self.assertEqual(0, offset.seconds)
timestr = "20051213141205+0500"
time = ipautil.parse_generalized_time(timestr)
self.assertEqual(5, time.tzinfo.houroffset)
self.assertEqual(0, time.tzinfo.minoffset)
offset = time.tzinfo.utcoffset(None)
self.assertEqual(5 * 60 * 60, offset.seconds)
timestr = "20051213141205-0500"
time = ipautil.parse_generalized_time(timestr)
self.assertEqual(-5, time.tzinfo.houroffset)
self.assertEqual(0, time.tzinfo.minoffset)
# NOTE - the offset is always positive - it's minutes
# _east_ of UTC
offset = time.tzinfo.utcoffset(None)
self.assertEqual((24 - 5) * 60 * 60, offset.seconds)
timestr = "20051213141205-0930"
time = ipautil.parse_generalized_time(timestr)
self.assertEqual(-9, time.tzinfo.houroffset)
self.assertEqual(-30, time.tzinfo.minoffset)
offset = time.tzinfo.utcoffset(None)
self.assertEqual(((24 - 9) * 60 * 60) - (30 * 60), offset.seconds)
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,97 @@
#! /usr/bin/python -E
#
# Copyright (C) 2007 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
import sys
sys.path.insert(0, ".")
import unittest
import ipavalidate
class TestValidate(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_validEmail(self):
self.assertEqual(True, ipavalidate.Email("test@freeipa.org"))
self.assertEqual(True, ipavalidate.Email("", notEmpty=False))
def test_invalidEmail(self):
self.assertEqual(False, ipavalidate.Email("test"))
self.assertEqual(False, ipavalidate.Email("test@freeipa"))
self.assertEqual(False, ipavalidate.Email("test@.com"))
self.assertEqual(False, ipavalidate.Email(""))
self.assertEqual(False, ipavalidate.Email(None))
def test_validPlain(self):
self.assertEqual(True, ipavalidate.Plain("Joe User"))
self.assertEqual(True, ipavalidate.Plain("Joe O'Malley"))
self.assertEqual(True, ipavalidate.Plain("", notEmpty=False))
self.assertEqual(True, ipavalidate.Plain(None, notEmpty=False))
self.assertEqual(True, ipavalidate.Plain("JoeUser", allowSpaces=False))
self.assertEqual(True, ipavalidate.Plain("JoeUser", allowSpaces=True))
def test_invalidPlain(self):
self.assertEqual(False, ipavalidate.Plain("Joe (User)"))
self.assertEqual(False, ipavalidate.Plain("Joe C. User"))
self.assertEqual(False, ipavalidate.Plain("", notEmpty=True))
self.assertEqual(False, ipavalidate.Plain(None, notEmpty=True))
self.assertEqual(False, ipavalidate.Plain("Joe User", allowSpaces=False))
self.assertEqual(False, ipavalidate.Plain("Joe C. User"))
def test_validString(self):
self.assertEqual(True, ipavalidate.String("Joe User"))
self.assertEqual(True, ipavalidate.String("Joe O'Malley"))
self.assertEqual(True, ipavalidate.String("", notEmpty=False))
self.assertEqual(True, ipavalidate.String(None, notEmpty=False))
self.assertEqual(True, ipavalidate.String("Joe C. User"))
def test_invalidString(self):
self.assertEqual(False, ipavalidate.String("", notEmpty=True))
self.assertEqual(False, ipavalidate.String(None, notEmpty=True))
def test_validPath(self):
self.assertEqual(True, ipavalidate.Path("/"))
self.assertEqual(True, ipavalidate.Path("/home/user"))
self.assertEqual(True, ipavalidate.Path("../home/user"))
self.assertEqual(True, ipavalidate.Path("", notEmpty=False))
self.assertEqual(True, ipavalidate.Path(None, notEmpty=False))
def test_invalidPath(self):
self.assertEqual(False, ipavalidate.Path("(foo)"))
self.assertEqual(False, ipavalidate.Path("", notEmpty=True))
self.assertEqual(False, ipavalidate.Path(None, notEmpty=True))
def test_validName(self):
self.assertEqual(True, ipavalidate.GoodName("foo"))
self.assertEqual(True, ipavalidate.GoodName("1foo"))
self.assertEqual(True, ipavalidate.GoodName("foo.bar"))
self.assertEqual(True, ipavalidate.GoodName("foo.bar$"))
def test_invalidName(self):
self.assertEqual(False, ipavalidate.GoodName("foo bar"))
self.assertEqual(False, ipavalidate.GoodName("foo%bar"))
self.assertEqual(False, ipavalidate.GoodName("*foo"))
self.assertEqual(False, ipavalidate.GoodName("$foo.bar$"))
if __name__ == '__main__':
unittest.main()

25
ipapython/version.py.in Normal file
View File

@@ -0,0 +1,25 @@
# Authors: Rob Crittenden <rcritten@redhat.com>
#
# Copyright (C) 2007 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; version 2 only
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# The full version including strings
VERSION="__VERSION__"
# Just the numeric portion of the version so one can do direct numeric
# comparisons to see if the API is compatible.
NUM_VERSION=__NUM_VERSION__