Move ipalib.install.kinit to ipalib

- kinit helpers are now in `ipalib.kinit`.
- helpers can now use default ccache locations like many other similar
  helpers
- helpers return the result from `run` for debugging
- constants are now in `krb_utils`
- helpers pass `KRB5*` and `GSS*` env vars along, so `KRB5_TRACE` works
- document how to kinit for `ipalib.api`

Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Alexander Bokovoy <abokovoy@redhat.com>
This commit is contained in:
Christian Heimes 2024-03-22 10:35:37 +01:00 committed by Florence Blanc-Renaud
parent 6cc668ffeb
commit 38d0e74b6d
9 changed files with 276 additions and 226 deletions

View File

@ -40,7 +40,7 @@ from ipalib.constants import FQDN, IPAAPI_USER, MAXHOSTNAMELEN
from ipalib.install import certmonger, certstore, service from ipalib.install import certmonger, certstore, service
from ipalib.install import hostname as hostname_ from ipalib.install import hostname as hostname_
from ipalib.facts import is_ipa_client_configured, is_ipa_configured from ipalib.facts import is_ipa_client_configured, is_ipa_configured
from ipalib.install.kinit import kinit_keytab, kinit_password, kinit_pkinit from ipalib.kinit import kinit_keytab, kinit_password, kinit_pkinit
from ipalib.install.service import enroll_only, prepare_only from ipalib.install.service import enroll_only, prepare_only
from ipalib.rpc import delete_persistent_client_session_data from ipalib.rpc import delete_persistent_client_session_data
from ipalib.util import ( from ipalib.util import (

View File

@ -43,7 +43,7 @@ from ipaclient.install.client import (
) )
from ipalib import api, errors from ipalib import api, errors
from ipalib.install import sysrestore from ipalib.install import sysrestore
from ipalib.install.kinit import kinit_keytab from ipalib.kinit import kinit_keytab
from ipalib.util import check_client_configuration from ipalib.util import check_client_configuration
from ipapython import ipautil from ipapython import ipautil
from ipapython.ipa_log_manager import standard_logging_setup from ipapython.ipa_log_manager import standard_logging_setup

View File

@ -955,12 +955,19 @@ class API(plugable.API):
import os import os
import ipalib import ipalib
try:
from ipalib import kinit
except ImportError:
from ipalib.install import kinit
# set a custom ccache to isolate from the environment
ccache_name = "FILE:/path/to/tmp/service.ccache"
os.environ["KRB5CCNAME"] = ccache_name
# optional: automatic authentication with a KRB5 keytab # optional: automatic authentication with a KRB5 keytab
os.environ.update( os.environ["KRB5_CLIENT_KTNAME"] = "/path/to/service.keytab"
KRB5_CLIENT_KTNAME="/path/to/service.keytab", # or with password
KRB5RCACHENAME="FILE:/path/to/tmp/service.ccache", kinit.kinit_passwd("username", "password", ccache_name)
)
# optional: override settings (once per process) # optional: override settings (once per process)
overrides = {} overrides = {}

View File

@ -1,216 +1,20 @@
# #
# Copyright (C) 2016 FreeIPA Contributors see COPYING for license # Copyright (C) 2024 FreeIPA Contributors see COPYING for license
# #
from __future__ import absolute_import # code was moved to ipalib.kinit. This module is now an alias
__all__ = (
import logging "validate_principal",
import os "kinit_keytab",
import re "kinit_password",
import time "kinit_armor",
"kinit_pkinit",
import gssapi
from ipaplatform.paths import paths
from ipapython.ipautil import run
from ipalib.constants import PATTERN_GROUPUSER_NAME
from ipalib.util import validate_hostname
logger = logging.getLogger(__name__)
# Cannot contact any KDC for requested realm
KRB5_KDC_UNREACH = 2529639068
# A service is not available that s required to process the request
KRB5KDC_ERR_SVC_UNAVAILABLE = 2529638941
PATTERN_REALM = '@?([a-zA-Z0-9.-]*)$'
PATTERN_PRINCIPAL = '(' + PATTERN_GROUPUSER_NAME[:-1] + ')' + PATTERN_REALM
PATTERN_SERVICE = '([a-zA-Z0-9.-]+)/([a-zA-Z0-9.-]+)' + PATTERN_REALM
user_pattern = re.compile(PATTERN_PRINCIPAL)
service_pattern = re.compile(PATTERN_SERVICE)
def validate_principal(principal):
if not isinstance(principal, str):
raise RuntimeError('Invalid principal: not a string')
if ('/' in principal) and (' ' in principal):
raise RuntimeError('Invalid principal: bad spacing')
else:
# For a user match in the regex
# username = match[1]
# realm = match[2]
match = user_pattern.match(principal)
if match is None:
match = service_pattern.match(principal)
if match is None:
raise RuntimeError('Invalid principal: cannot parse')
else:
# service = match[1]
hostname = match[2]
# realm = match[3]
try:
validate_hostname(hostname)
except ValueError as e:
raise RuntimeError(str(e))
def kinit_keytab(principal, keytab, ccache_name, config=None, attempts=1):
"""
Given a ccache_path, keytab file and a principal kinit as that user.
The optional parameter 'attempts' specifies how many times the credential
initialization should be attempted in case of non-responsive KDC.
"""
validate_principal(principal)
errors_to_retry = {KRB5KDC_ERR_SVC_UNAVAILABLE,
KRB5_KDC_UNREACH}
logger.debug("Initializing principal %s using keytab %s",
principal, keytab)
logger.debug("using ccache %s", ccache_name)
for attempt in range(1, attempts + 1):
old_config = os.environ.get('KRB5_CONFIG')
if config is not None:
os.environ['KRB5_CONFIG'] = config
else:
os.environ.pop('KRB5_CONFIG', None)
try:
name = gssapi.Name(principal, gssapi.NameType.kerberos_principal)
store = {'ccache': ccache_name,
'client_keytab': keytab}
cred = gssapi.Credentials(name=name, store=store, usage='initiate')
logger.debug("Attempt %d/%d: success", attempt, attempts)
return cred
except gssapi.exceptions.GSSError as e:
if e.min_code not in errors_to_retry: # pylint: disable=no-member
raise
logger.debug("Attempt %d/%d: failed: %s", attempt, attempts, e)
if attempt == attempts:
logger.debug("Maximum number of attempts (%d) reached",
attempts)
raise
logger.debug("Waiting 5 seconds before next retry")
time.sleep(5)
finally:
if old_config is not None:
os.environ['KRB5_CONFIG'] = old_config
else:
os.environ.pop('KRB5_CONFIG', None)
return None
def kinit_password(principal, password, ccache_name, config=None,
armor_ccache_name=None, canonicalize=False,
enterprise=False, lifetime=None):
"""
perform interactive kinit as principal using password. If using FAST for
web-based authentication, use armor_ccache_path to specify http service
ccache.
"""
validate_principal(principal)
logger.debug("Initializing principal %s using password", principal)
args = [paths.KINIT, '-c', ccache_name]
if armor_ccache_name is not None:
logger.debug("Using armor ccache %s for FAST webauth",
armor_ccache_name)
args.extend(['-T', armor_ccache_name])
if lifetime:
args.extend(['-l', lifetime])
if canonicalize:
logger.debug("Requesting principal canonicalization")
args.append('-C')
if enterprise:
logger.debug("Using enterprise principal")
args.append('-E')
args.extend(['--', principal])
env = {'LC_ALL': 'C'}
if config is not None:
env['KRB5_CONFIG'] = config
# this workaround enables us to capture stderr and put it
# into the raised exception in case of unsuccessful authentication
result = run(args, stdin=password, env=env, raiseonerr=False,
capture_error=True)
if result.returncode:
raise RuntimeError(result.error_output)
def kinit_armor(ccache_name, pkinit_anchors=None):
"""
perform anonymous pkinit to obtain anonymous ticket to be used as armor
for FAST.
:param ccache_name: location of the armor ccache
:param pkinit_anchor: if not None, the location of PKINIT anchor file to
use. Otherwise the value from Kerberos client library configuration is
used
:raises: CalledProcessError if the anonymous PKINIT fails
"""
logger.debug("Initializing anonymous ccache")
env = {'LC_ALL': 'C'}
args = [paths.KINIT, '-n', '-c', ccache_name]
if pkinit_anchors is not None:
for pkinit_anchor in pkinit_anchors:
args.extend(['-X', 'X509_anchors=FILE:{}'.format(pkinit_anchor)])
# this workaround enables us to capture stderr and put it
# into the raised exception in case of unsuccessful authentication
run(args, env=env, raiseonerr=True, capture_error=True)
def kinit_pkinit(
principal,
user_identity,
ccache_name,
config=None,
pkinit_anchors=None,
):
"""Perform kinit with X.509 identity (PKINIT)
:param principal: principal name
:param user_identity: X509_user_identity paramemter
:param ccache_name: location of ccache
:param config: path to krb5.conf (default: default location)
:param pkinit_anchor: if not None, the PKINIT anchors to use. Otherwise
the value from Kerberos client library configuration is used. Entries
must be prefixed with FILE: or DIR:
user identity example:
FILE:filename[,keyfilename]
PKCS12:filename
PKCS11:...
DIR:directoryname
:raises: CalledProcessError if PKINIT fails
"""
validate_principal(principal)
logger.debug(
"Initializing principal %s using PKINIT %s", principal, user_identity
) )
env = {"LC_ALL": "C"} from ..kinit import (
if config is not None: validate_principal,
env["KRB5_CONFIG"] = config kinit_keytab,
kinit_password,
args = [paths.KINIT, "-c", ccache_name] kinit_armor,
if pkinit_anchors is not None: kinit_pkinit,
for pkinit_anchor in pkinit_anchors: )
assert pkinit_anchor.startswith(("FILE:", "DIR:", "ENV:"))
args.extend(["-X", f"X509_anchors={pkinit_anchor}"])
args.extend(["-X", f"X509_user_identity={user_identity}"])
args.extend(['--', principal])
# this workaround enables us to capture stderr and put it
# into the raised exception in case of unsuccessful authentication
# Unsuccessful pkinit can lead to a password prompt. Send \n to skip
# prompt.
run(args, env=env, stdin="\n", raiseonerr=True, capture_error=True)

240
ipalib/kinit.py Normal file
View File

@ -0,0 +1,240 @@
#
# Copyright (C) 2016 FreeIPA Contributors see COPYING for license
#
import logging
import os
import re
import time
import gssapi
from ipapython.kerberos import Principal
from ipaplatform.paths import paths
from ipapython.ipautil import run
from ipalib.constants import PATTERN_GROUPUSER_NAME
from ipalib import krb_utils
from ipalib.util import validate_hostname
logger = logging.getLogger(__name__)
PATTERN_REALM = '@?([a-zA-Z0-9.-]*)$'
PATTERN_PRINCIPAL = '(' + PATTERN_GROUPUSER_NAME[:-1] + ')' + PATTERN_REALM
PATTERN_SERVICE = '([a-zA-Z0-9.-]+)/([a-zA-Z0-9.-]+)' + PATTERN_REALM
user_pattern = re.compile(PATTERN_PRINCIPAL)
service_pattern = re.compile(PATTERN_SERVICE)
def validate_principal(principal):
# TODO: use Principal() to verify value?
if isinstance(principal, Principal):
principal = str(principal)
elif not isinstance(principal, str):
raise RuntimeError('Invalid principal: not a string')
if ('/' in principal) and (' ' in principal):
raise RuntimeError('Invalid principal: bad spacing')
else:
# For a user match in the regex
# username = match[1]
# realm = match[2]
match = user_pattern.match(principal)
if match is None:
match = service_pattern.match(principal)
if match is None:
raise RuntimeError('Invalid principal: cannot parse')
else:
# service = match[1]
hostname = match[2]
# realm = match[3]
try:
validate_hostname(hostname)
except ValueError as e:
raise RuntimeError(str(e))
return principal
def kinit_keytab(principal, keytab, ccache_name=None, config=None, attempts=1):
"""
Given a ccache_path, keytab file and a principal kinit as that user.
The optional parameter 'attempts' specifies how many times the credential
initialization should be attempted in case of non-responsive KDC.
"""
validate_principal(principal)
errors_to_retry = {
krb_utils.KRB5KDC_ERR_SVC_UNAVAILABLE, krb_utils.KRB5_KDC_UNREACH
}
logger.debug("Initializing principal %s using keytab %s",
principal, keytab)
store = {'client_keytab': keytab}
if ccache_name is not None:
logger.debug("using ccache %s", ccache_name)
store['ccache'] = ccache_name
for attempt in range(1, attempts + 1):
old_config = os.environ.get('KRB5_CONFIG')
if config is not None:
os.environ['KRB5_CONFIG'] = config
else:
os.environ.pop('KRB5_CONFIG', None)
try:
name = gssapi.Name(
str(principal), gssapi.NameType.kerberos_principal
)
cred = gssapi.Credentials(name=name, store=store, usage='initiate')
logger.debug("Attempt %d/%d: success", attempt, attempts)
return cred
except gssapi.exceptions.GSSError as e:
if e.min_code not in errors_to_retry: # pylint: disable=no-member
raise
logger.debug("Attempt %d/%d: failed: %s", attempt, attempts, e)
if attempt == attempts:
logger.debug("Maximum number of attempts (%d) reached",
attempts)
raise
logger.debug("Waiting 5 seconds before next retry")
time.sleep(5)
finally:
if old_config is not None:
os.environ['KRB5_CONFIG'] = old_config
else:
os.environ.pop('KRB5_CONFIG', None)
return None
def _run_env(config=None):
"""Common os.environ for kinit
Passes KRB5* and GSS* envs like KRB5_TRACE
"""
env = {"LC_ALL": "C"}
for key, value in os.environ.items():
if key.startswith(("KRB5", "GSS")):
env[key] = value
if config is not None:
env["KRB5_CONFIG"] = config
return env
def kinit_password(principal, password, ccache_name=None, config=None,
armor_ccache_name=None, canonicalize=False,
enterprise=False, lifetime=None):
"""
perform interactive kinit as principal using password. If using FAST for
web-based authentication, use armor_ccache_path to specify http service
ccache.
:param principal: principal name
:param password: user password
:param ccache_name: location of ccache (default: default location)
:param config: path to krb5.conf (default: default location)
:param armor_ccache_name: armor ccache for FAST (-T)
:param canonicalize: request principal canonicalization (-C)
:param enterprise: use enterprise principal (-E)
:param lifetime: request TGT lifetime (-l)
"""
validate_principal(principal)
logger.debug("Initializing principal %s using password", principal)
args = [paths.KINIT]
if ccache_name is not None:
args.extend(['-c', ccache_name])
if armor_ccache_name is not None:
logger.debug("Using armor ccache %s for FAST webauth",
armor_ccache_name)
args.extend(['-T', armor_ccache_name])
if lifetime:
args.extend(['-l', lifetime])
if canonicalize:
logger.debug("Requesting principal canonicalization")
args.append('-C')
if enterprise:
logger.debug("Using enterprise principal")
args.append('-E')
args.extend(['--', str(principal)])
env = _run_env(config)
# this workaround enables us to capture stderr and put it
# into the raised exception in case of unsuccessful authentication
result = run(args, stdin=password, env=env, raiseonerr=False,
capture_error=True)
if result.returncode:
raise RuntimeError(result.error_output)
return result
def kinit_armor(ccache_name, pkinit_anchors=None):
"""
perform anonymous pkinit to obtain anonymous ticket to be used as armor
for FAST.
:param ccache_name: location of the armor ccache (required)
:param pkinit_anchor: if not None, the location of PKINIT anchor file to
use. Otherwise the value from Kerberos client library configuration is
used
:raises: CalledProcessError if the anonymous PKINIT fails
"""
logger.debug("Initializing anonymous ccache")
env = _run_env()
args = [paths.KINIT, '-n', '-c', ccache_name]
if pkinit_anchors is not None:
for pkinit_anchor in pkinit_anchors:
args.extend(['-X', 'X509_anchors=FILE:{}'.format(pkinit_anchor)])
# this workaround enables us to capture stderr and put it
# into the raised exception in case of unsuccessful authentication
return run(args, env=env, raiseonerr=True, capture_error=True)
def kinit_pkinit(
principal,
user_identity,
ccache_name=None,
config=None,
pkinit_anchors=None,
):
"""Perform kinit with X.509 identity (PKINIT)
:param principal: principal name
:param user_identity: X509_user_identity paramemter
:param ccache_name: location of ccache (default: default location)
:param config: path to krb5.conf (default: default location)
:param pkinit_anchor: if not None, the PKINIT anchors to use. Otherwise
the value from Kerberos client library configuration is used. Entries
must be prefixed with FILE: or DIR:
user identity example:
FILE:filename[,keyfilename]
PKCS12:filename
PKCS11:...
DIR:directoryname
:raises: CalledProcessError if PKINIT fails
"""
validate_principal(principal)
logger.debug(
"Initializing principal %s using PKINIT %s", principal, user_identity
)
args = [paths.KINIT]
if ccache_name is not None:
args.extend(['-c', ccache_name])
if pkinit_anchors is not None:
for pkinit_anchor in pkinit_anchors:
assert pkinit_anchor.startswith(("FILE:", "DIR:", "ENV:"))
args.extend(["-X", f"X509_anchors={pkinit_anchor}"])
args.extend(["-X", f"X509_user_identity={user_identity}"])
args.extend(['--', str(principal)])
# this workaround enables us to capture stderr and put it
# into the raised exception in case of unsuccessful authentication
# Unsuccessful pkinit can lead to a password prompt. Send \n to skip
# prompt.
env = _run_env(config)
return run(args, env=env, stdin="\n", raiseonerr=True, capture_error=True)

View File

@ -34,6 +34,10 @@ KRB5KRB_AP_ERR_TKT_EXPIRED = 2529638944 # Ticket expired
KRB5_FCC_PERM = 2529639106 # Credentials cache permissions incorrect KRB5_FCC_PERM = 2529639106 # Credentials cache permissions incorrect
KRB5_CC_FORMAT = 2529639111 # Bad format in credentials cache KRB5_CC_FORMAT = 2529639111 # Bad format in credentials cache
KRB5_REALM_CANT_RESOLVE = 2529639132 # Cannot resolve network address for KDC in requested realm KRB5_REALM_CANT_RESOLVE = 2529639132 # Cannot resolve network address for KDC in requested realm
# Cannot contact any KDC for requested realm
KRB5_KDC_UNREACH = 2529639068
# A service is not available that s required to process the request
KRB5KDC_ERR_SVC_UNAVAILABLE = 2529638941
# mechglue/gss_plugin.c: #define MAP_ERROR_BASE 0x04200000 # mechglue/gss_plugin.c: #define MAP_ERROR_BASE 0x04200000
GSSPROXY_MAP_ERROR_BASE = 69206016 GSSPROXY_MAP_ERROR_BASE = 69206016

View File

@ -12,7 +12,7 @@ import logging
import os import os
from ipalib import api from ipalib import api
from ipalib.install.kinit import kinit_keytab from ipalib.kinit import kinit_keytab
from ipaplatform import services from ipaplatform import services
from ipaplatform.paths import paths from ipaplatform.paths import paths
from ipapython import ipautil from ipapython import ipautil

View File

@ -23,7 +23,7 @@ import six
from ipaclient.install.client import check_ldap_conf, sssd_enable_ifp from ipaclient.install.client import check_ldap_conf, sssd_enable_ifp
import ipaclient.install.timeconf import ipaclient.install.timeconf
from ipalib.install import sysrestore from ipalib.install import sysrestore
from ipalib.install.kinit import kinit_keytab from ipalib.kinit import kinit_keytab
from ipapython import ipaldap, ipautil from ipapython import ipaldap, ipautil
from ipapython.dn import DN from ipapython.dn import DN
from ipapython.dnsutil import DNSResolver from ipapython.dnsutil import DNSResolver

View File

@ -38,17 +38,12 @@ import six
import ipalib import ipalib
from ipalib import api from ipalib import api
from ipalib.kinit import kinit_keytab, kinit_password
from ipalib.plugable import Plugin from ipalib.plugable import Plugin
from ipalib.request import context from ipalib.request import context
from ipapython.dn import DN from ipapython.dn import DN
from ipapython.ipautil import run from ipapython.ipautil import run
try:
# not available with client-only wheel packages
from ipalib.install.kinit import kinit_keytab, kinit_password
except ImportError:
kinit_keytab = kinit_password = None
try: try:
# not available with client-only wheel packages # not available with client-only wheel packages
from ipaplatform.paths import paths from ipaplatform.paths import paths