freeipa/ipaserver/plugins/subid.py
Christian Heimes 51035d9354 Use 389-DS' dnaInterval setting to assign intervals
Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Francois Cami <fcami@redhat.com>
Reviewed-By: Rob Crittenden <rcritten@redhat.com>
2021-07-09 09:47:30 -04:00

597 lines
19 KiB
Python

#
# Copyright (C) 2021 FreeIPA Contributors see COPYING for license
#
import uuid
from ipalib import api
from ipalib import constants
from ipalib import errors
from ipalib import output
from ipalib.plugable import Registry
from ipalib.parameters import Int, Str
from ipalib.request import context
from ipalib.text import _, ngettext
from ipapython.dn import DN
from .baseldap import (
LDAPObject,
LDAPCreate,
LDAPDelete,
LDAPUpdate,
LDAPSearch,
LDAPRetrieve,
LDAPQuery,
DNA_MAGIC,
)
__doc__ = _(
"""
Subordinate ids
Manage subordinate user and group ids for users
EXAMPLES:
Auto-assign a subordinate id range to current user
ipa subid-generate
Auto-assign a subordinate id range to user alice:
ipa subid-generate --owner=alice
Find subordinate ids for user alice:
ipa subid-find --owner=alice
Match entry by any subordinate uid in range:
ipa subid-match --subuid=2147483649
"""
)
register = Registry()
@register()
class subid(LDAPObject):
"""Subordinate id object."""
container_dn = api.env.container_subids
object_name = _("Subordinate id")
object_name_plural = _("Subordinate ids")
label = _("Subordinate ids")
label_singular = _("Subordinate id")
object_class = ["ipasubordinateidentry"]
possible_objectclasses = [
"ipasubordinategid",
"ipasubordinateuid",
"ipasubordinateid",
]
default_attributes = [
"ipauniqueid",
"ipaowner",
"ipasubuidnumber",
"ipasubuidcount",
"ipasubgidnumber",
"ipasubgidcount",
]
allow_rename = False
permission_filter_objectclasses_string = (
"(objectclass=ipasubordinateidentry)"
)
managed_permissions = {
# all authenticated principals can read subordinate id information
"System: Read Subordinate Id Attributes": {
"ipapermbindruletype": "all",
"ipapermright": {"read", "search", "compare"},
"ipapermtargetfilter": [
permission_filter_objectclasses_string,
],
"ipapermdefaultattr": {
"objectclass",
"ipauniqueid",
"description",
"ipaowner",
"ipasubuidnumber",
"ipasubuidcount",
"ipasubgidnumber",
"ipasubgidcount",
},
},
"System: Read Subordinate Id Count": {
"ipapermbindruletype": "all",
"ipapermright": {"read", "search", "compare"},
"ipapermtargetfilter": [],
"ipapermtarget": DN(container_dn, api.env.basedn),
"ipapermdefaultattr": {"numSubordinates"},
},
# user administrators can remove subordinate ids or update the
# ipaowner attribute. This enables user admins to remove users
# with assigned subids or move them to staging area (--preserve).
"System: Manage Subordinate Ids": {
"ipapermright": {"write"},
"ipapermtargetfilter": [
permission_filter_objectclasses_string,
],
"ipapermdefaultattr": {
"description",
"ipaowner", # allow user admins to preserve users
},
"default_privileges": {"User Administrators"},
},
"System: Remove Subordinate Ids": {
"ipapermright": {"delete"},
"ipapermtargetfilter": [
permission_filter_objectclasses_string,
],
"default_privileges": {"User Administrators"},
},
}
takes_params = (
Str(
"ipauniqueid",
cli_name="id",
label=_("Unique ID"),
primary_key=True,
flags={"optional_create"},
),
Str(
"description?",
cli_name="desc",
label=_("Description"),
doc=_("Subordinate id description"),
),
Str(
"ipaowner",
cli_name="owner",
label=_("Owner"),
doc=_("Owning user of subordinate id entry"),
flags={"no_update"},
),
Int(
"ipasubuidnumber?",
label=_("SubUID range start"),
cli_name="subuid",
doc=_("Start value for subordinate user ID (subuid) range"),
flags={"no_update"},
minvalue=constants.SUBID_RANGE_START,
maxvalue=constants.SUBID_RANGE_MAX,
),
Int(
"ipasubuidcount?",
label=_("SubUID range size"),
cli_name="subuidcount",
doc=_("Subordinate user ID count"),
flags={"no_create", "no_update", "no_search"}, # auto-assigned
minvalue=constants.SUBID_COUNT,
maxvalue=constants.SUBID_COUNT,
),
Int(
"ipasubgidnumber?",
label=_("SubGID range start"),
cli_name="subgid",
doc=_("Start value for subordinate group ID (subgid) range"),
flags={"no_create", "no_update"}, # auto-assigned
minvalue=constants.SUBID_RANGE_START,
maxvalue=constants.SUBID_RANGE_MAX,
),
Int(
"ipasubgidcount?",
label=_("SubGID range size"),
cli_name="subgidcount",
doc=_("Subordinate group ID count"),
flags={"no_create", "no_update", "no_search"}, # auto-assigned
minvalue=constants.SUBID_COUNT,
maxvalue=constants.SUBID_COUNT,
),
)
def fixup_objectclass(self, entry_attrs):
"""Add missing object classes to entry"""
has_subuid = "ipasubuidnumber" in entry_attrs
has_subgid = "ipasubgidnumber" in entry_attrs
candicates = set(self.object_class)
if has_subgid:
candicates.add("ipasubordinategid")
if has_subuid:
candicates.add("ipasubordinateuid")
if has_subgid and has_subuid:
candicates.add("ipasubordinateid")
entry_oc = entry_attrs.setdefault("objectclass", [])
current_oc = {x.lower() for x in entry_oc}
for oc in candicates.difference(current_oc):
entry_oc.append(oc)
def handle_duplicate_entry(self, *keys):
if hasattr(context, "subid_owner_dn"):
uid = context.subid_owner_dn[0].value
msg = _(
'%(oname)s with with name "%(pkey)s" or for user "%(uid)s" '
"already exists."
) % {
"uid": uid,
"pkey": keys[-1] if keys else "",
"oname": self.object_name,
}
raise errors.DuplicateEntry(message=msg) from None
else:
super().handle_duplicate_entry(*keys)
def convert_owner(self, entry_attrs, options):
"""Change owner from DN to uid string"""
if not options.get("raw", False) and "ipaowner" in entry_attrs:
userobj = self.api.Object.user
entry_attrs["ipaowner"] = [
userobj.get_primary_key_from_dn(entry_attrs["ipaowner"][0])
]
def get_owner_dn(self, *keys, **options):
"""Get owning user entry entry (username or DN)"""
owner = keys[-1]
userobj = self.api.Object.user
if isinstance(owner, DN):
# it's already a DN, validate it's either an active or preserved
# user. Ref integrity plugin checks that it's not a dangling DN.
user_dns = (
DN(userobj.active_container_dn, self.api.env.basedn),
DN(userobj.delete_container_dn, self.api.env.basedn),
)
if not owner.endswith(user_dns):
raise errors.ValidationError(
name="ipaowner",
error=_("'%(dn)s is not a valid user") % {"dn": owner},
)
return owner
# similar to user.get_either_dn() but with error reporting and
# returns an entry
ldap = self.backend
try:
active_dn = userobj.get_dn(owner, **options)
entry = ldap.get_entry(active_dn, attrs_list=[])
return entry.dn
except errors.NotFound:
# fall back to deleted user
try:
delete_dn = userobj.get_delete_dn(owner, **options)
entry = ldap.get_entry(delete_dn, attrs_list=[])
return entry.dn
except errors.NotFound:
raise userobj.handle_not_found(owner)
def handle_subordinate_ids(self, ldap, dn, entry_attrs):
"""Handle ipaSubordinateId object class"""
new_subuid = entry_attrs.single_value.get("ipasubuidnumber")
new_subgid = entry_attrs.single_value.get("ipasubgidnumber")
if new_subuid is None:
new_subuid = DNA_MAGIC
# enforce subuid == subgid
if new_subgid is not None and new_subgid != new_subuid:
raise errors.ValidationError(
name="ipasubgidnumber",
error=_("subgidnumber must be equal to subuidnumber"),
)
self.set_subordinate_ids(ldap, dn, entry_attrs, new_subuid)
return True
def set_subordinate_ids(self, ldap, dn, entry_attrs, subuid):
"""Set subuid value of an entry
Takes care of objectclass and sibbling attributes
"""
if "objectclass" not in entry_attrs:
_entry_attrs = ldap.get_entry(dn, ["objectclass"])
entry_attrs["objectclass"] = _entry_attrs["objectclass"]
entry_attrs["ipasubuidnumber"] = subuid
# enforce subuid == subgid for now
entry_attrs["ipasubgidnumber"] = subuid
# hard-coded constants
entry_attrs["ipasubuidcount"] = constants.SUBID_COUNT
entry_attrs["ipasubgidcount"] = constants.SUBID_COUNT
self.fixup_objectclass(entry_attrs)
def get_subid_match_candidate_filter(
self,
ldap,
*,
subuid,
subgid,
extra_filters=(),
offset=None,
):
"""Create LDAP filter to locate matching/overlapping subids"""
if subuid is None and subgid is None:
raise ValueError("subuid and subgid are both None")
if offset is None:
# assumes that no subordinate count is larger than SUBID_COUNT
offset = constants.SUBID_COUNT - 1
class_filters = "(objectclass=ipasubordinateid)"
subid_filters = []
if subuid is not None:
subid_filters.append(
ldap.combine_filters(
[
f"(ipasubuidnumber>={subuid - offset})",
f"(ipasubuidnumber<={subuid + offset})",
],
rules=ldap.MATCH_ALL,
)
)
if subgid is not None:
subid_filters.append(
ldap.combine_filters(
[
f"(ipasubgidnumber>={subgid - offset})",
f"(ipasubgidnumber<={subgid + offset})",
],
rules=ldap.MATCH_ALL,
)
)
subid_filters = ldap.combine_filters(
subid_filters, rules=ldap.MATCH_ANY
)
filters = [class_filters, subid_filters]
filters.extend(extra_filters)
return ldap.combine_filters(filters, rules=ldap.MATCH_ALL)
@register()
class subid_add(LDAPCreate):
__doc__ = _("Add a new subordinate id.")
msg_summary = _('Added subordinate id "%(value)s"')
# internal command, use subid-auto to auto-assign subids
NO_CLI = True
def pre_callback(
self, ldap, dn, entry_attrs, attrs_list, *keys, **options
):
# XXX let ref integrity plugin validate DN?
owner_dn = self.obj.get_owner_dn(entry_attrs["ipaowner"], **options)
context.subid_owner_dn = owner_dn
entry_attrs["ipaowner"] = owner_dn
self.obj.handle_subordinate_ids(ldap, dn, entry_attrs)
attrs_list.append("objectclass")
return dn
def execute(self, ipauniqueid=None, **options):
if ipauniqueid is None:
ipauniqueid = str(uuid.uuid4())
return super().execute(ipauniqueid, **options)
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
self.obj.convert_owner(entry_attrs, options)
return super(subid_add, self).post_callback(
ldap, dn, entry_attrs, *keys, **options
)
@register()
class subid_del(LDAPDelete):
__doc__ = _("Delete a subordinate id.")
msg_summary = _('Deleted subordinate id "%(value)s"')
# internal command, subids cannot be removed
NO_CLI = True
@register()
class subid_mod(LDAPUpdate):
__doc__ = _("Modify a subordinate id.")
msg_summary = _('Modified subordinate id "%(value)s"')
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
self.obj.convert_owner(entry_attrs, options)
return super(subid_mod, self).post_callback(
ldap, dn, entry_attrs, *keys, **options
)
@register()
class subid_find(LDAPSearch):
__doc__ = _("Search for subordinate id.")
msg_summary = ngettext(
"%(count)d subordinate id matched",
"%(count)d subordinate ids matched",
0,
)
def pre_callback(
self, ldap, filters, attrs_list, base_dn, scope, *args, **options
):
attrs_list.append("objectclass")
return super(subid_find, self).pre_callback(
ldap, filters, attrs_list, base_dn, scope, *args, **options
)
def args_options_2_entry(self, *args, **options):
entry_attrs = super(subid_find, self).args_options_2_entry(
*args, **options
)
owner = entry_attrs.get("ipaowner")
if owner is not None:
owner_dn = self.obj.get_owner_dn(owner, **options)
entry_attrs["ipaowner"] = owner_dn
return entry_attrs
def post_callback(self, ldap, entries, truncated, *args, **options):
for entry in entries:
self.obj.convert_owner(entry, options)
return super(subid_find, self).post_callback(
ldap, entries, truncated, *args, **options
)
@register()
class subid_show(LDAPRetrieve):
__doc__ = _("Display information about a subordinate id.")
def pre_callback(self, ldap, dn, attrs_list, *keys, **options):
attrs_list.append("objectclass")
return super(subid_show, self).pre_callback(
ldap, dn, attrs_list, *keys, **options
)
def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
self.obj.convert_owner(entry_attrs, options)
return super(subid_show, self).post_callback(
ldap, dn, entry_attrs, *keys, **options
)
@register()
class subid_generate(LDAPQuery):
__doc__ = _(
"Generate and auto-assign subuid and subgid range to user entry"
)
has_output = output.standard_entry
takes_options = LDAPQuery.takes_options + (
Str(
"ipaowner?",
cli_name="owner",
label=_("Owner"),
doc=_("Owning user of subordinate id entry"),
),
)
def get_args(self):
return []
def execute(self, *keys, **options):
owner_uid = options.get("ipaowner")
# default to current user
if owner_uid is None:
owner_dn = DN(self.api.Backend.ldap2.conn.whoami_s()[4:])
# validate it's a user and not a service or host
owner_dn = self.obj.get_owner_dn(owner_dn)
owner_uid = owner_dn[0].value
return self.api.Command.subid_add(
description="auto-assigned subid",
ipaowner=owner_uid,
version=options["version"],
)
@register()
class subid_match(subid_find):
__doc__ = _("Match users by any subordinate uid in their range")
def get_options(self):
base_options = {p.name for p in self.obj.takes_params}
for option in super().get_options():
if option.name == "ipasubuidnumber":
yield option.clone(
label=_("SubUID match"),
doc=_("Match value for subordinate user ID"),
required=True,
)
elif option.name not in base_options:
# raw, version
yield option.clone()
def pre_callback(
self, ldap, filters, attrs_list, base_dn, scope, *args, **options
):
# search for candidates in range
# Code assumes that no subordinate count is larger than SUBID_COUNT
filters = self.obj.get_subid_match_candidate_filter(
ldap,
subuid=options["ipasubuidnumber"],
subgid=None,
)
attrs_list.extend(self.obj.default_attributes)
return filters, base_dn, scope
def post_callback(self, ldap, entries, truncated, *args, **options):
# filter out mismatches manually
osubuid = options["ipasubuidnumber"]
new_entries = []
for entry in entries:
esubuid = int(entry.single_value["ipasubuidnumber"])
esubcount = int(entry.single_value["ipasubuidcount"])
minsubuid = esubuid
maxsubuid = esubuid + esubcount - 1
if minsubuid <= osubuid <= maxsubuid:
new_entries.append(entry)
entries[:] = new_entries
return truncated
@register()
class subid_stats(LDAPQuery):
__doc__ = _("Subordinate id statistics")
takes_options = ()
has_output = (
output.summary,
output.Entry("result"),
)
def get_args(self):
return ()
def get_remaining_dna(self, ldap, **options):
base_dn = DN(
self.api.env.container_dna_subordinate_ids, self.api.env.basedn
)
entries, _truncated = ldap.find_entries(
"(objectClass=dnaSharedConfig)",
attrs_list=["dnaRemainingValues"],
base_dn=base_dn,
scope=ldap.SCOPE_ONELEVEL,
)
return sum(
int(entry.single_value["dnaRemainingValues"]) for entry in entries
)
def get_idrange(self, ldap, **options):
cn = f"{self.api.env.realm}_subid_range"
result = self.api.Command.idrange_show(cn, version=options["version"])
baseid = int(result["result"]["ipabaseid"][0])
rangesize = int(result["result"]["ipaidrangesize"][0])
return baseid, rangesize
def get_subid_assigned(self, ldap, **options):
dn = DN(self.api.env.container_subids, self.api.env.basedn)
entry = ldap.get_entry(dn=dn, attrs_list=["numSubordinates"])
return int(entry.single_value["numSubordinates"])
def execute(self, *keys, **options):
ldap = self.obj.backend
dna_remaining = self.get_remaining_dna(ldap, **options)
baseid, rangesize = self.get_idrange(ldap, **options)
assigned_subids = self.get_subid_assigned(ldap, **options)
remaining_subids = dna_remaining // constants.SUBID_COUNT
return dict(
summary=_("%(remaining)i remaining subordinate id ranges")
% {
"remaining": remaining_subids,
},
result=dict(
baseid=baseid,
rangesize=rangesize,
dna_remaining=dna_remaining,
assigned_subids=assigned_subids,
remaining_subids=remaining_subids,
),
)