mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2024-12-24 16:10:02 -06:00
8fbcc33534
Add support for Custodia ca_wrapped clients to specify the desired symmetric encryption algorithm for exporting the wrapped signing key (this mechanism is used for LWCA key replication). If not specified, we must assume that the client has an older Dogtag version that can only import keys wrapped with DES-EDE3-CBC encryption. The selected algorithm gets passed to the 'nsswrappedcert' handler, which in turn passes it to the 'pki ca-authority-key-export' command (which is part of Dogtag). Client-side changes will occur in a subsequent commit. Part of: https://pagure.io/freeipa/issue/8020 Reviewed-By: Alexander Bokovoy <abbra@users.noreply.github.com>
234 lines
6.4 KiB
Python
234 lines
6.4 KiB
Python
# Copyright (C) 2015 IPA Project Contributors, see COPYING for license
|
|
|
|
from __future__ import print_function, absolute_import
|
|
import os
|
|
import sys
|
|
|
|
from custodia.plugin import CSStore
|
|
|
|
from ipaplatform.paths import paths
|
|
from ipaplatform.constants import constants
|
|
from ipapython import ipautil
|
|
|
|
|
|
class UnknownKeyName(Exception):
|
|
pass
|
|
|
|
|
|
class InvalidKeyArguments(Exception):
|
|
pass
|
|
|
|
|
|
class DBMAPHandler:
|
|
dbtype = None
|
|
supports_extra_args = False
|
|
|
|
def __init__(self, config, dbmap, nickname):
|
|
dbtype = dbmap.get('type')
|
|
if dbtype is None or dbtype != self.dbtype:
|
|
raise ValueError(
|
|
"Invalid type '{}', expected '{}'".format(
|
|
dbtype, self.dbtype
|
|
)
|
|
)
|
|
self.config = config
|
|
self.dbmap = dbmap
|
|
self.nickname = nickname
|
|
|
|
def export_key(self):
|
|
raise NotImplementedError
|
|
|
|
def import_key(self, value):
|
|
raise NotImplementedError
|
|
|
|
|
|
class DBMAPCommandHandler(DBMAPHandler):
|
|
def __init__(self, config, dbmap, nickname):
|
|
super().__init__(config, dbmap, nickname)
|
|
self.runas = dbmap.get('runas')
|
|
self.command = os.path.join(
|
|
paths.IPA_CUSTODIA_HANDLER,
|
|
dbmap['command']
|
|
)
|
|
|
|
def run_handler(self, extra_args=(), stdin=None):
|
|
"""Run handler script to export / import key material
|
|
"""
|
|
args = [self.command]
|
|
args.extend(extra_args)
|
|
kwargs = dict(
|
|
runas=self.runas,
|
|
encoding='utf-8',
|
|
)
|
|
|
|
if stdin:
|
|
args.extend(['--import', '-'])
|
|
kwargs.update(stdin=stdin)
|
|
else:
|
|
args.extend(['--export', '-'])
|
|
kwargs.update(capture_output=True)
|
|
|
|
result = ipautil.run(args, **kwargs)
|
|
|
|
if stdin is None:
|
|
return result.output
|
|
else:
|
|
return None
|
|
|
|
|
|
def log_error(error):
|
|
print(error, file=sys.stderr)
|
|
|
|
|
|
class NSSWrappedCertDB(DBMAPCommandHandler):
|
|
"""
|
|
Store that extracts private keys from an NSSDB, wrapped with the
|
|
private key of the primary CA.
|
|
"""
|
|
dbtype = 'NSSDB'
|
|
supports_extra_args = True
|
|
|
|
OID_DES_EDE3_CBC = '1.2.840.113549.3.7'
|
|
|
|
def __init__(self, config, dbmap, nickname, *extra_args):
|
|
super().__init__(config, dbmap, nickname)
|
|
|
|
# Extra args is either a single OID specifying desired wrap
|
|
# algorithm, or empty. If empty, we must assume that the
|
|
# client is an old version that only supports DES-EDE3-CBC.
|
|
#
|
|
# Using either the client's requested algorithm or the
|
|
# default of DES-EDE3-CBC, we pass it along to the handler
|
|
# via the --algorithm option. The handler, in turn, passes
|
|
# it along to the 'pki ca-authority-key-export' program
|
|
# (which is part of Dogtag).
|
|
#
|
|
if len(extra_args) > 1:
|
|
raise InvalidKeyArguments("Too many arguments")
|
|
if len(extra_args) == 1:
|
|
self.alg = extra_args[0]
|
|
else:
|
|
self.alg = self.OID_DES_EDE3_CBC
|
|
|
|
def export_key(self):
|
|
return self.run_handler([
|
|
'--nickname', self.nickname,
|
|
'--algorithm', self.alg,
|
|
])
|
|
|
|
|
|
class NSSCertDB(DBMAPCommandHandler):
|
|
dbtype = 'NSSDB'
|
|
|
|
def export_key(self):
|
|
return self.run_handler(['--nickname', self.nickname])
|
|
|
|
def import_key(self, value):
|
|
return self.run_handler(
|
|
['--nickname', self.nickname],
|
|
stdin=value
|
|
)
|
|
|
|
|
|
# Exfiltrate the DM password Hash so it can be set in replica's and this
|
|
# way let a replica be install without knowing the DM password and yet
|
|
# still keep the DM password synchronized across replicas
|
|
class DMLDAP(DBMAPCommandHandler):
|
|
dbtype = 'DMLDAP'
|
|
|
|
def __init__(self, config, dbmap, nickname):
|
|
super().__init__(config, dbmap, nickname)
|
|
if nickname != 'DMHash':
|
|
raise UnknownKeyName("Unknown Key Named '%s'" % nickname)
|
|
|
|
def export_key(self):
|
|
return self.run_handler()
|
|
|
|
def import_key(self, value):
|
|
self.run_handler(stdin=value)
|
|
|
|
|
|
class PEMFileHandler(DBMAPCommandHandler):
|
|
dbtype = 'PEM'
|
|
|
|
def export_key(self):
|
|
return self.run_handler()
|
|
|
|
def import_key(self, value):
|
|
return self.run_handler(stdin=value)
|
|
|
|
|
|
NAME_DB_MAP = {
|
|
'ca': {
|
|
'type': 'NSSDB',
|
|
'handler': NSSCertDB,
|
|
'command': 'ipa-custodia-pki-tomcat',
|
|
'runas': constants.PKI_USER,
|
|
},
|
|
'ca_wrapped': {
|
|
'type': 'NSSDB',
|
|
'handler': NSSWrappedCertDB,
|
|
'command': 'ipa-custodia-pki-tomcat-wrapped',
|
|
'runas': constants.PKI_USER,
|
|
},
|
|
'ra': {
|
|
'type': 'PEM',
|
|
'handler': PEMFileHandler,
|
|
'command': 'ipa-custodia-ra-agent',
|
|
'runas': None, # import needs root permission to write to directory
|
|
},
|
|
'dm': {
|
|
'type': 'DMLDAP',
|
|
'handler': DMLDAP,
|
|
'command': 'ipa-custodia-dmldap',
|
|
'runas': None, # root
|
|
}
|
|
}
|
|
|
|
|
|
class IPASecStore(CSStore):
|
|
|
|
def __init__(self, config=None):
|
|
self.config = config
|
|
|
|
def _get_handler(self, key):
|
|
path = key.split('/', 3)
|
|
if len(path) < 3 or path[0] != 'keys':
|
|
raise ValueError('Invalid name')
|
|
if path[1] not in NAME_DB_MAP:
|
|
raise UnknownKeyName("Unknown DB named '%s'" % path[1])
|
|
dbmap = NAME_DB_MAP[path[1]]
|
|
handler = dbmap['handler']
|
|
if len(path) > 3 and not handler.supports_extra_args:
|
|
raise InvalidKeyArguments('Handler does not support extra args')
|
|
return handler(self.config, dbmap, path[2], *path[3:])
|
|
|
|
def get(self, key):
|
|
try:
|
|
key_handler = self._get_handler(key)
|
|
value = key_handler.export_key()
|
|
except Exception as e: # pylint: disable=broad-except
|
|
log_error('Error retrieving key "%s": %s' % (key, str(e)))
|
|
value = None
|
|
return value
|
|
|
|
def set(self, key, value, replace=False):
|
|
try:
|
|
key_handler = self._get_handler(key)
|
|
key_handler.import_key(value)
|
|
except Exception as e: # pylint: disable=broad-except
|
|
log_error('Error storing key "%s": %s' % (key, str(e)))
|
|
|
|
def list(self, keyfilter=None):
|
|
raise NotImplementedError
|
|
|
|
def cut(self, key):
|
|
raise NotImplementedError
|
|
|
|
def span(self, key):
|
|
raise NotImplementedError
|
|
|
|
|
|
# backwards compatibility with FreeIPA 4.3 and 4.4.
|
|
iSecStore = IPASecStore
|