mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Move config directives handling code
Move config directives handling code: ipaserver.install.installutils -> ipapython.directivesetter Reviewed-By: Stanislav Laznicka <slaznick@redhat.com> Reviewed-By: Christian Heimes <cheimes@redhat.com>
This commit is contained in:
parent
8c0d7bb92f
commit
f47d86c719
@ -28,11 +28,12 @@ import shutil
|
||||
import traceback
|
||||
|
||||
from ipalib.install import certstore
|
||||
from ipapython import directivesetter
|
||||
from ipapython import ipautil
|
||||
from ipalib import api, errors
|
||||
from ipalib import x509
|
||||
from ipalib.install.kinit import kinit_keytab
|
||||
from ipaserver.install import certs, cainstance, installutils
|
||||
from ipaserver.install import certs, cainstance
|
||||
from ipaserver.plugins.ldap2 import ldap2
|
||||
from ipaplatform import services
|
||||
from ipaplatform.paths import paths
|
||||
@ -104,22 +105,22 @@ def _main():
|
||||
elif nickname == 'caSigningCert cert-pki-ca':
|
||||
# Update CS.cfg
|
||||
cfg_path = paths.CA_CS_CFG_PATH
|
||||
config = installutils.get_directive(
|
||||
config = directivesetter.get_directive(
|
||||
cfg_path, 'subsystem.select', '=')
|
||||
if config == 'New':
|
||||
syslog.syslog(syslog.LOG_NOTICE, "Updating CS.cfg")
|
||||
if cert.is_self_signed():
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
cfg_path, 'hierarchy.select', 'Root',
|
||||
quotes=False, separator='=')
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
cfg_path, 'subsystem.count', '1',
|
||||
quotes=False, separator='=')
|
||||
else:
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
cfg_path, 'hierarchy.select', 'Subordinate',
|
||||
quotes=False, separator='=')
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
cfg_path, 'subsystem.count', '0',
|
||||
quotes=False, separator='=')
|
||||
else:
|
||||
|
@ -40,6 +40,7 @@ from subprocess import CalledProcessError
|
||||
from pyasn1.error import PyAsn1Error
|
||||
from six.moves import urllib
|
||||
|
||||
from ipapython import directivesetter
|
||||
from ipapython import ipautil
|
||||
import ipapython.errors
|
||||
|
||||
@ -47,7 +48,6 @@ from ipaplatform.constants import constants
|
||||
from ipaplatform.paths import paths
|
||||
from ipaplatform.redhat.authconfig import get_auth_tool
|
||||
from ipaplatform.base.tasks import BaseTaskNamespace
|
||||
from ipaserver.install import installutils
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -567,12 +567,12 @@ class RedHatTaskNamespace(BaseTaskNamespace):
|
||||
return False
|
||||
|
||||
def setup_httpd_logging(self):
|
||||
installutils.set_directive(paths.HTTPD_SSL_CONF,
|
||||
'ErrorLog',
|
||||
'logs/error_log', False)
|
||||
installutils.set_directive(paths.HTTPD_SSL_CONF,
|
||||
'TransferLog',
|
||||
'logs/access_log', False)
|
||||
directivesetter.set_directive(paths.HTTPD_SSL_CONF,
|
||||
'ErrorLog',
|
||||
'logs/error_log', False)
|
||||
directivesetter.set_directive(paths.HTTPD_SSL_CONF,
|
||||
'TransferLog',
|
||||
'logs/access_log', False)
|
||||
|
||||
|
||||
tasks = RedHatTaskNamespace()
|
||||
|
234
ipapython/directivesetter.py
Normal file
234
ipapython/directivesetter.py
Normal file
@ -0,0 +1,234 @@
|
||||
#
|
||||
# Copyright (C) 2018 FreeIPA Contributors see COPYING for license
|
||||
#
|
||||
|
||||
import six
|
||||
|
||||
import io
|
||||
import os
|
||||
import re
|
||||
import stat
|
||||
import tempfile
|
||||
|
||||
from ipapython.ipautil import unescape_seq, escape_seq
|
||||
|
||||
_SENTINEL = object()
|
||||
|
||||
|
||||
class DirectiveSetter(object):
|
||||
"""Safe directive setter
|
||||
|
||||
with DirectiveSetter('/path/to/conf') as ds:
|
||||
ds.set(key, value)
|
||||
"""
|
||||
def __init__(self, filename, quotes=True, separator=' ', comment='#'):
|
||||
self.filename = os.path.abspath(filename)
|
||||
self.quotes = quotes
|
||||
self.separator = separator
|
||||
self.comment = comment
|
||||
self.lines = None
|
||||
self.stat = None
|
||||
|
||||
def __enter__(self):
|
||||
with io.open(self.filename) as f:
|
||||
self.stat = os.fstat(f.fileno())
|
||||
self.lines = list(f)
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
if exc_type is not None:
|
||||
# something went wrong, reset
|
||||
self.lines = None
|
||||
self.stat = None
|
||||
return
|
||||
|
||||
directory, prefix = os.path.split(self.filename)
|
||||
# use tempfile in same directory to have atomic rename
|
||||
fd, name = tempfile.mkstemp(prefix=prefix, dir=directory, text=True)
|
||||
with io.open(fd, mode='w', closefd=True) as f:
|
||||
for line in self.lines:
|
||||
if not isinstance(line, six.text_type):
|
||||
line = line.decode('utf-8')
|
||||
f.write(line)
|
||||
self.lines = None
|
||||
os.fchmod(f.fileno(), stat.S_IMODE(self.stat.st_mode))
|
||||
os.fchown(f.fileno(), self.stat.st_uid, self.stat.st_gid)
|
||||
self.stat = None
|
||||
# flush and sync tempfile inode
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
|
||||
# rename file and sync directory inode
|
||||
os.rename(name, self.filename)
|
||||
dirfd = os.open(directory, os.O_RDONLY | os.O_DIRECTORY)
|
||||
try:
|
||||
os.fsync(dirfd)
|
||||
finally:
|
||||
os.close(dirfd)
|
||||
|
||||
def set(self, directive, value, quotes=_SENTINEL, separator=_SENTINEL,
|
||||
comment=_SENTINEL):
|
||||
"""Set a single directive
|
||||
"""
|
||||
if quotes is _SENTINEL:
|
||||
quotes = self.quotes
|
||||
if separator is _SENTINEL:
|
||||
separator = self.separator
|
||||
if comment is _SENTINEL:
|
||||
comment = self.comment
|
||||
# materialize lines
|
||||
# set_directive_lines() modify item, shrink or enlage line count
|
||||
self.lines = list(set_directive_lines(
|
||||
quotes, separator, directive, value, self.lines, comment
|
||||
))
|
||||
|
||||
def setitems(self, items):
|
||||
"""Set multiple directives from a dict or list with key/value pairs
|
||||
"""
|
||||
if isinstance(items, dict):
|
||||
# dict-like, use sorted for stable order
|
||||
items = sorted(items.items())
|
||||
for k, v in items:
|
||||
self.set(k, v)
|
||||
|
||||
|
||||
def set_directive(filename, directive, value, quotes=True, separator=' ',
|
||||
comment='#'):
|
||||
"""Set a name/value pair directive in a configuration file.
|
||||
|
||||
A value of None means to drop the directive.
|
||||
|
||||
Does not tolerate (or put) spaces around the separator.
|
||||
|
||||
:param filename: input filename
|
||||
:param directive: directive name
|
||||
:param value: value of the directive
|
||||
:param quotes: whether to quote `value` in double quotes. If true, then
|
||||
any existing double quotes are first escaped to avoid
|
||||
unparseable directives.
|
||||
:param separator: character serving as separator between directive and
|
||||
value. Correct value required even when dropping a directive.
|
||||
:param comment: comment character for the file to keep new values near
|
||||
their commented-out counterpart
|
||||
"""
|
||||
st = os.stat(filename)
|
||||
with open(filename, 'r') as f:
|
||||
lines = list(f) # read the whole file
|
||||
# materialize new list
|
||||
new_lines = list(set_directive_lines(
|
||||
quotes, separator, directive, value, lines, comment
|
||||
))
|
||||
with open(filename, 'w') as f:
|
||||
# don't construct the whole string; write line-wise
|
||||
for line in new_lines:
|
||||
f.write(line)
|
||||
os.chown(filename, st.st_uid, st.st_gid) # reset perms
|
||||
|
||||
|
||||
def set_directive_lines(quotes, separator, k, v, lines, comment):
|
||||
"""Set a name/value pair in a configuration (iterable of lines).
|
||||
|
||||
Replaces the value of the key if found, otherwise adds it at
|
||||
end. If value is ``None``, remove the key if found.
|
||||
|
||||
Takes an iterable of lines (with trailing newline).
|
||||
Yields lines (with trailing newline).
|
||||
|
||||
"""
|
||||
new_line = ""
|
||||
if v is not None:
|
||||
v_quoted = quote_directive_value(v, '"') if quotes else v
|
||||
new_line = ''.join([k, separator, v_quoted, '\n'])
|
||||
|
||||
# Special case: consider space as "white space" so tabs are allowed
|
||||
if separator == ' ':
|
||||
separator = '[ \t]+'
|
||||
|
||||
found = False
|
||||
addnext = False # add on next line, found a comment
|
||||
matcher = re.compile(r'\s*{}\s*{}'.format(re.escape(k), separator))
|
||||
cmatcher = re.compile(r'\s*{}\s*{}\s*{}'.format(comment,
|
||||
re.escape(k), separator))
|
||||
for line in lines:
|
||||
if matcher.match(line):
|
||||
found = True
|
||||
addnext = False
|
||||
if v is not None:
|
||||
yield new_line
|
||||
elif addnext:
|
||||
found = True
|
||||
addnext = False
|
||||
yield new_line
|
||||
yield line
|
||||
elif cmatcher.match(line):
|
||||
addnext = True
|
||||
yield line
|
||||
else:
|
||||
yield line
|
||||
|
||||
if not found and v is not None:
|
||||
yield new_line
|
||||
|
||||
|
||||
def get_directive(filename, directive, separator=' '):
|
||||
"""
|
||||
A rather inefficient way to get a configuration directive.
|
||||
|
||||
:param filename: input filename
|
||||
:param directive: directive name
|
||||
:param separator: separator between directive and value
|
||||
|
||||
:returns: The (unquoted) value if the directive was found, None otherwise
|
||||
"""
|
||||
# Special case: consider space as "white space" so tabs are allowed
|
||||
if separator == ' ':
|
||||
separator = '[ \t]+'
|
||||
|
||||
result = None
|
||||
with open(filename, "r") as fd:
|
||||
for line in fd:
|
||||
if line.lstrip().startswith(directive):
|
||||
line = line.strip()
|
||||
|
||||
match = re.match(
|
||||
r'{}\s*{}\s*(.*)'.format(directive, separator), line)
|
||||
if match:
|
||||
value = match.group(1)
|
||||
else:
|
||||
raise ValueError("Malformed directive: {}".format(line))
|
||||
|
||||
result = unquote_directive_value(value.strip(), '"')
|
||||
result = result.strip(' ')
|
||||
break
|
||||
return result
|
||||
|
||||
|
||||
def quote_directive_value(value, quote_char):
|
||||
"""Quote a directive value
|
||||
:param value: string to quote
|
||||
:param quote_char: character which is used for quoting. All prior
|
||||
occurences will be escaped before quoting to avoid unparseable value.
|
||||
:returns: processed value
|
||||
"""
|
||||
if value.startswith(quote_char) and value.endswith(quote_char):
|
||||
return value
|
||||
|
||||
return "{quote}{value}{quote}".format(
|
||||
quote=quote_char,
|
||||
value="".join(escape_seq(quote_char, value))
|
||||
)
|
||||
|
||||
|
||||
def unquote_directive_value(value, quote_char):
|
||||
"""Unquote a directive value
|
||||
:param value: string to unquote
|
||||
:param quote_char: character to strip. All escaped occurences of
|
||||
`quote_char` will be uncescaped during processing
|
||||
:returns: processed value
|
||||
"""
|
||||
unescaped_value = "".join(unescape_seq(quote_char, value))
|
||||
if (unescaped_value.startswith(quote_char) and
|
||||
unescaped_value.endswith(quote_char)):
|
||||
return unescaped_value[1:-1]
|
||||
|
||||
return unescaped_value
|
@ -55,6 +55,7 @@ from ipaplatform import services
|
||||
from ipaplatform.paths import paths
|
||||
from ipaplatform.tasks import tasks
|
||||
|
||||
from ipapython import directivesetter
|
||||
from ipapython import dogtag
|
||||
from ipapython import ipautil
|
||||
from ipapython import ipaldap
|
||||
@ -261,7 +262,7 @@ def is_step_one_done():
|
||||
path = paths.CA_CS_CFG_PATH
|
||||
if not os.path.exists(path):
|
||||
return False
|
||||
test = installutils.get_directive(path, 'preop.ca.type', '=')
|
||||
test = directivesetter.get_directive(path, 'preop.ca.type', '=')
|
||||
if test == "otherca":
|
||||
return True
|
||||
return False
|
||||
@ -723,7 +724,7 @@ class CAInstance(DogtagInstance):
|
||||
os.chown(self.config, pent.pw_uid, pent.pw_gid)
|
||||
|
||||
def enable_pkix(self):
|
||||
installutils.set_directive(paths.SYSCONFIG_PKI_TOMCAT,
|
||||
directivesetter.set_directive(paths.SYSCONFIG_PKI_TOMCAT,
|
||||
'NSS_ENABLE_PKIX_VERIFY', '1',
|
||||
quotes=False, separator='=')
|
||||
|
||||
@ -964,9 +965,8 @@ class CAInstance(DogtagInstance):
|
||||
|
||||
https://access.redhat.com/knowledge/docs/en-US/Red_Hat_Certificate_System/8.0/html/Admin_Guide/Setting_up_Publishing.html
|
||||
"""
|
||||
with installutils.DirectiveSetter(self.config,
|
||||
quotes=False, separator='=') as ds:
|
||||
|
||||
with directivesetter.DirectiveSetter(
|
||||
self.config, quotes=False, separator='=') as ds:
|
||||
# Enable file publishing, disable LDAP
|
||||
ds.set('ca.publish.enable', 'true')
|
||||
ds.set('ca.publish.ldappublish.enable', 'false')
|
||||
@ -1124,7 +1124,7 @@ class CAInstance(DogtagInstance):
|
||||
"""
|
||||
# Check the default validity period of the audit signing cert
|
||||
# and set it to 2 years if it is 6 months.
|
||||
cert_range = installutils.get_directive(
|
||||
cert_range = directivesetter.get_directive(
|
||||
paths.CASIGNEDLOGCERT_CFG,
|
||||
'policyset.caLogSigningSet.2.default.params.range',
|
||||
separator='='
|
||||
@ -1132,14 +1132,14 @@ class CAInstance(DogtagInstance):
|
||||
logger.debug(
|
||||
'caSignedLogCert.cfg profile validity range is %s', cert_range)
|
||||
if cert_range == "180":
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
paths.CASIGNEDLOGCERT_CFG,
|
||||
'policyset.caLogSigningSet.2.default.params.range',
|
||||
'720',
|
||||
quotes=False,
|
||||
separator='='
|
||||
)
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
paths.CASIGNEDLOGCERT_CFG,
|
||||
'policyset.caLogSigningSet.2.constraint.params.range',
|
||||
'720',
|
||||
@ -1284,7 +1284,7 @@ class CAInstance(DogtagInstance):
|
||||
'/usr/libexec/ipa/ipa-pki-retrieve-key'),
|
||||
]
|
||||
for k, v in directives:
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
self.config, k, v, quotes=False, separator='=')
|
||||
|
||||
sysupgrade.set_upgrade_state('dogtag', 'setup_lwca_key_retieval', True)
|
||||
|
@ -19,6 +19,7 @@ from ipapython.dnsutil import DNSName
|
||||
from ipaserver.install import service
|
||||
from ipaserver.install import installutils
|
||||
from ipapython.dn import DN
|
||||
from ipapython import directivesetter
|
||||
from ipapython import ipautil
|
||||
from ipaplatform.constants import constants
|
||||
from ipaplatform.paths import paths
|
||||
@ -199,9 +200,9 @@ class DNSKeySyncInstance(service.Service):
|
||||
# setting up named and ipa-dnskeysyncd to use our softhsm2 config
|
||||
for sysconfig in [paths.SYSCONFIG_NAMED,
|
||||
paths.SYSCONFIG_IPA_DNSKEYSYNCD]:
|
||||
installutils.set_directive(sysconfig, 'SOFTHSM2_CONF',
|
||||
paths.DNSSEC_SOFTHSM2_CONF,
|
||||
quotes=False, separator='=')
|
||||
directivesetter.set_directive(sysconfig, 'SOFTHSM2_CONF',
|
||||
paths.DNSSEC_SOFTHSM2_CONF,
|
||||
quotes=False, separator='=')
|
||||
|
||||
if (token_dir_exists and os.path.exists(paths.DNSSEC_SOFTHSM_PIN) and
|
||||
os.path.exists(paths.DNSSEC_SOFTHSM_PIN_SO)):
|
||||
|
@ -37,11 +37,11 @@ from ipalib.constants import CA_DBUS_TIMEOUT
|
||||
from ipaplatform import services
|
||||
from ipaplatform.constants import constants
|
||||
from ipaplatform.paths import paths
|
||||
from ipapython import directivesetter
|
||||
from ipapython import ipaldap
|
||||
from ipapython import ipautil
|
||||
from ipapython.dn import DN
|
||||
from ipaserver.install import service
|
||||
from ipaserver.install import installutils
|
||||
from ipaserver.install import replication
|
||||
from ipaserver.install.installutils import stopped_service
|
||||
|
||||
@ -182,41 +182,41 @@ class DogtagInstance(service.Service):
|
||||
"""
|
||||
|
||||
with stopped_service('pki-tomcatd', 'pki-tomcat'):
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
self.config,
|
||||
'authz.instance.DirAclAuthz.ldap.ldapauth.authtype',
|
||||
'SslClientAuth', quotes=False, separator='=')
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
self.config,
|
||||
'authz.instance.DirAclAuthz.ldap.ldapauth.clientCertNickname',
|
||||
'subsystemCert cert-pki-ca', quotes=False, separator='=')
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
self.config,
|
||||
'authz.instance.DirAclAuthz.ldap.ldapconn.port', '636',
|
||||
quotes=False, separator='=')
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
self.config,
|
||||
'authz.instance.DirAclAuthz.ldap.ldapconn.secureConn',
|
||||
'true', quotes=False, separator='=')
|
||||
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
self.config,
|
||||
'internaldb.ldapauth.authtype',
|
||||
'SslClientAuth', quotes=False, separator='=')
|
||||
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
self.config,
|
||||
'internaldb.ldapauth.clientCertNickname',
|
||||
'subsystemCert cert-pki-ca', quotes=False, separator='=')
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
self.config,
|
||||
'internaldb.ldapconn.port', '636', quotes=False, separator='=')
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
self.config,
|
||||
'internaldb.ldapconn.secureConn', 'true', quotes=False,
|
||||
separator='=')
|
||||
# Remove internaldb password as is not needed anymore
|
||||
installutils.set_directive(paths.PKI_TOMCAT_PASSWORD_CONF,
|
||||
directivesetter.set_directive(paths.PKI_TOMCAT_PASSWORD_CONF,
|
||||
'internaldb', None, separator='=')
|
||||
|
||||
def uninstall(self):
|
||||
@ -353,7 +353,7 @@ class DogtagInstance(service.Service):
|
||||
"""
|
||||
|
||||
with stopped_service('pki-tomcatd', 'pki-tomcat'):
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
self.config,
|
||||
directive,
|
||||
# the cert must be only the base64 string without headers
|
||||
|
@ -37,6 +37,7 @@ from ipaserver.install import replication
|
||||
from ipaserver.install import service
|
||||
from ipaserver.install import certs
|
||||
from ipaserver.install import installutils
|
||||
from ipapython import directivesetter
|
||||
from ipapython import dogtag
|
||||
from ipapython import ipautil
|
||||
from ipapython.dn import DN
|
||||
@ -208,8 +209,10 @@ class HTTPInstance(service.Service):
|
||||
services.knownservices.gssproxy.restart()
|
||||
|
||||
def get_mod_nss_nickname(self):
|
||||
cert = installutils.get_directive(paths.HTTPD_NSS_CONF, 'NSSNickname')
|
||||
nickname = installutils.unquote_directive_value(cert, quote_char="'")
|
||||
cert = directivesetter.get_directive(paths.HTTPD_NSS_CONF,
|
||||
'NSSNickname')
|
||||
nickname = directivesetter.unquote_directive_value(cert,
|
||||
quote_char="'")
|
||||
return nickname
|
||||
|
||||
def backup_ssl_conf(self):
|
||||
@ -231,7 +234,7 @@ class HTTPInstance(service.Service):
|
||||
installutils.remove_file(paths.HTTPD_NSS_CONF)
|
||||
|
||||
def set_mod_ssl_protocol(self):
|
||||
installutils.set_directive(paths.HTTPD_SSL_CONF,
|
||||
directivesetter.set_directive(paths.HTTPD_SSL_CONF,
|
||||
'SSLProtocol',
|
||||
'+TLSv1 +TLSv1.1 +TLSv1.2', False)
|
||||
|
||||
@ -400,22 +403,22 @@ class HTTPInstance(service.Service):
|
||||
|
||||
def configure_mod_ssl_certs(self):
|
||||
"""Configure the mod_ssl certificate directives"""
|
||||
installutils.set_directive(paths.HTTPD_SSL_SITE_CONF,
|
||||
directivesetter.set_directive(paths.HTTPD_SSL_SITE_CONF,
|
||||
'SSLCertificateFile',
|
||||
paths.HTTPD_CERT_FILE, False)
|
||||
installutils.set_directive(paths.HTTPD_SSL_SITE_CONF,
|
||||
directivesetter.set_directive(paths.HTTPD_SSL_SITE_CONF,
|
||||
'SSLCertificateKeyFile',
|
||||
paths.HTTPD_KEY_FILE, False)
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
paths.HTTPD_SSL_CONF,
|
||||
'SSLPassPhraseDialog',
|
||||
'exec:{passread}'.format(passread=paths.IPA_HTTPD_PASSWD_READER),
|
||||
False)
|
||||
installutils.set_directive(paths.HTTPD_SSL_SITE_CONF,
|
||||
directivesetter.set_directive(paths.HTTPD_SSL_SITE_CONF,
|
||||
'SSLCACertificateFile',
|
||||
paths.IPA_CA_CRT, False)
|
||||
# set SSLVerifyDepth for external CA installations
|
||||
installutils.set_directive(paths.HTTPD_SSL_CONF,
|
||||
directivesetter.set_directive(paths.HTTPD_SSL_CONF,
|
||||
'SSLVerifyDepth',
|
||||
MOD_SSL_VERIFY_DEPTH,
|
||||
quotes=False)
|
||||
|
@ -25,7 +25,6 @@ import logging
|
||||
import socket
|
||||
import getpass
|
||||
import gssapi
|
||||
import io
|
||||
import ldif
|
||||
import os
|
||||
import re
|
||||
@ -33,7 +32,6 @@ import fileinput
|
||||
import sys
|
||||
import tempfile
|
||||
import shutil
|
||||
import stat
|
||||
import traceback
|
||||
import textwrap
|
||||
from contextlib import contextmanager
|
||||
@ -406,229 +404,6 @@ def update_file(filename, orig, subst):
|
||||
return 1
|
||||
|
||||
|
||||
def quote_directive_value(value, quote_char):
|
||||
"""Quote a directive value
|
||||
:param value: string to quote
|
||||
:param quote_char: character which is used for quoting. All prior
|
||||
occurences will be escaped before quoting to avoid unparseable value.
|
||||
:returns: processed value
|
||||
"""
|
||||
if value.startswith(quote_char) and value.endswith(quote_char):
|
||||
return value
|
||||
|
||||
return "{quote}{value}{quote}".format(
|
||||
quote=quote_char,
|
||||
value="".join(ipautil.escape_seq(quote_char, value))
|
||||
)
|
||||
|
||||
|
||||
def unquote_directive_value(value, quote_char):
|
||||
"""Unquote a directive value
|
||||
:param value: string to unquote
|
||||
:param quote_char: character to strip. All escaped occurences of
|
||||
`quote_char` will be uncescaped during processing
|
||||
:returns: processed value
|
||||
"""
|
||||
unescaped_value = "".join(ipautil.unescape_seq(quote_char, value))
|
||||
if (unescaped_value.startswith(quote_char) and
|
||||
unescaped_value.endswith(quote_char)):
|
||||
return unescaped_value[1:-1]
|
||||
|
||||
return unescaped_value
|
||||
|
||||
|
||||
_SENTINEL = object()
|
||||
|
||||
|
||||
class DirectiveSetter(object):
|
||||
"""Safe directive setter
|
||||
|
||||
with DirectiveSetter('/path/to/conf') as ds:
|
||||
ds.set(key, value)
|
||||
"""
|
||||
def __init__(self, filename, quotes=True, separator=' ', comment='#'):
|
||||
self.filename = os.path.abspath(filename)
|
||||
self.quotes = quotes
|
||||
self.separator = separator
|
||||
self.comment = comment
|
||||
self.lines = None
|
||||
self.stat = None
|
||||
|
||||
def __enter__(self):
|
||||
with io.open(self.filename) as f:
|
||||
self.stat = os.fstat(f.fileno())
|
||||
self.lines = list(f)
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
if exc_type is not None:
|
||||
# something went wrong, reset
|
||||
self.lines = None
|
||||
self.stat = None
|
||||
return
|
||||
|
||||
directory, prefix = os.path.split(self.filename)
|
||||
# use tempfile in same directory to have atomic rename
|
||||
fd, name = tempfile.mkstemp(prefix=prefix, dir=directory, text=True)
|
||||
with io.open(fd, mode='w', closefd=True) as f:
|
||||
for line in self.lines:
|
||||
if not isinstance(line, six.text_type):
|
||||
line = line.decode('utf-8')
|
||||
f.write(line)
|
||||
self.lines = None
|
||||
os.fchmod(f.fileno(), stat.S_IMODE(self.stat.st_mode))
|
||||
os.fchown(f.fileno(), self.stat.st_uid, self.stat.st_gid)
|
||||
self.stat = None
|
||||
# flush and sync tempfile inode
|
||||
f.flush()
|
||||
os.fsync(f.fileno())
|
||||
|
||||
# rename file and sync directory inode
|
||||
os.rename(name, self.filename)
|
||||
dirfd = os.open(directory, os.O_RDONLY | os.O_DIRECTORY)
|
||||
try:
|
||||
os.fsync(dirfd)
|
||||
finally:
|
||||
os.close(dirfd)
|
||||
|
||||
def set(self, directive, value, quotes=_SENTINEL, separator=_SENTINEL,
|
||||
comment=_SENTINEL):
|
||||
"""Set a single directive
|
||||
"""
|
||||
if quotes is _SENTINEL:
|
||||
quotes = self.quotes
|
||||
if separator is _SENTINEL:
|
||||
separator = self.separator
|
||||
if comment is _SENTINEL:
|
||||
comment = self.comment
|
||||
# materialize lines
|
||||
# set_directive_lines() modify item, shrink or enlage line count
|
||||
self.lines = list(set_directive_lines(
|
||||
quotes, separator, directive, value, self.lines, comment
|
||||
))
|
||||
|
||||
def setitems(self, items):
|
||||
"""Set multiple directives from a dict or list with key/value pairs
|
||||
"""
|
||||
if isinstance(items, dict):
|
||||
# dict-like, use sorted for stable order
|
||||
items = sorted(items.items())
|
||||
for k, v in items:
|
||||
self.set(k, v)
|
||||
|
||||
|
||||
def set_directive(filename, directive, value, quotes=True, separator=' ',
|
||||
comment='#'):
|
||||
"""Set a name/value pair directive in a configuration file.
|
||||
|
||||
A value of None means to drop the directive.
|
||||
|
||||
Does not tolerate (or put) spaces around the separator.
|
||||
|
||||
:param filename: input filename
|
||||
:param directive: directive name
|
||||
:param value: value of the directive
|
||||
:param quotes: whether to quote `value` in double quotes. If true, then
|
||||
any existing double quotes are first escaped to avoid
|
||||
unparseable directives.
|
||||
:param separator: character serving as separator between directive and
|
||||
value. Correct value required even when dropping a directive.
|
||||
:param comment: comment character for the file to keep new values near
|
||||
their commented-out counterpart
|
||||
"""
|
||||
st = os.stat(filename)
|
||||
with open(filename, 'r') as f:
|
||||
lines = list(f) # read the whole file
|
||||
# materialize new list
|
||||
new_lines = list(set_directive_lines(
|
||||
quotes, separator, directive, value, lines, comment
|
||||
))
|
||||
with open(filename, 'w') as f:
|
||||
# don't construct the whole string; write line-wise
|
||||
for line in new_lines:
|
||||
f.write(line)
|
||||
os.chown(filename, st.st_uid, st.st_gid) # reset perms
|
||||
|
||||
|
||||
def set_directive_lines(quotes, separator, k, v, lines, comment):
|
||||
"""Set a name/value pair in a configuration (iterable of lines).
|
||||
|
||||
Replaces the value of the key if found, otherwise adds it at
|
||||
end. If value is ``None``, remove the key if found.
|
||||
|
||||
Takes an iterable of lines (with trailing newline).
|
||||
Yields lines (with trailing newline).
|
||||
|
||||
"""
|
||||
new_line = ""
|
||||
if v is not None:
|
||||
v_quoted = quote_directive_value(v, '"') if quotes else v
|
||||
new_line = ''.join([k, separator, v_quoted, '\n'])
|
||||
|
||||
# Special case: consider space as "white space" so tabs are allowed
|
||||
if separator == ' ':
|
||||
separator = '[ \t]+'
|
||||
|
||||
found = False
|
||||
addnext = False # add on next line, found a comment
|
||||
matcher = re.compile(r'\s*{}\s*{}'.format(re.escape(k), separator))
|
||||
cmatcher = re.compile(r'\s*{}\s*{}\s*{}'.format(comment,
|
||||
re.escape(k), separator))
|
||||
for line in lines:
|
||||
if matcher.match(line):
|
||||
found = True
|
||||
addnext = False
|
||||
if v is not None:
|
||||
yield new_line
|
||||
elif addnext:
|
||||
found = True
|
||||
addnext = False
|
||||
yield new_line
|
||||
yield line
|
||||
elif cmatcher.match(line):
|
||||
addnext = True
|
||||
yield line
|
||||
else:
|
||||
yield line
|
||||
|
||||
if not found and v is not None:
|
||||
yield new_line
|
||||
|
||||
|
||||
def get_directive(filename, directive, separator=' '):
|
||||
"""
|
||||
A rather inefficient way to get a configuration directive.
|
||||
|
||||
:param filename: input filename
|
||||
:param directive: directive name
|
||||
:param separator: separator between directive and value
|
||||
|
||||
:returns: The (unquoted) value if the directive was found, None otherwise
|
||||
"""
|
||||
# Special case: consider space as "white space" so tabs are allowed
|
||||
if separator == ' ':
|
||||
separator = '[ \t]+'
|
||||
|
||||
fd = open(filename, "r")
|
||||
for line in fd:
|
||||
if line.lstrip().startswith(directive):
|
||||
line = line.strip()
|
||||
|
||||
match = re.match(r'{}\s*{}\s*(.*)'.format(directive, separator),
|
||||
line)
|
||||
if match:
|
||||
value = match.group(1)
|
||||
else:
|
||||
raise ValueError("Malformed directive: {}".format(line))
|
||||
|
||||
result = unquote_directive_value(value.strip(), '"')
|
||||
result = result.strip(' ')
|
||||
fd.close()
|
||||
return result
|
||||
fd.close()
|
||||
return None
|
||||
|
||||
|
||||
def kadmin(command):
|
||||
return ipautil.run(
|
||||
[
|
||||
|
@ -34,6 +34,7 @@ from six.moves.configparser import RawConfigParser
|
||||
from ipalib import api
|
||||
from ipalib import x509
|
||||
from ipaplatform.paths import paths
|
||||
from ipapython import directivesetter
|
||||
from ipapython import ipautil
|
||||
from ipapython.dn import DN
|
||||
from ipaserver.install import cainstance
|
||||
@ -362,7 +363,7 @@ class KRAInstance(DogtagInstance):
|
||||
write operations.
|
||||
"""
|
||||
with installutils.stopped_service('pki-tomcatd', 'pki-tomcat'):
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
self.config,
|
||||
'kra.ephemeralRequests',
|
||||
'true', quotes=False, separator='=')
|
||||
|
@ -14,6 +14,7 @@ import ldap
|
||||
from ipaserver.install import service
|
||||
from ipaserver.install import installutils
|
||||
from ipapython.dn import DN
|
||||
from ipapython import directivesetter
|
||||
from ipapython import ipautil
|
||||
from ipaplatform.constants import constants
|
||||
from ipaplatform.paths import paths
|
||||
@ -79,7 +80,7 @@ class ODSExporterInstance(service.Service):
|
||||
logger.error("DNSKeyExporter service already exists")
|
||||
|
||||
def __setup_key_exporter(self):
|
||||
installutils.set_directive(paths.SYSCONFIG_IPA_ODS_EXPORTER,
|
||||
directivesetter.set_directive(paths.SYSCONFIG_IPA_ODS_EXPORTER,
|
||||
'SOFTHSM2_CONF',
|
||||
paths.DNSSEC_SOFTHSM2_CONF,
|
||||
quotes=False, separator='=')
|
||||
|
@ -14,8 +14,8 @@ from subprocess import CalledProcessError
|
||||
|
||||
from ipalib.install import sysrestore
|
||||
from ipaserver.install import service
|
||||
from ipaserver.install import installutils
|
||||
from ipapython.dn import DN
|
||||
from ipapython import directivesetter
|
||||
from ipapython import ipautil
|
||||
from ipaplatform import services
|
||||
from ipaplatform.constants import constants
|
||||
@ -199,10 +199,10 @@ class OpenDNSSECInstance(service.Service):
|
||||
if not self.fstore.has_file(paths.SYSCONFIG_ODS):
|
||||
self.fstore.backup_file(paths.SYSCONFIG_ODS)
|
||||
|
||||
installutils.set_directive(paths.SYSCONFIG_ODS,
|
||||
'SOFTHSM2_CONF',
|
||||
paths.DNSSEC_SOFTHSM2_CONF,
|
||||
quotes=False, separator='=')
|
||||
directivesetter.set_directive(paths.SYSCONFIG_ODS,
|
||||
'SOFTHSM2_CONF',
|
||||
paths.DNSSEC_SOFTHSM2_CONF,
|
||||
quotes=False, separator='=')
|
||||
|
||||
def __setup_ownership_file_modes(self):
|
||||
assert self.ods_uid is not None
|
||||
@ -302,10 +302,10 @@ class OpenDNSSECInstance(service.Service):
|
||||
|
||||
def __setup_dnskeysyncd(self):
|
||||
# set up dnskeysyncd this is DNSSEC master
|
||||
installutils.set_directive(paths.SYSCONFIG_IPA_DNSKEYSYNCD,
|
||||
'ISMASTER',
|
||||
'1',
|
||||
quotes=False, separator='=')
|
||||
directivesetter.set_directive(paths.SYSCONFIG_IPA_DNSKEYSYNCD,
|
||||
'ISMASTER',
|
||||
'1',
|
||||
quotes=False, separator='=')
|
||||
|
||||
def __start(self):
|
||||
self.restart() # needed to reload conf files
|
||||
@ -333,9 +333,9 @@ class OpenDNSSECInstance(service.Service):
|
||||
|
||||
# remove directive from ipa-dnskeysyncd, this server is not DNSSEC
|
||||
# master anymore
|
||||
installutils.set_directive(paths.SYSCONFIG_IPA_DNSKEYSYNCD,
|
||||
'ISMASTER', None,
|
||||
quotes=False, separator='=')
|
||||
directivesetter.set_directive(paths.SYSCONFIG_IPA_DNSKEYSYNCD,
|
||||
'ISMASTER', None,
|
||||
quotes=False, separator='=')
|
||||
|
||||
restore_list = [paths.OPENDNSSEC_CONF_FILE, paths.OPENDNSSEC_KASP_FILE,
|
||||
paths.SYSCONFIG_ODS, paths.OPENDNSSEC_ZONELIST_FILE]
|
||||
|
@ -21,13 +21,14 @@ from __future__ import absolute_import
|
||||
|
||||
import logging
|
||||
|
||||
from ipaserver.install import installutils, cainstance
|
||||
from ipaserver.install import cainstance
|
||||
from ipalib import errors
|
||||
from ipalib import Updater
|
||||
from ipalib.install import certmonger
|
||||
from ipalib.plugable import Registry
|
||||
from ipaplatform.paths import paths
|
||||
from ipapython.dn import DN
|
||||
from ipapython import directivesetter
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -108,7 +109,7 @@ class update_ca_renewal_master(Updater):
|
||||
else:
|
||||
logger.debug("certmonger request for RA cert not found")
|
||||
|
||||
config = installutils.get_directive(
|
||||
config = directivesetter.get_directive(
|
||||
paths.CA_CS_CFG_PATH, 'subsystem.select', '=')
|
||||
|
||||
if config == 'New':
|
||||
|
@ -25,7 +25,7 @@ from ipaclient.install.client import sssd_enable_service
|
||||
from ipaplatform import services
|
||||
from ipaplatform.tasks import tasks
|
||||
from ipapython import ipautil, version
|
||||
from ipapython import dnsutil
|
||||
from ipapython import dnsutil, directivesetter
|
||||
from ipapython.dn import DN
|
||||
from ipaplatform.constants import constants
|
||||
from ipaplatform.paths import paths
|
||||
@ -352,7 +352,7 @@ def ca_enable_ldap_profile_subsystem(ca):
|
||||
try:
|
||||
for i in range(15):
|
||||
directive = "subsystem.{}.class".format(i)
|
||||
value = installutils.get_directive(
|
||||
value = directivesetter.get_directive(
|
||||
paths.CA_CS_CFG_PATH,
|
||||
directive,
|
||||
separator='=')
|
||||
@ -365,7 +365,7 @@ def ca_enable_ldap_profile_subsystem(ca):
|
||||
return False
|
||||
|
||||
if needs_update:
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
paths.CA_CS_CFG_PATH,
|
||||
directive,
|
||||
'com.netscape.cmscore.profile.LDAPProfileSubsystem',
|
||||
@ -407,14 +407,14 @@ def ca_add_default_ocsp_uri(ca):
|
||||
logger.info('CA is not configured')
|
||||
return False
|
||||
|
||||
value = installutils.get_directive(
|
||||
value = directivesetter.get_directive(
|
||||
paths.CA_CS_CFG_PATH,
|
||||
'ca.defaultOcspUri',
|
||||
separator='=')
|
||||
if value:
|
||||
return False # already set; restart not needed
|
||||
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
paths.CA_CS_CFG_PATH,
|
||||
'ca.defaultOcspUri',
|
||||
'http://ipa-ca.%s/ca/ocsp' % ipautil.format_netloc(api.env.domain),
|
||||
@ -1107,7 +1107,7 @@ def migrate_crl_publish_dir(ca):
|
||||
return False
|
||||
|
||||
try:
|
||||
old_publish_dir = installutils.get_directive(
|
||||
old_publish_dir = directivesetter.get_directive(
|
||||
paths.CA_CS_CFG_PATH,
|
||||
'ca.publish.publisher.instance.FileBaseCRLPublisher.directory',
|
||||
separator='=')
|
||||
@ -1144,7 +1144,7 @@ def migrate_crl_publish_dir(ca):
|
||||
logger.error('Cannot move CRL file to new directory: %s', e)
|
||||
|
||||
try:
|
||||
installutils.set_directive(
|
||||
directivesetter.set_directive(
|
||||
paths.CA_CS_CFG_PATH,
|
||||
'ca.publish.publisher.instance.FileBaseCRLPublisher.directory',
|
||||
publishdir, quotes=False, separator='=')
|
||||
@ -1765,7 +1765,7 @@ def upgrade_configuration():
|
||||
ca_restart = migrate_crl_publish_dir(ca)
|
||||
|
||||
if ca.is_configured():
|
||||
crl = installutils.get_directive(
|
||||
crl = directivesetter.get_directive(
|
||||
paths.CA_CS_CFG_PATH, 'ca.crl.MasterCRL.enableCRLUpdates', '=')
|
||||
sub_dict['CLONE']='#' if crl.lower() == 'true' else ''
|
||||
|
||||
@ -1797,7 +1797,7 @@ def upgrade_configuration():
|
||||
if kra.is_installed():
|
||||
logger.info('[Ensuring ephemeralRequest is enabled in KRA]')
|
||||
kra.backup_config()
|
||||
value = installutils.get_directive(
|
||||
value = directivesetter.get_directive(
|
||||
paths.KRA_CS_CFG_PATH,
|
||||
'kra.ephemeralRequests',
|
||||
separator='=')
|
||||
|
177
ipatests/test_ipapython/test_directivesetter.py
Normal file
177
ipatests/test_ipapython/test_directivesetter.py
Normal file
@ -0,0 +1,177 @@
|
||||
#
|
||||
# Copyright (C) 2017 FreeIPA Contributors. See COPYING for license
|
||||
#
|
||||
from __future__ import absolute_import
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
from ipapython import directivesetter
|
||||
|
||||
EXAMPLE_CONFIG = [
|
||||
'foo=1\n',
|
||||
'foobar=2\n',
|
||||
]
|
||||
|
||||
WHITESPACE_CONFIG = [
|
||||
'foo 1\n',
|
||||
'foobar\t2\n',
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tempdir(request):
|
||||
tempdir = tempfile.mkdtemp()
|
||||
|
||||
def fin():
|
||||
shutil.rmtree(tempdir)
|
||||
|
||||
request.addfinalizer(fin)
|
||||
return tempdir
|
||||
|
||||
|
||||
class test_set_directive_lines(object):
|
||||
def test_remove_directive(self):
|
||||
lines = directivesetter.set_directive_lines(
|
||||
False, '=', 'foo', None, EXAMPLE_CONFIG, comment="#")
|
||||
assert list(lines) == ['foobar=2\n']
|
||||
|
||||
def test_add_directive(self):
|
||||
lines = directivesetter.set_directive_lines(
|
||||
False, '=', 'baz', '4', EXAMPLE_CONFIG, comment="#")
|
||||
assert list(lines) == ['foo=1\n', 'foobar=2\n', 'baz=4\n']
|
||||
|
||||
def test_set_directive_does_not_clobber_suffix_key(self):
|
||||
lines = directivesetter.set_directive_lines(
|
||||
False, '=', 'foo', '3', EXAMPLE_CONFIG, comment="#")
|
||||
assert list(lines) == ['foo=3\n', 'foobar=2\n']
|
||||
|
||||
|
||||
class test_set_directive_lines_whitespace(object):
|
||||
def test_remove_directive(self):
|
||||
lines = directivesetter.set_directive_lines(
|
||||
False, ' ', 'foo', None, WHITESPACE_CONFIG, comment="#")
|
||||
assert list(lines) == ['foobar\t2\n']
|
||||
|
||||
def test_add_directive(self):
|
||||
lines = directivesetter.set_directive_lines(
|
||||
False, ' ', 'baz', '4', WHITESPACE_CONFIG, comment="#")
|
||||
assert list(lines) == ['foo 1\n', 'foobar\t2\n', 'baz 4\n']
|
||||
|
||||
def test_set_directive_does_not_clobber_suffix_key(self):
|
||||
lines = directivesetter.set_directive_lines(
|
||||
False, ' ', 'foo', '3', WHITESPACE_CONFIG, comment="#")
|
||||
assert list(lines) == ['foo 3\n', 'foobar\t2\n']
|
||||
|
||||
def test_set_directive_with_tab(self):
|
||||
lines = directivesetter.set_directive_lines(
|
||||
False, ' ', 'foobar', '6', WHITESPACE_CONFIG, comment="#")
|
||||
assert list(lines) == ['foo 1\n', 'foobar 6\n']
|
||||
|
||||
|
||||
class test_set_directive(object):
|
||||
def test_set_directive(self):
|
||||
"""Check that set_directive writes the new data and preserves mode."""
|
||||
fd, filename = tempfile.mkstemp()
|
||||
try:
|
||||
os.close(fd)
|
||||
stat_pre = os.stat(filename)
|
||||
|
||||
with open(filename, 'w') as f:
|
||||
for line in EXAMPLE_CONFIG:
|
||||
f.write(line)
|
||||
|
||||
directivesetter.set_directive(
|
||||
filename, 'foo', '3', False, '=', "#")
|
||||
|
||||
stat_post = os.stat(filename)
|
||||
with open(filename, 'r') as f:
|
||||
lines = list(f)
|
||||
|
||||
assert lines == ['foo=3\n', 'foobar=2\n']
|
||||
assert stat_pre.st_mode == stat_post.st_mode
|
||||
assert stat_pre.st_uid == stat_post.st_uid
|
||||
assert stat_pre.st_gid == stat_post.st_gid
|
||||
|
||||
finally:
|
||||
os.remove(filename)
|
||||
|
||||
|
||||
class test_get_directive(object):
|
||||
def test_get_directive(self, tmpdir):
|
||||
configfile = tmpdir.join('config')
|
||||
configfile.write(''.join(EXAMPLE_CONFIG))
|
||||
|
||||
assert '1' == directivesetter.get_directive(str(configfile),
|
||||
'foo',
|
||||
separator='=')
|
||||
assert '2' == directivesetter.get_directive(str(configfile),
|
||||
'foobar',
|
||||
separator='=')
|
||||
|
||||
|
||||
class test_get_directive_whitespace(object):
|
||||
def test_get_directive(self, tmpdir):
|
||||
configfile = tmpdir.join('config')
|
||||
configfile.write(''.join(WHITESPACE_CONFIG))
|
||||
|
||||
assert '1' == directivesetter.get_directive(str(configfile),
|
||||
'foo')
|
||||
assert '2' == directivesetter.get_directive(str(configfile),
|
||||
'foobar')
|
||||
|
||||
|
||||
def test_directivesetter(tempdir):
|
||||
filename = os.path.join(tempdir, 'example.conf')
|
||||
with open(filename, 'w') as f:
|
||||
for line in EXAMPLE_CONFIG:
|
||||
f.write(line)
|
||||
|
||||
ds = directivesetter.DirectiveSetter(filename)
|
||||
assert ds.lines is None
|
||||
with ds:
|
||||
assert ds.lines == EXAMPLE_CONFIG
|
||||
ds.set('foo', '3') # quoted, space separated, doesn't change 'foo='
|
||||
ds.set('foobar', None, separator='=') # remove
|
||||
ds.set('baz', '4', False, '=') # add
|
||||
ds.setitems([
|
||||
('list1', 'value1'),
|
||||
('list2', 'value2'),
|
||||
])
|
||||
ds.setitems({
|
||||
'dict1': 'value1',
|
||||
'dict2': 'value2',
|
||||
})
|
||||
|
||||
with open(filename, 'r') as f:
|
||||
lines = list(f)
|
||||
|
||||
assert lines == [
|
||||
'foo=1\n',
|
||||
'foo "3"\n',
|
||||
'baz=4\n',
|
||||
'list1 "value1"\n',
|
||||
'list2 "value2"\n',
|
||||
'dict1 "value1"\n',
|
||||
'dict2 "value2"\n',
|
||||
]
|
||||
|
||||
with directivesetter.DirectiveSetter(filename, True, '=') as ds:
|
||||
ds.set('foo', '4') # doesn't change 'foo '
|
||||
|
||||
with open(filename, 'r') as f:
|
||||
lines = list(f)
|
||||
|
||||
assert lines == [
|
||||
'foo="4"\n',
|
||||
'foo "3"\n',
|
||||
'baz=4\n',
|
||||
'list1 "value1"\n',
|
||||
'list2 "value2"\n',
|
||||
'dict1 "value1"\n',
|
||||
'dict2 "value2"\n',
|
||||
|
||||
]
|
@ -6,8 +6,8 @@ from __future__ import absolute_import
|
||||
import binascii
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import shutil
|
||||
import subprocess
|
||||
import tempfile
|
||||
import textwrap
|
||||
|
||||
@ -19,16 +19,6 @@ from ipaserver.install import installutils
|
||||
from ipaserver.install import ipa_backup
|
||||
from ipaserver.install import ipa_restore
|
||||
|
||||
EXAMPLE_CONFIG = [
|
||||
'foo=1\n',
|
||||
'foobar=2\n',
|
||||
]
|
||||
|
||||
WHITESPACE_CONFIG = [
|
||||
'foo 1\n',
|
||||
'foobar\t2\n',
|
||||
]
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tempdir(request):
|
||||
@ -122,149 +112,6 @@ def gpgkey(request, tempdir):
|
||||
os.environ['GNUPGHOME'] = gnupghome
|
||||
|
||||
|
||||
class test_set_directive_lines(object):
|
||||
def test_remove_directive(self):
|
||||
lines = installutils.set_directive_lines(
|
||||
False, '=', 'foo', None, EXAMPLE_CONFIG, comment="#")
|
||||
assert list(lines) == ['foobar=2\n']
|
||||
|
||||
def test_add_directive(self):
|
||||
lines = installutils.set_directive_lines(
|
||||
False, '=', 'baz', '4', EXAMPLE_CONFIG, comment="#")
|
||||
assert list(lines) == ['foo=1\n', 'foobar=2\n', 'baz=4\n']
|
||||
|
||||
def test_set_directive_does_not_clobber_suffix_key(self):
|
||||
lines = installutils.set_directive_lines(
|
||||
False, '=', 'foo', '3', EXAMPLE_CONFIG, comment="#")
|
||||
assert list(lines) == ['foo=3\n', 'foobar=2\n']
|
||||
|
||||
|
||||
class test_set_directive_lines_whitespace(object):
|
||||
def test_remove_directive(self):
|
||||
lines = installutils.set_directive_lines(
|
||||
False, ' ', 'foo', None, WHITESPACE_CONFIG, comment="#")
|
||||
assert list(lines) == ['foobar\t2\n']
|
||||
|
||||
def test_add_directive(self):
|
||||
lines = installutils.set_directive_lines(
|
||||
False, ' ', 'baz', '4', WHITESPACE_CONFIG, comment="#")
|
||||
assert list(lines) == ['foo 1\n', 'foobar\t2\n', 'baz 4\n']
|
||||
|
||||
def test_set_directive_does_not_clobber_suffix_key(self):
|
||||
lines = installutils.set_directive_lines(
|
||||
False, ' ', 'foo', '3', WHITESPACE_CONFIG, comment="#")
|
||||
assert list(lines) == ['foo 3\n', 'foobar\t2\n']
|
||||
|
||||
def test_set_directive_with_tab(self):
|
||||
lines = installutils.set_directive_lines(
|
||||
False, ' ', 'foobar', '6', WHITESPACE_CONFIG, comment="#")
|
||||
assert list(lines) == ['foo 1\n', 'foobar 6\n']
|
||||
|
||||
|
||||
class test_set_directive(object):
|
||||
def test_set_directive(self):
|
||||
"""Check that set_directive writes the new data and preserves mode."""
|
||||
fd, filename = tempfile.mkstemp()
|
||||
try:
|
||||
os.close(fd)
|
||||
stat_pre = os.stat(filename)
|
||||
|
||||
with open(filename, 'w') as f:
|
||||
for line in EXAMPLE_CONFIG:
|
||||
f.write(line)
|
||||
|
||||
installutils.set_directive(filename, 'foo', '3', False, '=', "#")
|
||||
|
||||
stat_post = os.stat(filename)
|
||||
with open(filename, 'r') as f:
|
||||
lines = list(f)
|
||||
|
||||
assert lines == ['foo=3\n', 'foobar=2\n']
|
||||
assert stat_pre.st_mode == stat_post.st_mode
|
||||
assert stat_pre.st_uid == stat_post.st_uid
|
||||
assert stat_pre.st_gid == stat_post.st_gid
|
||||
|
||||
finally:
|
||||
os.remove(filename)
|
||||
|
||||
|
||||
class test_get_directive(object):
|
||||
def test_get_directive(self, tmpdir):
|
||||
configfile = tmpdir.join('config')
|
||||
configfile.write(''.join(EXAMPLE_CONFIG))
|
||||
|
||||
assert '1' == installutils.get_directive(str(configfile),
|
||||
'foo',
|
||||
separator='=')
|
||||
assert '2' == installutils.get_directive(str(configfile),
|
||||
'foobar',
|
||||
separator='=')
|
||||
|
||||
|
||||
class test_get_directive_whitespace(object):
|
||||
def test_get_directive(self, tmpdir):
|
||||
configfile = tmpdir.join('config')
|
||||
configfile.write(''.join(WHITESPACE_CONFIG))
|
||||
|
||||
assert '1' == installutils.get_directive(str(configfile),
|
||||
'foo')
|
||||
assert '2' == installutils.get_directive(str(configfile),
|
||||
'foobar')
|
||||
|
||||
|
||||
def test_directivesetter(tempdir):
|
||||
filename = os.path.join(tempdir, 'example.conf')
|
||||
with open(filename, 'w') as f:
|
||||
for line in EXAMPLE_CONFIG:
|
||||
f.write(line)
|
||||
|
||||
ds = installutils.DirectiveSetter(filename)
|
||||
assert ds.lines is None
|
||||
with ds:
|
||||
assert ds.lines == EXAMPLE_CONFIG
|
||||
ds.set('foo', '3') # quoted, space separated, doesn't change 'foo='
|
||||
ds.set('foobar', None, separator='=') # remove
|
||||
ds.set('baz', '4', False, '=') # add
|
||||
ds.setitems([
|
||||
('list1', 'value1'),
|
||||
('list2', 'value2'),
|
||||
])
|
||||
ds.setitems({
|
||||
'dict1': 'value1',
|
||||
'dict2': 'value2',
|
||||
})
|
||||
|
||||
with open(filename, 'r') as f:
|
||||
lines = list(f)
|
||||
|
||||
assert lines == [
|
||||
'foo=1\n',
|
||||
'foo "3"\n',
|
||||
'baz=4\n',
|
||||
'list1 "value1"\n',
|
||||
'list2 "value2"\n',
|
||||
'dict1 "value1"\n',
|
||||
'dict2 "value2"\n',
|
||||
]
|
||||
|
||||
with installutils.DirectiveSetter(filename, True, '=') as ds:
|
||||
ds.set('foo', '4') # doesn't change 'foo '
|
||||
|
||||
with open(filename, 'r') as f:
|
||||
lines = list(f)
|
||||
|
||||
assert lines == [
|
||||
'foo="4"\n',
|
||||
'foo "3"\n',
|
||||
'baz=4\n',
|
||||
'list1 "value1"\n',
|
||||
'list2 "value2"\n',
|
||||
'dict1 "value1"\n',
|
||||
'dict2 "value2"\n',
|
||||
|
||||
]
|
||||
|
||||
|
||||
def test_gpg_encrypt(tempdir):
|
||||
src = os.path.join(tempdir, "data.txt")
|
||||
encrypted = os.path.join(tempdir, "data.gpg")
|
||||
|
Loading…
Reference in New Issue
Block a user