Add user and group wrappers

New classes for user and group names provide a convenient way to access
the uid and primary gid of a user / gid of a group. The classes also
provide chown() and chgrp() methods to simplify common operations.

The wrappers are subclasses of builtin str type and behave like ordinary
strings with additional features. The pwd and grp structs are retrieved
once and then cached.

Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Rob Crittenden <rcritten@redhat.com>
This commit is contained in:
Christian Heimes
2020-09-11 12:22:02 +02:00
committed by Rob Crittenden
parent 99a40cbbe9
commit 72fb4e60c8
21 changed files with 215 additions and 181 deletions

View File

@@ -23,7 +23,6 @@ from __future__ import print_function
import logging
import tempfile
import os
import pwd
import netaddr
import re
import shutil
@@ -993,8 +992,7 @@ class BindInstance(service.Service):
dns_principal = p
# Make sure access is strictly reserved to the named user
pent = pwd.getpwnam(self.service_user)
os.chown(self.keytab, pent.pw_uid, pent.pw_gid)
self.service_user.chown(self.keytab)
os.chmod(self.keytab, 0o400)
# modify the principal so that it is marked as an ipa service so that
@@ -1040,7 +1038,7 @@ class BindInstance(service.Service):
"""
# files are owned by root:named and are readable by user and group
uid = 0
gid = pwd.getpwnam(constants.NAMED_USER).pw_gid
gid = constants.NAMED_GROUP.gid
mode = 0o640
changed = False

View File

@@ -28,8 +28,6 @@ import logging
import dbus
import ldap
import os
import pwd
import grp
import re
import shutil
import sys
@@ -44,7 +42,6 @@ from ipalib import errors
import ipalib.constants
from ipalib.install import certmonger
from ipaplatform import services
from ipaplatform.constants import constants
from ipaplatform.paths import paths
from ipaplatform.tasks import tasks
@@ -534,8 +531,7 @@ class CAInstance(DogtagInstance):
# so remove the file first
ipautil.remove_file(paths.TMP_CA_P12)
shutil.copy(cafile, paths.TMP_CA_P12)
pent = pwd.getpwnam(self.service_user)
os.chown(paths.TMP_CA_P12, pent.pw_uid, pent.pw_gid)
self.service_user.chown(paths.TMP_CA_P12)
self._configure_clone(
cfg,
@@ -595,11 +591,10 @@ class CAInstance(DogtagInstance):
config = self._create_spawn_config(cfg)
self.set_hsm_state(config)
pent = pwd.getpwnam(self.service_user)
with tempfile.NamedTemporaryFile('w') as f:
config.write(f)
f.flush()
os.fchown(f.fileno(), pent.pw_uid, pent.pw_gid)
self.service_user.chown(f.fileno())
self.backup_state('installed', True)
@@ -682,8 +677,7 @@ class CAInstance(DogtagInstance):
'ca.enableNonces=false')
if update_result != 0:
raise RuntimeError("Disabling nonces failed")
pent = pwd.getpwnam(self.service_user)
os.chown(self.config, pent.pw_uid, pent.pw_gid)
self.service_user.chown(self.config)
def enable_pkix(self):
directivesetter.set_directive(paths.SYSCONFIG_PKI_TOMCAT,
@@ -732,9 +726,9 @@ class CAInstance(DogtagInstance):
"""
Sets the correct permissions for the RA_AGENT_PEM, RA_AGENT_KEY files
"""
ipaapi_gid = grp.getgrnam(ipalib.constants.IPAAPI_GROUP).gr_gid
group = ipalib.constants.IPAAPI_GROUP
for fname in (paths.RA_AGENT_PEM, paths.RA_AGENT_KEY):
os.chown(fname, -1, ipaapi_gid)
group.chgrp(fname)
os.chmod(fname, 0o440)
tasks.restore_context(fname)
@@ -913,8 +907,7 @@ class CAInstance(DogtagInstance):
os.mkdir(publishdir)
os.chmod(publishdir, 0o775)
pent = pwd.getpwnam(self.service_user)
os.chown(publishdir, 0, pent.pw_gid)
os.chown(publishdir, 0, self.service_user.pgid)
tasks.restore_context(publishdir)
@@ -1294,8 +1287,6 @@ class CAInstance(DogtagInstance):
sysupgrade.set_upgrade_state('dogtag', LWCA_KEY_RETRIEVAL, True)
def __setup_lightweight_ca_key_retrieval_kerberos(self):
pent = pwd.getpwnam(self.service_user)
logger.debug('Creating principal')
installutils.kadmin_addprinc(self.principal)
self.suffix = ipautil.realm_to_suffix(self.realm)
@@ -1304,11 +1295,9 @@ class CAInstance(DogtagInstance):
logger.debug('Retrieving keytab')
installutils.create_keytab(self.keytab, self.principal)
os.chmod(self.keytab, 0o600)
os.chown(self.keytab, pent.pw_uid, pent.pw_gid)
self.service_user.chown(self.keytab)
def __setup_lightweight_ca_key_retrieval_custodia(self):
pent = pwd.getpwnam(self.service_user)
logger.debug('Creating Custodia keys')
custodia_basedn = DN(
('cn', 'custodia'), ('cn', 'ipa'), ('cn', 'etc'), api.env.basedn)
@@ -1326,7 +1315,7 @@ class CAInstance(DogtagInstance):
keystore = IPAKEMKeys({'server_keys': keyfile})
keystore.generate_keys(self.service_prefix)
os.chmod(keyfile, 0o600)
os.chown(keyfile, pent.pw_uid, pent.pw_gid)
self.service_user.chown(keyfile)
def __remove_lightweight_ca_key_retrieval_custodia(self):
keyfile = os.path.join(paths.PKI_TOMCAT,
@@ -1573,8 +1562,7 @@ class CAInstance(DogtagInstance):
with open(target, 'w') as f:
f.write(filled)
os.fchmod(f.fileno(), 0o600)
pent = pwd.getpwnam(constants.PKI_USER)
os.fchown(f.fileno(), pent.pw_uid, pent.pw_gid)
self.service_user.chown(f.fileno())
# deploy ACME Tomcat application
ipautil.run(['pki-server', 'acme-deploy'])

View File

@@ -20,7 +20,6 @@ from jwcrypto.common import json_decode
import os
import stat
import time
import pwd
logger = logging.getLogger(__name__)
@@ -110,15 +109,14 @@ class CustodiaInstance(SimpleServiceInstance):
def __config_file(self):
template_file = os.path.basename(self.config_file) + '.template'
template = os.path.join(paths.USR_SHARE_IPA_DIR, template_file)
httpd_info = pwd.getpwnam(constants.HTTPD_USER)
sub_dict = dict(
IPA_CUSTODIA_CONF_DIR=paths.IPA_CUSTODIA_CONF_DIR,
IPA_CUSTODIA_KEYS=paths.IPA_CUSTODIA_KEYS,
IPA_CUSTODIA_SOCKET=paths.IPA_CUSTODIA_SOCKET,
IPA_CUSTODIA_AUDIT_LOG=paths.IPA_CUSTODIA_AUDIT_LOG,
LDAP_URI=ipaldap.realm_to_ldapi_uri(self.realm),
UID=httpd_info.pw_uid,
GID=httpd_info.pw_gid
UID=constants.HTTPD_USER.uid,
GID=constants.HTTPD_USER.pgid
)
conf = ipautil.template_file(template, sub_dict)
with open(self.config_file, "w") as f:

View File

@@ -5,10 +5,8 @@
from __future__ import print_function, absolute_import
import errno
import grp
import logging
import os
import pwd
import re
import shutil
import stat
@@ -57,10 +55,10 @@ class DNSKeySyncInstance(service.Service):
keytab=paths.IPA_DNSKEYSYNCD_KEYTAB
)
self.extra_config = [u'dnssecVersion 1', ] # DNSSEC enabled
self.named_uid = self.__get_named_uid()
self.named_gid = self.__get_named_gid()
self.ods_uid = self.__get_ods_uid()
self.ods_gid = self.__get_ods_gid()
self.named_uid = constants.NAMED_USER.uid
self.named_gid = constants.NAMED_GROUP.gid
self.ods_uid = constants.ODS_USER.uid
self.ods_gid = constants.ODS_GROUP.gid
suffix = ipautil.dn_attribute_property('_suffix')
@@ -116,30 +114,6 @@ class DNSKeySyncInstance(service.Service):
# we need restart named after setting up this service
self.start_creation()
def __get_named_uid(self):
try:
return pwd.getpwnam(constants.NAMED_USER).pw_uid
except KeyError:
raise RuntimeError("Named UID not found")
def __get_named_gid(self):
try:
return grp.getgrnam(constants.NAMED_GROUP).gr_gid
except KeyError:
raise RuntimeError("Named GID not found")
def __get_ods_uid(self):
try:
return pwd.getpwnam(constants.ODS_USER).pw_uid
except KeyError:
raise RuntimeError("OpenDNSSEC UID not found")
def __get_ods_gid(self):
try:
return grp.getgrnam(constants.ODS_GROUP).gr_gid
except KeyError:
raise RuntimeError("OpenDNSSEC GID not found")
def __check_dnssec_status(self):
if not dns_container_exists(self.suffix):
raise RuntimeError("DNS container does not exist")

View File

@@ -30,7 +30,6 @@ import shutil
import traceback
import dbus
import re
import pwd
import lxml.etree
from configparser import DEFAULTSECT, ConfigParser, RawConfigParser
@@ -363,11 +362,10 @@ class DogtagInstance(service.Service):
connector.attrib[secretattr] = self.ajp_secret
if rewrite:
pent = pwd.getpwnam(constants.PKI_USER)
with open(paths.PKI_TOMCAT_SERVER_XML, "wb") as fd:
server_xml.write(fd, pretty_print=True, encoding="utf-8")
os.fchmod(fd.fileno(), 0o660)
os.fchown(fd.fileno(), pent.pw_uid, pent.pw_gid)
self.service_user.chown(fd.fileno())
def http_proxy(self):
""" Update the http proxy file """

View File

@@ -22,7 +22,6 @@ from __future__ import print_function, absolute_import
import logging
import shutil
import pwd
import os
import time
import tempfile
@@ -630,14 +629,13 @@ class DsInstance(service.Service):
logger.debug("Failed to clean temporary file: %s", e)
def __add_default_schemas(self):
pent = pwd.getpwnam(DS_USER)
for schema_fname in IPA_SCHEMA_FILES:
target_fname = schema_dirname(self.serverid) + schema_fname
shutil.copyfile(
os.path.join(paths.USR_SHARE_IPA_DIR, schema_fname),
target_fname)
os.chmod(target_fname, 0o440) # read access for dirsrv user/group
os.chown(target_fname, pent.pw_uid, pent.pw_gid)
DS_USER.chown(target_fname)
try:
shutil.move(schema_dirname(self.serverid) + "05rfc2247.ldif",
@@ -648,7 +646,7 @@ class DsInstance(service.Service):
os.path.join(paths.USR_SHARE_IPA_DIR, "05rfc2247.ldif"),
target_fname)
os.chmod(target_fname, 0o440)
os.chown(target_fname, pent.pw_uid, pent.pw_gid)
DS_USER.chown(target_fname)
except IOError:
# Does not apply with newer DS releases
pass
@@ -772,13 +770,12 @@ class DsInstance(service.Service):
self._ldap_mod("repoint-managed-entries.ldif", self.sub_dict)
def configure_systemd_ipa_env(self):
pent = pwd.getpwnam(platformconstants.DS_USER)
template = os.path.join(
paths.USR_SHARE_IPA_DIR, "ds-ipa-env.conf.template"
)
sub_dict = dict(
KRB5_KTNAME=paths.DS_KEYTAB,
KRB5CCNAME=paths.TMP_KRB5CC % pent.pw_uid
KRB5CCNAME=paths.TMP_KRB5CC % platformconstants.DS_USER.uid
)
conf = ipautil.template_file(template, sub_dict)

View File

@@ -26,7 +26,6 @@ import shutil
import sys
import tempfile
import time
import pwd
import six
@@ -295,14 +294,12 @@ class Backup(admintool.AdminTool):
logger.info("Preparing backup on %s", api.env.host)
pent = pwd.getpwnam(constants.DS_USER)
self.top_dir = tempfile.mkdtemp("ipa")
os.chown(self.top_dir, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(self.top_dir)
os.chmod(self.top_dir, 0o750)
self.dir = os.path.join(self.top_dir, "ipa")
os.mkdir(self.dir, 0o750)
os.chown(self.dir, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(self.dir)
self.tarfile = None
self.header = os.path.join(self.top_dir, 'header')

View File

@@ -26,7 +26,6 @@ import shutil
import sys
import tempfile
import time
import pwd
import ldif
import itertools
@@ -346,16 +345,14 @@ class Restore(admintool.AdminTool):
)
)
pent = pwd.getpwnam(constants.DS_USER)
# Temporary directory for decrypting files before restoring
self.top_dir = tempfile.mkdtemp("ipa")
os.chown(self.top_dir, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(self.top_dir)
os.chmod(self.top_dir, 0o750)
self.dir = os.path.join(self.top_dir, "ipa")
os.mkdir(self.dir)
os.chmod(self.dir, 0o750)
os.chown(self.dir, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(self.dir)
logger.info("Temporary setting umask to 022")
old_umask = os.umask(0o022)
@@ -590,10 +587,9 @@ class Restore(admintool.AdminTool):
srcldiffile = os.path.join(self.dir, ldifname)
if not os.path.exists(ldifdir):
pent = pwd.getpwnam(constants.DS_USER)
os.mkdir(ldifdir)
os.chmod(ldifdir, 0o770)
os.chown(ldifdir, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(ldifdir)
ipautil.backup_file(ldiffile)
with open(ldiffile, 'w') as out_file:
@@ -603,8 +599,7 @@ class Restore(admintool.AdminTool):
ldif_parser.parse()
# Make sure the modified ldiffile is owned by DS_USER
pent = pwd.getpwnam(constants.DS_USER)
os.chown(ldiffile, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(ldiffile)
if online:
conn = self.get_connection()
@@ -634,7 +629,7 @@ class Restore(admintool.AdminTool):
except OSError as e:
pass
os.chown(template_dir, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(template_dir)
os.chmod(template_dir, 0o770)
# Restore SELinux context of template_dir
@@ -825,9 +820,10 @@ class Restore(admintool.AdminTool):
]
run(args, cwd=self.dir)
pent = pwd.getpwnam(constants.DS_USER)
os.chown(self.top_dir, pent.pw_uid, pent.pw_gid)
recursive_chown(self.dir, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(self.top_dir)
recursive_chown(
self.dir, constants.DS_USER.uid, constants.DS_USER.pgid
)
if encrypt:
# We can remove the decoded tarball
@@ -851,7 +847,7 @@ class Restore(admintool.AdminTool):
paths.TOMCAT_SIGNEDAUDIT_DIR]
try:
pent = pwd.getpwnam(constants.PKI_USER)
pent = constants.PKI_USER.entity
except KeyError:
logger.debug("No %s user exists, skipping CA directory creation",
constants.PKI_USER)

View File

@@ -21,7 +21,6 @@ from __future__ import absolute_import
import logging
import os
import pwd
import shutil
import tempfile
import base64
@@ -183,8 +182,7 @@ class KRAInstance(DogtagInstance):
if self.clone:
krafile = self.pkcs12_info[0]
shutil.copy(krafile, p12_tmpfile_name)
pent = pwd.getpwnam(self.service_user)
os.chown(p12_tmpfile_name, pent.pw_uid, pent.pw_gid)
self.service_user.chown(p12_tmpfile_name)
self._configure_clone(
cfg,
@@ -207,11 +205,10 @@ class KRAInstance(DogtagInstance):
)
# Generate configuration file
pent = pwd.getpwnam(self.service_user)
config = self._create_spawn_config(cfg)
with tempfile.NamedTemporaryFile('w', delete=False) as f:
config.write(f)
os.fchown(f.fileno(), pent.pw_uid, pent.pw_gid)
self.service_user.chown(f.fileno())
cfg_file = f.name
nolog_list = [

View File

@@ -22,7 +22,6 @@ from __future__ import print_function
import logging
import os
import pwd
import socket
import dbus
@@ -384,8 +383,7 @@ class KrbInstance(service.Service):
self.fstore.backup_file(paths.DS_KEYTAB)
installutils.create_keytab(paths.DS_KEYTAB, ldap_principal)
pent = pwd.getpwnam(constants.DS_USER)
os.chown(paths.DS_KEYTAB, pent.pw_uid, pent.pw_gid)
constants.DS_USER.chown(paths.DS_KEYTAB)
def __create_host_keytab(self):
host_principal = "host/" + self.fqdn + "@" + self.realm

View File

@@ -6,8 +6,6 @@ from __future__ import absolute_import
import logging
import os
import pwd
import grp
import ldap
@@ -33,8 +31,8 @@ class ODSExporterInstance(service.Service):
keytab=paths.IPA_ODS_EXPORTER_KEYTAB,
service_prefix=u'ipa-ods-exporter'
)
self.ods_uid = None
self.ods_gid = None
self.ods_uid = constants.ODS_USER.uid
self.ods_gid = constants.ODS_GROUP.gid
self.enable_if_exists = False
suffix = ipautil.dn_attribute_property('_suffix')
@@ -51,8 +49,6 @@ class ODSExporterInstance(service.Service):
except Exception:
pass
# checking status step must be first
self.step("checking status", self.__check_dnssec_status)
self.step("setting up DNS Key Exporter", self.__setup_key_exporter)
self.step("setting up kerberos principal", self.__setup_principal)
self.step("disabling default signer daemon", self.__disable_signerd)
@@ -60,17 +56,6 @@ class ODSExporterInstance(service.Service):
self.step("configuring DNS Key Exporter to start on boot", self.__enable)
self.start_creation()
def __check_dnssec_status(self):
try:
self.ods_uid = pwd.getpwnam(constants.ODS_USER).pw_uid
except KeyError:
raise RuntimeError("OpenDNSSEC UID not found")
try:
self.ods_gid = grp.getgrnam(constants.ODS_GROUP).gr_gid
except KeyError:
raise RuntimeError("OpenDNSSEC GID not found")
def __enable(self):
try:

View File

@@ -6,8 +6,6 @@ from __future__ import absolute_import
import logging
import os
import pwd
import grp
import stat
import shutil
from subprocess import CalledProcessError
@@ -69,8 +67,9 @@ class OpenDNSSECInstance(service.Service):
self, "ods-enforcerd",
service_desc="OpenDNSSEC enforcer daemon",
)
self.ods_uid = None
self.ods_gid = None
self.named_gid = constants.NAMED_GROUP.gid
self.ods_uid = constants.ODS_USER.uid
self.ods_gid = constants.ODS_GROUP.gid
self.conf_file_dict = {
'SOFTHSM_LIB': paths.LIBSOFTHSM2_SO,
'TOKEN_LABEL': SOFTHSM_DNSSEC_TOKEN_LABEL,
@@ -107,8 +106,6 @@ class OpenDNSSECInstance(service.Service):
except Exception:
pass
# checking status must be first
self.step("checking status", self.__check_dnssec_status)
self.step("setting up configuration files", self.__setup_conf_files)
self.step("setting up ownership and file mode bits", self.__setup_ownership_file_modes)
if generate_master_key:
@@ -119,27 +116,6 @@ class OpenDNSSECInstance(service.Service):
self.step("configuring OpenDNSSEC enforcer to start on boot", self.__enable)
self.start_creation()
def __check_dnssec_status(self):
try:
self.named_uid = pwd.getpwnam(constants.NAMED_USER).pw_uid
except KeyError:
raise RuntimeError("Named UID not found")
try:
self.named_gid = grp.getgrnam(constants.NAMED_GROUP).gr_gid
except KeyError:
raise RuntimeError("Named GID not found")
try:
self.ods_uid = pwd.getpwnam(constants.ODS_USER).pw_uid
except KeyError:
raise RuntimeError("OpenDNSSEC UID not found")
try:
self.ods_gid = grp.getgrnam(constants.ODS_GROUP).gr_gid
except KeyError:
raise RuntimeError("OpenDNSSEC GID not found")
def __enable(self):
try:
self.ldap_configure('DNSSEC', self.fqdn, None,

View File

@@ -11,7 +11,6 @@ import re
import os
import glob
import shutil
import pwd
import fileinput
import ssl
import stat
@@ -755,8 +754,7 @@ def copy_crl_file(old_path, new_path=None):
os.symlink(realpath, new_path)
else:
shutil.copy2(old_path, new_path)
pent = pwd.getpwnam(constants.PKI_USER)
os.chown(new_path, pent.pw_uid, pent.pw_gid)
constants.PKI_USER.chown(new_path)
tasks.restore_context(new_path)
@@ -1104,8 +1102,7 @@ def update_http_keytab(http):
'Cannot remove file %s (%s). Please remove the file manually.',
paths.OLD_IPA_KEYTAB, e
)
pent = pwd.getpwnam(http.keytab_user)
os.chown(http.keytab, pent.pw_uid, pent.pw_gid)
http.keytab_user.chown(http.keytab)
def ds_enable_sidgen_extdom_plugins(ds):

View File

@@ -22,7 +22,6 @@ from __future__ import absolute_import
import logging
import sys
import os
import pwd
import socket
import time
import traceback
@@ -758,9 +757,7 @@ class Service:
keytab = self.keytab
if owner is None:
owner = self.service_user
pent = pwd.getpwnam(owner)
os.chown(keytab, pent.pw_uid, pent.pw_gid)
owner.chown(keytab)
def run_getkeytab(self, ldap_uri, keytab, principal, retrieve=False):
"""