Use os.path.isfile() and isdir()

Replace custom file_exists() and dir_exists() functions with proper
functions from Python's stdlib.

The change also gets rid of pylint's invalid bad-python3-import error,
https://github.com/PyCQA/pylint/issues/1565

Signed-off-by: Christian Heimes <cheimes@redhat.com>
Reviewed-By: Stanislav Laznicka <slaznick@redhat.com>
This commit is contained in:
Christian Heimes 2017-10-20 11:10:20 +02:00 committed by Stanislav Laznicka
parent fad88b358b
commit b29db07c3b
26 changed files with 57 additions and 78 deletions

View File

@ -159,7 +159,7 @@ def install_replica(safe_options, options, filename):
else:
if filename is None:
sys.exit("A replica file is required")
if not ipautil.file_exists(filename):
if not os.path.isfile(filename):
sys.exit("Replica file %s does not exist" % filename)
if not options.promote:

View File

@ -113,7 +113,7 @@ def parse_options():
parser.error("You must specify at least one option: "
"--forwarder or --no-forwarders or --auto-forwarders")
if options.kasp_db_file and not ipautil.file_exists(options.kasp_db_file):
if options.kasp_db_file and not os.path.isfile(options.kasp_db_file):
parser.error("File %s does not exist" % options.kasp_db_file)
if options.dm_password:

View File

@ -54,8 +54,6 @@ from ipapython.install.core import group, knob, extend_knob
from ipapython.install.common import step
from ipapython.ipautil import (
CalledProcessError,
dir_exists,
file_exists,
realm_to_suffix,
run,
user_input,
@ -192,7 +190,7 @@ def nssldap_exists():
for file_type in ['mandatory', 'optional']:
try:
for filename in function[file_type]:
if file_exists(filename):
if os.path.isfile(filename):
files_found[function['function']].append(filename)
if file_type == 'mandatory':
retval = True
@ -605,7 +603,7 @@ def hardcode_ldap_server(cli_server):
DNS Discovery didn't return a valid IPA server, hardcode a value into
the file instead.
"""
if not file_exists(paths.LDAP_CONF):
if not os.path.isfile(paths.LDAP_CONF):
return
ldapconf = IPAChangeConf("IPA Installer")
@ -859,8 +857,8 @@ def configure_sssd_conf(
sssd_enable_service(sssdconfig, 'ifp')
if (
(options.conf_ssh and file_exists(paths.SSH_CONFIG)) or
(options.conf_sshd and file_exists(paths.SSHD_CONFIG))
(options.conf_ssh and os.path.isfile(paths.SSH_CONFIG)) or
(options.conf_sshd and os.path.isfile(paths.SSHD_CONFIG))
):
try:
sssdconfig.new_service('ssh')
@ -1032,7 +1030,7 @@ def change_ssh_config(filename, changes, sections):
def configure_ssh_config(fstore, options):
if not file_exists(paths.SSH_CONFIG):
if not os.path.isfile(paths.SSH_CONFIG):
logger.info("%s not found, skipping configuration", paths.SSH_CONFIG)
return
@ -1040,7 +1038,7 @@ def configure_ssh_config(fstore, options):
changes = {'PubkeyAuthentication': 'yes'}
if options.sssd and file_exists(paths.SSS_SSH_KNOWNHOSTSPROXY):
if options.sssd and os.path.isfile(paths.SSS_SSH_KNOWNHOSTSPROXY):
changes[
'ProxyCommand'] = '%s -p %%p %%h' % paths.SSS_SSH_KNOWNHOSTSPROXY
changes['GlobalKnownHostsFile'] = paths.SSSD_PUBCONF_KNOWN_HOSTS
@ -1055,7 +1053,7 @@ def configure_ssh_config(fstore, options):
def configure_sshd_config(fstore, options):
sshd = services.knownservices.sshd
if not file_exists(paths.SSHD_CONFIG):
if not os.path.isfile(paths.SSHD_CONFIG):
logger.info("%s not found, skipping configuration", paths.SSHD_CONFIG)
return
@ -1069,7 +1067,7 @@ def configure_sshd_config(fstore, options):
'ChallengeResponseAuthentication': 'yes',
}
if options.sssd and file_exists(paths.SSS_SSH_AUTHORIZEDKEYS):
if options.sssd and os.path.isfile(paths.SSS_SSH_AUTHORIZEDKEYS):
authorized_keys_changes = None
candidates = (
@ -1875,19 +1873,19 @@ def configure_firefox(options, statestore, domain):
if options.firefox_dir is not None:
pref_path = os.path.join(options.firefox_dir,
FIREFOX_PREFERENCES_REL_PATH)
if dir_exists(pref_path):
if os.path.isdir(pref_path):
preferences_dir = pref_path
else:
logger.error("Directory '%s' does not exists.", pref_path)
else:
# test if firefox is installed
if file_exists(paths.FIREFOX):
if os.path.isfile(paths.FIREFOX):
# find valid preferences path
for path in [paths.LIB_FIREFOX, paths.LIB64_FIREFOX]:
pref_path = os.path.join(path,
FIREFOX_PREFERENCES_REL_PATH)
if dir_exists(pref_path):
if os.path.isdir(pref_path):
preferences_dir = pref_path
break
else:
@ -3285,7 +3283,7 @@ def uninstall(options):
preferences_fname = statestore.restore_state(
'firefox', 'preferences_fname')
if preferences_fname is not None:
if file_exists(preferences_fname):
if os.path.isfile(preferences_fname):
try:
os.remove(preferences_fname)
except Exception as e:

View File

@ -515,7 +515,7 @@ class API(ReadOnly):
Add global options to an optparse.OptionParser instance.
"""
def config_file_callback(option, opt, value, parser):
if not ipautil.file_exists(value):
if not os.path.isfile(value):
parser.error(
_("%(filename)s: file not found") % dict(filename=value))

View File

@ -424,7 +424,7 @@ class SystemdService(PlatformService):
self.service_instance(instance_name))
try:
if not ipautil.dir_exists(srv_tgt):
if not os.path.isdir(srv_tgt):
os.mkdir(srv_tgt)
os.chmod(srv_tgt, 0o755)
if os.path.exists(srv_lnk):
@ -459,7 +459,7 @@ class SystemdService(PlatformService):
self.service_instance(instance_name))
try:
if ipautil.dir_exists(srv_tgt):
if os.path.isdir(srv_tgt):
if os.path.islink(srv_lnk):
os.unlink(srv_lnk)
ipautil.run([paths.SYSTEMCTL, "--system", "daemon-reload"])

View File

@ -29,7 +29,6 @@ import math
import os
import sys
import copy
import stat
import shutil
import socket
import re
@ -543,31 +542,16 @@ def nolog_replace(string, nolog):
return string
def file_exists(filename):
try:
mode = os.stat(filename)[stat.ST_MODE]
return bool(stat.S_ISREG(mode))
except Exception:
return False
def dir_exists(filename):
try:
mode = os.stat(filename)[stat.ST_MODE]
return bool(stat.S_ISDIR(mode))
except Exception:
return False
def install_file(fname, dest):
# SELinux: use copy to keep the right context
if file_exists(dest):
if os.path.isfile(dest):
os.rename(dest, dest + ".orig")
shutil.copy(fname, dest)
os.remove(fname)
def backup_file(fname):
if file_exists(fname):
if os.path.isfile(fname):
os.rename(fname, fname + ".orig")

View File

@ -126,7 +126,7 @@ def install_check(standalone, replica_config, options):
raise ScriptError('A selfsign CA can not be added')
cafile = os.path.join(replica_config.dir, 'cacert.p12')
if not options.promote and not ipautil.file_exists(cafile):
if not options.promote and not os.path.isfile(cafile):
raise ScriptError('CA cannot be installed in CA-less setup.')
if standalone and not options.skip_conncheck:
@ -168,7 +168,7 @@ def install_check(standalone, replica_config, options):
raise ScriptError(
"CA is already installed.\nRun the installer with "
"--external-cert-file.")
if ipautil.file_exists(paths.ROOT_IPA_CSR):
if os.path.isfile(paths.ROOT_IPA_CSR):
raise ScriptError(
"CA CSR file %s already exists.\nIn order to continue "
"remove the file and run the installer again." %

View File

@ -804,7 +804,7 @@ class CAInstance(DogtagInstance):
# We need to append the certs to the existing file, so start by
# reading the file
if ipautil.file_exists(paths.IPA_CA_CRT):
if os.path.isfile(paths.IPA_CA_CRT):
ca_certs = x509.load_certificate_list_from_file(paths.IPA_CA_CRT)
certlist.extend(ca_certs)

View File

@ -227,7 +227,7 @@ class CertDB(object):
return self.nssdb.run_certutil(args, stdin, **kwargs)
def create_noise_file(self):
if ipautil.file_exists(self.noise_fname):
if os.path.isfile(self.noise_fname):
os.remove(self.noise_fname)
with open(self.noise_fname, "w") as f:
self.set_perms(f)
@ -565,7 +565,7 @@ class CertDB(object):
def create_from_cacert(self):
cacert_fname = paths.IPA_CA_CRT
if ipautil.file_exists(self.certdb_fname):
if os.path.isfile(self.certdb_fname):
# We already have a cert db, see if it is for the same CA.
# If it is we leave things as they are.
with open(cacert_fname, "r") as f:

View File

@ -11,6 +11,7 @@ from __future__ import print_function
import enum
import logging
import os
# absolute import is necessary because IPA module dns clashes with python-dns
from dns import resolver
@ -114,7 +115,7 @@ def install_check(standalone, api, replica, options, hostname):
global reverse_zones
fstore = sysrestore.FileStore(paths.SYSRESTORE)
if not ipautil.file_exists(paths.IPA_DNS_INSTALL):
if not os.path.isfile(paths.IPA_DNS_INSTALL):
raise RuntimeError("Integrated DNS requires '%s' package" %
constants.IPA_DNS_PACKAGE_NAME)

View File

@ -84,7 +84,7 @@ DS_INSTANCE_PREFIX = 'slapd-'
def find_server_root():
if ipautil.dir_exists(paths.USR_LIB_DIRSRV_64):
if os.path.isdir(paths.USR_LIB_DIRSRV_64):
return paths.USR_LIB_DIRSRV_64
else:
return paths.USR_LIB_DIRSRV

View File

@ -523,7 +523,7 @@ def kadmin_modprinc(principal, options):
def create_keytab(path, principal):
try:
if ipautil.file_exists(path):
if os.path.isfile(path):
os.remove(path)
except os.error:
logger.critical("Failed to remove %s.", path)
@ -769,7 +769,7 @@ def read_replica_info(dir_path, rconfig):
def read_replica_info_dogtag_port(config_dir):
portfile = config_dir + "/dogtag_directory_port.txt"
default_port = 7389
if not ipautil.file_exists(portfile):
if not os.path.isfile(portfile):
dogtag_master_ds_port = default_port
else:
with open(portfile) as fd:

View File

@ -21,6 +21,7 @@
from __future__ import print_function
import logging
import os
import sys
import tempfile
from optparse import SUPPRESS_HELP # pylint: disable=deprecated-module
@ -30,7 +31,6 @@ from ipalib import api
from ipalib.constants import DOMAIN_LEVEL_0
from ipaplatform.paths import paths
from ipapython import admintool
from ipapython import ipautil
from ipaserver.install import service
from ipaserver.install import cainstance
from ipaserver.install import krainstance
@ -122,7 +122,7 @@ class KRAInstaller(KRAInstall):
self.option_parser.error("Too many arguments provided")
elif len(self.args) == 1:
self.replica_file = self.args[0]
if not ipautil.file_exists(self.replica_file):
if not os.path.isfile(self.replica_file):
self.option_parser.error(
"Replica file %s does not exist" % self.replica_file)

View File

@ -180,7 +180,7 @@ class ReplicaPrepare(admintool.AdminTool):
config_dir = dsinstance.config_dirname(
installutils.realm_to_serverid(api.env.realm))
if not ipautil.dir_exists(config_dir):
if not os.path.isdir(config_dir):
raise admintool.ScriptError(
"could not find directory instance: %s" % config_dir)
@ -225,7 +225,7 @@ class ReplicaPrepare(admintool.AdminTool):
except errors.DatabaseError as e:
raise admintool.ScriptError(e.desc)
if ca_enabled and not ipautil.file_exists(paths.CA_CS_CFG_PATH):
if ca_enabled and not os.path.isfile(paths.CA_CS_CFG_PATH):
raise admintool.ScriptError(
"CA is not installed on this server. "
"ipa-replica-prepare must be run on an IPA server with CA.")
@ -358,7 +358,7 @@ class ReplicaPrepare(admintool.AdminTool):
logger.info("Copying SSL certificate for the Directory Server")
self.copy_info_file(self.dirsrv_pkcs12_file.name, "dscert.p12")
else:
if ipautil.file_exists(options.ca_file):
if os.path.isfile(options.ca_file):
# Since it is possible that the Directory Manager password
# has changed since ipa-server-install, we need to regenerate
# the CA PKCS#12 file and update the pki admin user password
@ -404,7 +404,7 @@ class ReplicaPrepare(admintool.AdminTool):
logger.info("Copying additional files")
cacert_filename = paths.CACERT_PEM
if ipautil.file_exists(cacert_filename):
if os.path.isfile(cacert_filename):
self.copy_info_file(cacert_filename, "cacert.pem")
self.copy_info_file(paths.IPA_DEFAULT_CONF, "default.conf")
@ -571,7 +571,7 @@ class ReplicaPrepare(admintool.AdminTool):
self.remove_info_file("noise.txt")
orig_filename = passwd_fname + ".orig"
if ipautil.file_exists(orig_filename):
if os.path.isfile(orig_filename):
installutils.remove_file(orig_filename)
except errors.CertificateOperationError as e:
raise admintool.ScriptError(str(e))

View File

@ -100,7 +100,7 @@ def install(api, replica_config, options):
replica_config.dirman_password)
else:
cafile = os.path.join(replica_config.dir, 'cacert.p12')
if not ipautil.file_exists(cafile):
if not os.path.isfile(cafile):
raise RuntimeError(
"Unable to clone KRA."
" cacert.p12 file not found in replica file")

