From 3064b890e24a5056c77f19c3951cfb59d49366f8 Mon Sep 17 00:00:00 2001 From: Christian Heimes Date: Tue, 17 Jan 2017 08:49:54 +0100 Subject: [PATCH] Conditionally import pyhbac The pyhbac module is part of SSSD. It's not available as stand-alone PyPI package. It would take a lot of effort to package it because the code is deeply tight into SSSD. Let's follow the example of other SSSD Python packages and make the import of pyhbac conditionally. It's only necessary for caacl and hbactest plugins. I renamed convert_to_ipa_rule() to _convert_to_ipa_rule() because it does not check for presence of pyhbac package itself. The check is performed earlier in execute(). The prefix indicates that it is an internal function and developers have to think twice before using it in another place. This makes it much easier to install ipaserver with instrumented build of Python with a different ABI or in isolated virtual envs to profile and debug the server. Signed-off-by: Christian Heimes Reviewed-By: Martin Basti Reviewed-By: Jan Cholasta --- ipaserver/plugins/caacl.py | 86 --------------------------------- ipaserver/plugins/cert.py | 90 ++++++++++++++++++++++++++++++++++- ipaserver/plugins/hbactest.py | 19 ++++++-- 3 files changed, 105 insertions(+), 90 deletions(-) diff --git a/ipaserver/plugins/caacl.py b/ipaserver/plugins/caacl.py index ff1178ad7..43a397d8c 100644 --- a/ipaserver/plugins/caacl.py +++ b/ipaserver/plugins/caacl.py @@ -2,12 +2,10 @@ # Copyright (C) 2015 FreeIPA Contributors see COPYING for license # -import pyhbac import six from ipalib import api, errors, output from ipalib import Bool, Str, StrEnum -from ipalib.constants import IPA_CA_CN from ipalib.plugable import Registry from .baseldap import ( LDAPObject, LDAPSearch, LDAPCreate, LDAPDelete, LDAPQuery, @@ -80,90 +78,6 @@ EXAMPLES: register = Registry() -def _acl_make_request(principal_type, principal, ca_id, profile_id): - """Construct HBAC request for the given principal, CA and profile""" - - req = pyhbac.HbacRequest() - req.targethost.name = ca_id - req.service.name = profile_id - if principal_type == 'user': - req.user.name = principal.username - elif principal_type == 'host': - req.user.name = principal.hostname - elif principal_type == 'service': - req.user.name = unicode(principal) - groups = [] - if principal_type == 'user': - user_obj = api.Command.user_show(principal.username)['result'] - groups = user_obj.get('memberof_group', []) - groups += user_obj.get('memberofindirect_group', []) - elif principal_type == 'host': - host_obj = api.Command.host_show(principal.hostname)['result'] - groups = host_obj.get('memberof_hostgroup', []) - groups += host_obj.get('memberofindirect_hostgroup', []) - req.user.groups = sorted(set(groups)) - return req - - -def _acl_make_rule(principal_type, obj): - """Turn CA ACL object into HBAC rule. - - ``principal_type`` - String in {'user', 'host', 'service'} - """ - rule = pyhbac.HbacRule(obj['cn'][0]) - rule.enabled = obj['ipaenabledflag'][0] - rule.srchosts.category = {pyhbac.HBAC_CATEGORY_ALL} - - # add CA(s) - if 'ipacacategory' in obj and obj['ipacacategory'][0].lower() == 'all': - rule.targethosts.category = {pyhbac.HBAC_CATEGORY_ALL} - else: - # For compatibility with pre-lightweight-CAs CA ACLs, - # no CA members implies the host authority (only) - rule.targethosts.names = obj.get('ipamemberca_ca', [IPA_CA_CN]) - - # add profiles - if ('ipacertprofilecategory' in obj - and obj['ipacertprofilecategory'][0].lower() == 'all'): - rule.services.category = {pyhbac.HBAC_CATEGORY_ALL} - else: - attr = 'ipamembercertprofile_certprofile' - rule.services.names = obj.get(attr, []) - - # add principals and principal's groups - category_attr = '{}category'.format(principal_type) - if category_attr in obj and obj[category_attr][0].lower() == 'all': - rule.users.category = {pyhbac.HBAC_CATEGORY_ALL} - else: - if principal_type == 'user': - rule.users.names = obj.get('memberuser_user', []) - rule.users.groups = obj.get('memberuser_group', []) - elif principal_type == 'host': - rule.users.names = obj.get('memberhost_host', []) - rule.users.groups = obj.get('memberhost_hostgroup', []) - elif principal_type == 'service': - rule.users.names = [ - unicode(principal) - for principal in obj.get('memberservice_service', []) - ] - - return rule - - -def acl_evaluate(principal, ca_id, profile_id): - if principal.is_user: - principal_type = 'user' - elif principal.is_host: - principal_type = 'host' - else: - principal_type = 'service' - req = _acl_make_request(principal_type, principal, ca_id, profile_id) - acls = api.Command.caacl_find(no_members=False)['result'] - rules = [_acl_make_rule(principal_type, obj) for obj in acls] - return req.evaluate(rules) == pyhbac.HBAC_EVAL_ALLOW - - @register() class caacl(LDAPObject): """ diff --git a/ipaserver/plugins/cert.py b/ipaserver/plugins/cert.py index dfc7444dd..5590913e1 100644 --- a/ipaserver/plugins/cert.py +++ b/ipaserver/plugins/cert.py @@ -43,7 +43,6 @@ from ipalib.plugable import Registry from .virtual import VirtualCommand from .baseldap import pkey_to_value from .certprofile import validate_profile_id -from .caacl import acl_evaluate from ipalib.text import _ from ipalib.request import context from ipalib import output @@ -52,6 +51,11 @@ from ipapython.dn import DN from ipapython.ipa_log_manager import root_logger from ipaserver.plugins.service import normalize_principal, validate_realm +try: + import pyhbac +except ImportError: + raise errors.SkipPluginModule(reason=_('pyhbac is not installed.')) + if six.PY3: unicode = str @@ -158,6 +162,90 @@ register = Registry() PKIDATE_FORMAT = '%Y-%m-%d' +def _acl_make_request(principal_type, principal, ca_id, profile_id): + """Construct HBAC request for the given principal, CA and profile""" + + req = pyhbac.HbacRequest() + req.targethost.name = ca_id + req.service.name = profile_id + if principal_type == 'user': + req.user.name = principal.username + elif principal_type == 'host': + req.user.name = principal.hostname + elif principal_type == 'service': + req.user.name = unicode(principal) + groups = [] + if principal_type == 'user': + user_obj = api.Command.user_show(principal.username)['result'] + groups = user_obj.get('memberof_group', []) + groups += user_obj.get('memberofindirect_group', []) + elif principal_type == 'host': + host_obj = api.Command.host_show(principal.hostname)['result'] + groups = host_obj.get('memberof_hostgroup', []) + groups += host_obj.get('memberofindirect_hostgroup', []) + req.user.groups = sorted(set(groups)) + return req + + +def _acl_make_rule(principal_type, obj): + """Turn CA ACL object into HBAC rule. + + ``principal_type`` + String in {'user', 'host', 'service'} + """ + rule = pyhbac.HbacRule(obj['cn'][0]) + rule.enabled = obj['ipaenabledflag'][0] + rule.srchosts.category = {pyhbac.HBAC_CATEGORY_ALL} + + # add CA(s) + if 'ipacacategory' in obj and obj['ipacacategory'][0].lower() == 'all': + rule.targethosts.category = {pyhbac.HBAC_CATEGORY_ALL} + else: + # For compatibility with pre-lightweight-CAs CA ACLs, + # no CA members implies the host authority (only) + rule.targethosts.names = obj.get('ipamemberca_ca', [IPA_CA_CN]) + + # add profiles + if ('ipacertprofilecategory' in obj + and obj['ipacertprofilecategory'][0].lower() == 'all'): + rule.services.category = {pyhbac.HBAC_CATEGORY_ALL} + else: + attr = 'ipamembercertprofile_certprofile' + rule.services.names = obj.get(attr, []) + + # add principals and principal's groups + category_attr = '{}category'.format(principal_type) + if category_attr in obj and obj[category_attr][0].lower() == 'all': + rule.users.category = {pyhbac.HBAC_CATEGORY_ALL} + else: + if principal_type == 'user': + rule.users.names = obj.get('memberuser_user', []) + rule.users.groups = obj.get('memberuser_group', []) + elif principal_type == 'host': + rule.users.names = obj.get('memberhost_host', []) + rule.users.groups = obj.get('memberhost_hostgroup', []) + elif principal_type == 'service': + rule.users.names = [ + unicode(principal) + for principal in obj.get('memberservice_service', []) + ] + + return rule + + +def acl_evaluate(principal, ca_id, profile_id): + if principal.is_user: + principal_type = 'user' + elif principal.is_host: + principal_type = 'host' + else: + principal_type = 'service' + req = _acl_make_request(principal_type, principal, ca_id, profile_id) + acls = api.Command.caacl_find(no_members=False)['result'] + rules = [_acl_make_rule(principal_type, obj) for obj in acls] + return req.evaluate(rules) == pyhbac.HBAC_EVAL_ALLOW + + def normalize_pkidate(value): return datetime.datetime.strptime(value, PKIDATE_FORMAT) diff --git a/ipaserver/plugins/hbactest.py b/ipaserver/plugins/hbactest.py index 626e89424..e15656800 100644 --- a/ipaserver/plugins/hbactest.py +++ b/ipaserver/plugins/hbactest.py @@ -29,9 +29,14 @@ if api.env.in_server and api.env.context in ['lite', 'server']: except ImportError: _dcerpc_bindings_installed = False -import pyhbac import six +try: + import pyhbac +except ImportError: + pyhbac = None + + if six.PY3: unicode = str @@ -210,7 +215,7 @@ EXAMPLES: register = Registry() -def convert_to_ipa_rule(rule): +def _convert_to_ipa_rule(rule): # convert a dict with a rule to an pyhbac rule ipa_rule = pyhbac.HbacRule(rule['cn'][0]) ipa_rule.enabled = rule['ipaenabledflag'][0] @@ -309,6 +314,14 @@ class hbactest(Command): return host def execute(self, *args, **options): + if pyhbac is None: + raise errors.ValidationError( + name=_('missing pyhbac'), + error=_( + 'pyhbac is not available on the server.' + ) + ) + # First receive all needed information: # 1. HBAC rules (whether enabled or disabled) # 2. Required options are (user, target host, service) @@ -356,7 +369,7 @@ class hbactest(Command): # --disabled will import all disabled rules # --rules will implicitly add the rules from a rule list for rule in hbacset: - ipa_rule = convert_to_ipa_rule(rule) + ipa_rule = _convert_to_ipa_rule(rule) if ipa_rule.name in testrules: ipa_rule.enabled = True rules.append(ipa_rule)