#!/usr/bin/python3 """Test client for ipa-custodia The test script is expected to be executed on an IPA server with existing Custodia server keys. """ from __future__ import print_function import argparse import logging import os import platform import warnings from custodia.message.kem import KEY_USAGE_SIG, KEY_USAGE_ENC, KEY_USAGE_MAP from jwcrypto.common import json_decode from jwcrypto.jwk import JWK from ipalib import api from ipalib.facts import is_ipa_configured from ipaplatform.paths import paths import ipapython.version try: # FreeIPA >= 4.5 from ipaserver.secrets.client import CustodiaClient except ImportError: # FreeIPA <= 4.4 from ipapython.secrets.client import CustodiaClient # Ignore security warning from vendored and non-vendored urllib3 try: from urllib3.exceptions import SecurityWarning except ImportError: SecurityWarning = None else: warnings.simplefilter("ignore", SecurityWarning) try: from requests.packages.urllib3.exceptions import SecurityWarning except ImportError: SecurityWarning = None else: warnings.simplefilter("ignore", SecurityWarning) KEYS = [ 'dm/DMHash', 'ra/ipaCert', 'ca/auditSigningCert cert-pki-ca', 'ca_wrapped/auditSigningCert cert-pki-ca', 'ca_wrapped/auditSigningCert cert-pki-ca/1.2.840.113549.3.7', 'ca/caSigningCert cert-pki-ca', 'ca/ocspSigningCert cert-pki-ca', 'ca/subsystemCert cert-pki-ca', ] IPA_CUSTODIA_KEYFILE = os.path.join(paths.IPA_CUSTODIA_CONF_DIR, 'server.keys') logger = logging.getLogger('ipa-custodia-tester') parser = argparse.ArgumentParser( "IPA Custodia check", ) # --store is dangerous and therefore hidden! Don't use it unless you really # know what you are doing! Keep in mind that it might destroy your NSSDB # unless it uses sqlite format. parser.add_argument( "--store", action='store_true', dest='store', help=argparse.SUPPRESS ) parser.add_argument( "--debug", action='store_true', help="Debug mode" ) parser.add_argument( "--verbose", action='store_true', help='Verbose mode' ) parser.add_argument( "server", help="FQDN of a IPA server (can be own FQDN for self-test)" ) parser.add_argument( 'keys', nargs='*', default=KEYS, help="Remote key ({})".format(', '.join(KEYS)) ) class IPACustodiaTester: files = [ paths.IPA_DEFAULT_CONF, paths.KRB5_KEYTAB, paths.IPA_CUSTODIA_CONF, IPA_CUSTODIA_KEYFILE ] def __init__(self, parser, args): self.parser = parser self.args = args if not api.isdone('bootstrap'): # bootstrap to initialize api.env api.bootstrap(log=None) self.debug("IPA API bootstrapped") self.realm = api.env.realm self.host = api.env.host self.host_spn = 'host/{}@{}'.format(self.host, self.realm) self.server_spn = 'host/{}@{}'.format(self.args.server, self.realm) self.client = None self._errors = [] def error(self, msg, fatal=False): self._errors.append(msg) logger.error(msg, exc_info=self.args.verbose) if fatal: self.exit() def exit(self): if self._errors: self.parser.exit(1, "[ERROR] One or more tests have failed.\n") else: self.parser.exit(0, "All tests have passed successfully.\n") def warning(self, msg): logger.warning(msg) def info(self, msg): logger.info(msg) def debug(self, msg): logger.debug(msg) def check(self): self.status() self.check_files() self.check_client() self.check_jwk() self.check_keys() def status(self): self.info("Platform: {}".format(platform.platform())) self.info("IPA version: {}".format( ipapython.version.VERSION )) self.info("IPA vendor version: {}".format( ipapython.version.VENDOR_VERSION )) self.info("Realm: {}".format(self.realm)) self.info("Host: {}".format(self.host)) self.info("Remote server: {}".format(self.args.server)) if self.host == self.args.server: self.warning("Performing self-test only.") def check_files(self): for filename in self.files: if not os.path.isfile(filename): self.error("File '{0}' is missing.".format(filename)) else: self.info("File '{0}' exists.".format(filename)) def check_client(self): try: self.client = CustodiaClient( server=self.args.server, client_service='host@{}'.format(self.host), keyfile=IPA_CUSTODIA_KEYFILE, keytab=paths.KRB5_KEYTAB, realm=self.realm, ) except Exception as e: self.error("Failed to create client: {}".format(e), fatal=True) else: self.info("Custodia client created.") def _check_jwk_single(self, usage_id): usage = KEY_USAGE_MAP[usage_id] with open(IPA_CUSTODIA_KEYFILE) as f: dictkeys = json_decode(f.read()) try: pkey = JWK(**dictkeys[usage_id]) local_pubkey = json_decode(pkey.export_public()) except Exception: raise self.error( "Failed to load and parse local JWK.", fatal=True ) else: self.info("Loaded key for usage '{}' from '{}'.".format( usage, IPA_CUSTODIA_KEYFILE )) if pkey.key_id != self.host_spn: raise self.error( "KID '{}' != host service principal name '{}' " "(usage: {})".format(pkey.key_id, self.host_spn, usage), fatal=True ) else: self.info( "JWK KID matches host's service principal name '{}'.".format( self.host_spn )) # LDAP doesn't contain KID local_pubkey.pop("kid", None) find_key = self.client.ikk.find_key try: host_pubkey = json_decode(find_key(self.host_spn, usage_id)) except Exception: raise self.error( "Fetching host keys {} (usage: {}) failed.".format( self.host_spn, usage), fatal=True ) else: self.info("Checked host LDAP keys '{}' for usage {}.".format( self.host_spn, usage )) if host_pubkey != local_pubkey: self.debug("LDAP: '{}'".format(host_pubkey)) self.debug("Local: '{}'".format(local_pubkey)) raise self.error( "Host key in LDAP does not match local key.", fatal=True ) else: self.info( "Local key for usage '{}' matches key in LDAP.".format(usage) ) try: server_pubkey = json_decode(find_key(self.server_spn, usage_id)) except Exception: raise self.error( "Fetching server keys {} (usage: {}) failed.".format( self.server_spn, usage), fatal=True ) else: self.info("Checked server LDAP keys '{}' for usage {}.".format( self.server_spn, usage )) return local_pubkey, host_pubkey, server_pubkey def check_jwk(self): self._check_jwk_single(KEY_USAGE_SIG) self._check_jwk_single(KEY_USAGE_ENC) def check_keys(self): for key in self.args.keys: try: result = self.client.fetch_key(key, store=self.args.store) except Exception as e: self.error("Failed to retrieve key '{}': {}.".format( key, e )) else: self.info("Successfully retrieved '{}'.".format(key)) if not self.args.store: self.debug(result) def main(): args = parser.parse_args() if args.debug: args.verbose = True logging.basicConfig( level=logging.DEBUG if args.debug else logging.INFO, format='[%(asctime)s %(name)s] <%(levelname)s>: %(message)s', datefmt='%Y-%m-%dT%H:%M:%S', ) if not is_ipa_configured(): parser.error("IPA is not configured on this system.\n") if os.geteuid() != 0: parser.error("Script must be executed as root.\n") tester = IPACustodiaTester(parser, args) tester.check() tester.exit() if __name__ == '__main__': main()