View File

@ -338,7 +338,7 @@ class OpenDNSSECInstance(service.Service):
restore_list = [paths.OPENDNSSEC_CONF_FILE, paths.OPENDNSSEC_KASP_FILE,
paths.SYSCONFIG_ODS, paths.OPENDNSSEC_ZONELIST_FILE]
if ipautil.file_exists(paths.OPENDNSSEC_KASP_DB):
if os.path.isfile(paths.OPENDNSSEC_KASP_DB):
# force to export data
cmd = [paths.IPA_ODS_EXPORTER, 'ipa-full-update']

View File

@ -482,7 +482,7 @@ class ServerInstallInterface(ServerCertificateInstallInterface,
"domain via the --domain option")
else:
if not ipautil.file_exists(self.replica_file):
if not os.path.isfile(self.replica_file):
raise RuntimeError(
"Replica file %s does not exist" % self.replica_file)

View File

@ -100,7 +100,7 @@ def read_cache(dm_password):
"""
Returns a dict of cached answers or empty dict if no cache file exists.
"""
if not ipautil.file_exists(paths.ROOT_IPA_CACHE):
if not os.path.isfile(paths.ROOT_IPA_CACHE):
return {}
top_dir = tempfile.mkdtemp("ipa")
@ -340,7 +340,7 @@ def install_check(installer):
sstore = sysrestore.StateFile(SYSRESTORE_DIR_PATH)
# This will override any settings passed in on the cmdline
if ipautil.file_exists(paths.ROOT_IPA_CACHE):
if os.path.isfile(paths.ROOT_IPA_CACHE):
if options.dm_password is not None:
dm_password = options.dm_password
else:
@ -947,7 +947,7 @@ def install(installer):
print("use a SSL signing certificate. See the IPA documentation for "
"more details.")
if ipautil.file_exists(paths.ROOT_IPA_CACHE):
if os.path.isfile(paths.ROOT_IPA_CACHE):
os.remove(paths.ROOT_IPA_CACHE)

