csrgen: Modify cert_get_requestdata to return a CertificationRequestInfo

Also modify cert_request to use this new format. Note, only PEM private
keys are supported for now. NSS databases are not.

https://pagure.io/freeipa/issue/4899

Reviewed-By: Jan Cholasta <jcholast@redhat.com>
This commit is contained in:
Ben Lipton
2017-01-06 11:19:19 -05:00
committed by Jan Cholasta
parent 136c6c3e2a
commit e7588ab2dc
4 changed files with 411 additions and 57 deletions

View File

@@ -7,13 +7,22 @@ import errno
import json
import os.path
import pipes
import subprocess
import traceback
import pkg_resources
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.serialization import (
load_pem_private_key, Encoding, PublicFormat)
import jinja2
import jinja2.ext
import jinja2.sandbox
from pyasn1.codec.der import decoder, encoder
from pyasn1.type import univ
from pyasn1_modules import rfc2314
import six
from ipalib import api
@@ -56,7 +65,8 @@ class IPAExtension(jinja2.ext.Extension):
def required(self, data, name):
if not data:
raise errors.CSRTemplateError(
reason=_('Required CSR generation rule %(name)s is missing data') %
reason=_(
'Required CSR generation rule %(name)s is missing data') %
{'name': name})
return data
@@ -373,3 +383,66 @@ class CSRGenerator(object):
'Template error when formatting certificate data'))
return config
class CSRLibraryAdaptor(object):
def get_subject_public_key_info(self):
raise NotImplementedError('Use a subclass of CSRLibraryAdaptor')
def sign_csr(self, certification_request_info):
"""Sign a CertificationRequestInfo.
Returns: str, a DER-encoded signed CSR.
"""
raise NotImplementedError('Use a subclass of CSRLibraryAdaptor')
class OpenSSLAdaptor(object):
def __init__(self, key_filename, password_filename):
self.key_filename = key_filename
self.password_filename = password_filename
def key(self):
with open(self.key_filename, 'r') as key_file:
key_bytes = key_file.read()
password = None
if self.password_filename is not None:
with open(self.password_filename, 'r') as password_file:
password = password_file.read().strip()
key = load_pem_private_key(key_bytes, password, default_backend())
return key
def get_subject_public_key_info(self):
pubkey_info = self.key().public_key().public_bytes(
Encoding.DER, PublicFormat.SubjectPublicKeyInfo)
return pubkey_info
def sign_csr(self, certification_request_info):
reqinfo = decoder.decode(
certification_request_info, rfc2314.CertificationRequestInfo())[0]
csr = rfc2314.CertificationRequest()
csr.setComponentByName('certificationRequestInfo', reqinfo)
algorithm = rfc2314.SignatureAlgorithmIdentifier()
algorithm.setComponentByName(
'algorithm', univ.ObjectIdentifier(
'1.2.840.113549.1.1.11')) # sha256WithRSAEncryption
csr.setComponentByName('signatureAlgorithm', algorithm)
signature = self.key().sign(
certification_request_info,
padding.PKCS1v15(),
hashes.SHA256()
)
asn1sig = univ.BitString("'%s'H" % signature.encode('hex'))
csr.setComponentByName('signature', asn1sig)
return encoder.encode(csr)
class NSSAdaptor(object):
def get_subject_public_key_info(self):
raise NotImplementedError('NSS is not yet supported')
def sign_csr(self, certification_request_info):
raise NotImplementedError('NSS is not yet supported')

291
ipaclient/csrgen_ffi.py Normal file
View File

