Refactor ipautil.run

The ipautil.run function now returns an object with returncode and
output are accessible as attributes.

The stdout and stderr of all commands are logged (unless skip_output is given).

The stdout/stderr contents must be explicitly requested with a keyword
argument, otherwise they are None.
This is because in Python 3, the output needs to be decoded, and that can
fail if it's not decodable (human-readable) text.

The raw (bytes) output is always available from the result object,
as is "leniently" decoded output suitable for logging.

All calls are changed to reflect this.

A use of Popen in cainstance is changed to ipautil.run.

Reviewed-By: Jan Cholasta <jcholast@redhat.com>
This commit is contained in:
Petr Viktorin 2015-11-25 17:17:18 +01:00 committed by Jan Cholasta
parent 4cc206b0f8
commit 099cf98307
28 changed files with 476 additions and 245 deletions

View File

@ -156,15 +156,23 @@ def request_cert():
args = [path] + sys.argv[1:]
if os.environ.get('CERTMONGER_CA_PROFILE') == 'caCACert':
args += ['-N', '-O', 'bypassCAnotafter=true']
stdout, stderr, rc = ipautil.run(args, raiseonerr=False, env=os.environ)
sys.stderr.write(stderr)
result = ipautil.run(args, raiseonerr=False, env=os.environ,
capture_output=True)
if six.PY2:
sys.stderr.write(result.raw_error_output)
else:
# Write bytes directly
sys.stderr.buffer.write(result.raw_error_output) #pylint: disable=no-member
sys.stderr.flush()
syslog.syslog(syslog.LOG_NOTICE, "dogtag-ipa-renew-agent returned %d" % rc)
syslog.syslog(syslog.LOG_NOTICE,
"dogtag-ipa-renew-agent returned %d" % result.returncode)
stdout = result.output
if stdout.endswith('\n'):
stdout = stdout[:-1]
rc = result.returncode
if rc == WAIT_WITH_DELAY:
delay, sep, cookie = stdout.partition('\n')
return (rc, delay, cookie)

View File

@ -30,6 +30,8 @@ import sys
import syslog
import traceback
import six
from ipapython import ipautil
from ipaserver.install import certs
@ -39,14 +41,18 @@ def main():
raise RuntimeError("Not enough arguments")
with certs.renewal_lock:
stdout, stderr, rc = ipautil.run(sys.argv[1:], raiseonerr=False,
env=os.environ)
sys.stdout.write(stdout)
result = ipautil.run(sys.argv[1:], raiseonerr=False, env=os.environ)
if six.PY2:
sys.stdout.write(result.raw_output)
sys.stderr.write(result.raw_error_output)
else:
# Write bytes directly
sys.stdout.buffer.write(result.raw_output) #pylint: disable=no-member
sys.stderr.buffer.write(result.raw_error_output) #pylint: disable=no-member
sys.stdout.flush()
sys.stderr.write(stderr)
sys.stderr.flush()
return rc
return result.returncode
try:

View File

@ -26,9 +26,8 @@ def retrieve_keytab(api, ccache_name, oneway_keytab_name, oneway_principal):
if os.path.isfile(oneway_keytab_name):
os.unlink(oneway_keytab_name)
(stdout, stderr, retcode) = ipautil.run(getkeytab_args,
env={'KRB5CCNAME': ccache_name, 'LANG': 'C'},
raiseonerr=False)
ipautil.run(getkeytab_args, env={'KRB5CCNAME': ccache_name, 'LANG': 'C'},
raiseonerr=False)
# Make sure SSSD is able to read the keytab
try:
sssd = pwd.getpwnam('sssd')

View File