View File

@ -68,7 +68,7 @@ def make_pkcs12_info(directory, cert_name, password_name):
:return: a (full cert path, password) tuple, or None if cert is not found
"""
cert_path = os.path.join(directory, cert_name)
if ipautil.file_exists(cert_path):
if os.path.isfile(cert_path):
password_file = os.path.join(directory, password_name)
password = open(password_file).read().strip()
return cert_path, password
@ -709,7 +709,7 @@ def install_check(installer):
installer._top_dir = config.top_dir
installer._config = config
ca_enabled = ipautil.file_exists(os.path.join(config.dir, "cacert.p12"))
ca_enabled = os.path.isfile(os.path.join(config.dir, "cacert.p12"))
# Create the management framework config file
# Note: We must do this before bootstraping and finalizing ipalib.api
create_ipa_conf(fstore, config, ca_enabled)
@ -720,7 +720,7 @@ def install_check(installer):
installutils.verify_fqdn(config.master_host_name, options.no_host_dns)
cafile = os.path.join(config.dir, "ca.crt")
if not ipautil.file_exists(cafile):
if not os.path.isfile(cafile):
raise RuntimeError("CA cert file is not available. Please run "
"ipa-replica-prepare to create a new replica file.")
@ -1096,7 +1096,7 @@ def promote_check(installer):
ccache)
cafile = paths.IPA_CA_CRT
if not ipautil.file_exists(cafile):
if not os.path.isfile(cafile):
raise RuntimeError("CA cert file is not available! Please reinstall"
"the client and try again.")

View File

@ -1699,7 +1699,7 @@ def upgrade_configuration():
os.path.join(paths.USR_SHARE_IPA_DIR, "ipa-pki-proxy.conf"),
add=True)
else:
if ipautil.file_exists(paths.HTTPD_IPA_PKI_PROXY_CONF):
if os.path.isfile(paths.HTTPD_IPA_PKI_PROXY_CONF):
os.remove(paths.HTTPD_IPA_PKI_PROXY_CONF)
if subject_base:
upgrade_file(

View File

@ -30,7 +30,7 @@ from ipalib import api
from ipalib import errors
from ipaserver.install.ldapupdate import LDAPUpdate, BadSyntax
from ipaserver.install import installutils
from ipapython import ipautil, ipaldap
from ipapython import ipaldap
from ipaplatform.paths import paths
from ipapython.dn import DN
@ -56,7 +56,7 @@ class test_update(unittest.TestCase):
def setUp(self):
fqdn = installutils.get_fqdn()
pwfile = api.env.dot_ipa + os.sep + ".dmpw"
if ipautil.file_exists(pwfile):
if os.path.isfile(pwfile):
fp = open(pwfile, "r")
self.dm_password = fp.read().rstrip()
fp.close()
@ -68,7 +68,7 @@ class test_update(unittest.TestCase):
self.ld.simple_bind(bind_dn=ipaldap.DIRMAN_DN,
bind_password=self.dm_password)
self.testdir = os.path.abspath(os.path.dirname(__file__))
if not ipautil.file_exists(os.path.join(self.testdir,
if not os.path.isfile(os.path.join(self.testdir,
"0_reset.update")):
raise nose.SkipTest("Unable to find test update files")

View File

@ -33,7 +33,6 @@ import pytest
from ipatests.i18n import create_po, po_file_iterate
from ipalib.request import context
from ipalib import text
from ipapython.ipautil import file_exists
if six.PY3:
unicode = str
@ -106,11 +105,11 @@ class test_TestLang(object):
raise nose.SkipTest('Unable to create po file "%s" & mo file "%s" from pot file "%s"' %
(self.po_file, self.mo_file, self.pot_file))
if not file_exists(self.po_file):
if not os.path.isfile(self.po_file):
raise nose.SkipTest(
'Test po file unavailable: {}'.format(self.po_file))
if not file_exists(self.mo_file):
if not os.path.isfile(self.mo_file):
raise nose.SkipTest(
'Test mo file unavailable: {}'.format(self.mo_file))

View File

@ -36,7 +36,6 @@ import six
from ipaplatform.paths import paths
from ipaserver.plugins.ldap2 import ldap2, AUTOBIND_DISABLED
from ipalib import api, create_api, errors
from ipapython import ipautil
from ipapython.dn import DN
if six.PY3:
@ -85,7 +84,7 @@ class test_ldap(object):
Test a simple LDAP bind using ldap2
"""
pwfile = api.env.dot_ipa + os.sep + ".dmpw"
if ipautil.file_exists(pwfile):
if os.path.isfile(pwfile):
with open(pwfile, "r") as fp:
dm_password = fp.read().rstrip()
else:
@ -109,7 +108,7 @@ class test_ldap(object):
myapi.finalize()
pwfile = api.env.dot_ipa + os.sep + ".dmpw"
if ipautil.file_exists(pwfile):
if os.path.isfile(pwfile):
with open(pwfile, "r") as fp:
dm_password = fp.read().rstrip()
else:

View File

@ -6,7 +6,6 @@ import io
import os
from ipaserver.plugins.ldap2 import ldap2
from ipalib import api
from ipapython import ipautil
from ipapython.dn import DN
import pytest
@ -31,7 +30,7 @@ class TestTopologyPlugin(object):
if self.conn and self.conn.isconnected():
self.conn.disconnect()
@pytest.mark.skipif(ipautil.file_exists(pwfile) is False,
@pytest.mark.skipif(os.path.isfile(pwfile) is False,
reason="You did not provide a .dmpw file with the DM password")
def test_topologyplugin(self):
pluginattrs = {

View File

@ -22,7 +22,6 @@ Test the `pkcs10.py` module.
import nose
from ipalib import pkcs10
from ipapython import ipautil
import pytest
import os
import cryptography.x509
@ -36,7 +35,7 @@ class test_update(object):
def setup(self):
self.testdir = os.path.abspath(os.path.dirname(__file__))
if not ipautil.file_exists(os.path.join(self.testdir,
if not os.path.isfile(os.path.join(self.testdir,
"test0.csr")):
raise nose.SkipTest("Unable to find test update files")

View File

@ -58,7 +58,7 @@ def is_db_configured():
aliasdir = api.env.dot_ipa + os.sep + 'alias' + os.sep + '.pwd'
if (api.env.xmlrpc_uri == u'http://localhost:8888/ipa/xml' and
not ipautil.file_exists(aliasdir)):
not os.path.isfile(aliasdir)):
raise nose.SkipTest('developer CA not configured in %s' % aliasdir)
# Test setup