Remove duplicate and unused utility code

IPA has some unused code from abandoned features (Radius, ipa 1.x user
input, commant-line tab completion), as well as some duplicate utilities.
This patch cleans up the utility modules.

Duplicate code consolidated into ipapython.ipautil:
    {ipalib.util,ipaserver.ipautil,ipapython.ipautil}.realm_to_suffix
    {ipaserver,ipapython}.ipautil.CIDict
            (with style improvements from the ipaserver version)
    {ipapython.entity,ipaserver.ipautil}.utf8_encode_value
    {ipapython.entity,ipaserver.ipautil}.utf8_encode_values

ipalib.util.get_fqdn was removed in favor of the same function in
ipaserver.install.installutils

Removed unused code:
    ipalib.util:
        load_plugins_in_dir
        import_plugins_subpackage
        make_repr (was imported but unused; also removed from tests)

    ipapython.ipautil:
        format_list
        parse_key_value_pairs
        read_pairs_file
        read_items_file
        user_input_plain
        AttributeValueCompleter
        ItemCompleter

    ipaserver.ipautil:
        get_gsserror (a different version exists in ipapython.ipautil)

ipaserver.ipautil ended up empty and is removed entirely.

https://fedorahosted.org/freeipa/ticket/2650
This commit is contained in:
Petr Viktorin 2012-04-18 11:22:35 -04:00 committed by Martin Kosek
parent c02fcf5d34
commit f19218f7d8
23 changed files with 79 additions and 726 deletions

View File

@ -143,7 +143,7 @@ def main():
# We need to ldap_enable the CA now that DS is up and running
CA.ldap_enable('CA', config.host_name, config.dirman_password,
util.realm_to_suffix(config.realm_name))
ipautil.realm_to_suffix(config.realm_name))
cs.add_simple_service('dogtagldap/%s@%s' % (config.host_name, config.realm_name))
cs.add_cert_to_service()

View File

@ -165,7 +165,7 @@ def list_replicas(realm, host, replica, dirman_passwd, verbose):
conn = ipaldap.IPAdmin(host, 636, cacert=CACERT)
conn.do_simple_bind(bindpw=dirman_passwd)
dn = str(DN('cn=masters,cn=ipa,cn=etc,%s' % util.realm_to_suffix(realm)))
dn = str(DN('cn=masters,cn=ipa,cn=etc,%s' % ipautil.realm_to_suffix(realm)))
entries = conn.search_s(dn, ldap.SCOPE_ONELEVEL)
for ent in entries:
@ -316,7 +316,7 @@ def add_link(realm, replica1, replica2, dirman_passwd, options):
conn = ipaldap.IPAdmin(replica2, 636, cacert=CACERT)
conn.do_simple_bind(bindpw=dirman_passwd)
dn = str(DN('cn=CA,cn=%s,cn=masters,cn=ipa,cn=etc,%s' % (replica2, util.realm_to_suffix(realm))))
dn = str(DN('cn=CA,cn=%s,cn=masters,cn=ipa,cn=etc,%s' % (replica2, ipautil.realm_to_suffix(realm))))
conn.search_s(dn, ldap.SCOPE_ONELEVEL)
conn.unbind_s()
except ldap.NO_SUCH_OBJECT:

View File

@ -186,7 +186,7 @@ def install_http(config, auto_redirect):
config.dir + "/http_pin.txt")
memcache = memcacheinstance.MemcacheInstance()
memcache.create_instance('MEMCACHE', config.host_name, config.dirman_password, util.realm_to_suffix(config.realm_name))
memcache.create_instance('MEMCACHE', config.host_name, config.dirman_password, ipautil.realm_to_suffix(config.realm_name))
http = httpinstance.HTTPInstance()
http.create_instance(config.realm_name, config.host_name, config.domain_name, config.dirman_password, False, pkcs12_info, self_signed_ca=True, auto_redirect=auto_redirect)
@ -236,7 +236,7 @@ def install_bind(config, options):
def install_dns_records(config, options):
if not bindinstance.dns_container_exists(config.master_host_name,
util.realm_to_suffix(config.realm_name),
ipautil.realm_to_suffix(config.realm_name),
dm_password=config.dirman_password):
return
@ -355,7 +355,7 @@ def main():
fd = open("/etc/ipa/default.conf", "w")
fd.write("[global]\n")
fd.write("host=" + config.host_name + "\n")
fd.write("basedn=" + util.realm_to_suffix(config.realm_name) + "\n")
fd.write("basedn=" + ipautil.realm_to_suffix(config.realm_name) + "\n")
fd.write("realm=" + config.realm_name + "\n")
fd.write("domain=" + config.domain_name + "\n")
fd.write("xmlrpc_uri=https://%s/ipa/xml\n" % ipautil.format_netloc(config.host_name))
@ -434,7 +434,7 @@ def main():
# We need to ldap_enable the CA now that DS is up and running
if CA and config.setup_ca:
CA.ldap_enable('CA', config.host_name, config.dirman_password,
util.realm_to_suffix(config.realm_name))
ipautil.realm_to_suffix(config.realm_name))
cs.add_simple_service('dogtagldap/%s@%s' % (config.host_name, config.realm_name))
cs.add_cert_to_service()

