diff --git a/ipaclient/install/client.py b/ipaclient/install/client.py index bbd2778ed..83cfefb87 100644 --- a/ipaclient/install/client.py +++ b/ipaclient/install/client.py @@ -40,7 +40,7 @@ from ipalib.constants import FQDN, IPAAPI_USER, MAXHOSTNAMELEN from ipalib.install import certmonger, certstore, service from ipalib.install import hostname as hostname_ from ipalib.facts import is_ipa_client_configured, is_ipa_configured -from ipalib.install.kinit import kinit_keytab, kinit_password, kinit_pkinit +from ipalib.kinit import kinit_keytab, kinit_password, kinit_pkinit from ipalib.install.service import enroll_only, prepare_only from ipalib.rpc import delete_persistent_client_session_data from ipalib.util import ( diff --git a/ipaclient/install/ipa_client_automount.py b/ipaclient/install/ipa_client_automount.py index 297a784c4..4439932bd 100644 --- a/ipaclient/install/ipa_client_automount.py +++ b/ipaclient/install/ipa_client_automount.py @@ -43,7 +43,7 @@ from ipaclient.install.client import ( ) from ipalib import api, errors from ipalib.install import sysrestore -from ipalib.install.kinit import kinit_keytab +from ipalib.kinit import kinit_keytab from ipalib.util import check_client_configuration from ipapython import ipautil from ipapython.ipa_log_manager import standard_logging_setup diff --git a/ipalib/__init__.py b/ipalib/__init__.py index 906e026a9..1d540ebdf 100644 --- a/ipalib/__init__.py +++ b/ipalib/__init__.py @@ -955,12 +955,19 @@ class API(plugable.API): import os import ipalib + try: + from ipalib import kinit + except ImportError: + from ipalib.install import kinit + + # set a custom ccache to isolate from the environment + ccache_name = "FILE:/path/to/tmp/service.ccache" + os.environ["KRB5CCNAME"] = ccache_name # optional: automatic authentication with a KRB5 keytab - os.environ.update( - KRB5_CLIENT_KTNAME="/path/to/service.keytab", - KRB5RCACHENAME="FILE:/path/to/tmp/service.ccache", - ) + os.environ["KRB5_CLIENT_KTNAME"] = "/path/to/service.keytab" + # or with password + kinit.kinit_passwd("username", "password", ccache_name) # optional: override settings (once per process) overrides = {} diff --git a/ipalib/install/kinit.py b/ipalib/install/kinit.py index d5fb56bf0..6e785cf48 100644 --- a/ipalib/install/kinit.py +++ b/ipalib/install/kinit.py @@ -1,216 +1,20 @@ # -# Copyright (C) 2016 FreeIPA Contributors see COPYING for license +# Copyright (C) 2024 FreeIPA Contributors see COPYING for license # -from __future__ import absolute_import +# code was moved to ipalib.kinit. This module is now an alias +__all__ = ( + "validate_principal", + "kinit_keytab", + "kinit_password", + "kinit_armor", + "kinit_pkinit", +) -import logging -import os -import re -import time - -import gssapi - -from ipaplatform.paths import paths -from ipapython.ipautil import run -from ipalib.constants import PATTERN_GROUPUSER_NAME -from ipalib.util import validate_hostname - -logger = logging.getLogger(__name__) - -# Cannot contact any KDC for requested realm -KRB5_KDC_UNREACH = 2529639068 - -# A service is not available that s required to process the request -KRB5KDC_ERR_SVC_UNAVAILABLE = 2529638941 - -PATTERN_REALM = '@?([a-zA-Z0-9.-]*)$' -PATTERN_PRINCIPAL = '(' + PATTERN_GROUPUSER_NAME[:-1] + ')' + PATTERN_REALM -PATTERN_SERVICE = '([a-zA-Z0-9.-]+)/([a-zA-Z0-9.-]+)' + PATTERN_REALM - -user_pattern = re.compile(PATTERN_PRINCIPAL) -service_pattern = re.compile(PATTERN_SERVICE) - - -def validate_principal(principal): - if not isinstance(principal, str): - raise RuntimeError('Invalid principal: not a string') - if ('/' in principal) and (' ' in principal): - raise RuntimeError('Invalid principal: bad spacing') - else: - # For a user match in the regex - # username = match[1] - # realm = match[2] - match = user_pattern.match(principal) - if match is None: - match = service_pattern.match(principal) - if match is None: - raise RuntimeError('Invalid principal: cannot parse') - else: - # service = match[1] - hostname = match[2] - # realm = match[3] - try: - validate_hostname(hostname) - except ValueError as e: - raise RuntimeError(str(e)) - - -def kinit_keytab(principal, keytab, ccache_name, config=None, attempts=1): - """ - Given a ccache_path, keytab file and a principal kinit as that user. - - The optional parameter 'attempts' specifies how many times the credential - initialization should be attempted in case of non-responsive KDC. - """ - validate_principal(principal) - errors_to_retry = {KRB5KDC_ERR_SVC_UNAVAILABLE, - KRB5_KDC_UNREACH} - logger.debug("Initializing principal %s using keytab %s", - principal, keytab) - logger.debug("using ccache %s", ccache_name) - for attempt in range(1, attempts + 1): - old_config = os.environ.get('KRB5_CONFIG') - if config is not None: - os.environ['KRB5_CONFIG'] = config - else: - os.environ.pop('KRB5_CONFIG', None) - try: - name = gssapi.Name(principal, gssapi.NameType.kerberos_principal) - store = {'ccache': ccache_name, - 'client_keytab': keytab} - cred = gssapi.Credentials(name=name, store=store, usage='initiate') - logger.debug("Attempt %d/%d: success", attempt, attempts) - return cred - except gssapi.exceptions.GSSError as e: - if e.min_code not in errors_to_retry: # pylint: disable=no-member - raise - logger.debug("Attempt %d/%d: failed: %s", attempt, attempts, e) - if attempt == attempts: - logger.debug("Maximum number of attempts (%d) reached", - attempts) - raise - logger.debug("Waiting 5 seconds before next retry") - time.sleep(5) - finally: - if old_config is not None: - os.environ['KRB5_CONFIG'] = old_config - else: - os.environ.pop('KRB5_CONFIG', None) - - return None - - -def kinit_password(principal, password, ccache_name, config=None, - armor_ccache_name=None, canonicalize=False, - enterprise=False, lifetime=None): - """ - perform interactive kinit as principal using password. If using FAST for - web-based authentication, use armor_ccache_path to specify http service - ccache. - """ - validate_principal(principal) - logger.debug("Initializing principal %s using password", principal) - args = [paths.KINIT, '-c', ccache_name] - if armor_ccache_name is not None: - logger.debug("Using armor ccache %s for FAST webauth", - armor_ccache_name) - args.extend(['-T', armor_ccache_name]) - - if lifetime: - args.extend(['-l', lifetime]) - - if canonicalize: - logger.debug("Requesting principal canonicalization") - args.append('-C') - - if enterprise: - logger.debug("Using enterprise principal") - args.append('-E') - - args.extend(['--', principal]) - env = {'LC_ALL': 'C'} - if config is not None: - env['KRB5_CONFIG'] = config - - # this workaround enables us to capture stderr and put it - # into the raised exception in case of unsuccessful authentication - result = run(args, stdin=password, env=env, raiseonerr=False, - capture_error=True) - if result.returncode: - raise RuntimeError(result.error_output) - - -def kinit_armor(ccache_name, pkinit_anchors=None): - """ - perform anonymous pkinit to obtain anonymous ticket to be used as armor - for FAST. - - :param ccache_name: location of the armor ccache - :param pkinit_anchor: if not None, the location of PKINIT anchor file to - use. Otherwise the value from Kerberos client library configuration is - used - - :raises: CalledProcessError if the anonymous PKINIT fails - """ - logger.debug("Initializing anonymous ccache") - - env = {'LC_ALL': 'C'} - args = [paths.KINIT, '-n', '-c', ccache_name] - - if pkinit_anchors is not None: - for pkinit_anchor in pkinit_anchors: - args.extend(['-X', 'X509_anchors=FILE:{}'.format(pkinit_anchor)]) - - # this workaround enables us to capture stderr and put it - # into the raised exception in case of unsuccessful authentication - run(args, env=env, raiseonerr=True, capture_error=True) - - -def kinit_pkinit( - principal, - user_identity, - ccache_name, - config=None, - pkinit_anchors=None, -): - """Perform kinit with X.509 identity (PKINIT) - - :param principal: principal name - :param user_identity: X509_user_identity paramemter - :param ccache_name: location of ccache - :param config: path to krb5.conf (default: default location) - :param pkinit_anchor: if not None, the PKINIT anchors to use. Otherwise - the value from Kerberos client library configuration is used. Entries - must be prefixed with FILE: or DIR: - - user identity example: - FILE:filename[,keyfilename] - PKCS12:filename - PKCS11:... - DIR:directoryname - - :raises: CalledProcessError if PKINIT fails - """ - validate_principal(principal) - logger.debug( - "Initializing principal %s using PKINIT %s", principal, user_identity - ) - - env = {"LC_ALL": "C"} - if config is not None: - env["KRB5_CONFIG"] = config - - args = [paths.KINIT, "-c", ccache_name] - if pkinit_anchors is not None: - for pkinit_anchor in pkinit_anchors: - assert pkinit_anchor.startswith(("FILE:", "DIR:", "ENV:")) - args.extend(["-X", f"X509_anchors={pkinit_anchor}"]) - args.extend(["-X", f"X509_user_identity={user_identity}"]) - args.extend(['--', principal]) - - # this workaround enables us to capture stderr and put it - # into the raised exception in case of unsuccessful authentication - # Unsuccessful pkinit can lead to a password prompt. Send \n to skip - # prompt. - run(args, env=env, stdin="\n", raiseonerr=True, capture_error=True) +from ..kinit import ( + validate_principal, + kinit_keytab, + kinit_password, + kinit_armor, + kinit_pkinit, +) diff --git a/ipalib/kinit.py b/ipalib/kinit.py new file mode 100644 index 000000000..2bb9a2d5f --- /dev/null +++ b/ipalib/kinit.py @@ -0,0 +1,240 @@ +# +# Copyright (C) 2016 FreeIPA Contributors see COPYING for license +# +import logging +import os +import re +import time + +import gssapi + +from ipapython.kerberos import Principal +from ipaplatform.paths import paths +from ipapython.ipautil import run +from ipalib.constants import PATTERN_GROUPUSER_NAME +from ipalib import krb_utils +from ipalib.util import validate_hostname + +logger = logging.getLogger(__name__) + +PATTERN_REALM = '@?([a-zA-Z0-9.-]*)$' +PATTERN_PRINCIPAL = '(' + PATTERN_GROUPUSER_NAME[:-1] + ')' + PATTERN_REALM +PATTERN_SERVICE = '([a-zA-Z0-9.-]+)/([a-zA-Z0-9.-]+)' + PATTERN_REALM + +user_pattern = re.compile(PATTERN_PRINCIPAL) +service_pattern = re.compile(PATTERN_SERVICE) + + +def validate_principal(principal): + # TODO: use Principal() to verify value? + if isinstance(principal, Principal): + principal = str(principal) + elif not isinstance(principal, str): + raise RuntimeError('Invalid principal: not a string') + if ('/' in principal) and (' ' in principal): + raise RuntimeError('Invalid principal: bad spacing') + else: + # For a user match in the regex + # username = match[1] + # realm = match[2] + match = user_pattern.match(principal) + if match is None: + match = service_pattern.match(principal) + if match is None: + raise RuntimeError('Invalid principal: cannot parse') + else: + # service = match[1] + hostname = match[2] + # realm = match[3] + try: + validate_hostname(hostname) + except ValueError as e: + raise RuntimeError(str(e)) + return principal + + +def kinit_keytab(principal, keytab, ccache_name=None, config=None, attempts=1): + """ + Given a ccache_path, keytab file and a principal kinit as that user. + + The optional parameter 'attempts' specifies how many times the credential + initialization should be attempted in case of non-responsive KDC. + """ + validate_principal(principal) + errors_to_retry = { + krb_utils.KRB5KDC_ERR_SVC_UNAVAILABLE, krb_utils.KRB5_KDC_UNREACH + } + logger.debug("Initializing principal %s using keytab %s", + principal, keytab) + store = {'client_keytab': keytab} + if ccache_name is not None: + logger.debug("using ccache %s", ccache_name) + store['ccache'] = ccache_name + for attempt in range(1, attempts + 1): + old_config = os.environ.get('KRB5_CONFIG') + if config is not None: + os.environ['KRB5_CONFIG'] = config + else: + os.environ.pop('KRB5_CONFIG', None) + try: + name = gssapi.Name( + str(principal), gssapi.NameType.kerberos_principal + ) + cred = gssapi.Credentials(name=name, store=store, usage='initiate') + logger.debug("Attempt %d/%d: success", attempt, attempts) + return cred + except gssapi.exceptions.GSSError as e: + if e.min_code not in errors_to_retry: # pylint: disable=no-member + raise + logger.debug("Attempt %d/%d: failed: %s", attempt, attempts, e) + if attempt == attempts: + logger.debug("Maximum number of attempts (%d) reached", + attempts) + raise + logger.debug("Waiting 5 seconds before next retry") + time.sleep(5) + finally: + if old_config is not None: + os.environ['KRB5_CONFIG'] = old_config + else: + os.environ.pop('KRB5_CONFIG', None) + + return None + + +def _run_env(config=None): + """Common os.environ for kinit + + Passes KRB5* and GSS* envs like KRB5_TRACE + """ + env = {"LC_ALL": "C"} + for key, value in os.environ.items(): + if key.startswith(("KRB5", "GSS")): + env[key] = value + if config is not None: + env["KRB5_CONFIG"] = config + return env + + +def kinit_password(principal, password, ccache_name=None, config=None, + armor_ccache_name=None, canonicalize=False, + enterprise=False, lifetime=None): + """ + perform interactive kinit as principal using password. If using FAST for + web-based authentication, use armor_ccache_path to specify http service + ccache. + + :param principal: principal name + :param password: user password + :param ccache_name: location of ccache (default: default location) + :param config: path to krb5.conf (default: default location) + :param armor_ccache_name: armor ccache for FAST (-T) + :param canonicalize: request principal canonicalization (-C) + :param enterprise: use enterprise principal (-E) + :param lifetime: request TGT lifetime (-l) + """ + validate_principal(principal) + logger.debug("Initializing principal %s using password", principal) + args = [paths.KINIT] + if ccache_name is not None: + args.extend(['-c', ccache_name]) + if armor_ccache_name is not None: + logger.debug("Using armor ccache %s for FAST webauth", + armor_ccache_name) + args.extend(['-T', armor_ccache_name]) + + if lifetime: + args.extend(['-l', lifetime]) + + if canonicalize: + logger.debug("Requesting principal canonicalization") + args.append('-C') + + if enterprise: + logger.debug("Using enterprise principal") + args.append('-E') + + args.extend(['--', str(principal)]) + env = _run_env(config) + + # this workaround enables us to capture stderr and put it + # into the raised exception in case of unsuccessful authentication + result = run(args, stdin=password, env=env, raiseonerr=False, + capture_error=True) + if result.returncode: + raise RuntimeError(result.error_output) + return result + + +def kinit_armor(ccache_name, pkinit_anchors=None): + """ + perform anonymous pkinit to obtain anonymous ticket to be used as armor + for FAST. + + :param ccache_name: location of the armor ccache (required) + :param pkinit_anchor: if not None, the location of PKINIT anchor file to + use. Otherwise the value from Kerberos client library configuration is + used + + :raises: CalledProcessError if the anonymous PKINIT fails + """ + logger.debug("Initializing anonymous ccache") + + env = _run_env() + args = [paths.KINIT, '-n', '-c', ccache_name] + + if pkinit_anchors is not None: + for pkinit_anchor in pkinit_anchors: + args.extend(['-X', 'X509_anchors=FILE:{}'.format(pkinit_anchor)]) + + # this workaround enables us to capture stderr and put it + # into the raised exception in case of unsuccessful authentication + return run(args, env=env, raiseonerr=True, capture_error=True) + + +def kinit_pkinit( + principal, + user_identity, + ccache_name=None, + config=None, + pkinit_anchors=None, +): + """Perform kinit with X.509 identity (PKINIT) + + :param principal: principal name + :param user_identity: X509_user_identity paramemter + :param ccache_name: location of ccache (default: default location) + :param config: path to krb5.conf (default: default location) + :param pkinit_anchor: if not None, the PKINIT anchors to use. Otherwise + the value from Kerberos client library configuration is used. Entries + must be prefixed with FILE: or DIR: + + user identity example: + FILE:filename[,keyfilename] + PKCS12:filename + PKCS11:... + DIR:directoryname + + :raises: CalledProcessError if PKINIT fails + """ + validate_principal(principal) + logger.debug( + "Initializing principal %s using PKINIT %s", principal, user_identity + ) + + args = [paths.KINIT] + if ccache_name is not None: + args.extend(['-c', ccache_name]) + if pkinit_anchors is not None: + for pkinit_anchor in pkinit_anchors: + assert pkinit_anchor.startswith(("FILE:", "DIR:", "ENV:")) + args.extend(["-X", f"X509_anchors={pkinit_anchor}"]) + args.extend(["-X", f"X509_user_identity={user_identity}"]) + args.extend(['--', str(principal)]) + + # this workaround enables us to capture stderr and put it + # into the raised exception in case of unsuccessful authentication + # Unsuccessful pkinit can lead to a password prompt. Send \n to skip + # prompt. + env = _run_env(config) + return run(args, env=env, stdin="\n", raiseonerr=True, capture_error=True) diff --git a/ipalib/krb_utils.py b/ipalib/krb_utils.py index 8fe600827..6275d4b3f 100644 --- a/ipalib/krb_utils.py +++ b/ipalib/krb_utils.py @@ -34,6 +34,10 @@ KRB5KRB_AP_ERR_TKT_EXPIRED = 2529638944 # Ticket expired KRB5_FCC_PERM = 2529639106 # Credentials cache permissions incorrect KRB5_CC_FORMAT = 2529639111 # Bad format in credentials cache KRB5_REALM_CANT_RESOLVE = 2529639132 # Cannot resolve network address for KDC in requested realm +# Cannot contact any KDC for requested realm +KRB5_KDC_UNREACH = 2529639068 +# A service is not available that s required to process the request +KRB5KDC_ERR_SVC_UNAVAILABLE = 2529638941 # mechglue/gss_plugin.c: #define MAP_ERROR_BASE 0x04200000 GSSPROXY_MAP_ERROR_BASE = 69206016 diff --git a/ipaserver/install/kra.py b/ipaserver/install/kra.py index 59cbda812..834ad3fd4 100644 --- a/ipaserver/install/kra.py +++ b/ipaserver/install/kra.py @@ -12,7 +12,7 @@ import logging import os from ipalib import api -from ipalib.install.kinit import kinit_keytab +from ipalib.kinit import kinit_keytab from ipaplatform import services from ipaplatform.paths import paths from ipapython import ipautil diff --git a/ipaserver/install/server/replicainstall.py b/ipaserver/install/server/replicainstall.py index 4c1c07c8b..4c47c35fa 100644 --- a/ipaserver/install/server/replicainstall.py +++ b/ipaserver/install/server/replicainstall.py @@ -23,7 +23,7 @@ import six from ipaclient.install.client import check_ldap_conf, sssd_enable_ifp import ipaclient.install.timeconf from ipalib.install import sysrestore -from ipalib.install.kinit import kinit_keytab +from ipalib.kinit import kinit_keytab from ipapython import ipaldap, ipautil from ipapython.dn import DN from ipapython.dnsutil import DNSResolver diff --git a/ipatests/util.py b/ipatests/util.py index 61af0c40d..de3717353 100644 --- a/ipatests/util.py +++ b/ipatests/util.py @@ -38,17 +38,12 @@ import six import ipalib from ipalib import api +from ipalib.kinit import kinit_keytab, kinit_password from ipalib.plugable import Plugin from ipalib.request import context from ipapython.dn import DN from ipapython.ipautil import run -try: - # not available with client-only wheel packages - from ipalib.install.kinit import kinit_keytab, kinit_password -except ImportError: - kinit_keytab = kinit_password = None - try: # not available with client-only wheel packages from ipaplatform.paths import paths