Conditionally import pyhbac

The pyhbac module is part of SSSD. It's not available as stand-alone
PyPI package. It would take a lot of effort to package it because the
code is deeply tight into SSSD.

Let's follow the example of other SSSD Python packages and make the
import of pyhbac conditionally. It's only necessary for caacl and
hbactest plugins.

I renamed convert_to_ipa_rule() to _convert_to_ipa_rule() because it
does not check for presence of pyhbac package itself. The check is
performed earlier in execute(). The prefix indicates that it is an
internal function and developers have to think twice before using it
in another place.

This makes it much easier to install ipaserver with instrumented build
of Python with a different ABI or in isolated virtual envs to profile
and debug the server.

Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Martin Basti <mbasti@redhat.com>
Reviewed-By: Jan Cholasta <jcholast@redhat.com>
This commit is contained in:
Christian Heimes
2017-01-17 08:49:54 +01:00
committed by Martin Basti
parent a53e17830c
commit 3064b890e2
3 changed files with 105 additions and 90 deletions

View File

@@ -2,12 +2,10 @@
# Copyright (C) 2015 FreeIPA Contributors see COPYING for license
#
import pyhbac
import six
from ipalib import api, errors, output
from ipalib import Bool, Str, StrEnum
from ipalib.constants import IPA_CA_CN
from ipalib.plugable import Registry
from .baseldap import (
LDAPObject, LDAPSearch, LDAPCreate, LDAPDelete, LDAPQuery,
@@ -80,90 +78,6 @@ EXAMPLES:
register = Registry()
def _acl_make_request(principal_type, principal, ca_id, profile_id):
"""Construct HBAC request for the given principal, CA and profile"""
req = pyhbac.HbacRequest()
req.targethost.name = ca_id
req.service.name = profile_id
if principal_type == 'user':
req.user.name = principal.username
elif principal_type == 'host':
req.user.name = principal.hostname
elif principal_type == 'service':
req.user.name = unicode(principal)
groups = []
if principal_type == 'user':
user_obj = api.Command.user_show(principal.username)['result']
groups = user_obj.get('memberof_group', [])
groups += user_obj.get('memberofindirect_group', [])
elif principal_type == 'host':
host_obj = api.Command.host_show(principal.hostname)['result']
groups = host_obj.get('memberof_hostgroup', [])
groups += host_obj.get('memberofindirect_hostgroup', [])
req.user.groups = sorted(set(groups))
return req
def _acl_make_rule(principal_type, obj):
"""Turn CA ACL object into HBAC rule.
``principal_type``
String in {'user', 'host', 'service'}
"""
rule = pyhbac.HbacRule(obj['cn'][0])
rule.enabled = obj['ipaenabledflag'][0]
rule.srchosts.category = {pyhbac.HBAC_CATEGORY_ALL}
# add CA(s)
if 'ipacacategory' in obj and obj['ipacacategory'][0].lower() == 'all':
rule.targethosts.category = {pyhbac.HBAC_CATEGORY_ALL}
else:
# For compatibility with pre-lightweight-CAs CA ACLs,
# no CA members implies the host authority (only)
rule.targethosts.names = obj.get('ipamemberca_ca', [IPA_CA_CN])
# add profiles
if ('ipacertprofilecategory' in obj
and obj['ipacertprofilecategory'][0].lower() == 'all'):
rule.services.category = {pyhbac.HBAC_CATEGORY_ALL}
else:
attr = 'ipamembercertprofile_certprofile'
rule.services.names = obj.get(attr, [])
# add principals and principal's groups
category_attr = '{}category'.format(principal_type)
if category_attr in obj and obj[category_attr][0].lower() == 'all':
rule.users.category = {pyhbac.HBAC_CATEGORY_ALL}
else:
if principal_type == 'user':
rule.users.names = obj.get('memberuser_user', [])
rule.users.groups = obj.get('memberuser_group', [])
elif principal_type == 'host':
rule.users.names = obj.get('memberhost_host', [])
rule.users.groups = obj.get('memberhost_hostgroup', [])
elif principal_type == 'service':
rule.users.names = [
unicode(principal)
for principal in obj.get('memberservice_service', [])
]
return rule
def acl_evaluate(principal, ca_id, profile_id):
if principal.is_user:
principal_type = 'user'
elif principal.is_host:
principal_type = 'host'
else:
principal_type = 'service'
req = _acl_make_request(principal_type, principal, ca_id, profile_id)
acls = api.Command.caacl_find(no_members=False)['result']
rules = [_acl_make_rule(principal_type, obj) for obj in acls]
return req.evaluate(rules) == pyhbac.HBAC_EVAL_ALLOW
@register()
class caacl(LDAPObject):
"""

View File

@@ -43,7 +43,6 @@ from ipalib.plugable import Registry
from .virtual import VirtualCommand
from .baseldap import pkey_to_value
from .certprofile import validate_profile_id
from .caacl import acl_evaluate
from ipalib.text import _
from ipalib.request import context
from ipalib import output
@@ -52,6 +51,11 @@ from ipapython.dn import DN
from ipapython.ipa_log_manager import root_logger
from ipaserver.plugins.service import normalize_principal, validate_realm
try:
import pyhbac
except ImportError:
raise errors.SkipPluginModule(reason=_('pyhbac is not installed.'))
if six.PY3:
unicode = str
@@ -158,6 +162,90 @@ register = Registry()
PKIDATE_FORMAT = '%Y-%m-%d'
def _acl_make_request(principal_type, principal, ca_id, profile_id):
"""Construct HBAC request for the given principal, CA and profile"""
req = pyhbac.HbacRequest()
req.targethost.name = ca_id
req.service.name = profile_id
if principal_type == 'user':
req.user.name = principal.username
elif principal_type == 'host':
req.user.name = principal.hostname
elif principal_type == 'service':
req.user.name = unicode(principal)
groups = []
if principal_type == 'user':
user_obj = api.Command.user_show(principal.username)['result']
groups = user_obj.get('memberof_group', [])
groups += user_obj.get('memberofindirect_group', [])
elif principal_type == 'host':
host_obj = api.Command.host_show(principal.hostname)['result']
groups = host_obj.get('memberof_hostgroup', [])
groups += host_obj.get('memberofindirect_hostgroup', [])
req.user.groups = sorted(set(groups))
return req
def _acl_make_rule(principal_type, obj):
"""Turn CA ACL object into HBAC rule.
``principal_type``
String in {'user', 'host', 'service'}
"""
rule = pyhbac.HbacRule(obj['cn'][0])
rule.enabled = obj['ipaenabledflag'][0]
rule.srchosts.category = {pyhbac.HBAC_CATEGORY_ALL}
# add CA(s)
if 'ipacacategory' in obj and obj['ipacacategory'][0].lower() == 'all':
rule.targethosts.category = {pyhbac.HBAC_CATEGORY_ALL}
else:
# For compatibility with pre-lightweight-CAs CA ACLs,
# no CA members implies the host authority (only)
rule.targethosts.names = obj.get('ipamemberca_ca', [IPA_CA_CN])
# add profiles
if ('ipacertprofilecategory' in obj
and obj['ipacertprofilecategory'][0].lower() == 'all'):
rule.services.category = {pyhbac.HBAC_CATEGORY_ALL}
else:
attr = 'ipamembercertprofile_certprofile'
rule.services.names = obj.get(attr, [])
# add principals and principal's groups
category_attr = '{}category'.format(principal_type)
if category_attr in obj and obj[category_attr][0].lower() == 'all':
rule.users.category = {pyhbac.HBAC_CATEGORY_ALL}
else:
if principal_type == 'user':
rule.users.names = obj.get('memberuser_user', [])
rule.users.groups = obj.get('memberuser_group', [])
elif principal_type == 'host':
rule.users.names = obj.get('memberhost_host', [])
rule.users.groups = obj.get('memberhost_hostgroup', [])
elif principal_type == 'service':
rule.users.names = [
unicode(principal)
for principal in obj.get('memberservice_service', [])
]
return rule
def acl_evaluate(principal, ca_id, profile_id):
if principal.is_user:
principal_type = 'user'
elif principal.is_host:
principal_type = 'host'
else:
principal_type = 'service'
req = _acl_make_request(principal_type, principal, ca_id, profile_id)
acls = api.Command.caacl_find(no_members=False)['result']
rules = [_acl_make_rule(principal_type, obj) for obj in acls]
return req.evaluate(rules) == pyhbac.HBAC_EVAL_ALLOW
def normalize_pkidate(value):
return datetime.datetime.strptime(value, PKIDATE_FORMAT)

View File

@@ -29,9 +29,14 @@ if api.env.in_server and api.env.context in ['lite', 'server']:
except ImportError:
_dcerpc_bindings_installed = False
import pyhbac
import six
try:
import pyhbac
except ImportError:
pyhbac = None
if six.PY3:
unicode = str
@@ -210,7 +215,7 @@ EXAMPLES:
register = Registry()
def convert_to_ipa_rule(rule):
def _convert_to_ipa_rule(rule):
# convert a dict with a rule to an pyhbac rule
ipa_rule = pyhbac.HbacRule(rule['cn'][0])
ipa_rule.enabled = rule['ipaenabledflag'][0]
@@ -309,6 +314,14 @@ class hbactest(Command):
return host
def execute(self, *args, **options):
if pyhbac is None:
raise errors.ValidationError(
name=_('missing pyhbac'),
error=_(
'pyhbac is not available on the server.'
)
)
# First receive all needed information:
# 1. HBAC rules (whether enabled or disabled)
# 2. Required options are (user, target host, service)
@@ -356,7 +369,7 @@ class hbactest(Command):
# --disabled will import all disabled rules
# --rules will implicitly add the rules from a rule list
for rule in hbacset:
ipa_rule = convert_to_ipa_rule(rule)
ipa_rule = _convert_to_ipa_rule(rule)
if ipa_rule.name in testrules:
ipa_rule.enabled = True
rules.append(ipa_rule)