View File

@ -126,13 +126,13 @@ def list_replicas(realm, host, replica, dirman_passwd, verbose):
else:
conn.do_sasl_gssapi_bind()
dn = 'cn=masters,cn=ipa,cn=etc,%s' % util.realm_to_suffix(realm)
dn = 'cn=masters,cn=ipa,cn=etc,%s' % ipautil.realm_to_suffix(realm)
entries = conn.search_s(dn, ldap.SCOPE_ONELEVEL)
for ent in entries:
peers[ent.cn] = ['master', '']
dn = 'cn=replicas,cn=ipa,cn=etc,%s' % util.realm_to_suffix(realm)
dn = 'cn=replicas,cn=ipa,cn=etc,%s' % ipautil.realm_to_suffix(realm)
entries = conn.search_s(dn, ldap.SCOPE_ONELEVEL)
for ent in entries:
@ -255,7 +255,7 @@ def del_link(realm, replica1, replica2, dirman_passwd, force=False):
if type1 == replication.WINSYNC:
try:
dn = 'cn=%s,cn=replicas,cn=ipa,cn=etc,%s' % (replica2,
util.realm_to_suffix(realm))
ipautil.realm_to_suffix(realm))
entries = repl1.conn.search_s(dn, ldap.SCOPE_SUBTREE)
if len(entries) != 0:
dnset = repl1.conn.get_dns_sorted_by_length(entries,

View File

@ -307,7 +307,7 @@ def main():
enable_replication_version_checking(api.env.host, api.env.realm,
dirman_password)
subject_base = get_subject_base(api.env.host, dirman_password, util.realm_to_suffix(api.env.realm))
subject_base = get_subject_base(api.env.host, dirman_password, ipautil.realm_to_suffix(api.env.realm))
top_dir = tempfile.mkdtemp("ipa")
dir = top_dir + "/realm_info"

View File

@ -57,6 +57,7 @@ from ipaserver.plugins.ldap2 import ldap2
from ipapython import sysrestore
from ipapython.ipautil import *
from ipapython import ipautil
from ipalib import api, errors, util
from ipapython.config import IPAOptionParser
from ipalib.dn import DN
@ -812,7 +813,7 @@ def main():
fd = open(target_fname, "w")
fd.write("[global]\n")
fd.write("host=" + host_name + "\n")
fd.write("basedn=" + util.realm_to_suffix(realm_name) + "\n")
fd.write("basedn=" + ipautil.realm_to_suffix(realm_name) + "\n")
fd.write("realm=" + realm_name + "\n")
fd.write("domain=" + domain_name + "\n")
fd.write("xmlrpc_uri=https://%s/ipa/xml\n" % format_netloc(host_name))
@ -938,7 +939,7 @@ def main():
# We need to ldap_enable the CA now that DS is up and running
if not options.selfsign:
ca.ldap_enable('CA', host_name, dm_password,
util.realm_to_suffix(realm_name))
ipautil.realm_to_suffix(realm_name))
# Turn on SSL in the dogtag LDAP instance. This will get restarted
# later, we don't need SSL now.
@ -984,7 +985,7 @@ def main():
os.close(pw_fd)
memcache = memcacheinstance.MemcacheInstance()
memcache.create_instance('MEMCACHE', host_name, dm_password, util.realm_to_suffix(realm_name))
memcache.create_instance('MEMCACHE', host_name, dm_password, ipautil.realm_to_suffix(realm_name))
http = httpinstance.HTTPInstance(fstore)
if options.http_pkcs12:
@ -995,7 +996,7 @@ def main():
http.create_instance(realm_name, host_name, domain_name, dm_password, autoconfig=True, self_signed_ca=options.selfsign, subject_base=options.subject, auto_redirect=options.ui_redirect)
ipaservices.restore_context("/var/cache/ipa/sessions")
set_subject_in_config(realm_name, dm_password, util.realm_to_suffix(realm_name), options.subject)
set_subject_in_config(realm_name, dm_password, ipautil.realm_to_suffix(realm_name), options.subject)
# Apply any LDAP updates. Needs to be done after the configuration file
# is created

View File

@ -26,7 +26,6 @@ import inspect
from base import lock, check_name, NameSpace
from plugable import Plugin, is_production_mode
from parameters import create_param, parse_param_spec, Param, Str, Flag, Password
from util import make_repr
from output import Output, Entry, ListOfEntries
from text import _, ngettext

View File

@ -106,7 +106,6 @@ import csv
from xmlrpclib import MAXINT, MININT
from types import NoneType
from util import make_repr
from text import _ as ugettext
from plugable import ReadOnly, lock, check_name
from errors import ConversionError, RequirementError, ValidationError

View File

