mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-01-24 07:06:37 -06:00
pylint: Fix useless-suppression
Cleanup up no longer used Pylint's disables where possible. Fixes: https://pagure.io/freeipa/issue/9117 Signed-off-by: Stanislav Levin <slev@altlinux.org> Reviewed-By: Rob Crittenden <rcritten@redhat.com>
This commit is contained in:
parent
ac6fe016c6
commit
5a00882eab
@ -59,7 +59,7 @@ def main():
|
||||
rsp.DecodePacket(buf)
|
||||
pkt.VerifyReply(rsp)
|
||||
|
||||
proc.terminate() # pylint: disable=E1101
|
||||
proc.terminate()
|
||||
proc.wait()
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
@ -220,7 +220,7 @@ def request_cert(reuse_existing, **kwargs):
|
||||
sys.stderr.write(result.raw_error_output)
|
||||
else:
|
||||
# Write bytes directly
|
||||
sys.stderr.buffer.write(result.raw_error_output) #pylint: disable=no-member
|
||||
sys.stderr.buffer.write(result.raw_error_output)
|
||||
sys.stderr.flush()
|
||||
|
||||
syslog.syslog(syslog.LOG_NOTICE,
|
||||
|
@ -36,8 +36,8 @@ def run_operation(cmd):
|
||||
|
||||
result = ipautil.run(cmd, raiseonerr=False, env=os.environ)
|
||||
# Write bytes directly
|
||||
sys.stdout.buffer.write(result.raw_output) #pylint: disable=no-member
|
||||
sys.stderr.buffer.write(result.raw_error_output) #pylint: disable=no-member
|
||||
sys.stdout.buffer.write(result.raw_output)
|
||||
sys.stderr.buffer.write(result.raw_error_output)
|
||||
sys.stdout.flush()
|
||||
sys.stderr.flush()
|
||||
|
||||
|
@ -34,7 +34,6 @@ def main():
|
||||
"File '{}' missing or not readable.\n".format(filename)
|
||||
)
|
||||
|
||||
# pylint: disable=no-member
|
||||
client = CustodiaClient(
|
||||
client_service="{}@{}".format(service, env.host),
|
||||
server=args.servername,
|
||||
|
@ -319,7 +319,7 @@ class PortResponder(threading.Thread):
|
||||
logger.debug('%d %s: Stopped listening', port, proto)
|
||||
|
||||
def _is_closing(self):
|
||||
with self._close_lock: # pylint: disable=not-context-manager
|
||||
with self._close_lock:
|
||||
return self._close
|
||||
|
||||
def _bind_to_port(self, port, socket_type):
|
||||
@ -370,7 +370,7 @@ class PortResponder(threading.Thread):
|
||||
def stop(self):
|
||||
logger.debug('Stopping listening thread.')
|
||||
|
||||
with self._close_lock: # pylint: disable=not-context-manager
|
||||
with self._close_lock:
|
||||
self._close = True
|
||||
|
||||
|
||||
|
@ -3667,7 +3667,7 @@ def init(installer):
|
||||
root_logger = logging.getLogger()
|
||||
for handler in root_logger.handlers:
|
||||
if (isinstance(handler, logging.StreamHandler) and
|
||||
handler.stream is sys.stderr): # pylint: disable=no-member
|
||||
handler.stream is sys.stderr):
|
||||
installer.debug = handler.level == logging.DEBUG
|
||||
break
|
||||
else:
|
||||
@ -3749,7 +3749,6 @@ class ClientInstallInterface(hostname_.HostNameInstallInterface,
|
||||
force_join = enroll_only(force_join)
|
||||
|
||||
ntp_servers = knob(
|
||||
# pylint: disable=invalid-sequence-index
|
||||
typing.List[str], None,
|
||||
description="ntp server to use. This option can be used multiple "
|
||||
"times",
|
||||
|
@ -113,9 +113,9 @@ def run_with_args(api):
|
||||
|
||||
update_server(certs)
|
||||
|
||||
# pylint: disable=import-error,ipa-forbidden-import
|
||||
# pylint: disable=ipa-forbidden-import
|
||||
from ipaserver.install import cainstance, custodiainstance
|
||||
# pylint: enable=import-error,ipa-forbidden-import
|
||||
# pylint: enable=ipa-forbidden-import
|
||||
|
||||
# Add LWCA tracking requests. Only execute if *this server*
|
||||
# has CA installed (ca_enabled indicates CA-ful topology).
|
||||
|
@ -37,10 +37,8 @@ except ImportError:
|
||||
from xml.etree import ElementTree as etree
|
||||
import SSSDConfig
|
||||
|
||||
# pylint: disable=import-error
|
||||
from six.moves.urllib.parse import urlsplit
|
||||
|
||||
# pylint: enable=import-error
|
||||
from optparse import OptionParser # pylint: disable=deprecated-module
|
||||
from ipapython import ipachangeconf
|
||||
from ipaclient.install import ipadiscovery
|
||||
|
@ -129,7 +129,6 @@ class HTTPSHandler(urllib.request.HTTPSHandler):
|
||||
return create_https_connection(host, **tmp)
|
||||
|
||||
def https_open(self, req):
|
||||
# pylint: disable=no-member
|
||||
return self.do_open(self.__inner, req)
|
||||
|
||||
@register()
|
||||
@ -156,8 +155,6 @@ class otptoken_sync(Local):
|
||||
segments = list(urllib.parse.urlparse(self.api.env.xmlrpc_uri))
|
||||
assert segments[0] == 'https' # Ensure encryption.
|
||||
segments[2] = segments[2].replace('/xml', '/session/sync_token')
|
||||
# urlunparse *can* take one argument
|
||||
# pylint: disable=too-many-function-args
|
||||
sync_uri = urllib.parse.urlunparse(segments)
|
||||
|
||||
# Prepare the query.
|
||||
@ -170,7 +167,6 @@ class otptoken_sync(Local):
|
||||
query = query.encode('utf-8')
|
||||
|
||||
# Sync the token.
|
||||
# pylint: disable=E1101
|
||||
handler = HTTPSHandler(
|
||||
cafile=api.env.tls_ca_cert,
|
||||
tls_version_min=api.env.tls_version_min,
|
||||
|
@ -109,9 +109,12 @@ class ServerInfo(MutableMapping):
|
||||
|
||||
def get_package(api):
|
||||
if api.env.in_tree:
|
||||
# server packages are not published on pypi.org
|
||||
# pylint: disable=useless-suppression
|
||||
# pylint: disable=import-error,ipa-forbidden-import
|
||||
from ipaserver import plugins
|
||||
# pylint: enable=import-error,ipa-forbidden-import
|
||||
# pylint: enable=useless-suppression
|
||||
else:
|
||||
try:
|
||||
plugins = api._remote_plugins
|
||||
|
@ -526,7 +526,7 @@ class Schema:
|
||||
def get_help(self, namespace, member):
|
||||
if isinstance(self._help, bytes):
|
||||
self._help = json.loads(
|
||||
self._help.decode('utf-8') # pylint: disable=no-member
|
||||
self._help.decode('utf-8')
|
||||
)
|
||||
|
||||
return self._help[namespace][member]
|
||||
@ -585,7 +585,7 @@ def get_package(server_info, client):
|
||||
for plugin_cls in (_SchemaCommandPlugin, _SchemaObjectPlugin):
|
||||
for full_name in schema[plugin_cls.schema_key]:
|
||||
plugin = plugin_cls(schema, str(full_name))
|
||||
plugin = module.register()(plugin) # pylint: disable=no-member
|
||||
plugin = module.register()(plugin)
|
||||
sys.modules[module_name] = module
|
||||
|
||||
for full_name, topic in six.iteritems(schema['topics']):
|
||||
|
@ -890,7 +890,7 @@ def _enable_warnings(error=False):
|
||||
import warnings
|
||||
|
||||
# get reference to Py_BytesWarningFlag from Python CAPI
|
||||
byteswarnings = ctypes.c_int.in_dll( # pylint: disable=no-member
|
||||
byteswarnings = ctypes.c_int.in_dll(
|
||||
ctypes.pythonapi, 'Py_BytesWarningFlag')
|
||||
|
||||
if byteswarnings.value >= 2:
|
||||
@ -936,9 +936,12 @@ class API(plugable.API):
|
||||
@property
|
||||
def packages(self):
|
||||
if self.env.in_server:
|
||||
# server packages are not published on pypi.org
|
||||
# pylint: disable=useless-suppression
|
||||
# pylint: disable=import-error,ipa-forbidden-import
|
||||
import ipaserver.plugins
|
||||
# pylint: enable=import-error,ipa-forbidden-import
|
||||
# pylint: enable=useless-suppression
|
||||
result = (
|
||||
ipaserver.plugins,
|
||||
)
|
||||
@ -951,9 +954,12 @@ class API(plugable.API):
|
||||
)
|
||||
|
||||
if self.env.context in ('installer', 'updates'):
|
||||
# server packages are not published on pypi.org
|
||||
# pylint: disable=useless-suppression
|
||||
# pylint: disable=import-error,ipa-forbidden-import
|
||||
import ipaserver.install.plugins
|
||||
# pylint: enable=import-error,ipa-forbidden-import
|
||||
# pylint: enable=useless-suppression
|
||||
result += (ipaserver.install.plugins,)
|
||||
|
||||
return result
|
||||
|
@ -139,7 +139,7 @@ class Executioner(Backend):
|
||||
if _name not in self.Command:
|
||||
raise CommandError(name=_name)
|
||||
return self.Command[_name](*args, **options)
|
||||
except PublicError: # pylint: disable=try-except-raise
|
||||
except PublicError:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.exception(
|
||||
|
@ -68,7 +68,7 @@ from ipalib.errors import (PublicError, CommandError, HelpError, InternalError,
|
||||
from ipalib.constants import CLI_TAB, LDAP_GENERALIZED_TIME_FORMAT
|
||||
from ipalib.parameters import File, BinaryFile, Str, Enum, Any, Flag
|
||||
from ipalib.text import _
|
||||
from ipalib import api # pylint: disable=unused-import
|
||||
from ipalib import api
|
||||
from ipapython.dnsutil import DNSName
|
||||
from ipapython.admintool import ScriptError
|
||||
|
||||
@ -609,7 +609,7 @@ class textui(backend.Backend):
|
||||
prompt = u'%s Yes/No: ' % label
|
||||
|
||||
while True:
|
||||
data = self.prompt_helper(prompt, label).lower() #pylint: disable=E1103
|
||||
data = self.prompt_helper(prompt, label).lower()
|
||||
|
||||
if data in (u'yes', u'y'):
|
||||
return True
|
||||
@ -680,9 +680,9 @@ class textui(backend.Backend):
|
||||
except EOFError:
|
||||
return -2
|
||||
|
||||
if resp.lower() == "q": #pylint: disable=E1103
|
||||
if resp.lower() == "q":
|
||||
return -2
|
||||
if resp.lower() == "a": #pylint: disable=E1103
|
||||
if resp.lower() == "a":
|
||||
return -1
|
||||
try:
|
||||
selection = int(resp) - 1
|
||||
@ -1411,9 +1411,7 @@ class cli(backend.Executioner):
|
||||
elif p.stdin_if_missing:
|
||||
try:
|
||||
if six.PY3 and p.type is bytes:
|
||||
# pylint: disable=no-member
|
||||
raw = sys.stdin.buffer.read()
|
||||
# pylint: enable=no-member
|
||||
else:
|
||||
raw = sys.stdin.read()
|
||||
except IOError as e:
|
||||
|
@ -270,9 +270,9 @@ class Env:
|
||||
if type(value) not in (unicode, int, float, bool, type(None), DN):
|
||||
raise TypeError(key, value)
|
||||
object.__setattr__(self, key, value)
|
||||
# pylint: disable=unsupported-assignment-operation, no-member
|
||||
# pylint: disable=no-member
|
||||
self.__d[key] = value
|
||||
# pylint: enable=unsupported-assignment-operation, no-member
|
||||
# pylint: enable=no-member
|
||||
|
||||
def __getitem__(self, key):
|
||||
"""
|
||||
@ -600,12 +600,10 @@ class Env:
|
||||
# and server from jsonrpc_uri so that when only server or xmlrpc_uri
|
||||
# is specified, all 3 keys have a value.)
|
||||
if 'xmlrpc_uri' not in self and 'server' in self:
|
||||
# pylint: disable=no-member, access-member-before-definition
|
||||
self.xmlrpc_uri = 'https://{}/ipa/xml'.format(self.server)
|
||||
|
||||
# Derive ldap_uri from server
|
||||
if 'ldap_uri' not in self and 'server' in self:
|
||||
# pylint: disable=no-member, access-member-before-definition
|
||||
self.ldap_uri = 'ldap://{}'.format(self.server)
|
||||
|
||||
# Derive jsonrpc_uri from xmlrpc_uri
|
||||
|
@ -21,7 +21,6 @@ class HostNameInstallInterface(service.ServiceInstallInterface):
|
||||
"""
|
||||
|
||||
ip_addresses = knob(
|
||||
# pylint: disable=invalid-sequence-index
|
||||
typing.List[CheckedIPAddress], None,
|
||||
description="Specify IP address that should be added to DNS. This "
|
||||
"option can be used multiple times",
|
||||
|
@ -115,7 +115,6 @@ class ServiceInstallInterface(common.Installable,
|
||||
validate_domain_name(value)
|
||||
|
||||
servers = knob(
|
||||
# pylint: disable=invalid-sequence-index
|
||||
typing.List[str], None,
|
||||
description="FQDN of IPA server",
|
||||
cli_names='--server',
|
||||
@ -143,7 +142,6 @@ class ServiceInstallInterface(common.Installable,
|
||||
)
|
||||
|
||||
ca_cert_files = knob(
|
||||
# pylint: disable=invalid-sequence-index
|
||||
typing.List[str], None,
|
||||
description="load the CA certificate from this file",
|
||||
cli_names='--ca-cert-file',
|
||||
|
@ -454,7 +454,7 @@ class API(ReadOnly):
|
||||
if root_logger.handlers or self.env.validate_api:
|
||||
return
|
||||
|
||||
if self.env.debug: # pylint: disable=using-constant-test
|
||||
if self.env.debug:
|
||||
level = logging.DEBUG
|
||||
else:
|
||||
level = logging.INFO
|
||||
@ -478,7 +478,7 @@ class API(ReadOnly):
|
||||
|
||||
# Add stderr handler:
|
||||
level = logging.INFO
|
||||
if self.env.debug: # pylint: disable=using-constant-test
|
||||
if self.env.debug:
|
||||
level = logging.DEBUG
|
||||
else:
|
||||
if self.env.context == 'cli':
|
||||
@ -510,7 +510,7 @@ class API(ReadOnly):
|
||||
return
|
||||
|
||||
level = logging.INFO
|
||||
if self.env.debug: # pylint: disable=using-constant-test
|
||||
if self.env.debug:
|
||||
level = logging.DEBUG
|
||||
if self.env.log is not None:
|
||||
try:
|
||||
@ -663,7 +663,7 @@ class API(ReadOnly):
|
||||
continue
|
||||
except Exception:
|
||||
tb = self.env.startup_traceback
|
||||
if tb: # pylint: disable=using-constant-test
|
||||
if tb:
|
||||
logger.exception("could not load plugin module %s", name)
|
||||
raise
|
||||
|
||||
|
@ -77,7 +77,6 @@ try:
|
||||
from xmlrpclib import (Binary, Fault, DateTime, dumps, loads, ServerProxy,
|
||||
Transport, ProtocolError, MININT, MAXINT)
|
||||
except ImportError:
|
||||
# pylint: disable=import-error
|
||||
from xmlrpc.client import (Binary, Fault, DateTime, dumps, loads, ServerProxy,
|
||||
Transport, ProtocolError, MININT, MAXINT)
|
||||
|
||||
@ -763,7 +762,7 @@ class KerbTransport(SSLTransport):
|
||||
else:
|
||||
connection.putrequest("POST", handler)
|
||||
headers.append(("User-Agent", self.user_agent))
|
||||
self.send_headers(connection, headers) # pylint: disable=E1101
|
||||
self.send_headers(connection, headers)
|
||||
self.send_content(connection, request_body)
|
||||
return connection
|
||||
|
||||
@ -996,8 +995,6 @@ class RPCClient(Connectible):
|
||||
# Form the session URL by substituting the session path into the original URL
|
||||
scheme, netloc, path, params, query, fragment = urllib.parse.urlparse(original_url)
|
||||
path = self.session_path
|
||||
# urlencode *can* take one argument
|
||||
# pylint: disable=too-many-function-args
|
||||
session_url = urllib.parse.urlunparse((scheme, netloc, path, params, query, fragment))
|
||||
|
||||
return session_url
|
||||
@ -1078,11 +1075,9 @@ class RPCClient(Connectible):
|
||||
)
|
||||
# We don't care about the response, just that we got one
|
||||
return serverproxy
|
||||
# pylint: disable=try-except-raise
|
||||
except errors.KerberosError:
|
||||
# kerberos error on one server is likely on all
|
||||
raise
|
||||
# pylint: enable=try-except-raise
|
||||
except ProtocolError as e:
|
||||
if hasattr(context, 'session_cookie') and e.errcode == 401:
|
||||
# Unauthorized. Remove the session and try again.
|
||||
|
@ -288,7 +288,7 @@ class Gettext(LazyText):
|
||||
else:
|
||||
t = create_translation(self.key)
|
||||
if six.PY2:
|
||||
return t.ugettext(self.msg) # pylint: disable=no-member
|
||||
return t.ugettext(self.msg)
|
||||
else:
|
||||
return t.gettext(self.msg)
|
||||
|
||||
@ -296,10 +296,10 @@ class Gettext(LazyText):
|
||||
return unicode(self.as_unicode())
|
||||
|
||||
def __json__(self):
|
||||
return unicode(self) #pylint: disable=no-member
|
||||
return unicode(self)
|
||||
|
||||
def __mod__(self, kw):
|
||||
return unicode(self) % kw #pylint: disable=no-member
|
||||
return unicode(self) % kw
|
||||
|
||||
def format(self, *args, **kwargs):
|
||||
return unicode(self).format(*args, **kwargs)
|
||||
@ -481,9 +481,7 @@ class NGettext(LazyText):
|
||||
else:
|
||||
t = create_translation(self.key)
|
||||
if six.PY2:
|
||||
# pylint: disable=no-member
|
||||
return t.ungettext(self.singular, self.plural, count)
|
||||
# pylint: enable=no-member
|
||||
else:
|
||||
return t.ngettext(self.singular, self.plural, count)
|
||||
|
||||
|
@ -60,7 +60,6 @@ from ipalib.constants import (
|
||||
TLS_VERSIONS, TLS_VERSION_MINIMAL, TLS_VERSION_MAXIMAL,
|
||||
TLS_VERSION_DEFAULT_MIN, TLS_VERSION_DEFAULT_MAX,
|
||||
)
|
||||
# pylint: disable=ipa-forbidden-import
|
||||
from ipalib.facts import is_ipa_client_configured
|
||||
from ipalib.text import _
|
||||
from ipaplatform.constants import constants
|
||||
@ -77,7 +76,7 @@ from ipapython.dnsutil import (
|
||||
from ipapython.admintool import ScriptError
|
||||
|
||||
if sys.version_info >= (3, 2):
|
||||
import reprlib # pylint: disable=import-error
|
||||
import reprlib
|
||||
else:
|
||||
reprlib = None
|
||||
|
||||
@ -321,7 +320,6 @@ def create_https_connection(
|
||||
is not encrypted.
|
||||
:returns An established HTTPS connection to host:port
|
||||
"""
|
||||
# pylint: disable=no-member
|
||||
tls_cutoff_map = {
|
||||
"ssl2": ssl.OP_NO_SSLv2,
|
||||
"ssl3": ssl.OP_NO_SSLv3,
|
||||
@ -330,7 +328,6 @@ def create_https_connection(
|
||||
"tls1.2": ssl.OP_NO_TLSv1_2,
|
||||
"tls1.3": getattr(ssl, "OP_NO_TLSv1_3", 0),
|
||||
}
|
||||
# pylint: enable=no-member
|
||||
|
||||
if cafile is None:
|
||||
raise RuntimeError("cafile argument is required to perform server "
|
||||
@ -343,7 +340,6 @@ def create_https_connection(
|
||||
# official Python documentation states that the best option to get
|
||||
# TLSv1 and later is to setup SSLContext with PROTOCOL_SSLv23
|
||||
# and then negate the insecure SSLv2 and SSLv3
|
||||
# pylint: disable=no-member
|
||||
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
|
||||
ctx.options |= (
|
||||
ssl.OP_ALL | ssl.OP_NO_COMPRESSION | ssl.OP_SINGLE_DH_USE |
|
||||
@ -357,7 +353,6 @@ def create_https_connection(
|
||||
# remove the slice of negating protocol options according to options
|
||||
tls_span = get_proper_tls_version_span(tls_version_min, tls_version_max)
|
||||
|
||||
# pylint: enable=no-member
|
||||
# set up the correct TLS version flags for the SSL context
|
||||
if tls_span is not None:
|
||||
for version in TLS_VERSIONS:
|
||||
|
@ -809,12 +809,10 @@ class ExternalCAProfile:
|
||||
_oid = univ.ObjectIdentifier(parts[0])
|
||||
|
||||
# It is; construct a V2 template
|
||||
# pylint: disable=too-many-function-args
|
||||
return MSCSTemplateV2.__new__(MSCSTemplateV2, s)
|
||||
|
||||
except pyasn1.error.PyAsn1Error:
|
||||
# It is not an OID; treat as a template name
|
||||
# pylint: disable=too-many-function-args
|
||||
return MSCSTemplateV1.__new__(MSCSTemplateV1, s)
|
||||
|
||||
def __getstate__(self):
|
||||
|
@ -180,9 +180,8 @@ class AdminTool:
|
||||
return_value = self.run()
|
||||
except BaseException as exception:
|
||||
if isinstance(exception, ScriptError):
|
||||
# pylint: disable=no-member
|
||||
if exception.rval and exception.rval > return_value:
|
||||
return_value = exception.rval # pylint: disable=no-member
|
||||
return_value = exception.rval
|
||||
traceback = sys.exc_info()[2]
|
||||
error_message, return_value = self.handle_error(exception)
|
||||
if return_value:
|
||||
@ -244,7 +243,7 @@ class AdminTool:
|
||||
root_logger = logging.getLogger()
|
||||
for handler in root_logger.handlers:
|
||||
if (isinstance(handler, logging.StreamHandler) and
|
||||
handler.stream is sys.stderr): # pylint: disable=no-member
|
||||
handler.stream is sys.stderr):
|
||||
root_logger.removeHandler(handler)
|
||||
break
|
||||
|
||||
|
@ -162,7 +162,6 @@ class DNSName(dns.name.Name):
|
||||
def __init__(self, labels, origin=None):
|
||||
try:
|
||||
if isinstance(labels, str):
|
||||
#pylint: disable=E1101
|
||||
labels = dns.name.from_text(unicode(labels), origin).labels
|
||||
elif isinstance(labels, dns.name.Name):
|
||||
labels = labels.labels
|
||||
|
@ -41,7 +41,6 @@ from ipapython import ipautil
|
||||
try:
|
||||
import httplib
|
||||
except ImportError:
|
||||
# pylint: disable=import-error
|
||||
import http.client as httplib
|
||||
|
||||
if six.PY3:
|
||||
@ -67,7 +66,7 @@ KDC_PROFILE = u'KDCs_PKINIT_Certs'
|
||||
|
||||
|
||||
if six.PY3:
|
||||
gzip_decompress = gzip.decompress # pylint: disable=no-member
|
||||
gzip_decompress = gzip.decompress
|
||||
else:
|
||||
# note: gzip.decompress available in Python >= 3.2
|
||||
def gzip_decompress(data):
|
||||
|
@ -216,7 +216,7 @@ class IPAChangeConf:
|
||||
if value:
|
||||
return {'name': 'comment',
|
||||
'type': 'comment',
|
||||
'value': value.rstrip()} # pylint: disable=E1103
|
||||
'value': value.rstrip()}
|
||||
|
||||
o = dict()
|
||||
parts = line.split(self.dassign, 1)
|
||||
|
@ -84,7 +84,7 @@ if six.PY2 and hasattr(ldap, 'LDAPBytesWarning'):
|
||||
# XXX silence python-ldap's BytesWarnings
|
||||
warnings.filterwarnings(
|
||||
action="ignore",
|
||||
category=ldap.LDAPBytesWarning, # pylint: disable=no-member
|
||||
category=ldap.LDAPBytesWarning,
|
||||
)
|
||||
|
||||
|
||||
@ -255,7 +255,7 @@ class LDAPEntry(MutableMapping):
|
||||
|
||||
Keyword arguments can be used to override values of specific attributes.
|
||||
"""
|
||||
super(LDAPEntry, self).__init__() # pylint: disable=no-member
|
||||
super(LDAPEntry, self).__init__()
|
||||
|
||||
if isinstance(_conn, LDAPEntry):
|
||||
assert _dn is None
|
||||
@ -284,7 +284,6 @@ class LDAPEntry(MutableMapping):
|
||||
self._single_value_view = None
|
||||
|
||||
if isinstance(_obj, LDAPEntry):
|
||||
#pylint: disable=E1103
|
||||
self._not_list = set(_obj._not_list)
|
||||
self._orig_raw = dict(_obj._orig_raw)
|
||||
if _obj.conn is _conn:
|
||||
@ -1933,7 +1932,6 @@ class LDAPCache(LDAPClient):
|
||||
|
||||
def get_entry(self, dn, attrs_list=None, time_limit=None,
|
||||
size_limit=None, get_effective_rights=False):
|
||||
# pylint: disable=no-member
|
||||
if not self._enable_cache:
|
||||
return super(LDAPCache, self).get_entry(
|
||||
dn, attrs_list, time_limit, size_limit, get_effective_rights
|
||||
@ -1971,7 +1969,6 @@ class LDAPCache(LDAPClient):
|
||||
get_all = True
|
||||
|
||||
if entry and entry.all and get_all:
|
||||
# self.hit # pylint: disable=pointless-statement
|
||||
hits = self._cache_hits + 1 # pylint: disable=no-member
|
||||
object.__setattr__(self, '_cache_hits', hits)
|
||||
self.cache_status('HIT')
|
||||
|
@ -544,7 +544,7 @@ def run(args, stdin=None, raiseonerr=True, nolog=(), env=None,
|
||||
raise
|
||||
finally:
|
||||
if skip_output:
|
||||
p_out.close() # pylint: disable=E1103
|
||||
p_out.close()
|
||||
|
||||
logger.debug('Process finished, return code=%s', p.returncode)
|
||||
|
||||
@ -1513,7 +1513,7 @@ if six.PY2:
|
||||
str.__name__,
|
||||
type(value).__name__))
|
||||
else:
|
||||
fsdecode = os.fsdecode #pylint: disable=no-member
|
||||
fsdecode = os.fsdecode
|
||||
|
||||
|
||||
def unescape_seq(seq, *args):
|
||||
|
@ -26,7 +26,7 @@ import base64
|
||||
import re
|
||||
import struct
|
||||
from hashlib import sha1
|
||||
from hashlib import sha256 # pylint: disable=E0611
|
||||
from hashlib import sha256
|
||||
|
||||
import six
|
||||
|
||||
|
@ -41,7 +41,6 @@ class SimpleHeaderAuth(HTTPAuthenticator):
|
||||
return None
|
||||
value = request['headers'][self.header]
|
||||
if self.value is not None:
|
||||
# pylint: disable=unsupported-membership-test
|
||||
if value not in self.value:
|
||||
self.audit_svc_access(log.AUDIT_SVC_AUTH_FAIL,
|
||||
request['client_id'], value)
|
||||
|
@ -22,7 +22,7 @@ class SimplePathAuthz(HTTPAuthorizer):
|
||||
|
||||
# if an authorized path does not end in /
|
||||
# check if it matches fullpath for strict match
|
||||
for authz in self.paths: # pylint: disable=not-an-iterable
|
||||
for authz in self.paths:
|
||||
if authz.endswith('/'):
|
||||
continue
|
||||
if authz.endswith('.'):
|
||||
@ -34,7 +34,6 @@ class SimplePathAuthz(HTTPAuthorizer):
|
||||
return True
|
||||
|
||||
while path != '':
|
||||
# pylint: disable=unsupported-membership-test
|
||||
if path in self.paths:
|
||||
self.audit_svc_access(log.AUDIT_SVC_AUTHZ_PASS,
|
||||
request['client_id'], path)
|
||||
|
@ -20,7 +20,7 @@ from ipaserver.custodia import log
|
||||
from ipaserver.custodia.plugin import HTTPError
|
||||
|
||||
try:
|
||||
from systemd import daemon as sd # pylint: disable=import-error
|
||||
from systemd import daemon as sd
|
||||
except ImportError:
|
||||
sd = None
|
||||
if 'NOTIFY_SOCKET' in os.environ:
|
||||
@ -61,7 +61,7 @@ class ForkingHTTPServer(ForkingTCPServer):
|
||||
|
||||
def __init__(self, server_address, handler_class, config,
|
||||
bind_and_activate=True):
|
||||
# pylint: disable=super-init-not-called, non-parent-init-called
|
||||
# pylint: disable=non-parent-init-called
|
||||
# Init just BaseServer, TCPServer creates a socket.
|
||||
BaseServer.__init__(self, server_address, handler_class)
|
||||
|
||||
|
@ -55,7 +55,6 @@ class CustodiaLoggingAdapter(logging.LoggerAdapter):
|
||||
extra = {'origin': plugin.origin}
|
||||
super(CustodiaLoggingAdapter, self).__init__(logger, extra=extra)
|
||||
|
||||
# pylint: disable=arguments-differ
|
||||
def exception(self, msg, *args, **kwargs):
|
||||
"""Like standard exception() logger but only print stack in debug mode
|
||||
"""
|
||||
|
@ -8,7 +8,6 @@ import inspect
|
||||
import json
|
||||
import pwd
|
||||
import re
|
||||
import sys
|
||||
|
||||
from jwcrypto.common import json_encode
|
||||
|
||||
@ -262,17 +261,12 @@ class CustodiaPluginMeta(abc.ABCMeta):
|
||||
ncls = super(CustodiaPluginMeta, cls).__new__(
|
||||
cls, name, bases, namespace, **kwargs)
|
||||
|
||||
if sys.version_info < (3, 0):
|
||||
# pylint: disable=deprecated-method
|
||||
args = inspect.getargspec(ncls.__init__).args
|
||||
# pylint: enable=deprecated-method
|
||||
else:
|
||||
sig = inspect.signature(ncls.__init__) # pylint: disable=no-member
|
||||
args = list(sig.parameters)
|
||||
sig = inspect.signature(ncls.__init__)
|
||||
args = list(sig.parameters)
|
||||
|
||||
if args[1:3] != ['config', 'section']:
|
||||
# old-style plugin class
|
||||
ncls._options = None # pylint: disable=protected-access
|
||||
ncls._options = None
|
||||
return ncls
|
||||
|
||||
# new-style plugin class
|
||||
@ -288,7 +282,7 @@ class CustodiaPluginMeta(abc.ABCMeta):
|
||||
value.name = name
|
||||
options.append(value)
|
||||
|
||||
ncls._options = tuple(options) # pylint: disable=protected-access
|
||||
ncls._options = tuple(options)
|
||||
return ncls
|
||||
|
||||
|
||||
@ -315,7 +309,6 @@ class CustodiaPlugin:
|
||||
if section is not None and self._options is not None:
|
||||
# new style configuration
|
||||
opt = OptionHandler(config, section)
|
||||
# pylint: disable=not-an-iterable
|
||||
for option in self._options:
|
||||
value = opt.get(option)
|
||||
# special case for store
|
||||
@ -359,9 +352,7 @@ class CustodiaPlugin:
|
||||
raise ValueError(
|
||||
"'{}' references non-existing store '{}'".format(
|
||||
self.section, self.store_name))
|
||||
# pylint: disable=attribute-defined-outside-init
|
||||
self.store = store_plugin
|
||||
# pylint: enable=attribute-defined-outside-init
|
||||
store_plugin.finalize_init(config, cfgparser, context=self)
|
||||
|
||||
def finalize_init(self, config, cfgparser, context=None):
|
||||
|
@ -67,11 +67,11 @@ def _create_plugin(cfgparser, section, menu):
|
||||
handler = _load_plugin_class(menu, handler_name)
|
||||
classname = handler.__name__
|
||||
hconf['facility_name'] = '%s-[%s]' % (classname, section)
|
||||
except Exception as e: # pylint: disable=broad-except
|
||||
except Exception as e:
|
||||
raise ValueError('Invalid format for "handler" option '
|
||||
'[%r]: %s' % (e, handler_name))
|
||||
|
||||
if handler._options is not None: # pylint: disable=protected-access
|
||||
if handler._options is not None:
|
||||
# new-style plugin with parser and section
|
||||
plugin = handler(cfgparser, section)
|
||||
else:
|
||||
|
@ -32,7 +32,6 @@ def uri_escape(val):
|
||||
raise ValueError("zero-length URI component detected")
|
||||
hexval = str_hexlify(val)
|
||||
out = '%'
|
||||
# pylint: disable=E1127
|
||||
out += '%'.join(hexval[i:i+2] for i in range(0, len(hexval), 2))
|
||||
return out
|
||||
|
||||
|
@ -491,7 +491,6 @@ class CAInstallInterface(dogtag.DogtagInstallInterface,
|
||||
external_ca_profile = master_install_only(external_ca_profile)
|
||||
|
||||
external_cert_files = knob(
|
||||
# pylint: disable=invalid-sequence-index
|
||||
typing.List[str], None,
|
||||
description=("File containing the IPA CA certificate and the external "
|
||||
"CA certificate chain"),
|
||||
|
@ -774,7 +774,7 @@ class _CrossProcessLock:
|
||||
if six.PY2:
|
||||
p.readfp(fileobj) # pylint: disable=deprecated-method
|
||||
else:
|
||||
p.read_file(fileobj) # pylint: disable=no-member
|
||||
p.read_file(fileobj)
|
||||
|
||||
try:
|
||||
self._locked = p.getboolean('lock', 'locked')
|
||||
|
@ -444,7 +444,6 @@ class DNSInstallInterface(hostname.HostNameInstallInterface):
|
||||
allow_zone_overlap = prepare_only(allow_zone_overlap)
|
||||
|
||||
reverse_zones = knob(
|
||||
# pylint: disable=invalid-sequence-index
|
||||
typing.List[str], [],
|
||||
description=("The reverse DNS zone to use. This option can be used "
|
||||
"multiple times"),
|
||||
@ -506,7 +505,6 @@ class DNSInstallInterface(hostname.HostNameInstallInterface):
|
||||
raise ValueError(error)
|
||||
|
||||
forwarders = knob(
|
||||
# pylint: disable=invalid-sequence-index
|
||||
typing.List[ipautil.CheckedIPAddressLoopback], None,
|
||||
description=("Add a DNS forwarder. This option can be used multiple "
|
||||
"times"),
|
||||
|
@ -993,7 +993,6 @@ class DsInstance(service.Service):
|
||||
|
||||
def __setup_s4u2proxy(self):
|
||||
|
||||
# pylint: disable=unused-private-member
|
||||
def __add_principal(last_cn, principal, self):
|
||||
dn = DN(('cn', last_cn), ('cn', 's4u2proxy'),
|
||||
('cn', 'etc'), self.suffix)
|
||||
|
@ -171,7 +171,7 @@ def verify_fqdn(host_name, no_host_dns=False, local_hostname=True):
|
||||
except socket.error as e:
|
||||
logger.debug(
|
||||
'socket.gethostbyaddr() error: %d: %s',
|
||||
e.errno, e.strerror) # pylint: disable=no-member
|
||||
e.errno, e.strerror)
|
||||
|
||||
if no_host_dns:
|
||||
print("Warning: skipping DNS resolution of host", host_name)
|
||||
|
@ -75,7 +75,6 @@ def convertDate(value):
|
||||
|
||||
dt = dateutil.parser.parse(value)
|
||||
|
||||
# pylint: disable=E1101
|
||||
if dt.tzinfo is None:
|
||||
dt = datetime.datetime(*dt.timetuple()[0:6],
|
||||
tzinfo=dateutil.tz.tzlocal())
|
||||
@ -535,11 +534,11 @@ class OTPTokenImport(admintool.AdminTool):
|
||||
|
||||
# Verify a key is provided if one is needed.
|
||||
if self.doc.keyname is not None:
|
||||
if self.safe_options.keyfile is None: # pylint: disable=no-member
|
||||
if self.safe_options.keyfile is None:
|
||||
raise admintool.ScriptError("Encryption key required: %s!" % self.doc.keyname)
|
||||
|
||||
# Load the keyfile.
|
||||
keyfile = self.safe_options.keyfile # pylint: disable=no-member
|
||||
keyfile = self.safe_options.keyfile
|
||||
with open(keyfile) as f:
|
||||
self.doc.setKey(f.read())
|
||||
|
||||
|
@ -47,14 +47,12 @@ class CompatServerReplicaInstall(ServerReplicaInstall):
|
||||
self.__dm_password = value
|
||||
|
||||
ip_addresses = extend_knob(
|
||||
ServerReplicaInstall.ip_addresses, # pylint: disable=no-member
|
||||
ServerReplicaInstall.ip_addresses,
|
||||
description="Replica server IP Address. This option can be used "
|
||||
"multiple times",
|
||||
)
|
||||
|
||||
admin_password = (
|
||||
ServerReplicaInstall.admin_password # pylint: disable=no-member
|
||||
)
|
||||
admin_password = ServerReplicaInstall.admin_password
|
||||
admin_password = extend_knob(
|
||||
admin_password,
|
||||
cli_names=list(admin_password.cli_names) + ['-w'],
|
||||
|
@ -798,11 +798,9 @@ class Restore(admintool.AdminTool):
|
||||
self.backup_host = config.get('ipa', 'host')
|
||||
self.backup_ipa_version = config.get('ipa', 'ipa_version')
|
||||
self.backup_version = config.get('ipa', 'version')
|
||||
# pylint: disable=no-member
|
||||
# we can assume that returned object is string and it has .split()
|
||||
# method
|
||||
self.backup_services = config.get('ipa', 'services').split(',')
|
||||
# pylint: enable=no-member
|
||||
|
||||
def extract_backup(self):
|
||||
'''
|
||||
|
@ -18,19 +18,18 @@ class CompatServerMasterInstall(ServerMasterInstall):
|
||||
request_cert = False
|
||||
|
||||
dm_password = extend_knob(
|
||||
ServerMasterInstall.dm_password, # pylint: disable=no-member
|
||||
ServerMasterInstall.dm_password,
|
||||
cli_names=['--ds-password', '-p'],
|
||||
)
|
||||
|
||||
admin_password = ServerMasterInstall.admin_password
|
||||
admin_password = extend_knob(
|
||||
admin_password,
|
||||
# pylint: disable=no-member
|
||||
cli_names=list(admin_password.cli_names) + ['-a'],
|
||||
)
|
||||
|
||||
ip_addresses = extend_knob(
|
||||
ServerMasterInstall.ip_addresses, # pylint: disable=no-member
|
||||
ServerMasterInstall.ip_addresses,
|
||||
description="Master Server IP Address. This option can be used "
|
||||
"multiple times",
|
||||
)
|
||||
|
@ -395,7 +395,7 @@ def ipa_start(options):
|
||||
|
||||
if isinstance(e, IpactlError):
|
||||
# do not display any other error message
|
||||
raise IpactlError(rval=e.rval) # pylint: disable=no-member
|
||||
raise IpactlError(rval=e.rval)
|
||||
else:
|
||||
raise IpactlError()
|
||||
|
||||
@ -509,7 +509,7 @@ def ipa_restart(options):
|
||||
pass
|
||||
if isinstance(e, IpactlError):
|
||||
# do not display any other error message
|
||||
raise IpactlError(rval=e.rval) # pylint: disable=no-member
|
||||
raise IpactlError(rval=e.rval)
|
||||
else:
|
||||
raise IpactlError()
|
||||
|
||||
|
@ -1593,12 +1593,12 @@ class ReplicationManager:
|
||||
pass
|
||||
except Exception as e:
|
||||
if force and err:
|
||||
raise err #pylint: disable=E0702
|
||||
raise err
|
||||
else:
|
||||
raise e
|
||||
|
||||
if err:
|
||||
raise err #pylint: disable=E0702
|
||||
raise err
|
||||
|
||||
def set_readonly(self, readonly, critical=False):
|
||||
"""
|
||||
|
@ -60,7 +60,6 @@ class ServerCertificateInstallInterface(service.ServiceInstallInterface):
|
||||
description = "SSL certificate"
|
||||
|
||||
dirsrv_cert_files = knob(
|
||||
# pylint: disable=invalid-sequence-index
|
||||
typing.List[str], None,
|
||||
description=("File containing the Directory Server SSL certificate "
|
||||
"and private key"),
|
||||
@ -71,7 +70,6 @@ class ServerCertificateInstallInterface(service.ServiceInstallInterface):
|
||||
dirsrv_cert_files = prepare_only(dirsrv_cert_files)
|
||||
|
||||
http_cert_files = knob(
|
||||
# pylint: disable=invalid-sequence-index
|
||||
typing.List[str], None,
|
||||
description=("File containing the Apache Server SSL certificate and "
|
||||
"private key"),
|
||||
@ -82,7 +80,6 @@ class ServerCertificateInstallInterface(service.ServiceInstallInterface):
|
||||
http_cert_files = prepare_only(http_cert_files)
|
||||
|
||||
pkinit_cert_files = knob(
|
||||
# pylint: disable=invalid-sequence-index
|
||||
typing.List[str], None,
|
||||
description=("File containing the Kerberos KDC SSL certificate and "
|
||||
"private key"),
|
||||
@ -171,7 +168,6 @@ class ServerInstallInterface(ServerCertificateInstallInterface,
|
||||
domain_name = client.ClientInstallInterface.domain_name
|
||||
domain_name = extend_knob(
|
||||
domain_name,
|
||||
# pylint: disable=no-member
|
||||
cli_names=list(domain_name.cli_names) + ['-n'],
|
||||
)
|
||||
|
||||
@ -453,7 +449,7 @@ class ServerInstallInterface(ServerCertificateInstallInterface,
|
||||
"You cannot specify --external-ca-profile without "
|
||||
"--external-ca")
|
||||
|
||||
if self.uninstalling: # pylint: disable=using-constant-test
|
||||
if self.uninstalling:
|
||||
if (self.realm_name or self.admin_password or
|
||||
self.master_password):
|
||||
raise RuntimeError(
|
||||
|
@ -823,14 +823,12 @@ def promote_check(installer):
|
||||
env._bootstrap(context='installer', confdir=paths.ETC_IPA, log=None)
|
||||
env._finalize_core(**dict(constants.DEFAULT_CONFIG))
|
||||
|
||||
# pylint: disable=no-member
|
||||
xmlrpc_uri = 'https://{}/ipa/xml'.format(ipautil.format_netloc(env.host))
|
||||
api.bootstrap(in_server=True,
|
||||
context='installer',
|
||||
confdir=paths.ETC_IPA,
|
||||
ldap_uri=ipaldap.realm_to_ldapi_uri(env.realm),
|
||||
xmlrpc_uri=xmlrpc_uri)
|
||||
# pylint: enable=no-member
|
||||
api.finalize()
|
||||
|
||||
config = ReplicaConfig()
|
||||
|
@ -185,13 +185,13 @@ class batch(Command):
|
||||
isinstance(e, errors.ConversionError)):
|
||||
logger.info(
|
||||
'%s: batch: %s',
|
||||
context.principal, # pylint: disable=no-member
|
||||
context.principal,
|
||||
e.__class__.__name__
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
'%s: batch: %s(%s): %s',
|
||||
context.principal, name, # pylint: disable=no-member
|
||||
context.principal, name,
|
||||
', '.join(api.Command[name]._repr_iter(**params)),
|
||||
e.__class__.__name__
|
||||
)
|
||||
|
@ -1380,7 +1380,7 @@ class cert_show(Retrieve, CertMethod, VirtualCommand):
|
||||
logger.debug("Not granted by ACI to retrieve certificate, "
|
||||
"looking at principal")
|
||||
if not bind_principal_can_manage_cert(cert):
|
||||
raise acierr # pylint: disable=E0702
|
||||
raise acierr
|
||||
|
||||
ca_obj = api.Command.ca_show(
|
||||
options['cacn'],
|
||||
|
@ -247,7 +247,7 @@ def _convert_to_ipa_rule(rule):
|
||||
if attr_name in rule:
|
||||
element[4].groups = rule[attr_name]
|
||||
if 'externalhost' in rule:
|
||||
ipa_rule.srchosts.names.extend(rule['externalhost']) #pylint: disable=E1101
|
||||
ipa_rule.srchosts.names.extend(rule['externalhost'])
|
||||
return ipa_rule
|
||||
|
||||
|
||||
|
@ -658,7 +658,7 @@ class permission(baseldap.LDAPObject):
|
||||
permission_entry, old_name, notfound_ok=True)
|
||||
|
||||
# (pylint thinks `acientry` is just a dict, but it's an LDAPEntry)
|
||||
acidn = acientry.dn # pylint: disable=E1103
|
||||
acidn = acientry.dn
|
||||
|
||||
if acistring is not None:
|
||||
logger.debug('Removing ACI %r from %s', acistring, acidn)
|
||||
@ -746,7 +746,7 @@ class permission(baseldap.LDAPObject):
|
||||
|
||||
# The DN of old permissions is always basedn
|
||||
# (pylint thinks `base` is just a dict, but it's an LDAPEntry)
|
||||
assert base.dn == self.api.env.basedn, base # pylint: disable=E1103
|
||||
assert base.dn == self.api.env.basedn, base
|
||||
|
||||
aci = ACI(acistring)
|
||||
|
||||
|
@ -87,7 +87,7 @@ def principal_has_privilege(api, principal, privilege):
|
||||
privilege_dn = api.Object.privilege.get_dn(privilege)
|
||||
ldap = api.Backend.ldap2
|
||||
filter = ldap.make_filter({
|
||||
'krbprincipalname': principal, # pylint: disable=no-member
|
||||
'krbprincipalname': principal,
|
||||
'memberof': privilege_dn},
|
||||
rules=ldap.MATCH_ALL)
|
||||
try:
|
||||
|
@ -39,13 +39,11 @@ from ipaserver.masters import is_service_enabled
|
||||
if api.env.in_server:
|
||||
import pki.account
|
||||
import pki.key
|
||||
# pylint: disable=no-member
|
||||
try:
|
||||
# pki >= 10.4.0
|
||||
from pki.crypto import DES_EDE3_CBC_OID
|
||||
except ImportError:
|
||||
DES_EDE3_CBC_OID = pki.key.KeyClient.DES_EDE3_CBC_OID
|
||||
# pylint: enable=no-member
|
||||
|
||||
|
||||
if six.PY3:
|
||||
|
@ -34,7 +34,7 @@ IPA_TESTS_ENV_DIR = os.path.join(IPA_TESTS_ENV_WORKING_DIR, IPA_TESTS_ENV_NAME)
|
||||
IPA_TEST_CONFIG = "ipa-test-config.yaml"
|
||||
|
||||
|
||||
class ExecRunReturn(NamedTuple): # pylint: disable=inherit-non-class #3876
|
||||
class ExecRunReturn(NamedTuple):
|
||||
exit_code: int
|
||||
output: Tuple[bytes, bytes]
|
||||
|
||||
|
@ -151,11 +151,9 @@ def pytest_cmdline_main(config):
|
||||
def pytest_runtest_setup(item):
|
||||
if isinstance(item, pytest.Function):
|
||||
if item.get_closest_marker('skip_ipaclient_unittest'):
|
||||
# pylint: disable=no-member
|
||||
if item.config.option.ipaclient_unittests:
|
||||
pytest.skip("Skip in ipaclient unittest mode")
|
||||
if item.get_closest_marker('needs_ipaapi'):
|
||||
# pylint: disable=no-member
|
||||
if item.config.option.skip_ipaapi:
|
||||
pytest.skip("Skip tests that needs an IPA API")
|
||||
if osinfo is not None:
|
||||
|
@ -624,10 +624,8 @@ def test_translations(po_file, lang, domain, locale_dir):
|
||||
t = gettext.translation(domain, locale_dir)
|
||||
|
||||
if six.PY2:
|
||||
# pylint: disable=no-member
|
||||
get_msgstr = t.ugettext
|
||||
get_msgstr_plural = t.ungettext
|
||||
# pylint: enable=no-member
|
||||
else:
|
||||
get_msgstr = t.gettext
|
||||
get_msgstr_plural = t.ngettext
|
||||
|
@ -29,7 +29,7 @@ def pytest_generate_tests(metafunc):
|
||||
if callable(test):
|
||||
description = '%s: %s' % (
|
||||
str(i).zfill(4),
|
||||
test.__name__, # test is not a dict. pylint: disable=E1103
|
||||
test.__name__,
|
||||
)
|
||||
else:
|
||||
description = '%s: %s: %s' % (str(i).zfill(4),
|
||||
|
@ -1904,7 +1904,7 @@ def ldappasswd_user_change(user, oldpw, newpw, master, use_dirman=False,
|
||||
|
||||
if use_dirman:
|
||||
args = [paths.LDAPPASSWD, '-D',
|
||||
str(master.config.dirman_dn), # pylint: disable=no-member
|
||||
str(master.config.dirman_dn),
|
||||
'-w', master.config.dirman_password,
|
||||
'-s', newpw, '-x', '-ZZ', '-H', master_ldap_uri, userdn]
|
||||
else:
|
||||
@ -1922,7 +1922,7 @@ def ldappasswd_sysaccount_change(user, oldpw, newpw, master, use_dirman=False):
|
||||
|
||||
if use_dirman:
|
||||
args = [paths.LDAPPASSWD, '-D',
|
||||
str(master.config.dirman_dn), # pylint: disable=no-member
|
||||
str(master.config.dirman_dn),
|
||||
'-w', master.config.dirman_password,
|
||||
'-a', oldpw,
|
||||
'-s', newpw, '-x', '-ZZ', '-H', master_ldap_uri,
|
||||
|
@ -10,8 +10,6 @@ from ipaserver.custodia.server.config import parse_config
|
||||
HERE = os.path.dirname(os.path.abspath(__file__))
|
||||
EMPTY_CONF = os.path.join(HERE, 'empty.conf')
|
||||
|
||||
# pylint: disable=redefined-outer-name
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def args():
|
||||
|
@ -77,7 +77,7 @@ class IntegrationTest:
|
||||
else:
|
||||
domain_level = cls.master.config.domain_level
|
||||
|
||||
if cls.master.config.fips_mode: # pylint: disable=using-constant-test
|
||||
if cls.master.config.fips_mode:
|
||||
cls.fips_mode = True
|
||||
if cls.fips_mode:
|
||||
cls.enable_fips_mode()
|
||||
|
@ -102,15 +102,15 @@ class TestAutounmembership(IntegrationTest):
|
||||
def check_autounmembership_in_ldap(self, state='on'):
|
||||
"""Check autounmembership state."""
|
||||
conn = self.master.ldap_connect()
|
||||
entry = conn.get_entry(self.dn) # pylint: disable=no-member
|
||||
entry = conn.get_entry(self.dn)
|
||||
assert state == entry.single_value['automemberprocessmodifyops']
|
||||
|
||||
def disable_autounmembership_in_ldap(self):
|
||||
"""Disable autounmembership."""
|
||||
conn = self.master.ldap_connect()
|
||||
entry = conn.get_entry(self.dn) # pylint: disable=no-member
|
||||
entry = conn.get_entry(self.dn)
|
||||
entry.single_value['automemberprocessmodifyops'] = 'off'
|
||||
conn.update_entry(entry) # pylint: disable=no-member
|
||||
conn.update_entry(entry)
|
||||
self.master.run_command(['ipactl', 'restart'])
|
||||
|
||||
def test_modify_user_entry_unmembership(self):
|
||||
|
@ -131,11 +131,11 @@ class CALessBase(IntegrationTest):
|
||||
cls.cert_password = cls.master.config.admin_password
|
||||
cls.crl_path = os.path.join(cls.master.config.test_dir, 'crl')
|
||||
|
||||
if cls.replicas: # pylint: disable=using-constant-test
|
||||
if cls.replicas:
|
||||
replica_hostname = cls.replicas[0].hostname
|
||||
else:
|
||||
replica_hostname = 'unused-replica.test'
|
||||
if cls.clients: # pylint: disable=using-constant-test
|
||||
if cls.clients:
|
||||
client_hostname = cls.clients[0].hostname
|
||||
else:
|
||||
client_hostname = 'unused-client.test'
|
||||
|
@ -363,7 +363,7 @@ class TestIPACommand(IntegrationTest):
|
||||
master = self.master
|
||||
|
||||
# Add a system account and add it to a group managed by the policy
|
||||
base_dn = str(master.domain.basedn) # pylint: disable=no-member
|
||||
base_dn = str(master.domain.basedn)
|
||||
entry_ldif = textwrap.dedent("""
|
||||
dn: uid={account_name},cn=sysaccounts,cn=etc,{base_dn}
|
||||
changetype: add
|
||||
@ -877,7 +877,7 @@ class TestIPACommand(IntegrationTest):
|
||||
)
|
||||
|
||||
conn = self.master.ldap_connect()
|
||||
entry = conn.get_entry(dn) # pylint: disable=no-member
|
||||
entry = conn.get_entry(dn)
|
||||
|
||||
# original setting and all settings without state
|
||||
orig_cfg = list(entry['ipaConfigString'])
|
||||
@ -888,19 +888,19 @@ class TestIPACommand(IntegrationTest):
|
||||
cfg = [HIDDEN_SERVICE]
|
||||
cfg.extend(other_cfg)
|
||||
entry['ipaConfigString'] = cfg
|
||||
conn.update_entry(entry) # pylint: disable=no-member
|
||||
conn.update_entry(entry)
|
||||
self.master.run_command(['ipa', 'config-show'])
|
||||
|
||||
# test with configured
|
||||
cfg = [CONFIGURED_SERVICE]
|
||||
cfg.extend(other_cfg)
|
||||
entry['ipaConfigString'] = cfg
|
||||
conn.update_entry(entry) # pylint: disable=no-member
|
||||
conn.update_entry(entry)
|
||||
self.master.run_command(['ipa', 'config-show'])
|
||||
finally:
|
||||
# reset
|
||||
entry['ipaConfigString'] = orig_cfg
|
||||
conn.update_entry(entry) # pylint: disable=no-member
|
||||
conn.update_entry(entry)
|
||||
|
||||
def test_ssh_from_controller(self):
|
||||
"""https://pagure.io/SSSD/sssd/issue/3979
|
||||
@ -1255,9 +1255,9 @@ class TestIPACommand(IntegrationTest):
|
||||
)
|
||||
assert console_msg in result.stdout_text
|
||||
# verify using backend
|
||||
conn = self.master.ldap_connect() # pylint: disable=no-member
|
||||
conn = self.master.ldap_connect()
|
||||
dn = DN(('cn', 'NIS Server'), ('cn', 'plugins'), ('cn', 'config'))
|
||||
entry = conn.get_entry(dn) # pylint: disable=no-member
|
||||
entry = conn.get_entry(dn)
|
||||
nispluginstring = entry.get('nsslapd-pluginEnabled')
|
||||
assert 'on' in nispluginstring
|
||||
# restart for changes to take effect
|
||||
@ -1289,9 +1289,9 @@ class TestIPACommand(IntegrationTest):
|
||||
)
|
||||
assert msg in result.stdout_text
|
||||
# verify using backend
|
||||
conn = self.master.ldap_connect() # pylint: disable=no-member
|
||||
conn = self.master.ldap_connect()
|
||||
dn = DN(('cn', 'NIS Server'), ('cn', 'plugins'), ('cn', 'config'))
|
||||
entry = conn.get_entry(dn) # pylint: disable=no-member
|
||||
entry = conn.get_entry(dn)
|
||||
nispluginstring = entry.get('nsslapd-pluginEnabled')
|
||||
assert 'off' in nispluginstring
|
||||
# restart dirsrv for changes to take effect
|
||||
|
@ -757,7 +757,7 @@ class TestInstallMaster(IntegrationTest):
|
||||
related: https://pagure.io/freeipa/issue/8193
|
||||
"""
|
||||
conn = self.master.ldap_connect()
|
||||
entry = conn.get_entry(DN( # pylint: disable=no-member
|
||||
entry = conn.get_entry(DN(
|
||||
"cn=groups,cn=Schema Compatibility,cn=plugins,cn=config"))
|
||||
|
||||
entry_list = list(entry['schema-compat-entry-attribute'])
|
||||
|
@ -964,15 +964,15 @@ class TestIpaHealthCheck(IntegrationTest):
|
||||
dn = DN(
|
||||
("cn", "config"),
|
||||
)
|
||||
entry = ldap.get_entry(dn) # pylint: disable=no-member
|
||||
entry = ldap.get_entry(dn)
|
||||
entry.single_value["nsslapd-logging-hr-timestamps-enabled"] = 'off'
|
||||
ldap.update_entry(entry) # pylint: disable=no-member
|
||||
ldap.update_entry(entry)
|
||||
|
||||
yield
|
||||
|
||||
entry = ldap.get_entry(dn) # pylint: disable=no-member
|
||||
entry = ldap.get_entry(dn)
|
||||
entry.single_value["nsslapd-logging-hr-timestamps-enabled"] = 'on'
|
||||
ldap.update_entry(entry) # pylint: disable=no-member
|
||||
ldap.update_entry(entry)
|
||||
|
||||
def test_ipahealthcheck_ds_configcheck(self, update_logging):
|
||||
"""
|
||||
@ -1104,15 +1104,15 @@ class TestIpaHealthCheck(IntegrationTest):
|
||||
("cn", "plugins"),
|
||||
("cn", "config"),
|
||||
)
|
||||
entry = ldap.get_entry(dn) # pylint: disable=no-member
|
||||
entry = ldap.get_entry(dn)
|
||||
entry.single_value["referint-update-delay"] = -1
|
||||
ldap.update_entry(entry) # pylint: disable=no-member
|
||||
ldap.update_entry(entry)
|
||||
|
||||
yield
|
||||
|
||||
entry = ldap.get_entry(dn) # pylint: disable=no-member
|
||||
entry = ldap.get_entry(dn)
|
||||
entry.single_value["referint-update-delay"] = 0
|
||||
ldap.update_entry(entry) # pylint: disable=no-member
|
||||
ldap.update_entry(entry)
|
||||
|
||||
def test_ipahealthcheck_ds_riplugincheck(self, update_riplugin):
|
||||
"""
|
||||
@ -1140,15 +1140,15 @@ class TestIpaHealthCheck(IntegrationTest):
|
||||
"""
|
||||
ldap = self.master.ldap_connect()
|
||||
dn = DN(("cn", "config"),)
|
||||
entry = ldap.get_entry(dn) # pylint: disable=no-member
|
||||
entry = ldap.get_entry(dn)
|
||||
entry.single_value["nsslapd-rootpwstoragescheme"] = "MD5"
|
||||
ldap.update_entry(entry) # pylint: disable=no-member
|
||||
ldap.update_entry(entry)
|
||||
|
||||
yield
|
||||
|
||||
entry = ldap.get_entry(dn) # pylint: disable=no-member
|
||||
entry = ldap.get_entry(dn)
|
||||
entry.single_value["nsslapd-rootpwstoragescheme"] = "PBKDF2_SHA256"
|
||||
ldap.update_entry(entry) # pylint: disable=no-member
|
||||
ldap.update_entry(entry)
|
||||
|
||||
def test_ds_configcheck_passwordstorage(self, modify_pwdstoragescheme):
|
||||
"""
|
||||
@ -1563,16 +1563,16 @@ class TestIpaHealthCheckWithADtrust(IntegrationTest):
|
||||
("cn", "etc"),
|
||||
basedn,
|
||||
)
|
||||
entry = ldap.get_entry(dn) # pylint: disable=no-member
|
||||
entry = ldap.get_entry(dn)
|
||||
krbprinc = entry['member']
|
||||
entry['member'] = ''
|
||||
ldap.update_entry(entry) # pylint: disable=no-member
|
||||
ldap.update_entry(entry)
|
||||
|
||||
yield
|
||||
|
||||
# Add the entry back
|
||||
entry['member'] = krbprinc
|
||||
ldap.update_entry(entry) # pylint: disable=no-member
|
||||
ldap.update_entry(entry)
|
||||
|
||||
def test_trustcontroller_principalcheck(self, modify_cifs_princ):
|
||||
"""
|
||||
@ -2643,19 +2643,19 @@ class TestIpaHealthCheckWithExternalCA(IntegrationTest):
|
||||
"""
|
||||
ldap = self.master.ldap_connect()
|
||||
dn = DN(("uid", "ipara"), ("ou", "People"), ("o", "ipaca"))
|
||||
entry = ldap.get_entry(dn) # pylint: disable=no-member
|
||||
entry = ldap.get_entry(dn)
|
||||
ldap_cert_desc = entry.single_value.get("description")
|
||||
|
||||
def _update_entry(description):
|
||||
entry = ldap.get_entry(dn) # pylint: disable=no-member
|
||||
entry = ldap.get_entry(dn)
|
||||
entry.single_value['description'] = description
|
||||
ldap.update_entry(entry) # pylint: disable=no-member
|
||||
ldap.update_entry(entry)
|
||||
|
||||
yield _update_entry
|
||||
|
||||
entry = ldap.get_entry(dn) # pylint: disable=no-member
|
||||
entry = ldap.get_entry(dn)
|
||||
entry.single_value['description'] = ldap_cert_desc
|
||||
ldap.update_entry(entry) # pylint: disable=no-member
|
||||
ldap.update_entry(entry)
|
||||
|
||||
def test_ipahealthcheck_iparaagent_ldap(self, update_ra_cert_desc):
|
||||
"""
|
||||
|
@ -276,7 +276,7 @@ class TestOTPToken(IntegrationTest):
|
||||
This requires paramiko until the 2-prompt sshpass RFE is
|
||||
fulfilled: https://sourceforge.net/p/sshpass/feature-requests/5/
|
||||
"""
|
||||
if self.master.is_fips_mode: # pylint: disable=no-member
|
||||
if self.master.is_fips_mode:
|
||||
pytest.skip("paramiko is not compatible with FIPS mode")
|
||||
|
||||
master = self.master
|
||||
@ -323,7 +323,7 @@ class TestOTPToken(IntegrationTest):
|
||||
new_limit = 30
|
||||
conn = self.master.ldap_connect()
|
||||
dn = DN(('cn', 'config'))
|
||||
entry = conn.get_entry(dn) # pylint: disable=no-member
|
||||
entry = conn.get_entry(dn)
|
||||
orig_limit = entry.single_value.get('nsslapd-idletimeout')
|
||||
ldap_query = textwrap.dedent("""
|
||||
dn: cn=config
|
||||
|
@ -211,13 +211,11 @@ class TestSMB(IntegrationTest):
|
||||
\s*SID:\s*S-1-5-21-\d+-\d+-\d+\n
|
||||
\s+ID\ range:\s*\d+\s*-\s*\d+
|
||||
'''
|
||||
# pylint: disable=no-member
|
||||
ipa_regexp = domain_regexp_tpl.format(
|
||||
domain=re.escape(self.master.domain.name),
|
||||
netbios=self.master.netbios)
|
||||
ad_regexp = domain_regexp_tpl.format(
|
||||
domain=re.escape(self.ad.domain.name), netbios=self.ad.netbios)
|
||||
# pylint: enable=no-member
|
||||
output_regexp = r'''
|
||||
Discovered\ domains.*
|
||||
{}
|
||||
|
@ -221,7 +221,7 @@ class TestSSSDWithAdTrust(IntegrationTest):
|
||||
master = self.master
|
||||
conn = master.ldap_connect()
|
||||
dn = DN(('cn', 'config'))
|
||||
entry = conn.get_entry(dn) # pylint: disable=no-member
|
||||
entry = conn.get_entry(dn)
|
||||
orig_limit = entry.single_value.get('nsslapd-sizelimit')
|
||||
ldap_query = textwrap.dedent("""
|
||||
dn: cn=config
|
||||
|
@ -636,12 +636,12 @@ class TestTrust(BaseTestTrust):
|
||||
client.run_command(test_id)
|
||||
|
||||
conn = self.master.ldap_connect()
|
||||
entry = conn.get_entry(extdom_dn) # pylint: disable=no-member
|
||||
entry = conn.get_entry(extdom_dn)
|
||||
orig_extdom_timeout = entry.single_value.get('ipaextdommaxnsstimeout')
|
||||
|
||||
# set the extdom plugin timeout to 1s (1000)
|
||||
entry.single_value['ipaextdommaxnsstimeout'] = 1000
|
||||
conn.update_entry(entry) # pylint: disable=no-member
|
||||
conn.update_entry(entry)
|
||||
self.master.run_command(['ipactl', 'restart'])
|
||||
|
||||
with tasks.remote_sssd_config(self.master) as sssd_conf:
|
||||
@ -667,9 +667,9 @@ class TestTrust(BaseTestTrust):
|
||||
self.master.run_command('kill -CONT %s' % pid)
|
||||
# reconnect and set back to default extdom plugin
|
||||
conn = self.master.ldap_connect()
|
||||
entry = conn.get_entry(extdom_dn) # pylint: disable=no-member
|
||||
entry = conn.get_entry(extdom_dn)
|
||||
entry.single_value['ipaextdommaxnsstimeout'] = orig_extdom_timeout
|
||||
conn.update_entry(entry) # pylint: disable=no-member
|
||||
conn.update_entry(entry)
|
||||
tasks.restore_files(self.master)
|
||||
self.master.run_command(['ipactl', 'restart'])
|
||||
|
||||
|
@ -129,14 +129,14 @@ class TestUpgrade(IntegrationTest):
|
||||
ldap = self.master.ldap_connect()
|
||||
basedn = self.master.domain.basedn
|
||||
dn = DN(('cn', 'CAcert'), ('cn', 'ipa'), ('cn', 'etc'), basedn)
|
||||
entry = ldap.get_entry(dn) # pylint: disable=no-member
|
||||
entry = ldap.get_entry(dn)
|
||||
# Extract the certificate as DER then double-encode
|
||||
cacert = entry['cacertificate;binary'][0]
|
||||
cacert_der = cacert.public_bytes(serialization.Encoding.DER)
|
||||
cacert_b64 = base64.b64encode(cacert_der)
|
||||
# overwrite the value with double-encoded cert
|
||||
entry.single_value['cACertificate;binary'] = cacert_b64
|
||||
ldap.update_entry(entry) # pylint: disable=no-member
|
||||
ldap.update_entry(entry)
|
||||
|
||||
# try the upgrade
|
||||
self.master.run_command(['ipa-server-upgrade'])
|
||||
@ -144,7 +144,7 @@ class TestUpgrade(IntegrationTest):
|
||||
# reconnect to the master (upgrade stops 389-ds)
|
||||
ldap = self.master.ldap_connect()
|
||||
# read the value after upgrade, should be fixed
|
||||
entry = ldap.get_entry(dn) # pylint: disable=no-member
|
||||
entry = ldap.get_entry(dn)
|
||||
try:
|
||||
_cacert = entry['cacertificate;binary']
|
||||
except ValueError:
|
||||
|
@ -463,7 +463,7 @@ class test_Command(ClassChecker):
|
||||
api.finalize()
|
||||
o = my_cmd(api)
|
||||
o.finalize()
|
||||
e = o.get_default(**kw) # pylint: disable=not-callable
|
||||
e = o.get_default(**kw)
|
||||
assert type(e) is dict
|
||||
assert 'option2' in e
|
||||
assert e['option2'] == u'some value'
|
||||
|
@ -605,7 +605,7 @@ class test_Param(ClassChecker):
|
||||
|
||||
def __init__(self, name, **kw):
|
||||
# (Pylint complains because the superclass is unknowm)
|
||||
# pylint: disable=bad-super-call, super-on-old-class
|
||||
# pylint: disable=super-on-old-class
|
||||
self._convert_scalar = PassThrough()
|
||||
super(Str, self).__init__(name, **kw)
|
||||
|
||||
|
@ -390,7 +390,5 @@ class test_rpcclient_context(PluginTester):
|
||||
r'^ipa_session=MagBearerToken=[A-Za-z0-9+\/]+=*;$')
|
||||
|
||||
session_cookie = getattr(context, 'session_cookie', None)
|
||||
# pylint-2 is incorrectly spewing Too many positional arguments
|
||||
# pylint: disable=E1121
|
||||
unquoted = urllib.parse.unquote(session_cookie)
|
||||
assert(unquoted == fuzzy_cookie)
|
||||
|
@ -531,7 +531,6 @@ class TestRDN:
|
||||
def test_assignments(self):
|
||||
rdn = RDN((self.attr1, self.value1))
|
||||
with pytest.raises(TypeError):
|
||||
# pylint: disable=unsupported-assignment-operation
|
||||
rdn[0] = self.ava2
|
||||
|
||||
def test_iter(self):
|
||||
@ -997,10 +996,8 @@ class TestDN:
|
||||
def test_assignments(self):
|
||||
dn = DN('t=0,t=1,t=2,t=3,t=4,t=5,t=6,t=7,t=8,t=9')
|
||||
with pytest.raises(TypeError):
|
||||
# pylint: disable=unsupported-assignment-operation
|
||||
dn[0] = RDN('t=a')
|
||||
with pytest.raises(TypeError):
|
||||
# pylint: disable=unsupported-assignment-operation
|
||||
dn[0:1] = [RDN('t=a'), RDN('t=b')]
|
||||
|
||||
def test_iter(self):
|
||||
|
@ -44,8 +44,6 @@ class Unauthorized_HTTP_test:
|
||||
"""
|
||||
if params is not None:
|
||||
if self.content_type == 'application/x-www-form-urlencoded':
|
||||
# urlencode *can* take two arguments
|
||||
# pylint: disable=too-many-function-args
|
||||
params = urllib.parse.urlencode(params, True)
|
||||
url = 'https://' + self.host + self.app_uri
|
||||
|
||||
|
@ -48,7 +48,6 @@ class InstallerTestBase(six.with_metaclass(ABCMeta, object)):
|
||||
"class first.")
|
||||
|
||||
# add all options from the option groups
|
||||
# pylint: disable=no-member
|
||||
for opt_group in cls.tested_cls.option_parser.option_groups:
|
||||
for opt in opt_group.option_list:
|
||||
cls.OPTS_DICT[opt.dest] = opt._short_opts + opt._long_opts
|
||||
|
@ -106,7 +106,7 @@ class AutomountTest(XMLRPC_test):
|
||||
skipped=(),
|
||||
duplicatemaps=(),
|
||||
duplicatekeys=(),
|
||||
)), res) # pylint: disable=used-before-assignment
|
||||
)), res)
|
||||
self.check_tofiles()
|
||||
finally:
|
||||
res = api.Command['automountlocation_del'](self.locname)['result']
|
||||
|
@ -53,7 +53,7 @@ def test_exc_wrapper():
|
||||
# Test with one callback first
|
||||
|
||||
@test_callback.register_exc_callback
|
||||
def handle_exception( # pylint: disable=unused-variable
|
||||
def handle_exception(
|
||||
self, keys, options, e, call_func, *args, **kwargs):
|
||||
assert args == (1, 2)
|
||||
assert kwargs == dict(a=1, b=2)
|
||||
@ -151,7 +151,7 @@ def test_exc_callback_registration():
|
||||
pass
|
||||
|
||||
@callbacktest_subclass.register_exc_callback
|
||||
def exc_callback( # pylint: disable=unused-variable
|
||||
def exc_callback(
|
||||
self, keys, options, exc, call_func, *args, **kwargs):
|
||||
"""Subclass's private exception callback"""
|
||||
messages.append('Subclass registered callback')
|
||||
@ -165,7 +165,7 @@ def test_exc_callback_registration():
|
||||
|
||||
|
||||
@callbacktest_base.register_exc_callback
|
||||
def exc_callback_2( # pylint: disable=unused-variable
|
||||
def exc_callback_2(
|
||||
self, keys, options, exc, call_func, *args, **kwargs):
|
||||
"""Callback on super class; doesn't affect the subclass"""
|
||||
messages.append('Superclass registered callback')
|
||||
|
Loading…
Reference in New Issue
Block a user