Work around pkisilent bugs.

Check directory manager password and certificate subject base for
invalid characters.
(https://bugzilla.redhat.com/show_bug.cgi?id=658641)

Shell-escape pkisilent command-line arguments.
(https://bugzilla.redhat.com/show_bug.cgi?id=741180)

ticket 1636
This commit is contained in:
Jan Cholasta 2011-09-26 08:27:01 +02:00 committed by Rob Crittenden
parent 5bc8323964
commit 428d8c4a2d
4 changed files with 58 additions and 25 deletions

View File

@ -40,7 +40,7 @@ from ConfigParser import RawConfigParser
import random import random
import tempfile import tempfile
import nss.error import nss.error
from optparse import OptionGroup from optparse import OptionGroup, OptionValueError
from ipaserver.install import dsinstance from ipaserver.install import dsinstance
from ipaserver.install import krbinstance from ipaserver.install import krbinstance
@ -92,15 +92,31 @@ def subject_callback(option, opt_str, value, parser):
""" """
name = opt_str.replace('--','') name = opt_str.replace('--','')
v = unicode(value, 'utf-8') v = unicode(value, 'utf-8')
if any(ord(c) < 0x20 for c in v):
raise OptionValueError("Subject base must not contain control characters")
if '&' in v:
raise OptionValueError("Subject base must not contain an ampersand (\"&\")")
try: try:
dn = DN(v) dn = DN(v)
for rdn in dn: for rdn in dn:
if rdn.attr.lower() not in VALID_SUBJECT_ATTRS: if rdn.attr.lower() not in VALID_SUBJECT_ATTRS:
raise ValueError('invalid attribute: %s' % rdn.attr) raise OptionValueError('invalid attribute: %s' % rdn.attr)
except ValueError, e: except ValueError, e:
raise ValueError('Invalid subject base format: %s' % str(e)) raise OptionValueError('Invalid subject base format: %s' % str(e))
parser.values.subject = str(dn) # may as well normalize it parser.values.subject = str(dn) # may as well normalize it
def validate_dm_password(password):
if len(password) < 8:
raise ValueError("Password must be at least 8 characters long")
if any(ord(c) < 0x20 for c in password):
raise ValueError("Password must not contain control characters")
if ' ' in password:
raise ValueError("Password must not contain a space (\" \")")
if '&' in password:
raise ValueError("Password must not contain an ampersand (\"&\")")
if '\\' in password:
raise ValueError("Password must not contain a backslash (\"\\\")")
def parse_options(): def parse_options():
# Guaranteed to give a random 200k range below the 2G mark (uint32_t limit) # Guaranteed to give a random 200k range below the 2G mark (uint32_t limit)
namespace = random.randint(1, 10000) * 200000 namespace = random.randint(1, 10000) * 200000
@ -204,8 +220,11 @@ def parse_options():
options, args = parser.parse_args() options, args = parser.parse_args()
safe_options = parser.get_safe_opts(options) safe_options = parser.get_safe_opts(options)
if options.dm_password is not None and len(options.dm_password) < 8: if options.dm_password is not None:
parser.error("DS admin password must be at least 8 characters long") try:
validate_dm_password(options.dm_password)
except ValueError, e:
parser.error("DS admin password: " + str(e))
if options.admin_password is not None and len(options.admin_password) < 8: if options.admin_password is not None and len(options.admin_password) < 8:
parser.error("Admin user password must be at least 8 characters long") parser.error("Admin user password must be at least 8 characters long")
@ -417,7 +436,7 @@ def read_dm_password():
print "The password must be at least 8 characters long." print "The password must be at least 8 characters long."
print "" print ""
#TODO: provide the option of generating a random password #TODO: provide the option of generating a random password
dm_password = read_password("Directory Manager") dm_password = read_password("Directory Manager", validator=validate_dm_password)
return dm_password return dm_password
def read_admin_password(): def read_admin_password():

View File

@ -196,6 +196,9 @@ def write_tmp_file(txt):
return fd return fd
def shell_quote(string):
return "'" + string.replace("'", "'\\''") + "'"
def run(args, stdin=None, raiseonerr=True, def run(args, stdin=None, raiseonerr=True,
nolog=(), env=None, capture_output=True): nolog=(), env=None, capture_output=True):
""" """
@ -248,7 +251,8 @@ def run(args, stdin=None, raiseonerr=True,
continue continue
quoted = urllib2.quote(value) quoted = urllib2.quote(value)
for nolog_value in (value, quoted): shquoted = shell_quote(value)
for nolog_value in (shquoted, value, quoted):
if capture_output: if capture_output:
stdout = stdout.replace(nolog_value, 'XXXXXXXX') stdout = stdout.replace(nolog_value, 'XXXXXXXX')
stderr = stderr.replace(nolog_value, 'XXXXXXXX') stderr = stderr.replace(nolog_value, 'XXXXXXXX')

View File

@ -593,34 +593,34 @@ class CAInstance(service.Service):
"-cs_hostname", self.fqdn, "-cs_hostname", self.fqdn,
"-cs_port", str(ADMIN_SECURE_PORT), "-cs_port", str(ADMIN_SECURE_PORT),
"-client_certdb_dir", self.ca_agent_db, "-client_certdb_dir", self.ca_agent_db,
"-client_certdb_pwd", "'%s'" % self.admin_password, "-client_certdb_pwd", self.admin_password,
"-preop_pin" , preop_pin, "-preop_pin" , preop_pin,
"-domain_name", self.domain_name, "-domain_name", self.domain_name,
"-admin_user", "admin", "-admin_user", "admin",
"-admin_email", "root@localhost", "-admin_email", "root@localhost",
"-admin_password", "'%s'" % self.admin_password, "-admin_password", self.admin_password,
"-agent_name", "ipa-ca-agent", "-agent_name", "ipa-ca-agent",
"-agent_key_size", "2048", "-agent_key_size", "2048",
"-agent_key_type", "rsa", "-agent_key_type", "rsa",
"-agent_cert_subject", "\"CN=ipa-ca-agent,%s\"" % self.subject_base, "-agent_cert_subject", "CN=ipa-ca-agent,%s" % self.subject_base,
"-ldap_host", self.fqdn, "-ldap_host", self.fqdn,
"-ldap_port", str(self.ds_port), "-ldap_port", str(self.ds_port),
"-bind_dn", "\"cn=Directory Manager\"", "-bind_dn", "cn=Directory Manager",
"-bind_password", "'%s'" % self.dm_password, "-bind_password", self.dm_password,
"-base_dn", self.basedn, "-base_dn", self.basedn,
"-db_name", "ipaca", "-db_name", "ipaca",
"-key_size", "2048", "-key_size", "2048",
"-key_type", "rsa", "-key_type", "rsa",
"-key_algorithm", "SHA256withRSA", "-key_algorithm", "SHA256withRSA",
"-save_p12", "true", "-save_p12", "true",
"-backup_pwd", "'%s'" % self.admin_password, "-backup_pwd", self.admin_password,
"-subsystem_name", self.service_name, "-subsystem_name", self.service_name,
"-token_name", "internal", "-token_name", "internal",
"-ca_subsystem_cert_subject_name", "\"CN=CA Subsystem,%s\"" % self.subject_base, "-ca_subsystem_cert_subject_name", "CN=CA Subsystem,%s" % self.subject_base,
"-ca_ocsp_cert_subject_name", "\"CN=OCSP Subsystem,%s\"" % self.subject_base, "-ca_ocsp_cert_subject_name", "CN=OCSP Subsystem,%s" % self.subject_base,
"-ca_server_cert_subject_name", "\"CN=%s,%s\"" % (self.fqdn, self.subject_base), "-ca_server_cert_subject_name", "CN=%s,%s" % (self.fqdn, self.subject_base),
"-ca_audit_signing_cert_subject_name", "\"CN=CA Audit,%s\"" % self.subject_base, "-ca_audit_signing_cert_subject_name", "CN=CA Audit,%s" % self.subject_base,
"-ca_sign_cert_subject_name", "\"CN=Certificate Authority,%s\"" % self.subject_base ] "-ca_sign_cert_subject_name", "CN=Certificate Authority,%s" % self.subject_base ]
if self.external == 1: if self.external == 1:
args.append("-external") args.append("-external")
args.append("true") args.append("true")
@ -651,7 +651,7 @@ class CAInstance(service.Service):
args.append("-clone_p12_file") args.append("-clone_p12_file")
args.append("ca.p12") args.append("ca.p12")
args.append("-clone_p12_password") args.append("-clone_p12_password")
args.append("'%s'" % self.dm_password) args.append(self.dm_password)
args.append("-sd_hostname") args.append("-sd_hostname")
args.append(self.master_host) args.append(self.master_host)
args.append("-sd_admin_port") args.append("-sd_admin_port")
@ -659,7 +659,7 @@ class CAInstance(service.Service):
args.append("-sd_admin_name") args.append("-sd_admin_name")
args.append("admin") args.append("admin")
args.append("-sd_admin_password") args.append("-sd_admin_password")
args.append("'%s'" % self.admin_password) args.append(self.admin_password)
args.append("-clone_start_tls") args.append("-clone_start_tls")
args.append("true") args.append("true")
args.append("-clone_uri") args.append("-clone_uri")
@ -668,6 +668,9 @@ class CAInstance(service.Service):
args.append("-clone") args.append("-clone")
args.append("false") args.append("false")
# pkisilent does not escape the arguments before passing them to shell
args[2:] = [ipautil.shell_quote(i) for i in args[2:]]
# Define the things we don't want logged # Define the things we don't want logged
nolog = (self.admin_password, self.dm_password,) nolog = (self.admin_password, self.dm_password,)

View File

@ -313,7 +313,11 @@ def get_password(prompt):
else: else:
return sys.stdin.readline().rstrip() return sys.stdin.readline().rstrip()
def read_password(user, confirm=True, validate=True, retry=True): def _read_password_default_validator(password):
if len(password) < 8:
raise ValueError("Password must be at least 8 characters long")
def read_password(user, confirm=True, validate=True, retry=True, validator=_read_password_default_validator):
correct = False correct = False
pwd = "" pwd = ""
while not correct: while not correct:
@ -322,10 +326,13 @@ def read_password(user, confirm=True, validate=True, retry=True):
pwd = get_password(user + " password: ") pwd = get_password(user + " password: ")
if not pwd: if not pwd:
continue continue
if validate and len(pwd) < 8: if validate:
print "Password must be at least 8 characters long" try:
pwd = "" validator(pwd)
continue except ValueError, e:
print str(e)
pwd = ""
continue
if not confirm: if not confirm:
correct = True correct = True
continue continue