@ -82,7 +82,8 @@ class SshExec(object):
elif 'KRB5CCNAME' in os.environ:
env['KRB5CCNAME'] = os.environ['KRB5CCNAME']
return ipautil.run(cmd, env=env, raiseonerr=False)
return ipautil.run(cmd, env=env, raiseonerr=False,
capture_output=True, capture_error=True)
class CheckedPort(object):
@ -432,21 +433,21 @@ def main():
sys.exit("Principal password required")
stderr=''
(stdout, stderr, returncode) = ipautil.run([paths.KINIT, principal],
result = ipautil.run([paths.KINIT, principal],
env={'KRB5_CONFIG':KRB5_CONFIG, 'KRB5CCNAME':CCACHE_FILE},
stdin=password, raiseonerr=False)
if returncode != 0:
raise RuntimeError("Cannot acquire Kerberos ticket: %s" % stderr)
stdin=password, raiseonerr=False, capture_error=True)
if result.returncode != 0:
raise RuntimeError("Cannot acquire Kerberos ticket: %s" %
result.error_output)
# Verify kinit was actually successful
stderr=''
(stdout, stderr, returncode) = ipautil.run([paths.BIN_KVNO,
result = ipautil.run([paths.BIN_KVNO,
'host/%s' % options.master],
env={'KRB5_CONFIG':KRB5_CONFIG, 'KRB5CCNAME':CCACHE_FILE},
raiseonerr=False)
if returncode != 0:
raise RuntimeError("Could not get ticket for master server: %s" % stderr)
raiseonerr=False, capture_error=True)
if result.returncode != 0:
raise RuntimeError("Could not get ticket for master server: %s" %
result.error_output)
try:
print_info("Check RPC connection to remote master")
@ -519,18 +520,20 @@ def main():
ssh = SshExec(user, options.master)
print_info("Check SSH connection to remote master")
stdout, stderr, returncode = ssh('echo OK', verbose=True)
if returncode != 0:
result = ssh('echo OK', verbose=True)
if result.returncode != 0:
print('Could not SSH into remote host. Error output:')
for line in stderr.splitlines():
for line in result.error_output.splitlines():
print(' %s' % line)
raise RuntimeError('Could not SSH to remote host.')
print_info("Execute check on remote master")
stdout, stderr, returncode = ssh(
result = ssh(
"/usr/sbin/ipa-replica-conncheck " +
" ".join(remote_check_opts))
print_info(stdout)
returncode = result.returncode
stderr = result.error_output
print_info(result.output)
if returncode != 0:
raise RuntimeError("Remote master check failed with following error message(s):\n%s" % stderr)
else:

View File

@ -603,9 +603,9 @@ def uninstall(options, env):
if options.debug:
join_args.append("-d")
env['XMLRPC_TRACE_CURL'] = 'yes'
(stdout, stderr, returncode) = run(join_args, raiseonerr=False, env=env)
if returncode != 0:
root_logger.error("Unenrolling host failed: %s", stderr)
result = run(join_args, raiseonerr=False, env=env)
if result.returncode != 0:
root_logger.error("Unenrolling host failed: %s", result.error_log)
if os.path.exists(paths.IPA_DEFAULT_CONF):
root_logger.info(
@ -1438,8 +1438,8 @@ def configure_sshd_config(fstore, options):
args.append('-o')
args.append('%s=%s' % item)
(stdout, stderr, retcode) = ipautil.run(args, raiseonerr=False)
if retcode == 0:
result = ipautil.run(args, raiseonerr=False)
if result.returncode == 0:
authorized_keys_changes = candidate
break
@ -1475,11 +1475,11 @@ def configure_automount(options):
args.append('--no-sssd')
try:
stdout, _, _ = run(args)
result = run(args)
except Exception as e:
root_logger.error('Automount configuration failed: %s', str(e))
else:
root_logger.info(stdout)
root_logger.info(result.output_log)
def configure_nisdomain(options, domain):
@ -1491,9 +1491,12 @@ def configure_nisdomain(options, domain):
# First backup the old NIS domain name
if os.path.exists(paths.BIN_NISDOMAINNAME):
try:
nis_domain_name, _, _ = ipautil.run([paths.BIN_NISDOMAINNAME])
result = ipautil.run([paths.BIN_NISDOMAINNAME],
capture_output=True)
except CalledProcessError as e:
pass
else:
nis_domain_name = result.output
statestore.backup_state('network', 'nisdomain', nis_domain_name)
@ -1530,8 +1533,9 @@ def unconfigure_nisdomain():
def get_iface_from_ip(ip_addr):
ipresult = ipautil.run([paths.IP, '-oneline', 'address', 'show'])
for line in ipresult[0].split('\n'):
result = ipautil.run([paths.IP, '-oneline', 'address', 'show'],
capture_output=True)
for line in result.output.split('\n'):
fields = line.split()
if len(fields) < 6:
continue
@ -1548,8 +1552,8 @@ def get_local_ipaddresses(iface=None):
args = [paths.IP, '-oneline', 'address', 'show']
if iface:
args += ['dev', iface]
ipresult = ipautil.run(args)
lines = ipresult[0].split('\n')
result = ipautil.run(args, capture_output=True)
lines = result.output.split('\n')
ips = []
for line in lines:
fields = line.split()
@ -1922,9 +1926,10 @@ def get_ca_certs_from_http(url, warn=True):
root_logger.debug("trying to retrieve CA cert via HTTP from %s", url)
try:
stdout, stderr, rc = run([paths.BIN_CURL, "-o", "-", url])
result = run([paths.BIN_CURL, "-o", "-", url], capture_output=True)
except CalledProcessError as e:
raise errors.NoCertificateError(entry=url)
stdout = result.output
try:
certs = x509.load_certificate_list(stdout)
@ -2671,12 +2676,15 @@ def install(options, env, fstore, statestore):
return CLIENT_INSTALL_ERROR
# Now join the domain
(stdout, stderr, returncode) = run(join_args, raiseonerr=False, env=env, nolog=nolog)
result = run(
join_args, raiseonerr=False, env=env, nolog=nolog,
capture_error=True)
stderr = result.error_output
if returncode != 0:
if result.returncode != 0:
root_logger.error("Joining realm failed: %s", stderr)
if not options.force:
if returncode == 13:
if result.returncode == 13:
root_logger.info("Use --force-join option to override "
"the host entry on the server "
"and force client enrollment.")
@ -2694,8 +2702,7 @@ def install(options, env, fstore, statestore):
subject_base = DN(subject_base)
if options.principal is not None:
stderr, stdout, returncode = run(
["kdestroy"], raiseonerr=False, env=env)
run(["kdestroy"], raiseonerr=False, env=env)
# Obtain the TGT. We do it with the temporary krb5.conf, so that
# only the KDC we're installing under is contacted.

View File

@ -279,9 +279,9 @@ class pwpolicy(LDAPObject):
has_lockout = False
lockout_params = ()
(stdout, stderr, rc) = run(['klist', '-V'], raiseonerr=False)
if rc == 0:
verstr = stdout.split()[-1]
result = run(['klist', '-V'], raiseonerr=False, capture_output=True)
if result.returncode == 0:
verstr = result.output.split()[-1]
ver = version.LooseVersion(verstr)
min = version.LooseVersion(MIN_KRB5KDC_WITH_LOCKOUT)
if ver >= min:

View File

@ -281,7 +281,7 @@ class SystemdService(PlatformService):
if instance == "ipa-otpd.socket":
args.append("--ignore-dependencies")
ipautil.run(args, capture_output=capture_output)
ipautil.run(args, skip_output=not capture_output)
if getattr(self.api.env, 'context', None) in ['ipactl', 'installer']:
update_service_list = True
@ -294,7 +294,7 @@ class SystemdService(PlatformService):
def start(self, instance_name="", capture_output=True, wait=True):
ipautil.run([paths.SYSTEMCTL, "start",
self.service_instance(instance_name)],
capture_output=capture_output)
skip_output=not capture_output)
if getattr(self.api.env, 'context', None) in ['ipactl', 'installer']:
update_service_list = True
@ -310,7 +310,7 @@ class SystemdService(PlatformService):
def restart(self, instance_name="", capture_output=True, wait=True):
ipautil.run([paths.SYSTEMCTL, "restart",
self.service_instance(instance_name)],
capture_output=capture_output)
skip_output=not capture_output)
if wait and self.is_running(instance_name):
self.wait_for_open_ports(self.service_instance(instance_name))
@ -320,7 +320,7 @@ class SystemdService(PlatformService):
while True:
try:
(sout, serr, rcode) = ipautil.run(
result = ipautil.run(
[paths.SYSTEMCTL, "is-active", instance],
capture_output=True
)
@ -331,24 +331,24 @@ class SystemdService(PlatformService):
return False
else:
# activating
if rcode == 3 and 'activating' in str(sout):
if result.returncode == 3 and 'activating' in result.output:
time.sleep(SERVICE_POLL_INTERVAL)
continue
# active
if rcode == 0:
if result.returncode == 0:
return True
# not active
return False
def is_installed(self):
try:
(sout, serr, rcode) = ipautil.run([paths.SYSTEMCTL,
"list-unit-files",
"--full"])
if rcode != 0:
result = ipautil.run(
[paths.SYSTEMCTL, "list-unit-files", "--full"],
capture_output=True)
if result.returncode != 0:
return False
else:
svar = self.parse_variables(sout)
svar = self.parse_variables(result.output)
if not self.service_instance("") in svar:
# systemd doesn't show the service
return False
@ -360,12 +360,11 @@ class SystemdService(PlatformService):
def is_enabled(self, instance_name=""):
enabled = True
try:
(sout, serr, rcode) = ipautil.run(
[paths.SYSTEMCTL,
"is-enabled",
self.service_instance(instance_name)])
result = ipautil.run(
[paths.SYSTEMCTL, "is-enabled",
self.service_instance(instance_name)])
if rcode != 0:
if result.returncode != 0:
enabled = False
except ipautil.CalledProcessError:
@ -375,12 +374,12 @@ class SystemdService(PlatformService):
def is_masked(self, instance_name=""):
masked = False
try:
(sout, serr, rcode) = ipautil.run(
[paths.SYSTEMCTL,
"is-enabled",
self.service_instance(instance_name)])
result = ipautil.run(
[paths.SYSTEMCTL, "is-enabled",
self.service_instance(instance_name)],
capture_output=True)
if rcode == 1 and sout == 'masked':
if result.returncode == 1 and result.output == 'masked':
masked = True
except ipautil.CalledProcessError:

View File

@ -220,9 +220,9 @@ class RedHatCAService(RedHatService):
url
]
stdout, stderr, returncode = ipautil.run(args)
result = ipautil.run(args, capture_output=True)
status = dogtag._parse_ca_status(stdout)
status = dogtag._parse_ca_status(result.output)
# end of workaround
except Exception as e:
status = 'check interrupted due to error: %s' % e

View File

@ -375,8 +375,11 @@ class RedHatTaskNamespace(BaseTaskNamespace):
if state is None:
continue
try:
(stdout, stderr, rc) = ipautil.run([paths.GETSEBOOL, setting])
original_state = stdout.split()[2]
result = ipautil.run(
[paths.GETSEBOOL, setting],
capture_output=True
)
original_state = result.output.split()[2]
if backup_func is not None:
backup_func(setting, original_state)

View File

@ -107,10 +107,10 @@ class NSSDatabase(object):
def __exit__(self, type, value, tb):
self.close()
def run_certutil(self, args, stdin=None):
def run_certutil(self, args, stdin=None, **kwargs):
new_args = [paths.CERTUTIL, "-d", self.secdir]
new_args = new_args + args
return ipautil.run(new_args, stdin)
return ipautil.run(new_args, stdin, **kwargs)
def create_db(self, password_filename):
"""Create cert DB
@ -124,8 +124,8 @@ class NSSDatabase(object):
:return: List of (name, trust_flags) tuples
"""
certs, stderr, returncode = self.run_certutil(["-L"])
certs = certs.splitlines()
result = self.run_certutil(["-L"], capture_output=True)
certs = result.output.splitlines()
# FIXME, this relies on NSS never changing the formatting of certutil
certlist = []
@ -157,9 +157,8 @@ class NSSDatabase(object):
:return: List of certificate names
"""
root_nicknames = []
chain, stderr, returncode = self.run_certutil([
"-O", "-n", nickname])
chain = chain.splitlines()
result = self.run_certutil(["-O", "-n", nickname], capture_output=True)
chain = result.output.splitlines()
for c in chain:
m = re.match('\s*"(.*)" \[.*', c)
@ -247,7 +246,8 @@ class NSSDatabase(object):
'-print_certs',
]
try:
stdout, stderr, rc = ipautil.run(args, stdin=body)
result = ipautil.run(
args, stdin=body, capture_output=True)
except ipautil.CalledProcessError as e:
if label == 'CERTIFICATE':
root_logger.warning(
@ -259,7 +259,7 @@ class NSSDatabase(object):
filename, line, e)
continue
else:
extracted_certs += stdout + '\n'
extracted_certs += result.output + '\n'
loaded = True
continue
@ -286,14 +286,15 @@ class NSSDatabase(object):
'-passin', 'file:' + key_pwdfile.name,
]
try:
stdout, stderr, rc = ipautil.run(args, stdin=body)
result = ipautil.run(
args, stdin=body, capture_output=True)
except ipautil.CalledProcessError as e:
root_logger.warning(
"Skipping private key in %s at line %s: %s",
filename, line, e)
continue
else:
extracted_key = stdout
extracted_key = result.output
key_file = filename
loaded = True
continue
@ -401,10 +402,13 @@ class NSSDatabase(object):
else:
args.append('-r')
try:
cert, err, returncode = self.run_certutil(args)
result = self.run_certutil(args, capture_output=pem)
except ipautil.CalledProcessError:
raise RuntimeError("Failed to get %s" % nickname)
return cert
if pem:
return result.output
else:
return result.raw_output
def has_nickname(self, nickname):
try:

View File

@ -40,8 +40,8 @@ class BINDMgr(object):
def notify_zone(self, zone):
cmd = ['rndc', 'sign', zone.to_text()]
output = ipautil.run(cmd)[0]
self.log.info(output)
result = ipautil.run(cmd, capture_output=True)
self.log.info('%s', result.output_log)
def dn2zone_name(self, dn):
"""cn=KSK-20140813162153Z-cede9e182fc4af76c4bddbc19123a565,cn=keys,idnsname=test,cn=dns,dc=ipa,dc=example"""
@ -110,7 +110,8 @@ class BINDMgr(object):
cmd.append(zone.to_text())
# keys has to be readable by ODS & named
basename = ipautil.run(cmd)[0].strip()
result = ipautil.run(cmd, capture_output=True)
basename = result.output.strip()
private_fn = "%s/%s.private" % (workdir, basename)
os.chmod(private_fn, FILE_PERM)
# this is useful mainly for debugging

View File

@ -128,7 +128,8 @@ class ODSMgr(object):
Raises CalledProcessError if returncode != 0.
"""
cmd = ['ods-ksmutil'] + params
return ipautil.run(cmd)[0]
result = ipautil.run(cmd, capture_output=True)
return result.output
def get_ods_zonelist(self):
stdout = self.ksmutil(['zonelist', 'export'])

View File

@ -37,6 +37,8 @@ import gssapi
import pwd
import grp
from contextlib import contextmanager
import locale
import collections
from dns import resolver, rdatatype
from dns.exception import DNSException
@ -155,8 +157,10 @@ class CheckedIPAddress(netaddr.IPAddress):
elif addr.version == 6:
family = 'inet6'
ipresult = run([paths.IP, '-family', family, '-oneline', 'address', 'show'])
lines = ipresult[0].split('\n')
result = run(
[paths.IP, '-family', family, '-oneline', 'address', 'show'],
capture_output=True)
lines = result.output.split('\n')
for line in lines:
fields = line.split()
if len(fields) < 4:
@ -256,13 +260,35 @@ def write_tmp_file(txt):
return fd
def shell_quote(string):
return "'" + string.replace("'", "'\\''") + "'"
if isinstance(string, str):
return "'" + string.replace("'", "'\\''") + "'"
else:
return b"'" + string.replace(b"'", b"'\\''") + b"'"
def run(args, stdin=None, raiseonerr=True,
nolog=(), env=None, capture_output=True, skip_output=False, cwd=None,
runas=None, timeout=None, suplementary_groups=[]):
if six.PY3:
def _log_arg(s):
"""Convert string or bytes to a string suitable for logging"""
if isinstance(s, bytes):
return s.decode(locale.getpreferredencoding(),
errors='replace')
else:
return s
else:
_log_arg = str
class _RunResult(collections.namedtuple('_RunResult',
'output error_output returncode')):
"""Result of ipautil.run"""
def run(args, stdin=None, raiseonerr=True, nolog=(), env=None,
capture_output=False, skip_output=False, cwd=None,
runas=None, timeout=None, suplementary_groups=[],
capture_error=False, encoding=None):
"""
Execute a command and return stdin, stdout and the process return code.
Execute an external command.
:param args: List of arguments for the command
:param stdin: Optional input to the command
@ -283,8 +309,8 @@ def run(args, stdin=None, raiseonerr=True,
If a value isn't found in the list it is silently ignored.
:param env: Dictionary of environment variables passed to the command.
When None, current environment is copied
:param capture_output: Capture stderr and stdout
:param skip_output: Redirect the output to /dev/null and do not capture it
:param capture_output: Capture stdout
:param skip_output: Redirect the output to /dev/null and do not log it
:param cwd: Current working directory
:param runas: Name of a user that the command should be run as. The spawned
process will have both real and effective UID and GID set.
@ -293,6 +319,31 @@ def run(args, stdin=None, raiseonerr=True,
:param suplementary_groups: List of group names that will be used as
suplementary groups for subporcess.
The option runas must be specified together with this option.
:param capture_error: Capture stderr
:param encoding: For Python 3, the encoding to use for output,
error_output, and (if it's not bytes) stdin.
If None, the current encoding according to locale is used.
:return: An object with these attributes:
`returncode`: The process' exit status
`output` and `error_output`: captured output, as strings. Under
Python 3, these are encoded with the given `encoding`.
None unless `capture_output` or `capture_error`, respectively, are
given
`raw_output`, `raw_error_output`: captured output, as bytes.
`output_log` and `error_log`: The captured output, as strings, with any
unencodable characters discarded. These should only be used
for logging or error messages.
If skip_output is given, all output-related attributes on the result
(that is, all except `returncode`) are None.
For backwards compatibility, the return value can also be used as a
(output, error_output, returncode) triple.
"""
assert isinstance(suplementary_groups, list)
p_in = None
@ -301,12 +352,16 @@ def run(args, stdin=None, raiseonerr=True,
if isinstance(nolog, six.string_types):
# We expect a tuple (or list, or other iterable) of nolog strings.
# Passing just a single string is bad: strings are also, so this
# Passing just a single string is bad: strings are iterable, so this
# would result in every individual character of that string being
# replaced by XXXXXXXX.
# This is a sanity check to prevent that.
raise ValueError('nolog must be a tuple of strings.')
if skip_output and (capture_output or capture_error):
raise ValueError('skip_output is incompatible with '
'capture_output or capture_error')
if env is None:
# copy default env
env = copy.deepcopy(os.environ)
@ -315,16 +370,22 @@ def run(args, stdin=None, raiseonerr=True,
p_in = subprocess.PIPE
if skip_output:
p_out = p_err = open(paths.DEV_NULL, 'w')
elif capture_output:
else:
p_out = subprocess.PIPE
p_err = subprocess.PIPE
if encoding is None:
encoding = locale.getpreferredencoding()
if six.PY3 and isinstance(stdin, str):
stdin = stdin.encode(encoding)
if timeout:
# If a timeout was provided, use the timeout command
# to execute the requested command.
args[0:0] = [paths.BIN_TIMEOUT, str(timeout)]
arg_string = nolog_replace(' '.join(shell_quote(a) for a in args), nolog)
arg_string = nolog_replace(' '.join(_log_arg(a) for a in args), nolog)
root_logger.debug('Starting external process')
root_logger.debug('args=%s' % arg_string)
@ -352,8 +413,7 @@ def run(args, stdin=None, raiseonerr=True,
p = subprocess.Popen(args, stdin=p_in, stdout=p_out, stderr=p_err,
close_fds=True, env=env, cwd=cwd,
preexec_fn=preexec_fn)
stdout,stderr = p.communicate(stdin)
stdout,stderr = str(stdout), str(stderr) # Make pylint happy
stdout, stderr = p.communicate(stdin)
except KeyboardInterrupt:
root_logger.debug('Process interrupted')
p.wait()
@ -372,16 +432,50 @@ def run(args, stdin=None, raiseonerr=True,
# The command and its output may include passwords that we don't want
# to log. Replace those.
if capture_output and not skip_output:
stdout = nolog_replace(stdout, nolog)
stderr = nolog_replace(stderr, nolog)
root_logger.debug('stdout=%s' % stdout)
root_logger.debug('stderr=%s' % stderr)
if skip_output:
output_log = None
error_log = None
else:
if six.PY3:
output_log = stdout.decode(locale.getpreferredencoding(),
errors='replace')
else:
output_log = stdout
if six.PY3:
error_log = stderr.decode(locale.getpreferredencoding(),
errors='replace')
else:
error_log = stderr
output_log = nolog_replace(output_log, nolog)
root_logger.debug('stdout=%s' % output_log)
error_log = nolog_replace(error_log, nolog)
root_logger.debug('stderr=%s' % error_log)
if capture_output:
if six.PY2:
output = stdout
else:
output = stdout.encode(encoding)
else:
output = None
if capture_error:
if six.PY2:
error_output = stderr
else:
error_output = stderr.encode(encoding)
else:
error_output = None
if p.returncode != 0 and raiseonerr:
raise CalledProcessError(p.returncode, arg_string, stdout)
raise CalledProcessError(p.returncode, arg_string, str(output))
return (stdout, stderr, p.returncode)
result = _RunResult(output, error_output, p.returncode)
result.raw_output = stdout
result.raw_error_output = stderr
result.output_log = output_log
result.error_log = error_log
return result
def nolog_replace(string, nolog):
@ -1269,10 +1363,10 @@ def kinit_password(principal, password, ccache_name, config=None,
# this workaround enables us to capture stderr and put it
# into the raised exception in case of unsuccessful authentication
(stdout, stderr, retcode) = run(args, stdin=password, env=env,
raiseonerr=False)
if retcode:
raise RuntimeError(stderr)
result = run(args, stdin=password, env=env, raiseonerr=False,
capture_error=True)
if result.returncode:
raise RuntimeError(result.error_output)
def dn_attribute_property(private_name):

View File

@ -36,24 +36,29 @@ def dump_keys():
"""
Dump all keys
"""
(stdout, stderr, rc) = run(['keyctl', 'list', KEYRING], raiseonerr=False)
return stdout
result = run(['keyctl', 'list', KEYRING], raiseonerr=False,
capture_output=True)
return result.output
def get_real_key(key):
"""
One cannot request a key based on the description it was created with
so find the one we're looking for.
"""
(stdout, stderr, rc) = run(['keyctl', 'search', KEYRING, KEYTYPE, key], raiseonerr=False)
if rc:
assert isinstance(key, str)
result = run(['keyctl', 'search', KEYRING, KEYTYPE, key],
raiseonerr=False, capture_output=True)
if result.returncode:
raise ValueError('key %s not found' % key)
return stdout.rstrip()
return result.output.rstrip()
def get_persistent_key(key):
(stdout, stderr, rc) = run(['keyctl', 'get_persistent', KEYRING, key], raiseonerr=False)
if rc:
assert isinstance(key, str)
result = run(['keyctl', 'get_persistent', KEYRING, key],
raiseonerr=False, capture_output=True)
if result.returncode:
raise ValueError('persistent key %s not found' % key)
return stdout.rstrip()
return result.output.rstrip()
def is_persistent_keyring_supported():
uid = os.geteuid()
@ -68,6 +73,7 @@ def has_key(key):
"""
Returns True/False whether the key exists in the keyring.
"""
assert isinstance(key, str)
try:
get_real_key(key)
return True
@ -80,22 +86,27 @@ def read_key(key):
Use pipe instead of print here to ensure we always get the raw data.
"""
assert isinstance(key, str)
real_key = get_real_key(key)
(stdout, stderr, rc) = run(['keyctl', 'pipe', real_key], raiseonerr=False)
if rc:
raise ValueError('keyctl pipe failed: %s' % stderr)
result = run(['keyctl', 'pipe', real_key], raiseonerr=False,
capture_output=True)
if result.returncode:
raise ValueError('keyctl pipe failed: %s' % result.error_log)
return stdout
return result.output
def update_key(key, value):
"""
Update the keyring data. If they key doesn't exist it is created.
"""
assert isinstance(key, str)
assert isinstance(value, bytes)
if has_key(key):
real_key = get_real_key(key)
(stdout, stderr, rc) = run(['keyctl', 'pupdate', real_key], stdin=value, raiseonerr=False)
if rc:
raise ValueError('keyctl pupdate failed: %s' % stderr)
result = run(['keyctl', 'pupdate', real_key], stdin=value,
raiseonerr=False)
if result.returncode:
raise ValueError('keyctl pupdate failed: %s' % result.error_log)
else:
add_key(key, value)
@ -103,17 +114,22 @@ def add_key(key, value):
"""
Add a key to the kernel keyring.
"""
assert isinstance(key, str)
assert isinstance(value, bytes)
if has_key(key):
raise ValueError('key %s already exists' % key)
(stdout, stderr, rc) = run(['keyctl', 'padd', KEYTYPE, key, KEYRING], stdin=value, raiseonerr=False)
if rc:
raise ValueError('keyctl padd failed: %s' % stderr)
result = run(['keyctl', 'padd', KEYTYPE, key, KEYRING],
stdin=value, raiseonerr=False)
if result.returncode:
raise ValueError('keyctl padd failed: %s' % result.error_log)
def del_key(key):
"""
Remove a key from the keyring
"""
assert isinstance(key, str)
real_key = get_real_key(key)
(stdout, stderr, rc) = run(['keyctl', 'unlink', real_key, KEYRING], raiseonerr=False)
if rc:
raise ValueError('keyctl unlink failed: %s' % stderr)
result = run(['keyctl', 'unlink', real_key, KEYRING],
raiseonerr=False)
if result.returncode:
raise ValueError('keyctl unlink failed: %s' % result.error_log)

View File

@ -588,7 +588,7 @@ class DomainValidator(object):
# Destroy the contents of the ccache
root_logger.debug('Destroying the contents of the separate ccache')
(stdout, stderr, returncode) = ipautil.run(
ipautil.run(
[paths.KDESTROY, '-A', '-c', ccache_path],
env={'KRB5CCNAME': ccache_path},
raiseonerr=False)
@ -597,16 +597,14 @@ class DomainValidator(object):
root_logger.debug('Running kinit from ipa.keytab to obtain HTTP '
'service principal with MS-PAC attached.')
(stdout, stderr, returncode) = ipautil.run(
result = ipautil.run(
[paths.KINIT, '-kt', keytab, principal],
env={'KRB5CCNAME': ccache_path},
raiseonerr=False)
if returncode == 0:
if result.returncode == 0:
return (ccache_path, principal)
else:
root_logger.debug('Kinit failed, stout: %s, stderr: %s'
% (stdout, stderr))
return (None, None)
def kinit_as_administrator(self, domain):
@ -634,7 +632,7 @@ class DomainValidator(object):
# Destroy the contents of the ccache
root_logger.debug('Destroying the contents of the separate ccache')
(stdout, stderr, returncode) = ipautil.run(
ipautil.run(
[paths.KDESTROY, '-A', '-c', ccache_path],
env={'KRB5CCNAME': ccache_path},
raiseonerr=False)
@ -642,17 +640,15 @@ class DomainValidator(object):
# Destroy the contents of the ccache
root_logger.debug('Running kinit with credentials of AD administrator')
(stdout, stderr, returncode) = ipautil.run(
result = ipautil.run(
[paths.KINIT, principal],
env={'KRB5CCNAME': ccache_path},
stdin=password,
raiseonerr=False)
if returncode == 0:
if result.returncode == 0:
return (ccache_path, principal)
else:
root_logger.debug('Kinit failed, stout: %s, stderr: %s'
% (stdout, stderr))
return (None, None)
def search_in_dc(self, domain, filter, attrs, scope, basedn=None,

View File

@ -549,10 +549,12 @@ class CAInstance(DogtagInstance):
x509.write_certificate(cert.der_data, cert_file.name)
cert_file.flush()
cert_chain, stderr, rc = ipautil.run(
result = ipautil.run(
[paths.OPENSSL, 'crl2pkcs7',
'-certfile', self.cert_chain_file,
'-nocrl'])
'-nocrl'],
capture_output=True)
cert_chain = result.output
# Dogtag chokes on the header and footer, remove them
# https://bugzilla.redhat.com/show_bug.cgi?id=1127838
cert_chain = re.search(
@ -621,11 +623,12 @@ class CAInstance(DogtagInstance):
# Look through the cert chain to get all the certs we need to add
# trust for
p = subprocess.Popen([paths.CERTUTIL, "-d", self.agent_db,
"-O", "-n", "ipa-ca-agent"], stdout=subprocess.PIPE)
chain = p.stdout.read()
chain = chain.split("\n")
args = [paths.CERTUTIL,
"-d", self.agent_db,
"-O",
"-n", "ipa-ca-agent"]
result = ipautil.run(args, capture_output=True)
chain = result.output.split("\n")
root_nickname=[]
for part in chain:
@ -660,10 +663,11 @@ class CAInstance(DogtagInstance):
'-r', '/ca/agent/ca/profileReview?requestId=%s' % self.requestId,
'%s' % ipautil.format_netloc(self.fqdn, 8443),
]
(stdout, _stderr, _returncode) = ipautil.run(
args, nolog=(self.admin_password,))
result = ipautil.run(
args, nolog=(self.admin_password,),
capture_output=True)
data = stdout.split('\n')
data = result.output.split('\n')
params = get_defList(data)
params['requestId'] = find_substring(data, "requestId")
params['op'] = 'approve'
@ -682,10 +686,11 @@ class CAInstance(DogtagInstance):
'-r', '/ca/agent/ca/profileProcess',
'%s' % ipautil.format_netloc(self.fqdn, 8443),
]
(stdout, _stderr, _returncode) = ipautil.run(
args, nolog=(self.admin_password,))
result = ipautil.run(
args, nolog=(self.admin_password,),
capture_output=True)
data = stdout.split('\n')
data = result.output.split('\n')
outputList = get_outputList(data)
self.ra_cert = outputList['b64_cert']
@ -776,14 +781,15 @@ class CAInstance(DogtagInstance):
conn.disconnect()
def __run_certutil(self, args, database=None, pwd_file=None, stdin=None):
def __run_certutil(self, args, database=None, pwd_file=None, stdin=None,
**kwargs):
if not database:
database = self.ra_agent_db
if not pwd_file:
pwd_file = self.ra_agent_pwd
new_args = [paths.CERTUTIL, "-d", database, "-f", pwd_file]
new_args = new_args + args
return ipautil.run(new_args, stdin, nolog=(pwd_file,))
return ipautil.run(new_args, stdin, nolog=(pwd_file,), **kwargs)
def __create_ra_agent_db(self):
if ipautil.file_exists(self.ra_agent_db + "/cert8.db"):
@ -822,13 +828,14 @@ class CAInstance(DogtagInstance):
# makes openssl throw up.
data = base64.b64decode(chain)
(certlist, _stderr, _returncode) = ipautil.run(
result = ipautil.run(
[paths.OPENSSL,
"pkcs7",
"-inform",
"DER",
"-print_certs",
], stdin=data)
], stdin=data, capture_output=True)
certlist = result.output
# Ok, now we have all the certificates in certs, walk through it
# and pull out each certificate and add it to our database
@ -869,14 +876,15 @@ class CAInstance(DogtagInstance):
# Generate our CSR. The result gets put into stdout
try:
(stdout, _stderr, _returncode) = self.__run_certutil(
result = self.__run_certutil(
["-R", "-k", "rsa", "-g", "2048", "-s",
str(DN(('CN', 'IPA RA'), self.subject_base)),
"-z", noise_name, "-a"])
"-z", noise_name, "-a"],
capture_output=True)
finally:
os.remove(noise_name)
csr = pkcs10.strip_header(stdout)
csr = pkcs10.strip_header(result.output)
# Send the request to the CA
conn = httplib.HTTPConnection(self.fqdn, 8080)
@ -1051,7 +1059,9 @@ class CAInstance(DogtagInstance):
def publish_ca_cert(self, location):
args = ["-L", "-n", self.canickname, "-a"]
(cert, _err, _returncode) = self.__run_certutil(args)
result = self.__run_certutil(
args, capture_output=True)
cert = result.output
fd = open(location, "w+")
fd.write(cert)
fd.close()

View File

@ -164,8 +164,8 @@ class CertDB(object):
def gen_password(self):
return sha1(ipautil.ipa_generate_password()).hexdigest()
def run_certutil(self, args, stdin=None):
return self.nssdb.run_certutil(args, stdin)
def run_certutil(self, args, stdin=None, **kwargs):
return self.nssdb.run_certutil(args, stdin, **kwargs)
def run_signtool(self, args, stdin=None):
with open(self.passwd_fname, "r") as f:
@ -231,8 +231,9 @@ class CertDB(object):
root_nicknames = self.find_root_cert(nickname)[:-1]
fd = open(self.cacert_fname, "w")
for root in root_nicknames:
(cert, stderr, returncode) = self.run_certutil(["-L", "-n", root, "-a"])
fd.write(cert)
result = self.run_certutil(["-L", "-n", root, "-a"],
capture_output=True)
fd.write(result.output)
fd.close()
os.chmod(self.cacert_fname, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
if create_pkcs12:
@ -276,7 +277,8 @@ class CertDB(object):
"""
try:
args = ["-L", "-n", nickname, "-a"]
(cert, err, returncode) = self.run_certutil(args)
result = self.run_certutil(args, capture_output=True)
cert = result.output
if pem:
return cert
else:
@ -370,9 +372,10 @@ class CertDB(object):
"-z", self.noise_fname,
"-f", self.passwd_fname,
"-a"]
(stdout, stderr, returncode) = self.run_certutil(args)
result = self.run_certutil(args,
capture_output=True, capture_error=True)
os.remove(self.noise_fname)
return (stdout, stderr)
return (result.output, result.error_output)
def issue_server_cert(self, certreq_fname, cert_fname):
self.setup_cert_request()

View File

@ -28,6 +28,9 @@ import re
import dbus
import shlex
import pipes
import locale
import six
from ipaserver.install import service
from ipaserver.install import certs
@ -63,13 +66,17 @@ def httpd_443_configured():
False otherwise.
"""
try:
(stdout, stderr, rc) = ipautil.run([paths.HTTPD, '-t', '-D', 'DUMP_VHOSTS'])
result = ipautil.run([paths.HTTPD, '-t', '-D', 'DUMP_VHOSTS'],
capture_output=True)
except ipautil.CalledProcessError as e:
service.print_msg("WARNING: cannot check if port 443 is already configured")
service.print_msg("httpd returned error when checking: %s" % e)
return False
port_line_re = re.compile(r'(?P<address>\S+):(?P<port>\d+)')
stdout = result.raw_output
if six.PY3:
stdout = stdout.decode(locale.getpreferredencoding(), errors='replace')
for line in stdout.splitlines():
m = port_line_re.match(line)
if m and int(m.group('port')) == 443:

View File

@ -66,7 +66,6 @@ EOF
"""
def encrypt_file(filename, keyring, remove_original=True):
source = filename
dest = filename + '.gpg'
@ -86,9 +85,9 @@ def encrypt_file(filename, keyring, remove_original=True):
args.append('-e')
args.append(source)
(stdout, stderr, rc) = run(args, raiseonerr=False)
if rc != 0:
raise admintool.ScriptError('gpg failed: %s' % stderr)
result = run(args, raiseonerr=False)
if result.returncode != 0:
raise admintool.ScriptError('gpg failed: %s' % result.error_log)
if remove_original:
os.unlink(source)
@ -420,9 +419,9 @@ class Backup(admintool.AdminTool):
'-r',
'-n', backend,
'-a', ldiffile]
(stdout, stderr, rc) = run(args, raiseonerr=False)
if rc != 0:
self.log.critical("db2ldif failed: %s", stderr)
result = run(args, raiseonerr=False)
if result.returncode != 0:
self.log.critical('db2ldif failed: %s', result.error_log)
# Move the LDIF backup to our location
shutil.move(ldiffile, os.path.join(self.dir, ldifname))
@ -464,9 +463,9 @@ class Backup(admintool.AdminTool):
wait_for_task(conn, dn)
else:
args = [paths.DB2BAK, bakdir, '-Z', instance]
(stdout, stderr, rc) = run(args, raiseonerr=False)
if rc != 0:
self.log.critical("db2bak failed: %s" % stderr)
result = run(args, raiseonerr=False)
if result.returncode != 0:
self.log.critical('db2bak failed: %s', result.error_log)
shutil.move(bakdir, self.dir)
@ -493,10 +492,10 @@ class Backup(admintool.AdminTool):
if options.logs:
args.extend(verify_directories(self.logs))
(stdout, stderr, rc) = run(args, raiseonerr=False)
if rc != 0:
raise admintool.ScriptError('tar returned non-zero code '
'%d: %s' % (rc, stderr))
result = run(args, raiseonerr=False)
if result.returncode != 0:
raise admintool.ScriptError('tar returned non-zero code %d: %s' %
(result.returncode, result.error_log))
# Backup the necessary directory structure. This is a separate
# call since we are using the '--no-recursion' flag to store
@ -514,17 +513,21 @@ class Backup(admintool.AdminTool):
]
args.extend(missing_directories)
(stdout, stderr, rc) = run(args, raiseonerr=False)
if rc != 0:
raise admintool.ScriptError('tar returned non-zero %d when adding '
'directory structure: %s' % (rc, stderr))
result = run(args, raiseonerr=False)
if result.returncode != 0:
raise admintool.ScriptError(
'tar returned non-zero code %d '
'when adding directory structure: %s' %
(result.returncode, result.error_log))
# Compress the archive. This is done separately, since 'tar' cannot
# append to a compressed archive.
(stdout, stderr, rc) = run(['gzip', tarfile], raiseonerr=False)
if rc != 0:
raise admintool.ScriptError('gzip returned non-zero %d when '
'compressing the backup: %s' % (rc, stderr))
result = run(['gzip', tarfile], raiseonerr=False)
if result.returncode != 0:
raise admintool.ScriptError(
'gzip returned non-zero code %d '
'when compressing the backup: %s' %
(result.returncode, result.error_log))
# Rename the archive back to files.tar to preserve compatibility
os.rename(os.path.join(self.dir, 'files.tar.gz'), tarfile)
@ -596,9 +599,11 @@ class Backup(admintool.AdminTool):
filename,
'.'
]
(stdout, stderr, rc) = run(args, raiseonerr=False)
if rc != 0:
raise admintool.ScriptError('tar returned non-zero %d: %s' % (rc, stdout))
result = run(args, raiseonerr=False)
if result.returncode != 0:
raise admintool.ScriptError(
'tar returned non-zero code %s: %s' %
(result.returncode, result.error_log))
if encrypt:
self.log.info('Encrypting %s' % filename)

View File

@ -25,8 +25,10 @@ import time
import pwd
import ldif
import itertools
import locale
from six.moves.configparser import SafeConfigParser
import six
from ipalib import api, errors, constants
from ipapython import version, ipautil, certdb
@ -88,9 +90,9 @@ def decrypt_file(tmpdir, filename, keyring):
args.append('-d')
args.append(source)
(stdout, stderr, rc) = run(args, raiseonerr=False)
if rc != 0:
raise admintool.ScriptError('gpg failed: %s' % stderr)
result = run(args, raiseonerr=False)
if result.returncode != 0:
raise admintool.ScriptError('gpg failed: %s' % result.error_log)
return dest
@ -365,9 +367,9 @@ class Restore(admintool.AdminTool):
dirsrv.start(capture_output=False)
else:
self.log.info('Stopping IPA services')
(stdout, stderr, rc) = run(['ipactl', 'stop'], raiseonerr=False)
if rc not in [0, 6]:
self.log.warn('Stopping IPA failed: %s' % stderr)
result = run(['ipactl', 'stop'], raiseonerr=False)
if result.returncode not in [0, 6]:
self.log.warn('Stopping IPA failed: %s' % result.error_log)
self.restore_selinux_booleans()
@ -573,9 +575,9 @@ class Restore(admintool.AdminTool):
'-Z', instance,
'-i', ldiffile,
'-n', backend]
(stdout, stderr, rc) = run(args, raiseonerr=False)
if rc != 0:
self.log.critical("ldif2db failed: %s" % stderr)
result = run(args, raiseonerr=False)
if result.returncode != 0:
self.log.critical("ldif2db failed: %s" % result.error_log)
def bak2db(self, instance, backend, online=True):
@ -628,9 +630,9 @@ class Restore(admintool.AdminTool):
if backend is not None:
args.append('-n')
args.append(backend)
(stdout, stderr, rc) = run(args, raiseonerr=False)
if rc != 0:
self.log.critical("bak2db failed: %s" % stderr)
result = run(args, raiseonerr=False)
if result.returncode != 0:
self.log.critical("bak2db failed: %s" % result.error_log)
def restore_default_conf(self):
@ -650,10 +652,10 @@ class Restore(admintool.AdminTool):
paths.IPA_DEFAULT_CONF[1:],
]
(stdout, stderr, rc) = run(args, raiseonerr=False)
if rc != 0:
result = run(args, raiseonerr=False)
if result.returncode != 0:
self.log.critical('Restoring %s failed: %s' %
(paths.IPA_DEFAULT_CONF, stderr))
(paths.IPA_DEFAULT_CONF, result.error_log))
os.chdir(cwd)
def remove_old_files(self):
@ -696,9 +698,9 @@ class Restore(admintool.AdminTool):
args.append('--exclude')
args.append('var/log')
(stdout, stderr, rc) = run(args, raiseonerr=False)
if rc != 0:
self.log.critical('Restoring files failed: %s', stderr)
result = run(args, raiseonerr=False)
if result.returncode != 0:
self.log.critical('Restoring files failed: %s', result.error_log)
os.chdir(cwd)

View File

@ -300,9 +300,10 @@ class KrbInstance(service.Service):
MIN_KRB5KDC_WITH_WORKERS = "1.9"
cpus = os.sysconf('SC_NPROCESSORS_ONLN')
workers = False
(stdout, stderr, rc) = ipautil.run(['klist', '-V'], raiseonerr=False)
if rc == 0:
verstr = stdout.split()[-1]
result = ipautil.run(['klist', '-V'],
raiseonerr=False, capture_output=True)
if result.returncode == 0:
verstr = result.output.split()[-1]
ver = version.LooseVersion(verstr)
min = version.LooseVersion(MIN_KRB5KDC_WITH_WORKERS)
if ver >= min:

View File

@ -288,10 +288,11 @@ class OpenDNSSECInstance(service.Service):
# regenerate zonelist.xml
ods_enforcerd = services.knownservices.ods_enforcerd
cmd = [paths.ODS_KSMUTIL, 'zonelist', 'export']
stdout, stderr, retcode = ipautil.run(cmd,
runas=ods_enforcerd.get_user_name())
result = ipautil.run(cmd,
runas=ods_enforcerd.get_user_name(),
capture_output=True)
with open(paths.OPENDNSSEC_ZONELIST_FILE, 'w') as zonelistf:
zonelistf.write(stdout)
zonelistf.write(result.output)
os.chown(paths.OPENDNSSEC_ZONELIST_FILE,
self.ods_uid, self.ods_gid)
os.chmod(paths.OPENDNSSEC_ZONELIST_FILE, 0o660)