@ -61,16 +61,6 @@ def get_current_principal():
#TODO: do a kinit?
raise errors.CCacheError()
def get_fqdn():
fqdn = ""
try:
fqdn = socket.getfqdn()
except:
try:
fqdn = socket.gethostname()
except:
fqdn = ""
return fqdn
# FIXME: This function has no unit test
def find_modules_in_dir(src_dir):
@ -94,43 +84,6 @@ def find_modules_in_dir(src_dir):
yield (module, pyfile)
# FIXME: This function has no unit test
def load_plugins_in_dir(src_dir):
"""
Import each Python module found in ``src_dir``.
"""
for module in find_modules_in_dir(src_dir):
imp.load_module(module, *imp.find_module(module, [src_dir]))
# FIXME: This function has no unit test
def import_plugins_subpackage(name):
"""
Import everythig in ``plugins`` sub-package of package named ``name``.
"""
try:
plugins = __import__(name + '.plugins').plugins
except ImportError:
return
src_dir = os.path.dirname(os.path.abspath(plugins.__file__))
for name in find_modules_in_dir(src_dir):
full_name = '%s.%s' % (plugins.__name__, name)
__import__(full_name)
def make_repr(name, *args, **kw):
"""
Construct a standard representation of a class instance.
"""
args = [repr(a) for a in args]
kw = ['%s=%r' % (k, kw[k]) for k in sorted(kw)]
return '%s(%s)' % (name, ', '.join(args + kw))
def realm_to_suffix(realm_name):
s = realm_name.split(".")
terms = ["dc=" + x.lower() for x in s]
return ",".join(terms)
def validate_host_dns(log, fqdn):
"""
See if the hostname has a DNS A record.

View File

@ -17,18 +17,7 @@
import copy
import ipapython.ipautil
def utf8_encode_value(value):
if isinstance(value,unicode):
return value.encode('utf-8')
return value
def utf8_encode_values(values):
if isinstance(values,list) or isinstance(values,tuple):
return map(utf8_encode_value, values)
else:
return utf8_encode_value(values)
from ipapython import ipautil
def copy_CIDict(x):
"""Do a deep copy of a CIDict"""
@ -55,19 +44,19 @@ class Entity:
if entrydata:
if isinstance(entrydata,tuple):
self.dn = entrydata[0]
self.data = ipapython.ipautil.CIDict(entrydata[1])
self.data = ipautil.CIDict(entrydata[1])
elif isinstance(entrydata,str) or isinstance(entrydata,unicode):
self.dn = entrydata
self.data = ipapython.ipautil.CIDict()
self.data = ipautil.CIDict()
elif isinstance(entrydata,dict):
self.dn = entrydata['dn']
del entrydata['dn']
self.data = ipapython.ipautil.CIDict(entrydata)
self.data = ipautil.CIDict(entrydata)
else:
self.dn = ''
self.data = ipapython.ipautil.CIDict()
self.data = ipautil.CIDict()
self.orig_data = ipapython.ipautil.CIDict(copy_CIDict(self.data))
self.orig_data = ipautil.CIDict(copy_CIDict(self.data))
def __nonzero__(self):
"""This allows us to do tests like if entry: returns false if there is no data,
@ -120,9 +109,9 @@ class Entity:
if (len(value) < 1):
return
if (len(value) == 1):
self.data[name] = utf8_encode_values(value[0])
self.data[name] = ipautil.utf8_encode_values(value[0])
else:
self.data[name] = utf8_encode_values(value)
self.data[name] = ipautil.utf8_encode_values(value)
setValues = setValue
@ -161,7 +150,7 @@ class Entity:
def toDict(self):
"""Convert the attrs and values to a dict. The dict is keyed on the
attribute name. The value is either single value or a list of values."""
result = ipapython.ipautil.CIDict(self.data)
result = ipautil.CIDict(self.data)
result['dn'] = self.dn
return result
@ -171,7 +160,7 @@ class Entity:
def origDataDict(self):
"""Returns a dict of the original values of the user. Used for updates."""
result = ipapython.ipautil.CIDict(self.orig_data)
result = ipautil.CIDict(self.orig_data)
result['dn'] = self.dn
return result

View File

