Add safe DirectiveSetter context manager

installutils.set_directive() is both inefficient and potentially
dangerous. It does not ensure that the whole file is written and
properly synced to disk. In worst case it could lead to partially
written or destroyed config files.

The new DirectiveSetter context manager wraps everything under an easy
to use interface.

https://pagure.io/freeipa/issue/7312

Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Florence Blanc-Renaud <frenaud@redhat.com>
This commit is contained in:
Christian Heimes 2017-12-08 12:38:41 +01:00
parent f688b5d8a7
commit f4001e1c53
3 changed files with 201 additions and 54 deletions

View File

@ -929,62 +929,61 @@ class CAInstance(DogtagInstance):
https://access.redhat.com/knowledge/docs/en-US/Red_Hat_Certificate_System/8.0/html/Admin_Guide/Setting_up_Publishing.html
"""
def put(k, v):
installutils.set_directive(
paths.CA_CS_CFG_PATH, k, v, quotes=False, separator='=')
with installutils.DirectiveSetter(paths.CA_CS_CFG_PATH,
quotes=False, separator='=') as ds:
# Enable file publishing, disable LDAP
put('ca.publish.enable', 'true')
put('ca.publish.ldappublish.enable', 'false')
# Enable file publishing, disable LDAP
ds.set('ca.publish.enable', 'true')
ds.set('ca.publish.ldappublish.enable', 'false')
# Create the file publisher, der only, not b64
put('ca.publish.publisher.impl.FileBasedPublisher.class',
'com.netscape.cms.publish.publishers.FileBasedPublisher')
put('ca.publish.publisher.instance.FileBaseCRLPublisher.crlLinkExt',
'bin')
put('ca.publish.publisher.instance.FileBaseCRLPublisher.directory',
self.prepare_crl_publish_dir())
put('ca.publish.publisher.instance.FileBaseCRLPublisher.latestCrlLink',
'true')
put('ca.publish.publisher.instance.FileBaseCRLPublisher.pluginName',
'FileBasedPublisher')
put('ca.publish.publisher.instance.FileBaseCRLPublisher.timeStamp',
'LocalTime')
put('ca.publish.publisher.instance.FileBaseCRLPublisher.zipCRLs',
'false')
put('ca.publish.publisher.instance.FileBaseCRLPublisher.zipLevel', '9')
put('ca.publish.publisher.instance.FileBaseCRLPublisher.Filename.b64',
'false')
put('ca.publish.publisher.instance.FileBaseCRLPublisher.Filename.der',
'true')
# Create the file publisher, der only, not b64
ds.set(
'ca.publish.publisher.impl.FileBasedPublisher.class',
'com.netscape.cms.publish.publishers.FileBasedPublisher'
)
prefix = 'ca.publish.publisher.instance.FileBaseCRLPublisher.'
ds.set(prefix + 'crlLinkExt', 'bin')
ds.set(prefix + 'directory', self.prepare_crl_publish_dir())
ds.set(prefix + 'latestCrlLink', 'true')
ds.set(prefix + 'pluginName', 'FileBasedPublisher')
ds.set(prefix + 'timeStamp', 'LocalTime')
ds.set(prefix + 'zipCRLs', 'false')
ds.set(prefix + 'zipLevel', '9')
ds.set(prefix + 'Filename.b64', 'false')
ds.set(prefix + 'Filename.der', 'true')
# The publishing rule
put('ca.publish.rule.instance.FileCrlRule.enable', 'true')
put('ca.publish.rule.instance.FileCrlRule.mapper', 'NoMap')
put('ca.publish.rule.instance.FileCrlRule.pluginName', 'Rule')
put('ca.publish.rule.instance.FileCrlRule.predicate', '')
put('ca.publish.rule.instance.FileCrlRule.publisher',
'FileBaseCRLPublisher')
put('ca.publish.rule.instance.FileCrlRule.type', 'crl')
# The publishing rule
ds.set('ca.publish.rule.instance.FileCrlRule.enable', 'true')
ds.set('ca.publish.rule.instance.FileCrlRule.mapper', 'NoMap')
ds.set('ca.publish.rule.instance.FileCrlRule.pluginName', 'Rule')
ds.set('ca.publish.rule.instance.FileCrlRule.predicate', '')
ds.set(
'ca.publish.rule.instance.FileCrlRule.publisher',
'FileBaseCRLPublisher'
)
ds.set('ca.publish.rule.instance.FileCrlRule.type', 'crl')
# Now disable LDAP publishing
put('ca.publish.rule.instance.LdapCaCertRule.enable', 'false')
put('ca.publish.rule.instance.LdapCrlRule.enable', 'false')
put('ca.publish.rule.instance.LdapUserCertRule.enable', 'false')
put('ca.publish.rule.instance.LdapXCertRule.enable', 'false')
# Now disable LDAP publishing
ds.set('ca.publish.rule.instance.LdapCaCertRule.enable', 'false')
ds.set('ca.publish.rule.instance.LdapCrlRule.enable', 'false')
ds.set(
'ca.publish.rule.instance.LdapUserCertRule.enable',
'false'
)
ds.set('ca.publish.rule.instance.LdapXCertRule.enable', 'false')
# If we are the initial master then we are the CRL generator, otherwise
# we point to that master for CRLs.
if not self.clone:
# These next two are defaults, but I want to be explicit that the
# initial master is the CRL generator.
put('ca.crl.MasterCRL.enableCRLCache', 'true')
put('ca.crl.MasterCRL.enableCRLUpdates', 'true')
put('ca.listenToCloneModifications', 'true')
else:
put('ca.crl.MasterCRL.enableCRLCache', 'false')
put('ca.crl.MasterCRL.enableCRLUpdates', 'false')
put('ca.listenToCloneModifications', 'false')
# If we are the initial master then we are the CRL generator,
# otherwise we point to that master for CRLs.
if not self.clone:
# These next two are defaults, but I want to be explicit
# that the initial master is the CRL generator.
ds.set('ca.crl.MasterCRL.enableCRLCache', 'true')
ds.set('ca.crl.MasterCRL.enableCRLUpdates', 'true')
ds.set('ca.listenToCloneModifications', 'true')
else:
ds.set('ca.crl.MasterCRL.enableCRLCache', 'false')
ds.set('ca.crl.MasterCRL.enableCRLUpdates', 'false')
ds.set('ca.listenToCloneModifications', 'false')
def uninstall(self):
# just eat state

View File

@ -25,6 +25,7 @@ import logging
import socket
import getpass
import gssapi
import io
import ldif
import os
import re
@ -32,6 +33,7 @@ import fileinput
import sys
import tempfile
import shutil
import stat # pylint: disable=bad-python3-import
import traceback
import textwrap
from contextlib import contextmanager
@ -436,6 +438,82 @@ def unquote_directive_value(value, quote_char):
return unescaped_value
_SENTINEL = object()
class DirectiveSetter(object):
"""Safe directive setter
with DirectiveSetter('/path/to/conf') as ds:
ds.set(key, value)
"""
def __init__(self, filename, quotes=True, separator=' '):
self.filename = os.path.abspath(filename)
self.quotes = quotes
self.separator = separator
self.lines = None
self.stat = None
def __enter__(self):
with io.open(self.filename) as f:
self.stat = os.fstat(f.fileno())
self.lines = list(f)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
# something went wrong, reset
self.lines = None
self.stat = None
return
directory, prefix = os.path.split(self.filename)
# use tempfile in same directory to have atomic rename
fd, name = tempfile.mkstemp(prefix=prefix, dir=directory, text=True)
with io.open(fd, mode='w', closefd=True) as f:
for line in self.lines:
if not isinstance(line, six.text_type):
line = line.decode('utf-8')
f.write(line)
self.lines = None
os.fchmod(f.fileno(), stat.S_IMODE(self.stat.st_mode))
os.fchown(f.fileno(), self.stat.st_uid, self.stat.st_gid)
self.stat = None
# flush and sync tempfile inode
f.flush()
os.fsync(f.fileno())
# rename file and sync directory inode
os.rename(name, self.filename)
dirfd = os.open(directory, os.O_RDONLY | os.O_DIRECTORY)
try:
os.fsync(dirfd)
finally:
os.close(dirfd)
def set(self, directive, value, quotes=_SENTINEL, separator=_SENTINEL):
"""Set a single directive
"""
if quotes is _SENTINEL:
quotes = self.quotes
if separator is _SENTINEL:
separator = self.separator
# materialize lines
# set_directive_lines() modify item, shrink or enlage line count
self.lines = list(set_directive_lines(
quotes, separator, directive, value, self.lines
))
def setitems(self, items):
"""Set multiple directives from a dict or list with key/value pairs
"""
if isinstance(items, dict):
# dict-like, use sorted for stable order
items = sorted(items.items())
for k, v in items:
self.set(k, v)
def set_directive(filename, directive, value, quotes=True, separator=' '):
"""Set a name/value pair directive in a configuration file.
@ -455,8 +533,10 @@ def set_directive(filename, directive, value, quotes=True, separator=' '):
st = os.stat(filename)
with open(filename, 'r') as f:
lines = list(f) # read the whole file
new_lines = set_directive_lines(
quotes, separator, directive, value, lines)
# materialize new list
new_lines = list(set_directive_lines(
quotes, separator, directive, value, lines
))
with open(filename, 'w') as f:
# don't construct the whole string; write line-wise
for line in new_lines:
@ -480,8 +560,9 @@ def set_directive_lines(quotes, separator, k, v, lines):
new_line = ''.join([k, separator, v_quoted, '\n'])
found = False
matcher = re.compile(r'\s*{}'.format(re.escape(k + separator)))
for line in lines:
if re.match(r'\s*{}'.format(re.escape(k + separator)), line):
if matcher.match(line):
found = True
if v is not None:
yield new_line