View File

@ -93,10 +93,10 @@ def replica_conn_check(master_host, host_name, realm, check_ca,
if ca_cert_file:
args.extend(["--ca-cert-file", ca_cert_file])
(stdin, stderr, returncode) = ipautil.run(
result = ipautil.run(
args, raiseonerr=False, capture_output=False, nolog=nolog)
if returncode != 0:
if result.returncode != 0:
sys.exit("Connection check failed!" +
"\nPlease fix your network settings according to error messages above." +
"\nIf the check results are not valid it can be skipped with --skip-conncheck parameter.")

View File

@ -1164,7 +1164,7 @@ def uninstall(installer):
print("Shutting down all IPA services")
try:
(stdout, stderr, rc) = run([paths.IPACTL, "stop"], raiseonerr=False)
run([paths.IPACTL, "stop"], raiseonerr=False)
except Exception as e:
pass
@ -1249,12 +1249,11 @@ def uninstall(installer):
print("Removing IPA client configuration")
try:
(stdout, stderr, rc) = run([paths.IPA_CLIENT_INSTALL, "--on-master",
"--unattended", "--uninstall"],
raiseonerr=False)
if rc not in [0, 2]:
root_logger.debug("ipa-client-install returned %d" % rc)
raise RuntimeError(stdout)
result = run([paths.IPA_CLIENT_INSTALL, "--on-master",
"--unattended", "--uninstall"],
raiseonerr=False, capture_output=True)
if result.returncode not in [0, 2]:
raise RuntimeError(result.output)
except Exception as e:
rv = 1
print("Uninstall of client side components failed!")

View File

@ -85,8 +85,11 @@ class test_ipagetkeytab(cmdline_test):
"-p", "test/notfound.example.com",
"-k", self.keytabname,
]
(out, err, rc) = ipautil.run(new_args, stdin=None, raiseonerr=False)
result = ipautil.run(new_args, stdin=None, raiseonerr=False,
capture_error=True)
err = result.error_output
assert 'Failed to parse result: PrincipalName not found.\n' in err, err
rc = result.returncode
assert rc > 0, rc
def test_2_run(self):
@ -107,10 +110,11 @@ class test_ipagetkeytab(cmdline_test):
"-k", self.keytabname,
]
try:
(out, err, rc) = ipautil.run(new_args, None)
result = ipautil.run(new_args, None, capture_error=True)
expected = 'Keytab successfully retrieved and stored in: %s\n' % (
self.keytabname)
assert expected in err, 'Success message not in output:\n%s' % err
assert (expected in result.error_output,
'Success message not in output:\n%s' % result.error_output)
except ipautil.CalledProcessError as e:
assert (False)

