mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2024-12-24 16:10:02 -06:00
7a482b7c72
Deprecate `ipa_log_manager.root_logger` and replace all calls to it with module-level logger calls. Reviewed-By: Martin Basti <mbasti@redhat.com>
521 lines
16 KiB
Python
521 lines
16 KiB
Python
# Authors: Tomas Babej <tbabej@redhat.com>
|
|
#
|
|
# Copyright (C) 2013 Red Hat
|
|
# see file 'COPYING' for use and warranty information
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
from __future__ import print_function
|
|
|
|
from contextlib import contextmanager
|
|
import logging
|
|
import os
|
|
from textwrap import wrap
|
|
|
|
from ipalib import api
|
|
from ipalib.plugable import Plugin, API
|
|
from ipalib.errors import ValidationError
|
|
from ipaplatform.paths import paths
|
|
from ipapython import admintool
|
|
from ipapython.ipa_log_manager import Filter
|
|
|
|
|
|
"""
|
|
To add configuration instructions for a new use case, define a new class that
|
|
inherits from Advice class.
|
|
|
|
You should create a plugin file for it in ipaserver/advise/plugins folder.
|
|
|
|
The class can run any arbitrary code or IPA command via api.Command['command']()
|
|
calls. It needs to override get_info() method, which returns the formatted
|
|
advice string.
|
|
|
|
Important! Do not forget to register the class to the API.
|
|
|
|
>>> @register()
|
|
>>> class sample_advice(Advice):
|
|
>>> description = 'Instructions for machine with SSSD 1.0 setup.'
|
|
|
|
Description provided shows itself as a header and in the list of all advices
|
|
currently available via ipa-advise.
|
|
|
|
Optionally, you can require root privileges for your plugin:
|
|
|
|
>>> require_root = True
|
|
|
|
The following method should be implemented in your plugin:
|
|
|
|
>>> def get_info():
|
|
>>> self.log.debug('Entering execute() method')
|
|
>>> self.log.comment('Providing useful advice just for you')
|
|
>>> self.log.command('yum update sssd -y')
|
|
|
|
As you can see, Advice's log has 3 different levels. Debug lines are printed
|
|
out with '# DEBUG:' prefix if --verbose had been used. Comment lines utilize
|
|
'# ' prefix and command lines are printed raw.
|
|
|
|
Please note that comments are automatically wrapped after 70 characters.
|
|
Use wrapped=False option to force the unwrapped line in the comment.
|
|
|
|
>>> self.log.comment("This line should not be wrapped", wrapped=False)
|
|
|
|
As a result, you can redirect the advice's output directly to a script file.
|
|
|
|
# ipa-advise sample-advice > script.sh
|
|
# ./script.sh
|
|
"""
|
|
|
|
DEFAULT_INDENTATION_INCREMENT = 2
|
|
|
|
|
|
class _IndentationTracker(object):
|
|
"""
|
|
A simple wrapper that tracks the indentation level of the generated bash
|
|
commands
|
|
"""
|
|
def __init__(self, spaces_per_indent=0):
|
|
if spaces_per_indent <= 0:
|
|
raise ValueError(
|
|
"Indentation increments cannot be zero or negative")
|
|
self.spaces_per_indent = spaces_per_indent
|
|
self._indentation_stack = []
|
|
self._total_indentation_level = 0
|
|
|
|
@property
|
|
def indentation_string(self):
|
|
"""
|
|
return a string containing number of spaces corresponding to
|
|
indentation level
|
|
"""
|
|
return " " * self._total_indentation_level
|
|
|
|
def indent(self):
|
|
"""
|
|
track a single indentation of the generated code
|
|
"""
|
|
self._indentation_stack.append(self.spaces_per_indent)
|
|
self._recompute_indentation_level()
|
|
|
|
def _recompute_indentation_level(self):
|
|
"""
|
|
Track total indentation level of the generated code
|
|
"""
|
|
self._total_indentation_level = sum(self._indentation_stack)
|
|
|
|
def dedent(self):
|
|
"""
|
|
track a single dedentation of the generated code
|
|
dedents that would result in zero or negative indentation level will be
|
|
ignored
|
|
"""
|
|
try:
|
|
self._indentation_stack.pop()
|
|
except IndexError:
|
|
# can not dedent any further
|
|
pass
|
|
|
|
self._recompute_indentation_level()
|
|
|
|
|
|
class CompoundStatement(object):
|
|
"""
|
|
Wrapper around indented blocks of Bash statements.
|
|
|
|
Override `begin_statement` and `end_statement` methods to issue
|
|
opening/closing commands using the passed in _AdviceOutput instance
|
|
"""
|
|
|
|
def __init__(self, advice_output):
|
|
self.advice_output = advice_output
|
|
|
|
def __enter__(self):
|
|
self.begin_statement()
|
|
self.advice_output.indent()
|
|
|
|
def begin_statement(self):
|
|
pass
|
|
|
|
def __exit__(self, exc_type, exc_value, traceback):
|
|
self.advice_output.dedent()
|
|
self.end_statement()
|
|
|
|
def end_statement(self):
|
|
pass
|
|
|
|
|
|
class IfBranch(CompoundStatement):
|
|
"""
|
|
Base wrapper around `if` branch. The closing statement is empty so it
|
|
leaves trailing block that can be closed off or continued by else branches
|
|
"""
|
|
def __init__(self, advice_output, conditional):
|
|
super(IfBranch, self).__init__(advice_output)
|
|
self.conditional = conditional
|
|
|
|
def begin_statement(self):
|
|
self.advice_output.command('if {}'.format(self.conditional))
|
|
self.advice_output.command('then')
|
|
|
|
|
|
class ElseIfBranch(CompoundStatement):
|
|
"""
|
|
Wrapper for `else if <CONDITIONAL>`
|
|
"""
|
|
def __init__(self, advice_output, alternative_conditional):
|
|
super(ElseIfBranch, self).__init__(advice_output)
|
|
self.alternative_conditional = alternative_conditional
|
|
|
|
def begin_statement(self):
|
|
command = 'else if {}'.format(self.alternative_conditional)
|
|
|
|
self.advice_output.command(command)
|
|
|
|
|
|
class ElseBranch(CompoundStatement):
|
|
"""
|
|
Wrapper for final `else` block
|
|
"""
|
|
def begin_statement(self):
|
|
self.advice_output.command('else')
|
|
|
|
def end_statement(self):
|
|
self.advice_output.command('fi')
|
|
|
|
|
|
class UnbranchedIfStatement(IfBranch):
|
|
"""
|
|
Plain `if` without branches
|
|
"""
|
|
def end_statement(self):
|
|
self.advice_output.command('fi')
|
|
|
|
|
|
class ForLoop(CompoundStatement):
|
|
"""
|
|
Wrapper around the for loop
|
|
"""
|
|
def __init__(self, advice_output, loop_variable, iterable):
|
|
super(ForLoop, self).__init__(advice_output)
|
|
self.loop_variable = loop_variable
|
|
self.iterable = iterable
|
|
|
|
def begin_statement(self):
|
|
self.advice_output.command(
|
|
'for {} in {}'.format(self.loop_variable, self.iterable))
|
|
self.advice_output.command('do')
|
|
|
|
def end_statement(self):
|
|
self.advice_output.command('done')
|
|
|
|
|
|
class _AdviceOutput(object):
|
|
|
|
def __init__(self):
|
|
self.content = []
|
|
self.prefix = '# '
|
|
self.options = None
|
|
self._indentation_tracker = _IndentationTracker(
|
|
spaces_per_indent=DEFAULT_INDENTATION_INCREMENT)
|
|
|
|
def indent(self):
|
|
"""
|
|
Indent the statements by one level
|
|
"""
|
|
self._indentation_tracker.indent()
|
|
|
|
def dedent(self):
|
|
"""
|
|
Dedent the statements by one level
|
|
"""
|
|
self._indentation_tracker.dedent()
|
|
|
|
@contextmanager
|
|
def indented_block(self):
|
|
self.indent()
|
|
try:
|
|
yield
|
|
finally:
|
|
self.dedent()
|
|
|
|
def comment(self, line, wrapped=True):
|
|
if wrapped:
|
|
self.append_wrapped_and_indented_comment(line)
|
|
else:
|
|
self.append_comment(line)
|
|
|
|
def append_wrapped_and_indented_comment(self, line, character_limit=70):
|
|
"""
|
|
append wrapped and indented comment to the output
|
|
"""
|
|
for wrapped_indented_line in wrap(
|
|
self.indent_statement(line), character_limit):
|
|
self.append_comment(wrapped_indented_line)
|
|
|
|
def append_comment(self, line):
|
|
self.append_statement(self.prefix + line)
|
|
|
|
def append_statement(self, statement):
|
|
"""
|
|
Append a line to the generated content indenting it by tracked number
|
|
of spaces
|
|
"""
|
|
self.content.append(self.indent_statement(statement))
|
|
|
|
def indent_statement(self, statement):
|
|
return '{indent}{statement}'.format(
|
|
indent=self._indentation_tracker.indentation_string,
|
|
statement=statement)
|
|
|
|
def debug(self, line):
|
|
if self.options.verbose:
|
|
self.comment('DEBUG: ' + line)
|
|
|
|
def command(self, line):
|
|
self.append_statement(line)
|
|
|
|
def echo_error(self, error_message):
|
|
self.command(self._format_error(error_message))
|
|
|
|
def _format_error(self, error_message):
|
|
return 'echo "{}" >&2'.format(error_message)
|
|
|
|
def exit_on_failed_command(self, command_to_run,
|
|
error_message_lines):
|
|
self.command(command_to_run)
|
|
self.exit_on_predicate(
|
|
'[ "$?" -ne "0" ]',
|
|
error_message_lines)
|
|
|
|
def exit_on_nonroot_euid(self):
|
|
self.exit_on_predicate(
|
|
'[ "$(id -u)" -ne "0" ]',
|
|
["This script has to be run as root user"]
|
|
)
|
|
|
|
def exit_on_predicate(self, predicate, error_message_lines):
|
|
with self.unbranched_if(predicate):
|
|
for error_message_line in error_message_lines:
|
|
self.command(self._format_error(error_message_line))
|
|
|
|
self.command('exit 1')
|
|
|
|
@contextmanager
|
|
def unbranched_if(self, predicate):
|
|
with self._compound_statement(UnbranchedIfStatement, predicate):
|
|
yield
|
|
|
|
@contextmanager
|
|
def _compound_statement(self, statement_cls, *args):
|
|
with statement_cls(self, *args):
|
|
yield
|
|
|
|
def commands_on_predicate(self, predicate, commands_to_run_when_true,
|
|
commands_to_run_when_false=None):
|
|
if commands_to_run_when_false is not None:
|
|
if_statement = self.if_branch
|
|
else:
|
|
if_statement = self.unbranched_if
|
|
|
|
with if_statement(predicate):
|
|
for command_to_run_when_true in commands_to_run_when_true:
|
|
self.command(
|
|
command_to_run_when_true)
|
|
|
|
if commands_to_run_when_false is not None:
|
|
with self.else_branch():
|
|
for command_to_run_when_false in commands_to_run_when_false:
|
|
self.command(command_to_run_when_false)
|
|
|
|
@contextmanager
|
|
def if_branch(self, predicate):
|
|
with self._compound_statement(IfBranch, predicate):
|
|
yield
|
|
|
|
@contextmanager
|
|
def else_branch(self):
|
|
with self._compound_statement(ElseBranch):
|
|
yield
|
|
|
|
@contextmanager
|
|
def else_if_branch(self, predicate):
|
|
with self._compound_statement(ElseIfBranch, predicate):
|
|
yield
|
|
|
|
@contextmanager
|
|
def for_loop(self, loop_variable, iterable):
|
|
with self._compound_statement(ForLoop, loop_variable, iterable):
|
|
yield
|
|
|
|
|
|
class Advice(Plugin):
|
|
"""
|
|
Base class for advices, plugins for ipa-advise.
|
|
"""
|
|
|
|
options = None
|
|
require_root = False
|
|
description = ''
|
|
|
|
def __init__(self, api):
|
|
super(Advice, self).__init__(api)
|
|
self.log = _AdviceOutput()
|
|
|
|
def set_options(self, options):
|
|
self.options = options
|
|
self.log.options = options
|
|
|
|
def get_info(self):
|
|
"""
|
|
This method should be overridden by child Advices.
|
|
|
|
Returns a string with instructions.
|
|
"""
|
|
|
|
raise NotImplementedError
|
|
|
|
|
|
class AdviseAPI(API):
|
|
bases = (Advice,)
|
|
|
|
@property
|
|
def packages(self):
|
|
import ipaserver.advise.plugins
|
|
return (ipaserver.advise.plugins,)
|
|
|
|
advise_api = AdviseAPI()
|
|
|
|
|
|
class IpaAdvise(admintool.AdminTool):
|
|
"""
|
|
Admin tool that given systems's configuration provides instructions how to
|
|
configure the systems for various use cases.
|
|
"""
|
|
|
|
command_name = 'ipa-advise'
|
|
usage = "%prog ADVICE"
|
|
description = "Provides configuration advice for various use cases. To "\
|
|
"see the list of possible ADVICEs, run ipa-advise without "\
|
|
"any arguments."
|
|
|
|
def __init__(self, options, args):
|
|
super(IpaAdvise, self).__init__(options, args)
|
|
|
|
@classmethod
|
|
def add_options(cls, parser):
|
|
super(IpaAdvise, cls).add_options(parser)
|
|
|
|
def validate_options(self):
|
|
super(IpaAdvise, self).validate_options(needs_root=False)
|
|
|
|
if len(self.args) > 1:
|
|
raise self.option_parser.error("You can only provide one "
|
|
"positional argument.")
|
|
|
|
def log_success(self):
|
|
pass
|
|
|
|
def print_config_list(self):
|
|
self.print_header('List of available advices')
|
|
|
|
max_keyword_len = max(
|
|
(len(advice.name) for advice in advise_api.Advice))
|
|
|
|
for advice in advise_api.Advice:
|
|
description = getattr(advice, 'description', '')
|
|
keyword = advice.name.replace('_', '-')
|
|
|
|
# Compute the number of spaces needed for the table to be aligned
|
|
offset = max_keyword_len - len(keyword)
|
|
prefix = " {key} {off}: ".format(key=keyword, off=' ' * offset)
|
|
wrapped_description = wrap(description, 80 - len(prefix))
|
|
|
|
# Print the first line with the prefix (keyword)
|
|
print(prefix + wrapped_description[0])
|
|
|
|
# Print the rest wrapped behind the colon
|
|
for line in wrapped_description[1:]:
|
|
print("{off}{line}".format(off=' ' * len(prefix), line=line))
|
|
|
|
def print_header(self, header, print_shell=False):
|
|
header_size = len(header)
|
|
|
|
prefix = ''
|
|
if print_shell:
|
|
prefix = '# '
|
|
print('#!/bin/sh')
|
|
|
|
# Do not print out empty header
|
|
if header_size > 0:
|
|
print((prefix + '-' * 70))
|
|
for line in wrap(header, 70):
|
|
print((prefix + line))
|
|
print((prefix + '-' * 70))
|
|
|
|
def print_advice(self, keyword):
|
|
advice = getattr(advise_api.Advice, keyword, None)
|
|
|
|
# Ensure that Configuration class for given --setup option value exists
|
|
if advice is None:
|
|
raise ValidationError(
|
|
name="advice",
|
|
error="No instructions are available for '{con}'. "
|
|
"See the list of available configuration "
|
|
"by invoking the ipa-advise command with no argument."
|
|
.format(con=keyword.replace('_', '-')))
|
|
|
|
# Check whether root privileges are needed
|
|
if advice.require_root and os.getegid() != 0:
|
|
raise admintool.ScriptError(
|
|
'Must be root to get advice for {adv}'
|
|
.format(adv=keyword.replace('_', '-')), 1)
|
|
|
|
# Print out nicely formatted header
|
|
self.print_header(advice.description, print_shell=True)
|
|
|
|
# Set options so that plugin can use verbose/quiet options
|
|
advice.set_options(self.options)
|
|
|
|
# Print out the actual advice
|
|
api.Backend.rpcclient.connect()
|
|
advice.get_info()
|
|
api.Backend.rpcclient.disconnect()
|
|
for line in advice.log.content:
|
|
print(line)
|
|
|
|
def run(self):
|
|
super(IpaAdvise, self).run()
|
|
|
|
api.bootstrap(in_server=False,
|
|
context='cli',
|
|
confdir=paths.ETC_IPA)
|
|
api.finalize()
|
|
advise_api.bootstrap(in_server=False,
|
|
context='cli',
|
|
confdir=paths.ETC_IPA)
|
|
advise_api.finalize()
|
|
if not self.options.verbose:
|
|
# Do not print connection information by default
|
|
logger_name = r'ipalib\.rpc'
|
|
root_logger = logging.getLogger()
|
|
root_logger.addFilter(Filter(logger_name, logging.WARNING))
|
|
|
|
# With no argument, print the list out and exit
|
|
if not self.args:
|
|
self.print_config_list()
|
|
return
|
|
else:
|
|
keyword = self.args[0].replace('-', '_')
|
|
self.print_advice(keyword)
|