View File

@ -3,8 +3,11 @@
#
import os
import shutil
import tempfile
import pytest
from ipaserver.install import installutils
EXAMPLE_CONFIG = [
@ -13,6 +16,17 @@ EXAMPLE_CONFIG = [
]
@pytest.fixture
def tempdir(request):
tempdir = tempfile.mkdtemp()
def fin():
shutil.rmtree(tempdir)
request.addfinalizer(fin)
return tempdir
class test_set_directive_lines(object):
def test_remove_directive(self):
lines = installutils.set_directive_lines(
@ -55,3 +69,56 @@ class test_set_directive(object):
finally:
os.remove(filename)
def test_directivesetter(tempdir):
filename = os.path.join(tempdir, 'example.conf')
with open(filename, 'w') as f:
for line in EXAMPLE_CONFIG:
f.write(line)
ds = installutils.DirectiveSetter(filename)
assert ds.lines is None
with ds:
assert ds.lines == EXAMPLE_CONFIG
ds.set('foo', '3') # quoted, space separated, doesn't change 'foo='
ds.set('foobar', None, separator='=') # remove
ds.set('baz', '4', False, '=') # add
ds.setitems([
('list1', 'value1'),
('list2', 'value2'),
])
ds.setitems({
'dict1': 'value1',
'dict2': 'value2',
})
with open(filename, 'r') as f:
lines = list(f)
assert lines == [
'foo=1\n',
'foo "3"\n',
'baz=4\n',
'list1 "value1"\n',
'list2 "value2"\n',
'dict1 "value1"\n',
'dict2 "value2"\n',
]
with installutils.DirectiveSetter(filename, True, '=') as ds:
ds.set('foo', '4') # doesn't change 'foo '
with open(filename, 'r') as f:
lines = list(f)
assert lines == [
'foo="4"\n',
'foo "3"\n',
'baz=4\n',
'list1 "value1"\n',
'list2 "value2"\n',
'dict1 "value1"\n',
'dict2 "value2"\n',
]