View File

@ -1,3 +1,5 @@
# encoding: utf-8
# Authors:
# Jan Cholasta <jcholast@redhat.com>
#
@ -417,3 +419,62 @@ class TestTimeParser(object):
nose.tools.assert_equal(-30, time.tzinfo.minoffset)
offset = time.tzinfo.utcoffset(time.tzinfo.dst())
nose.tools.assert_equal(((24 - 9) * 60 * 60) - (30 * 60), offset.seconds)
def test_run():
result = ipautil.run(['echo', 'foo\x02bar'],
capture_output=True,
capture_error=True)
assert result.returncode == 0
assert result.output == 'foo\x02bar\n'
assert result.raw_output == b'foo\x02bar\n'
assert result.error_output == ''
assert result.raw_error_output == b''
def test_run_no_capture_output():
result = ipautil.run(['echo', 'foo\x02bar'])
assert result.returncode == 0
assert result.output is None
assert result.raw_output == b'foo\x02bar\n'
assert result.error_output is None
assert result.raw_error_output == b''
def test_run_bytes():
result = ipautil.run(['echo', b'\x01\x02'], capture_output=True)
assert result.returncode == 0
assert result.output == b'\x01\x02\n'
def test_run_decode():
result = ipautil.run(['echo', u'á'.encode('utf-8')],
encoding='utf-8', capture_output=True)
assert result.returncode == 0
if six.PY3:
assert result.output == 'á\n'
else:
assert result.output == 'á\n'.encode('utf-8')
def test_run_decode_bad():
if six.PY3:
with pytest.raises(UnicodeDecodeError):
ipautil.run(['echo', b'\xa0\xa1'],
capture_output=True,
encoding='utf-8')
else:
result = ipautil.run(['echo', '\xa0\xa1'],
capture_output=True,
encoding='utf-8')
assert result.returncode == 0
assert result.output == '\xa0\xa1\n'
def test_backcompat():
result = out, err, rc = ipautil.run(['echo', 'foo\x02bar'],
capture_output=True,
capture_error=True)
assert rc is result.returncode
assert out is result.output
assert err is result.error_output

