dogtaginstance: extract user creation to subroutine.

Extract the user and group membership creation behaviour from
DogtagInstance.setup_admin to its own method, 'create_user'.  The
ACME setup routine will use it to create ACME RA accounts.

The @staticmethod decorator documents that 'create_user' does not
use 'self' or 'cls'.  I preferred not to lift to a top-level def
because it is very much a "DogtagInstance" behaviour.

Part of: https://pagure.io/freeipa/issue/4751

Reviewed-By: Rob Crittenden <rcritten@redhat.com>
This commit is contained in:
Fraser Tweedale
2020-05-27 12:12:45 +10:00
committed by Rob Crittenden
parent dd301a4535
commit 5883cff0b7

View File

@@ -22,6 +22,7 @@ from __future__ import absolute_import
import base64
import logging
import time
import typing
import ldap
import os
@@ -61,6 +62,16 @@ logger = logging.getLogger(__name__)
INTERNAL_TOKEN = "internal"
OU_GROUPS_DN = DN(('ou', 'groups'), ('o', 'ipaca'))
def _person_dn(uid):
return DN(('uid', uid), ('ou', 'people'), ('o', 'ipaca'))
def _group_dn(group):
return DN(('cn', group), OU_GROUPS_DN)
def get_security_domain():
"""
@@ -117,8 +128,6 @@ class DogtagInstance(service.Service):
"""Look up token name for nickname."""
return self.token_names.get(nickname, self.token_name)
ipaca_groups = DN(('ou', 'groups'), ('o', 'ipaca'))
ipaca_people = DN(('ou', 'people'), ('o', 'ipaca'))
groups_aci = (
b'(targetfilter="(objectClass=groupOfUniqueNames)")'
b'(targetattr="cn || description || objectclass || uniquemember")'
@@ -147,9 +156,7 @@ class DogtagInstance(service.Service):
self.basedn = None
self.admin_user = "admin"
self.admin_dn = DN(
('uid', self.admin_user), self.ipaca_people
)
self.admin_dn = _person_dn(self.admin_user)
self.admin_groups = None
self.tmp_agent_db = None
self.subsystem = subsystem
@@ -525,7 +532,7 @@ class DogtagInstance(service.Service):
setup_admin() method needs the permission to wait, until all group
information has been replicated.
"""
dn = self.ipaca_groups
dn = OU_GROUPS_DN
mod = [(ldap.MOD_ADD, 'aci', [self.groups_aci])]
try:
api.Backend.ldap2.modify_s(dn, mod)
@@ -534,44 +541,88 @@ class DogtagInstance(service.Service):
else:
logger.debug("Added ACI to read groups to %s", dn)
def setup_admin(self):
self.admin_user = "admin-%s" % self.fqdn
self.admin_password = ipautil.ipa_generate_password()
self.admin_dn = DN(
('uid', self.admin_user), self.ipaca_people
)
# remove user if left-over exists
@staticmethod
def create_user(
uid: str,
cn: str,
sn: str,
user_type: str,
groups: typing.Collection[str],
force: bool,
) -> typing.Optional[str]:
"""
Create the user entry with a random password, and add the user to
the given groups.
If such a user entry already exists, ``force`` determines whether the
existing entry is replaced, or if the operation fails.
**Does not wait for replication**. This should be done by caller,
if necessary.
Return the password if entry was created, otherwise ``None``.
"""
user_types = {'adminType', 'agentType'}
if user_type not in user_types:
raise ValueError(f"user_type must be in {user_types}")
# if entry already exists, delete (force=True) or fail
dn = _person_dn(uid)
try:
api.Backend.ldap2.delete_entry(self.admin_dn)
api.Backend.ldap2.get_entry(dn, ['uid'])
except errors.NotFound:
pass
else:
if force:
api.Backend.ldap2.delete_entry(dn)
else:
return None
# add user
password = ipautil.ipa_generate_password()
entry = api.Backend.ldap2.make_entry(
self.admin_dn,
objectclass=["top", "person", "organizationalPerson",
"inetOrgPerson", "cmsuser"],
uid=[self.admin_user],
cn=[self.admin_user],
sn=[self.admin_user],
usertype=['adminType'],
mail=['root@localhost'],
userPassword=[self.admin_password],
userstate=['1']
dn,
objectclass=[
"top", "person", "organizationalPerson",
"inetOrgPerson", "cmsuser",
],
uid=[uid],
cn=[cn],
sn=[sn],
usertype=[user_type],
userPassword=[password],
userstate=['1'],
)
api.Backend.ldap2.add_entry(entry)
wait_groups = []
for group in self.admin_groups:
group_dn = DN(('cn', group), self.ipaca_groups)
mod = [(ldap.MOD_ADD, 'uniqueMember', [self.admin_dn])]
# add to groups
for group in groups:
mod = [(ldap.MOD_ADD, 'uniqueMember', [dn])]
try:
api.Backend.ldap2.modify_s(group_dn, mod)
api.Backend.ldap2.modify_s(_group_dn(group), mod)
except ldap.TYPE_OR_VALUE_EXISTS:
# already there
return None
else:
wait_groups.append(group_dn)
pass # already there, somehow
return password
def setup_admin(self):
self.admin_user = "admin-%s" % self.fqdn
self.admin_password = ipautil.ipa_generate_password()
self.admin_dn = _person_dn(self.admin_user)
result = self.create_user(
uid=self.admin_user,
cn=self.admin_user,
sn=self.admin_user,
user_type='adminType',
groups=self.admin_groups,
force=True,
)
if result is None:
return None # something went wrong
else:
self.admin_password = result
# Now wait until the other server gets replicated this data
master_conn = ipaldap.LDAPClient.from_hostname_secure(
@@ -606,7 +657,7 @@ class DogtagInstance(service.Service):
)
# wait for group membership
for group_dn in wait_groups:
for group_dn in (_group_dn(group) for group in self.admin_groups):
replication.wait_for_entry(
master_conn,
group_dn,
@@ -616,10 +667,9 @@ class DogtagInstance(service.Service):
)
def __remove_admin_from_group(self, group):
dn = DN(('cn', group), self.ipaca_groups)
mod = [(ldap.MOD_DELETE, 'uniqueMember', self.admin_dn)]
try:
api.Backend.ldap2.modify_s(dn, mod)
api.Backend.ldap2.modify_s(_group_dn(group), mod)
except ldap.NO_SUCH_ATTRIBUTE:
# already removed
pass