diff --git a/ckanext/security/authenticator.py b/ckanext/security/authenticator.py index c61ea5e..7f4d5c4 100644 --- a/ckanext/security/authenticator.py +++ b/ckanext/security/authenticator.py @@ -7,13 +7,20 @@ from ckan.model import User import ckan.plugins as p from ckan.plugins.toolkit import \ - request, config, current_user, base, login_user, h, _ + request, config, current_user, base, login_user, h, _, asbool from ckan.views.user import next_page_or_default, rotate_token from ckanext.security.cache.login import LoginThrottle from ckanext.security.helpers import security_enable_totp from ckanext.security.model import SecurityTOTP, ReplayAttackException +# (canada fork only): enforce strong passwords at login +# TODO: upstream contrib?? +from ckanext.security.schema import force_strong_password_at_login_schema +from ckan.lib.navl.dictization_functions import validate +from ckan import model +from ckan.lib.mailer import create_reset_key + log = logging.getLogger(__name__) @@ -84,7 +91,10 @@ def authenticate(identity): if login_throttle_key is None: return None - throttle = LoginThrottle(User.by_name(user_name), login_throttle_key) + # (canada fork only): enforce strong passwords at login + # TODO: upstream contrib?? + user_obj = User.by_name(user_name) + throttle = LoginThrottle(user_obj, login_throttle_key) # Check if there is a lock on the requested user, and abort if # we have a lock. if throttle.is_locked(): @@ -103,6 +113,16 @@ def authenticate(identity): # TODO: upstream contrib?? if ckan_auth_result: throttle.reset() + # (canada fork only): enforce strong passwords at login + # TODO: upstream contrib?? + if asbool(config.get('ckanext.security.force_strong_passwords_at_login', False)): + data, errors = validate({'name': user_name, 'password': identity['password']}, + force_strong_password_at_login_schema(), {'user': user_name, + 'user_obj': user_obj, + 'model': model}) + if errors and 'password' in errors: + create_reset_key(user_obj) + return {'WEAK_PASS': h.redirect_to('user.perform_reset', id=user_obj.id, key=user_obj.reset_key)} return ckan_auth_result # if the CKAN authenticator has successfully authenticated @@ -170,6 +190,12 @@ def login() -> Union[Response, str]: user_obj = authenticate(identity) if user_obj: + # (canada fork only): enforce strong passwords at login + # TODO: upstream contrib?? + if isinstance(user_obj, dict) and user_obj.get('WEAK_PASS', False): + # FIXME: revise flash message + h.flash_error(_('Your current password is too weak. Please create a new password before logging in again.')) + return user_obj.get('WEAK_PASS') next = request.args.get('next', request.args.get('came_from')) if _remember: from datetime import timedelta diff --git a/ckanext/security/schema.py b/ckanext/security/schema.py index c7c9163..37321bd 100644 --- a/ckanext/security/schema.py +++ b/ckanext/security/schema.py @@ -11,6 +11,9 @@ from ckanext.security.validators import ( user_password_validator, old_username_validator, ensure_str ) +# (canada fork only): reset throttle after successful authentication +# TODO: upstream contrib?? +from ckan.logic.schema import validator_args # The main purpose of this file is to modify CKAN's user-related schemas, and # to replace the default password validators everywhere. We are also replacing @@ -75,3 +78,14 @@ def default_update_user_schema(): ignore_missing, ensure_str] return schema + + +# (canada fork only): reset throttle after successful authentication +# TODO: upstream contrib?? +@validator_args +def force_strong_password_at_login_schema(not_empty, name_validator, + user_name_validator, unicode_safe, + user_password_validator, old_username_validator): + return {'name': [not_empty, name_validator, user_name_validator, + unicode_safe, old_username_validator], + 'password': [not_empty, unicode_safe, user_password_validator],} diff --git a/ckanext/security/validators.py b/ckanext/security/validators.py index 4a9c26f..aaa8af5 100644 --- a/ckanext/security/validators.py +++ b/ckanext/security/validators.py @@ -32,10 +32,13 @@ def user_password_validator(key, data, errors, context): nzism_compliant = asbool(config.get('ckanext.security.nzism_compliant_passwords', True)) username = data.get(('name',), None) - password1 = data.get(('password1',), None) - password2 = data.get(('password2',), None) + password_fields = [ + data.get(('password',), None), + data.get(('password1',), None), + data.get(('password2',), None), + ] - if username == password1 or username == password2: + if username in password_fields: errors[key].append(_(SAME_USERNAME_PASSWORD_ERROR)) if len(value) < min_password_length: