freeipa/ipapython/dnssec/ldapkeydb.py

352 lines
12 KiB
Python
Raw Normal View History

#
# Copyright (C) 2014 FreeIPA Contributors see COPYING for license
#
from binascii import hexlify
import collections
import sys
import time
import ipalib
from ipapython.dn import DN
from ipapython import ipaldap
from ipapython import ipautil
from ipaplatform.paths import paths
from ipapython.dnssec.abshsm import (attrs_name2id, attrs_id2name, AbstractHSM,
bool_attr_names, populate_pkcs11_metadata)
import _ipap11helper
import uuid
def uri_escape(val):
"""convert val to %-notation suitable for ID component in URI"""
assert len(val) > 0, "zero-length URI component detected"
hexval = hexlify(val)
out = '%'
# pylint: disable=E1127
out += '%'.join(hexval[i:i+2] for i in range(0, len(hexval), 2))
return out
def ldap_bool(val):
if val == 'TRUE' or val is True:
return True
elif val == 'FALSE' or val is False:
return False
else:
raise AssertionError('invalid LDAP boolean "%s"' % val)
def get_default_attrs(object_classes):
# object class -> default attribute values mapping
defaults = {
u'ipk11publickey': {
'ipk11copyable': True,
'ipk11derive': False,
'ipk11encrypt': False,
'ipk11local': True,
'ipk11modifiable': True,
'ipk11private': True,
'ipk11trusted': False,
'ipk11verify': True,
'ipk11verifyrecover': True,
'ipk11wrap': False
},
u'ipk11privatekey': {
'ipk11alwaysauthenticate': False,
'ipk11alwayssensitive': True,
'ipk11copyable': True,
'ipk11decrypt': False,
'ipk11derive': False,
'ipk11extractable': True,
'ipk11local': True,
'ipk11modifiable': True,
'ipk11neverextractable': False,
'ipk11private': True,
'ipk11sensitive': True,
'ipk11sign': True,
'ipk11signrecover': True,
'ipk11unwrap': False,
'ipk11wrapwithtrusted': False
},
u'ipk11secretkey': {
'ipk11alwaysauthenticate': False,
'ipk11alwayssensitive': True,
'ipk11copyable': True,
'ipk11decrypt': False,
'ipk11derive': False,
'ipk11encrypt': False,
'ipk11extractable': True,
'ipk11local': True,
'ipk11modifiable': True,
'ipk11neverextractable': False,
'ipk11private': True,
'ipk11sensitive': True,
'ipk11sign': False,
'ipk11trusted': False,
'ipk11unwrap': True,
'ipk11verify': False,
'ipk11wrap': True,
'ipk11wrapwithtrusted': False
}
}
# get set of supported object classes
present_clss = set()
for cls in object_classes:
present_clss.add(cls.lower())
present_clss.intersection_update(set(defaults.keys()))
if len(present_clss) <= 0:
raise AssertionError('none of "%s" object classes are supported' %
object_classes)
result = {}
for cls in present_clss:
result.update(defaults[cls])
return result
class Key(collections.MutableMapping):
"""abstraction to hide LDAP entry weirdnesses:
- non-normalized attribute names
- boolean attributes returned as strings
"""
def __init__(self, entry, ldap, ldapkeydb):
self.entry = entry
self.ldap = ldap
self.ldapkeydb = ldapkeydb
self.log = ldap.log.getChild(__name__)
def __getitem__(self, key):
val = self.entry.single_value[key]
if key.lower() in bool_attr_names:
val = ldap_bool(val)
return val
def __setitem__(self, key, value):
self.entry[key] = value
def __delitem__(self, key):
del self.entry[key]
def __iter__(self):
"""generates list of ipa names of all PKCS#11 attributes present in the object"""
for ipa_name in list(self.entry.keys()):
lowercase = ipa_name.lower()
if lowercase in attrs_name2id:
yield lowercase
def __len__(self):
return len(self.entry)
def __str__(self):
return str(self.entry)
def _cleanup_key(self):
"""remove default values from LDAP entry"""
default_attrs = get_default_attrs(self.entry['objectclass'])
empty = object()
for attr in default_attrs:
if self.get(attr, empty) == default_attrs[attr]:
del self[attr]
class ReplicaKey(Key):
# TODO: object class assert
def __init__(self, entry, ldap, ldapkeydb):
super(ReplicaKey, self).__init__(entry, ldap, ldapkeydb)
class MasterKey(Key):
# TODO: object class assert
def __init__(self, entry, ldap, ldapkeydb):
super(MasterKey, self).__init__(entry, ldap, ldapkeydb)
@property
def wrapped_entries(self):
"""LDAP entires with wrapped data
One entry = one blob + ipaWrappingKey pointer to unwrapping key"""
keys = []
if 'ipaSecretKeyRef' not in self.entry:
return keys
for dn in self.entry['ipaSecretKeyRef']:
try:
obj = self.ldap.get_entry(dn)
keys.append(obj)
except ipalib.errors.NotFound:
continue
return keys
def add_wrapped_data(self, data, wrapping_mech, replica_key_id):
wrapping_key_uri = 'pkcs11:id=%s;type=public' \
% uri_escape(replica_key_id)
# TODO: replace this with 'autogenerate' to prevent collisions
uuid_rdn = DN('ipk11UniqueId=%s' % uuid.uuid1())
entry_dn = DN(uuid_rdn, self.ldapkeydb.base_dn)
# TODO: add ipaWrappingMech attribute
entry = self.ldap.make_entry(entry_dn,
objectClass=['ipaSecretKeyObject', 'ipk11Object'],
ipaSecretKey=data,
ipaWrappingKey=wrapping_key_uri,
ipaWrappingMech=wrapping_mech)
self.log.info('adding master key 0x%s wrapped with replica key 0x%s to %s',
hexlify(self['ipk11id']),
hexlify(replica_key_id),
entry_dn)
self.ldap.add_entry(entry)
if 'ipaSecretKeyRef' not in self.entry:
self.entry['objectClass'] += ['ipaSecretKeyRefObject']
self.entry.setdefault('ipaSecretKeyRef', []).append(entry_dn)
class LdapKeyDB(AbstractHSM):
def __init__(self, log, ldap, base_dn):
self.ldap = ldap
self.base_dn = base_dn
self.log = log
self.cache_replica_pubkeys_wrap = None
self.cache_masterkeys = None
self.cache_zone_keypairs = None
def _get_key_dict(self, key_type, ldap_filter):
try:
objs = self.ldap.get_entries(base_dn=self.base_dn,
filter=ldap_filter)
except ipalib.errors.NotFound:
return {}
keys = {}
for o in objs:
# add default values not present in LDAP
key = key_type(o, self.ldap, self)
default_attrs = get_default_attrs(key.entry['objectclass'])
for attr in default_attrs:
key.setdefault(attr, default_attrs[attr])
assert 'ipk11id' in o, 'key is missing ipk11Id in %s' % key.entry.dn
key_id = key['ipk11id']
assert key_id not in keys, 'duplicate ipk11Id=0x%s in "%s" and "%s"' % (hexlify(key_id), key.entry.dn, keys[key_id].entry.dn)
assert 'ipk11label' in key, 'key "%s" is missing ipk11Label' % key.entry.dn
assert 'objectclass' in key.entry, 'key "%s" is missing objectClass attribute' % key.entry.dn
keys[key_id] = key
self._update_keys()
return keys
def _update_key(self, key):
"""remove default values from LDAP entry and write back changes"""
key._cleanup_key()
try:
self.ldap.update_entry(key.entry)
except ipalib.errors.EmptyModlist:
pass
def _update_keys(self):
for cache in [self.cache_masterkeys, self.cache_replica_pubkeys_wrap,
self.cache_zone_keypairs]:
if cache:
for key in cache.values():
self._update_key(key)
def flush(self):
"""write back content of caches to LDAP"""
self._update_keys()
self.cache_masterkeys = None
self.cache_replica_pubkeys_wrap = None
self.cache_zone_keypairs = None
def _import_keys_metadata(self, source_keys):
"""import key metadata from Key-compatible objects
metadata from multiple source keys can be imported into single LDAP
object
:param: source_keys is iterable of (Key object, PKCS#11 object class)"""
entry_dn = DN('ipk11UniqueId=autogenerate', self.base_dn)
entry = self.ldap.make_entry(entry_dn, objectClass=['ipk11Object'])
new_key = Key(entry, self.ldap, self)
for source_key, pkcs11_class in source_keys:
if pkcs11_class == _ipap11helper.KEY_CLASS_SECRET_KEY:
entry['objectClass'].append('ipk11SecretKey')
elif pkcs11_class == _ipap11helper.KEY_CLASS_PUBLIC_KEY:
entry['objectClass'].append('ipk11PublicKey')
elif pkcs11_class == _ipap11helper.KEY_CLASS_PRIVATE_KEY:
entry['objectClass'].append('ipk11PrivateKey')
else:
raise AssertionError('unsupported object class %s' % pkcs11_class)
populate_pkcs11_metadata(source_key, new_key)
new_key._cleanup_key()
return new_key
def import_master_key(self, mkey):
new_key = self._import_keys_metadata(
[(mkey, _ipap11helper.KEY_CLASS_SECRET_KEY)])
self.ldap.add_entry(new_key.entry)
self.log.debug('imported master key metadata: %s', new_key.entry)
def import_zone_key(self, pubkey, pubkey_data, privkey,
privkey_wrapped_data, wrapping_mech, master_key_id):
new_key = self._import_keys_metadata(
[(pubkey, _ipap11helper.KEY_CLASS_PUBLIC_KEY),
(privkey, _ipap11helper.KEY_CLASS_PRIVATE_KEY)])
new_key.entry['objectClass'].append('ipaPrivateKeyObject')
new_key.entry['ipaPrivateKey'] = privkey_wrapped_data
new_key.entry['ipaWrappingKey'] = 'pkcs11:id=%s;type=secret-key' \
% uri_escape(master_key_id)
new_key.entry['ipaWrappingMech'] = wrapping_mech
new_key.entry['objectClass'].append('ipaPublicKeyObject')
new_key.entry['ipaPublicKey'] = pubkey_data
self.ldap.add_entry(new_key.entry)
self.log.debug('imported zone key id: 0x%s', hexlify(new_key['ipk11id']))
@property
def replica_pubkeys_wrap(self):
if self.cache_replica_pubkeys_wrap:
return self.cache_replica_pubkeys_wrap
keys = self._filter_replica_keys(
self._get_key_dict(ReplicaKey,
'(&(objectClass=ipk11PublicKey)(ipk11Wrap=TRUE)(objectClass=ipaPublicKeyObject))'))
self.cache_replica_pubkeys_wrap = keys
return keys
@property
def master_keys(self):
if self.cache_masterkeys:
return self.cache_masterkeys
keys = self._get_key_dict(MasterKey,
'(&(objectClass=ipk11SecretKey)(|(ipk11UnWrap=TRUE)(!(ipk11UnWrap=*)))(ipk11Label=dnssec-master))')
for key in keys.values():
prefix = 'dnssec-master'
assert key['ipk11label'] == prefix, \
'secret key dn="%s" ipk11id=0x%s ipk11label="%s" with ipk11UnWrap = TRUE does not have '\
'"%s" key label' % (
key.entry.dn,
hexlify(key['ipk11id']),
str(key['ipk11label']),
prefix)
self.cache_masterkeys = keys
return keys
@property
def zone_keypairs(self):
if self.cache_zone_keypairs:
return self.cache_zone_keypairs
self.cache_zone_keypairs = self._filter_zone_keys(
self._get_key_dict(Key,
'(&(objectClass=ipk11PrivateKey)(objectClass=ipaPrivateKeyObject)(objectClass=ipk11PublicKey)(objectClass=ipaPublicKeyObject))'))
return self.cache_zone_keypairs