@ -195,6 +195,7 @@ def format_netloc(host, port=None):
return '%s:%s' % (host, str(port))
def realm_to_suffix(realm_name):
"""Convert a kerberos realm into the IPA suffix."""
s = realm_name.split(".")
terms = ["dc=" + x.lower() for x in s]
return ",".join(terms)
@ -412,24 +413,24 @@ class CIDict(dict):
self.update(default or {})
def __getitem__(self, key):
return super(CIDict,self).__getitem__(string.lower(key))
return super(CIDict, self).__getitem__(key.lower())
def __setitem__(self, key, value):
lower_key = string.lower(key)
lower_key = key.lower()
self._keys[lower_key] = key
return super(CIDict,self).__setitem__(string.lower(key),value)
return super(CIDict, self).__setitem__(lower_key, value)
def __delitem__(self, key):
lower_key = string.lower(key)
lower_key = key.lower()
del self._keys[lower_key]
return super(CIDict,self).__delitem__(string.lower(key))
return super(CIDict, self).__delitem__(key.lower())
def update(self, dict):
for key in dict.keys():
self[key] = dict[key]
def has_key(self, key):
return super(CIDict, self).has_key(string.lower(key))
return super(CIDict, self).has_key(key.lower())
def get(self, key, failobj=None):
try:
@ -610,118 +611,17 @@ def ipa_generate_password(characters=None,pwd_len=None):
rndpwd += rndchar
return rndpwd
def format_list(items, quote=None, page_width=80):
'''Format a list of items formatting them so they wrap to fit the
available width. The items will be sorted.
The items may optionally be quoted. The quote parameter may either be
a string, in which case it is added before and after the item. Or the
quote parameter may be a pair (either a tuple or list). In this case
quote[0] is left hand quote and quote[1] is the right hand quote.
'''
left_quote = right_quote = ''
num_items = len(items)
if not num_items: return ""
if quote is not None:
if type(quote) in StringTypes:
left_quote = right_quote = quote
elif type(quote) is TupleType or type(quote) is ListType:
left_quote = quote[0]
right_quote = quote[1]
max_len = max(map(len, items))
max_len += len(left_quote) + len(right_quote)
num_columns = (page_width + max_len) / (max_len+1)
num_rows = (num_items + num_columns - 1) / num_columns
items.sort()
rows = [''] * num_rows
i = row = col = 0
while i < num_items:
row = 0
if col == 0:
separator = ''
else:
separator = ' '
while i < num_items and row < num_rows:
rows[row] += "%s%*s" % (separator, -max_len, "%s%s%s" % (left_quote, items[i], right_quote))
i += 1
row += 1
col += 1
return '\n'.join(rows)
key_value_re = re.compile("(\w+)\s*=\s*(([^\s'\\\"]+)|(?P<quote>['\\\"])((?P=quote)|(.*?[^\\\])(?P=quote)))")
def parse_key_value_pairs(input):
''' Given a string composed of key=value pairs parse it and return
a dict of the key/value pairs. Keys must be a word, a key must be followed
by an equal sign (=) and a value. The value may be a single word or may be
quoted. Quotes may be either single or double quotes, but must be balanced.
Inside the quoted text the same quote used to start the quoted value may be
used if it is escaped by preceding it with a backslash (\).
White space between the key, the equal sign, and the value is ignored.
Values are always strings. Empty values must be specified with an empty
quoted string, it's value after parsing will be an empty string.
Example: The string
arg0 = '' arg1 = 1 arg2='two' arg3 = "three's a crowd" arg4 = "this is a \" quote"
will produce
arg0= arg1=1
arg2=two
arg3=three's a crowd
arg4=this is a " quote
'''
kv_dict = {}
for match in key_value_re.finditer(input):
key = match.group(1)
quote = match.group('quote')
if match.group(5):
value = match.group(6)
if value is None: value = ''
value = re.sub('\\\%s' % quote, quote, value)
else:
value = match.group(2)
kv_dict[key] = value
return kv_dict
def parse_items(text):
'''Given text with items separated by whitespace or comma, return a list of those items'''
'''Given text with items separated by whitespace or comma, return a list of those items
The returned list only contains non-empty items.
'''
split_re = re.compile('[ ,\t\n]+')
items = split_re.split(text)
for item in items[:]:
if not item: items.remove(item)
return items
def read_pairs_file(filename):
comment_re = re.compile('#.*$', re.MULTILINE)
if filename == '-':
fd = sys.stdin
else:
fd = open(filename)
text = fd.read()
text = comment_re.sub('', text) # kill comments
pairs = parse_key_value_pairs(text)
if fd != sys.stdin: fd.close()
return pairs
def read_items_file(filename):
comment_re = re.compile('#.*$', re.MULTILINE)
if filename == '-':
fd = sys.stdin
else:
fd = open(filename)
text = fd.read()
text = comment_re.sub('', text) # kill comments
items = parse_items(text)
if fd != sys.stdin: fd.close()
return items
def user_input(prompt, default = None, allow_empty = True):
if default == None:
while True:
@ -761,357 +661,6 @@ def user_input(prompt, default = None, allow_empty = True):
else:
return ret
def user_input_plain(prompt, default = None, allow_empty = True, allow_spaces = True):
while True:
ret = user_input(prompt, default, allow_empty)
if ipavalidate.Plain(ret, not allow_empty, allow_spaces):
return ret
class AttributeValueCompleter:
'''
Gets input from the user in the form "lhs operator rhs"
TAB completes partial input.
lhs completes to a name in @lhs_names
The lhs is fully parsed if a lhs_delim delimiter is seen, then TAB will
complete to the operator and a default value.
Default values for a lhs value can specified as:
- a string, all lhs values will use this default
- a dict, the lhs value is looked up in the dict to return the default or None
- a function with a single arg, the lhs value, it returns the default or None
After creating the completer you must open it to set the terminal
up, Then get a line of input from the user by calling read_input()
which returns two values, the lhs and rhs, which might be None if
lhs or rhs was not parsed. After you are done getting input you
should close the completer to restore the terminal.
Example: (note this is essentially what the convenience function get_pairs() does)
This will allow the user to autocomplete foo & foobar, both have
defaults defined in a dict. In addition the foobar attribute must
be specified before the prompting loop will exit. Also, this
example show how to require that each attrbute entered by the user
is valid.
attrs = ['foo', 'foobar']
defaults = {'foo' : 'foo_default', 'foobar' : 'foobar_default'}
mandatory_attrs = ['foobar']
c = AttributeValueCompleter(attrs, defaults)
c.open()
mandatory_attrs_remaining = mandatory_attrs[:]
while True:
if mandatory_attrs_remaining:
attribute, value = c.read_input("Enter: ", mandatory_attrs_remaining[0])
try:
mandatory_attrs_remaining.remove(attribute)
except ValueError:
pass
else:
attribute, value = c.read_input("Enter: ")
if attribute is None:
# Are we done?
if mandatory_attrs_remaining:
print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining))
continue
else:
break
if attribute not in attrs:
print "ERROR: %s is not a valid attribute" % (attribute)
else:
print "got '%s' = '%s'" % (attribute, value)
c.close()
print "exiting..."
'''
def __init__(self, lhs_names, default_value=None, lhs_regexp=r'^\s*(?P<lhs>[^ =]+)', lhs_delims=' =',
operator='=', strip_rhs=True):
self.lhs_names = lhs_names
self.default_value = default_value
# lhs_regexp must have named group 'lhs' which returns the contents of the lhs
self.lhs_regexp = lhs_regexp
self.lhs_re = re.compile(self.lhs_regexp)
self.lhs_delims = lhs_delims
self.operator = operator
self.strip_rhs = strip_rhs
self.pairs = None
self._reset()
def _reset(self):
self.lhs = None
self.lhs_complete = False
self.operator_complete = False
self.rhs = None
def open(self):
# Save state
self.prev_completer = readline.get_completer()
self.prev_completer_delims = readline.get_completer_delims()
# Set up for ourself
readline.parse_and_bind("tab: complete")
readline.set_completer(self.complete)
readline.set_completer_delims(self.lhs_delims)
def close(self):
# Restore previous state
readline.set_completer_delims(self.prev_completer_delims)
readline.set_completer(self.prev_completer)
def parse_input(self):
'''We are looking for 3 tokens: <lhs,op,rhs>
Extract as much of each token as possible.
Set flags indicating if token is fully parsed.
'''
try:
self._reset()
buf_len = len(self.line_buffer)
pos = 0
lhs_match = self.lhs_re.search(self.line_buffer, pos)
if not lhs_match: return # no lhs content
self.lhs = lhs_match.group('lhs') # get lhs contents
pos = lhs_match.end('lhs') # new scanning position
if pos == buf_len: return # nothing after lhs, lhs incomplete
self.lhs_complete = True # something trails the lhs, lhs is complete
operator_beg = self.line_buffer.find(self.operator, pos) # locate operator
if operator_beg == -1: return # did not find the operator
self.operator_complete = True # operator fully parsed
operator_end = operator_beg + len(self.operator)
pos = operator_end # step over the operator
self.rhs = self.line_buffer[pos:]
except Exception, e:
traceback.print_exc()
print "Exception in %s.parse_input(): %s" % (self.__class__.__name__, e)
def get_default_value(self):
'''default_value can be a string, a dict, or a function.
If it's a string it's a global default for all attributes.
If it's a dict the default is looked up in the dict index by attribute.
If it's a function, the function is called with 1 parameter, the attribute
and it should return the default value for the attriubte or None'''
if not self.lhs_complete: raise ValueError("attribute not parsed")
# If the user previously provided a value let that override the supplied default
if self.pairs is not None:
prev_value = self.pairs.get(self.lhs)
if prev_value is not None: return prev_value
# No previous user provided value, query for a default
default_value_type = type(self.default_value)
if default_value_type is DictType:
return self.default_value.get(self.lhs, None)
elif default_value_type is FunctionType:
return self.default_value(self.lhs)
elif default_value_type is StringType:
return self.default_value
else:
return None
def get_lhs_completions(self, text):
if text:
self.completions = [lhs for lhs in self.lhs_names if lhs.startswith(text)]
else:
self.completions = self.lhs_names
def complete(self, state):
self.line_buffer= readline.get_line_buffer()
self.parse_input()
if not self.lhs_complete:
# lhs is not complete, set up to complete the lhs
if state == 0:
beg = readline.get_begidx()
end = readline.get_endidx()
self.get_lhs_completions(self.line_buffer[beg:end])
if state >= len(self.completions): return None
return self.completions[state]
elif not self.operator_complete:
# lhs is complete, but the operator is not so we complete
# by inserting the operator manually.
# Also try to complete the default value at this time.
readline.insert_text('%s ' % self.operator)
default_value = self.get_default_value()
if default_value is not None:
readline.insert_text(default_value)
readline.redisplay()
return None
else:
# lhs and operator are complete, if the rhs is blank
# (either empty or only only whitespace) then attempt
# to complete by inserting the default value, otherwise
# there is nothing we can complete to so we're done.
if self.rhs.strip():
return None
default_value = self.get_default_value()
if default_value is not None:
readline.insert_text(default_value)
readline.redisplay()
return None
def pre_input_hook(self):
readline.insert_text('%s %s ' % (self.initial_lhs, self.operator))
readline.redisplay()
def read_input(self, prompt, initial_lhs=None):
self.initial_lhs = initial_lhs
try:
self._reset()
if initial_lhs is None:
readline.set_pre_input_hook(None)
else:
readline.set_pre_input_hook(self.pre_input_hook)
self.line_buffer = raw_input(prompt).strip()
self.parse_input()
if self.strip_rhs and self.rhs is not None:
return self.lhs, self.rhs.strip()
else:
return self.lhs, self.rhs
except EOFError:
return None, None
def get_pairs(self, prompt, mandatory_attrs=None, validate_callback=None, must_match=True, value_required=True):
self.pairs = {}
if mandatory_attrs:
mandatory_attrs_remaining = mandatory_attrs[:]
else:
mandatory_attrs_remaining = []
print "Enter name = value"
print "Press <ENTER> to accept, a blank line terminates input"
print "Pressing <TAB> will auto completes name, assignment, and value"
print
while True:
if mandatory_attrs_remaining:
attribute, value = self.read_input(prompt, mandatory_attrs_remaining[0])
else:
attribute, value = self.read_input(prompt)
if attribute is None:
# Are we done?
if mandatory_attrs_remaining:
print "ERROR, you must specify: %s" % (','.join(mandatory_attrs_remaining))
continue
else:
break
if value is None:
if value_required:
print "ERROR: you must specify a value for %s" % attribute
continue
else:
if must_match and attribute not in self.lhs_names:
print "ERROR: %s is not a valid name" % (attribute)
continue
if validate_callback is not None:
if not validate_callback(attribute, value):
print "ERROR: %s is not valid for %s" % (value, attribute)
continue
try:
mandatory_attrs_remaining.remove(attribute)
except ValueError:
pass
self.pairs[attribute] = value
return self.pairs
class ItemCompleter:
'''
Prompts the user for items in a list of items with auto completion.
TAB completes partial input.
More than one item can be specifed during input, whitespace and/or comma's seperate.
Example:
possible_items = ['foo', 'bar']
c = ItemCompleter(possible_items)
c.open()
# Use read_input() to limit input to a single carriage return (e.g. <ENTER>)
#items = c.read_input("Enter: ")
# Use get_items to iterate until a blank line is entered.
items = c.get_items("Enter: ")
c.close()
print "items=%s" % (items)
'''
def __init__(self, items):
self.items = items
self.initial_input = None
self.item_delims = ' \t,'
self.operator = '='
self.split_re = re.compile('[%s]+' % self.item_delims)
def open(self):
# Save state
self.prev_completer = readline.get_completer()
self.prev_completer_delims = readline.get_completer_delims()
# Set up for ourself
readline.parse_and_bind("tab: complete")
readline.set_completer(self.complete)
readline.set_completer_delims(self.item_delims)
def close(self):
# Restore previous state
readline.set_completer_delims(self.prev_completer_delims)
readline.set_completer(self.prev_completer)
def get_item_completions(self, text):
if text:
self.completions = [lhs for lhs in self.items if lhs.startswith(text)]
else:
self.completions = self.items
def complete(self, state):
self.line_buffer= readline.get_line_buffer()
if state == 0:
beg = readline.get_begidx()
end = readline.get_endidx()
self.get_item_completions(self.line_buffer[beg:end])
if state >= len(self.completions): return None
return self.completions[state]
def pre_input_hook(self):
readline.insert_text('%s %s ' % (self.initial_input, self.operator))
readline.redisplay()
def read_input(self, prompt, initial_input=None):
items = []
self.initial_input = initial_input
try:
if initial_input is None:
readline.set_pre_input_hook(None)
else:
readline.set_pre_input_hook(self.pre_input_hook)
self.line_buffer = raw_input(prompt).strip()
items = self.split_re.split(self.line_buffer)
for item in items[:]:
if not item: items.remove(item)
return items
except EOFError:
return items
def get_items(self, prompt, must_match=True):
items = []
print "Enter name [name ...]"
print "Press <ENTER> to accept, blank line or control-D terminates input"
print "Pressing <TAB> auto completes name"
print
while True:
new_items = self.read_input(prompt)
if not new_items: break
for item in new_items:
if must_match:
if item not in self.items:
print "ERROR: %s is not valid" % (item)
continue
if item in items: continue
items.append(item)
return items
def get_gsserror(e):
"""
@ -1446,3 +995,16 @@ def make_sshfp(key):
else:
return
return '%d 1 %s' % (algo, fp)
def utf8_encode_value(value):
if isinstance(value, unicode):
return value.encode('utf-8')
return value
def utf8_encode_values(values):
if isinstance(values, list) or isinstance(values, tuple):
return map(utf8_encode_value, values)
else:
return utf8_encode_value(values)

View File

@ -395,7 +395,7 @@ class BindInstance(service.Service):
self.domain = domain_name
self.forwarders = forwarders
self.host = fqdn.split(".")[0]
self.suffix = util.realm_to_suffix(self.realm)
self.suffix = ipautil.realm_to_suffix(self.realm)
self.ntp = ntp
self.reverse_zone = reverse_zone
self.zone_refresh = zone_refresh
@ -614,7 +614,7 @@ class BindInstance(service.Service):
self.realm = realm_name
self.domain = domain_name
self.host = fqdn.split(".")[0]
self.suffix = util.realm_to_suffix(self.realm)
self.suffix = ipautil.realm_to_suffix(self.realm)
self.ntp = ntp
self.reverse_zone = reverse_zone
@ -622,7 +622,7 @@ class BindInstance(service.Service):
def remove_master_dns_records(self, fqdn, realm_name, domain_name):
host = fqdn.split(".")[0]
suffix = util.realm_to_suffix(realm_name)
suffix = ipautil.realm_to_suffix(realm_name)
zone = domain_name
resource_records = (

View File

@ -240,7 +240,7 @@ class CADSInstance(service.Service):
if host_name and realm_name:
self.principal = "dogtagldap/%s@%s" % (self.fqdn, self.realm_name)
if realm_name:
self.suffix = util.realm_to_suffix(self.realm_name)
self.suffix = ipautil.realm_to_suffix(self.realm_name)
self.__setup_sub_dict()
else:
self.suffix = None
@ -250,7 +250,7 @@ class CADSInstance(service.Service):
subject_base=None):
self.ds_port = ds_port
self.realm_name = realm_name.upper()
self.suffix = util.realm_to_suffix(self.realm_name)
self.suffix = ipautil.realm_to_suffix(self.realm_name)
self.fqdn = host_name
self.dm_password = dm_password
self.domain = domain_name

View File

@ -167,7 +167,7 @@ class DsInstance(service.Service):
self.open_ports = []
self.run_init_memberof = True
if realm_name:
self.suffix = util.realm_to_suffix(self.realm_name)
self.suffix = ipautil.realm_to_suffix(self.realm_name)
self.__setup_sub_dict()
else:
self.suffix = None
@ -218,7 +218,7 @@ class DsInstance(service.Service):
hbac_allow=True):
self.realm_name = realm_name.upper()
self.serverid = realm_to_serverid(self.realm_name)
self.suffix = util.realm_to_suffix(self.realm_name)
self.suffix = ipautil.realm_to_suffix(self.realm_name)
self.fqdn = fqdn
self.dm_password = dm_password
self.domain = domain_name
@ -251,7 +251,7 @@ class DsInstance(service.Service):
domain_name, dm_password, pkcs12_info=None):
self.realm_name = realm_name.upper()
self.serverid = realm_to_serverid(self.realm_name)
self.suffix = util.realm_to_suffix(self.realm_name)
self.suffix = ipautil.realm_to_suffix(self.realm_name)
self.master_fqdn = master_fqdn
self.fqdn = fqdn
self.dm_password = dm_password

View File

@ -62,7 +62,7 @@ class HTTPInstance(service.Service):
self.realm = realm
self.domain = domain_name
self.dm_password = dm_password
self.suffix = util.realm_to_suffix(self.realm)
self.suffix = ipautil.realm_to_suffix(self.realm)
self.pkcs12_info = pkcs12_info
self.self_signed_ca = self_signed_ca
self.principal = "HTTP/%s@%s" % (self.fqdn, self.realm)

View File

@ -132,7 +132,7 @@ class KrbInstance(service.Service):
self.host = host_name.split(".")[0]
self.ip = socket.getaddrinfo(host_name, None, socket.AF_UNSPEC, socket.SOCK_STREAM)[0][4][0]
self.domain = domain_name
self.suffix = util.realm_to_suffix(self.realm)
self.suffix = ipautil.realm_to_suffix(self.realm)
self.kdc_password = ipautil.ipa_generate_password()
self.admin_password = admin_password
self.dm_password = admin_password

View File

@ -79,7 +79,7 @@ class LDAPUpdate:
krbctx = krbV.default_context()
try:
self.realm = krbctx.default_realm
suffix = util.realm_to_suffix(self.realm)
suffix = ipautil.realm_to_suffix(self.realm)
except krbV.Krb5Error:
self.realm = None
suffix = None

View File

@ -105,7 +105,7 @@ class ReplicationManager(object):
self.dirman_passwd = dirman_passwd
self.realm = realm
self.starttls = starttls
tmp = util.realm_to_suffix(realm)
tmp = ipautil.realm_to_suffix(realm)
self.suffix = str(DN(tmp)).lower()
self.need_memberof_fixup = False

View File

@ -34,7 +34,7 @@ import ldap.sasl
import ldapurl
from ldap.controls import LDAPControl
from ldap.ldapobject import SimpleLDAPObject
from ipaserver import ipautil
from ipapython import ipautil
from ipalib import errors
from ipapython.ipautil import format_netloc
from ipapython.entity import Entity

View File

@ -1,141 +0,0 @@
# Authors: Simo Sorce <ssorce@redhat.com>
#
# Copyright (C) 2007 Red Hat
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import xmlrpclib
import re
def realm_to_suffix(realm_name):
"""
Convert a kerberos realm into the IPA suffix.
"""
s = realm_name.split(".")
terms = ["dc=" + x.lower() for x in s]
return ",".join(terms)
class CIDict(dict):
"""
Case-insensitive but case-respecting dictionary.
This code is derived from python-ldap's cidict.py module,
written by stroeder: http://python-ldap.sourceforge.net/
This version extends 'dict' so it works properly with TurboGears.
If you extend UserDict, isinstance(foo, dict) returns false.
"""
def __init__(self, default=None):
super(CIDict, self).__init__()
self._keys = {}
self.update(default or {})
def __getitem__(self, key):
return super(CIDict, self).__getitem__(key.lower())
def __setitem__(self, key, value):
lower_key = key.lower()
self._keys[lower_key] = key
return super(CIDict, self).__setitem__(lower_key, value)
def __delitem__(self, key):
lower_key = key.lower()
del self._keys[lower_key]
return super(CIDict, self).__delitem__(key.lower())
def update(self, dict):
for key in dict.keys():
self[key] = dict[key]
def has_key(self, key):
return super(CIDict, self).has_key(key.lower())
def get(self, key, failobj=None):
try:
return self[key]
except KeyError:
return failobj
def keys(self):
return self._keys.values()
def items(self):
result = []
for k in self._keys.values():
result.append((k, self[k]))
return result
def copy(self):
copy = {}
for k in self._keys.values():
copy[k] = self[k]
return copy
def iteritems(self):
return self.copy().iteritems()
def iterkeys(self):
return self.copy().iterkeys()
def setdefault(self, key, value=None):
try:
return self[key]
except KeyError:
self[key] = value
return value
def pop(self, key, *args):
try:
value = self[key]
del self[key]
return value
except KeyError:
if len(args) == 1:
return args[0]
raise
def popitem(self):
(lower_key, value) = super(CIDict, self).popitem()
key = self._keys[lower_key]
del self._keys[lower_key]
return (key, value)
def get_gsserror(e):
"""A GSSError exception looks differently in python 2.4 than it does
in python 2.5, deal with it."""
try:
primary = e[0]
secondary = e[1]
except Exception:
primary = e[0][0]
secondary = e[0][1]
return (primary[0], secondary[0])
def utf8_encode_value(value):
if isinstance(value, unicode):
return value.encode('utf-8')
return value
def utf8_encode_values(values):
if isinstance(values, list) or isinstance(values, tuple):
return map(utf8_encode_value, values)
else:
return utf8_encode_value(values)

View File

@ -21,11 +21,13 @@
Joining an IPA domain
"""
import krbV
from ipalib import api, util
from ipalib import Command, Str
from ipalib import errors
import krbV
from ipalib import _
from ipaserver.install import installutils
def get_realm():
"""
@ -52,7 +54,7 @@ class join(Command):
validate_host,
cli_name='hostname',
doc=_("The hostname to register as"),
default_from=lambda: unicode(util.get_fqdn()),
default_from=lambda: unicode(installutils.get_fqdn()),
autofill=True,
#normalizer=lamda value: value.lower(),
),

View File

@ -31,7 +31,7 @@ from ipalib.backend import Executioner
from ipalib.errors import PublicError, InternalError, CommandError, JSONError, ConversionError, CCacheError, RefererError, InvalidSessionPassword
from ipalib.request import context, Connection, destroy_context
from ipalib.rpc import xml_dumps, xml_loads
from ipalib.util import make_repr, parse_time_duration
from ipalib.util import parse_time_duration
from ipalib.dn import DN
from ipaserver.plugins.ldap2 import ldap2
from ipapython.compat import json

View File

@ -21,17 +21,6 @@
Test the `ipalib.util` module.
"""
from tests.util import raises
from ipalib import util
def test_make_repr():
"""
Test the `ipalib.util.make_repr` function.
"""
f = util.make_repr
assert f('my') == 'my()'
assert f('my', True, u'hello') == "my(True, u'hello')"
assert f('my', one=1, two='two') == "my(one=1, two='two')"
assert f('my', None, 3, dog='animal', apple='fruit') == \
"my(None, 3, apple='fruit', dog='animal')"