View File

@ -28,8 +28,8 @@ import pytest
pytestmark = pytest.mark.tier0
TEST_KEY = 'ipa_test'
TEST_VALUE = 'abc123'
UPDATE_VALUE = '123abc'
TEST_VALUE = b'abc123'
UPDATE_VALUE = b'123abc'
SIZE_256 = 'abcdefgh' * 32
SIZE_512 = 'abcdefgh' * 64
@ -94,9 +94,10 @@ class test_keyring(object):
# Now update it 10 times
for i in range(10):
kernel_keyring.update_key(TEST_KEY, 'test %d' % i)
value = ('test %d' % i).encode('ascii')
kernel_keyring.update_key(TEST_KEY, value)
result = kernel_keyring.read_key(TEST_KEY)
assert(result == 'test %d' % i)
assert(result == value)
kernel_keyring.del_key(TEST_KEY)
@ -134,9 +135,9 @@ class test_keyring(object):
"""
Test 512-bytes of data
"""
kernel_keyring.add_key(TEST_KEY, SIZE_512)
kernel_keyring.add_key(TEST_KEY, SIZE_512.encode('ascii'))
result = kernel_keyring.read_key(TEST_KEY)
assert(result == SIZE_512)
assert(result == SIZE_512.encode('ascii'))
kernel_keyring.del_key(TEST_KEY)
@ -144,8 +145,8 @@ class test_keyring(object):
"""
Test 1k bytes of data
"""
kernel_keyring.add_key(TEST_KEY, SIZE_1024)
kernel_keyring.add_key(TEST_KEY, SIZE_1024.encode('ascii'))
result = kernel_keyring.read_key(TEST_KEY)
assert(result == SIZE_1024)
assert(result == SIZE_1024.encode('ascii'))
kernel_keyring.del_key(TEST_KEY)

View File

@ -552,7 +552,7 @@ class TestHostFalsePwdChange(XMLRPC_test):
]
try:
out, err, rc = ipautil.run(new_args)
ipautil.run(new_args)
except ipautil.CalledProcessError as e:
# join operation may fail on 'adding key into keytab', but
# the keytab is not necessary for further tests