freeipa/ipaserver/secrets/store.py
Stanislav Laznicka 24b134c633 Added a PEMFileHandler for Custodia store
This is a preparation step to be able to handle sending RA agent
certificate over Custodia during domain level 1 replica installation.

https://fedorahosted.org/freeipa/ticket/5695

Reviewed-By: Jan Cholasta <jcholast@redhat.com>
2017-03-01 09:43:41 +00:00

323 lines
11 KiB
Python

# Copyright (C) 2015 IPA Project Contributors, see COPYING for license
from __future__ import print_function
from base64 import b64encode, b64decode
from custodia.store.interface import CSStore
from jwcrypto.common import json_decode, json_encode
from ipaplatform.paths import paths
from ipapython import ipautil
from ipaserver.secrets.common import iSecLdap
import ldap
import os
import shutil
import sys
import tempfile
class UnknownKeyName(Exception):
pass
class DBMAPHandler(object):
def __init__(self, config, dbmap, nickname):
raise NotImplementedError
def export_key(self):
raise NotImplementedError
def import_key(self, value):
raise NotImplementedError
def log_error(error):
print(error, file=sys.stderr)
def PKI_TOMCAT_password_callback():
password = None
with open(paths.PKI_TOMCAT_PASSWORD_CONF) as f:
for line in f.readlines():
key, value = line.strip().split('=')
if key == 'internal':
password = value
break
return password
def HTTPD_password_callback():
with open(os.path.join(paths.IPA_RADB_DIR, 'pwdfile.txt')) as f:
password = f.read()
return password
class NSSWrappedCertDB(DBMAPHandler):
'''
Store that extracts private keys from an NSSDB, wrapped with the
private key of the primary CA.
'''
def __init__(self, config, dbmap, nickname):
if 'path' not in dbmap:
raise ValueError(
'Configuration does not provide NSSDB path')
if 'pwcallback' not in dbmap:
raise ValueError(
'Configuration does not provide Password Calback')
if 'wrap_nick' not in dbmap:
raise ValueError(
'Configuration does not provide nickname of wrapping key')
self.nssdb_path = dbmap['path']
self.nssdb_password = dbmap['pwcallback']()
self.wrap_nick = dbmap['wrap_nick']
self.target_nick = nickname
def export_key(self):
tdir = tempfile.mkdtemp(dir=paths.TMP)
try:
nsspwfile = os.path.join(tdir, 'nsspwfile')
with open(nsspwfile, 'w+') as f:
f.write(self.nssdb_password)
wrapped_key_file = os.path.join(tdir, 'wrapped_key')
certificate_file = os.path.join(tdir, 'certificate')
ipautil.run([
paths.PKI, '-d', self.nssdb_path, '-C', nsspwfile,
'ca-authority-key-export',
'--wrap-nickname', self.wrap_nick,
'--target-nickname', self.target_nick,
'-o', wrapped_key_file])
ipautil.run([
paths.CERTUTIL, '-d', self.nssdb_path,
'-L', '-n', self.target_nick,
'-a', '-o', certificate_file])
with open(wrapped_key_file, 'r') as f:
wrapped_key = f.read()
with open(certificate_file, 'r') as f:
certificate = f.read()
finally:
shutil.rmtree(tdir)
return json_encode({
'wrapped_key': b64encode(wrapped_key),
'certificate': certificate})
class NSSCertDB(DBMAPHandler):
def __init__(self, config, dbmap, nickname):
if 'type' not in dbmap or dbmap['type'] != 'NSSDB':
raise ValueError('Invalid type "%s",'
' expected "NSSDB"' % (dbmap['type'],))
if 'path' not in dbmap:
raise ValueError('Configuration does not provide NSSDB path')
if 'pwcallback' not in dbmap:
raise ValueError('Configuration does not provide Password Calback')
self.nssdb_path = dbmap['path']
self.nickname = nickname
self.nssdb_password = dbmap['pwcallback']()
def export_key(self):
tdir = tempfile.mkdtemp(dir=paths.TMP)
try:
nsspwfile = os.path.join(tdir, 'nsspwfile')
with open(nsspwfile, 'w+') as f:
f.write(self.nssdb_password)
pk12pwfile = os.path.join(tdir, 'pk12pwfile')
password = ipautil.ipa_generate_password()
with open(pk12pwfile, 'w+') as f:
f.write(password)
pk12file = os.path.join(tdir, 'pk12file')
ipautil.run([paths.PK12UTIL,
"-d", self.nssdb_path,
"-o", pk12file,
"-n", self.nickname,
"-k", nsspwfile,
"-w", pk12pwfile])
with open(pk12file, 'r') as f:
data = f.read()
finally:
shutil.rmtree(tdir)
return json_encode({'export password': password,
'pkcs12 data': b64encode(data)})
def import_key(self, value):
v = json_decode(value)
tdir = tempfile.mkdtemp(dir=paths.TMP)
try:
nsspwfile = os.path.join(tdir, 'nsspwfile')
with open(nsspwfile, 'w+') as f:
f.write(self.nssdb_password)
pk12pwfile = os.path.join(tdir, 'pk12pwfile')
with open(pk12pwfile, 'w+') as f:
f.write(v['export password'])
pk12file = os.path.join(tdir, 'pk12file')
with open(pk12file, 'w+') as f:
f.write(b64decode(v['pkcs12 data']))
ipautil.run([paths.PK12UTIL,
"-d", self.nssdb_path,
"-i", pk12file,
"-n", self.nickname,
"-k", nsspwfile,
"-w", pk12pwfile])
finally:
shutil.rmtree(tdir)
# Exfiltrate the DM password Hash so it can be set in replica's and this
# way let a replica be install without knowing the DM password and yet
# still keep the DM password synchronized across replicas
class DMLDAP(DBMAPHandler):
def __init__(self, config, dbmap, nickname):
if 'type' not in dbmap or dbmap['type'] != 'DMLDAP':
raise ValueError('Invalid type "%s",'
' expected "DMLDAP"' % (dbmap['type'],))
if nickname != 'DMHash':
raise UnknownKeyName("Unknown Key Named '%s'" % nickname)
self.ldap = iSecLdap(config['ldap_uri'],
config.get('auth_type', None))
def export_key(self):
conn = self.ldap.connect()
r = conn.search_s('cn=config', ldap.SCOPE_BASE,
attrlist=['nsslapd-rootpw'])
if len(r) != 1:
raise RuntimeError('DM Hash not found!')
return json_encode({'dmhash': r[0][1]['nsslapd-rootpw'][0]})
def import_key(self, value):
v = json_decode(value)
conn = self.ldap.connect()
mods = [(ldap.MOD_REPLACE, 'nsslapd-rootpw', str(v['dmhash']))]
conn.modify_s('cn=config', mods)
class PEMFileHandler(DBMAPHandler):
def __init__(self, config, dbmap, nickname=None):
if 'type' not in dbmap or dbmap['type'] != 'OPENSSL':
raise ValueError('Invalid type "{t}", expected OPENSSL'
.format(t=dbmap['type']))
self.certfile = dbmap['certfile']
self.keyfile = dbmap.get(['keyfile'])
def export_key(self):
_fd, tmpfile = tempfile.mkstemp(dir=paths.TMP)
password = ipautil.ipa_generate_password()
args = [
paths.OPENSSL,
"pkcs12", "-export",
"-in", self.certfile,
"-out", tmpfile,
"-password", "pass:{pwd}".format(pwd=password)
]
if self.keyfile is not None:
args.extend(["-inkey", self.keyfile])
try:
ipautil.run(args, nolog=password)
with open(tmpfile, 'r') as f:
data = f.read()
finally:
os.remove(tmpfile)
return json_encode({'export password': password,
'pkcs12 data': b64encode(data)})
def import_key(self, value):
v = json_decode(value)
data = b64decode(v['pkcs12 data'])
password = v['export password']
try:
_fd, tmpdata = tempfile.mkstemp(dir=paths.TMP)
with open(tmpdata, 'w') as f:
f.write(data)
# get the certificate from the file
ipautil.run([paths.OPENSSL,
"pkcs12",
"-in", tmpdata,
"-clcerts", "-nokeys",
"-out", self.certfile,
"-passin", "pass:{pwd}".format(pwd=password)],
nolog=(password))
if self.keyfile is not None:
# get the private key from the file
ipautil.run([paths.OPENSSL,
"pkcs12",
"-in", tmpdata,
"-nocerts", "-nodes",
"-out", self.keyfile,
"-passin", "pass:{pwd}".format(pwd=password)],
nolog=(password))
finally:
os.remove(tmpdata)
NAME_DB_MAP = {
'ca': {
'type': 'NSSDB',
'path': paths.PKI_TOMCAT_ALIAS_DIR,
'handler': NSSCertDB,
'pwcallback': PKI_TOMCAT_password_callback,
},
'ca_wrapped': {
'handler': NSSWrappedCertDB,
'path': paths.PKI_TOMCAT_ALIAS_DIR,
'pwcallback': PKI_TOMCAT_password_callback,
'wrap_nick': 'caSigningCert cert-pki-ca',
},
'ra': {
'type': 'NSSDB',
'path': paths.IPA_RADB_DIR,
'handler': NSSCertDB,
'pwcallback': HTTPD_password_callback,
},
'dm': {
'type': 'DMLDAP',
'handler': DMLDAP,
}
}
class IPASecStore(CSStore):
def __init__(self, config=None):
self.config = config
def _get_handler(self, key):
path = key.split('/', 3)
if len(path) != 3 or path[0] != 'keys':
raise ValueError('Invalid name')
if path[1] not in NAME_DB_MAP:
raise UnknownKeyName("Unknown DB named '%s'" % path[1])
dbmap = NAME_DB_MAP[path[1]]
return dbmap['handler'](self.config, dbmap, path[2])
def get(self, key):
try:
key_handler = self._get_handler(key)
value = key_handler.export_key()
except Exception as e: # pylint: disable=broad-except
log_error('Error retrievieng key "%s": %s' % (key, str(e)))
value = None
return value
def set(self, key, value, replace=False):
try:
key_handler = self._get_handler(key)
key_handler.import_key(value)
except Exception as e: # pylint: disable=broad-except
log_error('Error storing key "%s": %s' % (key, str(e)))
def list(self, keyfilter=None):
raise NotImplementedError
def cut(self, key):
raise NotImplementedError
def span(self, key):
raise NotImplementedError
# backwards compatibility with FreeIPA 4.3 and 4.4.
iSecStore = IPASecStore