Pylint: remove unused variables in ipaserver package

Reviewed-By: Stanislav Laznicka <slaznick@redhat.com>
This commit is contained in:
Martin Basti
2016-10-04 20:02:32 +02:00
parent d937588146
commit 135047d03c
14 changed files with 78 additions and 124 deletions

View File

@@ -63,8 +63,6 @@ from ipaplatform.paths import paths
from ldap.filter import escape_filter_chars from ldap.filter import escape_filter_chars
from time import sleep from time import sleep
# pylint: disable=unused-variable
if six.PY3: if six.PY3:
unicode = str unicode = str
long = int long = int
@@ -220,7 +218,7 @@ class DomainValidator(object):
self.sid = entry_attrs[self.ATTR_SID][0] self.sid = entry_attrs[self.ATTR_SID][0]
self.dn = entry_attrs.dn self.dn = entry_attrs.dn
self.domain = self.api.env.domain self.domain = self.api.env.domain
except errors.NotFound as e: except errors.NotFound:
return False return False
return True return True
@@ -236,7 +234,7 @@ class DomainValidator(object):
search_kw = {'objectClass': 'ipaNTTrustedDomain'} search_kw = {'objectClass': 'ipaNTTrustedDomain'}
filter = self.ldap.make_filter(search_kw, filter = self.ldap.make_filter(search_kw,
rules=self.ldap.MATCH_ALL) rules=self.ldap.MATCH_ALL)
(entries, truncated) = self.ldap.find_entries( entries, _truncated = self.ldap.find_entries(
filter=filter, filter=filter,
base_dn=cn_trust, base_dn=cn_trust,
attrs_list=[self.ATTR_TRUSTED_SID, attrs_list=[self.ATTR_TRUSTED_SID,
@@ -438,7 +436,7 @@ class DomainValidator(object):
try: try:
test_sid = security.dom_sid(sid) test_sid = security.dom_sid(sid)
return unicode(test_sid) return unicode(test_sid)
except TypeError as e: except TypeError:
raise errors.ValidationError(name=_('trusted domain object'), raise errors.ValidationError(name=_('trusted domain object'),
error=_('Trusted domain did not ' error=_('Trusted domain did not '
'return a valid SID for ' 'return a valid SID for '
@@ -756,7 +754,7 @@ class DomainValidator(object):
if self._admin_creds: if self._admin_creds:
(ccache_name, (ccache_name,
principal) = self.kinit_as_administrator(info['dns_domain']) _principal) = self.kinit_as_administrator(info['dns_domain'])
if ccache_name: if ccache_name:
with ipautil.private_ccache(path=ccache_name): with ipautil.private_ccache(path=ccache_name):
@@ -909,9 +907,9 @@ class TrustDomainInstance(object):
self._pipe = self.__gen_lsa_connection(binding) self._pipe = self.__gen_lsa_connection(binding)
if self._pipe and self._pipe.session_key: if self._pipe and self._pipe.session_key:
break break
except errors.ACIError as e: except errors.ACIError:
attempts = attempts + 1 attempts = attempts + 1
except RuntimeError as e: except RuntimeError:
# When session key is not available, we just skip this binding # When session key is not available, we just skip this binding
session_attempts = session_attempts + 1 session_attempts = session_attempts + 1
@@ -976,7 +974,7 @@ class TrustDomainInstance(object):
conn.set_option(_ldap.OPT_SERVER_CONTROLS, [ExtendedDNControl()]) conn.set_option(_ldap.OPT_SERVER_CONTROLS, [ExtendedDNControl()])
search_result = None search_result = None
try: try:
(objtype, res) = conn.search_s('', _ldap.SCOPE_BASE)[0] _objtype, res = conn.search_s('', _ldap.SCOPE_BASE)[0]
search_result = res['defaultNamingContext'][0] search_result = res['defaultNamingContext'][0]
self.info['dns_hostname'] = res['dnsHostName'][0] self.info['dns_hostname'] = res['dnsHostName'][0]
except _ldap.LDAPError as e: except _ldap.LDAPError as e:
@@ -1426,25 +1424,6 @@ class TrustDomainInstance(object):
def fetch_domains(api, mydomain, trustdomain, creds=None, server=None): def fetch_domains(api, mydomain, trustdomain, creds=None, server=None):
trust_flags = dict(
NETR_TRUST_FLAG_IN_FOREST=0x00000001,
NETR_TRUST_FLAG_OUTBOUND=0x00000002,
NETR_TRUST_FLAG_TREEROOT=0x00000004,
NETR_TRUST_FLAG_PRIMARY=0x00000008,
NETR_TRUST_FLAG_NATIVE=0x00000010,
NETR_TRUST_FLAG_INBOUND=0x00000020,
NETR_TRUST_FLAG_MIT_KRB5=0x00000080,
NETR_TRUST_FLAG_AES=0x00000100)
trust_attributes = dict(
NETR_TRUST_ATTRIBUTE_NON_TRANSITIVE=0x00000001,
NETR_TRUST_ATTRIBUTE_UPLEVEL_ONLY=0x00000002,
NETR_TRUST_ATTRIBUTE_QUARANTINED_DOMAIN=0x00000004,
NETR_TRUST_ATTRIBUTE_FOREST_TRANSITIVE=0x00000008,
NETR_TRUST_ATTRIBUTE_CROSS_ORGANIZATION=0x00000010,
NETR_TRUST_ATTRIBUTE_WITHIN_FOREST=0x00000020,
NETR_TRUST_ATTRIBUTE_TREAT_AS_EXTERNAL=0x00000040)
def communicate(td): def communicate(td):
td.init_lsa_pipe(td.info['dc']) td.init_lsa_pipe(td.info['dc'])
netr_pipe = netlogon.netlogon(td.binding, td.parm, td.creds) netr_pipe = netlogon.netlogon(td.binding, td.parm, td.creds)
@@ -1492,12 +1471,12 @@ def fetch_domains(api, mydomain, trustdomain, creds=None, server=None):
# or as passed-in user in case of a one-way trust # or as passed-in user in case of a one-way trust
domval = DomainValidator(api) domval = DomainValidator(api)
ccache_name = None ccache_name = None
principal = None
if creds: if creds:
domval._admin_creds = creds domval._admin_creds = creds
(ccache_name, principal) = domval.kinit_as_administrator(trustdomain) ccache_name, _principal = domval.kinit_as_administrator(
trustdomain)
else: else:
(ccache_name, principal) = domval.kinit_as_http(trustdomain) ccache_name, _principal = domval.kinit_as_http(trustdomain)
td.creds = credentials.Credentials() td.creds = credentials.Credentials()
td.creds.set_kerberos_state(credentials.MUST_USE_KERBEROS) td.creds.set_kerberos_state(credentials.MUST_USE_KERBEROS)
if ccache_name: if ccache_name:
@@ -1683,7 +1662,7 @@ class TrustDomainJoins(object):
self.remote_domain.establish_trust(self.local_domain, self.remote_domain.establish_trust(self.local_domain,
trustdom_pass, trustdom_pass,
trust_type, trust_external) trust_type, trust_external)
except TrustTopologyConflictSolved as e: except TrustTopologyConflictSolved:
# we solved topology conflict, retry again # we solved topology conflict, retry again
self.remote_domain.establish_trust(self.local_domain, self.remote_domain.establish_trust(self.local_domain,
trustdom_pass, trustdom_pass,

View File

@@ -132,8 +132,6 @@ from .baseldap import gen_pkey_only_option, pkey_to_value
from ipapython.ipa_log_manager import root_logger from ipapython.ipa_log_manager import root_logger
from ipapython.dn import DN from ipapython.dn import DN
# pylint: disable=unused-variable
if six.PY3: if six.PY3:
unicode = str unicode = str
@@ -293,7 +291,7 @@ def _make_aci(ldap, current, aciname, kw):
if kw['filter'] in ('', None, u''): if kw['filter'] in ('', None, u''):
raise errors.BadSearchFilter(info=_('empty filter')) raise errors.BadSearchFilter(info=_('empty filter'))
try: try:
entries = ldap.find_entries(filter=kw['filter']) ldap.find_entries(filter=kw['filter'])
except errors.NotFound: except errors.NotFound:
pass pass
a.set_target_filter(kw['filter']) a.set_target_filter(kw['filter'])
@@ -334,7 +332,7 @@ def _aci_to_kw(ldap, a, test=False, pkey_only=False):
if 'targetfilter' in a.target: if 'targetfilter' in a.target:
target = a.target['targetfilter']['expression'] target = a.target['targetfilter']['expression']
if target.startswith('(memberOf=') or target.startswith('memberOf='): if target.startswith('(memberOf=') or target.startswith('memberOf='):
(junk, memberof) = target.split('memberOf=', 1) _junk, memberof = target.split('memberOf=', 1)
memberof = DN(memberof) memberof = DN(memberof)
kw['memberof'] = memberof['cn'] kw['memberof'] = memberof['cn']
else: else:
@@ -394,7 +392,7 @@ def _convert_strings_to_acis(acistrs):
for a in acistrs: for a in acistrs:
try: try:
acis.append(ACI(a)) acis.append(ACI(a))
except SyntaxError as e: except SyntaxError:
root_logger.warning("Failed to parse: %s" % a) root_logger.warning("Failed to parse: %s" % a)
return acis return acis
@@ -946,7 +944,7 @@ class aci_rename(crud.Update):
aci = _find_aci_by_name(acis, kw['aciprefix'], aciname) aci = _find_aci_by_name(acis, kw['aciprefix'], aciname)
for a in acis: for a in acis:
prefix, name = _parse_aci_name(a.name) prefix, _name = _parse_aci_name(a.name)
if _make_aci_name(prefix, kw['newname']) == a.name: if _make_aci_name(prefix, kw['newname']) == a.name:
raise errors.DuplicateEntry() raise errors.DuplicateEntry()

View File

@@ -39,8 +39,6 @@ from ipalib.messages import add_message, SearchResultTruncated
from ipapython.dn import DN from ipapython.dn import DN
from ipapython.version import API_VERSION from ipapython.version import API_VERSION
# pylint: disable=unused-variable
if six.PY3: if six.PY3:
unicode = str unicode = str
@@ -745,7 +743,7 @@ class LDAPObject(Object):
for (pwattr, attr) in self.password_attributes: for (pwattr, attr) in self.password_attributes:
search_filter = '(%s=*)' % pwattr search_filter = '(%s=*)' % pwattr
try: try:
(entries, truncated) = ldap.find_entries( ldap.find_entries(
search_filter, [pwattr], dn, ldap.SCOPE_BASE search_filter, [pwattr], dn, ldap.SCOPE_BASE
) )
entry_attrs[attr] = True entry_attrs[attr] = True
@@ -800,10 +798,10 @@ class LDAPObject(Object):
attrs = self.api.Backend.ldap2.schema.attribute_types(objectclasses) attrs = self.api.Backend.ldap2.schema.attribute_types(objectclasses)
attrlist = [] attrlist = []
# Go through the MUST first # Go through the MUST first
for (oid, attr) in attrs[0].items(): for attr in attrs[0].values():
attrlist.append(attr.names[0].lower()) attrlist.append(attr.names[0].lower())
# And now the MAY # And now the MAY
for (oid, attr) in attrs[1].items(): for attr in attrs[1].values():
attrlist.append(attr.names[0].lower()) attrlist.append(attr.names[0].lower())
json_dict['aciattrs'] = attrlist json_dict['aciattrs'] = attrlist
attrlist.sort() attrlist.sort()
@@ -846,7 +844,7 @@ def _check_limit_object_class(attributes, attrs, allow_only):
return return
limitattrs = deepcopy(attrs) limitattrs = deepcopy(attrs)
# Go through the MUST first # Go through the MUST first
for (oid, attr) in attributes[0].items(): for attr in attributes[0].values():
if attr.names[0].lower() in limitattrs: if attr.names[0].lower() in limitattrs:
if not allow_only: if not allow_only:
raise errors.ObjectclassViolation( raise errors.ObjectclassViolation(
@@ -854,7 +852,7 @@ def _check_limit_object_class(attributes, attrs, allow_only):
attribute=attr.names[0].lower())) attribute=attr.names[0].lower()))
limitattrs.remove(attr.names[0].lower()) limitattrs.remove(attr.names[0].lower())
# And now the MAY # And now the MAY
for (oid, attr) in attributes[1].items(): for attr in attributes[1].values():
if attr.names[0].lower() in limitattrs: if attr.names[0].lower() in limitattrs:
if not allow_only: if not allow_only:
raise errors.ObjectclassViolation( raise errors.ObjectclassViolation(

View File

@@ -53,8 +53,6 @@ from ipapython.dn import DN
from ipapython.ipa_log_manager import root_logger from ipapython.ipa_log_manager import root_logger
from ipaserver.plugins.service import normalize_principal, validate_realm from ipaserver.plugins.service import normalize_principal, validate_realm
# pylint: disable=unused-variable
if six.PY3: if six.PY3:
unicode = str unicode = str
@@ -175,7 +173,7 @@ def validate_csr(ugettext, csr):
if csr and os.path.exists(csr): if csr and os.path.exists(csr):
return return
try: try:
request = pkcs10.load_certificate_request(csr) pkcs10.load_certificate_request(csr)
except (TypeError, binascii.Error) as e: except (TypeError, binascii.Error) as e:
raise errors.Base64DecodeError(reason=str(e)) raise errors.Base64DecodeError(reason=str(e))
except Exception as e: except Exception as e:
@@ -415,11 +413,11 @@ class BaseCertObject(Object):
except KeyError: except KeyError:
general_names = [] general_names = []
for name_type, desc, name, der_name in general_names: for name_type, _desc, name, der_name in general_names:
try: try:
self._add_san_attribute( self._add_san_attribute(
obj, full, name_type, name, der_name) obj, full, name_type, name, der_name)
except Exception as e: except Exception:
# Invalid GeneralName (i.e. not a valid X.509 cert); # Invalid GeneralName (i.e. not a valid X.509 cert);
# don't fail but log something about it # don't fail but log something about it
root_logger.warning( root_logger.warning(
@@ -687,7 +685,7 @@ class cert_request(Create, BaseCertMethod, VirtualCommand):
"to the 'userCertificate' attribute of entry '%s'.") % dn) "to the 'userCertificate' attribute of entry '%s'.") % dn)
# Validate the subject alt name, if any # Validate the subject alt name, if any
for name_type, desc, name, der_name in subjectaltname: for name_type, desc, name, _der_name in subjectaltname:
if name_type == nss.certDNSName: if name_type == nss.certDNSName:
name = unicode(name) name = unicode(name)
alt_principal = None alt_principal = None

View File

@@ -85,8 +85,6 @@ from ipaserver.dns_data_management import (
IPADomainIsNotManagedByIPAError, IPADomainIsNotManagedByIPAError,
) )
# pylint: disable=unused-variable
if six.PY3: if six.PY3:
unicode = str unicode = str
@@ -392,7 +390,7 @@ def _validate_ip6addr(ugettext, ipaddr):
def _validate_ipnet(ugettext, ipnet): def _validate_ipnet(ugettext, ipnet):
try: try:
net = netaddr.IPNetwork(ipnet) netaddr.IPNetwork(ipnet)
except (netaddr.AddrFormatError, ValueError, UnboundLocalError): except (netaddr.AddrFormatError, ValueError, UnboundLocalError):
return _('invalid IP network format') return _('invalid IP network format')
return None return None
@@ -1911,8 +1909,9 @@ def _add_warning_fw_zone_is_not_effective(api, result, fwzone, version):
""" """
Adds warning message to result, if required Adds warning message to result, if required
""" """
authoritative_zone, truncated = \ (
_get_zone_which_makes_fw_zone_ineffective(api, fwzone) authoritative_zone, _truncated
) = _get_zone_which_makes_fw_zone_ineffective(api, fwzone)
if authoritative_zone: if authoritative_zone:
# forward zone is not effective and forwarding will not work # forward zone is not effective and forwarding will not work
messages.add_message( messages.add_message(
@@ -2639,7 +2638,7 @@ class dnszone(DNSZoneBase):
not effective not effective
""" """
zone = keys[-1] zone = keys[-1]
affected_fw_zones, truncated = _find_subtree_forward_zones_ldap( affected_fw_zones, _truncated = _find_subtree_forward_zones_ldap(
self.api, zone, child_zones_only=True) self.api, zone, child_zones_only=True)
if not affected_fw_zones: if not affected_fw_zones:
return return
@@ -2863,8 +2862,8 @@ class dnszone_find(DNSZoneBase_find):
def pre_callback(self, ldap, filter, attrs_list, base_dn, scope, *args, **options): def pre_callback(self, ldap, filter, attrs_list, base_dn, scope, *args, **options):
assert isinstance(base_dn, DN) assert isinstance(base_dn, DN)
filter, base, dn = super(dnszone_find, self).pre_callback(ldap, filter, filter, _base, _scope = super(dnszone_find, self).pre_callback(
attrs_list, base_dn, scope, *args, **options) ldap, filter, attrs_list, base_dn, scope, *args, **options)
if options.get('forward_only', False): if options.get('forward_only', False):
search_kw = {} search_kw = {}
@@ -3446,7 +3445,7 @@ class dnsrecord(LDAPObject):
if not record_name_absolute.is_absolute(): if not record_name_absolute.is_absolute():
record_name_absolute = record_name_absolute.derelativize(zone) record_name_absolute = record_name_absolute.derelativize(zone)
affected_fw_zones, truncated = _find_subtree_forward_zones_ldap( affected_fw_zones, _truncated = _find_subtree_forward_zones_ldap(
self.api, record_name_absolute) self.api, record_name_absolute)
if not affected_fw_zones: if not affected_fw_zones:
return return

View File

@@ -259,8 +259,6 @@ if api.env.in_server:
import pki.crypto as cryptoutil import pki.crypto as cryptoutil
from pki.kra import KRAClient from pki.kra import KRAClient
# pylint: disable=unused-variable
if six.PY3: if six.PY3:
unicode = str unicode = str
@@ -1162,7 +1160,7 @@ def host_has_service(host, ldap2, service='CA'):
} }
query_filter = ldap2.make_filter(filter_attrs, rules='&') query_filter = ldap2.make_filter(filter_attrs, rules='&')
try: try:
ent, trunc = ldap2.find_entries(filter=query_filter, base_dn=base_dn) ent, _trunc = ldap2.find_entries(filter=query_filter, base_dn=base_dn)
if len(ent): if len(ent):
return True return True
except Exception: except Exception:
@@ -1186,7 +1184,7 @@ def select_any_master(ldap2, service='CA'):
'ipaConfigString': 'enabledService',} 'ipaConfigString': 'enabledService',}
query_filter = ldap2.make_filter(filter_attrs, rules='&') query_filter = ldap2.make_filter(filter_attrs, rules='&')
try: try:
ent, trunc = ldap2.find_entries(filter=query_filter, base_dn=base_dn) ent, _trunc = ldap2.find_entries(filter=query_filter, base_dn=base_dn)
if len(ent): if len(ent):
entry = random.choice(ent) entry = random.choice(ent)
return entry.dn[1].value return entry.dn[1].value
@@ -1285,7 +1283,7 @@ class RestClient(Backend):
"""Log into the REST API""" """Log into the REST API"""
if self.cookie is not None: if self.cookie is not None:
return return
status, resp_headers, resp_body = dogtag.https_request( status, resp_headers, _resp_body = dogtag.https_request(
self.ca_host, self.override_port or self.env.ca_agent_port, self.ca_host, self.override_port or self.env.ca_agent_port,
'/ca/rest/account/login', '/ca/rest/account/login',
self.sec_dir, self.password, self.ipa_certificate_nickname, self.sec_dir, self.password, self.ipa_certificate_nickname,
@@ -1485,11 +1483,12 @@ class ra(rabase.rabase, RestClient):
self.debug('%s.check_request_status()', type(self).__name__) self.debug('%s.check_request_status()', type(self).__name__)
# Call CMS # Call CMS
http_status, http_headers, http_body = \ http_status, _http_headers, http_body = (
self._request('/ca/ee/ca/checkRequest', self._request('/ca/ee/ca/checkRequest',
self.env.ca_port, self.env.ca_port,
requestId=request_id, requestId=request_id,
xml='true') xml='true')
)
# Parse and handle errors # Parse and handle errors
if http_status != 200: if http_status != 200:
@@ -1570,11 +1569,12 @@ class ra(rabase.rabase, RestClient):
serial_number = int(serial_number, 0) serial_number = int(serial_number, 0)
# Call CMS # Call CMS
http_status, http_headers, http_body = \ http_status, _http_headers, http_body = (
self._sslget('/ca/agent/ca/displayBySerial', self._sslget('/ca/agent/ca/displayBySerial',
self.env.ca_agent_port, self.env.ca_agent_port,
serialNumber=str(serial_number), serialNumber=str(serial_number),
xml='true') xml='true')
)
# Parse and handle errors # Parse and handle errors
@@ -1654,7 +1654,7 @@ class ra(rabase.rabase, RestClient):
if ca_id: if ca_id:
path += '?issuer-id={}'.format(ca_id) path += '?issuer-id={}'.format(ca_id)
http_status, http_headers, http_body = self._ssldo( _http_status, _http_headers, http_body = self._ssldo(
'POST', path, 'POST', path,
headers={ headers={
'Content-Type': 'application/xml', 'Content-Type': 'application/xml',
@@ -1728,7 +1728,7 @@ class ra(rabase.rabase, RestClient):
serial_number = int(serial_number, 0) serial_number = int(serial_number, 0)
# Call CMS # Call CMS
http_status, http_headers, http_body = \ http_status, _http_headers, http_body = \
self._sslget('/ca/agent/ca/doRevoke', self._sslget('/ca/agent/ca/doRevoke',
self.env.ca_agent_port, self.env.ca_agent_port,
op='revoke', op='revoke',
@@ -1788,11 +1788,12 @@ class ra(rabase.rabase, RestClient):
serial_number = int(serial_number, 0) serial_number = int(serial_number, 0)
# Call CMS # Call CMS
http_status, http_headers, http_body = \ http_status, _http_headers, http_body = (
self._sslget('/ca/agent/ca/doUnrevoke', self._sslget('/ca/agent/ca/doUnrevoke',
self.env.ca_agent_port, self.env.ca_agent_port,
serialNumber=str(serial_number), serialNumber=str(serial_number),
xml='true') xml='true')
)
# Parse and handle errors # Parse and handle errors
if http_status != 200: if http_status != 200:
@@ -2050,7 +2051,7 @@ class ra_certprofile(RestClient):
""" """
Read the profile configuration from Dogtag Read the profile configuration from Dogtag
""" """
status, resp_headers, resp_body = self._ssldo( _status, _resp_headers, resp_body = self._ssldo(
'GET', profile_id + '/raw') 'GET', profile_id + '/raw')
return resp_body return resp_body
@@ -2103,7 +2104,7 @@ class ra_lightweight_ca(RestClient):
""" """
assert isinstance(dn, DN) assert isinstance(dn, DN)
status, resp_headers, resp_body = self._ssldo( _status, _resp_headers, resp_body = self._ssldo(
'POST', None, 'POST', None,
headers={ headers={
'Content-type': 'application/json', 'Content-type': 'application/json',
@@ -2117,7 +2118,7 @@ class ra_lightweight_ca(RestClient):
raise errors.RemoteRetrieveError(reason=_("Response from CA was not valid JSON")) raise errors.RemoteRetrieveError(reason=_("Response from CA was not valid JSON"))
def read_ca(self, ca_id): def read_ca(self, ca_id):
status, resp_headers, resp_body = self._ssldo( _status, _resp_headers, resp_body = self._ssldo(
'GET', ca_id, headers={'Accept': 'application/json'}) 'GET', ca_id, headers={'Accept': 'application/json'})
try: try:
return json.loads(resp_body) return json.loads(resp_body)

View File

@@ -26,8 +26,6 @@ from ipalib import api, Int, Str, StrEnum, _, ngettext
from ipalib import errors from ipalib import errors
from ipapython.dn import DN from ipapython.dn import DN
# pylint: disable=unused-variable
if six.PY3: if six.PY3:
unicode = str unicode = str
@@ -312,7 +310,7 @@ class idrange(LDAPObject):
"&") "&")
try: try:
(objects, truncated) = ldap.find_entries(filter=id_filter, ldap.find_entries(filter=id_filter,
attrs_list=['uid', 'cn'], attrs_list=['uid', 'cn'],
base_dn=DN(api.env.container_accounts, api.env.basedn)) base_dn=DN(api.env.container_accounts, api.env.basedn))
except errors.NotFound: except errors.NotFound:
@@ -555,7 +553,7 @@ class idrange_del(LDAPDelete):
'(ipanttrusteddomainsid=%s))' % range_sid) '(ipanttrusteddomainsid=%s))' % range_sid)
try: try:
(trust_domains, truncated) = ldap.find_entries( trust_domains, _truncated = ldap.find_entries(
base_dn=DN(api.env.container_trusts, api.env.basedn), base_dn=DN(api.env.container_trusts, api.env.basedn),
filter=domain_filter) filter=domain_filter)
except errors.NotFound: except errors.NotFound:

View File

@@ -40,8 +40,6 @@ from ipalib.util import (normalize_sshpubkey, validate_sshpubkey,
from ipapython.dn import DN from ipapython.dn import DN
# pylint: disable=unused-variable
if six.PY3: if six.PY3:
unicode = str unicode = str
@@ -201,7 +199,7 @@ class idview_show(LDAPRetrieve):
attr_name = obj_type + 'overrides' attr_name = obj_type + 'overrides'
try: try:
(overrides, truncated) = ldap.find_entries( overrides, _truncated = ldap.find_entries(
filter="objectclass=%s" % objectclass, filter="objectclass=%s" % objectclass,
attrs_list=['ipaanchoruuid'], attrs_list=['ipaanchoruuid'],
base_dn=dn, base_dn=dn,
@@ -236,7 +234,7 @@ class idview_show(LDAPRetrieve):
} }
try: try:
(hosts, truncated) = ldap.find_entries( hosts, _truncated = ldap.find_entries(
filter=ldap.make_filter(filter_params, rules=ldap.MATCH_ALL), filter=ldap.make_filter(filter_params, rules=ldap.MATCH_ALL),
attrs_list=['cn'], attrs_list=['cn'],
base_dn=api.env.container_host + api.env.basedn, base_dn=api.env.container_host + api.env.basedn,
@@ -626,7 +624,7 @@ def remove_ipaobject_overrides(ldap, api, dn):
override_filter = '(ipaanchoruuid=:IPA:{0}:{1})'.format(api.env.domain, override_filter = '(ipaanchoruuid=:IPA:{0}:{1})'.format(api.env.domain,
object_uuid) object_uuid)
try: try:
entries, truncated = ldap.find_entries( entries, _truncated = ldap.find_entries(
override_filter, override_filter,
base_dn=DN(api.env.container_views, api.env.basedn), base_dn=DN(api.env.container_views, api.env.basedn),
paged_search=True paged_search=True

View File

@@ -40,8 +40,6 @@ from ipapython.kerberos import Principal
import datetime import datetime
from ipaplatform.paths import paths from ipaplatform.paths import paths
# pylint: disable=unused-variable
if six.PY3: if six.PY3:
unicode = str unicode = str
@@ -186,7 +184,6 @@ def _pre_migrate_user(ldap, pkey, dn, entry_attrs, failed, config, ctx, **kwargs
attr_blacklist = ['krbprincipalkey','memberofindirect','memberindirect'] attr_blacklist = ['krbprincipalkey','memberofindirect','memberindirect']
attr_blacklist.extend(kwargs.get('attr_blacklist', [])) attr_blacklist.extend(kwargs.get('attr_blacklist', []))
ds_ldap = ctx['ds_ldap'] ds_ldap = ctx['ds_ldap']
has_upg = ctx['has_upg']
search_bases = kwargs.get('search_bases', None) search_bases = kwargs.get('search_bases', None)
valid_gids = kwargs['valid_gids'] valid_gids = kwargs['valid_gids']
invalid_gids = kwargs['invalid_gids'] invalid_gids = kwargs['invalid_gids']
@@ -318,8 +315,8 @@ def _update_default_group(ldap, ctx, force):
s = datetime.datetime.now() s = datetime.datetime.now()
searchfilter = "(&(objectclass=posixAccount)(!(memberof=%s)))" % group_dn searchfilter = "(&(objectclass=posixAccount)(!(memberof=%s)))" % group_dn
try: try:
(result, truncated) = ldap.find_entries(searchfilter, result, _truncated = ldap.find_entries(
[''], DN(api.env.container_user, api.env.basedn), searchfilter, [''], DN(api.env.container_user, api.env.basedn),
scope=ldap.SCOPE_SUBTREE, time_limit=-1, size_limit=-1) scope=ldap.SCOPE_SUBTREE, time_limit=-1, size_limit=-1)
except errors.NotFound: except errors.NotFound:
api.log.debug('All users have default group set') api.log.debug('All users have default group set')
@@ -915,7 +912,7 @@ migration process might be incomplete\n''')
if not ds_base_dn: if not ds_base_dn:
# retrieve base DN from remote LDAP server # retrieve base DN from remote LDAP server
entries, truncated = ds_ldap.find_entries( entries, _truncated = ds_ldap.find_entries(
'', ['namingcontexts', 'defaultnamingcontext'], DN(''), '', ['namingcontexts', 'defaultnamingcontext'], DN(''),
ds_ldap.SCOPE_BASE, size_limit=-1, time_limit=0, ds_ldap.SCOPE_BASE, size_limit=-1, time_limit=0,
) )

View File

@@ -33,8 +33,6 @@ from ipalib.aci import ACI
from ipapython.dn import DN from ipapython.dn import DN
from ipalib.request import context from ipalib.request import context
# pylint: disable=unused-variable
if six.PY3: if six.PY3:
unicode = str unicode = str
@@ -490,7 +488,7 @@ class permission(baseldap.LDAPObject):
if options.get('raw'): if options.get('raw'):
# Retreive the ACI from LDAP to ensure we get the real thing # Retreive the ACI from LDAP to ensure we get the real thing
try: try:
acientry, acistring = self._get_aci_entry_and_string(entry) _acientry, acistring = self._get_aci_entry_and_string(entry)
except errors.NotFound: except errors.NotFound:
if list(entry.get('ipapermissiontype')) == ['SYSTEM']: if list(entry.get('ipapermissiontype')) == ['SYSTEM']:
# SYSTEM permissions don't have normal ACIs # SYSTEM permissions don't have normal ACIs
@@ -1317,7 +1315,6 @@ class permission_find(baseldap.LDAPSearch):
root_entry = ldap.get_entry(DN(api.env.basedn), ['aci']) root_entry = ldap.get_entry(DN(api.env.basedn), ['aci'])
except errors.NotFound: except errors.NotFound:
legacy_entries = () legacy_entries = ()
cached_root_entry = None
self.log.debug('potential legacy entries: %s', len(legacy_entries)) self.log.debug('potential legacy entries: %s', len(legacy_entries))
nonlegacy_names = {e.single_value['cn'] for e in entries} nonlegacy_names = {e.single_value['cn'] for e in entries}
for entry in legacy_entries: for entry in legacy_entries:

View File

@@ -50,8 +50,6 @@ from ipaplatform.paths import paths
from ipapython.ipautil import ipa_generate_password, GEN_TMP_PWD_LEN from ipapython.ipautil import ipa_generate_password, GEN_TMP_PWD_LEN
from ipalib.capabilities import client_has_capability from ipalib.capabilities import client_has_capability
# pylint: disable=unused-variable
if six.PY3: if six.PY3:
unicode = str unicode = str
@@ -385,7 +383,6 @@ class stageuser_add(baseuser_add):
def post_callback(self, ldap, dn, entry_attrs, *keys, **options): def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
assert isinstance(dn, DN) assert isinstance(dn, DN)
config = ldap.get_ipa_config()
# Fetch the entry again to update memberof, mep data, etc updated # Fetch the entry again to update memberof, mep data, etc updated
# at the end of the transaction. # at the end of the transaction.
@@ -639,7 +636,9 @@ class stageuser_activate(LDAPQuery):
# Check that this value is a Active user # Check that this value is a Active user
try: try:
entry_attrs = self._exc_wrapper(args, options, ldap.get_entry)(value, ['dn']) self._exc_wrapper(args, options, ldap.get_entry)(
value, ['dn']
)
return value return value
except errors.NotFound: except errors.NotFound:
return u'' return u''
@@ -667,10 +666,9 @@ class stageuser_activate(LDAPQuery):
# Check it does not exist an active entry with the same RDN # Check it does not exist an active entry with the same RDN
active_dn = DN(staging_dn[0], api.env.container_user, api.env.basedn) active_dn = DN(staging_dn[0], api.env.container_user, api.env.basedn)
try: try:
test_entry_attrs = self._exc_wrapper(args, options, ldap.get_entry)( self._exc_wrapper(args, options, ldap.get_entry)(
active_dn, ['dn'] active_dn, ['dn']
) )
assert isinstance(staging_dn, DN)
raise errors.DuplicateEntry( raise errors.DuplicateEntry(
message=_('active user with name "%(user)s" already exists') % message=_('active user with name "%(user)s" already exists') %
dict(user=args[-1])) dict(user=args[-1]))

View File

@@ -45,8 +45,6 @@ from ipalib import output
from ldap import SCOPE_SUBTREE from ldap import SCOPE_SUBTREE
from time import sleep from time import sleep
# pylint: disable=unused-variable
if six.PY3: if six.PY3:
unicode = str unicode = str
@@ -228,7 +226,7 @@ def find_adtrust_masters(ldap, api):
""" """
try: try:
entries, truncated = ldap.find_entries( entries, _truncated = ldap.find_entries(
"cn=ADTRUST", "cn=ADTRUST",
base_dn=api.env.container_masters + api.env.basedn base_dn=api.env.container_masters + api.env.basedn
) )
@@ -374,7 +372,7 @@ def add_range(myapi, trustinstance, range_name, dom_sid, *keys, **options):
domain_validator._admin_creds = creds domain_validator._admin_creds = creds
# KDC might not get refreshed data at the first time, # KDC might not get refreshed data at the first time,
# retry several times # retry several times
for retry in range(10): for _retry in range(10):
info_list = domain_validator.search_in_dc(domain, info_list = domain_validator.search_in_dc(domain,
info_filter, info_filter,
None, None,
@@ -619,7 +617,7 @@ class trust(LDAPObject):
ldap = self.api.Backend.ldap2 ldap = self.api.Backend.ldap2
try: try:
entries, truncated = ldap.find_entries( entries, _truncated = ldap.find_entries(
base_dn=DN(self.api.env.container_adtrusts, base_dn=DN(self.api.env.container_adtrusts,
self.api.env.basedn), self.api.env.basedn),
scope=ldap.SCOPE_ONELEVEL, scope=ldap.SCOPE_ONELEVEL,
@@ -744,18 +742,17 @@ sides.
# Store the created range type, since for POSIX trusts no # Store the created range type, since for POSIX trusts no
# ranges for the subdomains should be added, POSIX attributes # ranges for the subdomains should be added, POSIX attributes
# provide a global mapping across all subdomains # provide a global mapping across all subdomains
(created_range_type, _, _) = add_range(self.api, self.trustinstance, add_range(
range_name, dom_sid, self.api, self.trustinstance, range_name, dom_sid,
*keys, **options) *keys, **options
else: )
created_range_type = old_range['result']['iparangetype'][0]
attrs_list = self.obj.default_attributes attrs_list = self.obj.default_attributes
if options.get('all', False): if options.get('all', False):
attrs_list.append('*') attrs_list.append('*')
trust_filter = "cn=%s" % result['value'] trust_filter = "cn=%s" % result['value']
(trusts, truncated) = ldap.find_entries( trusts, _truncated = ldap.find_entries(
base_dn=DN(self.api.env.container_trusts, self.api.env.basedn), base_dn=DN(self.api.env.container_trusts, self.api.env.basedn),
filter=trust_filter, filter=trust_filter,
attrs_list=attrs_list) attrs_list=attrs_list)
@@ -773,8 +770,9 @@ sides.
# run the call under original user's credentials # run the call under original user's credentials
res = fetch_domains_from_trust(self.api, self.trustinstance, res = fetch_domains_from_trust(self.api, self.trustinstance,
**options) **options)
domains = add_new_domains_from_trust(self.api, self.trustinstance, add_new_domains_from_trust(
result['result'], res, **options) self.api, self.trustinstance, result['result'], res,
**options)
else: else:
# One-way trust is more complex. We don't have cross-realm TGT # One-way trust is more complex. We don't have cross-realm TGT
# and cannot use IPA principals to authenticate against AD. # and cannot use IPA principals to authenticate against AD.
@@ -999,7 +997,7 @@ sides.
if ('idnsforwardpolicy' in dns_zone) and dns_zone['idnsforwardpolicy'][0] == u'only': if ('idnsforwardpolicy' in dns_zone) and dns_zone['idnsforwardpolicy'][0] == u'only':
instructions.append(_("Forward policy is defined for it in IPA DNS, " instructions.append(_("Forward policy is defined for it in IPA DNS, "
"perhaps forwarder points to incorrect host?")) "perhaps forwarder points to incorrect host?"))
except (errors.NotFound, KeyError) as e: except (errors.NotFound, KeyError):
instructions.append(_("IPA manages DNS, please verify " instructions.append(_("IPA manages DNS, please verify "
"your DNS configuration and " "your DNS configuration and "
"make sure that service records " "make sure that service records "
@@ -1383,7 +1381,7 @@ class trust_resolve(Command):
entry['name'] = [unicode(xlate[sid][pysss_nss_idmap.NAME_KEY])] entry['name'] = [unicode(xlate[sid][pysss_nss_idmap.NAME_KEY])]
entry['type'] = [idmap_type_string(xlate[sid][pysss_nss_idmap.TYPE_KEY])] entry['type'] = [idmap_type_string(xlate[sid][pysss_nss_idmap.TYPE_KEY])]
result.append(entry) result.append(entry)
except ValueError as e: except ValueError:
pass pass
return dict(result=result) return dict(result=result)
@@ -1624,7 +1622,7 @@ class trustdomain_del(LDAPDelete):
error=_("cannot delete root domain of the trust, " error=_("cannot delete root domain of the trust, "
"use trust-del to delete the trust itself")) "use trust-del to delete the trust itself"))
try: try:
res = self.api.Command.trustdomain_enable(keys[0], domain) self.api.Command.trustdomain_enable(keys[0], domain)
except errors.AlreadyActive: except errors.AlreadyActive:
pass pass
@@ -1814,7 +1812,7 @@ class trustdomain_enable(LDAPQuery):
ldap.update_entry(trust_entry) ldap.update_entry(trust_entry)
# Force MS-PAC cache re-initialization on KDC side # Force MS-PAC cache re-initialization on KDC side
domval = ipaserver.dcerpc.DomainValidator(self.api) domval = ipaserver.dcerpc.DomainValidator(self.api)
(ccache_name, principal) = domval.kinit_as_http(keys[0]) domval.kinit_as_http(keys[0])
else: else:
raise errors.AlreadyActive() raise errors.AlreadyActive()
except errors.NotFound: except errors.NotFound:
@@ -1855,7 +1853,7 @@ class trustdomain_disable(LDAPQuery):
ldap.update_entry(trust_entry) ldap.update_entry(trust_entry)
# Force MS-PAC cache re-initialization on KDC side # Force MS-PAC cache re-initialization on KDC side
domval = ipaserver.dcerpc.DomainValidator(self.api) domval = ipaserver.dcerpc.DomainValidator(self.api)
(ccache_name, principal) = domval.kinit_as_http(keys[0]) domval.kinit_as_http(keys[0])
else: else:
raise errors.AlreadyInactive() raise errors.AlreadyInactive()
except errors.NotFound: except errors.NotFound:

View File

@@ -69,8 +69,6 @@ from ipalib.capabilities import client_has_capability
if api.env.in_server: if api.env.in_server:
from ipaserver.plugins.ldap2 import ldap2 from ipaserver.plugins.ldap2 import ldap2
# pylint: disable=unused-variable
if six.PY3: if six.PY3:
unicode = str unicode = str
@@ -858,7 +856,7 @@ class user_undel(LDAPQuery):
# First check that the user exists and is a delete one # First check that the user exists and is a delete one
delete_dn = self.obj.get_either_dn(*keys, **options) delete_dn = self.obj.get_either_dn(*keys, **options)
try: try:
entry_attrs = self._exc_wrapper(keys, options, ldap.get_entry)(delete_dn) self._exc_wrapper(keys, options, ldap.get_entry)(delete_dn)
except errors.NotFound: except errors.NotFound:
self.obj.handle_not_found(*keys) self.obj.handle_not_found(*keys)
if delete_dn.endswith(DN(self.obj.active_container_dn, if delete_dn.endswith(DN(self.obj.active_container_dn,
@@ -1087,7 +1085,7 @@ class user_status(LDAPQuery):
masters = [] masters = []
# Get list of masters # Get list of masters
try: try:
(masters, truncated) = ldap.find_entries( masters, _truncated = ldap.find_entries(
None, ['*'], DN(('cn', 'masters'), ('cn', 'ipa'), ('cn', 'etc'), api.env.basedn), None, ['*'], DN(('cn', 'masters'), ('cn', 'ipa'), ('cn', 'etc'), api.env.basedn),
ldap.SCOPE_ONELEVEL ldap.SCOPE_ONELEVEL
) )

View File

@@ -64,8 +64,6 @@ from ipaplatform.paths import paths
from ipapython.version import VERSION from ipapython.version import VERSION
from ipalib.text import _ from ipalib.text import _
# pylint: disable=unused-variable
if six.PY3: if six.PY3:
unicode = str unicode = str
@@ -423,7 +421,7 @@ class WSGIExecutioner(Executioner):
status = HTTP_STATUS_SUCCESS status = HTTP_STATUS_SUCCESS
response = self.wsgi_execute(environ) response = self.wsgi_execute(environ)
headers = [('Content-Type', self.content_type + '; charset=utf-8')] headers = [('Content-Type', self.content_type + '; charset=utf-8')]
except Exception as e: except Exception:
self.exception('WSGI %s.__call__():', self.name) self.exception('WSGI %s.__call__():', self.name)
status = HTTP_STATUS_SERVER_ERROR status = HTTP_STATUS_SERVER_ERROR
response = status response = status
@@ -654,7 +652,6 @@ class KerberosWSGIExecutioner(WSGIExecutioner, HTTP_Status, KerberosSession):
if user_ccache is None: if user_ccache is None:
status = HTTP_STATUS_SERVER_ERROR status = HTTP_STATUS_SERVER_ERROR
response_headers = [('Content-Type', 'text/html; charset=utf-8')]
self.log.error( self.log.error(
'%s: %s', status, '%s: %s', status,