From f47d86c719aa67f62f3d0c54f5270fc0fc8d1393 Mon Sep 17 00:00:00 2001 From: Stanislav Laznicka Date: Wed, 23 May 2018 10:37:58 +0200 Subject: [PATCH] Move config directives handling code Move config directives handling code: ipaserver.install.installutils -> ipapython.directivesetter Reviewed-By: Stanislav Laznicka Reviewed-By: Christian Heimes --- install/restart_scripts/renew_ca_cert | 13 +- ipaplatform/redhat/tasks.py | 14 +- ipapython/directivesetter.py | 234 ++++++++++++++++++ ipaserver/install/cainstance.py | 18 +- ipaserver/install/dnskeysyncinstance.py | 7 +- ipaserver/install/dogtaginstance.py | 22 +- ipaserver/install/httpinstance.py | 19 +- ipaserver/install/installutils.py | 225 ----------------- ipaserver/install/krainstance.py | 3 +- ipaserver/install/odsexporterinstance.py | 3 +- ipaserver/install/opendnssecinstance.py | 24 +- .../install/plugins/ca_renewal_master.py | 5 +- ipaserver/install/server/upgrade.py | 18 +- .../test_ipapython/test_directivesetter.py | 177 +++++++++++++ .../test_install/test_installutils.py | 155 +----------- 15 files changed, 489 insertions(+), 448 deletions(-) create mode 100644 ipapython/directivesetter.py create mode 100644 ipatests/test_ipapython/test_directivesetter.py diff --git a/install/restart_scripts/renew_ca_cert b/install/restart_scripts/renew_ca_cert index 39b9c4c66..83e79a247 100644 --- a/install/restart_scripts/renew_ca_cert +++ b/install/restart_scripts/renew_ca_cert @@ -28,11 +28,12 @@ import shutil import traceback from ipalib.install import certstore +from ipapython import directivesetter from ipapython import ipautil from ipalib import api, errors from ipalib import x509 from ipalib.install.kinit import kinit_keytab -from ipaserver.install import certs, cainstance, installutils +from ipaserver.install import certs, cainstance from ipaserver.plugins.ldap2 import ldap2 from ipaplatform import services from ipaplatform.paths import paths @@ -104,22 +105,22 @@ def _main(): elif nickname == 'caSigningCert cert-pki-ca': # Update CS.cfg cfg_path = paths.CA_CS_CFG_PATH - config = installutils.get_directive( + config = directivesetter.get_directive( cfg_path, 'subsystem.select', '=') if config == 'New': syslog.syslog(syslog.LOG_NOTICE, "Updating CS.cfg") if cert.is_self_signed(): - installutils.set_directive( + directivesetter.set_directive( cfg_path, 'hierarchy.select', 'Root', quotes=False, separator='=') - installutils.set_directive( + directivesetter.set_directive( cfg_path, 'subsystem.count', '1', quotes=False, separator='=') else: - installutils.set_directive( + directivesetter.set_directive( cfg_path, 'hierarchy.select', 'Subordinate', quotes=False, separator='=') - installutils.set_directive( + directivesetter.set_directive( cfg_path, 'subsystem.count', '0', quotes=False, separator='=') else: diff --git a/ipaplatform/redhat/tasks.py b/ipaplatform/redhat/tasks.py index 8d5bee38a..0ae768013 100644 --- a/ipaplatform/redhat/tasks.py +++ b/ipaplatform/redhat/tasks.py @@ -40,6 +40,7 @@ from subprocess import CalledProcessError from pyasn1.error import PyAsn1Error from six.moves import urllib +from ipapython import directivesetter from ipapython import ipautil import ipapython.errors @@ -47,7 +48,6 @@ from ipaplatform.constants import constants from ipaplatform.paths import paths from ipaplatform.redhat.authconfig import get_auth_tool from ipaplatform.base.tasks import BaseTaskNamespace -from ipaserver.install import installutils logger = logging.getLogger(__name__) @@ -567,12 +567,12 @@ class RedHatTaskNamespace(BaseTaskNamespace): return False def setup_httpd_logging(self): - installutils.set_directive(paths.HTTPD_SSL_CONF, - 'ErrorLog', - 'logs/error_log', False) - installutils.set_directive(paths.HTTPD_SSL_CONF, - 'TransferLog', - 'logs/access_log', False) + directivesetter.set_directive(paths.HTTPD_SSL_CONF, + 'ErrorLog', + 'logs/error_log', False) + directivesetter.set_directive(paths.HTTPD_SSL_CONF, + 'TransferLog', + 'logs/access_log', False) tasks = RedHatTaskNamespace() diff --git a/ipapython/directivesetter.py b/ipapython/directivesetter.py new file mode 100644 index 000000000..3da4c88fa --- /dev/null +++ b/ipapython/directivesetter.py @@ -0,0 +1,234 @@ +# +# Copyright (C) 2018 FreeIPA Contributors see COPYING for license +# + +import six + +import io +import os +import re +import stat +import tempfile + +from ipapython.ipautil import unescape_seq, escape_seq + +_SENTINEL = object() + + +class DirectiveSetter(object): + """Safe directive setter + + with DirectiveSetter('/path/to/conf') as ds: + ds.set(key, value) + """ + def __init__(self, filename, quotes=True, separator=' ', comment='#'): + self.filename = os.path.abspath(filename) + self.quotes = quotes + self.separator = separator + self.comment = comment + self.lines = None + self.stat = None + + def __enter__(self): + with io.open(self.filename) as f: + self.stat = os.fstat(f.fileno()) + self.lines = list(f) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_type is not None: + # something went wrong, reset + self.lines = None + self.stat = None + return + + directory, prefix = os.path.split(self.filename) + # use tempfile in same directory to have atomic rename + fd, name = tempfile.mkstemp(prefix=prefix, dir=directory, text=True) + with io.open(fd, mode='w', closefd=True) as f: + for line in self.lines: + if not isinstance(line, six.text_type): + line = line.decode('utf-8') + f.write(line) + self.lines = None + os.fchmod(f.fileno(), stat.S_IMODE(self.stat.st_mode)) + os.fchown(f.fileno(), self.stat.st_uid, self.stat.st_gid) + self.stat = None + # flush and sync tempfile inode + f.flush() + os.fsync(f.fileno()) + + # rename file and sync directory inode + os.rename(name, self.filename) + dirfd = os.open(directory, os.O_RDONLY | os.O_DIRECTORY) + try: + os.fsync(dirfd) + finally: + os.close(dirfd) + + def set(self, directive, value, quotes=_SENTINEL, separator=_SENTINEL, + comment=_SENTINEL): + """Set a single directive + """ + if quotes is _SENTINEL: + quotes = self.quotes + if separator is _SENTINEL: + separator = self.separator + if comment is _SENTINEL: + comment = self.comment + # materialize lines + # set_directive_lines() modify item, shrink or enlage line count + self.lines = list(set_directive_lines( + quotes, separator, directive, value, self.lines, comment + )) + + def setitems(self, items): + """Set multiple directives from a dict or list with key/value pairs + """ + if isinstance(items, dict): + # dict-like, use sorted for stable order + items = sorted(items.items()) + for k, v in items: + self.set(k, v) + + +def set_directive(filename, directive, value, quotes=True, separator=' ', + comment='#'): + """Set a name/value pair directive in a configuration file. + + A value of None means to drop the directive. + + Does not tolerate (or put) spaces around the separator. + + :param filename: input filename + :param directive: directive name + :param value: value of the directive + :param quotes: whether to quote `value` in double quotes. If true, then + any existing double quotes are first escaped to avoid + unparseable directives. + :param separator: character serving as separator between directive and + value. Correct value required even when dropping a directive. + :param comment: comment character for the file to keep new values near + their commented-out counterpart + """ + st = os.stat(filename) + with open(filename, 'r') as f: + lines = list(f) # read the whole file + # materialize new list + new_lines = list(set_directive_lines( + quotes, separator, directive, value, lines, comment + )) + with open(filename, 'w') as f: + # don't construct the whole string; write line-wise + for line in new_lines: + f.write(line) + os.chown(filename, st.st_uid, st.st_gid) # reset perms + + +def set_directive_lines(quotes, separator, k, v, lines, comment): + """Set a name/value pair in a configuration (iterable of lines). + + Replaces the value of the key if found, otherwise adds it at + end. If value is ``None``, remove the key if found. + + Takes an iterable of lines (with trailing newline). + Yields lines (with trailing newline). + + """ + new_line = "" + if v is not None: + v_quoted = quote_directive_value(v, '"') if quotes else v + new_line = ''.join([k, separator, v_quoted, '\n']) + + # Special case: consider space as "white space" so tabs are allowed + if separator == ' ': + separator = '[ \t]+' + + found = False + addnext = False # add on next line, found a comment + matcher = re.compile(r'\s*{}\s*{}'.format(re.escape(k), separator)) + cmatcher = re.compile(r'\s*{}\s*{}\s*{}'.format(comment, + re.escape(k), separator)) + for line in lines: + if matcher.match(line): + found = True + addnext = False + if v is not None: + yield new_line + elif addnext: + found = True + addnext = False + yield new_line + yield line + elif cmatcher.match(line): + addnext = True + yield line + else: + yield line + + if not found and v is not None: + yield new_line + + +def get_directive(filename, directive, separator=' '): + """ + A rather inefficient way to get a configuration directive. + + :param filename: input filename + :param directive: directive name + :param separator: separator between directive and value + + :returns: The (unquoted) value if the directive was found, None otherwise + """ + # Special case: consider space as "white space" so tabs are allowed + if separator == ' ': + separator = '[ \t]+' + + result = None + with open(filename, "r") as fd: + for line in fd: + if line.lstrip().startswith(directive): + line = line.strip() + + match = re.match( + r'{}\s*{}\s*(.*)'.format(directive, separator), line) + if match: + value = match.group(1) + else: + raise ValueError("Malformed directive: {}".format(line)) + + result = unquote_directive_value(value.strip(), '"') + result = result.strip(' ') + break + return result + + +def quote_directive_value(value, quote_char): + """Quote a directive value + :param value: string to quote + :param quote_char: character which is used for quoting. All prior + occurences will be escaped before quoting to avoid unparseable value. + :returns: processed value + """ + if value.startswith(quote_char) and value.endswith(quote_char): + return value + + return "{quote}{value}{quote}".format( + quote=quote_char, + value="".join(escape_seq(quote_char, value)) + ) + + +def unquote_directive_value(value, quote_char): + """Unquote a directive value + :param value: string to unquote + :param quote_char: character to strip. All escaped occurences of + `quote_char` will be uncescaped during processing + :returns: processed value + """ + unescaped_value = "".join(unescape_seq(quote_char, value)) + if (unescaped_value.startswith(quote_char) and + unescaped_value.endswith(quote_char)): + return unescaped_value[1:-1] + + return unescaped_value diff --git a/ipaserver/install/cainstance.py b/ipaserver/install/cainstance.py index 5b469e13f..a616ebce0 100644 --- a/ipaserver/install/cainstance.py +++ b/ipaserver/install/cainstance.py @@ -55,6 +55,7 @@ from ipaplatform import services from ipaplatform.paths import paths from ipaplatform.tasks import tasks +from ipapython import directivesetter from ipapython import dogtag from ipapython import ipautil from ipapython import ipaldap @@ -261,7 +262,7 @@ def is_step_one_done(): path = paths.CA_CS_CFG_PATH if not os.path.exists(path): return False - test = installutils.get_directive(path, 'preop.ca.type', '=') + test = directivesetter.get_directive(path, 'preop.ca.type', '=') if test == "otherca": return True return False @@ -723,7 +724,7 @@ class CAInstance(DogtagInstance): os.chown(self.config, pent.pw_uid, pent.pw_gid) def enable_pkix(self): - installutils.set_directive(paths.SYSCONFIG_PKI_TOMCAT, + directivesetter.set_directive(paths.SYSCONFIG_PKI_TOMCAT, 'NSS_ENABLE_PKIX_VERIFY', '1', quotes=False, separator='=') @@ -964,9 +965,8 @@ class CAInstance(DogtagInstance): https://access.redhat.com/knowledge/docs/en-US/Red_Hat_Certificate_System/8.0/html/Admin_Guide/Setting_up_Publishing.html """ - with installutils.DirectiveSetter(self.config, - quotes=False, separator='=') as ds: - + with directivesetter.DirectiveSetter( + self.config, quotes=False, separator='=') as ds: # Enable file publishing, disable LDAP ds.set('ca.publish.enable', 'true') ds.set('ca.publish.ldappublish.enable', 'false') @@ -1124,7 +1124,7 @@ class CAInstance(DogtagInstance): """ # Check the default validity period of the audit signing cert # and set it to 2 years if it is 6 months. - cert_range = installutils.get_directive( + cert_range = directivesetter.get_directive( paths.CASIGNEDLOGCERT_CFG, 'policyset.caLogSigningSet.2.default.params.range', separator='=' @@ -1132,14 +1132,14 @@ class CAInstance(DogtagInstance): logger.debug( 'caSignedLogCert.cfg profile validity range is %s', cert_range) if cert_range == "180": - installutils.set_directive( + directivesetter.set_directive( paths.CASIGNEDLOGCERT_CFG, 'policyset.caLogSigningSet.2.default.params.range', '720', quotes=False, separator='=' ) - installutils.set_directive( + directivesetter.set_directive( paths.CASIGNEDLOGCERT_CFG, 'policyset.caLogSigningSet.2.constraint.params.range', '720', @@ -1284,7 +1284,7 @@ class CAInstance(DogtagInstance): '/usr/libexec/ipa/ipa-pki-retrieve-key'), ] for k, v in directives: - installutils.set_directive( + directivesetter.set_directive( self.config, k, v, quotes=False, separator='=') sysupgrade.set_upgrade_state('dogtag', 'setup_lwca_key_retieval', True) diff --git a/ipaserver/install/dnskeysyncinstance.py b/ipaserver/install/dnskeysyncinstance.py index b865ee8aa..10c3a712a 100644 --- a/ipaserver/install/dnskeysyncinstance.py +++ b/ipaserver/install/dnskeysyncinstance.py @@ -19,6 +19,7 @@ from ipapython.dnsutil import DNSName from ipaserver.install import service from ipaserver.install import installutils from ipapython.dn import DN +from ipapython import directivesetter from ipapython import ipautil from ipaplatform.constants import constants from ipaplatform.paths import paths @@ -199,9 +200,9 @@ class DNSKeySyncInstance(service.Service): # setting up named and ipa-dnskeysyncd to use our softhsm2 config for sysconfig in [paths.SYSCONFIG_NAMED, paths.SYSCONFIG_IPA_DNSKEYSYNCD]: - installutils.set_directive(sysconfig, 'SOFTHSM2_CONF', - paths.DNSSEC_SOFTHSM2_CONF, - quotes=False, separator='=') + directivesetter.set_directive(sysconfig, 'SOFTHSM2_CONF', + paths.DNSSEC_SOFTHSM2_CONF, + quotes=False, separator='=') if (token_dir_exists and os.path.exists(paths.DNSSEC_SOFTHSM_PIN) and os.path.exists(paths.DNSSEC_SOFTHSM_PIN_SO)): diff --git a/ipaserver/install/dogtaginstance.py b/ipaserver/install/dogtaginstance.py index d0c6690fa..6e5b024af 100644 --- a/ipaserver/install/dogtaginstance.py +++ b/ipaserver/install/dogtaginstance.py @@ -37,11 +37,11 @@ from ipalib.constants import CA_DBUS_TIMEOUT from ipaplatform import services from ipaplatform.constants import constants from ipaplatform.paths import paths +from ipapython import directivesetter from ipapython import ipaldap from ipapython import ipautil from ipapython.dn import DN from ipaserver.install import service -from ipaserver.install import installutils from ipaserver.install import replication from ipaserver.install.installutils import stopped_service @@ -182,41 +182,41 @@ class DogtagInstance(service.Service): """ with stopped_service('pki-tomcatd', 'pki-tomcat'): - installutils.set_directive( + directivesetter.set_directive( self.config, 'authz.instance.DirAclAuthz.ldap.ldapauth.authtype', 'SslClientAuth', quotes=False, separator='=') - installutils.set_directive( + directivesetter.set_directive( self.config, 'authz.instance.DirAclAuthz.ldap.ldapauth.clientCertNickname', 'subsystemCert cert-pki-ca', quotes=False, separator='=') - installutils.set_directive( + directivesetter.set_directive( self.config, 'authz.instance.DirAclAuthz.ldap.ldapconn.port', '636', quotes=False, separator='=') - installutils.set_directive( + directivesetter.set_directive( self.config, 'authz.instance.DirAclAuthz.ldap.ldapconn.secureConn', 'true', quotes=False, separator='=') - installutils.set_directive( + directivesetter.set_directive( self.config, 'internaldb.ldapauth.authtype', 'SslClientAuth', quotes=False, separator='=') - installutils.set_directive( + directivesetter.set_directive( self.config, 'internaldb.ldapauth.clientCertNickname', 'subsystemCert cert-pki-ca', quotes=False, separator='=') - installutils.set_directive( + directivesetter.set_directive( self.config, 'internaldb.ldapconn.port', '636', quotes=False, separator='=') - installutils.set_directive( + directivesetter.set_directive( self.config, 'internaldb.ldapconn.secureConn', 'true', quotes=False, separator='=') # Remove internaldb password as is not needed anymore - installutils.set_directive(paths.PKI_TOMCAT_PASSWORD_CONF, + directivesetter.set_directive(paths.PKI_TOMCAT_PASSWORD_CONF, 'internaldb', None, separator='=') def uninstall(self): @@ -353,7 +353,7 @@ class DogtagInstance(service.Service): """ with stopped_service('pki-tomcatd', 'pki-tomcat'): - installutils.set_directive( + directivesetter.set_directive( self.config, directive, # the cert must be only the base64 string without headers diff --git a/ipaserver/install/httpinstance.py b/ipaserver/install/httpinstance.py index fc0256bed..95283b65d 100644 --- a/ipaserver/install/httpinstance.py +++ b/ipaserver/install/httpinstance.py @@ -37,6 +37,7 @@ from ipaserver.install import replication from ipaserver.install import service from ipaserver.install import certs from ipaserver.install import installutils +from ipapython import directivesetter from ipapython import dogtag from ipapython import ipautil from ipapython.dn import DN @@ -208,8 +209,10 @@ class HTTPInstance(service.Service): services.knownservices.gssproxy.restart() def get_mod_nss_nickname(self): - cert = installutils.get_directive(paths.HTTPD_NSS_CONF, 'NSSNickname') - nickname = installutils.unquote_directive_value(cert, quote_char="'") + cert = directivesetter.get_directive(paths.HTTPD_NSS_CONF, + 'NSSNickname') + nickname = directivesetter.unquote_directive_value(cert, + quote_char="'") return nickname def backup_ssl_conf(self): @@ -231,7 +234,7 @@ class HTTPInstance(service.Service): installutils.remove_file(paths.HTTPD_NSS_CONF) def set_mod_ssl_protocol(self): - installutils.set_directive(paths.HTTPD_SSL_CONF, + directivesetter.set_directive(paths.HTTPD_SSL_CONF, 'SSLProtocol', '+TLSv1 +TLSv1.1 +TLSv1.2', False) @@ -400,22 +403,22 @@ class HTTPInstance(service.Service): def configure_mod_ssl_certs(self): """Configure the mod_ssl certificate directives""" - installutils.set_directive(paths.HTTPD_SSL_SITE_CONF, + directivesetter.set_directive(paths.HTTPD_SSL_SITE_CONF, 'SSLCertificateFile', paths.HTTPD_CERT_FILE, False) - installutils.set_directive(paths.HTTPD_SSL_SITE_CONF, + directivesetter.set_directive(paths.HTTPD_SSL_SITE_CONF, 'SSLCertificateKeyFile', paths.HTTPD_KEY_FILE, False) - installutils.set_directive( + directivesetter.set_directive( paths.HTTPD_SSL_CONF, 'SSLPassPhraseDialog', 'exec:{passread}'.format(passread=paths.IPA_HTTPD_PASSWD_READER), False) - installutils.set_directive(paths.HTTPD_SSL_SITE_CONF, + directivesetter.set_directive(paths.HTTPD_SSL_SITE_CONF, 'SSLCACertificateFile', paths.IPA_CA_CRT, False) # set SSLVerifyDepth for external CA installations - installutils.set_directive(paths.HTTPD_SSL_CONF, + directivesetter.set_directive(paths.HTTPD_SSL_CONF, 'SSLVerifyDepth', MOD_SSL_VERIFY_DEPTH, quotes=False) diff --git a/ipaserver/install/installutils.py b/ipaserver/install/installutils.py index 96707e99b..53b825aac 100644 --- a/ipaserver/install/installutils.py +++ b/ipaserver/install/installutils.py @@ -25,7 +25,6 @@ import logging import socket import getpass import gssapi -import io import ldif import os import re @@ -33,7 +32,6 @@ import fileinput import sys import tempfile import shutil -import stat import traceback import textwrap from contextlib import contextmanager @@ -406,229 +404,6 @@ def update_file(filename, orig, subst): return 1 -def quote_directive_value(value, quote_char): - """Quote a directive value - :param value: string to quote - :param quote_char: character which is used for quoting. All prior - occurences will be escaped before quoting to avoid unparseable value. - :returns: processed value - """ - if value.startswith(quote_char) and value.endswith(quote_char): - return value - - return "{quote}{value}{quote}".format( - quote=quote_char, - value="".join(ipautil.escape_seq(quote_char, value)) - ) - - -def unquote_directive_value(value, quote_char): - """Unquote a directive value - :param value: string to unquote - :param quote_char: character to strip. All escaped occurences of - `quote_char` will be uncescaped during processing - :returns: processed value - """ - unescaped_value = "".join(ipautil.unescape_seq(quote_char, value)) - if (unescaped_value.startswith(quote_char) and - unescaped_value.endswith(quote_char)): - return unescaped_value[1:-1] - - return unescaped_value - - -_SENTINEL = object() - - -class DirectiveSetter(object): - """Safe directive setter - - with DirectiveSetter('/path/to/conf') as ds: - ds.set(key, value) - """ - def __init__(self, filename, quotes=True, separator=' ', comment='#'): - self.filename = os.path.abspath(filename) - self.quotes = quotes - self.separator = separator - self.comment = comment - self.lines = None - self.stat = None - - def __enter__(self): - with io.open(self.filename) as f: - self.stat = os.fstat(f.fileno()) - self.lines = list(f) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - if exc_type is not None: - # something went wrong, reset - self.lines = None - self.stat = None - return - - directory, prefix = os.path.split(self.filename) - # use tempfile in same directory to have atomic rename - fd, name = tempfile.mkstemp(prefix=prefix, dir=directory, text=True) - with io.open(fd, mode='w', closefd=True) as f: - for line in self.lines: - if not isinstance(line, six.text_type): - line = line.decode('utf-8') - f.write(line) - self.lines = None - os.fchmod(f.fileno(), stat.S_IMODE(self.stat.st_mode)) - os.fchown(f.fileno(), self.stat.st_uid, self.stat.st_gid) - self.stat = None - # flush and sync tempfile inode - f.flush() - os.fsync(f.fileno()) - - # rename file and sync directory inode - os.rename(name, self.filename) - dirfd = os.open(directory, os.O_RDONLY | os.O_DIRECTORY) - try: - os.fsync(dirfd) - finally: - os.close(dirfd) - - def set(self, directive, value, quotes=_SENTINEL, separator=_SENTINEL, - comment=_SENTINEL): - """Set a single directive - """ - if quotes is _SENTINEL: - quotes = self.quotes - if separator is _SENTINEL: - separator = self.separator - if comment is _SENTINEL: - comment = self.comment - # materialize lines - # set_directive_lines() modify item, shrink or enlage line count - self.lines = list(set_directive_lines( - quotes, separator, directive, value, self.lines, comment - )) - - def setitems(self, items): - """Set multiple directives from a dict or list with key/value pairs - """ - if isinstance(items, dict): - # dict-like, use sorted for stable order - items = sorted(items.items()) - for k, v in items: - self.set(k, v) - - -def set_directive(filename, directive, value, quotes=True, separator=' ', - comment='#'): - """Set a name/value pair directive in a configuration file. - - A value of None means to drop the directive. - - Does not tolerate (or put) spaces around the separator. - - :param filename: input filename - :param directive: directive name - :param value: value of the directive - :param quotes: whether to quote `value` in double quotes. If true, then - any existing double quotes are first escaped to avoid - unparseable directives. - :param separator: character serving as separator between directive and - value. Correct value required even when dropping a directive. - :param comment: comment character for the file to keep new values near - their commented-out counterpart - """ - st = os.stat(filename) - with open(filename, 'r') as f: - lines = list(f) # read the whole file - # materialize new list - new_lines = list(set_directive_lines( - quotes, separator, directive, value, lines, comment - )) - with open(filename, 'w') as f: - # don't construct the whole string; write line-wise - for line in new_lines: - f.write(line) - os.chown(filename, st.st_uid, st.st_gid) # reset perms - - -def set_directive_lines(quotes, separator, k, v, lines, comment): - """Set a name/value pair in a configuration (iterable of lines). - - Replaces the value of the key if found, otherwise adds it at - end. If value is ``None``, remove the key if found. - - Takes an iterable of lines (with trailing newline). - Yields lines (with trailing newline). - - """ - new_line = "" - if v is not None: - v_quoted = quote_directive_value(v, '"') if quotes else v - new_line = ''.join([k, separator, v_quoted, '\n']) - - # Special case: consider space as "white space" so tabs are allowed - if separator == ' ': - separator = '[ \t]+' - - found = False - addnext = False # add on next line, found a comment - matcher = re.compile(r'\s*{}\s*{}'.format(re.escape(k), separator)) - cmatcher = re.compile(r'\s*{}\s*{}\s*{}'.format(comment, - re.escape(k), separator)) - for line in lines: - if matcher.match(line): - found = True - addnext = False - if v is not None: - yield new_line - elif addnext: - found = True - addnext = False - yield new_line - yield line - elif cmatcher.match(line): - addnext = True - yield line - else: - yield line - - if not found and v is not None: - yield new_line - - -def get_directive(filename, directive, separator=' '): - """ - A rather inefficient way to get a configuration directive. - - :param filename: input filename - :param directive: directive name - :param separator: separator between directive and value - - :returns: The (unquoted) value if the directive was found, None otherwise - """ - # Special case: consider space as "white space" so tabs are allowed - if separator == ' ': - separator = '[ \t]+' - - fd = open(filename, "r") - for line in fd: - if line.lstrip().startswith(directive): - line = line.strip() - - match = re.match(r'{}\s*{}\s*(.*)'.format(directive, separator), - line) - if match: - value = match.group(1) - else: - raise ValueError("Malformed directive: {}".format(line)) - - result = unquote_directive_value(value.strip(), '"') - result = result.strip(' ') - fd.close() - return result - fd.close() - return None - - def kadmin(command): return ipautil.run( [ diff --git a/ipaserver/install/krainstance.py b/ipaserver/install/krainstance.py index a0fb4357f..aea08554d 100644 --- a/ipaserver/install/krainstance.py +++ b/ipaserver/install/krainstance.py @@ -34,6 +34,7 @@ from six.moves.configparser import RawConfigParser from ipalib import api from ipalib import x509 from ipaplatform.paths import paths +from ipapython import directivesetter from ipapython import ipautil from ipapython.dn import DN from ipaserver.install import cainstance @@ -362,7 +363,7 @@ class KRAInstance(DogtagInstance): write operations. """ with installutils.stopped_service('pki-tomcatd', 'pki-tomcat'): - installutils.set_directive( + directivesetter.set_directive( self.config, 'kra.ephemeralRequests', 'true', quotes=False, separator='=') diff --git a/ipaserver/install/odsexporterinstance.py b/ipaserver/install/odsexporterinstance.py index b301a167f..4fff72c2c 100644 --- a/ipaserver/install/odsexporterinstance.py +++ b/ipaserver/install/odsexporterinstance.py @@ -14,6 +14,7 @@ import ldap from ipaserver.install import service from ipaserver.install import installutils from ipapython.dn import DN +from ipapython import directivesetter from ipapython import ipautil from ipaplatform.constants import constants from ipaplatform.paths import paths @@ -79,7 +80,7 @@ class ODSExporterInstance(service.Service): logger.error("DNSKeyExporter service already exists") def __setup_key_exporter(self): - installutils.set_directive(paths.SYSCONFIG_IPA_ODS_EXPORTER, + directivesetter.set_directive(paths.SYSCONFIG_IPA_ODS_EXPORTER, 'SOFTHSM2_CONF', paths.DNSSEC_SOFTHSM2_CONF, quotes=False, separator='=') diff --git a/ipaserver/install/opendnssecinstance.py b/ipaserver/install/opendnssecinstance.py index d608294cb..aee6e1e77 100644 --- a/ipaserver/install/opendnssecinstance.py +++ b/ipaserver/install/opendnssecinstance.py @@ -14,8 +14,8 @@ from subprocess import CalledProcessError from ipalib.install import sysrestore from ipaserver.install import service -from ipaserver.install import installutils from ipapython.dn import DN +from ipapython import directivesetter from ipapython import ipautil from ipaplatform import services from ipaplatform.constants import constants @@ -199,10 +199,10 @@ class OpenDNSSECInstance(service.Service): if not self.fstore.has_file(paths.SYSCONFIG_ODS): self.fstore.backup_file(paths.SYSCONFIG_ODS) - installutils.set_directive(paths.SYSCONFIG_ODS, - 'SOFTHSM2_CONF', - paths.DNSSEC_SOFTHSM2_CONF, - quotes=False, separator='=') + directivesetter.set_directive(paths.SYSCONFIG_ODS, + 'SOFTHSM2_CONF', + paths.DNSSEC_SOFTHSM2_CONF, + quotes=False, separator='=') def __setup_ownership_file_modes(self): assert self.ods_uid is not None @@ -302,10 +302,10 @@ class OpenDNSSECInstance(service.Service): def __setup_dnskeysyncd(self): # set up dnskeysyncd this is DNSSEC master - installutils.set_directive(paths.SYSCONFIG_IPA_DNSKEYSYNCD, - 'ISMASTER', - '1', - quotes=False, separator='=') + directivesetter.set_directive(paths.SYSCONFIG_IPA_DNSKEYSYNCD, + 'ISMASTER', + '1', + quotes=False, separator='=') def __start(self): self.restart() # needed to reload conf files @@ -333,9 +333,9 @@ class OpenDNSSECInstance(service.Service): # remove directive from ipa-dnskeysyncd, this server is not DNSSEC # master anymore - installutils.set_directive(paths.SYSCONFIG_IPA_DNSKEYSYNCD, - 'ISMASTER', None, - quotes=False, separator='=') + directivesetter.set_directive(paths.SYSCONFIG_IPA_DNSKEYSYNCD, + 'ISMASTER', None, + quotes=False, separator='=') restore_list = [paths.OPENDNSSEC_CONF_FILE, paths.OPENDNSSEC_KASP_FILE, paths.SYSCONFIG_ODS, paths.OPENDNSSEC_ZONELIST_FILE] diff --git a/ipaserver/install/plugins/ca_renewal_master.py b/ipaserver/install/plugins/ca_renewal_master.py index 618f51244..0f3f1ec17 100644 --- a/ipaserver/install/plugins/ca_renewal_master.py +++ b/ipaserver/install/plugins/ca_renewal_master.py @@ -21,13 +21,14 @@ from __future__ import absolute_import import logging -from ipaserver.install import installutils, cainstance +from ipaserver.install import cainstance from ipalib import errors from ipalib import Updater from ipalib.install import certmonger from ipalib.plugable import Registry from ipaplatform.paths import paths from ipapython.dn import DN +from ipapython import directivesetter logger = logging.getLogger(__name__) @@ -108,7 +109,7 @@ class update_ca_renewal_master(Updater): else: logger.debug("certmonger request for RA cert not found") - config = installutils.get_directive( + config = directivesetter.get_directive( paths.CA_CS_CFG_PATH, 'subsystem.select', '=') if config == 'New': diff --git a/ipaserver/install/server/upgrade.py b/ipaserver/install/server/upgrade.py index bd0b4fcdf..5b0fcb67f 100644 --- a/ipaserver/install/server/upgrade.py +++ b/ipaserver/install/server/upgrade.py @@ -25,7 +25,7 @@ from ipaclient.install.client import sssd_enable_service from ipaplatform import services from ipaplatform.tasks import tasks from ipapython import ipautil, version -from ipapython import dnsutil +from ipapython import dnsutil, directivesetter from ipapython.dn import DN from ipaplatform.constants import constants from ipaplatform.paths import paths @@ -352,7 +352,7 @@ def ca_enable_ldap_profile_subsystem(ca): try: for i in range(15): directive = "subsystem.{}.class".format(i) - value = installutils.get_directive( + value = directivesetter.get_directive( paths.CA_CS_CFG_PATH, directive, separator='=') @@ -365,7 +365,7 @@ def ca_enable_ldap_profile_subsystem(ca): return False if needs_update: - installutils.set_directive( + directivesetter.set_directive( paths.CA_CS_CFG_PATH, directive, 'com.netscape.cmscore.profile.LDAPProfileSubsystem', @@ -407,14 +407,14 @@ def ca_add_default_ocsp_uri(ca): logger.info('CA is not configured') return False - value = installutils.get_directive( + value = directivesetter.get_directive( paths.CA_CS_CFG_PATH, 'ca.defaultOcspUri', separator='=') if value: return False # already set; restart not needed - installutils.set_directive( + directivesetter.set_directive( paths.CA_CS_CFG_PATH, 'ca.defaultOcspUri', 'http://ipa-ca.%s/ca/ocsp' % ipautil.format_netloc(api.env.domain), @@ -1107,7 +1107,7 @@ def migrate_crl_publish_dir(ca): return False try: - old_publish_dir = installutils.get_directive( + old_publish_dir = directivesetter.get_directive( paths.CA_CS_CFG_PATH, 'ca.publish.publisher.instance.FileBaseCRLPublisher.directory', separator='=') @@ -1144,7 +1144,7 @@ def migrate_crl_publish_dir(ca): logger.error('Cannot move CRL file to new directory: %s', e) try: - installutils.set_directive( + directivesetter.set_directive( paths.CA_CS_CFG_PATH, 'ca.publish.publisher.instance.FileBaseCRLPublisher.directory', publishdir, quotes=False, separator='=') @@ -1765,7 +1765,7 @@ def upgrade_configuration(): ca_restart = migrate_crl_publish_dir(ca) if ca.is_configured(): - crl = installutils.get_directive( + crl = directivesetter.get_directive( paths.CA_CS_CFG_PATH, 'ca.crl.MasterCRL.enableCRLUpdates', '=') sub_dict['CLONE']='#' if crl.lower() == 'true' else '' @@ -1797,7 +1797,7 @@ def upgrade_configuration(): if kra.is_installed(): logger.info('[Ensuring ephemeralRequest is enabled in KRA]') kra.backup_config() - value = installutils.get_directive( + value = directivesetter.get_directive( paths.KRA_CS_CFG_PATH, 'kra.ephemeralRequests', separator='=') diff --git a/ipatests/test_ipapython/test_directivesetter.py b/ipatests/test_ipapython/test_directivesetter.py new file mode 100644 index 000000000..c61c891dd --- /dev/null +++ b/ipatests/test_ipapython/test_directivesetter.py @@ -0,0 +1,177 @@ +# +# Copyright (C) 2017 FreeIPA Contributors. See COPYING for license +# +from __future__ import absolute_import + +import os +import shutil +import tempfile + +import pytest + +from ipapython import directivesetter + +EXAMPLE_CONFIG = [ + 'foo=1\n', + 'foobar=2\n', +] + +WHITESPACE_CONFIG = [ + 'foo 1\n', + 'foobar\t2\n', +] + + +@pytest.fixture +def tempdir(request): + tempdir = tempfile.mkdtemp() + + def fin(): + shutil.rmtree(tempdir) + + request.addfinalizer(fin) + return tempdir + + +class test_set_directive_lines(object): + def test_remove_directive(self): + lines = directivesetter.set_directive_lines( + False, '=', 'foo', None, EXAMPLE_CONFIG, comment="#") + assert list(lines) == ['foobar=2\n'] + + def test_add_directive(self): + lines = directivesetter.set_directive_lines( + False, '=', 'baz', '4', EXAMPLE_CONFIG, comment="#") + assert list(lines) == ['foo=1\n', 'foobar=2\n', 'baz=4\n'] + + def test_set_directive_does_not_clobber_suffix_key(self): + lines = directivesetter.set_directive_lines( + False, '=', 'foo', '3', EXAMPLE_CONFIG, comment="#") + assert list(lines) == ['foo=3\n', 'foobar=2\n'] + + +class test_set_directive_lines_whitespace(object): + def test_remove_directive(self): + lines = directivesetter.set_directive_lines( + False, ' ', 'foo', None, WHITESPACE_CONFIG, comment="#") + assert list(lines) == ['foobar\t2\n'] + + def test_add_directive(self): + lines = directivesetter.set_directive_lines( + False, ' ', 'baz', '4', WHITESPACE_CONFIG, comment="#") + assert list(lines) == ['foo 1\n', 'foobar\t2\n', 'baz 4\n'] + + def test_set_directive_does_not_clobber_suffix_key(self): + lines = directivesetter.set_directive_lines( + False, ' ', 'foo', '3', WHITESPACE_CONFIG, comment="#") + assert list(lines) == ['foo 3\n', 'foobar\t2\n'] + + def test_set_directive_with_tab(self): + lines = directivesetter.set_directive_lines( + False, ' ', 'foobar', '6', WHITESPACE_CONFIG, comment="#") + assert list(lines) == ['foo 1\n', 'foobar 6\n'] + + +class test_set_directive(object): + def test_set_directive(self): + """Check that set_directive writes the new data and preserves mode.""" + fd, filename = tempfile.mkstemp() + try: + os.close(fd) + stat_pre = os.stat(filename) + + with open(filename, 'w') as f: + for line in EXAMPLE_CONFIG: + f.write(line) + + directivesetter.set_directive( + filename, 'foo', '3', False, '=', "#") + + stat_post = os.stat(filename) + with open(filename, 'r') as f: + lines = list(f) + + assert lines == ['foo=3\n', 'foobar=2\n'] + assert stat_pre.st_mode == stat_post.st_mode + assert stat_pre.st_uid == stat_post.st_uid + assert stat_pre.st_gid == stat_post.st_gid + + finally: + os.remove(filename) + + +class test_get_directive(object): + def test_get_directive(self, tmpdir): + configfile = tmpdir.join('config') + configfile.write(''.join(EXAMPLE_CONFIG)) + + assert '1' == directivesetter.get_directive(str(configfile), + 'foo', + separator='=') + assert '2' == directivesetter.get_directive(str(configfile), + 'foobar', + separator='=') + + +class test_get_directive_whitespace(object): + def test_get_directive(self, tmpdir): + configfile = tmpdir.join('config') + configfile.write(''.join(WHITESPACE_CONFIG)) + + assert '1' == directivesetter.get_directive(str(configfile), + 'foo') + assert '2' == directivesetter.get_directive(str(configfile), + 'foobar') + + +def test_directivesetter(tempdir): + filename = os.path.join(tempdir, 'example.conf') + with open(filename, 'w') as f: + for line in EXAMPLE_CONFIG: + f.write(line) + + ds = directivesetter.DirectiveSetter(filename) + assert ds.lines is None + with ds: + assert ds.lines == EXAMPLE_CONFIG + ds.set('foo', '3') # quoted, space separated, doesn't change 'foo=' + ds.set('foobar', None, separator='=') # remove + ds.set('baz', '4', False, '=') # add + ds.setitems([ + ('list1', 'value1'), + ('list2', 'value2'), + ]) + ds.setitems({ + 'dict1': 'value1', + 'dict2': 'value2', + }) + + with open(filename, 'r') as f: + lines = list(f) + + assert lines == [ + 'foo=1\n', + 'foo "3"\n', + 'baz=4\n', + 'list1 "value1"\n', + 'list2 "value2"\n', + 'dict1 "value1"\n', + 'dict2 "value2"\n', + ] + + with directivesetter.DirectiveSetter(filename, True, '=') as ds: + ds.set('foo', '4') # doesn't change 'foo ' + + with open(filename, 'r') as f: + lines = list(f) + + assert lines == [ + 'foo="4"\n', + 'foo "3"\n', + 'baz=4\n', + 'list1 "value1"\n', + 'list2 "value2"\n', + 'dict1 "value1"\n', + 'dict2 "value2"\n', + + ] diff --git a/ipatests/test_ipaserver/test_install/test_installutils.py b/ipatests/test_ipaserver/test_install/test_installutils.py index bfa261759..577a993f8 100644 --- a/ipatests/test_ipaserver/test_install/test_installutils.py +++ b/ipatests/test_ipaserver/test_install/test_installutils.py @@ -6,8 +6,8 @@ from __future__ import absolute_import import binascii import os import re -import subprocess import shutil +import subprocess import tempfile import textwrap @@ -19,16 +19,6 @@ from ipaserver.install import installutils from ipaserver.install import ipa_backup from ipaserver.install import ipa_restore -EXAMPLE_CONFIG = [ - 'foo=1\n', - 'foobar=2\n', -] - -WHITESPACE_CONFIG = [ - 'foo 1\n', - 'foobar\t2\n', -] - @pytest.fixture def tempdir(request): @@ -122,149 +112,6 @@ def gpgkey(request, tempdir): os.environ['GNUPGHOME'] = gnupghome -class test_set_directive_lines(object): - def test_remove_directive(self): - lines = installutils.set_directive_lines( - False, '=', 'foo', None, EXAMPLE_CONFIG, comment="#") - assert list(lines) == ['foobar=2\n'] - - def test_add_directive(self): - lines = installutils.set_directive_lines( - False, '=', 'baz', '4', EXAMPLE_CONFIG, comment="#") - assert list(lines) == ['foo=1\n', 'foobar=2\n', 'baz=4\n'] - - def test_set_directive_does_not_clobber_suffix_key(self): - lines = installutils.set_directive_lines( - False, '=', 'foo', '3', EXAMPLE_CONFIG, comment="#") - assert list(lines) == ['foo=3\n', 'foobar=2\n'] - - -class test_set_directive_lines_whitespace(object): - def test_remove_directive(self): - lines = installutils.set_directive_lines( - False, ' ', 'foo', None, WHITESPACE_CONFIG, comment="#") - assert list(lines) == ['foobar\t2\n'] - - def test_add_directive(self): - lines = installutils.set_directive_lines( - False, ' ', 'baz', '4', WHITESPACE_CONFIG, comment="#") - assert list(lines) == ['foo 1\n', 'foobar\t2\n', 'baz 4\n'] - - def test_set_directive_does_not_clobber_suffix_key(self): - lines = installutils.set_directive_lines( - False, ' ', 'foo', '3', WHITESPACE_CONFIG, comment="#") - assert list(lines) == ['foo 3\n', 'foobar\t2\n'] - - def test_set_directive_with_tab(self): - lines = installutils.set_directive_lines( - False, ' ', 'foobar', '6', WHITESPACE_CONFIG, comment="#") - assert list(lines) == ['foo 1\n', 'foobar 6\n'] - - -class test_set_directive(object): - def test_set_directive(self): - """Check that set_directive writes the new data and preserves mode.""" - fd, filename = tempfile.mkstemp() - try: - os.close(fd) - stat_pre = os.stat(filename) - - with open(filename, 'w') as f: - for line in EXAMPLE_CONFIG: - f.write(line) - - installutils.set_directive(filename, 'foo', '3', False, '=', "#") - - stat_post = os.stat(filename) - with open(filename, 'r') as f: - lines = list(f) - - assert lines == ['foo=3\n', 'foobar=2\n'] - assert stat_pre.st_mode == stat_post.st_mode - assert stat_pre.st_uid == stat_post.st_uid - assert stat_pre.st_gid == stat_post.st_gid - - finally: - os.remove(filename) - - -class test_get_directive(object): - def test_get_directive(self, tmpdir): - configfile = tmpdir.join('config') - configfile.write(''.join(EXAMPLE_CONFIG)) - - assert '1' == installutils.get_directive(str(configfile), - 'foo', - separator='=') - assert '2' == installutils.get_directive(str(configfile), - 'foobar', - separator='=') - - -class test_get_directive_whitespace(object): - def test_get_directive(self, tmpdir): - configfile = tmpdir.join('config') - configfile.write(''.join(WHITESPACE_CONFIG)) - - assert '1' == installutils.get_directive(str(configfile), - 'foo') - assert '2' == installutils.get_directive(str(configfile), - 'foobar') - - -def test_directivesetter(tempdir): - filename = os.path.join(tempdir, 'example.conf') - with open(filename, 'w') as f: - for line in EXAMPLE_CONFIG: - f.write(line) - - ds = installutils.DirectiveSetter(filename) - assert ds.lines is None - with ds: - assert ds.lines == EXAMPLE_CONFIG - ds.set('foo', '3') # quoted, space separated, doesn't change 'foo=' - ds.set('foobar', None, separator='=') # remove - ds.set('baz', '4', False, '=') # add - ds.setitems([ - ('list1', 'value1'), - ('list2', 'value2'), - ]) - ds.setitems({ - 'dict1': 'value1', - 'dict2': 'value2', - }) - - with open(filename, 'r') as f: - lines = list(f) - - assert lines == [ - 'foo=1\n', - 'foo "3"\n', - 'baz=4\n', - 'list1 "value1"\n', - 'list2 "value2"\n', - 'dict1 "value1"\n', - 'dict2 "value2"\n', - ] - - with installutils.DirectiveSetter(filename, True, '=') as ds: - ds.set('foo', '4') # doesn't change 'foo ' - - with open(filename, 'r') as f: - lines = list(f) - - assert lines == [ - 'foo="4"\n', - 'foo "3"\n', - 'baz=4\n', - 'list1 "value1"\n', - 'list2 "value2"\n', - 'dict1 "value1"\n', - 'dict2 "value2"\n', - - ] - - def test_gpg_encrypt(tempdir): src = os.path.join(tempdir, "data.txt") encrypted = os.path.join(tempdir, "data.gpg")