Add user and group wrappers

New classes for user and group names provide a convenient way to access
the uid and primary gid of a user / gid of a group. The classes also
provide chown() and chgrp() methods to simplify common operations.

The wrappers are subclasses of builtin str type and behave like ordinary
strings with additional features. The pwd and grp structs are retrieved
once and then cached.

Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Rob Crittenden <rcritten@redhat.com>
This commit is contained in:
Christian Heimes 2020-09-11 12:22:02 +02:00 committed by Rob Crittenden
parent 99a40cbbe9
commit 72fb4e60c8
21 changed files with 215 additions and 181 deletions

View File

@ -12,7 +12,6 @@ from ipaplatform.paths import paths
import io
import sys
import os
import pwd
import tempfile
import textwrap
@ -97,8 +96,7 @@ def retrieve_keytab(api, ccache_name, oneway_keytab_name, oneway_principal):
)
# Make sure SSSD is able to read the keytab
try:
sssd = pwd.getpwnam(constants.SSSD_USER)
os.chown(oneway_keytab_name, sssd[2], sssd[3])
constants.SSSD_USER.chown(oneway_keytab_name)
except KeyError:
# If user 'sssd' does not exist, we don't need to chown from root to sssd
# because it means SSSD does not run as sssd user

View File

@ -24,6 +24,7 @@ All constants centralised in one file.
import os
import socket
from ipaplatform.constants import constants as _constants
from ipapython.dn import DN
from ipapython.version import VERSION, API_VERSION
@ -327,8 +328,8 @@ PATTERN_GROUPUSER_NAME = (
ANON_USER = 'WELLKNOWN/ANONYMOUS'
# IPA API Framework user
IPAAPI_USER = 'ipaapi'
IPAAPI_GROUP = 'ipaapi'
IPAAPI_USER = _constants.IPAAPI_USER
IPAAPI_GROUP = _constants.IPAAPI_GROUP
# Use cache path
USER_CACHE_PATH = (

View File

@ -5,35 +5,123 @@
'''
This base platform module exports platform dependant constants.
'''
import grp
import os
import pwd
import sys
class _Entity(str):
__slots__ = ("_entity", )
def __init__(self, name):
super().__init__()
self._entity = None
def __str__(self):
return super().__str__()
def __repr__(self):
return f'<{self.__class__.__name__} "{self!s}">'
class User(_Entity):
__slots__ = ()
@property
def entity(self):
"""User information struct
:return: pwd.struct_passwd instance
"""
entity = self._entity
if entity is None:
try:
self._entity = entity = pwd.getpwnam(self)
except KeyError:
raise ValueError(f"user '{self!s}' not found") from None
return entity
@property
def uid(self):
"""Numeric user id (int)
"""
return self.entity.pw_uid
@property
def pgid(self):
"""Primary group id (int)"""
return self.entity.pw_gid
def chown(self, path, gid=None, **kwargs):
"""chown() file by path or file descriptor
gid defaults to user's primary gid. Use -1 to keep gid.
"""
if gid is None:
gid = self.pgid
elif isinstance(gid, Group):
gid = gid.gid
os.chown(path, self.uid, gid, **kwargs)
class Group(_Entity):
__slots__ = ()
@property
def entity(self):
"""Group information
:return: grp.struct_group instance
"""
entity = self._entity
if entity is None:
try:
self._entity = entity = grp.getgrnam(self)
except KeyError:
raise ValueError(f"group '{self!s}' not found") from None
return entity
@property
def gid(self):
"""Numeric group id (int)
"""
return self.entity.gr_gid
def chgrp(self, path, **kwargs):
"""change group owner file by path or file descriptor
"""
os.chown(path, -1, self.gid, **kwargs)
class BaseConstantsNamespace:
IS_64BITS = sys.maxsize > 2 ** 32
DEFAULT_ADMIN_SHELL = '/bin/bash'
DEFAULT_SHELL = '/bin/sh'
DS_USER = 'dirsrv'
DS_GROUP = 'dirsrv'
HTTPD_USER = "apache"
HTTPD_GROUP = "apache"
GSSPROXY_USER = "root"
IPAAPI_USER = User("ipaapi")
IPAAPI_GROUP = Group("ipaapi")
DS_USER = User("dirsrv")
DS_GROUP = Group("dirsrv")
HTTPD_USER = User("apache")
HTTPD_GROUP = Group("apache")
GSSPROXY_USER = User("root")
IPA_ADTRUST_PACKAGE_NAME = "freeipa-server-trust-ad"
IPA_DNS_PACKAGE_NAME = "freeipa-server-dns"
KDCPROXY_USER = "kdcproxy"
NAMED_USER = "named"
NAMED_GROUP = "named"
KDCPROXY_USER = User("kdcproxy")
NAMED_USER = User("named")
NAMED_GROUP = Group("named")
NAMED_DATA_DIR = "data/"
NAMED_OPTIONS_VAR = "OPTIONS"
NAMED_OPENSSL_ENGINE = None
NAMED_ZONE_COMMENT = ""
PKI_USER = 'pkiuser'
PKI_GROUP = 'pkiuser'
PKI_USER = User("pkiuser")
PKI_GROUP = Group("pkiuser")
# ntpd init variable used for daemon options
NTPD_OPTS_VAR = "OPTIONS"
# quote used for daemon options
NTPD_OPTS_QUOTE = "\""
ODS_USER = "ods"
ODS_GROUP = "ods"
ODS_USER = User("ods")
ODS_GROUP = Group("ods")
# nfsd init variable used to enable kerberized NFS
SECURE_NFS_VAR = "SECURE_NFS"
SELINUX_BOOLEAN_ADTRUST = {
@ -70,7 +158,7 @@ class BaseConstantsNamespace:
"$sysadm_u:s0-s0:c0.c1023"
"$unconfined_u:s0-s0:c0.c1023"
)
SSSD_USER = "sssd"
SSSD_USER = User("sssd")
# WSGI module override, only used on Fedora
MOD_WSGI_PYTHON2 = None
MOD_WSGI_PYTHON3 = None

View File

@ -9,22 +9,22 @@ This Debian family platform module exports platform dependant constants.
# Fallback to default path definitions
from __future__ import absolute_import
from ipaplatform.base.constants import BaseConstantsNamespace
from ipaplatform.base.constants import BaseConstantsNamespace, User, Group
class DebianConstantsNamespace(BaseConstantsNamespace):
HTTPD_USER = "www-data"
HTTPD_GROUP = "www-data"
NAMED_USER = "bind"
NAMED_GROUP = "bind"
HTTPD_USER = User("www-data")
HTTPD_GROUP = Group("www-data")
NAMED_USER = User("bind")
NAMED_GROUP = Group("bind")
NAMED_DATA_DIR = ""
NAMED_ZONE_COMMENT = "//"
# ntpd init variable used for daemon options
NTPD_OPTS_VAR = "NTPD_OPTS"
# quote used for daemon options
NTPD_OPTS_QUOTE = "\'"
ODS_USER = "opendnssec"
ODS_GROUP = "opendnssec"
ODS_USER = User("opendnssec")
ODS_GROUP = Group("opendnssec")
SECURE_NFS_VAR = "NEED_GSSD"
constants = DebianConstantsNamespace()

View File

@ -8,14 +8,14 @@ related constants for the SUSE OS family-based systems.
"""
# Fallback to default path definitions
from ipaplatform.base.constants import BaseConstantsNamespace
from ipaplatform.base.constants import BaseConstantsNamespace, User, Group
class SuseConstantsNamespace(BaseConstantsNamespace):
HTTPD_USER = "wwwrun"
HTTPD_GROUP = "www"
HTTPD_USER = User("wwwrun")
HTTPD_GROUP = Group("www")
# Don't have it yet
SSSD_USER = "root"
SSSD_USER = User("root")
TLS_HIGH_CIPHERS = None

View File

@ -23,7 +23,6 @@ from __future__ import print_function
import logging
import tempfile
import os
import pwd
import netaddr
import re
import shutil
@ -993,8 +992,7 @@ class BindInstance(service.Service):
dns_principal = p
# Make sure access is strictly reserved to the named user
pent = pwd.getpwnam(self.service_user)
os.chown(self.keytab, pent.pw_uid, pent.pw_gid)
self.service_user.chown(self.keytab)
os.chmod(self.keytab, 0o400)
# modify the principal so that it is marked as an ipa service so that
@ -1040,7 +1038,7 @@ class BindInstance(service.Service):
"""
# files are owned by root:named and are readable by user and group
uid = 0
gid = pwd.getpwnam(constants.NAMED_USER).pw_gid
gid = constants.NAMED_GROUP.gid
mode = 0o640
changed = False

View File

@ -28,8 +28,6 @@ import logging
import dbus
import ldap
import os
import pwd
import grp
import re
import shutil
import sys
@ -44,7 +42,6 @@ from ipalib import errors
import ipalib.constants
from ipalib.install import certmonger
from ipaplatform import services
from ipaplatform.constants import constants
from ipaplatform.paths import paths
from ipaplatform.tasks import tasks
@ -534,8 +531,7 @@ class CAInstance(DogtagInstance):
# so remove the file first
ipautil.remove_file(paths.TMP_CA_P12)
shutil.copy(cafile, paths.TMP_CA_P12)
pent = pwd.getpwnam(self.service_user)
os.chown(paths.TMP_CA_P12, pent.pw_uid, pent.pw_gid)
self.service_user.chown(paths.TMP_CA_P12)
self._configure_clone(
cfg,
@ -595,11 +591,10 @@ class CAInstance(DogtagInstance):
config = self._create_spawn_config(cfg)
self.set_hsm_state(config)
pent = pwd.getpwnam(self.service_user)
with tempfile.NamedTemporaryFile('w') as f:
config.write(f)
f.flush()
os.fchown(f.fileno(), pent.pw_uid, pent.pw_gid)
self.service_user.chown(f.fileno())
self.backup_state('installed', True)
@ -682,8 +677,7 @@ class CAInstance(DogtagInstance):
'ca.enableNonces=false')
if update_result != 0:
raise RuntimeError("Disabling nonces failed")
pent = pwd.getpwnam(self.service_user)
os.chown(self.config, pent.pw_uid, pent.pw_gid)
self.service_user.chown(self.config)
def enable_pkix(self):
directivesetter.set_directive(paths.SYSCONFIG_PKI_TOMCAT,
@ -732,9 +726,9 @@ class CAInstance(DogtagInstance):
"""
Sets the correct permissions for the RA_AGENT_PEM, RA_AGENT_KEY files
"""
ipaapi_gid = grp.getgrnam(ipalib.constants.IPAAPI_GROUP).gr_gid
group = ipalib.constants.IPAAPI_GROUP
for fname in (paths.RA_AGENT_PEM, paths.RA_AGENT_KEY):
os.chown(fname, -1, ipaapi_gid)
group.chgrp(fname)
os.chmod(fname, 0o440)
tasks.restore_context(fname)
@ -913,8 +907,7 @@ class CAInstance(DogtagInstance):
os.mkdir(publishdir)
os.chmod(publishdir, 0o775)
pent = pwd.getpwnam(self.service_user)
os.chown(publishdir, 0, pent.pw_gid)
os.chown(publishdir, 0, self.service_user.pgid)
tasks.restore_context(publishdir)
@ -1294,8 +1287,6 @@ class CAInstance(DogtagInstance):
sysupgrade.set_upgrade_state('dogtag', LWCA_KEY_RETRIEVAL, True)
def __setup_lightweight_ca_key_retrieval_kerberos(self):
pent = pwd.getpwnam(self.service_user)
logger.debug('Creating principal')
installutils.kadmin_addprinc(self.principal)
self.suffix = ipautil.realm_to_suffix(self.realm)
@ -1304,11 +1295,9 @@ class CAInstance(DogtagInstance):
logger.debug('Retrieving keytab')
installutils.create_keytab(self.keytab, self.principal)
os.chmod(self.keytab, 0o600)
os.chown(self.keytab, pent.pw_uid, pent.pw_gid)
self.service_user.chown(self.keytab)
def __setup_lightweight_ca_key_retrieval_custodia(self):
pent = pwd.getpwnam(self.service_user)
logger.debug('Creating Custodia keys')
custodia_basedn = DN(
('cn', 'custodia'), ('cn', 'ipa'), ('cn', 'etc'), api.env.basedn)
@ -1326,7 +1315,7 @@ class CAInstance(DogtagInstance):
keystore = IPAKEMKeys({'server_keys': keyfile})
keystore.generate_keys(self.service_prefix)
os.chmod(keyfile, 0o600)
os.chown(keyfile, pent.pw_uid, pent.pw_gid)
self.service_user.chown(keyfile)
def __remove_lightweight_ca_key_retrieval_custodia(self):
keyfile = os.path.join(paths.PKI_TOMCAT,
@ -1573,8 +1562,7 @@ class CAInstance(DogtagInstance):
with open(target, 'w') as f:
f.write(filled)
os.fchmod(f.fileno(), 0o600)
pent = pwd.getpwnam(constants.PKI_USER)
os.fchown(f.fileno(), pent.pw_uid, pent.pw_gid)
self.service_user.chown(f.fileno())
# deploy ACME Tomcat application
ipautil.run(['pki-server', 'acme-deploy'])

View File

@ -20,7 +20,6 @@ from jwcrypto.common import json_decode
import os
import stat
import time
import pwd
logger = logging.getLogger(__name__)
@ -110,15 +109,14 @@ class CustodiaInstance(SimpleServiceInstance):
def __config_file(self):
template_file = os.path.basename(self.config_file) + '.template'
template = os.path.join(paths.USR_SHARE_IPA_DIR, template_file)
httpd_info = pwd.getpwnam(constants.HTTPD_USER)
sub_dict = dict(
IPA_CUSTODIA_CONF_DIR=paths.IPA_CUSTODIA_CONF_DIR,
IPA_CUSTODIA_KEYS=paths.IPA_CUSTODIA_KEYS,
IPA_CUSTODIA_SOCKET=paths.IPA_CUSTODIA_SOCKET,
IPA_CUSTODIA_AUDIT_LOG=paths.IPA_CUSTODIA_AUDIT_LOG,
LDAP_URI=ipaldap.realm_to_ldapi_uri(self.realm),
UID=httpd_info.pw_uid,
GID=httpd_info.pw_gid
UID=constants.HTTPD_USER.uid,
GID=constants.HTTPD_USER.pgid
)
conf = ipautil.template_file(template, sub_dict)
with open(self.config_file, "w") as f:

View File

@ -5,10 +5,8 @@
from __future__ import print_function, absolute_import
import errno
import grp
import logging
import os
import pwd
import re
import shutil
import stat
@ -57,10 +55,10 @@ class DNSKeySyncInstance(service.Service):
keytab=paths.IPA_DNSKEYSYNCD_KEYTAB
)
self.extra_config = [u'dnssecVersion 1', ] # DNSSEC enabled
self.named_uid = self.__get_named_uid()
self.named_gid = self.__get_named_gid()
self.ods_uid = self.__get_ods_uid()
self.ods_gid = self.__get_ods_gid()
self.named_uid = constants.NAMED_USER.uid
self.named_gid = constants.NAMED_GROUP.gid
self.ods_uid = constants.ODS_USER.uid
self.ods_gid = constants.ODS_GROUP.gid
suffix = ipautil.dn_attribute_property('_suffix')
@ -116,30 +114,6 @@ class DNSKeySyncInstance(service.Service):
# we need restart named after setting up this service
self.start_creation()
def __get_named_uid(self):
try:
return pwd.getpwnam(constants.NAMED_USER).pw_uid
except KeyError:
raise RuntimeError("Named UID not found")
def __get_named_gid(self):
try:
return grp.getgrnam(constants.NAMED_GROUP).gr_gid
except KeyError:
raise RuntimeError("Named GID not found")
def __get_ods_uid(self):
try:
return pwd.getpwnam(constants.ODS_USER).pw_uid
except KeyError:
raise RuntimeError("OpenDNSSEC UID not found")
def __get_ods_gid(self):
try:
return grp.getgrnam(constants.ODS_GROUP).gr_gid
except KeyError:
raise RuntimeError("OpenDNSSEC GID not found")
def __check_dnssec_status(self):
if not dns_container_exists(self.suffix):
raise RuntimeError("DNS container does not exist")

View File

@ -30,7 +30,6 @@ import shutil
import traceback
import dbus
import re
import pwd
import lxml.etree
from configparser import DEFAULTSECT, ConfigParser, RawConfigParser
@ -363,11 +362,10 @@ class DogtagInstance(service.Service):
connector.attrib[secretattr] = self.ajp_secret
if rewrite:
pent = pwd.getpwnam(constants.PKI_USER)
with open(paths.PKI_TOMCAT_SERVER_XML, "wb") as fd:
server_xml.write(fd, pretty_print=True, encoding="utf-8")
os.fchmod(fd.fileno(), 0o660)
os.fchown(fd.fileno(), pent.pw_uid, pent.pw_gid)
self.service_user.chown(fd.fileno())
def http_proxy(self):
""" Update the http proxy file """

View File

@ -22,7 +22,6 @@ from __future__ import print_function, absolute_import
import logging
import shutil
import pwd
import os
import time
import tempfile
@ -630,14 +629,13 @@ class DsInstance(service.Service):
logger.debug("Failed to clean temporary file: %s", e)
def __add_default_schemas(self):
pent = pwd.getpwnam(DS_USER)
for schema_fname in IPA_SCHEMA_FILES:
target_fname = schema_dirname(self.serverid) + schema_fname
shutil.copyfile(
os.path.join(paths.USR_SHARE_IPA_DIR, schema_fname),
target_fname)
os.chmod(target_fname, 0o440) # read access for dirsrv user/group
os.chown(target_fname, pent.pw_uid, pent.pw_gid)
DS_USER.chown(target_fname)
try:
shutil.move(schema_dirname(self.serverid) + "05rfc2247.ldif",
@ -648,7 +646,7 @@ class DsInstance(service.Service):
os.path.join(paths.USR_SHARE_IPA_DIR, "05rfc2247.ldif"),
target_fname)
os.chmod(target_fname, 0o440)
os.chown(target_fname, pent.pw_uid, pent.pw_gid)
DS_USER.chown(target_fname)
except IOError:
# Does not apply with newer DS releases
pass
@ -772,13 +770,12 @@ class DsInstance(service.Service):
self._ldap_mod("repoint-managed-entries.ldif", self.sub_dict)
def configure_systemd_ipa_env(self):
pent = pwd.getpwnam(platformconstants.DS_USER)
template = os.path.join(
paths.USR_SHARE_IPA_DIR, "ds-ipa-env.conf.template"
)
sub_dict = dict(
KRB5_KTNAME=paths.DS_KEYTAB,
KRB5CCNAME=paths.TMP_KRB5CC % pent.pw_uid
KRB5CCNAME=paths.TMP_KRB5CC % platformconstants.DS_USER.uid
)
conf = ipautil.template_file(template, sub_dict)

View File

@ -26,7 +26,6 @@ import shutil
import sys
import tempfile
import time
import pwd
import six
@ -295,14 +294,12 @@ class Backup(admintool.AdminTool):
logger.info("Preparing backup on %s", api.env.host)
pent = pwd.getpwnam(constants.DS_USER)
self.top_dir = tempfile.mkdtemp("ipa")
os.chown(self.top_dir, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(self.top_dir)
os.chmod(self.top_dir, 0o750)
self.dir = os.path.join(self.top_dir, "ipa")
os.mkdir(self.dir, 0o750)
os.chown(self.dir, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(self.dir)
self.tarfile = None
self.header = os.path.join(self.top_dir, 'header')

View File

@ -26,7 +26,6 @@ import shutil
import sys
import tempfile
import time
import pwd
import ldif
import itertools
@ -346,16 +345,14 @@ class Restore(admintool.AdminTool):
)
)
pent = pwd.getpwnam(constants.DS_USER)
# Temporary directory for decrypting files before restoring
self.top_dir = tempfile.mkdtemp("ipa")
os.chown(self.top_dir, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(self.top_dir)
os.chmod(self.top_dir, 0o750)
self.dir = os.path.join(self.top_dir, "ipa")
os.mkdir(self.dir)
os.chmod(self.dir, 0o750)
os.chown(self.dir, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(self.dir)
logger.info("Temporary setting umask to 022")
old_umask = os.umask(0o022)
@ -590,10 +587,9 @@ class Restore(admintool.AdminTool):
srcldiffile = os.path.join(self.dir, ldifname)
if not os.path.exists(ldifdir):
pent = pwd.getpwnam(constants.DS_USER)
os.mkdir(ldifdir)
os.chmod(ldifdir, 0o770)
os.chown(ldifdir, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(ldifdir)
ipautil.backup_file(ldiffile)
with open(ldiffile, 'w') as out_file:
@ -603,8 +599,7 @@ class Restore(admintool.AdminTool):
ldif_parser.parse()
# Make sure the modified ldiffile is owned by DS_USER
pent = pwd.getpwnam(constants.DS_USER)
os.chown(ldiffile, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(ldiffile)
if online:
conn = self.get_connection()
@ -634,7 +629,7 @@ class Restore(admintool.AdminTool):
except OSError as e:
pass
os.chown(template_dir, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(template_dir)
os.chmod(template_dir, 0o770)
# Restore SELinux context of template_dir
@ -825,9 +820,10 @@ class Restore(admintool.AdminTool):
]
run(args, cwd=self.dir)
pent = pwd.getpwnam(constants.DS_USER)
os.chown(self.top_dir, pent.pw_uid, pent.pw_gid)
recursive_chown(self.dir, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(self.top_dir)
recursive_chown(
self.dir, constants.DS_USER.uid, constants.DS_USER.pgid
)
if encrypt:
# We can remove the decoded tarball
@ -851,7 +847,7 @@ class Restore(admintool.AdminTool):
paths.TOMCAT_SIGNEDAUDIT_DIR]
try:
pent = pwd.getpwnam(constants.PKI_USER)
pent = constants.PKI_USER.entity
except KeyError:
logger.debug("No %s user exists, skipping CA directory creation",
constants.PKI_USER)

View File

@ -21,7 +21,6 @@ from __future__ import absolute_import
import logging
import os
import pwd
import shutil
import tempfile
import base64
@ -183,8 +182,7 @@ class KRAInstance(DogtagInstance):
if self.clone:
krafile = self.pkcs12_info[0]
shutil.copy(krafile, p12_tmpfile_name)
pent = pwd.getpwnam(self.service_user)
os.chown(p12_tmpfile_name, pent.pw_uid, pent.pw_gid)
self.service_user.chown(p12_tmpfile_name)
self._configure_clone(
cfg,
@ -207,11 +205,10 @@ class KRAInstance(DogtagInstance):
)
# Generate configuration file
pent = pwd.getpwnam(self.service_user)
config = self._create_spawn_config(cfg)
with tempfile.NamedTemporaryFile('w', delete=False) as f:
config.write(f)
os.fchown(f.fileno(), pent.pw_uid, pent.pw_gid)
self.service_user.chown(f.fileno())
cfg_file = f.name
nolog_list = [

View File

@ -22,7 +22,6 @@ from __future__ import print_function
import logging
import os
import pwd
import socket
import dbus
@ -384,8 +383,7 @@ class KrbInstance(service.Service):
self.fstore.backup_file(paths.DS_KEYTAB)
installutils.create_keytab(paths.DS_KEYTAB, ldap_principal)
pent = pwd.getpwnam(constants.DS_USER)
os.chown(paths.DS_KEYTAB, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(paths.DS_KEYTAB)
def __create_host_keytab(self):
host_principal = "host/" + self.fqdn + "@" + self.realm

View File

@ -6,8 +6,6 @@ from __future__ import absolute_import
import logging
import os
import pwd
import grp
import ldap
@ -33,8 +31,8 @@ class ODSExporterInstance(service.Service):
keytab=paths.IPA_ODS_EXPORTER_KEYTAB,
service_prefix=u'ipa-ods-exporter'
)
self.ods_uid = None
self.ods_gid = None
self.ods_uid = constants.ODS_USER.uid
self.ods_gid = constants.ODS_GROUP.gid
self.enable_if_exists = False
suffix = ipautil.dn_attribute_property('_suffix')
@ -51,8 +49,6 @@ class ODSExporterInstance(service.Service):
except Exception:
pass
# checking status step must be first
self.step("checking status", self.__check_dnssec_status)
self.step("setting up DNS Key Exporter", self.__setup_key_exporter)
self.step("setting up kerberos principal", self.__setup_principal)
self.step("disabling default signer daemon", self.__disable_signerd)
@ -60,17 +56,6 @@ class ODSExporterInstance(service.Service):
self.step("configuring DNS Key Exporter to start on boot", self.__enable)
self.start_creation()
def __check_dnssec_status(self):
try:
self.ods_uid = pwd.getpwnam(constants.ODS_USER).pw_uid
except KeyError:
raise RuntimeError("OpenDNSSEC UID not found")
try:
self.ods_gid = grp.getgrnam(constants.ODS_GROUP).gr_gid
except KeyError:
raise RuntimeError("OpenDNSSEC GID not found")
def __enable(self):
try:

View File

@ -6,8 +6,6 @@ from __future__ import absolute_import
import logging
import os
import pwd
import grp
import stat
import shutil
from subprocess import CalledProcessError
@ -69,8 +67,9 @@ class OpenDNSSECInstance(service.Service):
self, "ods-enforcerd",
service_desc="OpenDNSSEC enforcer daemon",
)
self.ods_uid = None
self.ods_gid = None
self.named_gid = constants.NAMED_GROUP.gid
self.ods_uid = constants.ODS_USER.uid
self.ods_gid = constants.ODS_GROUP.gid
self.conf_file_dict = {
'SOFTHSM_LIB': paths.LIBSOFTHSM2_SO,
'TOKEN_LABEL': SOFTHSM_DNSSEC_TOKEN_LABEL,
@ -107,8 +106,6 @@ class OpenDNSSECInstance(service.Service):
except Exception:
pass
# checking status must be first
self.step("checking status", self.__check_dnssec_status)
self.step("setting up configuration files", self.__setup_conf_files)
self.step("setting up ownership and file mode bits", self.__setup_ownership_file_modes)
if generate_master_key:
@ -119,27 +116,6 @@ class OpenDNSSECInstance(service.Service):
self.step("configuring OpenDNSSEC enforcer to start on boot", self.__enable)
self.start_creation()
def __check_dnssec_status(self):
try:
self.named_uid = pwd.getpwnam(constants.NAMED_USER).pw_uid
except KeyError:
raise RuntimeError("Named UID not found")
try:
self.named_gid = grp.getgrnam(constants.NAMED_GROUP).gr_gid
except KeyError:
raise RuntimeError("Named GID not found")
try:
self.ods_uid = pwd.getpwnam(constants.ODS_USER).pw_uid
except KeyError:
raise RuntimeError("OpenDNSSEC UID not found")
try:
self.ods_gid = grp.getgrnam(constants.ODS_GROUP).gr_gid
except KeyError:
raise RuntimeError("OpenDNSSEC GID not found")
def __enable(self):
try:
self.ldap_configure('DNSSEC', self.fqdn, None,

View File

@ -11,7 +11,6 @@ import re
import os
import glob
import shutil
import pwd
import fileinput
import ssl
import stat
@ -755,8 +754,7 @@ def copy_crl_file(old_path, new_path=None):
os.symlink(realpath, new_path)
else:
shutil.copy2(old_path, new_path)
pent = pwd.getpwnam(constants.PKI_USER)
os.chown(new_path, pent.pw_uid, pent.pw_gid)
constants.PKI_USER.chown(new_path)
tasks.restore_context(new_path)
@ -1104,8 +1102,7 @@ def update_http_keytab(http):
'Cannot remove file %s (%s). Please remove the file manually.',
paths.OLD_IPA_KEYTAB, e
)
pent = pwd.getpwnam(http.keytab_user)
os.chown(http.keytab, pent.pw_uid, pent.pw_gid)
http.keytab_user.chown(http.keytab)
def ds_enable_sidgen_extdom_plugins(ds):

View File

@ -22,7 +22,6 @@ from __future__ import absolute_import
import logging
import sys
import os
import pwd
import socket
import time
import traceback
@ -758,9 +757,7 @@ class Service:
keytab = self.keytab
if owner is None:
owner = self.service_user
pent = pwd.getpwnam(owner)
os.chown(keytab, pent.pw_uid, pent.pw_gid)
owner.chown(keytab)
def run_getkeytab(self, ldap_uri, keytab, principal, retrieve=False):
"""

View File

@ -0,0 +1,53 @@
#
# Copyright (C) 2020 FreeIPA Contributors see COPYING for license
#
import pytest
from ipaplatform.base.constants import User, Group
def test_user_root():
user = User("root")
assert user == "root"
assert str(user) == "root"
assert repr(user) == '<User "root">'
assert user.uid == 0
assert user.pgid == 0
assert user.entity.pw_uid == 0
def test_user_invalid():
invalid = User("invalid")
with pytest.raises(ValueError) as e:
assert invalid.uid
assert str(e.value) == "user 'invalid' not found"
def test_group():
group = Group("root")
assert group == "root"
assert str(group) == "root"
assert repr(group) == '<Group "root">'
assert group.gid == 0
assert group.entity.gr_gid == 0
def test_group_invalid():
invalid = Group("invalid")
with pytest.raises(ValueError) as e:
assert invalid.gid
assert str(e.value) == "group 'invalid' not found"
@pytest.mark.skip_if_platform("debian", reason="test is Fedora specific")
@pytest.mark.skip_if_platform("suse", reason="test is Fedora specific")
def test_user_group_daemon():
# daemon user / group are always defined
user = User("daemon")
assert user == "daemon"
assert user.uid == 2
assert user.pgid == 2
group = Group("daemon")
assert group == "daemon"
assert group.gid == 2

View File

@ -24,7 +24,6 @@ Test the `ipapython/ipautil.py` module.
from __future__ import absolute_import
import os
import pwd
import socket
import sys
import tempfile
@ -530,14 +529,13 @@ def test_run_runas():
The test is using 'ipaapi' user as it is configured when
ipa-server-common package is installed.
"""
user = pwd.getpwnam(IPAAPI_USER)
res = ipautil.run(['/usr/bin/id', '-u'], runas=IPAAPI_USER)
assert res.returncode == 0
assert res.raw_output == b'%d\n' % user.pw_uid
assert res.raw_output == b'%d\n' % IPAAPI_USER.uid
res = ipautil.run(['/usr/bin/id', '-g'], runas=IPAAPI_USER)
assert res.returncode == 0
assert res.raw_output == b'%d\n' % user.pw_gid
assert res.raw_output == b'%d\n' % IPAAPI_USER.pgid
@pytest.fixture(scope='function')