@@ -0,0 +1,291 @@
#!/usr/bin/python
from cffi import FFI
import ctypes.util
from ipalib import errors
_ffi = FFI()
_ffi.cdef('''
typedef ... CONF;
typedef ... CONF_METHOD;
typedef ... BIO;
typedef ... ipa_STACK_OF_CONF_VALUE;
/* openssl/conf.h */
typedef struct {
char *section;
char *name;
char *value;
} CONF_VALUE;
CONF *NCONF_new(CONF_METHOD *meth);
void NCONF_free(CONF *conf);
int NCONF_load_bio(CONF *conf, BIO *bp, long *eline);
ipa_STACK_OF_CONF_VALUE *NCONF_get_section(const CONF *conf,
const char *section);
char *NCONF_get_string(const CONF *conf, const char *group, const char *name);
/* openssl/safestack.h */
// int sk_CONF_VALUE_num(ipa_STACK_OF_CONF_VALUE *);
// CONF_VALUE *sk_CONF_VALUE_value(ipa_STACK_OF_CONF_VALUE *, int);
/* openssl/stack.h */
typedef ... _STACK;
int sk_num(const _STACK *);
void *sk_value(const _STACK *, int);
/* openssl/bio.h */
BIO *BIO_new_mem_buf(const void *buf, int len);
int BIO_free(BIO *a);
/* openssl/asn1.h */
typedef struct ASN1_ENCODING_st {
unsigned char *enc; /* DER encoding */
long len; /* Length of encoding */
int modified; /* set to 1 if 'enc' is invalid */
} ASN1_ENCODING;
/* openssl/evp.h */
typedef ... EVP_PKEY;
void EVP_PKEY_free(EVP_PKEY *pkey);
/* openssl/x509.h */
typedef ... ASN1_INTEGER;
typedef ... ASN1_BIT_STRING;
typedef ... X509;
typedef ... X509_ALGOR;
typedef ... X509_CRL;
typedef ... X509_NAME;
typedef ... X509_PUBKEY;
typedef ... ipa_STACK_OF_X509_ATTRIBUTE;
typedef struct X509_req_info_st {
ASN1_ENCODING enc;
ASN1_INTEGER *version;
X509_NAME *subject;
X509_PUBKEY *pubkey;
/* d=2 hl=2 l= 0 cons: cont: 00 */
ipa_STACK_OF_X509_ATTRIBUTE *attributes; /* [ 0 ] */
} X509_REQ_INFO;
typedef struct X509_req_st {
X509_REQ_INFO *req_info;
X509_ALGOR *sig_alg;
ASN1_BIT_STRING *signature;
int references;
} X509_REQ;
X509_REQ *X509_REQ_new(void);
void X509_REQ_free(X509_REQ *);
EVP_PKEY *d2i_PUBKEY_bio(BIO *bp, EVP_PKEY **a);
int X509_REQ_set_pubkey(X509_REQ *x, EVP_PKEY *pkey);
int X509_NAME_add_entry_by_txt(X509_NAME *name, const char *field, int type,
const unsigned char *bytes, int len, int loc,
int set);
int X509_NAME_entry_count(X509_NAME *name);
int i2d_X509_REQ_INFO(X509_REQ_INFO *a, unsigned char **out); \
/* openssl/x509v3.h */
typedef ... X509V3_CONF_METHOD;
typedef struct v3_ext_ctx {
int flags;
X509 *issuer_cert;
X509 *subject_cert;
X509_REQ *subject_req;
X509_CRL *crl;
X509V3_CONF_METHOD *db_meth;
void *db;
} X509V3_CTX;
void X509V3_set_ctx(X509V3_CTX *ctx, X509 *issuer, X509 *subject,
X509_REQ *req, X509_CRL *crl, int flags);
void X509V3_set_nconf(X509V3_CTX *ctx, CONF *conf);
int X509V3_EXT_REQ_add_nconf(CONF *conf, X509V3_CTX *ctx, char *section,
X509_REQ *req);
/* openssl/x509v3.h */
unsigned long ERR_get_error(void);
char *ERR_error_string(unsigned long e, char *buf);
''')
_libcrypto = _ffi.dlopen(ctypes.util.find_library('crypto'))
NULL = _ffi.NULL
# openssl/conf.h
NCONF_new = _libcrypto.NCONF_new
NCONF_free = _libcrypto.NCONF_free
NCONF_load_bio = _libcrypto.NCONF_load_bio
NCONF_get_section = _libcrypto.NCONF_get_section
NCONF_get_string = _libcrypto.NCONF_get_string
# openssl/stack.h
sk_num = _libcrypto.sk_num
sk_value = _libcrypto.sk_value
def sk_CONF_VALUE_num(sk):
return sk_num(_ffi.cast("_STACK *", sk))
def sk_CONF_VALUE_value(sk, i):
return _ffi.cast("CONF_VALUE *", sk_value(_ffi.cast("_STACK *", sk), i))
# openssl/bio.h
BIO_new_mem_buf = _libcrypto.BIO_new_mem_buf
BIO_free = _libcrypto.BIO_free
# openssl/x509.h
X509_REQ_new = _libcrypto.X509_REQ_new
X509_REQ_free = _libcrypto.X509_REQ_free
X509_REQ_set_pubkey = _libcrypto.X509_REQ_set_pubkey
d2i_PUBKEY_bio = _libcrypto.d2i_PUBKEY_bio
i2d_X509_REQ_INFO = _libcrypto.i2d_X509_REQ_INFO
X509_NAME_add_entry_by_txt = _libcrypto.X509_NAME_add_entry_by_txt
X509_NAME_entry_count = _libcrypto.X509_NAME_entry_count
def X509_REQ_get_subject_name(req):
return req.req_info.subject
# openssl/evp.h
EVP_PKEY_free = _libcrypto.EVP_PKEY_free
# openssl/asn1.h
MBSTRING_UTF8 = 0x1000
# openssl/x509v3.h
X509V3_set_ctx = _libcrypto.X509V3_set_ctx
X509V3_set_nconf = _libcrypto.X509V3_set_nconf
X509V3_EXT_REQ_add_nconf = _libcrypto.X509V3_EXT_REQ_add_nconf
# openssl/err.h
ERR_get_error = _libcrypto.ERR_get_error
ERR_error_string = _libcrypto.ERR_error_string
def _raise_openssl_errors():
msgs = []
code = ERR_get_error()
while code != 0:
msg = ERR_error_string(code, NULL)
msgs.append(_ffi.string(msg))
code = ERR_get_error()
raise errors.CSRTemplateError(reason='\n'.join(msgs))
def _parse_dn_section(subj, dn_sk):
for i in range(sk_CONF_VALUE_num(dn_sk)):
v = sk_CONF_VALUE_value(dn_sk, i)
rdn_type = _ffi.string(v.name)
# Skip past any leading X. X: X, etc to allow for multiple instances
for idx, c in enumerate(rdn_type):
if c in ':,.':
if idx+1 < len(rdn_type):
rdn_type = rdn_type[idx+1:]
break
if rdn_type.startswith('+'):
rdn_type = rdn_type[1:]
mval = -1
else:
mval = 0
if not X509_NAME_add_entry_by_txt(
subj, rdn_type, MBSTRING_UTF8, v.value, -1, -1, mval):
_raise_openssl_errors()
if not X509_NAME_entry_count(subj):
raise errors.CSRTemplateError(
reason='error, subject in config file is empty')
def build_requestinfo(config, public_key_info):
reqdata = NULL
req = NULL
nconf_bio = NULL
pubkey_bio = NULL
pubkey = NULL
try:
reqdata = NCONF_new(NULL)
if reqdata == NULL:
_raise_openssl_errors()
nconf_bio = BIO_new_mem_buf(config, len(config))
errorline = _ffi.new('long[1]', [-1])
i = NCONF_load_bio(reqdata, nconf_bio, errorline)
if i < 0:
if errorline[0] < 0:
raise errors.CSRTemplateError(reason="Can't load config file")
else:
raise errors.CSRTemplateError(
reason='Error on line %d of config file' % errorline[0])
dn_sect = NCONF_get_string(reqdata, 'req', 'distinguished_name')
if dn_sect == NULL:
raise errors.CSRTemplateError(
reason='Unable to find "distinguished_name" key in config')
dn_sk = NCONF_get_section(reqdata, dn_sect)
if dn_sk == NULL:
raise errors.CSRTemplateError(
reason='Unable to find "%s" section in config' %
_ffi.string(dn_sect))
pubkey_bio = BIO_new_mem_buf(public_key_info, len(public_key_info))
pubkey = d2i_PUBKEY_bio(pubkey_bio, NULL)
if pubkey == NULL:
_raise_openssl_errors()
req = X509_REQ_new()
if req == NULL:
_raise_openssl_errors()
subject = X509_REQ_get_subject_name(req)
_parse_dn_section(subject, dn_sk)
if not X509_REQ_set_pubkey(req, pubkey):
_raise_openssl_errors()
ext_ctx = _ffi.new("X509V3_CTX[1]")
X509V3_set_ctx(ext_ctx, NULL, NULL, req, NULL, 0)
X509V3_set_nconf(ext_ctx, reqdata)
extn_section = NCONF_get_string(reqdata, "req", "req_extensions")
if extn_section != NULL:
if not X509V3_EXT_REQ_add_nconf(
reqdata, ext_ctx, extn_section, req):
_raise_openssl_errors()
der_len = i2d_X509_REQ_INFO(req.req_info, NULL)
if der_len < 0:
_raise_openssl_errors()
der_buf = _ffi.new("unsigned char[%d]" % der_len)
der_out = _ffi.new("unsigned char **", der_buf)
der_len = i2d_X509_REQ_INFO(req.req_info, der_out)
if der_len < 0:
_raise_openssl_errors()
return _ffi.buffer(der_buf, der_len)
finally:
if reqdata != NULL:
NCONF_free(reqdata)
if req != NULL:
X509_REQ_free(req)
if nconf_bio != NULL:
BIO_free(nconf_bio)
if pubkey_bio != NULL:
BIO_free(pubkey_bio)
if pubkey != NULL:
EVP_PKEY_free(pubkey)

