mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2024-12-24 08:00:02 -06:00
Add user and group wrappers
New classes for user and group names provide a convenient way to access the uid and primary gid of a user / gid of a group. The classes also provide chown() and chgrp() methods to simplify common operations. The wrappers are subclasses of builtin str type and behave like ordinary strings with additional features. The pwd and grp structs are retrieved once and then cached. Signed-off-by: Christian Heimes <cheimes@redhat.com> Reviewed-By: Rob Crittenden <rcritten@redhat.com>
This commit is contained in:
parent
99a40cbbe9
commit
72fb4e60c8
@ -12,7 +12,6 @@ from ipaplatform.paths import paths
|
||||
import io
|
||||
import sys
|
||||
import os
|
||||
import pwd
|
||||
import tempfile
|
||||
import textwrap
|
||||
|
||||
@ -97,8 +96,7 @@ def retrieve_keytab(api, ccache_name, oneway_keytab_name, oneway_principal):
|
||||
)
|
||||
# Make sure SSSD is able to read the keytab
|
||||
try:
|
||||
sssd = pwd.getpwnam(constants.SSSD_USER)
|
||||
os.chown(oneway_keytab_name, sssd[2], sssd[3])
|
||||
constants.SSSD_USER.chown(oneway_keytab_name)
|
||||
except KeyError:
|
||||
# If user 'sssd' does not exist, we don't need to chown from root to sssd
|
||||
# because it means SSSD does not run as sssd user
|
||||
|
@ -24,6 +24,7 @@ All constants centralised in one file.
|
||||
|
||||
import os
|
||||
import socket
|
||||
from ipaplatform.constants import constants as _constants
|
||||
from ipapython.dn import DN
|
||||
from ipapython.version import VERSION, API_VERSION
|
||||
|
||||
@ -327,8 +328,8 @@ PATTERN_GROUPUSER_NAME = (
|
||||
ANON_USER = 'WELLKNOWN/ANONYMOUS'
|
||||
|
||||
# IPA API Framework user
|
||||
IPAAPI_USER = 'ipaapi'
|
||||
IPAAPI_GROUP = 'ipaapi'
|
||||
IPAAPI_USER = _constants.IPAAPI_USER
|
||||
IPAAPI_GROUP = _constants.IPAAPI_GROUP
|
||||
|
||||
# Use cache path
|
||||
USER_CACHE_PATH = (
|
||||
|
@ -5,35 +5,123 @@
|
||||
'''
|
||||
This base platform module exports platform dependant constants.
|
||||
'''
|
||||
import grp
|
||||
import os
|
||||
import pwd
|
||||
import sys
|
||||
|
||||
|
||||
class _Entity(str):
|
||||
__slots__ = ("_entity", )
|
||||
|
||||
def __init__(self, name):
|
||||
super().__init__()
|
||||
self._entity = None
|
||||
|
||||
def __str__(self):
|
||||
return super().__str__()
|
||||
|
||||
def __repr__(self):
|
||||
return f'<{self.__class__.__name__} "{self!s}">'
|
||||
|
||||
|
||||
class User(_Entity):
|
||||
__slots__ = ()
|
||||
|
||||
@property
|
||||
def entity(self):
|
||||
"""User information struct
|
||||
|
||||
:return: pwd.struct_passwd instance
|
||||
"""
|
||||
entity = self._entity
|
||||
if entity is None:
|
||||
try:
|
||||
self._entity = entity = pwd.getpwnam(self)
|
||||
except KeyError:
|
||||
raise ValueError(f"user '{self!s}' not found") from None
|
||||
return entity
|
||||
|
||||
@property
|
||||
def uid(self):
|
||||
"""Numeric user id (int)
|
||||
"""
|
||||
return self.entity.pw_uid
|
||||
|
||||
@property
|
||||
def pgid(self):
|
||||
"""Primary group id (int)"""
|
||||
return self.entity.pw_gid
|
||||
|
||||
def chown(self, path, gid=None, **kwargs):
|
||||
"""chown() file by path or file descriptor
|
||||
|
||||
gid defaults to user's primary gid. Use -1 to keep gid.
|
||||
"""
|
||||
if gid is None:
|
||||
gid = self.pgid
|
||||
elif isinstance(gid, Group):
|
||||
gid = gid.gid
|
||||
os.chown(path, self.uid, gid, **kwargs)
|
||||
|
||||
|
||||
class Group(_Entity):
|
||||
__slots__ = ()
|
||||
|
||||
@property
|
||||
def entity(self):
|
||||
"""Group information
|
||||
|
||||
:return: grp.struct_group instance
|
||||
"""
|
||||
entity = self._entity
|
||||
if entity is None:
|
||||
try:
|
||||
self._entity = entity = grp.getgrnam(self)
|
||||
except KeyError:
|
||||
raise ValueError(f"group '{self!s}' not found") from None
|
||||
return entity
|
||||
|
||||
@property
|
||||
def gid(self):
|
||||
"""Numeric group id (int)
|
||||
"""
|
||||
return self.entity.gr_gid
|
||||
|
||||
def chgrp(self, path, **kwargs):
|
||||
"""change group owner file by path or file descriptor
|
||||
"""
|
||||
os.chown(path, -1, self.gid, **kwargs)
|
||||
|
||||
|
||||
class BaseConstantsNamespace:
|
||||
IS_64BITS = sys.maxsize > 2 ** 32
|
||||
DEFAULT_ADMIN_SHELL = '/bin/bash'
|
||||
DEFAULT_SHELL = '/bin/sh'
|
||||
DS_USER = 'dirsrv'
|
||||
DS_GROUP = 'dirsrv'
|
||||
HTTPD_USER = "apache"
|
||||
HTTPD_GROUP = "apache"
|
||||
GSSPROXY_USER = "root"
|
||||
IPAAPI_USER = User("ipaapi")
|
||||
IPAAPI_GROUP = Group("ipaapi")
|
||||
DS_USER = User("dirsrv")
|
||||
DS_GROUP = Group("dirsrv")
|
||||
HTTPD_USER = User("apache")
|
||||
HTTPD_GROUP = Group("apache")
|
||||
GSSPROXY_USER = User("root")
|
||||
IPA_ADTRUST_PACKAGE_NAME = "freeipa-server-trust-ad"
|
||||
IPA_DNS_PACKAGE_NAME = "freeipa-server-dns"
|
||||
KDCPROXY_USER = "kdcproxy"
|
||||
NAMED_USER = "named"
|
||||
NAMED_GROUP = "named"
|
||||
KDCPROXY_USER = User("kdcproxy")
|
||||
NAMED_USER = User("named")
|
||||
NAMED_GROUP = Group("named")
|
||||
NAMED_DATA_DIR = "data/"
|
||||
NAMED_OPTIONS_VAR = "OPTIONS"
|
||||
NAMED_OPENSSL_ENGINE = None
|
||||
NAMED_ZONE_COMMENT = ""
|
||||
PKI_USER = 'pkiuser'
|
||||
PKI_GROUP = 'pkiuser'
|
||||
PKI_USER = User("pkiuser")
|
||||
PKI_GROUP = Group("pkiuser")
|
||||
# ntpd init variable used for daemon options
|
||||
NTPD_OPTS_VAR = "OPTIONS"
|
||||
# quote used for daemon options
|
||||
NTPD_OPTS_QUOTE = "\""
|
||||
ODS_USER = "ods"
|
||||
ODS_GROUP = "ods"
|
||||
ODS_USER = User("ods")
|
||||
ODS_GROUP = Group("ods")
|
||||
# nfsd init variable used to enable kerberized NFS
|
||||
SECURE_NFS_VAR = "SECURE_NFS"
|
||||
SELINUX_BOOLEAN_ADTRUST = {
|
||||
@ -70,7 +158,7 @@ class BaseConstantsNamespace:
|
||||
"$sysadm_u:s0-s0:c0.c1023"
|
||||
"$unconfined_u:s0-s0:c0.c1023"
|
||||
)
|
||||
SSSD_USER = "sssd"
|
||||
SSSD_USER = User("sssd")
|
||||
# WSGI module override, only used on Fedora
|
||||
MOD_WSGI_PYTHON2 = None
|
||||
MOD_WSGI_PYTHON3 = None
|
||||
|
@ -9,22 +9,22 @@ This Debian family platform module exports platform dependant constants.
|
||||
# Fallback to default path definitions
|
||||
from __future__ import absolute_import
|
||||
|
||||
from ipaplatform.base.constants import BaseConstantsNamespace
|
||||
from ipaplatform.base.constants import BaseConstantsNamespace, User, Group
|
||||
|
||||
|
||||
class DebianConstantsNamespace(BaseConstantsNamespace):
|
||||
HTTPD_USER = "www-data"
|
||||
HTTPD_GROUP = "www-data"
|
||||
NAMED_USER = "bind"
|
||||
NAMED_GROUP = "bind"
|
||||
HTTPD_USER = User("www-data")
|
||||
HTTPD_GROUP = Group("www-data")
|
||||
NAMED_USER = User("bind")
|
||||
NAMED_GROUP = Group("bind")
|
||||
NAMED_DATA_DIR = ""
|
||||
NAMED_ZONE_COMMENT = "//"
|
||||
# ntpd init variable used for daemon options
|
||||
NTPD_OPTS_VAR = "NTPD_OPTS"
|
||||
# quote used for daemon options
|
||||
NTPD_OPTS_QUOTE = "\'"
|
||||
ODS_USER = "opendnssec"
|
||||
ODS_GROUP = "opendnssec"
|
||||
ODS_USER = User("opendnssec")
|
||||
ODS_GROUP = Group("opendnssec")
|
||||
SECURE_NFS_VAR = "NEED_GSSD"
|
||||
|
||||
constants = DebianConstantsNamespace()
|
||||
|
@ -8,14 +8,14 @@ related constants for the SUSE OS family-based systems.
|
||||
"""
|
||||
|
||||
# Fallback to default path definitions
|
||||
from ipaplatform.base.constants import BaseConstantsNamespace
|
||||
from ipaplatform.base.constants import BaseConstantsNamespace, User, Group
|
||||
|
||||
|
||||
class SuseConstantsNamespace(BaseConstantsNamespace):
|
||||
HTTPD_USER = "wwwrun"
|
||||
HTTPD_GROUP = "www"
|
||||
HTTPD_USER = User("wwwrun")
|
||||
HTTPD_GROUP = Group("www")
|
||||
# Don't have it yet
|
||||
SSSD_USER = "root"
|
||||
SSSD_USER = User("root")
|
||||
TLS_HIGH_CIPHERS = None
|
||||
|
||||
|
||||
|
@ -23,7 +23,6 @@ from __future__ import print_function
|
||||
import logging
|
||||
import tempfile
|
||||
import os
|
||||
import pwd
|
||||
import netaddr
|
||||
import re
|
||||
import shutil
|
||||
@ -993,8 +992,7 @@ class BindInstance(service.Service):
|
||||
dns_principal = p
|
||||
|
||||
# Make sure access is strictly reserved to the named user
|
||||
pent = pwd.getpwnam(self.service_user)
|
||||
os.chown(self.keytab, pent.pw_uid, pent.pw_gid)
|
||||
self.service_user.chown(self.keytab)
|
||||
os.chmod(self.keytab, 0o400)
|
||||
|
||||
# modify the principal so that it is marked as an ipa service so that
|
||||
@ -1040,7 +1038,7 @@ class BindInstance(service.Service):
|
||||
"""
|
||||
# files are owned by root:named and are readable by user and group
|
||||
uid = 0
|
||||
gid = pwd.getpwnam(constants.NAMED_USER).pw_gid
|
||||
gid = constants.NAMED_GROUP.gid
|
||||
mode = 0o640
|
||||
|
||||
changed = False
|
||||
|
@ -28,8 +28,6 @@ import logging
|
||||
import dbus
|
||||
import ldap
|
||||
import os
|
||||
import pwd
|
||||
import grp
|
||||
import re
|
||||
import shutil
|
||||
import sys
|
||||
@ -44,7 +42,6 @@ from ipalib import errors
|
||||
import ipalib.constants
|
||||
from ipalib.install import certmonger
|
||||
from ipaplatform import services
|
||||
from ipaplatform.constants import constants
|
||||
from ipaplatform.paths import paths
|
||||
from ipaplatform.tasks import tasks
|
||||
|
||||
@ -534,8 +531,7 @@ class CAInstance(DogtagInstance):
|
||||
# so remove the file first
|
||||
ipautil.remove_file(paths.TMP_CA_P12)
|
||||
shutil.copy(cafile, paths.TMP_CA_P12)
|
||||
pent = pwd.getpwnam(self.service_user)
|
||||
os.chown(paths.TMP_CA_P12, pent.pw_uid, pent.pw_gid)
|
||||
self.service_user.chown(paths.TMP_CA_P12)
|
||||
|
||||
self._configure_clone(
|
||||
cfg,
|
||||
@ -595,11 +591,10 @@ class CAInstance(DogtagInstance):
|
||||
|
||||
config = self._create_spawn_config(cfg)
|
||||
self.set_hsm_state(config)
|
||||
pent = pwd.getpwnam(self.service_user)
|
||||
with tempfile.NamedTemporaryFile('w') as f:
|
||||
config.write(f)
|
||||
f.flush()
|
||||
os.fchown(f.fileno(), pent.pw_uid, pent.pw_gid)
|
||||
self.service_user.chown(f.fileno())
|
||||
|
||||
self.backup_state('installed', True)
|
||||
|
||||
@ -682,8 +677,7 @@ class CAInstance(DogtagInstance):
|
||||
'ca.enableNonces=false')
|
||||
if update_result != 0:
|
||||
raise RuntimeError("Disabling nonces failed")
|
||||
pent = pwd.getpwnam(self.service_user)
|
||||
os.chown(self.config, pent.pw_uid, pent.pw_gid)
|
||||
self.service_user.chown(self.config)
|
||||
|
||||
def enable_pkix(self):
|
||||
directivesetter.set_directive(paths.SYSCONFIG_PKI_TOMCAT,
|
||||
@ -732,9 +726,9 @@ class CAInstance(DogtagInstance):
|
||||
"""
|
||||
Sets the correct permissions for the RA_AGENT_PEM, RA_AGENT_KEY files
|
||||
"""
|
||||
ipaapi_gid = grp.getgrnam(ipalib.constants.IPAAPI_GROUP).gr_gid
|
||||
group = ipalib.constants.IPAAPI_GROUP
|
||||
for fname in (paths.RA_AGENT_PEM, paths.RA_AGENT_KEY):
|
||||
os.chown(fname, -1, ipaapi_gid)
|
||||
group.chgrp(fname)
|
||||
os.chmod(fname, 0o440)
|
||||
tasks.restore_context(fname)
|
||||
|
||||
@ -913,8 +907,7 @@ class CAInstance(DogtagInstance):
|
||||
os.mkdir(publishdir)
|
||||
|
||||
os.chmod(publishdir, 0o775)
|
||||
pent = pwd.getpwnam(self.service_user)
|
||||
os.chown(publishdir, 0, pent.pw_gid)
|
||||
os.chown(publishdir, 0, self.service_user.pgid)
|
||||
|
||||
tasks.restore_context(publishdir)
|
||||
|
||||
@ -1294,8 +1287,6 @@ class CAInstance(DogtagInstance):
|
||||
sysupgrade.set_upgrade_state('dogtag', LWCA_KEY_RETRIEVAL, True)
|
||||
|
||||
def __setup_lightweight_ca_key_retrieval_kerberos(self):
|
||||
pent = pwd.getpwnam(self.service_user)
|
||||
|
||||
logger.debug('Creating principal')
|
||||
installutils.kadmin_addprinc(self.principal)
|
||||
self.suffix = ipautil.realm_to_suffix(self.realm)
|
||||
@ -1304,11 +1295,9 @@ class CAInstance(DogtagInstance):
|
||||
logger.debug('Retrieving keytab')
|
||||
installutils.create_keytab(self.keytab, self.principal)
|
||||
os.chmod(self.keytab, 0o600)
|
||||
os.chown(self.keytab, pent.pw_uid, pent.pw_gid)
|
||||
self.service_user.chown(self.keytab)
|
||||
|
||||
def __setup_lightweight_ca_key_retrieval_custodia(self):
|
||||
pent = pwd.getpwnam(self.service_user)
|
||||
|
||||
logger.debug('Creating Custodia keys')
|
||||
custodia_basedn = DN(
|
||||
('cn', 'custodia'), ('cn', 'ipa'), ('cn', 'etc'), api.env.basedn)
|
||||
@ -1326,7 +1315,7 @@ class CAInstance(DogtagInstance):
|
||||
keystore = IPAKEMKeys({'server_keys': keyfile})
|
||||
keystore.generate_keys(self.service_prefix)
|
||||
os.chmod(keyfile, 0o600)
|
||||
os.chown(keyfile, pent.pw_uid, pent.pw_gid)
|
||||
self.service_user.chown(keyfile)
|
||||
|
||||
def __remove_lightweight_ca_key_retrieval_custodia(self):
|
||||
keyfile = os.path.join(paths.PKI_TOMCAT,
|
||||
@ -1573,8 +1562,7 @@ class CAInstance(DogtagInstance):
|
||||
with open(target, 'w') as f:
|
||||
f.write(filled)
|
||||
os.fchmod(f.fileno(), 0o600)
|
||||
pent = pwd.getpwnam(constants.PKI_USER)
|
||||
os.fchown(f.fileno(), pent.pw_uid, pent.pw_gid)
|
||||
self.service_user.chown(f.fileno())
|
||||
|
||||
# deploy ACME Tomcat application
|
||||
ipautil.run(['pki-server', 'acme-deploy'])
|
||||
|
@ -20,7 +20,6 @@ from jwcrypto.common import json_decode
|
||||
import os
|
||||
import stat
|
||||
import time
|
||||
import pwd
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -110,15 +109,14 @@ class CustodiaInstance(SimpleServiceInstance):
|
||||
def __config_file(self):
|
||||
template_file = os.path.basename(self.config_file) + '.template'
|
||||
template = os.path.join(paths.USR_SHARE_IPA_DIR, template_file)
|
||||
httpd_info = pwd.getpwnam(constants.HTTPD_USER)
|
||||
sub_dict = dict(
|
||||
IPA_CUSTODIA_CONF_DIR=paths.IPA_CUSTODIA_CONF_DIR,
|
||||
IPA_CUSTODIA_KEYS=paths.IPA_CUSTODIA_KEYS,
|
||||
IPA_CUSTODIA_SOCKET=paths.IPA_CUSTODIA_SOCKET,
|
||||
IPA_CUSTODIA_AUDIT_LOG=paths.IPA_CUSTODIA_AUDIT_LOG,
|
||||
LDAP_URI=ipaldap.realm_to_ldapi_uri(self.realm),
|
||||
UID=httpd_info.pw_uid,
|
||||
GID=httpd_info.pw_gid
|
||||
UID=constants.HTTPD_USER.uid,
|
||||
GID=constants.HTTPD_USER.pgid
|
||||
)
|
||||
conf = ipautil.template_file(template, sub_dict)
|
||||
with open(self.config_file, "w") as f:
|
||||
|
@ -5,10 +5,8 @@
|
||||
from __future__ import print_function, absolute_import
|
||||
|
||||
import errno
|
||||
import grp
|
||||
import logging
|
||||
import os
|
||||
import pwd
|
||||
import re
|
||||
import shutil
|
||||
import stat
|
||||
@ -57,10 +55,10 @@ class DNSKeySyncInstance(service.Service):
|
||||
keytab=paths.IPA_DNSKEYSYNCD_KEYTAB
|
||||
)
|
||||
self.extra_config = [u'dnssecVersion 1', ] # DNSSEC enabled
|
||||
self.named_uid = self.__get_named_uid()
|
||||
self.named_gid = self.__get_named_gid()
|
||||
self.ods_uid = self.__get_ods_uid()
|
||||
self.ods_gid = self.__get_ods_gid()
|
||||
self.named_uid = constants.NAMED_USER.uid
|
||||
self.named_gid = constants.NAMED_GROUP.gid
|
||||
self.ods_uid = constants.ODS_USER.uid
|
||||
self.ods_gid = constants.ODS_GROUP.gid
|
||||
|
||||
suffix = ipautil.dn_attribute_property('_suffix')
|
||||
|
||||
@ -116,30 +114,6 @@ class DNSKeySyncInstance(service.Service):
|
||||
# we need restart named after setting up this service
|
||||
self.start_creation()
|
||||
|
||||
def __get_named_uid(self):
|
||||
try:
|
||||
return pwd.getpwnam(constants.NAMED_USER).pw_uid
|
||||
except KeyError:
|
||||
raise RuntimeError("Named UID not found")
|
||||
|
||||
def __get_named_gid(self):
|
||||
try:
|
||||
return grp.getgrnam(constants.NAMED_GROUP).gr_gid
|
||||
except KeyError:
|
||||
raise RuntimeError("Named GID not found")
|
||||
|
||||
def __get_ods_uid(self):
|
||||
try:
|
||||
return pwd.getpwnam(constants.ODS_USER).pw_uid
|
||||
except KeyError:
|
||||
raise RuntimeError("OpenDNSSEC UID not found")
|
||||
|
||||
def __get_ods_gid(self):
|
||||
try:
|
||||
return grp.getgrnam(constants.ODS_GROUP).gr_gid
|
||||
except KeyError:
|
||||
raise RuntimeError("OpenDNSSEC GID not found")
|
||||
|
||||
def __check_dnssec_status(self):
|
||||
if not dns_container_exists(self.suffix):
|
||||
raise RuntimeError("DNS container does not exist")
|
||||
|
@ -30,7 +30,6 @@ import shutil
|
||||
import traceback
|
||||
import dbus
|
||||
import re
|
||||
import pwd
|
||||
import lxml.etree
|
||||
|
||||
from configparser import DEFAULTSECT, ConfigParser, RawConfigParser
|
||||
@ -363,11 +362,10 @@ class DogtagInstance(service.Service):
|
||||
connector.attrib[secretattr] = self.ajp_secret
|
||||
|
||||
if rewrite:
|
||||
pent = pwd.getpwnam(constants.PKI_USER)
|
||||
with open(paths.PKI_TOMCAT_SERVER_XML, "wb") as fd:
|
||||
server_xml.write(fd, pretty_print=True, encoding="utf-8")
|
||||
os.fchmod(fd.fileno(), 0o660)
|
||||
os.fchown(fd.fileno(), pent.pw_uid, pent.pw_gid)
|
||||
self.service_user.chown(fd.fileno())
|
||||
|
||||
def http_proxy(self):
|
||||
""" Update the http proxy file """
|
||||
|
@ -22,7 +22,6 @@ from __future__ import print_function, absolute_import
|
||||
|
||||
import logging
|
||||
import shutil
|
||||
import pwd
|
||||
import os
|
||||
import time
|
||||
import tempfile
|
||||
@ -630,14 +629,13 @@ class DsInstance(service.Service):
|
||||
logger.debug("Failed to clean temporary file: %s", e)
|
||||
|
||||
def __add_default_schemas(self):
|
||||
pent = pwd.getpwnam(DS_USER)
|
||||
for schema_fname in IPA_SCHEMA_FILES:
|
||||
target_fname = schema_dirname(self.serverid) + schema_fname
|
||||
shutil.copyfile(
|
||||
os.path.join(paths.USR_SHARE_IPA_DIR, schema_fname),
|
||||
target_fname)
|
||||
os.chmod(target_fname, 0o440) # read access for dirsrv user/group
|
||||
os.chown(target_fname, pent.pw_uid, pent.pw_gid)
|
||||
DS_USER.chown(target_fname)
|
||||
|
||||
try:
|
||||
shutil.move(schema_dirname(self.serverid) + "05rfc2247.ldif",
|
||||
@ -648,7 +646,7 @@ class DsInstance(service.Service):
|
||||
os.path.join(paths.USR_SHARE_IPA_DIR, "05rfc2247.ldif"),
|
||||
target_fname)
|
||||
os.chmod(target_fname, 0o440)
|
||||
os.chown(target_fname, pent.pw_uid, pent.pw_gid)
|
||||
DS_USER.chown(target_fname)
|
||||
except IOError:
|
||||
# Does not apply with newer DS releases
|
||||
pass
|
||||
@ -772,13 +770,12 @@ class DsInstance(service.Service):
|
||||
self._ldap_mod("repoint-managed-entries.ldif", self.sub_dict)
|
||||
|
||||
def configure_systemd_ipa_env(self):
|
||||
pent = pwd.getpwnam(platformconstants.DS_USER)
|
||||
template = os.path.join(
|
||||
paths.USR_SHARE_IPA_DIR, "ds-ipa-env.conf.template"
|
||||
)
|
||||
sub_dict = dict(
|
||||
KRB5_KTNAME=paths.DS_KEYTAB,
|
||||
KRB5CCNAME=paths.TMP_KRB5CC % pent.pw_uid
|
||||
KRB5CCNAME=paths.TMP_KRB5CC % platformconstants.DS_USER.uid
|
||||
)
|
||||
conf = ipautil.template_file(template, sub_dict)
|
||||
|
||||
|
@ -26,7 +26,6 @@ import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import pwd
|
||||
|
||||
import six
|
||||
|
||||
@ -295,14 +294,12 @@ class Backup(admintool.AdminTool):
|
||||
|
||||
logger.info("Preparing backup on %s", api.env.host)
|
||||
|
||||
pent = pwd.getpwnam(constants.DS_USER)
|
||||
|
||||
self.top_dir = tempfile.mkdtemp("ipa")
|
||||
os.chown(self.top_dir, pent.pw_uid, pent.pw_gid)
|
||||
constants.DS_USER.chown(self.top_dir)
|
||||
os.chmod(self.top_dir, 0o750)
|
||||
self.dir = os.path.join(self.top_dir, "ipa")
|
||||
os.mkdir(self.dir, 0o750)
|
||||
os.chown(self.dir, pent.pw_uid, pent.pw_gid)
|
||||
constants.DS_USER.chown(self.dir)
|
||||
self.tarfile = None
|
||||
|
||||
self.header = os.path.join(self.top_dir, 'header')
|
||||
|
@ -26,7 +26,6 @@ import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
import pwd
|
||||
import ldif
|
||||
import itertools
|
||||
|
||||
@ -346,16 +345,14 @@ class Restore(admintool.AdminTool):
|
||||
)
|
||||
)
|
||||
|
||||
pent = pwd.getpwnam(constants.DS_USER)
|
||||
|
||||
# Temporary directory for decrypting files before restoring
|
||||
self.top_dir = tempfile.mkdtemp("ipa")
|
||||
os.chown(self.top_dir, pent.pw_uid, pent.pw_gid)
|
||||
constants.DS_USER.chown(self.top_dir)
|
||||
os.chmod(self.top_dir, 0o750)
|
||||
self.dir = os.path.join(self.top_dir, "ipa")
|
||||
os.mkdir(self.dir)
|
||||
os.chmod(self.dir, 0o750)
|
||||
os.chown(self.dir, pent.pw_uid, pent.pw_gid)
|
||||
constants.DS_USER.chown(self.dir)
|
||||
|
||||
logger.info("Temporary setting umask to 022")
|
||||
old_umask = os.umask(0o022)
|
||||
@ -590,10 +587,9 @@ class Restore(admintool.AdminTool):
|
||||
srcldiffile = os.path.join(self.dir, ldifname)
|
||||
|
||||
if not os.path.exists(ldifdir):
|
||||
pent = pwd.getpwnam(constants.DS_USER)
|
||||
os.mkdir(ldifdir)
|
||||
os.chmod(ldifdir, 0o770)
|
||||
os.chown(ldifdir, pent.pw_uid, pent.pw_gid)
|
||||
constants.DS_USER.chown(ldifdir)
|
||||
|
||||
ipautil.backup_file(ldiffile)
|
||||
with open(ldiffile, 'w') as out_file:
|
||||
@ -603,8 +599,7 @@ class Restore(admintool.AdminTool):
|
||||
ldif_parser.parse()
|
||||
|
||||
# Make sure the modified ldiffile is owned by DS_USER
|
||||
pent = pwd.getpwnam(constants.DS_USER)
|
||||
os.chown(ldiffile, pent.pw_uid, pent.pw_gid)
|
||||
constants.DS_USER.chown(ldiffile)
|
||||
|
||||
if online:
|
||||
conn = self.get_connection()
|
||||
@ -634,7 +629,7 @@ class Restore(admintool.AdminTool):
|
||||
except OSError as e:
|
||||
pass
|
||||
|
||||
os.chown(template_dir, pent.pw_uid, pent.pw_gid)
|
||||
constants.DS_USER.chown(template_dir)
|
||||
os.chmod(template_dir, 0o770)
|
||||
|
||||
# Restore SELinux context of template_dir
|
||||
@ -825,9 +820,10 @@ class Restore(admintool.AdminTool):
|
||||
]
|
||||
run(args, cwd=self.dir)
|
||||
|
||||
pent = pwd.getpwnam(constants.DS_USER)
|
||||
os.chown(self.top_dir, pent.pw_uid, pent.pw_gid)
|
||||
recursive_chown(self.dir, pent.pw_uid, pent.pw_gid)
|
||||
constants.DS_USER.chown(self.top_dir)
|
||||
recursive_chown(
|
||||
self.dir, constants.DS_USER.uid, constants.DS_USER.pgid
|
||||
)
|
||||
|
||||
if encrypt:
|
||||
# We can remove the decoded tarball
|
||||
@ -851,7 +847,7 @@ class Restore(admintool.AdminTool):
|
||||
paths.TOMCAT_SIGNEDAUDIT_DIR]
|
||||
|
||||
try:
|
||||
pent = pwd.getpwnam(constants.PKI_USER)
|
||||
pent = constants.PKI_USER.entity
|
||||
except KeyError:
|
||||
logger.debug("No %s user exists, skipping CA directory creation",
|
||||
constants.PKI_USER)
|
||||
|
@ -21,7 +21,6 @@ from __future__ import absolute_import
|
||||
|
||||
import logging
|
||||
import os
|
||||
import pwd
|
||||
import shutil
|
||||
import tempfile
|
||||
import base64
|
||||
@ -183,8 +182,7 @@ class KRAInstance(DogtagInstance):
|
||||
if self.clone:
|
||||
krafile = self.pkcs12_info[0]
|
||||
shutil.copy(krafile, p12_tmpfile_name)
|
||||
pent = pwd.getpwnam(self.service_user)
|
||||
os.chown(p12_tmpfile_name, pent.pw_uid, pent.pw_gid)
|
||||
self.service_user.chown(p12_tmpfile_name)
|
||||
|
||||
self._configure_clone(
|
||||
cfg,
|
||||
@ -207,11 +205,10 @@ class KRAInstance(DogtagInstance):
|
||||
)
|
||||
|
||||
# Generate configuration file
|
||||
pent = pwd.getpwnam(self.service_user)
|
||||
config = self._create_spawn_config(cfg)
|
||||
with tempfile.NamedTemporaryFile('w', delete=False) as f:
|
||||
config.write(f)
|
||||
os.fchown(f.fileno(), pent.pw_uid, pent.pw_gid)
|
||||
self.service_user.chown(f.fileno())
|
||||
cfg_file = f.name
|
||||
|
||||
nolog_list = [
|
||||
|
@ -22,7 +22,6 @@ from __future__ import print_function
|
||||
|
||||
import logging
|
||||
import os
|
||||
import pwd
|
||||
import socket
|
||||
import dbus
|
||||
|
||||
@ -384,8 +383,7 @@ class KrbInstance(service.Service):
|
||||
|
||||
self.fstore.backup_file(paths.DS_KEYTAB)
|
||||
installutils.create_keytab(paths.DS_KEYTAB, ldap_principal)
|
||||
pent = pwd.getpwnam(constants.DS_USER)
|
||||
os.chown(paths.DS_KEYTAB, pent.pw_uid, pent.pw_gid)
|
||||
constants.DS_USER.chown(paths.DS_KEYTAB)
|
||||
|
||||
def __create_host_keytab(self):
|
||||
host_principal = "host/" + self.fqdn + "@" + self.realm
|
||||
|
@ -6,8 +6,6 @@ from __future__ import absolute_import
|
||||
|
||||
import logging
|
||||
import os
|
||||
import pwd
|
||||
import grp
|
||||
|
||||
import ldap
|
||||
|
||||
@ -33,8 +31,8 @@ class ODSExporterInstance(service.Service):
|
||||
keytab=paths.IPA_ODS_EXPORTER_KEYTAB,
|
||||
service_prefix=u'ipa-ods-exporter'
|
||||
)
|
||||
self.ods_uid = None
|
||||
self.ods_gid = None
|
||||
self.ods_uid = constants.ODS_USER.uid
|
||||
self.ods_gid = constants.ODS_GROUP.gid
|
||||
self.enable_if_exists = False
|
||||
|
||||
suffix = ipautil.dn_attribute_property('_suffix')
|
||||
@ -51,8 +49,6 @@ class ODSExporterInstance(service.Service):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# checking status step must be first
|
||||
self.step("checking status", self.__check_dnssec_status)
|
||||
self.step("setting up DNS Key Exporter", self.__setup_key_exporter)
|
||||
self.step("setting up kerberos principal", self.__setup_principal)
|
||||
self.step("disabling default signer daemon", self.__disable_signerd)
|
||||
@ -60,17 +56,6 @@ class ODSExporterInstance(service.Service):
|
||||
self.step("configuring DNS Key Exporter to start on boot", self.__enable)
|
||||
self.start_creation()
|
||||
|
||||
def __check_dnssec_status(self):
|
||||
try:
|
||||
self.ods_uid = pwd.getpwnam(constants.ODS_USER).pw_uid
|
||||
except KeyError:
|
||||
raise RuntimeError("OpenDNSSEC UID not found")
|
||||
|
||||
try:
|
||||
self.ods_gid = grp.getgrnam(constants.ODS_GROUP).gr_gid
|
||||
except KeyError:
|
||||
raise RuntimeError("OpenDNSSEC GID not found")
|
||||
|
||||
def __enable(self):
|
||||
|
||||
try:
|
||||
|
@ -6,8 +6,6 @@ from __future__ import absolute_import
|
||||
|
||||
import logging
|
||||
import os
|
||||
import pwd
|
||||
import grp
|
||||
import stat
|
||||
import shutil
|
||||
from subprocess import CalledProcessError
|
||||
@ -69,8 +67,9 @@ class OpenDNSSECInstance(service.Service):
|
||||
self, "ods-enforcerd",
|
||||
service_desc="OpenDNSSEC enforcer daemon",
|
||||
)
|
||||
self.ods_uid = None
|
||||
self.ods_gid = None
|
||||
self.named_gid = constants.NAMED_GROUP.gid
|
||||
self.ods_uid = constants.ODS_USER.uid
|
||||
self.ods_gid = constants.ODS_GROUP.gid
|
||||
self.conf_file_dict = {
|
||||
'SOFTHSM_LIB': paths.LIBSOFTHSM2_SO,
|
||||
'TOKEN_LABEL': SOFTHSM_DNSSEC_TOKEN_LABEL,
|
||||
@ -107,8 +106,6 @@ class OpenDNSSECInstance(service.Service):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# checking status must be first
|
||||
self.step("checking status", self.__check_dnssec_status)
|
||||
self.step("setting up configuration files", self.__setup_conf_files)
|
||||
self.step("setting up ownership and file mode bits", self.__setup_ownership_file_modes)
|
||||
if generate_master_key:
|
||||
@ -119,27 +116,6 @@ class OpenDNSSECInstance(service.Service):
|
||||
self.step("configuring OpenDNSSEC enforcer to start on boot", self.__enable)
|
||||
self.start_creation()
|
||||
|
||||
def __check_dnssec_status(self):
|
||||
try:
|
||||
self.named_uid = pwd.getpwnam(constants.NAMED_USER).pw_uid
|
||||
except KeyError:
|
||||
raise RuntimeError("Named UID not found")
|
||||
|
||||
try:
|
||||
self.named_gid = grp.getgrnam(constants.NAMED_GROUP).gr_gid
|
||||
except KeyError:
|
||||
raise RuntimeError("Named GID not found")
|
||||
|
||||
try:
|
||||
self.ods_uid = pwd.getpwnam(constants.ODS_USER).pw_uid
|
||||
except KeyError:
|
||||
raise RuntimeError("OpenDNSSEC UID not found")
|
||||
|
||||
try:
|
||||
self.ods_gid = grp.getgrnam(constants.ODS_GROUP).gr_gid
|
||||
except KeyError:
|
||||
raise RuntimeError("OpenDNSSEC GID not found")
|
||||
|
||||
def __enable(self):
|
||||
try:
|
||||
self.ldap_configure('DNSSEC', self.fqdn, None,
|
||||
|
@ -11,7 +11,6 @@ import re
|
||||
import os
|
||||
import glob
|
||||
import shutil
|
||||
import pwd
|
||||
import fileinput
|
||||
import ssl
|
||||
import stat
|
||||
@ -755,8 +754,7 @@ def copy_crl_file(old_path, new_path=None):
|
||||
os.symlink(realpath, new_path)
|
||||
else:
|
||||
shutil.copy2(old_path, new_path)
|
||||
pent = pwd.getpwnam(constants.PKI_USER)
|
||||
os.chown(new_path, pent.pw_uid, pent.pw_gid)
|
||||
constants.PKI_USER.chown(new_path)
|
||||
|
||||
tasks.restore_context(new_path)
|
||||
|
||||
@ -1104,8 +1102,7 @@ def update_http_keytab(http):
|
||||
'Cannot remove file %s (%s). Please remove the file manually.',
|
||||
paths.OLD_IPA_KEYTAB, e
|
||||
)
|
||||
pent = pwd.getpwnam(http.keytab_user)
|
||||
os.chown(http.keytab, pent.pw_uid, pent.pw_gid)
|
||||
http.keytab_user.chown(http.keytab)
|
||||
|
||||
|
||||
def ds_enable_sidgen_extdom_plugins(ds):
|
||||
|
@ -22,7 +22,6 @@ from __future__ import absolute_import
|
||||
import logging
|
||||
import sys
|
||||
import os
|
||||
import pwd
|
||||
import socket
|
||||
import time
|
||||
import traceback
|
||||
@ -758,9 +757,7 @@ class Service:
|
||||
keytab = self.keytab
|
||||
if owner is None:
|
||||
owner = self.service_user
|
||||
|
||||
pent = pwd.getpwnam(owner)
|
||||
os.chown(keytab, pent.pw_uid, pent.pw_gid)
|
||||
owner.chown(keytab)
|
||||
|
||||
def run_getkeytab(self, ldap_uri, keytab, principal, retrieve=False):
|
||||
"""
|
||||
|
53
ipatests/test_ipaplatform/test_constants.py
Normal file
53
ipatests/test_ipaplatform/test_constants.py
Normal file
@ -0,0 +1,53 @@
|
||||
#
|
||||
# Copyright (C) 2020 FreeIPA Contributors see COPYING for license
|
||||
#
|
||||
import pytest
|
||||
|
||||
from ipaplatform.base.constants import User, Group
|
||||
|
||||
|
||||
def test_user_root():
|
||||
user = User("root")
|
||||
assert user == "root"
|
||||
assert str(user) == "root"
|
||||
assert repr(user) == '<User "root">'
|
||||
assert user.uid == 0
|
||||
assert user.pgid == 0
|
||||
assert user.entity.pw_uid == 0
|
||||
|
||||
|
||||
def test_user_invalid():
|
||||
invalid = User("invalid")
|
||||
with pytest.raises(ValueError) as e:
|
||||
assert invalid.uid
|
||||
assert str(e.value) == "user 'invalid' not found"
|
||||
|
||||
|
||||
def test_group():
|
||||
group = Group("root")
|
||||
assert group == "root"
|
||||
assert str(group) == "root"
|
||||
assert repr(group) == '<Group "root">'
|
||||
assert group.gid == 0
|
||||
assert group.entity.gr_gid == 0
|
||||
|
||||
|
||||
def test_group_invalid():
|
||||
invalid = Group("invalid")
|
||||
with pytest.raises(ValueError) as e:
|
||||
assert invalid.gid
|
||||
assert str(e.value) == "group 'invalid' not found"
|
||||
|
||||
|
||||
@pytest.mark.skip_if_platform("debian", reason="test is Fedora specific")
|
||||
@pytest.mark.skip_if_platform("suse", reason="test is Fedora specific")
|
||||
def test_user_group_daemon():
|
||||
# daemon user / group are always defined
|
||||
user = User("daemon")
|
||||
assert user == "daemon"
|
||||
assert user.uid == 2
|
||||
assert user.pgid == 2
|
||||
|
||||
group = Group("daemon")
|
||||
assert group == "daemon"
|
||||
assert group.gid == 2
|
@ -24,7 +24,6 @@ Test the `ipapython/ipautil.py` module.
|
||||
from __future__ import absolute_import
|
||||
|
||||
import os
|
||||
import pwd
|
||||
import socket
|
||||
import sys
|
||||
import tempfile
|
||||
@ -530,14 +529,13 @@ def test_run_runas():
|
||||
The test is using 'ipaapi' user as it is configured when
|
||||
ipa-server-common package is installed.
|
||||
"""
|
||||
user = pwd.getpwnam(IPAAPI_USER)
|
||||
res = ipautil.run(['/usr/bin/id', '-u'], runas=IPAAPI_USER)
|
||||
assert res.returncode == 0
|
||||
assert res.raw_output == b'%d\n' % user.pw_uid
|
||||
assert res.raw_output == b'%d\n' % IPAAPI_USER.uid
|
||||
|
||||
res = ipautil.run(['/usr/bin/id', '-g'], runas=IPAAPI_USER)
|
||||
assert res.returncode == 0
|
||||
assert res.raw_output == b'%d\n' % user.pw_gid
|
||||
assert res.raw_output == b'%d\n' % IPAAPI_USER.pgid
|
||||
|
||||
|
||||
@pytest.fixture(scope='function')
|
||||
|
Loading…
Reference in New Issue
Block a user