Ensure that the login account should be locked after N number of attempts. N is configurable using the 'MAX_LOGIN_ATTEMPTS' parameter. Fixes #6337

This commit is contained in:
Florian Sabonchi
2021-07-22 12:24:43 +05:30
committed by Akshay Joshi
parent c2db647379
commit a3d3c74e67
8 changed files with 113 additions and 14 deletions

View File

@@ -12,18 +12,18 @@
import config
import copy
from flask import current_app, flash, Response, request, url_for,\
from flask import current_app, flash, Response, request, url_for, \
session, redirect
from flask_babelex import gettext
from flask_security.views import _security
from flask_security.utils import get_post_logout_redirect, \
get_post_login_redirect
get_post_login_redirect, logout_user
from pgadmin import db, User
from pgadmin.utils import PgAdminModule
from pgadmin.utils.constants import KERBEROS, INTERNAL, OAUTH2, LDAP
from pgadmin.authenticate.registry import AuthSourceRegistry
MODULE_NAME = 'authenticate'
auth_obj = None
@@ -46,14 +46,36 @@ def login():
auth_obj = AuthSourceManager(form, copy.deepcopy(
config.AUTHENTICATION_SOURCES))
if OAUTH2 in config.AUTHENTICATION_SOURCES\
if OAUTH2 in config.AUTHENTICATION_SOURCES \
and 'oauth2_button' in request.form:
session['auth_obj'] = auth_obj
session['auth_source_manager'] = None
username = form.data['email']
user = User.query.filter_by(username=username).first()
if user:
if user.login_attempts >= config.MAX_LOGIN_ATTEMPTS > 0:
user.locked = True
else:
user.locked = False
db.session.commit()
if user.login_attempts >= config.MAX_LOGIN_ATTEMPTS > 0:
flash(gettext('Your account is locked. Please contact the '
'Administrator.'),
'warning')
logout_user()
return redirect(get_post_logout_redirect())
# Validate the user
if not auth_obj.validate():
for field in form.errors:
if user:
if config.MAX_LOGIN_ATTEMPTS > 0:
user.login_attempts += 1
db.session.commit()
for error in form.errors[field]:
flash(error, 'warning')
return redirect(get_post_logout_redirect())
@@ -66,14 +88,19 @@ def login():
current_auth_obj = auth_obj.as_dict()
if not status:
if current_auth_obj['current_source'] ==\
if current_auth_obj['current_source'] == \
KERBEROS:
return redirect('{0}?next={1}'.format(url_for(
'authenticate.kerberos_login'), url_for('browser.index')))
flash(msg, 'danger')
return redirect(get_post_logout_redirect())
session['auth_source_manager'] = current_auth_obj
user.login_attempts = 0
db.session.commit()
if 'auth_obj' in session:
session.pop('auth_obj')
return redirect(get_post_login_redirect())