mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Currently CustodiaClient assumes that the client is the host principal, and it is hard-coded to read the host keytab and server keys. For the Lightweight CAs feature, Dogtag on CA replicas will use CustodiaClient to retrieve signing keys from the originating replica. Because this process runs as 'pkiuser', the host keys cannot be used; instead, each Dogtag replica will have a service principal to use for Custodia authentication. Update CustodiaClient to require specifying the client keytab and Custodia keyfile to use, and change the client argument to be a full GSS service name (instead of hard-coding host service) to load from the keytab. Update call sites accordingly. Also pass the given 'ldap_uri' argument through to IPAKEMKeys because without it, the client tries to use LDAPI, but may not have access. Part of: https://fedorahosted.org/freeipa/ticket/4559 Reviewed-By: Jan Cholasta <jcholast@redhat.com>
110 lines
3.7 KiB
Python
110 lines
3.7 KiB
Python
# Copyright (C) 2015 IPA Project Contributors, see COPYING for license
|
|
|
|
from __future__ import print_function
|
|
from custodia.message.kem import KEMClient, KEY_USAGE_SIG, KEY_USAGE_ENC
|
|
from jwcrypto.common import json_decode
|
|
from jwcrypto.jwk import JWK
|
|
from ipapython.secrets.kem import IPAKEMKeys
|
|
from ipapython.secrets.store import iSecStore
|
|
from ipaplatform.paths import paths
|
|
from base64 import b64encode
|
|
import ldapurl
|
|
import gssapi
|
|
import os
|
|
import requests
|
|
|
|
|
|
class CustodiaClient(object):
    """Client that retrieves keys from a Custodia server over HTTPS.

    Requests are authenticated with a GSSAPI ``Negotiate`` header built
    from credentials acquired out of a client keytab, and the request and
    reply payloads are processed through a ``KEMClient``.
    """

    @staticmethod
    def _b64text(data):
        """Return the base64 encoding of ``data`` as a text string.

        ``b64encode`` returns ``bytes`` on Python 3; formatting that with
        ``%s`` would embed the ``b'...'`` repr instead of the encoded
        value, so the result is decoded to ASCII text first.
        """
        return b64encode(data).decode('ascii')

    def _client_keys(self):
        """Return the key set this client uses for its own messages."""
        return self.ikk.server_keys

    def _server_keys(self, server, realm):
        """Look up the public keys of the host principal of ``server``.

        :param server: host name of the Custodia server
        :param realm: Kerberos realm used to build the principal name
        :returns: tuple ``(signing_key, encryption_key)`` of ``JWK`` objects
        """
        principal = 'host/%s@%s' % (server, realm)
        sig_key = JWK(**json_decode(
            self.ikk.find_key(principal, KEY_USAGE_SIG)))
        enc_key = JWK(**json_decode(
            self.ikk.find_key(principal, KEY_USAGE_ENC)))
        return (sig_key, enc_key)

    def _ldap_uri(self, realm):
        """Build the default ``ldapi://`` URI for the given realm.

        The socket path comes from ``paths.SLAPD_INSTANCE_SOCKET_TEMPLATE``
        with the dots of the realm replaced by dashes.
        """
        dashrealm = '-'.join(realm.split('.'))
        socketpath = paths.SLAPD_INSTANCE_SOCKET_TEMPLATE % (dashrealm,)
        return 'ldapi://' + ldapurl.ldapUrlEscape(socketpath)

    def _keystore(self, realm, ldap_uri, auth_type):
        """Create the ``iSecStore`` used to persist fetched keys.

        :param realm: realm used to derive the default LDAPI URI
        :param ldap_uri: explicit LDAP URI, or None to use the default
        :param auth_type: optional value for the store's ``auth_type``
            setting; omitted from the configuration when None
        """
        config = dict()
        if ldap_uri is None:
            config['ldap_uri'] = self._ldap_uri(realm)
        else:
            config['ldap_uri'] = ldap_uri
        if auth_type is not None:
            config['auth_type'] = auth_type

        return iSecStore(config)

    def __init__(
            self, client_service, keyfile, keytab, server, realm,
            ldap_uri=None, auth_type=None):
        """Set up credentials, key material and the local key store.

        :param client_service: GSS host-based service name of the client
            whose credentials are loaded from ``keytab``
        :param keyfile: path passed to ``IPAKEMKeys`` as ``server_keys``
        :param keytab: client keytab used to acquire credentials
        :param server: host name of the Custodia server to contact
        :param realm: Kerberos realm of the server
        :param ldap_uri: LDAP URI handed to ``IPAKEMKeys`` and the key
            store; when None the key store falls back to the local LDAPI
            socket of the realm
        :param auth_type: optional ``auth_type`` for the key store
        """
        self.client_service = client_service
        self.keytab = keytab

        # Init creds immediately to make sure they are valid.  Creds
        # can also be re-inited by _auth_header to avoid expiry.
        self.creds = self.init_creds()

        self.service_name = gssapi.Name('HTTP@%s' % (server,),
                                        gssapi.NameType.hostbased_service)
        self.server = server

        self.ikk = IPAKEMKeys({'server_keys': keyfile, 'ldap_uri': ldap_uri})

        self.kemcli = KEMClient(self._server_keys(server, realm),
                                self._client_keys())

        self.keystore = self._keystore(realm, ldap_uri, auth_type)

        # FIXME: Remove warnings about missing subjAltName
        requests.packages.urllib3.disable_warnings()

    def init_creds(self):
        """Acquire initiator credentials for ``client_service``.

        Credentials are read from ``self.keytab`` and kept in a
        ``MEMORY:`` credential cache with a random name suffix.

        :returns: a ``gssapi.Credentials`` object
        """
        name = gssapi.Name(self.client_service,
                           gssapi.NameType.hostbased_service)
        ccache = 'MEMORY:Custodia_%s' % self._b64text(os.urandom(8))
        store = {'client_keytab': self.keytab,
                 'ccache': ccache}
        return gssapi.Credentials(name=name, store=store, usage='initiate')

    def _auth_header(self):
        """Return the HTTP ``Authorization`` header for the next request.

        Credentials are re-acquired first when they are missing or have
        a remaining lifetime below 300.
        """
        if not self.creds or self.creds.lifetime < 300:
            self.creds = self.init_creds()
        ctx = gssapi.SecurityContext(name=self.service_name, creds=self.creds)
        authtok = ctx.step()
        return {'Authorization': 'Negotiate %s' % self._b64text(authtok)}

    def fetch_key(self, keyname, store=True):
        """Fetch the key ``keyname`` from the Custodia server.

        :param keyname: name of the key below ``/ipa/keys/`` on the server
        :param store: when true, save the key in the local key store
            under ``keys/<keyname>``; when false, return it instead
        :returns: the decoded key value if ``store`` is false, else None
        :raises RuntimeError: if the reply is not of type ``kem``
        :raises requests.HTTPError: if the server returns an error status
        """
        # Prepare URL
        url = 'https://%s/ipa/keys/%s' % (self.server, keyname)

        # Prepare signed/encrypted request
        encalg = ('RSA1_5', 'A256CBC-HS512')
        request = self.kemcli.make_request(keyname, encalg=encalg)

        # Prepare Authentication header
        headers = self._auth_header()

        # Perform request
        r = requests.get(url, headers=headers,
                         params={'type': 'kem', 'value': request})
        r.raise_for_status()
        reply = r.json()

        if 'type' not in reply or reply['type'] != 'kem':
            raise RuntimeError('Invalid JSON response type')

        value = self.kemcli.parse_reply(keyname, reply['value'])

        if store:
            self.keystore.set('keys/%s' % keyname, value)
        else:
            return value
|