mirror of
https://salsa.debian.org/freeipa-team/freeipa.git
synced 2025-02-25 18:55:28 -06:00
Password change capability for form-based auth
IPA server web form-based authentication allows logins for users
which for some reason cannot use Kerberos authentication. However,
when a password for such users expires, they are unable change the
password via web interface.
This patch adds a new WSGI script attached to URL
/ipa/session/change_password which can be accessed without
authentication and which provides password change capability
for web services.
The actual password change in the script is processed by LDAP
password change command.
Password result is passed both in the resulting HTML page, but
also in HTTP headers for easier parsing in web services:
X-IPA-Pwchange-Result: {ok, invalid-password, policy-error, error}
(optional) X-IPA-Pwchange-Policy-Error: $policy_error_text
https://fedorahosted.org/freeipa/ticket/2276
This commit is contained in:
committed by
Rob Crittenden
parent
34a1dee934
commit
d1e695b5d0
@@ -1,5 +1,5 @@
|
||||
#
|
||||
# VERSION 4 - DO NOT REMOVE THIS LINE
|
||||
# VERSION 5 - DO NOT REMOVE THIS LINE
|
||||
#
|
||||
# LoadModule auth_kerb_module modules/mod_auth_kerb.so
|
||||
|
||||
@@ -72,6 +72,12 @@ KrbConstrainedDelegationLock ipa
|
||||
Allow from all
|
||||
</Location>
|
||||
|
||||
<Location "/ipa/session/change_password">
|
||||
Satisfy Any
|
||||
Order Deny,Allow
|
||||
Allow from all
|
||||
</Location>
|
||||
|
||||
# This is where we redirect on failed auth
|
||||
Alias /ipa/errors "/usr/share/ipa/html"
|
||||
|
||||
|
||||
@@ -25,10 +25,11 @@ Loads WSGI server plugins.
|
||||
from ipalib import api
|
||||
|
||||
if 'in_server' in api.env and api.env.in_server is True:
|
||||
from ipaserver.rpcserver import wsgi_dispatch, xmlserver, jsonserver_kerb, jsonserver_session, login_kerberos, login_password
|
||||
from ipaserver.rpcserver import wsgi_dispatch, xmlserver, jsonserver_kerb, jsonserver_session, login_kerberos, login_password, change_password
|
||||
api.register(wsgi_dispatch)
|
||||
api.register(xmlserver)
|
||||
api.register(jsonserver_kerb)
|
||||
api.register(jsonserver_session)
|
||||
api.register(login_kerberos)
|
||||
api.register(login_password)
|
||||
api.register(change_password)
|
||||
|
||||
@@ -28,7 +28,7 @@ from xml.sax.saxutils import escape
|
||||
from xmlrpclib import Fault
|
||||
from ipalib import plugable
|
||||
from ipalib.backend import Executioner
|
||||
from ipalib.errors import PublicError, InternalError, CommandError, JSONError, ConversionError, CCacheError, RefererError, InvalidSessionPassword
|
||||
from ipalib.errors import PublicError, InternalError, CommandError, JSONError, ConversionError, CCacheError, RefererError, InvalidSessionPassword, NotFound, ACIError, ExecutionError
|
||||
from ipalib.request import context, Connection, destroy_context
|
||||
from ipalib.rpc import xml_dumps, xml_loads
|
||||
from ipalib.util import parse_time_duration
|
||||
@@ -100,6 +100,18 @@ _unauthorized_template = """<html>
|
||||
</body>
|
||||
</html>"""
|
||||
|
||||
_pwchange_template = """<html>
|
||||
<head>
|
||||
<title>200 Success</title>
|
||||
</head>
|
||||
<body>
|
||||
<h1>%(title)s</h1>
|
||||
<p>
|
||||
<strong>%(message)s</strong>
|
||||
</p>
|
||||
</body>
|
||||
</html>"""
|
||||
|
||||
class HTTP_Status(plugable.Plugin):
|
||||
def not_found(self, environ, start_response, url, message):
|
||||
"""
|
||||
@@ -992,3 +1004,97 @@ class login_password(Backend, KerberosSession, HTTP_Status):
|
||||
if returncode != 0:
|
||||
raise InvalidSessionPassword(principal=principal, message=unicode(stderr))
|
||||
|
||||
class change_password(Backend, HTTP_Status):
|
||||
|
||||
content_type = 'text/plain'
|
||||
key = '/session/change_password'
|
||||
|
||||
def __init__(self):
|
||||
super(change_password, self).__init__()
|
||||
|
||||
def _on_finalize(self):
|
||||
super(change_password, self)._on_finalize()
|
||||
self.api.Backend.wsgi_dispatch.mount(self, self.key)
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
self.info('WSGI change_password.__call__:')
|
||||
|
||||
# Get the user and password parameters from the request
|
||||
content_type = environ.get('CONTENT_TYPE', '').lower()
|
||||
if not content_type.startswith('application/x-www-form-urlencoded'):
|
||||
return self.bad_request(environ, start_response, "Content-Type must be application/x-www-form-urlencoded")
|
||||
|
||||
method = environ.get('REQUEST_METHOD', '').upper()
|
||||
if method == 'POST':
|
||||
query_string = read_input(environ)
|
||||
else:
|
||||
return self.bad_request(environ, start_response, "HTTP request method must be POST")
|
||||
|
||||
try:
|
||||
query_dict = urlparse.parse_qs(query_string)
|
||||
except Exception, e:
|
||||
return self.bad_request(environ, start_response, "cannot parse query data")
|
||||
|
||||
data = {}
|
||||
for field in ('user', 'old_password', 'new_password'):
|
||||
value = query_dict.get(field, None)
|
||||
if value is not None:
|
||||
if len(value) == 1:
|
||||
data[field] = value[0]
|
||||
else:
|
||||
return self.bad_request(environ, start_response, "more than one %s parameter"
|
||||
% field)
|
||||
else:
|
||||
return self.bad_request(environ, start_response, "no %s specified" % field)
|
||||
|
||||
# start building the response
|
||||
self.info("WSGI change_password: start password change of user '%s'", data['user'])
|
||||
status = HTTP_STATUS_SUCCESS
|
||||
response_headers = [('Content-Type', 'text/html; charset=utf-8')]
|
||||
title = 'Password change rejected'
|
||||
result = 'error'
|
||||
policy_error = None
|
||||
|
||||
bind_dn = str(DN((self.api.Object.user.primary_key.name, data['user']),
|
||||
self.api.env.container_user, self.api.env.basedn))
|
||||
|
||||
try:
|
||||
conn = ldap2(shared_instance=False,
|
||||
ldap_uri=self.api.env.ldap_uri)
|
||||
conn.connect(bind_dn=bind_dn, bind_pw=data['old_password'])
|
||||
except (NotFound, ACIError):
|
||||
result = 'invalid-password'
|
||||
message = 'The old password or username is not correct.'
|
||||
except Exception, e:
|
||||
message = "Could not connect to LDAP server."
|
||||
self.error("change_password: cannot authenticate '%s' to LDAP server: %s",
|
||||
data['user'], str(e))
|
||||
else:
|
||||
try:
|
||||
conn.modify_password(bind_dn, data['new_password'], data['old_password'])
|
||||
except ExecutionError, e:
|
||||
result = 'policy-error'
|
||||
policy_error = escape(str(e))
|
||||
message = "Password change was rejected: %s" % escape(str(e))
|
||||
except Exception, e:
|
||||
message = "Could not change the password"
|
||||
self.error("change_password: cannot change password of '%s': %s",
|
||||
data['user'], str(e))
|
||||
else:
|
||||
result = 'ok'
|
||||
title = "Password change successful"
|
||||
message = "Password was changed."
|
||||
finally:
|
||||
if conn.isconnected():
|
||||
conn.destroy_connection()
|
||||
|
||||
self.info('%s: %s', status, message)
|
||||
|
||||
response_headers.append(('X-IPA-Pwchange-Result', result))
|
||||
if policy_error:
|
||||
response_headers.append(('X-IPA-Pwchange-Policy-Error', policy_error))
|
||||
|
||||
start_response(status, response_headers)
|
||||
output = _pwchange_template % dict(title=str(title),
|
||||
message=str(message))
|
||||
return [output]
|
||||
|
||||
52
tests/test_ipaserver/httptest.py
Normal file
52
tests/test_ipaserver/httptest.py
Normal file
@@ -0,0 +1,52 @@
|
||||
# Authors:
|
||||
# Martin Kosek <mkosek@redhat.com>
|
||||
#
|
||||
# Copyright (C) 2012 Red Hat
|
||||
# see file 'COPYING' for use and warranty information
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
"""
|
||||
Base class for HTTP request tests
|
||||
"""
|
||||
|
||||
import urllib
|
||||
import httplib
|
||||
|
||||
from ipalib import api
|
||||
|
||||
class Unauthorized_HTTP_test(object):
|
||||
"""
|
||||
Base class for simple HTTP request tests executed against URI
|
||||
with no required authorization
|
||||
"""
|
||||
app_uri = ''
|
||||
host = api.env.host
|
||||
content_type = 'application/x-www-form-urlencoded'
|
||||
|
||||
def send_request(self, method='POST', params=None):
|
||||
"""
|
||||
Send a request to HTTP server
|
||||
|
||||
:param key When not None, overrides default app_uri
|
||||
"""
|
||||
if params is not None:
|
||||
params = urllib.urlencode(params, True)
|
||||
url = 'https://' + self.host + self.app_uri
|
||||
|
||||
headers = {'Content-Type' : self.content_type,
|
||||
'Referer' : url}
|
||||
|
||||
conn = httplib.HTTPSConnection(self.host)
|
||||
conn.request(method, self.app_uri, params, headers)
|
||||
return conn.getresponse()
|
||||
109
tests/test_ipaserver/test_changepw.py
Normal file
109
tests/test_ipaserver/test_changepw.py
Normal file
@@ -0,0 +1,109 @@
|
||||
# Authors:
|
||||
# Martin Kosek <mkosek@redhat.com>
|
||||
#
|
||||
# Copyright (C) 2012 Red Hat
|
||||
# see file 'COPYING' for use and warranty information
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import nose
|
||||
|
||||
from httptest import Unauthorized_HTTP_test
|
||||
from tests.test_xmlrpc.xmlrpc_test import XMLRPC_test
|
||||
from tests.util import assert_equal, assert_not_equal
|
||||
from ipalib import api, errors
|
||||
from ipalib.dn import DN
|
||||
import ldap
|
||||
|
||||
testuser = u'tuser'
|
||||
old_password = u'old_password'
|
||||
new_password = u'new_password'
|
||||
|
||||
class test_changepw(XMLRPC_test, Unauthorized_HTTP_test):
|
||||
app_uri = '/ipa/session/change_password'
|
||||
|
||||
def setUp(self):
|
||||
super(test_changepw, self).setUp()
|
||||
try:
|
||||
api.Command['user_add'](uid=testuser, givenname=u'Test', sn=u'User')
|
||||
api.Command['passwd'](testuser, password=u'old_password')
|
||||
except errors.ExecutionError, e:
|
||||
raise nose.SkipTest(
|
||||
'Cannot set up test user: %s' % e
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
try:
|
||||
api.Command['user_del']([testuser])
|
||||
except errors.NotFound:
|
||||
pass
|
||||
super(test_changepw, self).tearDown()
|
||||
|
||||
def _changepw(self, user, old_password, new_password):
|
||||
return self.send_request(params={'user': str(user),
|
||||
'old_password' : str(old_password),
|
||||
'new_password' : str(new_password)},
|
||||
)
|
||||
|
||||
def _checkpw(self, user, password):
|
||||
dn = str(DN(('uid', user),
|
||||
api.env.container_user,
|
||||
api.env.basedn))
|
||||
conn = ldap.initialize(api.env.ldap_uri)
|
||||
try:
|
||||
conn.simple_bind_s(dn, password)
|
||||
finally:
|
||||
conn.unbind_s()
|
||||
|
||||
def test_bad_options(self):
|
||||
for params in (None, # no params
|
||||
{'user': 'foo'}, # missing options
|
||||
{'user': 'foo',
|
||||
'old_password' : 'old'}, # missing option
|
||||
{'user': 'foo',
|
||||
'old_password' : 'old',
|
||||
'new_password' : ''}, # empty option
|
||||
):
|
||||
response = self.send_request(params=params)
|
||||
assert_equal(response.status, 400)
|
||||
assert_equal(response.reason, 'Bad Request')
|
||||
|
||||
def test_invalid_auth(self):
|
||||
response = self._changepw(testuser, 'wrongpassword', 'new_password')
|
||||
|
||||
assert_equal(response.status, 200)
|
||||
assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'invalid-password')
|
||||
|
||||
# make sure that password is NOT changed
|
||||
self._checkpw(testuser, old_password)
|
||||
|
||||
def test_pwpolicy_error(self):
|
||||
response = self._changepw(testuser, old_password, '1')
|
||||
|
||||
assert_equal(response.status, 200)
|
||||
assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'policy-error')
|
||||
assert_equal(response.getheader('X-IPA-Pwchange-Policy-Error'),
|
||||
'Constraint violation: Password is too short')
|
||||
|
||||
# make sure that password is NOT changed
|
||||
self._checkpw(testuser, old_password)
|
||||
|
||||
def test_pwpolicy_success(self):
|
||||
response = self._changepw(testuser, old_password, new_password)
|
||||
|
||||
assert_equal(response.status, 200)
|
||||
assert_equal(response.getheader('X-IPA-Pwchange-Result'), 'ok')
|
||||
|
||||
# make sure that password IS changed
|
||||
self._checkpw(testuser, new_password)
|
||||
Reference in New Issue
Block a user