freeipa/ipaserver/install/ipa_acme_manage.py
Rob Crittenden 724c8314b5 Fix setting values of 0 in ACME pruning
Replace comparisons of "if value" with "if value is not None"
in order to handle 0.

Add a short reference to the man page to indicat that a cert
or request retention time of 0 means remove at the next
execution.

Also indicate that the search time limit is in seconds.

Fixes: https://pagure.io/freeipa/issue/9325

Signed-off-by: Rob Crittenden <rcritten@redhat.com>
Reviewed-By: Francisco Trivino <ftrivino@redhat.com>
2023-02-09 13:28:23 -05:00

414 lines
16 KiB
Python

#
# Copyright (C) 2020 FreeIPA Contributors see COPYING for license
#
import enum
import pki.util
import logging
from optparse import OptionGroup # pylint: disable=deprecated-module
from ipalib import api, errors, x509
from ipalib import _
from ipalib.facts import is_ipa_configured
from ipaplatform.paths import paths
from ipapython.admintool import AdminTool
from ipapython import cookie, dogtag
from ipapython.ipautil import run
from ipapython.certdb import NSSDatabase, EXTERNAL_CA_TRUST_FLAGS
from ipaserver.install import cainstance
from ipaserver.install.ca import lookup_random_serial_number_version
from ipaserver.plugins.dogtag import RestClient
logger = logging.getLogger(__name__)
default_pruning_options = {
'certRetentionTime': '30',
'certRetentionUnit': 'day',
'certSearchSizeLimit': '1000',
'certSearchTimeLimit': '0',
'requestRetentionTime': 'day',
'requestRetentionUnit': '30',
'requestSearchSizeLimit': '1000',
'requestSearchTimeLimit': '0',
'cron': ''
}
pruning_labels = {
'certRetentionTime': 'Certificate Retention Time',
'certRetentionUnit': 'Certificate Retention Unit',
'certSearchSizeLimit': 'Certificate Search Size Limit',
'certSearchTimeLimit': 'Certificate Search Time Limit',
'requestRetentionTime': 'Request Retention Time',
'requestRetentionUnit': 'Request Retention Unit',
'requestSearchSizeLimit': 'Request Search Size Limit',
'requestSearchTimeLimit': 'Request Search Time Limit',
'cron': 'cron Schedule'
}
def validate_range(val, min, max):
"""dogtag appears to have no error checking in the cron
entry so do some minimum amount of validation. It is
left as an exercise for the user to do month/day
validation so requesting Feb 31 will be accepted.
Only * and a number within a min/max range are allowed.
"""
if val == '*':
return
if '-' in val or '/' in val:
raise ValueError(f"{val} ranges are not supported")
try:
int(val)
except ValueError:
# raise a clearer error
raise ValueError(f"{val} is not a valid integer")
if int(val) < min or int(val) > max:
raise ValueError(f"{val} not within the range {min}-{max}")
# Manages the FreeIPA ACME service on a per-server basis.
#
# This program is a stop-gap until the deployment-wide management of
# the ACME service is implemented. So we will eventually have API
# calls for managing the ACME service, e.g. `ipa acme-enable'.
# After that is implemented, we can either deprecate and eventually
# remove this program, or make it a wrapper for the API commands.
class acme_state(RestClient):
def _request(self, url, headers=None):
headers = headers or {}
return dogtag.https_request(
self.ca_host, 8443,
url=url,
cafile=self.ca_cert,
client_certfile=paths.RA_AGENT_PEM,
client_keyfile=paths.RA_AGENT_KEY,
headers=headers,
method='POST'
)
def __enter__(self):
status, resp_headers, _unused = self._request('/acme/login')
cookies = cookie.Cookie.parse(resp_headers.get('set-cookie', ''))
if status != 200 or len(cookies) == 0:
raise errors.RemoteRetrieveError(
reason=_('Failed to authenticate to CA REST API')
)
object.__setattr__(self, 'cookie', str(cookies[0]))
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Log out of the REST API"""
headers = dict(Cookie=self.cookie)
status, unused, _unused = self._request('/acme/logout', headers=headers)
object.__setattr__(self, 'cookie', None)
if status != 204:
raise RuntimeError('Failed to logout')
def enable(self):
headers = dict(Cookie=self.cookie)
status, unused, _unused = self._request('/acme/enable', headers=headers)
if status != 200:
raise RuntimeError('Failed to enable ACME')
def disable(self):
headers = dict(Cookie=self.cookie)
status, unused, _unused = self._request('/acme/disable',
headers=headers)
if status != 200:
raise RuntimeError('Failed to disable ACME')
class Command(enum.Enum):
ENABLE = 'enable'
DISABLE = 'disable'
STATUS = 'status'
PRUNE = 'pruning'
class IPAACMEManage(AdminTool):
command_name = "ipa-acme-manage"
usage = "%prog [enable|disable|status|pruning]"
description = "Manage the IPA ACME service"
@classmethod
def add_options(cls, parser):
group = OptionGroup(parser, 'Pruning')
group.add_option(
"--enable", dest="enable", action="store_true",
default=False, help="Enable certificate pruning")
group.add_option(
"--disable", dest="disable", action="store_true",
default=False, help="Disable certificate pruning")
group.add_option(
"--cron", dest="cron", action="store",
default=None, help="Configure the pruning cron job")
group.add_option(
"--certretention", dest="certretention", action="store",
default=None, help="Certificate retention time", type=int)
group.add_option(
"--certretentionunit", dest="certretentionunit", action="store",
choices=['minute', 'hour', 'day', 'year'],
default=None, help="Certificate retention units")
group.add_option(
"--certsearchsizelimit", dest="certsearchsizelimit",
action="store",
default=None, help="LDAP search size limit", type=int)
group.add_option(
"--certsearchtimelimit", dest="certsearchtimelimit", action="store",
default=None, help="LDAP search time limit", type=int)
group.add_option(
"--requestretention", dest="requestretention", action="store",
default=None, help="Request retention time", type=int)
group.add_option(
"--requestretentionunit", dest="requestretentionunit",
choices=['minute', 'hour', 'day', 'year'],
action="store", default=None, help="Request retention units")
group.add_option(
"--requestsearchsizelimit", dest="requestsearchsizelimit",
action="store",
default=None, help="LDAP search size limit", type=int)
group.add_option(
"--requestsearchtimelimit", dest="requestsearchtimelimit",
action="store",
default=None, help="LDAP search time limit", type=int)
group.add_option(
"--config-show", dest="config_show", action="store_true",
default=False, help="Show the current pruning configuration")
group.add_option(
"--run", dest="run", action="store_true",
default=False, help="Run the pruning job now")
parser.add_option_group(group)
super(IPAACMEManage, cls).add_options(parser, debug_option=True)
def validate_options(self):
super(IPAACMEManage, self).validate_options(needs_root=True)
if len(self.args) < 1:
self.option_parser.error(f'missing command argument')
if self.args[0] == "pruning":
if self.options.enable and self.options.disable:
self.option_parser.error("Cannot both enable and disable")
elif (
any(
[
self.options.enable,
self.options.disable,
self.options.cron,
self.options.certretention is not None,
self.options.certretentionunit,
self.options.requestretention is not None,
self.options.requestretentionunit,
self.options.certsearchsizelimit is not None,
self.options.certsearchtimelimit is not None,
self.options.requestsearchsizelimit is not None,
self.options.requestsearchtimelimit is not None,
]
)
and (self.options.config_show or self.options.run)
):
self.option_parser.error(
"Cannot change and show config or run at the same time"
)
elif self.options.cron:
if len(self.options.cron.split()) != 5:
self.option_parser.error("Invalid format for --cron")
# dogtag does no validation when setting this option so
# do the minimum. The dogtag cron is limited compared to
# crontab(5).
opt = self.options.cron.split()
validate_range(opt[0], 0, 59)
validate_range(opt[1], 0, 23)
validate_range(opt[2], 1, 31)
validate_range(opt[3], 1, 12)
validate_range(opt[4], 0, 6)
try:
self.command = Command(self.args[0])
except ValueError:
self.option_parser.error(f'unknown command "{self.args[0]}"')
def check_san_status(self):
"""
Require the Apache cert to have ipa-ca.$DOMAIN SAN
"""
cert = x509.load_certificate_from_file(paths.HTTPD_CERT_FILE)
cainstance.check_ipa_ca_san(cert)
def pruning(self):
def run_pki_server(command, directive, prefix, value=None):
"""Take a set of arguments to append to pki-server"""
args = [
'pki-server', command,
f'{prefix}.{directive}'
]
if value is not None:
args.extend([str(value)])
logger.debug(args)
result = run(args, raiseonerr=False, capture_output=True,
capture_error=True)
if result.returncode != 0:
raise RuntimeError(result.error_output)
return result
def ca_config_set(directive, value,
prefix='jobsScheduler.job.pruning'):
run_pki_server('ca-config-set', directive, prefix, value)
# ca-config-set always succeeds, even if the option is
# not supported.
newvalue = ca_config_show(directive)
if str(value) != newvalue.strip():
raise RuntimeError('Updating %s failed' % directive)
def ca_config_show(directive):
result = run_pki_server('ca-config-show', directive,
prefix='jobsScheduler.job.pruning')
return result.output.strip()
def config_show():
status = ca_config_show('enabled')
if status.strip() == 'true':
print("Status: enabled")
else:
print("Status: disabled")
for option in (
'certRetentionTime', 'certRetentionUnit',
'certSearchSizeLimit', 'certSearchTimeLimit',
'requestRetentionTime', 'requestRetentionUnit',
'requestSearchSizeLimit', 'requestSearchTimeLimit',
'cron',
):
value = ca_config_show(option)
if value:
print("{}: {}".format(pruning_labels[option], value))
else:
print("{}: {}".format(pruning_labels[option],
default_pruning_options[option]))
def run_pruning():
"""Run the pruning job manually"""
with NSSDatabase() as tmpdb:
print("Preparing...")
tmpdb.create_db()
tmpdb.import_files((paths.RA_AGENT_PEM, paths.RA_AGENT_KEY),
import_keys=True)
tmpdb.import_files((paths.IPA_CA_CRT,))
for nickname, trust_flags in tmpdb.list_certs():
if trust_flags.has_key:
ra_nickname = nickname
continue
# external is suffucient for our purposes: C,,
tmpdb.trust_root_cert(nickname, EXTERNAL_CA_TRUST_FLAGS)
print("Starting job...")
args = ['pki', '-C', tmpdb.pwd_file, '-d', tmpdb.secdir,
'-n', ra_nickname,
'ca-job-start', 'pruning']
logger.debug(args)
run(args, stdin='y')
pki_version = pki.util.Version(pki.specification_version())
if pki_version < pki.util.Version("11.3.0"):
raise RuntimeError(
'Certificate pruning is not supported in PKI version %s'
% pki_version
)
if lookup_random_serial_number_version(api) == 0:
raise RuntimeError(
'Certificate pruning requires random serial numbers'
)
if self.options.config_show:
config_show()
return
if self.options.run:
run_pruning()
return
# Don't play the enable/disable at the same time game
if self.options.enable:
ca_config_set('owner', 'ipara')
ca_config_set('enabled', 'true')
ca_config_set('enabled', 'true', 'jobsScheduler')
elif self.options.disable:
ca_config_set('enabled', 'false')
# pki-server ca-config-set can only set one option at a time so
# loop through all the options and set what is there.
if self.options.certretention is not None:
ca_config_set('certRetentionTime',
self.options.certretention)
if self.options.certretentionunit:
ca_config_set('certRetentionUnit',
self.options.certretentionunit)
if self.options.certsearchtimelimit is not None:
ca_config_set('certSearchTimeLimit',
self.options.certsearchtimelimit)
if self.options.certsearchsizelimit is not None:
ca_config_set('certSearchSizeLimit',
self.options.certsearchsizelimit)
if self.options.requestretention is not None:
ca_config_set('requestRetentionTime',
self.options.requestretention)
if self.options.requestretentionunit:
ca_config_set('requestRetentionUnit',
self.options.requestretentionunit)
if self.options.requestsearchsizelimit is not None:
ca_config_set('requestSearchSizeLimit',
self.options.requestsearchsizelimit)
if self.options.requestsearchtimelimit is not None:
ca_config_set('requestSearchTimeLimit',
self.options.requestsearchtimelimit)
if self.options.cron:
ca_config_set('cron', self.options.cron)
config_show()
print("The CA service must be restarted for changes to take effect")
def run(self):
if not is_ipa_configured():
print("IPA is not configured.")
return 2
if not cainstance.is_ca_installed_locally():
print("CA is not installed on this server.")
return 3
api.bootstrap(in_server=True, confdir=paths.ETC_IPA)
api.finalize()
api.Backend.ldap2.connect()
state = acme_state(api)
with state as ca_api:
if self.command == Command.ENABLE:
self.check_san_status()
ca_api.enable()
elif self.command == Command.DISABLE:
ca_api.disable()
elif self.command == Command.STATUS:
status = "enabled" if dogtag.acme_status() else "disabled"
print("ACME is {}".format(status))
elif self.command == Command.PRUNE:
self.pruning()
else:
raise RuntimeError('programmer error: unhandled enum case')
return 0