mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-01-08 15:23:00 -06:00
e5a9e46138
This function is designed to retrieve a value from an ini-like file. In particular PKI CS.cfg. In an attempt to be more efficient a substring search, using startswith(), is used before calling a regular expression match. The problem is that if the requested directive is a substring of a different one then it will pass the startswith() and fail the regular expression match with a ValueError, assuming it is malformed. There is no need for this. The caller must be able to handle None as a response anyway. So continue if no match is found. This was seen when PKI dropped storing certificate blobs in CS.cfg. The CA certificate is stored in ca.signing.cert. If it isn't present then ca.signing.certnickname will match the substring but not the directive. This should not be treated as an error. Fixes: https://pagure.io/freeipa/issue/9506 Signed-off-by: Rob Crittenden <rcritten@redhat.com> Reviewed-By: Florence Blanc-Renaud <flo@redhat.com>
236 lines
7.5 KiB
Python
236 lines
7.5 KiB
Python
#
|
|
# Copyright (C) 2018 FreeIPA Contributors see COPYING for license
|
|
#
|
|
|
|
import io
|
|
import os
|
|
import re
|
|
import stat
|
|
import tempfile
|
|
|
|
from ipapython.ipautil import unescape_seq, escape_seq
|
|
|
|
_SENTINEL = object()
|
|
|
|
|
|
class DirectiveSetter:
|
|
"""Safe directive setter
|
|
|
|
with DirectiveSetter('/path/to/conf') as ds:
|
|
ds.set(key, value)
|
|
"""
|
|
def __init__(self, filename, quotes=True, separator=' ', comment='#'):
|
|
self.filename = os.path.abspath(filename)
|
|
self.quotes = quotes
|
|
self.separator = separator
|
|
self.comment = comment
|
|
self.lines = None
|
|
self.stat = None
|
|
|
|
def __enter__(self):
|
|
with io.open(self.filename) as f:
|
|
self.stat = os.fstat(f.fileno())
|
|
self.lines = list(f)
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
if exc_type is not None:
|
|
# something went wrong, reset
|
|
self.lines = None
|
|
self.stat = None
|
|
return
|
|
|
|
directory, prefix = os.path.split(self.filename)
|
|
# use tempfile in same directory to have atomic rename
|
|
fd, name = tempfile.mkstemp(prefix=prefix, dir=directory, text=True)
|
|
with io.open(fd, mode='w', closefd=True) as f:
|
|
for line in self.lines:
|
|
if not isinstance(line, str):
|
|
line = line.decode('utf-8')
|
|
f.write(line)
|
|
self.lines = None
|
|
os.fchmod(f.fileno(), stat.S_IMODE(self.stat.st_mode))
|
|
os.fchown(f.fileno(), self.stat.st_uid, self.stat.st_gid)
|
|
self.stat = None
|
|
# flush and sync tempfile inode
|
|
f.flush()
|
|
os.fsync(f.fileno())
|
|
|
|
# rename file and sync directory inode
|
|
os.rename(name, self.filename)
|
|
dirfd = os.open(directory, os.O_RDONLY | os.O_DIRECTORY)
|
|
try:
|
|
os.fsync(dirfd)
|
|
finally:
|
|
os.close(dirfd)
|
|
|
|
def set(self, directive, value, quotes=_SENTINEL, separator=_SENTINEL,
|
|
comment=_SENTINEL):
|
|
"""Set a single directive
|
|
"""
|
|
if quotes is _SENTINEL:
|
|
quotes = self.quotes
|
|
if separator is _SENTINEL:
|
|
separator = self.separator
|
|
if comment is _SENTINEL:
|
|
comment = self.comment
|
|
# materialize lines
|
|
# set_directive_lines() modify item, shrink or enlage line count
|
|
self.lines = list(set_directive_lines(
|
|
quotes, separator, directive, value, self.lines, comment
|
|
))
|
|
|
|
def setitems(self, items):
|
|
"""Set multiple directives from a dict or list with key/value pairs
|
|
"""
|
|
if isinstance(items, dict):
|
|
# dict-like, use sorted for stable order
|
|
items = sorted(items.items())
|
|
for k, v in items:
|
|
self.set(k, v)
|
|
|
|
|
|
def set_directive(filename, directive, value, quotes=True, separator=' ',
|
|
comment='#'):
|
|
"""Set a name/value pair directive in a configuration file.
|
|
|
|
A value of None means to drop the directive.
|
|
|
|
Does not tolerate (or put) spaces around the separator.
|
|
|
|
:param filename: input filename
|
|
:param directive: directive name
|
|
:param value: value of the directive
|
|
:param quotes: whether to quote `value` in double quotes. If true, then
|
|
any existing double quotes are first escaped to avoid
|
|
unparseable directives.
|
|
:param separator: character serving as separator between directive and
|
|
value. Correct value required even when dropping a directive.
|
|
:param comment: comment character for the file to keep new values near
|
|
their commented-out counterpart
|
|
"""
|
|
st = os.stat(filename)
|
|
with open(filename, 'r') as f:
|
|
lines = list(f) # read the whole file
|
|
# materialize new list
|
|
new_lines = list(set_directive_lines(
|
|
quotes, separator, directive, value, lines, comment
|
|
))
|
|
with open(filename, 'w') as f:
|
|
# don't construct the whole string; write line-wise
|
|
for line in new_lines:
|
|
f.write(line)
|
|
os.chown(filename, st.st_uid, st.st_gid) # reset perms
|
|
|
|
|
|
def set_directive_lines(quotes, separator, k, v, lines, comment):
|
|
"""Set a name/value pair in a configuration (iterable of lines).
|
|
|
|
Replaces the value of the key if found, otherwise adds it at
|
|
end. If value is ``None``, remove the key if found.
|
|
|
|
Takes an iterable of lines (with trailing newline).
|
|
Yields lines (with trailing newline).
|
|
|
|
"""
|
|
new_line = ""
|
|
if v is not None:
|
|
v_quoted = quote_directive_value(v, '"') if quotes else v
|
|
new_line = ''.join([k, separator, v_quoted, '\n'])
|
|
|
|
# Special case: consider space as "whitespace" so tabs are allowed
|
|
if separator == ' ':
|
|
separator = '[ \t]+'
|
|
|
|
found = False
|
|
addnext = False # add on next line, found a comment
|
|
matcher = re.compile(r'\s*{}\s*{}'.format(re.escape(k), separator))
|
|
cmatcher = re.compile(r'\s*{}\s*{}\s*{}'.format(comment,
|
|
re.escape(k), separator))
|
|
for line in lines:
|
|
if matcher.match(line):
|
|
found = True
|
|
addnext = False
|
|
if v is not None:
|
|
yield new_line
|
|
elif addnext:
|
|
found = True
|
|
addnext = False
|
|
yield new_line
|
|
yield line
|
|
elif cmatcher.match(line):
|
|
addnext = True
|
|
yield line
|
|
else:
|
|
yield line
|
|
|
|
if not found and v is not None:
|
|
yield new_line
|
|
|
|
|
|
def get_directive(filename, directive, separator=' '):
|
|
"""
|
|
A rather inefficient way to get a configuration directive.
|
|
|
|
:param filename: input filename
|
|
:param directive: directive name
|
|
:param separator: separator between directive and value
|
|
|
|
:returns: The (unquoted) value if the directive was found, None otherwise
|
|
"""
|
|
# Special case: consider space as "whitespace" so tabs are allowed
|
|
if separator == ' ':
|
|
separator = '[ \t]+'
|
|
|
|
if directive is None:
|
|
return None
|
|
|
|
result = None
|
|
with open(filename, "r") as fd:
|
|
for line in fd:
|
|
if line.lstrip().startswith(directive):
|
|
line = line.strip()
|
|
|
|
match = re.match(
|
|
r'{}\s*{}\s*(.*)'.format(directive, separator), line)
|
|
if match:
|
|
value = match.group(1)
|
|
else:
|
|
continue
|
|
|
|
result = unquote_directive_value(value.strip(), '"')
|
|
result = result.strip(' ')
|
|
break
|
|
return result
|
|
|
|
|
|
def quote_directive_value(value, quote_char):
|
|
"""Quote a directive value
|
|
:param value: string to quote
|
|
:param quote_char: character which is used for quoting. All prior
|
|
occurences will be escaped before quoting to avoid unparseable value.
|
|
:returns: processed value
|
|
"""
|
|
if value.startswith(quote_char) and value.endswith(quote_char):
|
|
return value
|
|
|
|
return "{quote}{value}{quote}".format(
|
|
quote=quote_char,
|
|
value="".join(escape_seq(quote_char, value))
|
|
)
|
|
|
|
|
|
def unquote_directive_value(value, quote_char):
|
|
"""Unquote a directive value
|
|
:param value: string to unquote
|
|
:param quote_char: character to strip. All escaped occurences of
|
|
`quote_char` will be uncescaped during processing
|
|
:returns: processed value
|
|
"""
|
|
unescaped_value = "".join(unescape_seq(quote_char, value))
|
|
if (unescaped_value.startswith(quote_char) and
|
|
unescaped_value.endswith(quote_char)):
|
|
return unescaped_value[1:-1]
|
|
|
|
return unescaped_value
|