View File

@@ -20,11 +20,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import base64
import subprocess
from tempfile import NamedTemporaryFile as NTF
import six
from ipaclient import csrgen
from ipaclient.frontend import MethodOverride
from ipalib import errors
from ipalib import x509
@@ -108,54 +107,40 @@ class cert_request(CertRetrieveOverride):
if csr is None:
if database:
helper = u'certutil'
helper_args = ['-d', database]
if password_file:
helper_args += ['-f', password_file]
adaptor = csrgen.NSSAdaptor(database, password_file)
elif private_key:
helper = u'openssl'
helper_args = [private_key]
if password_file:
helper_args += ['-passin', 'file:%s' % password_file]
adaptor = csrgen.OpenSSLAdaptor(private_key, password_file)
else:
raise errors.InvocationError(
message=u"One of 'database' or 'private_key' is required")
with NTF() as scriptfile, NTF() as csrfile:
# If csr_profile_id is passed, that takes precedence.
# Otherwise, use profile_id. If neither are passed, the default
# in cert_get_requestdata will be used.
profile_id = csr_profile_id
if profile_id is None:
profile_id = options.get('profile_id')
pubkey_info = adaptor.get_subject_public_key_info()
pubkey_info_b64 = base64.b64encode(pubkey_info)
self.api.Command.cert_get_requestdata(
profile_id=profile_id,
principal=options.get('principal'),
out=unicode(scriptfile.name),
helper=helper)
# If csr_profile_id is passed, that takes precedence.
# Otherwise, use profile_id. If neither are passed, the default
# in cert_get_requestdata will be used.
profile_id = csr_profile_id
if profile_id is None:
profile_id = options.get('profile_id')
helper_cmd = [
'bash', '-e', scriptfile.name, csrfile.name] + helper_args
response = self.api.Command.cert_get_requestdata(
profile_id=profile_id,
principal=options.get('principal'),
public_key_info=unicode(pubkey_info_b64))
try:
subprocess.check_output(helper_cmd)
except subprocess.CalledProcessError as e:
raise errors.CertificateOperationError(
error=(
_('Error running "%(cmd)s" to generate CSR:'
' %(err)s') %
{'cmd': ' '.join(helper_cmd), 'err': e.output}))
req_info_b64 = response['result']['request_info']
req_info = base64.b64decode(req_info_b64)
try:
csr = unicode(csrfile.read())
except IOError as e:
raise errors.CertificateOperationError(
error=(_('Unable to read generated CSR file: %(err)s')
% {'err': e}))
if not csr:
raise errors.CertificateOperationError(
error=(_('Generated CSR was empty')))
csr = adaptor.sign_csr(req_info)
if not csr:
raise errors.CertificateOperationError(
error=(_('Generated CSR was empty')))
# cert_request requires the CSR to be base64-encoded (but PEM
# header and footer are not required)
csr = unicode(base64.b64encode(csr))
else:
if database is not None or private_key is not None:
raise errors.MutuallyExclusiveError(reason=_(

View File

@@ -2,15 +2,18 @@
# Copyright (C) 2016 FreeIPA Contributors see COPYING for license
#
import base64
import six
from ipaclient.csrgen import CSRGenerator, FileRuleProvider
from ipaclient import csrgen
from ipaclient import csrgen_ffi
from ipalib import api
from ipalib import errors
from ipalib import output
from ipalib import util
from ipalib.frontend import Local, Str
from ipalib.parameters import Principal
from ipalib.parameters import File, Principal
from ipalib.plugable import Registry
from ipalib.text import _
from ipapython import dogtag
@@ -43,15 +46,14 @@ class cert_get_requestdata(Local):
label=_('Profile ID'),
doc=_('CSR Generation Profile to use'),
),
Str(
'helper',
label=_('Name of CSR generation tool'),
doc=_('Name of tool (e.g. openssl, certutil) that will be used to'
' create CSR'),
File(
'public_key_info',
label=_('Subject Public Key Info'),
doc=_('DER-encoded SubjectPublicKeyInfo structure'),
),
Str(
'out?',
doc=_('Write CSR generation script to file'),
doc=_('Write CertificationRequestInfo to file'),
),
)
@@ -65,8 +67,8 @@ class cert_get_requestdata(Local):
has_output_params = (
Str(
'script',
label=_('Generation script'),
'request_info',
label=_('CertificationRequestInfo structure'),
)
)
@@ -78,7 +80,8 @@ class cert_get_requestdata(Local):
profile_id = options.get('profile_id')
if profile_id is None:
profile_id = dogtag.DEFAULT_PROFILE
helper = options.get('helper')
public_key_info = options.get('public_key_info')
public_key_info = base64.b64decode(public_key_info)
if self.api.env.in_server:
backend = self.api.Backend.ldap2
@@ -103,16 +106,18 @@ class cert_get_requestdata(Local):
principal_obj = principal_obj['result']
config = api.Command.config_show()['result']
generator = CSRGenerator(FileRuleProvider())
generator = csrgen.CSRGenerator(csrgen.FileRuleProvider())
script = generator.csr_config(principal_obj, config, profile_id)
csr_config = generator.csr_config(principal_obj, config, profile_id)
request_info = base64.b64encode(csrgen_ffi.build_requestinfo(
csr_config.encode('utf8'), public_key_info))
result = {}
if 'out' in options:
with open(options['out'], 'wb') as f:
f.write(script)
f.write(request_info)
else:
result = dict(script=script)
result = dict(request_info=request_info)
return dict(
result=result