csrgen: Remove helper abstraction

All requests now use the OpenSSL formatter. However, we keep Formatter
a separate class so that it can be changed out for tests.

https://pagure.io/freeipa/issue/4899

Reviewed-By: Jan Cholasta <jcholast@redhat.com>
This commit is contained in:
Ben Lipton 2017-03-21 12:21:30 -04:00 committed by Jan Cholasta
parent 6c092c24b2
commit 5420e9cfbe
17 changed files with 75 additions and 253 deletions

View File

@ -244,13 +244,6 @@ class OpenSSLFormatter(Formatter):
return self.SyntaxRule(prepared_template, is_extension)
class CertutilFormatter(Formatter):
base_template_name = 'certutil_base.tmpl'
def _get_template_params(self, syntax_rules):
return {'options': syntax_rules}
class FieldMapping(object):
"""Representation of the rules needed to construct a complete cert field.
@ -279,13 +272,11 @@ class Rule(object):
class RuleProvider(object):
def rules_for_profile(self, profile_id, helper):
def rules_for_profile(self, profile_id):
"""
Return the rules needed to build a CSR using the given profile.
:param profile_id: str, name of the CSR generation profile to use
:param helper: str, name of tool (e.g. openssl, certutil) that will be
used to create CSR
:returns: list of FieldMapping, filled out with the appropriate rules
"""
@ -321,40 +312,31 @@ class FileRuleProvider(RuleProvider):
)
)
def _rule(self, rule_name, helper):
if (rule_name, helper) not in self.rules:
def _rule(self, rule_name):
if rule_name not in self.rules:
try:
with self._open('rules', '%s.json' % rule_name) as f:
ruleset = json.load(f)
ruleconf = json.load(f)
except IOError:
raise errors.NotFound(
reason=_('Ruleset %(ruleset)s does not exist.') %
{'ruleset': rule_name})
reason=_('No generation rule %(rulename)s found.') %
{'rulename': rule_name})
matching_rules = [r for r in ruleset['rules']
if r['helper'] == helper]
if len(matching_rules) == 0:
try:
rule = ruleconf['rule']
except KeyError:
raise errors.EmptyResult(
reason=_('No transformation in "%(ruleset)s" rule supports'
' helper "%(helper)s"') %
{'ruleset': rule_name, 'helper': helper})
elif len(matching_rules) > 1:
raise errors.RedundantMappingRule(
ruleset=rule_name, helper=helper)
rule = matching_rules[0]
reason=_('Generation rule "%(rulename)s" is missing the'
' "rule" key') % {'rulename': rule_name})
options = {}
if 'options' in ruleset:
options.update(ruleset['options'])
if 'options' in rule:
options.update(rule['options'])
options = ruleconf.get('options', {})
self.rules[(rule_name, helper)] = Rule(
self.rules[rule_name] = Rule(
rule_name, rule['template'], options)
return self.rules[(rule_name, helper)]
return self.rules[rule_name]
def rules_for_profile(self, profile_id, helper):
def rules_for_profile(self, profile_id):
try:
with self._open('profiles', '%s.json' % profile_id) as f:
profile = json.load(f)
@ -365,28 +347,23 @@ class FileRuleProvider(RuleProvider):
field_mappings = []
for field in profile:
syntax_rule = self._rule(field['syntax'], helper)
data_rules = [self._rule(name, helper) for name in field['data']]
syntax_rule = self._rule(field['syntax'])
data_rules = [self._rule(name) for name in field['data']]
field_mappings.append(FieldMapping(
syntax_rule.name, syntax_rule, data_rules))
return field_mappings
class CSRGenerator(object):
FORMATTERS = {
'openssl': OpenSSLFormatter,
'certutil': CertutilFormatter,
}
def __init__(self, rule_provider):
def __init__(self, rule_provider, formatter_class=OpenSSLFormatter):
self.rule_provider = rule_provider
self.formatter = formatter_class()
def csr_script(self, principal, config, profile_id, helper):
def csr_script(self, principal, config, profile_id):
render_data = {'subject': principal, 'config': config}
formatter = self.FORMATTERS[helper]()
rules = self.rule_provider.rules_for_profile(profile_id, helper)
template = formatter.build_template(rules)
rules = self.rule_provider.rules_for_profile(profile_id)
template = self.formatter.build_template(rules)
try:
script = template.render(render_data)

View File

@ -1,14 +1,7 @@
{
"rules": [
{
"helper": "openssl",
"template": "DNS = {{subject.krbprincipalname.0.partition('/')[2].partition('@')[0]}}"
},
{
"helper": "certutil",
"template": "dns:{{subject.krbprincipalname.0.partition('/')[2].partition('@')[0]|quote}}"
}
],
"rule": {
"template": "DNS = {{subject.krbprincipalname.0.partition('/')[2].partition('@')[0]}}"
},
"options": {
"data_source": "subject.krbprincipalname.0.partition('/')[2].partition('@')[0]"
}

View File

@ -1,14 +1,7 @@
{
"rules": [
{
"helper": "openssl",
"template": "email = {{subject.mail.0}}"
},
{
"helper": "certutil",
"template": "email:{{subject.mail.0|quote}}"
}
],
"rule": {
"template": "email = {{subject.mail.0}}"
},
"options": {
"data_source": "subject.mail.0"
}

View File

@ -1,14 +1,7 @@
{
"rules": [
{
"helper": "openssl",
"template": "CN={{subject.krbprincipalname.0.partition('/')[2].partition('@')[0]}}"
},
{
"helper": "certutil",
"template": "CN={{subject.krbprincipalname.0.partition('/')[2].partition('@')[0]|quote}}"
}
],
"rule": {
"template": "CN={{subject.krbprincipalname.0.partition('/')[2].partition('@')[0]}}"
},
"options": {
"data_source": "subject.krbprincipalname.0.partition('/')[2].partition('@')[0]"
}

View File

@ -1,14 +1,7 @@
{
"rules": [
{
"helper": "openssl",
"template": "{{config.ipacertificatesubjectbase.0}}"
},
{
"helper": "certutil",
"template": "{{config.ipacertificatesubjectbase.0|quote}}"
}
],
"rule": {
"template": "{{config.ipacertificatesubjectbase.0}}"
},
"options": {
"data_source": "config.ipacertificatesubjectbase.0"
}

View File

@ -1,14 +1,7 @@
{
"rules": [
{
"helper": "openssl",
"template": "CN={{subject.uid.0}}"
},
{
"helper": "certutil",
"template": "CN={{subject.uid.0|quote}}"
}
],
"rule": {
"template": "CN={{subject.uid.0}}"
},
"options": {
"data_source": "subject.uid.0"
}

View File

@ -1,15 +1,8 @@
{
"rules": [
{
"helper": "openssl",
"template": "subjectAltName = @{% call openssl.section() %}{{ datarules|join('\n') }}{% endcall %}",
"options": {
"extension": true
}
},
{
"helper": "certutil",
"template": "--extSAN {{ datarules|join(',') }}"
}
]
"rule": {
"template": "subjectAltName = @{% call openssl.section() %}{{ datarules|join('\n') }}{% endcall %}"
},
"options": {
"extension": true
}
}

View File

@ -1,14 +1,7 @@
{
"rules": [
{
"helper": "openssl",
"template": "distinguished_name = {% call openssl.section() %}{{ datarules|reverse|join('\n') }}{% endcall %}"
},
{
"helper": "certutil",
"template": "-s {{ datarules|join(',') }}"
}
],
"rule": {
"template": "distinguished_name = {% call openssl.section() %}{{ datarules|reverse|join('\n') }}{% endcall %}"
},
"options": {
"required": true,
"data_source_combinator": "and"

View File

@ -1,11 +0,0 @@
#!/bin/bash -e
if [[ $# -lt 1 ]]; then
echo "Usage: $0 <outfile> [<any> <certutil> <args>]"
echo "Called as: $0 $@"
exit 1
fi
CSR="$1"
shift
certutil -R -a -z <(head -c 4096 /dev/urandom) -o "$CSR" {{ options|join(' ') }} "$@"

View File

@ -106,7 +106,7 @@ class cert_get_requestdata(Local):
generator = CSRGenerator(FileRuleProvider())
script = generator.csr_script(
principal_obj, config, profile_id, helper)
principal_obj, config, profile_id)
result = {}
if 'out' in options:

View File

@ -1,12 +1,5 @@
{
"rules": [
{
"helper": "openssl",
"template": "openssl_rule"
},
{
"helper": "certutil",
"template": "certutil_rule"
}
]
"rule": {
"template": "openssl_rule"
}
}

View File

@ -1,18 +1,8 @@
{
"rules": [
{
"helper": "openssl",
"template": "openssl_rule",
"options": {
"helper_option": true
}
},
{
"helper": "certutil",
"template": "certutil_rule"
}
],
"rule": {
"template": "openssl_rule"
},
"options": {
"global_option": true
"rule_option": true
}
}

View File

@ -1,11 +0,0 @@
#!/bin/bash -e
if [[ $# -lt 1 ]]; then
echo "Usage: $0 <outfile> [<any> <certutil> <args>]"
echo "Called as: $0 $@"
exit 1
fi
CSR="$1"
shift
certutil -R -a -z <(head -c 4096 /dev/urandom) -o "$CSR" -s CN=machine.example.com,O=DOMAIN.EXAMPLE.COM --extSAN dns:machine.example.com "$@"

View File

@ -1,11 +0,0 @@
#!/bin/bash -e
if [[ $# -lt 1 ]]; then
echo "Usage: $0 <outfile> [<any> <certutil> <args>]"
echo "Called as: $0 $@"
exit 1
fi
CSR="$1"
shift
certutil -R -a -z <(head -c 4096 /dev/urandom) -o "$CSR" -s CN=testuser,O=DOMAIN.EXAMPLE.COM --extSAN email:testuser@example.com "$@"

View File

@ -36,7 +36,7 @@ class StubRuleProvider(csrgen.RuleProvider):
'example', self.syntax_rule, [self.data_rule])
self.rules = [self.field_mapping]
def rules_for_profile(self, profile_id, helper):
def rules_for_profile(self, profile_id):
return self.rules
@ -50,10 +50,6 @@ class IdentityFormatter(csrgen.Formatter):
return {'options': syntax_rules}
class IdentityCSRGenerator(csrgen.CSRGenerator):
FORMATTERS = {'identity': IdentityFormatter}
class test_Formatter(object):
def test_prepare_data_rule_with_data_source(self, formatter):
data_rule = csrgen.Rule('uid', '{{subject.uid.0}}',
@ -139,40 +135,23 @@ class test_FileRuleProvider(object):
def test_rule_basic(self, rule_provider):
rule_name = 'basic'
rule1 = rule_provider._rule(rule_name, 'openssl')
rule2 = rule_provider._rule(rule_name, 'certutil')
rule = rule_provider._rule(rule_name)
assert rule1.template == 'openssl_rule'
assert rule2.template == 'certutil_rule'
assert rule.template == 'openssl_rule'
def test_rule_global_options(self, rule_provider):
rule_name = 'options'
rule1 = rule_provider._rule(rule_name, 'openssl')
rule2 = rule_provider._rule(rule_name, 'certutil')
rule = rule_provider._rule(rule_name)
assert rule1.options['global_option'] is True
assert rule2.options['global_option'] is True
def test_rule_helper_options(self, rule_provider):
rule_name = 'options'
rule1 = rule_provider._rule(rule_name, 'openssl')
rule2 = rule_provider._rule(rule_name, 'certutil')
assert rule1.options['helper_option'] is True
assert 'helper_option' not in rule2.options
assert rule.options['rule_option'] is True
def test_rule_nosuchrule(self, rule_provider):
with pytest.raises(errors.NotFound):
rule_provider._rule('nosuchrule', 'openssl')
def test_rule_nosuchhelper(self, rule_provider):
with pytest.raises(errors.EmptyResult):
rule_provider._rule('basic', 'nosuchhelper')
rule_provider._rule('nosuchrule')
def test_rules_for_profile_success(self, rule_provider):
rules = rule_provider.rules_for_profile('profile', 'certutil')
rules = rule_provider.rules_for_profile('profile')
assert len(rules) == 1
field_mapping = rules[0]
@ -182,7 +161,7 @@ class test_FileRuleProvider(object):
def test_rules_for_profile_nosuchprofile(self, rule_provider):
with pytest.raises(errors.NotFound):
rule_provider.rules_for_profile('nosuchprofile', 'certutil')
rule_provider.rules_for_profile('nosuchprofile')
class test_CSRGenerator(object):
@ -197,28 +176,9 @@ class test_CSRGenerator(object):
],
}
script = generator.csr_script(principal, config, 'userCert', 'openssl')
script = generator.csr_script(principal, config, 'userCert')
with open(os.path.join(
CSR_DATA_DIR, 'scripts', 'userCert_openssl.sh')) as f:
expected_script = f.read()
assert script == expected_script
def test_userCert_Certutil(self, generator):
principal = {
'uid': ['testuser'],
'mail': ['testuser@example.com'],
}
config = {
'ipacertificatesubjectbase': [
'O=DOMAIN.EXAMPLE.COM'
],
}
script = generator.csr_script(
principal, config, 'userCert', 'certutil')
with open(os.path.join(
CSR_DATA_DIR, 'scripts', 'userCert_certutil.sh')) as f:
CSR_DATA_DIR, 'configs', 'userCert.conf')) as f:
expected_script = f.read()
assert script == expected_script
@ -235,28 +195,9 @@ class test_CSRGenerator(object):
}
script = generator.csr_script(
principal, config, 'caIPAserviceCert', 'openssl')
principal, config, 'caIPAserviceCert')
with open(os.path.join(
CSR_DATA_DIR, 'scripts', 'caIPAserviceCert_openssl.sh')) as f:
expected_script = f.read()
assert script == expected_script
def test_caIPAserviceCert_Certutil(self, generator):
principal = {
'krbprincipalname': [
'HTTP/machine.example.com@DOMAIN.EXAMPLE.COM'
],
}
config = {
'ipacertificatesubjectbase': [
'O=DOMAIN.EXAMPLE.COM'
],
}
script = generator.csr_script(
principal, config, 'caIPAserviceCert', 'certutil')
with open(os.path.join(
CSR_DATA_DIR, 'scripts', 'caIPAserviceCert_certutil.sh')) as f:
CSR_DATA_DIR, 'configs', 'caIPAserviceCert.conf')) as f:
expected_script = f.read()
assert script == expected_script
@ -267,10 +208,11 @@ class test_rule_handling(object):
rule_provider = StubRuleProvider()
rule_provider.data_rule.template = '{{subject.mail}}'
rule_provider.data_rule.options = {'data_source': 'subject.mail'}
generator = IdentityCSRGenerator(rule_provider)
generator = csrgen.CSRGenerator(
rule_provider, formatter_class=IdentityFormatter)
script = generator.csr_script(
principal, {}, 'example', 'identity')
principal, {}, 'example')
assert script == '\n'
def test_twoDataRulesOneMissing(self, generator):
@ -280,9 +222,10 @@ class test_rule_handling(object):
rule_provider.data_rule.options = {'data_source': 'subject.mail'}
rule_provider.field_mapping.data_rules.append(csrgen.Rule(
'data2', '{{subject.uid}}', {'data_source': 'subject.uid'}))
generator = IdentityCSRGenerator(rule_provider)
generator = csrgen.CSRGenerator(
rule_provider, formatter_class=IdentityFormatter)
script = generator.csr_script(principal, {}, 'example', 'identity')
script = generator.csr_script(principal, {}, 'example')
assert script == ',testuser\n'
def test_requiredAttributeMissing(self):
@ -291,8 +234,9 @@ class test_rule_handling(object):
rule_provider.data_rule.template = '{{subject.mail}}'
rule_provider.data_rule.options = {'data_source': 'subject.mail'}
rule_provider.syntax_rule.options = {'required': True}
generator = IdentityCSRGenerator(rule_provider)
generator = csrgen.CSRGenerator(
rule_provider, formatter_class=IdentityFormatter)
with pytest.raises(errors.CSRTemplateError):
_script = generator.csr_script(
principal, {}, 'example', 'identity')
principal, {}, 'example')